自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBatch高階應(yīng)用:大數(shù)據(jù)批處理框架實(shí)戰(zhàn)指南

開發(fā) 前端
配置Job,Job是封裝整個(gè)批處理流程的實(shí)體。在 Spring Batch 中,Job只是Step實(shí)例的容器。它將邏輯上屬于一個(gè)流程的多個(gè)步驟組合在一起,并允許對(duì)所有步驟的全局屬性(如可重啟性)進(jìn)行配置。

本篇文章主要內(nèi)容:通過Spring Batch從一個(gè)庫中讀取數(shù)據(jù)進(jìn)過處理后寫入到另外一個(gè)庫中。

1. 環(huán)境準(zhǔn)備

1.1 引入依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

2.2 配置Job

配置Job啟動(dòng)器

@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
  jobLauncher.setJobRepository(userJobRepository) ;
  return jobLauncher ;
}

配置任務(wù)Repository存儲(chǔ)元信息

@Bean
JobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {
  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;
  factory.setDatabaseType("mysql") ;
  factory.setTransactionManager(transactionManager) ;
  factory.setDataSource(dataSource) ;
  try {
    factory.afterPropertiesSet() ; 
    return factory.getObject() ;
  } catch (Exception e) {
    throw new RuntimeException(e) ;
  }
}

配置ItemReader讀取器

@Bean
ItemReader<User> userReader(JobOperator jobOperator) throws Exception {
  JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;
  builder.entityManagerFactory(entityManagerFactory) ;
  // 每次分頁查詢多少條數(shù)據(jù)
  builder.pageSize(10) ;
  builder.queryString("select u from User u where u.uid <= 50") ;
  builder.saveState(true) ;
  builder.name("userReader") ;
  return builder.build() ;
}

配置數(shù)據(jù)源,該數(shù)據(jù)源是用來寫入操作的

public DataSource dataSource() {
  HikariDataSource dataSource = new HikariDataSource() ;
  dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;
  dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;
  dataSource.setUsername("root") ;
  dataSource.setPassword("xxxooo") ;
  return dataSource ;
}

配置ItemWriter用來寫入操作(當(dāng)前庫的數(shù)據(jù)寫入到另外一個(gè)庫,上面的數(shù)據(jù)源)

@Bean
ItemWriter<User> userWriter() {
  // 通過JDBC批量處理
  JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;
  DataSource dataSource = dataSource() ;
  builder.dataSource(dataSource) ;
  builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;
  builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {
    @Override
    public void setValues(User item, PreparedStatement ps) throws SQLException {
      ps.setInt(1, item.getUid()) ;
      ps.setString(2, item.getName()) ;
      ps.setString(3, item.getSex()) ;
      ps.setString(4, item.getMobile()) ;
      ps.setInt(5, item.getAge()) ;
      ps.setObject(6, item.getBirthday()) ;
    }
  }) ;
  return builder.build() ;
}

配置ItemProcessor處理器,數(shù)據(jù)從當(dāng)前庫讀取處理后經(jīng)過處理后再寫入另外的庫中

@Bean
ItemProcessor<User, User> userProcessor() {
  return new ItemProcessor<User, User>() {
    @Override
    public User process(User item) throws Exception {
      System.out.printf("%s - 開始處理數(shù)據(jù):%s%n", Thread.currentThread().getName(), item.toString()) ;
      // 模擬耗時(shí)操作
      TimeUnit.SECONDS.sleep(1) ;
      // 在這里你可以對(duì)數(shù)據(jù)進(jìn)行相應(yīng)的處理。
      return item ;
    }
  } ;
}

配置Step將ItemReader、ItemProcessor、ItemWriter串聯(lián)在一起。

@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
  return steps.get("userStep1")
    .<User, User>chunk(5)
    .reader(userReader)
    .processor(userProcessor)
    .writer(userWriter)
    .build() ;
}

配置Job,Job是封裝整個(gè)批處理流程的實(shí)體。在 Spring Batch 中,Job只是Step實(shí)例的容器。它將邏輯上屬于一個(gè)流程的多個(gè)步驟組合在一起,并允許對(duì)所有步驟的全局屬性(如可重啟性)進(jìn)行配置。作業(yè)配置包含:

  • 簡單的工作名稱。
  • Step實(shí)例的定義和排序。
  • Job是否可重新啟動(dòng)。
@Bean
Job userJob(Step userStep1, Step userStep2) {
  return jobs.get("userJob").start(userStep1).build();
}

以上是Spring Batch定義配置一個(gè)Job所需的核心組件。接下來會(huì)以上面的基礎(chǔ)配置進(jìn)行高階知識(shí)點(diǎn)進(jìn)行介紹。

2. 高階配置管理

2.1 通過Controller接口啟動(dòng)Job

@RequestMapping("/userJob")
public class UserJobController {
  @Resource
  private JobLauncher userJobLauncher ;
  @GetMapping("/start")
  public Object start() throws Exception {
    JobParameters jobParameters = new JobParameters() ;
    this.userJobLauncher.run(userJob, jobParameters) ;
    return "started" ;
  }
}

通過JobLauncher#run方法啟動(dòng)Job。當(dāng)你調(diào)用該接口時(shí),你會(huì)發(fā)現(xiàn)接口一直不會(huì)返回,一直阻塞,下圖是Job的啟動(dòng)序列

圖片圖片

