看看面包超人的 '招牌線程池' 用得可還行?
本文轉載自微信公眾號「Shooter茶杯」,作者Shooter 。轉載本文請聯(lián)系Shooter茶杯公眾號。
本文主要是介紹線程池的一些進階玩法 。
面包超人鎮(zhèn)樓
1、線程池簡簡單單 4 連發(fā)
- 1、線程池的核心線程數(shù)怎么設置?
- 2、8C16G 的機器需要幾臺可以抗起 3W 的qps?
- 3、如何動態(tài)的修改線程池參數(shù)?
- 4、線程池可以先啟動最大線程數(shù)再將任務放到阻塞隊列里么?
后面的舉例的機器配置統(tǒng)一是 8核16G !
2、線程池的核心線程數(shù)到底怎么設置?首先說個不太正確的答案:
IO 密集型的設置為 2n, 計算密集型設置為 n+1
為什么不對?因為核心線程數(shù)設置多少要具體情況具體分析,大家使用線程池的業(yè)務場景不同,解決方案自然是不一樣的,下面我舉個例子做詳細的分析,然后總結出一個方法論就可以適用各個不同的場景了!!!
舉例:
- 1、假設現(xiàn)在要給 100w 用戶發(fā)放優(yōu)惠券,通過線程池異步發(fā)送
- 2、假設某線程池執(zhí)行發(fā)優(yōu)惠券的任務共耗時 50ms,其中 45ms 在io, 5ms 在進行計算
(真正的 io 耗時 計算耗時可以通過 記錄log 判斷時間差值計算出來 取平均值即可 )
3、如何設置線程池的參數(shù)快速的將這 100w 張券發(fā)完?
先拋出答案公式,再論證這個公式的正確性:
核心線程數(shù) = CPU核數(shù) * ((Io耗時 / 計算耗時) + 1)
核心線程數(shù) = 8C * ((45ms / 5ms) +1 ) = 80個
45ms / 5ms 是什么意思?
CPU 在等待 IO 返回時完全可以將 CPU 時間片拿出來去做其他的計算,45ms 可以多處理 9 個計算任務,再加上原本就有一個 5ms 在計算,也就是說: 一個CPU 核在執(zhí)行這個 50ms 發(fā)券任務時,可以并發(fā)的起10個線程去處理任務!那8C CPU 最多同時可以有 8個核心并行的處理任務, 8 * 10 = 80
一秒鐘一個線程可以處理 1000ms / 50ms = 20個任務
可以算出線程池執(zhí)行任務的峰值 qps = 20 * 80 = 1600
發(fā)完100w 張券所需時間: 100w / 1600 = 625S,也就是說大概 10分鐘左右就能發(fā)完 100w 張券。
不太正確的結論: 核心線程數(shù)在處理這個任務的情況下可以設置為 80 用來極限的壓榨機器CPU 的性能。
what?為什么算出 80 又不正確了?
因為將核心線程數(shù)設置為 80,這幾乎吃完了所有的 CPU 時間片, CPU 的負載將會達到 100% ; 試想一下生產(chǎn)環(huán)境如果你的機器 CPU 負載是 100% , 慌不慌?(CPU 負載打滿機器不會宕機, 但沒有 CPU 資源來處理用戶的請求,表現(xiàn)為服務假死/機器請求半天無反應)
設置線程池核心線程數(shù)要考慮 CPU 的使用要素
- 1、每臺機器操作系統(tǒng)需要消耗一些 CPU 資源; 假設用了 2% 的CPU 資源;
- 2、如果是面向用戶的服務,處理用戶的請求也是要消耗CPU 資源的,可以通過一些監(jiān)控系統(tǒng),看看平時 CPU 在繁忙時間段的負載是多少; 假設用了 10% 的資源;
- 3、如果除了發(fā)券任務的線程池還有其他線程池在運行,就得把其他線程池消耗的CPU資源也算上,假設用了 13% 的資源;
- 4、實際情況一些中間件框架也會用線程池,也會吃一些CPU 資源, 這里暫不做考慮。
在我的實際項目里有一個專門跑定時任務和消費 MQ 消息的服務:
我需要考慮的點:
- 1、操作系統(tǒng)的CPU 資源, 算占用 2% 的CPU資源
- 2、MQ 消費消息 算占用 5% 的CPU 資源
- 3、有其他的定時任務也在用線程池跑任務 算占用 13% 的CPU 資源
- 4、機器的 CPU 在無人監(jiān)控的非必要時段不能超過 60%。
60% - 2% - 5% - 13% = 40%
發(fā) 100w 張優(yōu)惠券的線程池就只能消耗 40%的資源于是核心線程數(shù)最多可以設置為:
核心線程數(shù): 80個 * 40% = 32個;
CPU 100% 時可以設置 80個線程去跑任務 CPU 40% 時可以設置 32個線程去跑任務 那這樣設置系統(tǒng)正常運行CPU大概是 60% 左右, 就算偶爾飆高到 70%-80% 也不用太慌~
補充: 為什么用線程池沒考慮上下文的切換?
1ms = 1000us, 一次上下文的切換大概是 1us, 上下文切換的時間跟執(zhí)行任務的時間比起來可以忽略不計。
結論 : CPU核數(shù) * ((Io耗時 / 計算耗時) + 1)
這是機器 CPU 負載 100% 時極限的值, 乘以期望的 CPU 負載百分比即可算出實際情況最佳的線程數(shù);
PS: 萬一設置錯了核心線程數(shù)又不想改代碼重新發(fā)布,可以繼續(xù)看第三個問題如何動態(tài)修改線程池參數(shù)!
2、8C16G 的機器需要幾臺可以抗起 3W 的qps?
首先算出單臺機器的 QPS, 3w 除以單臺機器的 qps 即可算出所需的機器數(shù)。
想知道單臺機器某個接口的 QPS 很簡單, 壓測即可。
不過顯然面試的時候如果被問這個問題是壓測不了的。
實際上是面試官在考察你對線程池的理解,接著往下看~
假設一個 用戶領券系統(tǒng)的 qps 在3w左右
大部分服務通常的部署在 Tomcat 上, Tomcat 內(nèi)部也是通過線程來處理用戶的請求,Tomcat 也是通過線程池來管理線程, 實際上算出 Tomcat 實際的并發(fā)和理想狀態(tài)能支持的的并發(fā)就好了。
上個問題分析出來發(fā)券接口 50ms 耗時, 8C 的CPU 占用 100%, 不考慮內(nèi)存 磁盤 網(wǎng)絡等其他開銷, 線程池極限的QPS 是1600, 這里也不考慮有沒有其他線程池或者七七八八的東西消耗 CPU 資源了。假設 CPU 只能維持在 70% 左右的負載;
單臺機器的 qps 就只能有 1600 * 70% = 1120,就算 1100
3w / 1100 = 27.27 向上取整 大概需要 28 臺機器。
作為一個有經(jīng)驗的開發(fā)人員實際部署的時候絕對要多擴容幾臺服務器來兜底, 推薦部署 32 - 36 臺機器分兩個集群部署。
3、如何動態(tài)的修改線程池參數(shù)?為什么需要動態(tài)的修改線程池參數(shù)呢?
比如第一個發(fā)券任務發(fā) 100w 張券需要 10 分鐘, 假設今天突然要發(fā) 200w 張券了, 多了100w 的發(fā)券任務,也不想用其他手段來解決了, 且機器的 CPU 負載很低只有 1% ; (為了強行舉例修改線程池參數(shù)費盡苦心)
看到第一個和第二個問題,想必你也收獲了如下信息:
使用 8C16G 的機器發(fā)放 100w 張優(yōu)惠券, 處理每個優(yōu)惠券任務耗時 50ms , 其中 45ms在IO , 5ms 在計算, 核心線程數(shù)設置為 32, CPU 負載到 40% 左右, 10分鐘可以把優(yōu)惠券發(fā)完。
如果想發(fā) 200w 張券, 最快的方法是將 核心線程數(shù) 32 設置為 64, CPU 負載在 80% 左右。
如何動態(tài)的修改線程池參數(shù)呢?
JDK 的 ThreadPoolExecutor 提供了修改線程池參數(shù)的 API
- ThreadPoolExecutor.setCorePoolSize // 修改核心線程數(shù)
- ThreadPoolExecutor.setMaximumPoolSize // 修改最大線程數(shù)
- ThreadPoolExecutor.setKeepAliveTime // 修改空閑線程存活時間
- ThreadPoolExecutor.setRejectedExecutionHandler // 修改拒絕策略
- ThreadPoolExecutor.setThreadFactory // 修改線程工廠
(不可直接修改阻塞隊列大小,想達到修改阻塞隊列的效果對線程池做一些封裝即可)
- 1、首先將線程池定義為一個 Bean 對象;
- @Bean("refreshLowPriceExecutor")
- public ThreadPoolExecutor refreshLowPriceExecutor() {
- final BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>(1000000);
- final int corePoolSize = 20;
- final int maximumPoolSize = 100;
- final int keepAliveTime = 200;
- ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);
- return executorService;
- }
- 2、可以通過分布式配置 or controller接口 or 數(shù)據(jù)庫觸發(fā)修改線程的各個參數(shù), 推薦使用分布式配置(各種用法大同小異):
- private Map<String, String> config;
- @QMapConfig("config.properties")
- private void getValueChange(Map<String, String> config) {
- refreshLowPriceExecutor.setCorePoolSize(Integer.valueOf(config.get("core_size")));
- refreshLowPriceExecutor.setMaximumPoolSize(Integer.valueOf(config.get("max_size")));
- System.out.println("當前核心線程數(shù)為 :" + refreshLowPriceExecutor.getCorePoolSize());
- System.out.println("當前最大線程數(shù)為 :" + refreshLowPriceExecutor.getMaximumPoolSize());
- this.config = config;
- }
- 3、改了核心線程數(shù),線程池是如何讓線程數(shù)立即生效的?
- public void execute(Runnable command) {
- // 省略注釋/非核心代碼
- int c = ctl.get();
- // 線程池執(zhí)行任務的處理邏輯主要分三步
- // 第一步 : 當前線程數(shù)小于核心線程數(shù)則繼續(xù)添加worker創(chuàng)建線程
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 第二步 : 當前線程數(shù)達到了核心線程數(shù)后,將任務放進阻塞隊列
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 第三步 : 隊列滿了就將啟動最大線程數(shù)限制的線程, 失敗就將任務交給拒絕策略去處理
- else if (!addWorker(command, false))
- reject(command);
- }
在線程池的核心線程數(shù)被修改后,只要有任務繼續(xù)添加進線程池,execute 方法就會繼續(xù)創(chuàng)建新線程去處理任務,這樣核心線程數(shù)就生效了。
- 4、使用 ScheduledThreadPoolExecutor 監(jiān)控線程池內(nèi)部狀況
- // 封裝成一個任務
- Runnable runnable = () -> monitorThreadPool();
- public void monitorThreadPool(){
- log.info("核心線程數(shù)" + refreshLowPriceExecutor.getCorePoolSize());
- log.info("活躍線程數(shù)" + refreshLowPriceExecutor.getActiveCount());
- log.info("最大線程數(shù)" + refreshLowPriceExecutor.getMaximumPoolSize());
- log.info("任務數(shù)" + refreshLowPriceExecutor.getTaskCount());
- log.info("線程池里的線程數(shù)" + refreshLowPriceExecutor.getPoolSize());
- log.info("獲取隊列再獲取隊列任務數(shù)" + refreshLowPriceExecutor.getQueue().size());
- }
- // 將任務交給延時線程池
- executor.scheduleAtFixedRate(runnable, initialDelay,period, TimeUnit);
4、線程池可以先啟動最大線程數(shù)再將任務放到阻塞隊列里么?
答案是當然可以!
繼續(xù)分析線程池三步走的后兩步邏輯
- public void execute(Runnable command) {
- // 省略注釋/非必要代碼
- // 第二步 : 當前線程池正在運行且 阻塞隊列的 offer 方法返回 true
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 第三步 : 啟動大于核心線程數(shù)但小于最大線程數(shù)個線程, 添加worker失敗就將任務交給拒絕策略去處理
- else if (!addWorker(command, false))
- reject(command);
- }
啟動最大線程數(shù)再將任務放到阻塞隊列的訣竅就在 workQueue 的 offer 方法;
我們可以用自己實現(xiàn)的阻塞隊列在重寫 offer 方法; 在 offer 方法中判斷 當前線程數(shù)是否大于等于最大線程數(shù),如果不大于就返回 false, 這樣就跳過了 execute 方法的第二步, 來到了第三步的創(chuàng)建最大線程數(shù)的邏輯。
看看 dubbo 是怎么做的 , 直接將代碼 copy(白嫖) 過來即可 地址
https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
- @Override
- public boolean offer(Runnable runnable) {
- if (executor == null) {
- throw new RejectedExecutionException("The task queue does not have executor!");
- }
- int currentPoolThreadSize = executor.getPoolSize();
- // 主要是這個邏輯 當前線程數(shù)是否小于最大線程數(shù),如果小于返回 false
- // 這樣就可以跳過 execute 方法的第二步, 來到了第三步的創(chuàng)建最大線程數(shù)的邏輯。
- // return false to let executor create new worker.
- if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
- return false;
- }
- // currentPoolThreadSize >= max
- return super.offer(runnable);
- }
本文轉載自微信公眾號「Shooter茶杯」,可以通過以下二維碼關注。轉載本文請聯(lián)系Shooter茶杯公眾號。