SpringBoot與ElasticJob整合,實(shí)現(xiàn)一百萬條數(shù)據(jù)的狀態(tài)秒級(jí)更新
隨著訂單數(shù)量急劇增加,傳統(tǒng)的單線程訂單狀態(tài)更新方式已經(jīng)無法滿足高效處理的需求。為了提高訂單狀態(tài)更新的效率和系統(tǒng)的響應(yīng)能力,我們決定采用分布式任務(wù)調(diào)度實(shí)現(xiàn)高效的訂單狀態(tài)批量更新。
應(yīng)用場(chǎng)景
緩存預(yù)熱
- 緩存填充:在系統(tǒng)啟動(dòng)時(shí)或高峰期來臨前,預(yù)先加載數(shù)據(jù)到緩存中,每個(gè)分片處理一部分?jǐn)?shù)據(jù)。
用戶通知系統(tǒng)
- 郵件發(fā)送:向大量用戶發(fā)送電子郵件,可以通過分片來并行處理郵件發(fā)送任務(wù),減少延遲。
- 推送通知:向移動(dòng)設(shè)備推送通知,每個(gè)分片處理一部分用戶的推送任務(wù)。
消息隊(duì)列處理
- 消息處理:消費(fèi)消息隊(duì)列中的消息,通過分片來提高消息處理的速度和效率。
- 事件驅(qū)動(dòng)任務(wù):根據(jù)事件觸發(fā)的任務(wù),可以使用分片來并行處理不同的事件類型或事件來源。
數(shù)據(jù)批處理
- 批量數(shù)據(jù)導(dǎo)入/導(dǎo)出:將大量數(shù)據(jù)從一個(gè)系統(tǒng)遷移到另一個(gè)系統(tǒng)時(shí),可以將數(shù)據(jù)分成多個(gè)批次進(jìn)行處理。
- 日志處理:分析和處理大量的日志文件,每個(gè)分片處理一部分日志。
定時(shí)任務(wù)調(diào)度
- 定時(shí)數(shù)據(jù)同步:定期將數(shù)據(jù)從源數(shù)據(jù)庫(kù)同步到目標(biāo)數(shù)據(jù)庫(kù),可以通過分片來并行處理不同部分的數(shù)據(jù)。
- 報(bào)告生成:生成復(fù)雜的報(bào)表或統(tǒng)計(jì)信息,每個(gè)分片負(fù)責(zé)計(jì)算一部分?jǐn)?shù)據(jù)的結(jié)果。
大規(guī)模數(shù)據(jù)分析
- 數(shù)據(jù)清洗:對(duì)海量數(shù)據(jù)進(jìn)行清洗和預(yù)處理,每個(gè)分片處理一部分?jǐn)?shù)據(jù)。
- 特征提?。簭拇髷?shù)據(jù)集中提取特征,每個(gè)分片處理一部分?jǐn)?shù)據(jù)集。
分布式爬蟲
- 網(wǎng)頁抓取:分布式爬蟲可以從多個(gè)節(jié)點(diǎn)同時(shí)抓取網(wǎng)頁內(nèi)容,每個(gè)分片負(fù)責(zé)抓取一組URL。
- 數(shù)據(jù)采集:從不同的數(shù)據(jù)源收集數(shù)據(jù),每個(gè)分片處理一個(gè)數(shù)據(jù)源。
游戲服務(wù)器管理
- 玩家數(shù)據(jù)更新:在游戲中,頻繁地更新玩家數(shù)據(jù),可以通過分片來并行處理不同玩家的數(shù)據(jù)。
- 游戲邏輯計(jì)算:在多人在線游戲中,并行計(jì)算不同區(qū)域的游戲邏輯。
內(nèi)容推薦系統(tǒng)
- 個(gè)性化推薦:為用戶提供個(gè)性化的推薦內(nèi)容,每個(gè)分片處理一部分用戶的推薦任務(wù)。
實(shí)時(shí)監(jiān)控與報(bào)警
- 監(jiān)控指標(biāo)收集:實(shí)時(shí)收集和處理監(jiān)控指標(biāo),每個(gè)分片負(fù)責(zé)收集一部分系統(tǒng)的監(jiān)控?cái)?shù)據(jù)。
- 報(bào)警規(guī)則評(píng)估:評(píng)估報(bào)警規(guī)則,每個(gè)分片處理一部分報(bào)警條件。
任務(wù)分片的目的
- 負(fù)載均衡:通過將任務(wù)分配到多個(gè)節(jié)點(diǎn)上,避免單個(gè)節(jié)點(diǎn)過載。
- 提高性能:利用多核CPU或多臺(tái)機(jī)器的計(jì)算能力來加速任務(wù)執(zhí)行。
- 容錯(cuò)性:即使某個(gè)節(jié)點(diǎn)失敗,其他節(jié)點(diǎn)仍然可以繼續(xù)處理剩余的任務(wù)。
我們?yōu)槭裁催x擇ElasticJob?
簡(jiǎn)化任務(wù)開發(fā)
ElasticJob 提供了簡(jiǎn)潔的任務(wù)接口,開發(fā)者只需關(guān)注具體的業(yè)務(wù)邏輯,而不必過多關(guān)心任務(wù)調(diào)度的底層細(xì)節(jié)。這大大提高了開發(fā)效率,減少了潛在的錯(cuò)誤。
細(xì)粒度的日志記錄
ElasticJob 支持詳細(xì)的日志記錄,可以幫助開發(fā)者追蹤任務(wù)的執(zhí)行過程,定位和解決問題。這對(duì)于調(diào)試和優(yōu)化性能至關(guān)重要。
可靠的作業(yè)執(zhí)行機(jī)制
ElasticJob具備強(qiáng)大的容錯(cuò)能力和故障恢復(fù)機(jī)制。如果某個(gè)節(jié)點(diǎn)發(fā)生故障,其他節(jié)點(diǎn)可以接管其分片任務(wù),確保任務(wù)的連續(xù)性和可靠性。這種設(shè)計(jì)使得系統(tǒng)能夠在面對(duì)突發(fā)情況時(shí)保持穩(wěn)定運(yùn)行。
動(dòng)態(tài)分片策略
ElasticJob支持多種分片策略,可以根據(jù)實(shí)際需求調(diào)整分片的數(shù)量和分布方式。這對(duì)于處理不同規(guī)模的數(shù)據(jù)集非常有用。例如,在訂單數(shù)量增加時(shí),可以通過簡(jiǎn)單的配置調(diào)整分片總數(shù),而無需對(duì)代碼進(jìn)行大量修改。
可視化監(jiān)控與管理
ElasticJob 提供了一個(gè)內(nèi)置的控制臺(tái),用于監(jiān)控和管理任務(wù)的執(zhí)行情況。通過這個(gè)控制臺(tái),管理員可以實(shí)時(shí)查看任務(wù)的狀態(tài)、執(zhí)行歷史、分片信息等,方便進(jìn)行運(yùn)維和調(diào)優(yōu)。
支持定時(shí)任務(wù)
ElasticJob 內(nèi)置了Cron表達(dá)式的支持,可以輕松地設(shè)置任務(wù)的執(zhí)行時(shí)間表。這對(duì)于需要定期執(zhí)行的任務(wù)(如每天或每小時(shí)執(zhí)行一次的訂單狀態(tài)更新)非常方便。
如何進(jìn)行任務(wù)分片?
- 確定分片總數(shù):根據(jù)任務(wù)的特點(diǎn)和系統(tǒng)的資源情況決定需要分成多少個(gè)小任務(wù)。
- 分配分片項(xiàng):為每個(gè)小任務(wù)分配一個(gè)唯一的分片項(xiàng)。
- 實(shí)現(xiàn)任務(wù)邏輯:編寫代碼來處理特定分片項(xiàng)對(duì)應(yīng)的任務(wù)數(shù)據(jù)。
- 調(diào)度器管理:使用調(diào)度框架ElasticJob來管理和調(diào)度這些分片任務(wù)。
代碼實(shí)操
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MySQL Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- ElasticJob Lite Core -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<!-- ElasticJob Lite Spring Boot Starter -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<!-- Zookeeper Client -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
application.yml
server:
port:8080
spring:
datasource:
url:jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC
username:root
password:123456
driver-class-name:com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto:update
show-sql:true
properties:
hibernate:
dialect:org.hibernate.dialect.MySQL5InnoDBDialect
elasticjob:
regCenter:
serverLists:localhost:2181# ZooKeeper服務(wù)器地址
namespace:elastic-job-demo # 命名空間
jobs:
orderStatusUpdateJob:
cron: 0 0 * * * ? # Cron表達(dá)式,每小時(shí)執(zhí)行一次
shardingTotalCount:5 # 分片總數(shù)
jobClass:com.example.job.OrderStatusUpdateJob# 任務(wù)類全限定名
description:"更新訂單狀態(tài)"# 任務(wù)描述
Application.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling // 啟用定時(shí)任務(wù)調(diào)度
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
訂單狀態(tài)更新任務(wù)
package com.example.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.simple.api.SimpleJob;
import com.example.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 訂單狀態(tài)更新任務(wù)類
* 實(shí)現(xiàn)SimpleJob接口,用于定期更新訂單狀態(tài)
*/
@Component
publicclass OrderStatusUpdateJob implements SimpleJob {
privatestaticfinal Logger log = LoggerFactory.getLogger(OrderStatusUpdateJob.class);
@Autowired
private OrderService orderService; // 注入OrderService服務(wù)
/**
* 執(zhí)行任務(wù)的方法
* @param context 分片上下文
*/
@Override
public void execute(ShardingContext context) {
int shardingItem = context.getShardingItem(); // 獲取當(dāng)前分片項(xiàng)
int shardingTotalCount = context.getShardingTotalCount(); // 獲取總分片數(shù)
long maxOrderId = orderService.getMaxOrderId(); // 獲取最大訂單ID
// 計(jì)算當(dāng)前分片需要處理的訂單范圍
long startId = (long) shardingItem * (maxOrderId / shardingTotalCount);
long endId = Math.min(startId + (maxOrderId / shardingTotalCount), maxOrderId);
log.info("Processing orders from {} to {}", startId, endId); // 記錄處理的訂單范圍
int updatedCount = orderService.updateStatusInRange(startId, endId, "Processed"); // 更新訂單狀態(tài)
log.info("Updated {} orders in range {} to {}", updatedCount, startId, endId); // 記錄更新結(jié)果
}
}
訂單實(shí)體類
package com.example.model;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
/**
* 訂單實(shí)體類
* 映射到數(shù)據(jù)庫(kù)中的order表
*/
@Entity
publicclass Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id; // 訂單ID
private String status; // 訂單狀態(tài)
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
訂單Repository
package com.example.repository;
import com.example.model.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* 訂單Repository接口
* 提供基本的CRUD操作和自定義查詢
*/
publicinterface OrderRepository extends JpaRepository<Order, Long> {
/**
* 根據(jù)ID范圍查找訂單
* @param startId 開始ID
* @param endId 結(jié)束ID
* @return 訂單列表
*/
List<Order> findByIdBetween(Long startId, Long endId);
/**
* 根據(jù)ID范圍更新訂單狀態(tài)
* @param startId 開始ID
* @param endId 結(jié)束ID
* @param newStatus 新狀態(tài)
* @return 更新的訂單數(shù)量
*/
@Modifying
@Transactional
@Query("UPDATE Order o SET o.status = :newStatus WHERE o.id BETWEEN :startId AND :endId")
int updateStatusInRange(@Param("startId") Long startId, @Param("endId") Long endId, @Param("newStatus") String newStatus);
}
訂單服務(wù)類
package com.example.service;
import com.example.model.Order;
import com.example.repository.OrderRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 訂單服務(wù)類
* 處理訂單相關(guān)的業(yè)務(wù)邏輯
*/
@Service
publicclass OrderService {
@Autowired
private OrderRepository orderRepository; // 注入OrderRepository
/**
* 根據(jù)ID范圍獲取訂單
* @param startId 開始ID
* @param endId 結(jié)束ID
* @return 訂單列表
*/
public List<Order> getOrdersByRange(Long startId, Long endId) {
return orderRepository.findByIdBetween(startId, endId);
}
/**
* 根據(jù)ID范圍更新訂單狀態(tài)
* @param startId 開始ID
* @param endId 結(jié)束ID
* @param newStatus 新狀態(tài)
* @return 更新的訂單數(shù)量
*/
public int updateStatusInRange(Long startId, Long endId, String newStatus) {
return orderRepository.updateStatusInRange(startId, endId, newStatus);
}
/**
* 獲取最大訂單ID
* @return 最大訂單ID
*/
public long getMaxOrderId() {
return orderRepository.findAll().stream()
.mapToLong(Order::getId)
.max()
.orElse(0L);
}
}