自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于SpringBoot使用Spring Batch批處理框架,處理大數(shù)據(jù)新方案

開發(fā) 前端
Spring Batch可以提供大量的,可重復(fù)的數(shù)據(jù)處理功能,包括日志記錄/跟蹤,事務(wù)管理,作業(yè)處理統(tǒng)計工作重新啟動、跳過,和資源管理等重要功能。

環(huán)境:Springboot2.4.12 + Spring Batch4.2.7

Spring Batch是一個輕量級的,完全面向Spring的批處理框架,可以應(yīng)用于企業(yè)級大量的數(shù)據(jù)處理系統(tǒng)。Spring Batch以POJO和大家熟知的Spring框架為基礎(chǔ),使開發(fā)者更容易的訪問和利用企業(yè)級服務(wù)。Spring Batch可以提供大量的,可重復(fù)的數(shù)據(jù)處理功能,包括日志記錄/跟蹤,事務(wù)管理,作業(yè)處理統(tǒng)計工作重新啟動、跳過,和資源管理等重要功能。

業(yè)務(wù)場景:

  1. 定期提交批處理。
  2. 并行批處理:作業(yè)的并行處理
  3. 分階段、企業(yè)消息驅(qū)動的處理
  4. 大規(guī)模并行批處理
  5. 故障后手動或計劃重新啟動
  6. 相關(guān)步驟的順序處理(擴展到工作流驅(qū)動的批處理)
  7. 部分處理:跳過記錄(例如,回滾時)
  8. 整批事務(wù),適用于小批量或現(xiàn)有存儲過程/腳本的情況

技術(shù)目標:

  • 批處理開發(fā)人員使用Spring編程模型:專注于業(yè)務(wù)邏輯,讓框架負責(zé)基礎(chǔ)設(shè)施。
  • 基礎(chǔ)架構(gòu)、批處理執(zhí)行環(huán)境和批處理應(yīng)用程序之間的關(guān)注點清晰分離。
  • 提供通用的核心執(zhí)行服務(wù),作為所有項目都可以實現(xiàn)的接口。
  • 提供可“開箱即用”的核心執(zhí)行接口的簡單和默認實現(xiàn)。
  • 通過在所有層中利用spring框架,可以輕松配置、定制和擴展服務(wù)。
  • 所有現(xiàn)有的核心服務(wù)都應(yīng)該易于替換或擴展,而不會對基礎(chǔ)架構(gòu)層造成任何影響。
  • 提供一個簡單的部署模型,使用Maven構(gòu)建的架構(gòu)JAR與應(yīng)用程序完全分離。

Spring Batch的結(jié)構(gòu):

圖片圖片

此分層體系結(jié)構(gòu)突出了三個主要的高級組件:應(yīng)用程序、核心和基礎(chǔ)架構(gòu)。該應(yīng)用程序包含開發(fā)人員使用SpringBatch編寫的所有批處理作業(yè)和自定義代碼。批處理核心包含啟動和控制批處理作業(yè)所需的核心運行時類。它包括JobLauncher、Job和Step的實現(xiàn)。應(yīng)用程序和核心都構(gòu)建在公共基礎(chǔ)架構(gòu)之上。此基礎(chǔ)結(jié)構(gòu)包含公共讀寫器和服務(wù)(如RetryTemplate),應(yīng)用程序開發(fā)人員(讀寫器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的庫)都使用這些服務(wù)。

下面介紹開發(fā)流程

本例完成 讀取文件內(nèi)容,經(jīng)過處理后,將數(shù)據(jù)保存到數(shù)據(jù)庫中

引入依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
  <groupId>org.hibernate</groupId>
  <artifactId>hibernate-validator</artifactId>
  <version>6.0.7.Final</version>
</dependency>

應(yīng)用配置文件

spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/batch?serverTimeznotallow=GMT%2B8
    username: root
    password: *******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  batch:
    job:
      enabled: false #是否自動執(zhí)行任務(wù)
    initialize-schema: always  #自動為我們創(chuàng)建數(shù)據(jù)庫腳本

開啟批處理功能

@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer{
}

任務(wù)啟動器

接著上一步的配置類BatchConfig重寫對應(yīng)方法

@Override
protected JobLauncher createJobLauncher() throws Exception {
  SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
  jobLauncher.setJobRepository(createJobRepository());
  jobLauncher.afterPropertiesSet();
  return jobLauncher;
}

