javascript
并发–执行程序和Spring集成
比原始線程版本更好的方法是基于線程池的線程池,其中基于運(yùn)行任務(wù)的系統(tǒng)定義了適當(dāng)?shù)木€程池大小– CPU數(shù)量/(任務(wù)的1-Blocking Coefficient)。 Venkat Subramaniams書中有更多詳細(xì)信息:
首先,在給定“報(bào)告部件請求”的情況下,我定義了一個自定義任務(wù)來生成“報(bào)告部件”,將其實(shí)現(xiàn)為Callable :
public class ReportPartRequestCallable implements Callable<ReportPart> {private final ReportRequestPart reportRequestPart;private final ReportPartGenerator reportPartGenerator;public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) {this.reportRequestPart = reportRequestPart;this.reportPartGenerator = reportPartGenerator;}@Overridepublic ReportPart call() {return this.reportPartGenerator.generateReportPart(reportRequestPart);} }public class ExecutorsBasedReportGenerator implements ReportGenerator {private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator.class);private ReportPartGenerator reportPartGenerator;private ExecutorService executors = Executors.newFixedThreadPool(10);@Overridepublic Report generateReport(ReportRequest reportRequest) {List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>();List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();for (ReportRequestPart reportRequestPart : reportRequestParts) {tasks.add(new ReportPartRequestCallable(reportRequestPart, reportPartGenerator));}List<Future<ReportPart>> responseForReportPartList;List<ReportPart> reportParts = new ArrayList<ReportPart>();try {responseForReportPartList = executors.invokeAll(tasks);for (Future<ReportPart> reportPartFuture : responseForReportPartList) {reportParts.add(reportPartFuture.get());}} catch (Exception e) {logger.error(e.getMessage(), e);throw new RuntimeException(e);}return new Report(reportParts);}...... }在這里,使用Executors.newFixedThreadPool(10)調(diào)用創(chuàng)建線程池,線程池的大小為10,為每個報(bào)告請求部分生成一個可調(diào)用任務(wù),并使用ExecutorService抽象將其移交給線程池
responseForReportPartList = executors.invokeAll(tasks);此調(diào)用返回一個期貨列表,該列表支持get()方法,這是對響應(yīng)可用的阻塞調(diào)用。
與原始線程版本相比,這顯然是一個更好的實(shí)現(xiàn),線程數(shù)量在負(fù)載下被限制為可管理的數(shù)量。
基于Spring集成的實(shí)現(xiàn)
我個人最喜歡的方法是使用Spring Integration ,原因是使用Spring Integration時 ,我專注于完成不同任務(wù)的組件,并留給Spring Integration使用基于xml或基于注釋的配置將流程連接在一起。 在這里,我將使用基于XML的配置:
在我的案例中,這些組件是:
1.給出報(bào)告部分的請求,生成報(bào)告部分的組件,我之前已經(jīng)顯示過 。
2.用于將報(bào)告請求拆分為報(bào)告請求部分的組件:
3.用于將報(bào)告部分組裝/匯總為整個報(bào)告的組件:
public class DefaultReportAggregator implements ReportAggregator{@Overridepublic Report aggregate(List<ReportPart> reportParts) {return new Report(reportParts);}}這就是Spring Integration所需的所有Java代碼,其余的都是接線–在這里,我使用了Spring Integration配置文件:
<?xml version='1.0' encoding='UTF-8'?> <beans ....<int:channel id='report.partsChannel'/><int:channel id='report.reportChannel'/><int:channel id='report.partReportChannel'><int:queue capacity='50'/></int:channel> <int:channel id='report.joinPartsChannel'/><int:splitter id='splitter' ref='reportsPartSplitter' method='split' input-channel='report.partsChannel' output-channel='report.partReportChannel'/><task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' /><int:service-activator id='reportsPartServiceActivator' ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator><int:aggregator ref='reportAggregator' method='aggregate' input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> <int:gateway id='reportGeneratorGateway' service-interface='org.bk.sisample.springintegration.ReportGeneratorGateway' default-request-channel='report.partsChannel' default-reply-channel='report.reportChannel'/><bean name='reportsPartSplitter' class='org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter'></bean><bean name='reportPartReportGenerator' class='org.bk.sisample.processors.DummyReportPartGenerator'/><bean name='reportAggregator' class='org.bk.sisample.springintegration.processors.DefaultReportAggregator'/><bean name='reportGenerator' class='org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator'/></beans>Spring Source Tool Suite提供了一種可視化此文件的好方法:
這完全符合我對用戶流的原始看法:
在代碼的Spring Integration版本中,我定義了不同的組件來處理流程的不同部分:
1.將報(bào)告請求轉(zhuǎn)換為報(bào)告請求部分的拆分器:
2.服務(wù)激活器組件,用于根據(jù)報(bào)告部件請求生成報(bào)告部件:
<int:service-activator id='reportsPartServiceActivator' ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator>3.聚合器,用于將報(bào)表部分重新加入報(bào)表,并且足夠智能,可以適當(dāng)?shù)仃P(guān)聯(lián)原始拆分報(bào)表請求,而無需任何顯式編碼:
<int:aggregator ref='reportAggregator' method='aggregate' input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator>這段代碼有趣的是,就像在基于執(zhí)行者的示例中一樣,使用xml文件,通過使用適當(dāng)?shù)耐ǖ缹⒉煌慕M件連接在一起以及通過使用任務(wù)執(zhí)行器 ,可以完全配置服務(wù)于每個組件的線程數(shù)。設(shè)置為執(zhí)行程序?qū)傩缘木€程池大小。
在這段代碼中,我定義了一個隊(duì)列通道,其中報(bào)告請求部分進(jìn)入其中:
<int:channel id='report.partReportChannel'><int:queue capacity='50'/></int:channel>并由服務(wù)激活器組件使用任務(wù)執(zhí)行器提供服務(wù),該任務(wù)執(zhí)行器的線程池大小為10,容量為50:
<task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' /><int:service-activator id='reportsPartServiceActivator' ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator>所有這些都通過配置!
該示例的完整代碼庫可在以下github位置獲得: https : //github.com/bijukunjummen/si-sample
參考: 并發(fā)–來自JCG合作伙伴 Biju Kunjummen的“ 執(zhí)行程序和Spring集成” ,位于all和其他博客上。
翻譯自: https://www.javacodegeeks.com/2012/06/concurrency-executors-and-spring.html
總結(jié)
以上是生活随笔為你收集整理的并发–执行程序和Spring集成的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ddos贴吧(爆吧是ddos吗)
- 下一篇: linux应用开发工程师需要学什么(li