SpringBatch高階應(yīng)用:大數(shù)據(jù)批處理框架實(shí)戰(zhàn)指南
本篇文章主要內(nèi)容:通過Spring Batch從一個(gè)庫中讀取數(shù)據(jù)進(jìn)過處理后寫入到另外一個(gè)庫中。
1. 環(huán)境準(zhǔn)備
1.1 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
2.2 配置Job
配置Job啟動(dòng)器
@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
jobLauncher.setJobRepository(userJobRepository) ;
return jobLauncher ;
}
配置任務(wù)Repository存儲(chǔ)元信息
@Bean
JobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;
factory.setDatabaseType("mysql") ;
factory.setTransactionManager(transactionManager) ;
factory.setDataSource(dataSource) ;
try {
factory.afterPropertiesSet() ;
return factory.getObject() ;
} catch (Exception e) {
throw new RuntimeException(e) ;
}
}
配置ItemReader讀取器
@Bean
ItemReader<User> userReader(JobOperator jobOperator) throws Exception {
JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;
builder.entityManagerFactory(entityManagerFactory) ;
// 每次分頁查詢多少條數(shù)據(jù)
builder.pageSize(10) ;
builder.queryString("select u from User u where u.uid <= 50") ;
builder.saveState(true) ;
builder.name("userReader") ;
return builder.build() ;
}
配置數(shù)據(jù)源,該數(shù)據(jù)源是用來寫入操作的
public DataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource() ;
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;
dataSource.setUsername("root") ;
dataSource.setPassword("xxxooo") ;
return dataSource ;
}
配置ItemWriter用來寫入操作(當(dāng)前庫的數(shù)據(jù)寫入到另外一個(gè)庫,上面的數(shù)據(jù)源)
@Bean
ItemWriter<User> userWriter() {
// 通過JDBC批量處理
JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;
DataSource dataSource = dataSource() ;
builder.dataSource(dataSource) ;
builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;
builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {
@Override
public void setValues(User item, PreparedStatement ps) throws SQLException {
ps.setInt(1, item.getUid()) ;
ps.setString(2, item.getName()) ;
ps.setString(3, item.getSex()) ;
ps.setString(4, item.getMobile()) ;
ps.setInt(5, item.getAge()) ;
ps.setObject(6, item.getBirthday()) ;
}
}) ;
return builder.build() ;
}
配置ItemProcessor處理器,數(shù)據(jù)從當(dāng)前庫讀取處理后經(jīng)過處理后再寫入另外的庫中
@Bean
ItemProcessor<User, User> userProcessor() {
return new ItemProcessor<User, User>() {
@Override
public User process(User item) throws Exception {
System.out.printf("%s - 開始處理數(shù)據(jù):%s%n", Thread.currentThread().getName(), item.toString()) ;
// 模擬耗時(shí)操作
TimeUnit.SECONDS.sleep(1) ;
// 在這里你可以對(duì)數(shù)據(jù)進(jìn)行相應(yīng)的處理。
return item ;
}
} ;
}
配置Step將ItemReader、ItemProcessor、ItemWriter串聯(lián)在一起。
@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
return steps.get("userStep1")
.<User, User>chunk(5)
.reader(userReader)
.processor(userProcessor)
.writer(userWriter)
.build() ;
}
配置Job,Job是封裝整個(gè)批處理流程的實(shí)體。在 Spring Batch 中,Job只是Step實(shí)例的容器。它將邏輯上屬于一個(gè)流程的多個(gè)步驟組合在一起,并允許對(duì)所有步驟的全局屬性(如可重啟性)進(jìn)行配置。作業(yè)配置包含:
- 簡單的工作名稱。
- Step實(shí)例的定義和排序。
- Job是否可重新啟動(dòng)。
@Bean
Job userJob(Step userStep1, Step userStep2) {
return jobs.get("userJob").start(userStep1).build();
}
以上是Spring Batch定義配置一個(gè)Job所需的核心組件。接下來會(huì)以上面的基礎(chǔ)配置進(jìn)行高階知識(shí)點(diǎn)進(jìn)行介紹。
2. 高階配置管理
2.1 通過Controller接口啟動(dòng)Job
@RequestMapping("/userJob")
public class UserJobController {
@Resource
private JobLauncher userJobLauncher ;
@GetMapping("/start")
public Object start() throws Exception {
JobParameters jobParameters = new JobParameters() ;
this.userJobLauncher.run(userJob, jobParameters) ;
return "started" ;
}
}
通過JobLauncher#run方法啟動(dòng)Job。當(dāng)你調(diào)用該接口時(shí),你會(huì)發(fā)現(xiàn)接口一直不會(huì)返回,一直阻塞,下圖是Job的啟動(dòng)序列
圖片
根據(jù)上圖能知道,當(dāng)你調(diào)用run方法后,會(huì)等待整個(gè)Job退出狀態(tài)為FINISHED或者FAILED后才能結(jié)束。所以,你需要異步完成,以便 SimpleJobLauncher 立即返回給調(diào)用者。而正確的序列應(yīng)該是如下:
圖片
上圖通過異步方式啟動(dòng)Job序列。
2.2 異步啟動(dòng)Job
@Bean
TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;
taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;
taskExecutor.setCorePoolSize(10) ;
taskExecutor.setMaxPoolSize(10) ;
taskExecutor.initialize() ;
return taskExecutor ;
}
@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
jobLauncher.setJobRepository(userJobRepository) ;
jobLauncher.setTaskExecutor(taskExecutor()) ;
return jobLauncher ;
}
通過上面配置后,Job啟動(dòng)將是異步的會(huì)直接返回JobExecution。
2.3 重啟Job
當(dāng)一個(gè)Job正在執(zhí)行,由于斷電或者強(qiáng)制終止了程序。當(dāng)程序恢復(fù)后你希望能夠接著程序終止前的進(jìn)度繼續(xù)執(zhí)行,這時(shí)候你需要進(jìn)行如下的操作(本人沒有發(fā)現(xiàn)有什么API能夠操作的,可能文檔沒看仔細(xì))。
當(dāng)程序非正常終止是,下面兩張表的狀態(tài)都是STARTED,END_TIME為null
batch_job_execution表
圖片
batch_step_execution表
圖片
想要重新啟動(dòng)必須將上面的狀態(tài)修改為STOPPED,END_TIME字段設(shè)置上值(是什么值無所謂)。
然后我們就可以繼續(xù)使用上面的Controller接口啟動(dòng)任務(wù)繼續(xù)執(zhí)行了。
2.4 多線程執(zhí)行Step
為了加快程序的執(zhí)行,我們可以為Step配置線程池
@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
return steps.get("userStep1")
.<User, User>chunk(5)
.reader(userReader)
.processor(userProcessor)
.writer(userWriter)
// 配置線程池
.taskExecutor(taskExecutor())
.build() ;
}
注意:Step中使用的任何池化資源(如數(shù)據(jù)源)都可能對(duì)并發(fā)性設(shè)置限制。請(qǐng)確保這些資源池至少與步驟中所需的并發(fā)線程數(shù)一樣大。
通過上面配置線程池后,你將在控制臺(tái)看到如下輸出。
圖片
默認(rèn)將有4個(gè)線程同時(shí)進(jìn)行處理??梢酝ㄟ^如下配置進(jìn)行調(diào)整
@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
return steps.get("userStep1")
// ...
// 節(jié)流限制10,這里配置的大小應(yīng)該與你的數(shù)據(jù)庫連接池大小及使用的線程池核心線程數(shù)一致。
.throttleLimit(10)
.build() ;
}
2.5 重復(fù)啟動(dòng)Job
要想重復(fù)啟動(dòng)Job,我們可以在啟動(dòng)Job時(shí)設(shè)置不同的JobParameters參數(shù),只要參數(shù)不同那么就可以重復(fù)的啟動(dòng)Job。如下示例:
@GetMapping("/start/{page}")
public Object start(@PathVariable("page") Long page) throws Exception {
Map<String, JobParameter> parameters = new HashMap<>() ;
// 每次設(shè)置的參數(shù)值不同即可。
parameters.put("page", new JobParameter(page)) ;
JobParameters jobParameters = new JobParameters(parameters) ;
this.userJobLauncher.run(userJob, jobParameters) ;
return "started" ;
}
以上是本篇文章的全部內(nèi)容,希望對(duì)你有幫助。