javascript
SpringBoot整合SpringBatch实用简例
SpringBatch主要是一個輕量級的大數據量的并行處理(批處理)的框架。
作用和Hadoop很相似,不過Hadoop是基于重量級的分布式環境(處理巨量數據),而SpringBatch是基于輕量的應用框架(處理中小數據)。
這里使用SpringBatch做了一個能運行的最簡單例子,用來說明SpringBatch的基本作用。
如果需要深入學習,請詳細閱讀官方文檔 https://docs.spring.io/spring-batch/4.0.x/reference/html/index.html ;英文不好的同學,可以和我一樣在瀏覽器中右鍵「翻譯成中文」查看。
簡單的技術棧 : SpringBoot + SpringBatch + JPA ,完整demo的項目地址 : https://github.com/EalenXie/springboot-batch
1 . 新建項目springboot-batch,基本的pom.xml依賴如下 :
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the springboot-batch demo: Spring Boot parent plus Spring Batch, Spring Data JPA and the MySQL JDBC driver. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>name.ealen</groupId>
    <artifactId>springboot-batch</artifactId>
    <version>1.0</version>

    <!-- Dependency versions below are omitted; they are managed by the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Spring Batch (JobBuilderFactory, StepBuilderFactory, readers/writers). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- JPA, used by the Access entity and the JpaPagingItemReader. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- MySQL JDBC driver; only needed at runtime. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project>
2 . 你需要在數據庫中建立springbatch的相關元數據表,所以你需要在數據庫中執行如下來自官方元數據模式的腳本。
-- do not edit this file
-- Spring Batch metadata schema for MySQL (InnoDB), taken from the official schema script.