任務(wù)存儲

接著上一步的配置類BatchConfig重寫對應(yīng)方法

@Resource
private PlatformTransactionManager transactionManager ;
@Override
protected JobRepository createJobRepository() throws Exception {
  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
  factory.setDatabaseType("mysql");
  factory.setTransactionManager(transactionManager);
  factory.setDataSource(dataSource);
  factory.afterPropertiesSet();
  return factory.getObject();
}

定義JOB

@Bean
public Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){
  return builder.get("myJob")
      .incrementer(new RunIdIncrementer())
      .flow(step)
      .end()
      .listener(jobExecutionListener)
      .build();
}

定義ItemReader讀取器

@Bean
public ItemReader<Person> reader(){
  FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
  reader.setResource(new ClassPathResource("cvs/persons.cvs"));
  reader.setLineMapper(new DefaultLineMapper<Person>() {
    // 代碼塊
    {
      setLineTokenizer(new DelimitedLineTokenizer(",") {
        {
          setNames("id", "name");
        }
      }) ;
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
        {
          setTargetType(Person.class) ;
        }
      });
    }
  });
  return reader;
}

定義ItemProcessor處理器

@Bean
public ItemProcessor<Person, Person2> processorPerson(){
  return new ItemProcessor<Person, Person2>() {
      @Override
      public Person2 process(Person item) throws Exception {
        Person2 p = new Person2() ;
        p.setId(item.getId()) ;
        p.setName(item.getName() + ", pk");
        return p ;
      }
  } ;
}

定義ItemWriter寫數(shù)據(jù)

@Resource
private Validator<Person> validator ;
@Resource
private EntityManagerFactory entityManagerFactory ;
@Bean
public ItemWriter<Person2> writerPerson(){
  JpaItemWriter<Person2> writer = null ;
  JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ;
  builder.entityManagerFactory(entityManagerFactory) ;
  writer = builder.build() ;
  return writer;
}

定義Step

@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor){
  return stepBuilderFactory
          .get("myStep")
          .<Person, Person>chunk(2) // Chunk的機制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進行寫入操作)
          .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
          .listener(new MyReadListener())
          .processor(processor)
          .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
          .listener(new MyWriteListener())
          .build();
}

定義相應(yīng)的監(jiān)聽器

public class MyReadListener implements ItemReadListener<Person> {


  private Logger logger = LoggerFactory.getLogger(MyReadListener.class);


  @Override
  public void beforeRead() {
  }


  @Override
  public void afterRead(Person item) {
    System.out.println("reader after: " + Thread.currentThread().getName()) ;
  }


  @Override
  public void onReadError(Exception ex) {
    logger.info("讀取數(shù)據(jù)錯誤:{}", ex);
  }
}
@Component
public class MyWriteListener implements ItemWriteListener<Person> {
  
    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
 
    @Override
    public void beforeWrite(List<? extends Person> items) {
    }
    
    @Override
    public void afterWrite(List<? extends Person> items) {
      System.out.println("writer after: " + Thread.currentThread().getName()) ;
    }
    
    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
        try {
            logger.info(format("%s%n", exception.getMessage()));
            for (Person item : items) {
                logger.info(format("Failed writing BlogInfo : %s", item.toString()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

person.cvs文件內(nèi)容

圖片圖片

實體類:

@Entity
@Table(name = "t_person")
public class Person {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Integer id ;
  private String name ;
}

啟動任務(wù)執(zhí)行

@RestController
@RequestMapping("/demo")
public class DemoController {
  @Resource
  @Qualifier("myJob")
  private Job job ;
  @Resource
  private JobLauncher launcher ;
  @GetMapping("/index")
  public Object index() {
    JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ;
    try {
      launcher.run(job, jobParameters) ;
    } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
        | JobParametersInvalidException e) {
      e.printStackTrace();
    }
    return "success" ;
  }
}

啟動服務(wù),自動為我們創(chuàng)建了表

圖片圖片

執(zhí)行任務(wù)

查看表情況

圖片圖片


圖片圖片


圖片圖片

責(zé)任編輯:武曉燕 來源: Spring全家桶實戰(zhàn)案例源碼
點贊
收藏

51CTO技術(shù)棧公眾號