javascript
SpringBatch批处理框架入门(二)
這篇文章接上一篇SpringBatch批處理框架入門(一),繼續(xù)講解SpringBatch基礎(chǔ)知識(shí)。
目錄
SpringBatch 核心類介紹
?SpringBatch 核心類Job
?SpringBatch 核心類Step
?SpringBatch 核心類StepExecution
?SpringBatch 核心類ExecutionContext
?SpringBatch 核心類JobRepository
?SpringBatch 核心類JobLauncher
?SpringBatch 核心類ItemReader和ItemWriter
?SpringBatch 核心類ItemProcessor?
?SpringBatch 核心類Chunk
?SpringBatch 監(jiān)聽器
?SpringBatch具體小例子(部分代碼)
?
SpringBatch 核心類介紹
下圖是我們要講解spring batch幾個(gè)核心類:
?SpringBatch 核心類Job
Job是封裝整個(gè)批處理過程的一個(gè)概念。Job在spring batch的體系當(dāng)中是一個(gè)最頂層的抽象概念。
一個(gè)job是我們運(yùn)行的基本單位,它內(nèi)部由step組成。job本質(zhì)上可以看成step的一個(gè)容器。一個(gè)job可以按照指定的邏輯順序組合step,并提供了我們給所有step設(shè)置相同屬性的方法,例如一些事件監(jiān)聽,跳過策略。
Spring Batch以SimpleJob類的形式提供了Job接口的默認(rèn)簡(jiǎn)單實(shí)現(xiàn),它在Job之上創(chuàng)建了一些標(biāo)準(zhǔn)功能。一個(gè)使用java bean的例子代碼如下:
@Beanpublic Job cafeCatJob() {return jobBuilderFactory.get("cafeCatJob").start(cafeCatStep()).build();}SpringBatch 核心類Step
帶有步驟的作業(yè)層次結(jié)構(gòu):
?SpringBatch 核心類StepExecution
StepExecution表示一次執(zhí)行Step,?每次運(yùn)行一個(gè)Step時(shí)都會(huì)創(chuàng)建一個(gè)新的StepExecution,類似于JobExecution。 但是,某個(gè)步驟可能由于其之前的步驟失敗而無法執(zhí)行。 且僅當(dāng)Step實(shí)際啟動(dòng)時(shí)才會(huì)創(chuàng)建StepExecution。
一次step執(zhí)行的實(shí)例由StepExecution類的對(duì)象表示。 每個(gè)StepExecution都包含對(duì)其相應(yīng)步驟的引用以及JobExecution和事務(wù)相關(guān)的數(shù)據(jù),例如提交和回滾計(jì)數(shù)以及開始和結(jié)束時(shí)間。 此外,每個(gè)步驟執(zhí)行都包含一個(gè)ExecutionContext,其中包含開發(fā)人員需要在批處理運(yùn)行中保留的任何數(shù)據(jù),例如重新啟動(dòng)所需的統(tǒng)計(jì)信息或狀態(tài)信息。
下面是一個(gè)從數(shù)據(jù)庫(kù)當(dāng)中截圖的實(shí)例:
SpringBatch 核心類ExecutionContext
ExecutionContext即每一個(gè)JobExecution和StepExecution 的執(zhí)行環(huán)境。它包含一系列的鍵值對(duì)。我們可以用如下代碼獲取ExecutionContext
- ExecutionContext stepContext= stepExecution.getExecutionContext();
- ExecutionContext jobContext = jobExecution.getExecutionContext();
SpringBatch 核心類JobRepository
JobRepository是一個(gè)用于將上述job,step等概念進(jìn)行持久化的一個(gè)類。 它同時(shí)給Job和Step以及下文會(huì)提到的JobLauncher實(shí)現(xiàn)提供CRUD操作。 首次啟動(dòng)Job時(shí),將從repository中獲取JobExecution,并且在執(zhí)行批處理的過程中,StepExecution和JobExecution將被存儲(chǔ)到repository當(dāng)中。
@EnableBatchProcessing注解可以為JobRepository提供自動(dòng)配置。
SpringBatch 核心類JobLauncher
JobLauncher這個(gè)接口的功能非常簡(jiǎn)單,它是用于啟動(dòng)指定了JobParameters的Job,為什么這里要強(qiáng)調(diào)指定了JobParameter,原因其實(shí)我們?cè)谇懊嬉呀?jīng)提到了,jobparameter和job一起才能組成一次job的執(zhí)行。
SpringBatch 核心類ItemReader和ItemWriter
ItemReader是一個(gè)讀數(shù)據(jù)的接口, 當(dāng)ItemReader讀完所有數(shù)據(jù)時(shí),返回null表示后續(xù)操作數(shù)據(jù)已經(jīng)讀完。Spring Batch為ItemReader提供了非常多的實(shí)現(xiàn)類。
ItemWriter是一個(gè)寫數(shù)據(jù)的接口,Spring Batch為ItemWriter提供了非常多的實(shí)現(xiàn)類。
- 操作單個(gè)文件:FlatFileItemReader 和 FlatFileItemWriter
- 操作多個(gè)文件:MultiResourceItemReader 和 MultiResourceItemWriter
- 操作數(shù)據(jù)庫(kù):JdbcPagingItemReader 和 JdbcBatchItemWriter
? ??????? MyBatisPagingItemReader 和 MyBatisBatchItemWriter
- 操作XML文件:StaxEventItemReader 和 StaxEventItemWriter
- 操作JSON文件:JsonItemReader 和 JsonFileItemWriter
SpringBatch 核心類ItemProcessor?
ItemProcessor對(duì)項(xiàng)目的業(yè)務(wù)邏輯處理的一個(gè)抽象,?當(dāng)ItemReader讀取到一條記錄之后,ItemWriter還未寫入這條記錄之前,I我們可以借助temProcessor提供一個(gè)處理業(yè)務(wù)邏輯的功能,并對(duì)數(shù)據(jù)進(jìn)行相應(yīng)操作。如果我們?cè)贗temProcessor發(fā)現(xiàn)一條數(shù)據(jù)不應(yīng)該被寫入,可以通過返回null來表示。
SpringBatch 核心類Chunk
一個(gè)chunk處理流程如右圖
一次batch的任務(wù)可能會(huì)有很多的數(shù)據(jù)讀寫操作,因此一條一條的處理并向數(shù)據(jù)庫(kù)提交的話效率不會(huì)很高,因此spring batch提供了chunk這個(gè)概念,我們可以設(shè)定一個(gè)chunk size,spring batch 將一條一條處理數(shù)據(jù),但不提交到數(shù)據(jù)庫(kù),只有當(dāng)處理的數(shù)據(jù)數(shù)量達(dá)到chunk size設(shè)定的值得時(shí)候,才一起去commit。
?SpringBatch 監(jiān)聽器
Spring Batch提供了多種監(jiān)聽器Listener,用于在任務(wù)處理過程中觸發(fā)我們的邏輯代碼。常用的監(jiān)聽器根據(jù)粒度從粗到細(xì)分別有:Job級(jí)別的監(jiān)聽器JobExecutionListener、Step級(jí)別的監(jiān)聽器StepExecutionListener、Chunk監(jiān)聽器ChunkListener、ItemReader監(jiān)聽器ItemReadListener、ItemWriter監(jiān)聽器ItemWriteListener和ItemProcessor監(jiān)聽器ItemProcessListener和SkipListener等。具體可以參考下表:
| 監(jiān)聽器 | 具體說明 |
| JobExecutionListener | 在Job開始之前(beforeJob)和之后(aflerJob)觸發(fā) |
| StepExecutionListener | 在Step開始之前(beforeStep)和之后(afterStep)觸發(fā) |
| ChunkListener | 在 Chunk 開始之前(beforeChunk),之后(afterChunk)和錯(cuò)誤后(afterChunkError)觸發(fā) |
| ItemReadListener | 在 Read 開始之前(beforeRead),之后(afterRead)和錯(cuò)誤后(onReadError)觸發(fā) |
| ItemProcessListener | 在 Processor 開始之前(beforeProcess),之后(afterProcess)和錯(cuò)誤后(onProcessError)觸發(fā) |
| ItemWriteListener | 在 Writer 開始之前(beforeWrite),之后(afterWrite)和錯(cuò)誤后(onWriteError)觸發(fā) |
| SkipListener | 在 Skip(reder)時(shí)候,在 Skip(writer)時(shí)候,在 Skip(processor)時(shí)候 |
?SpringBatch具體小例子(部分代碼)
@Configuration @EnableBatchProcessing public class CafeCatConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate CatProcessor catProcessor;@Autowiredprivate SqlSessionFactory sqlSessionFactory;@Beanpublic Job cafeCatJob() {return jobBuilderFactory.get("cafeCatJob").start(cafeCatStep()).build();}@Beanpublic Step cafeCatStep() {return stepBuilderFactory.get("cafeCatStep").<CafeCat, Cat>chunk(10).reader(cafeCatCommonFileItemReader()).processor(catProcessor).writer(catCommonMybatisItemWriter()).build();}@Bean@StepScopepublic CommonFileItemReader<CafeCat> cafeCatCommonFileItemReader() {return new CommonFileItemReader<>(CafeCat.class);}@Bean@StepScopepublic CommonMybatisItemWriter<Cat> catCommonMybatisItemWriter() {return new CommonMybatisItemWriter<>(sqlSessionFactory,Cat.class.getSimpleName());} }詳細(xì)代碼請(qǐng)參考GitHub上面:SpringBatch例子詳細(xì)代碼
總結(jié)
以上是生活随笔為你收集整理的SpringBatch批处理框架入门(二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringBatch批处理框架入门(一
- 下一篇: SpringBatch接口BatchCo