輕量級(jí)動(dòng)態(tài)線程池才是“王道”?
大家好,我是龍臺(tái)。
一、前言
最初設(shè)計(jì) Hippo4j 的初衷是盡可能提高以及保障線程池對(duì)于線上應(yīng)用的作用,所以加了很多個(gè)性化功能,這也間接導(dǎo)致強(qiáng)依賴 Hippo4j Server 項(xiàng)目。
自 Hippo4j 1.0.0 版本發(fā)布之后,不斷有社區(qū)小伙伴提出相同的一個(gè)問題,如何能夠輕量級(jí)使用動(dòng)態(tài)線程池?
這不,它來了。
GitHub:https://github.com/acmenlt/dynamic-threadpool
Gitee:https://gitee.com/acmenlt/dynamic-threadpool
隨著 Hippo4j 1.1.0 版本的發(fā)布,除了在原有功能進(jìn)行迭代輸出外,額外添加了一種使用模式:依賴配置中心實(shí)現(xiàn)的輕量級(jí)動(dòng)態(tài)線程池,將 Hippo4j 的源代碼從一種使用模式拆分為兩種。
兩種模式共用一套核心源代碼,保留了基礎(chǔ)且大家關(guān)注的功能,模塊取名為:Hippo4j Core。
二、Hippo4j Core
所謂“一圖勝千言”,小編畫了一張圖,來描述它的交互行為以及所支持功能。
只要你們項(xiàng)目中有配置中心,引用 hippo4j-core-spring-boot-starter后,就可以使用以上功能啦。
1. 動(dòng)態(tài)線程池參數(shù)更新
客戶端項(xiàng)目啟動(dòng)時(shí)向配置中心請(qǐng)求動(dòng)態(tài)線程池配置,獲取配置后創(chuàng)建DynamicThreadPool 線程池。
并向配置中心發(fā)起監(jiān)聽事件,當(dāng)配置中心中配置發(fā)生變更時(shí),監(jiān)聽事件實(shí)時(shí)修改項(xiàng)目中的線程池參數(shù)。
如在配置中心變更了動(dòng)態(tài)線程池配置,會(huì)在日志中打印變更信息:
[MESSAGE-CONSUME] Changed thread pool.
coreSize :: [1 => 10]
maxSize :: [1 => 20]
queueType :: [ResizableCapacityLinkedBlockIngQueue => ResizableCapacityLinkedBlockIngQueue]
capacity :: [1024 => 2048]
keepAliveTime :: [1000 => 1000]
executeTimeOut :: [600 => 600]
rejectedType :: [DiscardOldestPolicy => DiscardOldestPolicy]
allowCoreThreadTimeOut :: [false => false]
同時(shí),通過消息推送通知相關(guān)負(fù)責(zé)人。目前通知平臺(tái)已支持釘釘、企業(yè)微信以及飛書三種常用辦公軟件,以企業(yè)微信群聊機(jī)器人舉例:
2. Web 線程池參數(shù)更新
SpringBoot 內(nèi)置三種 Web 容器:Tomcat、Jetty、Undertow。
Hippo4j Core 已支持容器線程池的核心參數(shù)變更:corePoolSize、maximumPoolSize、keepAliveTime。
為什么要加 Web 線程池的動(dòng)態(tài)更新?兩個(gè)原因:
- 壓測(cè)應(yīng)用時(shí),需要針對(duì)不同的壓測(cè)流量來調(diào)整 Web 容器線程池的線程數(shù)。正常流程,調(diào)整后需要重新發(fā)布項(xiàng)目,無疑是比較費(fèi)時(shí)費(fèi)力;
- 當(dāng) SpringBoot Java 應(yīng)用響應(yīng)時(shí)間變慢,并且服務(wù)器整體負(fù)載不高時(shí),我們可以通過修改 Web 容器線程池來提高并行處理能力,以此提高響應(yīng)時(shí)間。
當(dāng)然,正常來說,線上的容器線程池配置是通過壓測(cè)后得出的最優(yōu)值。所以,這個(gè)功能在線上應(yīng)該謹(jǐn)慎使用,或者說盡量不在線上使用。
3. 動(dòng)態(tài)線程池報(bào)警策略
為了讓線程池運(yùn)行出現(xiàn)問題,及時(shí)通知到相關(guān)負(fù)責(zé)人,Hippo4j 針對(duì)線程池做了四種定制化報(bào)警策略:
- 活躍度報(bào)警:假設(shè)設(shè)置線程池活躍度報(bào)警閾值為 80%,最大線程數(shù) 10。當(dāng)線程數(shù)達(dá)到 8 發(fā)起報(bào)警;
- 阻塞隊(duì)列容量報(bào)警:假設(shè)設(shè)置容量報(bào)警閾值為 80%,阻塞隊(duì)列容量 100。當(dāng)容量達(dá)到 80 發(fā)起報(bào)警;
- 拒絕任務(wù)報(bào)警:當(dāng)線程池?zé)o法執(zhí)行任務(wù),開始執(zhí)行拒絕策略時(shí)報(bào)警;
- 執(zhí)行時(shí)間報(bào)警:假設(shè)線程池超時(shí)時(shí)間設(shè)置 1000ms,任務(wù)執(zhí)行時(shí)間超過 1000ms 發(fā)起報(bào)警。
問題比較多的小伙伴就問了,如果線程池 頻繁拒絕任務(wù)或者執(zhí)行時(shí)間頻繁超時(shí),那豈不是要被信息轟炸?
不會(huì)的。報(bào)警策略做了優(yōu)化,當(dāng)設(shè)置報(bào)警間隔時(shí)間內(nèi),線程池 + 報(bào)警類型 兩個(gè)維度僅會(huì)發(fā)出一條通知報(bào)警消息。
舉個(gè)例子,有一個(gè)線程池 ID:message-consum 的線程池,設(shè)置了報(bào)警間隔為 5 分鐘。
也就是說,活躍度、阻塞隊(duì)列容量、拒絕任務(wù)、執(zhí)行時(shí)間幾個(gè)報(bào)警緯度,message-consum 線程池在 5分鐘內(nèi)最多每個(gè)類型發(fā)送一條報(bào)警通知。
目前已支持了釘釘、企業(yè)微信以及飛書的群機(jī)器人報(bào)警。企業(yè)微信機(jī)器人示例如下:
上圖中的鏈路信息只會(huì)在超時(shí)報(bào)警時(shí)存在,這樣可以通過鏈路信息,更方便定位到線程池任務(wù)執(zhí)行緩慢的原因。
三、代碼示例
Nacos 或 Apollo 配置中心任選其一。
SpringBoot Pom 文件引入 Hippo4j Core Maven 坐標(biāo)。
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-core-spring-boot-starter</artifactId>
<version>1.1.0</version>
</dependency>
啟動(dòng)類上添加 @EnableDynamicThreadPool 注解。
@SpringBootApplication
@EnableDynamicThreadPool
public class ExampleApplication {
public static void main(String[] args) {
SpringApplication.run(ExampleApplication.class, args);
}
}
在配置中心中添加 spring.dynamic.thread-pool 前綴的配置。如下:
server:
port: 8090
servlet:
context-path: /example
spring:
profiles:
active: dev
dynamic:
thread-pool:
enable: true # 是否開啟動(dòng)態(tài)線程池
banner: true # 是否打印 banner
collect: true # 是否開啟線程池?cái)?shù)據(jù)采集,對(duì)接 Prometheus
check-state-interval: 3 # 檢查線程池狀態(tài),是否達(dá)到報(bào)警條件,單位秒
notify-platforms: # 通知報(bào)警平臺(tái),支持多個(gè),或者任選其一
- platform: 'WECHAT' # 企業(yè)微信
secret-key: 1d307bfa-815f-4662-a2e5-99415e947bb8
- platform: 'DING' # 釘釘
secret-key: 56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
- platform: 'LARK' # 飛書
secret-key: 2cbf2808-3839-4c26-a04d-fd201dd51f9e
nacos: # nacos apollo 任選其一
data-id: xxx
group: xxx
apollo:
namespace: xxxx
config-file-type: yml # 配置中心文件格式
executors:
- thread-pool-id: 'message-consume' # 線程池標(biāo)識(shí)
core-pool-size: 1 # 核心線程數(shù)
maximum-pool-size: 1 # 最大線程數(shù)
queue-capacity: 1 # 阻塞隊(duì)列大小
execute-time-out: 1000 # 執(zhí)行超時(shí)時(shí)間,執(zhí)行任務(wù)時(shí)間超過此時(shí)間發(fā)起報(bào)警
blocking-queue: 'LinkedBlockingQueue' # 阻塞隊(duì)列名稱,參考 QueueTypeEnum,支持 SPI
rejected-handler: 'AbortPolicy' # 拒絕策略名稱,參考 RejectedPolicies,支持 SPI
keep-alive-time: 1024 # 線程存活時(shí)間,單位秒
allow-core-thread-time-out: true # 是否允許核心線程超時(shí)
thread-name-prefix: 'message-consume' # 線程名稱前綴
notify: # 通知配置
is-alarm: true # 是否報(bào)警
active-alarm: 80 # 活躍度報(bào)警閾值;假設(shè)線程池最大線程數(shù) 10,當(dāng)線程數(shù)達(dá)到 8 發(fā)起報(bào)警
capacity-alarm: 80 # 容量報(bào)警閾值;假設(shè)阻塞隊(duì)列容量 100,當(dāng)容量達(dá)到 80 發(fā)起報(bào)警
interval: 8 # 報(bào)警間隔,同一線程池下同一報(bào)警緯度,在 interval 時(shí)間內(nèi)只會(huì)報(bào)警一次,單位分鐘
receives: # 任選其一
DING: 'xxx' # 手機(jī)號(hào)
WECHAT: 'xxx' # 填寫企業(yè)微信用戶 ID(填寫其它將無法達(dá)到 @ 效果)
LARK: 'xxx' # 填寫 ou_開頭的用戶唯一標(biāo)識(shí),否則只能普通 @
使用 Hippo4j ThreadPoolBuilder 構(gòu)建動(dòng)態(tài)線程池。
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
@Bean
@DynamicThreadPool
public ThreadPoolExecutor dynamicThreadPoolExecutor() {
String consumeThreadPoolId = "message-consume";
return ThreadPoolBuilder.builder()
.threadFactory(consumeThreadPoolId)
.dynamicPool()
.build();
}
按照 Spring Bean 注入的方式使用動(dòng)態(tài)線程池即可。
@Resource
private ThreadPoolExecutor dynamicThreadPoolExecutor;
dynamicThreadPoolExecutor.execute(() -> xxx);
沒了,是不是很 easy?我大致試了下,不到兩分鐘的時(shí)間,就能讓你的 SpringBoot 項(xiàng)目快速接入動(dòng)態(tài)線程池。
總結(jié)下接入步驟:
- Pom 中引入 Hippo4j Core 包依賴;
- 啟動(dòng)類上添加動(dòng)態(tài)線程池啟用注解;
- 配置中心(Nacos 或 Apollo)添加動(dòng)態(tài)線程池配置;
- 項(xiàng)目中以 Spring Bean 的形式創(chuàng)建動(dòng)態(tài)線程池 。
四、常見問題
1. 項(xiàng)目關(guān)閉時(shí),如何保障線程池中任務(wù)全部完成
答:借鑒了 Spring 封裝的線程池框架。構(gòu)建動(dòng)態(tài)線程池時(shí),指定waitForTasksToCompleteOnShutdown 和 awaitTerminationMillis
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
@Bean
@DynamicThreadPool
public ThreadPoolExecutor dynamicThreadPoolExecutor() {
String consumeThreadPoolId = "message-consume";
return ThreadPoolBuilder.builder()
.threadFactory(consumeThreadPoolId)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.dynamicPool()
.build();
}
這兩個(gè)參數(shù)什么意思呢?
- waitForTasksToCompleteOnShutdown:是否在關(guān)閉線程池時(shí)等待任務(wù)完成,這里我們?cè)O(shè)置 true;
- awaitTerminationMillis:等待任務(wù)完成的時(shí)間,單位毫秒。
問題很多的小伙伴可能就問了:為啥要有 awaitTerminationMillis 這個(gè)參數(shù)?直接等待全部任務(wù)完成不就行了。
線程池中都是執(zhí)行很快的任務(wù)可能是沒問題。但是,如果線程池里面都是耗時(shí)的任務(wù)呢?
停止項(xiàng)目時(shí)等個(gè)幾分鐘甚至更長(zhǎng)時(shí)間是無法忍受的。這個(gè)需要根據(jù)大家項(xiàng)目的實(shí)際情況評(píng)估。
2. 動(dòng)態(tài)線程池是否可以傳遞上下文參數(shù)
可以的,同樣是借鑒 Spring 線程池框架。實(shí)現(xiàn) TaskDecorator 接口,并在構(gòu)建動(dòng)態(tài)線程池時(shí)指定。
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
@Bean
@DynamicThreadPool
public ThreadPoolExecutor dynamicThreadPoolExecutor() {
String consumeThreadPoolId = "message-consume";
return ThreadPoolBuilder.builder()
.threadFactory(consumeThreadPoolId)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.taskDecorator(new TaskDecoratorTest.ContextCopyingDecorator())
.dynamicPool()
.build();
}
3. Hippo4j Core 和 Hippo4j Server 轉(zhuǎn)換麻煩么
有些小伙伴最初用的 Hippo4j Core,覺得功能并不滿足使用,想要使用 Hippo4j Server,問我如何轉(zhuǎn)換?
其實(shí)這里非常簡(jiǎn)單。從項(xiàng)目上來說:代碼無需任何改變,把 Pom 文件中的依賴坐標(biāo)改下就可以。
其次就是將配置中心里的配置遷移到 Hippo4j 的控制臺(tái),將線程池記錄創(chuàng)建才出來即可。
五、總結(jié)
文章介紹了 Hippo4j 新增的一種使用模式:依賴配置中心的輕量動(dòng)態(tài)線程池的實(shí)現(xiàn)。
不太好評(píng)價(jià) Hippo4j Server 和 Hippo4j Core 的好壞。一個(gè)是功能更強(qiáng)大,一個(gè)是引入更加輕量,使用上具體如何,交給使用者來評(píng)價(jià)。
如果項(xiàng)目中使用了 配置中心以及線程池 的話,強(qiáng)烈推薦大家引入到項(xiàng)目中試一試,為項(xiàng)目線上的穩(wěn)定性多了一份保障。
因?yàn)閭€(gè)人能力有限,項(xiàng)目中難免會(huì)有考慮不到或待優(yōu)化的地方,各位小伙伴有興趣可以提交 PR 修復(fù)。