SpringBatch: Configuring Parallel Job Execution in Detail (8)
Contents
- 1. Creating a parallel job
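Preface: in day-to-day business work you may need parts of a job to run in parallel. SpringBatch supports running the steps of a job in parallel, and the configuration is simple.
The code has been uploaded to GitHub: https://github.com/FadeHub/spring-boot-learn/tree/master/spring-boot-springbatch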
Quick links to the other SpringBatch articles:
- SpringBatch: reading a single file (FlatFileItemReader) and writing a single file (FlatFileItemWriter) (1)
- SpringBatch: reading multiple files in order (MultiResourceItemReader) and writing files in order (MultiResourceItemWriter) (2)
- SpringBatch: reading from a database (MyBatisPagingItemReader) (3)
- SpringBatch: reading a file (FlatFileItemReader) and writing to a database (MyBatisBatchItemWriter) (4)
- SpringBatch listeners: the Job listener (JobExecutionListener) and the Step listener (StepExecutionListener) (5)
- SpringBatch listeners: the Chunk listener (ChunkListener) and the Skip listener (SkipListener) (6)
- SpringBatch: starting a Job with multiple threads (TaskExecutor) in detail (7)
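1. Creating a parallel job
First, use FlowBuilder to build small flows and specify the steps inside each one; the flows that are added to a split run in parallel. The configuration below defines two parallel flows, flow1 and flow2. flow1 runs step1 and then step2 in order, and flow2 contains only step3, so the sequence {step1, step2} as a whole runs in parallel with step3: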

    package com.sl.config;

    import com.sl.common.CommonFileItemReader;
    import com.sl.common.CommonFileItemWriter;
    import com.sl.common.CommonMybatisItemReader;
    import com.sl.entity.CafeCat;
    import com.sl.entity.Cat;
    import com.sl.entity.People;
    import com.sl.entity.Student;
    import com.sl.listener.CatChunkListener;
    import com.sl.listener.CatJobListener;
    import com.sl.listener.CatStepListener;
    import com.sl.processor.CafeCatProcessor;
    import com.sl.processor.StudentProcessor;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.job.builder.FlowBuilder;
    import org.springframework.batch.core.job.flow.Flow;
    import org.springframework.batch.core.job.flow.support.SimpleFlow;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;

    /**
     * Runs the flows of a job in parallel.
     *
     * @author shuliangzhao
     * @Title: CatFlowConfiguration
     * @ProjectName spring-boot-learn
     * @Description: TODO
     * @date 2019/9/14 20:10
     */
    @Configuration
    @EnableBatchProcessing
    public class CatFlowConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        private CafeCatProcessor cafeCatProcessor;

        @Autowired
        private SqlSessionFactory sqlSessionFactory;

        @Autowired
        private CatJobListener catJobListener;

        @Autowired
        private CatStepListener catStepListener;

        @Autowired
        private CatChunkListener catChunkListener;

        @Autowired
        private StudentProcessor studentProcessor;

        // The job starts with the parallel split, then runs catFlowStep.
        // The first build() closes the flow definition, the second builds the Job.
        @Bean
        public Job catFlowJob() {
            return jobBuilderFactory.get("catFlowJob")
                    .start(splitFlow())
                    .next(catFlowStep())
                    .build()
                    .build();
        }

        @Bean
        public Step catFlowStep() {
            return stepBuilderFactory.get("catFlowStep")
                    // .listener(catStepListener)
                    .listener(catChunkListener)
                    .<Cat, CafeCat>chunk(10)
                    .reader(catFlowCommonMybatisItemReader())
                    .processor(cafeCatProcessor)
                    .writer(cafeCatFlowCommonFileItemWriter())
                    .taskExecutor(flowTaskExecutor())
                    .throttleLimit(8)
                    .build();
        }

        @Bean
        @StepScope
        public CommonMybatisItemReader<Cat> catFlowCommonMybatisItemReader() {
            return new CommonMybatisItemReader<>(sqlSessionFactory, Cat.class.getSimpleName());
        }

        @Bean
        @StepScope
        public CommonFileItemWriter<CafeCat> cafeCatFlowCommonFileItemWriter() {
            return new CommonFileItemWriter<>(CafeCat.class);
        }

        // flow1 and flow2 are handed to the task executor and run in parallel.
        @Bean
        public Flow splitFlow() {
            return new FlowBuilder<SimpleFlow>("splitFlow")
                    .split(flowTaskExecutor())
                    .add(flow1(), flow2())
                    .build();
        }

        // flow1: step1 followed by step2.
        @Bean
        public Flow flow1() {
            return new FlowBuilder<SimpleFlow>("flow1")
                    .start(step1())
                    .next(step2())
                    .build();
        }

        // flow2: step3 only.
        @Bean
        public Flow flow2() {
            return new FlowBuilder<SimpleFlow>("flow2")
                    .start(step3())
                    .build();
        }

        @Bean
        public Step step3() {
            return stepBuilderFactory.get("step3")
                    .<People, Student>chunk(10)
                    .reader(step3flow2CommonFileItemReader())
                    .processor(studentProcessor)
                    .writer(step3flow2CommonFileItemWriter())
                    .build();
        }

        @Bean
        @StepScope
        public CommonFileItemReader<People> step3flow2CommonFileItemReader() {
            return new CommonFileItemReader<>(People.class);
        }

        @Bean
        @StepScope
        public CommonFileItemWriter<Student> step3flow2CommonFileItemWriter() {
            return new CommonFileItemWriter<>(Student.class);
        }

        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<People, Student>chunk(10)
                    .reader(step1flow1CommonFileItemReader())
                    .processor(studentProcessor)
                    .writer(step1flow1CommonFileItemWriter())
                    .build();
        }

        @Bean
        @StepScope
        public CommonFileItemReader<People> step1flow1CommonFileItemReader() {
            return new CommonFileItemReader<>(People.class);
        }

        @Bean
        @StepScope
        public CommonFileItemWriter<Student> step1flow1CommonFileItemWriter() {
            return new CommonFileItemWriter<>(Student.class);
        }

        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .<People, Student>chunk(10)
                    .reader(step2flow1CommonFileItemReader())
                    .processor(studentProcessor)
                    .writer(step2flow1CommonFileItemWriter())
                    .build();
        }

        @Bean
        @StepScope
        public CommonFileItemReader<People> step2flow1CommonFileItemReader() {
            return new CommonFileItemReader<>(People.class);
        }

        @Bean
        @StepScope
        public CommonFileItemWriter<Student> step2flow1CommonFileItemWriter() {
            return new CommonFileItemWriter<>(Student.class);
        }

        // Asynchronous executor; its threads are named with the "spring_batch" prefix.
        @Bean
        public TaskExecutor flowTaskExecutor() {
            return new SimpleAsyncTaskExecutor("spring_batch");
        }
    }

You have to specify which TaskExecutor implementation should run the individual flows. The default, SyncTaskExecutor, is no help here, because it executes each flow on the calling thread, one after another; an asynchronous TaskExecutor is required for the flows to run in parallel. The job waits for every flow in the split to finish before it moves on, so catFlowStep only starts after step1, step2 and step3 are all done.
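To make the difference between a synchronous and an asynchronous executor concrete, here is a minimal sketch that uses only the JDK, not SpringBatch. It is an analogy under stated assumptions rather than how SpringBatch is implemented: the class SplitSketch and the methods runJob, step and threadPerTask are hypothetical names made up for this illustration, and the "steps" just record which thread ran them.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy of the split above; none of these names are SpringBatch API.
final class SplitSketch {

    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger();

    // Records "stepName@threadName" in the order the steps actually ran.
    static void step(String name, List<String> log) {
        log.add(name + "@" + Thread.currentThread().getName());
    }

    // Hands flow1 {step1 -> step2} and flow2 {step3} to the executor,
    // waits for both, then runs the follow-up step on the calling thread.
    static List<String> runJob(Executor executor) {
        List<String> log = new CopyOnWriteArrayList<>();
        Runnable flow1 = () -> { step("step1", log); step("step2", log); };
        Runnable flow2 = () -> step("step3", log);

        CompletableFuture<Void> f1 = CompletableFuture.runAsync(flow1, executor);
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(flow2, executor);
        CompletableFuture.allOf(f1, f2).join(); // the split ends only when both flows end

        step("catFlowStep", log);
        return log;
    }

    // Starts a new thread for every task, which is the asynchronous case.
    static Executor threadPerTask() {
        return task -> new Thread(task, "flow-thread-" + THREAD_COUNTER.incrementAndGet()).start();
    }

    public static void main(String[] args) {
        // Synchronous executor: each task runs inline on the caller, so nothing overlaps.
        System.out.println("sync:  " + runJob(Runnable::run));
        // Asynchronous executor: the two flows run on their own threads.
        System.out.println("async: " + runJob(threadPerTask()));
    }
}
```

With the synchronous executor every entry carries the caller's thread name and flow2 only starts once flow1 has finished. With the thread-per-task executor step3 runs on its own thread and may land anywhere relative to step1 and step2, while step1 still precedes step2 and catFlowStep still comes last.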