一套萬(wàn)能的異步處理方案!
良好的系統(tǒng)設(shè)計(jì)必須要做到開閉原則,隨著業(yè)務(wù)的不斷迭代更新,核心代碼也會(huì)被不斷改動(dòng),出錯(cuò)的概率也會(huì)大大增加。但是大部分增加的功能都是在擴(kuò)展原有的功能,既要保證性能又要保證質(zhì)量,我們往往都會(huì)使用異步線程池來處理,然而卻增加了很多不確定性因素。由此我設(shè)計(jì)了一套通用的異步處理SDK,可以很輕松的實(shí)現(xiàn)各種異步處理。
目的
通過異步處理不僅能夠保證方法能夠得到有效的執(zhí)行而且不影響主流程。
更重要的是各種兜底方法保證數(shù)據(jù)不丟失,從而達(dá)到最終一致性。
優(yōu)點(diǎn)
無侵入設(shè)計(jì),獨(dú)立數(shù)據(jù)庫(kù),獨(dú)立定時(shí)任務(wù),獨(dú)立消息隊(duì)列,獨(dú)立人工執(zhí)行界面(統(tǒng)一登錄認(rèn)證)。
使用spring事務(wù)事件機(jī)制,即使異步策略解析失敗也不會(huì)影響業(yè)務(wù)。
如果你的方法正在運(yùn)行事務(wù),會(huì)等事務(wù)提交后或回滾后再處理事件。
就算事務(wù)提交了,異步策略解析失敗了,我們還有兜底方案執(zhí)行(除非數(shù)據(jù)庫(kù)有問題,消息隊(duì)列有問題,方法有bug)。
原理
容器初始化bean完成后遍歷所有方法,把有@AsyncExec注解的方法緩存起來。
方法運(yùn)行時(shí)通過AOP切面發(fā)布事件。
事務(wù)事件監(jiān)聽處理異步執(zhí)行策略:
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
fallbackExecution=true 沒有事務(wù)正在運(yùn)行,依然處理事件。
TransactionPhase.AFTER_COMPLETION 事務(wù)提交后和事務(wù)回滾后都處理事件。
組件
- kafka 消息隊(duì)列
- xxl job 定時(shí)任務(wù)
- mysql 數(shù)據(jù)庫(kù)
- spring 切面
- vue 界面
設(shè)計(jì)模式
- 策略
- 模板方法
- 動(dòng)態(tài)代理
- 流程圖
圖片
數(shù)據(jù)庫(kù)腳本
CREATE TABLE `async_scene` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`application_name` varchar(100) NOT NULL DEFAULT '' COMMENT '應(yīng)用名稱',
`method_sign` varchar(50) NOT NULL DEFAULT '' COMMENT '方法簽名',
`scene_name` varchar(200) NOT NULL DEFAULT '' COMMENT '業(yè)務(wù)場(chǎng)景描述',
`async_type` varchar(50) NOT NULL DEFAULT '' COMMENT '異步策略類型',
`queue_name` varchar(200) NOT NULL DEFAULT '' COMMENT '隊(duì)列名稱',
`theme_value` varchar(100) NOT NULL DEFAULT '' COMMENT '消費(fèi)主題',
`exec_count` int NOT NULL DEFAULT '0' COMMENT '失敗重試次數(shù)',
`exec_deleted` int NOT NULL DEFAULT '0' COMMENT '執(zhí)行后是否刪除',
`async_version` varchar(50) NOT NULL DEFAULT '' COMMENT '組件版本號(hào)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新時(shí)間',
`cdc_crt_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '記錄新增時(shí)間',
`cdc_upd_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '記錄修改時(shí)間',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `uk_application_sign` (`application_name`,`method_sign`) USING BTREE,
KEY `idx_cdc_upd_time` (`cdc_upd_time`)
) ENGINE=InnoDB COMMENT='異步場(chǎng)景表';
CREATE TABLE `async_req` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`application_name` varchar(100) NOT NULL DEFAULT '' COMMENT '應(yīng)用名稱',
`sign` varchar(50) NOT NULL DEFAULT '' COMMENT '方法簽名',
`class_name` varchar(200) NOT NULL DEFAULT '' COMMENT '全路徑類名稱',
`method_name` varchar(100) NOT NULL DEFAULT '' COMMENT '方法名稱',
`async_type` varchar(50) NOT NULL DEFAULT '' COMMENT '異步策略類型',
`exec_status` tinyint NOT NULL DEFAULT '0' COMMENT '執(zhí)行狀態(tài) 0:初始化 1:執(zhí)行失敗 2:執(zhí)行成功',
`exec_count` int NOT NULL DEFAULT '0' COMMENT '執(zhí)行次數(shù)',
`param_json` longtext COMMENT '請(qǐng)求參數(shù)',
`remark` varchar(200) NOT NULL DEFAULT '' COMMENT '業(yè)務(wù)描述',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新時(shí)間',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_applocation_name` (`application_name`) USING BTREE,
KEY `idx_exec_status` (`exec_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='異步處理請(qǐng)求';
CREATE TABLE `async_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`async_id` bigint NOT NULL DEFAULT '0' COMMENT '異步請(qǐng)求ID',
`error_data` longtext COMMENT '執(zhí)行錯(cuò)誤信息',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_async_id` (`async_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='異步處理日志';
異步策略
圖片
安全級(jí)別
圖片
執(zhí)行狀態(tài)
圖片
流程圖
圖片
圖片
圖片
apollo 配置
# 開關(guān):默認(rèn)關(guān)閉
async.enabled=true
# 應(yīng)用名稱
spring.application.name=xxx
# 數(shù)據(jù)源 druid
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
spring.datasource.username=user
spring.datasource.password=xxxx
spring.datasource.filters=config
spring.datasource.cnotallow=config.decrypt=true;config.decrypt.key=yyy
#靜態(tài)地址
spring.resources.add-mappings=true
spring.resources.static-locatinotallow=classpath:/static/
# 以下配置都有默認(rèn)值
# 核心線程數(shù)
async.executor.thread.corePoolSize=10
# 最大線程數(shù)
async.executor.thread.maxPoolSize=50
# 隊(duì)列容量
async.executor.thread.queueCapacity=10000
# 活躍時(shí)間
async.executor.thread.keepAliveSecnotallow=600
# 執(zhí)行成功是否刪除記錄:默認(rèn)刪除
async.exec.deleted=true
# 自定義隊(duì)列名稱前綴:默認(rèn)應(yīng)用名稱
async.topic=${spring.application.name}
# 重試執(zhí)行次數(shù):默認(rèn)5次
async.exec.count=5
# 重試最大查詢數(shù)量
async.retry.limit=100
# 補(bǔ)償最大查詢數(shù)量
async.comp.limit=100
# 登錄攔截:默認(rèn)false
async.login=false
用法
1.異步開關(guān)
scm.async.enabled=true
2.在需要異步執(zhí)行的方法加注解 (必須是spring代理方法)
@AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "數(shù)據(jù)字典")
3.人工處理地址
http://localhost:8004/async/index.html
注意
1.應(yīng)用名稱
spring.application.name
2.隊(duì)列名稱
${async.topic:${spring.application.name}}_async_queue
3.自己業(yè)務(wù)要做冪等
4.一個(gè)應(yīng)用公用一個(gè)隊(duì)列
自產(chǎn)自消
5.定時(shí)任務(wù)
- 異步重試定時(shí)任務(wù)(2分鐘重試一次,可配置重試次數(shù))。
- 異步補(bǔ)償定時(shí)任務(wù)(一小時(shí)補(bǔ)償一次,創(chuàng)建時(shí)間在一小時(shí)之前的)。
效果展示
圖片
圖片