根據(jù)上圖能知道,當(dāng)你調(diào)用run方法后,會(huì)等待整個(gè)Job退出狀態(tài)為FINISHED或者FAILED后才能結(jié)束。所以,你需要異步完成,以便 SimpleJobLauncher 立即返回給調(diào)用者。而正確的序列應(yīng)該是如下:

圖片圖片

上圖通過異步方式啟動(dòng)Job序列。

2.2 異步啟動(dòng)Job

@Bean
TaskExecutor taskExecutor() {
  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;
  taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;
  taskExecutor.setCorePoolSize(10) ;
  taskExecutor.setMaxPoolSize(10) ;
  taskExecutor.initialize() ; 
  return taskExecutor ;
}
@Bean
JobLauncher userJobLauncher(JobRepository userJobRepository) {
  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;
  jobLauncher.setJobRepository(userJobRepository) ;
  jobLauncher.setTaskExecutor(taskExecutor()) ;
  return jobLauncher ;
}

通過上面配置后,Job啟動(dòng)將是異步的會(huì)直接返回JobExecution。

2.3 重啟Job

當(dāng)一個(gè)Job正在執(zhí)行,由于斷電或者強(qiáng)制終止了程序。當(dāng)程序恢復(fù)后你希望能夠接著程序終止前的進(jìn)度繼續(xù)執(zhí)行,這時(shí)候你需要進(jìn)行如下的操作(本人沒有發(fā)現(xiàn)有什么API能夠操作的,可能文檔沒看仔細(xì))。

當(dāng)程序非正常終止是,下面兩張表的狀態(tài)都是STARTED,END_TIME為null

batch_job_execution表

圖片圖片

batch_step_execution表

圖片圖片

想要重新啟動(dòng)必須將上面的狀態(tài)修改為STOPPED,END_TIME字段設(shè)置上值(是什么值無所謂)。

然后我們就可以繼續(xù)使用上面的Controller接口啟動(dòng)任務(wù)繼續(xù)執(zhí)行了。

2.4 多線程執(zhí)行Step

為了加快程序的執(zhí)行,我們可以為Step配置線程池

@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
  return steps.get("userStep1")
    .<User, User>chunk(5)
    .reader(userReader)
    .processor(userProcessor)
    .writer(userWriter)
    // 配置線程池
    .taskExecutor(taskExecutor())
    .build() ;
}

注意:Step中使用的任何池化資源(如數(shù)據(jù)源)都可能對(duì)并發(fā)性設(shè)置限制。請(qǐng)確保這些資源池至少與步驟中所需的并發(fā)線程數(shù)一樣大。

通過上面配置線程池后,你將在控制臺(tái)看到如下輸出。

圖片圖片

默認(rèn)將有4個(gè)線程同時(shí)進(jìn)行處理??梢酝ㄟ^如下配置進(jìn)行調(diào)整

@Bean
Step userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {
  return steps.get("userStep1")
      // ...
      // 節(jié)流限制10,這里配置的大小應(yīng)該與你的數(shù)據(jù)庫連接池大小及使用的線程池核心線程數(shù)一致。
      .throttleLimit(10)
      .build() ;
}

2.5 重復(fù)啟動(dòng)Job

要想重復(fù)啟動(dòng)Job,我們可以在啟動(dòng)Job時(shí)設(shè)置不同的JobParameters參數(shù),只要參數(shù)不同那么就可以重復(fù)的啟動(dòng)Job。如下示例:

@GetMapping("/start/{page}")
public Object start(@PathVariable("page") Long page) throws Exception {
  Map<String, JobParameter> parameters = new HashMap<>() ;
  // 每次設(shè)置的參數(shù)值不同即可。
  parameters.put("page", new JobParameter(page)) ;
  JobParameters jobParameters = new JobParameters(parameters) ;
  this.userJobLauncher.run(userJob, jobParameters) ;
  return "started" ;
}

以上是本篇文章的全部內(nèi)容,希望對(duì)你有幫助。

責(zé)任編輯:武曉燕 來源: Spring全家桶實(shí)戰(zhàn)案例源碼
相關(guān)推薦

2024-12-27 14:45:59

2023-08-22 08:01:42

SpringBatch事務(wù)管理

2017-01-12 14:50:15

大數(shù)據(jù)Spring Batc框架

2012-02-20 09:49:42

ibmdw

2016-11-15 09:44:21

大數(shù)據(jù)批處理流處理

2022-08-02 20:47:38

Spring框架應(yīng)用程序

2015-06-25 13:06:48

大數(shù)據(jù)從選擇到應(yīng)用

2022-03-07 14:39:01

前端框架批處理

2016-12-18 15:03:57

Python Scikit Lea數(shù)據(jù)

2016-12-20 16:07:13

Python數(shù)據(jù)預(yù)處理

2022-03-01 08:40:34

StormHadoop批處理

2017-09-06 17:05:54

大數(shù)據(jù)處理流程處理框架

2018-04-03 10:33:15

大數(shù)據(jù)

2020-10-26 07:05:02

大數(shù)據(jù)管道編排編排框架

2019-05-29 10:42:06

大數(shù)據(jù)IT人工智能

2018-12-04 15:32:09

數(shù)據(jù)處理大數(shù)據(jù)數(shù)據(jù)分析

2018-11-05 15:15:38

大數(shù)據(jù)流式數(shù)據(jù)互聯(lián)網(wǎng)

2017-09-18 17:59:23

Hadoop數(shù)據(jù)分析

2018-04-10 14:25:30

大數(shù)據(jù)高速公路數(shù)據(jù)存儲(chǔ)

2025-03-26 01:22:00

NtyCo協(xié)程框架
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)