-- BATCH_JOB_INSTANCE: all information related to a JobInstance.
--   JOB_INSTANCE_ID : assigned from BATCH_JOB_SEQ.
--   JOB_NAME        : the job name, matching the Spring configuration.
--   JOB_KEY         : MD5 hash of the job parameters. Because an instance is identified by
--                     (JOB_NAME, JOB_KEY), running a job that already completed successfully
--                     again with the same parameters throws JobInstanceAlreadyCompleteException.
CREATE TABLE BATCH_JOB_INSTANCE (
    JOB_INSTANCE_ID BIGINT       NOT NULL PRIMARY KEY,
    VERSION         BIGINT,
    JOB_NAME        VARCHAR(100) NOT NULL,
    JOB_KEY         VARCHAR(32)  NOT NULL,
    CONSTRAINT JOB_INST_UN UNIQUE (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION: all information related to a JobExecution.
CREATE TABLE BATCH_JOB_EXECUTION (
    JOB_EXECUTION_ID           BIGINT        NOT NULL PRIMARY KEY,
    VERSION                    BIGINT,
    JOB_INSTANCE_ID            BIGINT        NOT NULL,
    CREATE_TIME                DATETIME      NOT NULL,
    START_TIME                 DATETIME      DEFAULT NULL,
    END_TIME                   DATETIME      DEFAULT NULL,
    STATUS                     VARCHAR(10),
    EXIT_CODE                  VARCHAR(2500),
    EXIT_MESSAGE               VARCHAR(2500),
    LAST_UPDATED               DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    CONSTRAINT JOB_INST_EXEC_FK FOREIGN KEY (JOB_INSTANCE_ID)
        REFERENCES BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_PARAMS: all information related to the JobParameters of an execution.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
    JOB_EXECUTION_ID BIGINT       NOT NULL,
    TYPE_CD          VARCHAR(6)   NOT NULL,
    KEY_NAME         VARCHAR(100) NOT NULL,
    STRING_VAL       VARCHAR(250),
    DATE_VAL         DATETIME     DEFAULT NULL,
    LONG_VAL         BIGINT,
    DOUBLE_VAL       DOUBLE PRECISION,
    IDENTIFYING      CHAR(1)      NOT NULL,
    CONSTRAINT JOB_EXEC_PARAMS_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION: all information related to a StepExecution.
CREATE TABLE BATCH_STEP_EXECUTION (
    STEP_EXECUTION_ID  BIGINT        NOT NULL PRIMARY KEY,
    VERSION            BIGINT        NOT NULL,
    STEP_NAME          VARCHAR(100)  NOT NULL,
    JOB_EXECUTION_ID   BIGINT        NOT NULL,
    START_TIME         DATETIME      NOT NULL,
    END_TIME           DATETIME      DEFAULT NULL,
    STATUS             VARCHAR(10),
    COMMIT_COUNT       BIGINT,
    READ_COUNT         BIGINT,
    FILTER_COUNT       BIGINT,
    WRITE_COUNT        BIGINT,
    READ_SKIP_COUNT    BIGINT,
    WRITE_SKIP_COUNT   BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT     BIGINT,
    EXIT_CODE          VARCHAR(2500),
    EXIT_MESSAGE       VARCHAR(2500),
    LAST_UPDATED       DATETIME,
    CONSTRAINT JOB_EXEC_STEP_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION_CONTEXT: the ExecutionContext belonging to a Step.
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
    STEP_EXECUTION_ID  BIGINT        NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    CONSTRAINT STEP_EXEC_CTX_FK FOREIGN KEY (STEP_EXECUTION_ID)
        REFERENCES BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_CONTEXT: the ExecutionContext belonging to a Job.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
    JOB_EXECUTION_ID   BIGINT        NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    CONSTRAINT JOB_EXEC_CTX_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- Sequence tables: each holds a single row whose ID is used to hand out new ids.
-- The guarded INSERT seeds the row with 0 only when the table is still empty.
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID         BIGINT  NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
SELECT tmp.ID, tmp.UNIQUE_KEY
FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID         BIGINT  NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
SELECT tmp.ID, tmp.UNIQUE_KEY
FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
    ID         BIGINT  NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY)
SELECT tmp.ID, tmp.UNIQUE_KEY
FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_JOB_SEQ);
3 . 測試數據的實體類 : Access.java
package name.ealen.model;

import javax.persistence.*;

/**
 * JPA entity holding one record of the demo test data processed by the batch job
 * (the job's reader selects these rows with a native query on {@code access}).
 * <p>
 * Created by EalenXie on 2018/9/10 16:17.
 */
@Entity
@Table
public class Access {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer id;
    private String username;
    private String shopName;
    private String categoryName;
    private String brandName;
    private String shopId;
    private String omit;
    // Timestamps are kept as plain strings in this demo entity.
    private String updateTime;
    private boolean deleteStatus;
    private String createTime;
    private String description;

    // ---- getters ----

    public Integer getId() {
        return this.id;
    }

    public String getUsername() {
        return this.username;
    }

    public String getShopName() {
        return this.shopName;
    }

    public String getCategoryName() {
        return this.categoryName;
    }

    public String getBrandName() {
        return this.brandName;
    }

    public String getShopId() {
        return this.shopId;
    }

    public String getOmit() {
        return this.omit;
    }

    public String getUpdateTime() {
        return this.updateTime;
    }

    public boolean isDeleteStatus() {
        return this.deleteStatus;
    }

    public String getCreateTime() {
        return this.createTime;
    }

    public String getDescription() {
        return this.description;
    }

    // ---- setters ----

    public void setId(Integer id) {
        this.id = id;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public void setCategoryName(String categoryName) {
        this.categoryName = categoryName;
    }

    public void setBrandName(String brandName) {
        this.brandName = brandName;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public void setOmit(String omit) {
        this.omit = omit;
    }

    public void setUpdateTime(String updateTime) {
        this.updateTime = updateTime;
    }

    public void setDeleteStatus(boolean deleteStatus) {
        this.deleteStatus = deleteStatus;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    /**
     * Renders every field, with string values wrapped in single quotes, e.g.
     * {@code Access{id=1, username='u', ..., deleteStatus=false, ...}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Access{");
        text.append("id=").append(id);
        text.append(", username='").append(username).append('\'');
        text.append(", shopName='").append(shopName).append('\'');
        text.append(", categoryName='").append(categoryName).append('\'');
        text.append(", brandName='").append(brandName).append('\'');
        text.append(", shopId='").append(shopId).append('\'');
        text.append(", omit='").append(omit).append('\'');
        text.append(", updateTime='").append(updateTime).append('\'');
        text.append(", deleteStatus=").append(deleteStatus);
        text.append(", createTime='").append(createTime).append('\'');
        text.append(", description='").append(description).append('\'');
        return text.append('}').toString();
    }
}
4 . 配置一個最簡單的Job之前,先準備一些基本配置,例如為Job添加一個監聽器 :
配置TaskExecutor : ExecutorConfiguration.java
package name.ealen.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Configures the TaskExecutor: registers a {@link ThreadPoolTaskExecutor} bean
 * (injected into JobListener in this example).
 */
@Configuration
public class ExecutorConfiguration {

    /** Number of core threads in the pool. */
    private static final int CORE_POOL_SIZE = 50;

    /** Upper bound on the number of threads in the pool. */
    private static final int MAX_POOL_SIZE = 200;

    /** Capacity of the task queue. */
    private static final int QUEUE_CAPACITY = 1000;

    /** Prefix used when naming the pool's threads. */
    private static final String THREAD_NAME_PREFIX = "Data-Job";

    /**
     * Builds the thread pool with the sizes declared above.
     *
     * @return a configured, not-yet-initialised executor (Spring initialises it as a bean)
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(QUEUE_CAPACITY);
        return executor;
    }
}
為Job準備一個簡單的監聽器,實現JobExecutionListener即可 :
package name.ealen.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * A simple job listener: logs the job parameters before the job runs and, afterwards,
 * the final status and the elapsed time.
 * <p>
 * Created by EalenXie on 2018/9/10 15:09.
 */
@Component
public class JobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobListener.class);

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Logs the parameters of the execution that is about to start.
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("job before {}", jobExecution.getJobParameters());
    }

    /**
     * Logs the final status, shuts the thread pool down after a successful run,
     * and logs how long the execution took.
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        log.info("JOB STATUS : {}", status);
        if (status == BatchStatus.COMPLETED) {
            log.info("JOB FINISHED");
            // NOTE(review): this destroys a Spring-managed singleton pool after the first successful
            // run; anything that submits work to it afterwards will likely be rejected — confirm intent.
            threadPoolTaskExecutor.destroy();
        } else if (status == BatchStatus.FAILED) {
            log.error("JOB FAILED");
        }
        // The start time is read from the execution itself rather than from a field: this listener is a
        // singleton bean, so a field would be shared (and overwritten) by overlapping executions.
        // It can be null if the execution never actually started, so guard before using it.
        Date startedAt = jobExecution.getStartTime();
        if (startedAt != null) {
            log.info("Job Cost Time : {}ms", System.currentTimeMillis() - startedAt.getTime());
        }
    }
}
5 . 配置一個最基本的Job : 一個Job 通常由一個或多個Step組成(基本就像是一個工作流);一個Step通常由三部分組成(讀入數據 ItemReader,處理數據 ItemProcessor,寫入數據 ItemWriter)
package name.ealen.batch;import name.ealen.listener.JobListener; import name.ealen.model.Access; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.JpaPagingItemReader; import org.springframework.batch.item.database.orm.JpaNativeQueryProvider; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import javax.annotation.Resource; import javax.persistence.EntityManagerFactory;/*** Created by EalenXie on 2018/9/10 14:50.* :@EnableBatchProcessing提供用于構建批處理作業的基本配置*/ @Configuration @EnableBatchProcessing public class DataBatchConfiguration {private static final Logger log = LoggerFactory.getLogger(DataBatchConfiguration.class);@Resourceprivate JobBuilderFactory jobBuilderFactory; //用于構建JOB@Resourceprivate StepBuilderFactory stepBuilderFactory; //用于構建Step@Resourceprivate EntityManagerFactory emf; //注入實例化Factory 訪問數據@Resourceprivate JobListener jobListener; //簡單的JOB listener/*** 一個簡單基礎的Job通常由一個或者多個Step組成*/@Beanpublic Job dataHandleJob() {return jobBuilderFactory.get("dataHandleJob").incrementer(new RunIdIncrementer()).start(handleDataStep()). //start是JOB執行的第一個step // next(xxxStep()). // next(xxxStep()). // ...listener(jobListener). 
//設置了一個簡單JobListenerbuild();}/*** 一個簡單基礎的Step主要分為三個部分* ItemReader : 用于讀取數據* ItemProcessor : 用于處理數據* ItemWriter : 用于寫數據*/@Beanpublic Step handleDataStep() {return stepBuilderFactory.get("getData").<Access, Access>chunk(100). // <輸入,輸出> 。chunk通俗的講類似于SQL的commit; 這里表示處理(processor)100條后寫入(writer)一次。faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class). //捕捉到異常就重試,重試100次還是異常,JOB就停止并標志失敗reader(getDataReader()). //指定ItemReaderprocessor(getDataProcessor()). //指定ItemProcessorwriter(getDataWriter()). //指定ItemWriterbuild();}@Beanpublic ItemReader<? extends Access> getDataReader() {//讀取數據,這里可以用JPA,JDBC,JMS 等方式 讀入數據JpaPagingItemReader<Access> reader = new JpaPagingItemReader<>();//這里選擇JPA方式讀數據 一個簡單的 native SQLString sqlQuery = "SELECT * FROM access";try {JpaNativeQueryProvider<Access> queryProvider = new JpaNativeQueryProvider<>();queryProvider.setSqlQuery(sqlQuery);queryProvider.setEntityClass(Access.class);queryProvider.afterPropertiesSet();reader.setEntityManagerFactory(emf);reader.setPageSize(3);reader.setQueryProvider(queryProvider);reader.afterPropertiesSet();//所有ItemReader和ItemWriter實現都會在ExecutionContext提交之前將其當前狀態存儲在其中,如果不希望這樣做,可以設置setSaveState(false)reader.setSaveState(true);} catch (Exception e) {e.printStackTrace();}return reader;}@Beanpublic ItemProcessor<Access, Access> getDataProcessor() {return new ItemProcessor<Access, Access>() {@Overridepublic Access process(Access access) throws Exception {log.info("processor data : " + access.toString()); //模擬 假裝處理數據,這里處理就是打印一下return access;}}; // lambda也可以寫為: // return access -> { // log.info("processor data : " + access.toString()); // return access; // };}@Beanpublic ItemWriter<Access> getDataWriter() {return list -> {for (Access access : list) {log.info("write data : " + access); //模擬 假裝寫數據 ,這里寫真正寫入數據的邏輯}};} }6 . 配置好基本的Job之后,為Access表導入一些基本的數據(git上面有demo數據,access.sql),寫一個SpringBoot的啟動類進行測試。
注意 : Job中的各個組件請使用@Bean注解聲明,這樣在元數據中才會有相應的正常操作記錄。啟動類如下 :
package name.ealen;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the batch demo.
 * <p>
 * Created by EalenXie on 2018/9/10 14:41.
 */
@SpringBootApplication
public class SpringBatchApplication {

    /**
     * Starts the Spring application context.
     * NOTE(review): the job is presumably launched by Spring Boot's batch auto-configuration
     * at startup — confirm against the application properties.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBatchApplication.class);
        application.run(args);
    }
}
7 . 運行可以看到基本數據處理效果,這里是模擬處理和模擬寫入 :
8 . 從元數據表中查看并驗證JOB的執行情況 :
?
總結
以上是生活随笔為你收集整理的SpringBoot整合SpringBatch实用简例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 三种Shell脚本编程中避免SFTP输入
- 下一篇: springbatch的reader,如