讀完 RocketMQ 源碼,我學(xué)會(huì)了如何優(yōu)雅的創(chuàng)建線(xiàn)程
RocketMQ 是一款開(kāi)源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)、高可靠的消息發(fā)布與訂閱服務(wù)。
這篇文章,筆者整理了 RocketMQ 源碼中創(chuàng)建線(xiàn)程的幾點(diǎn)技巧,希望大家讀完之后,能夠有所收獲。
一、創(chuàng)建單線(xiàn)程
首先我們先溫習(xí)下常用的創(chuàng)建單線(xiàn)程的兩種方式:
實(shí)現(xiàn) Runnable 接口
繼承 Thread 類(lèi)
1.實(shí)現(xiàn) Runnable 接口
圖中,MyRunnable 類(lèi)實(shí)現(xiàn)了 Runnable 接口的 run 方法,run 方法中定義具體的任務(wù)代碼或處理邏輯,而Runnable 對(duì)象是作為線(xiàn)程構(gòu)造函數(shù)的參數(shù)。
2.繼承 Thread 類(lèi)
線(xiàn)程實(shí)現(xiàn)類(lèi)直接繼承 Thread ,本質(zhì)上也是實(shí)現(xiàn) Runnable 接口的 run 方法。
二、單線(xiàn)程抽象類(lèi)
創(chuàng)建單線(xiàn)程的兩種方式都很簡(jiǎn)單,但每次創(chuàng)建線(xiàn)程代碼顯得有點(diǎn)冗余,于是 RocketMQ 里實(shí)現(xiàn)了一個(gè)抽象類(lèi) ServiceThread 。
抽象類(lèi) ServiceThread
我們可以看到抽象類(lèi)中包含了如下核心方法:
- 定義線(xiàn)程名;
- 啟動(dòng)線(xiàn)程;
- 關(guān)閉線(xiàn)程。
下圖展示了 RocketMQ 眾多的單線(xiàn)程實(shí)現(xiàn)類(lèi)。
實(shí)現(xiàn)類(lèi)的編程模版類(lèi)似 :
我們僅僅需要繼承抽象類(lèi),并實(shí)現(xiàn) getServiceName 和 run 方法即可。啟動(dòng)的時(shí)候,調(diào)用 start 方法 , 關(guān)閉的時(shí)候調(diào)用 shutdown 方法。
三、線(xiàn)程池原理
線(xiàn)程池是一種基于池化思想管理線(xiàn)程的工具,線(xiàn)程池維護(hù)著多個(gè)線(xiàn)程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時(shí)間任務(wù)時(shí)創(chuàng)建與銷(xiāo)毀線(xiàn)程的代價(jià)。線(xiàn)程池不僅能夠保證內(nèi)核的充分利用,還能防止過(guò)分調(diào)度。
JDK中提供的 ThreadPoolExecutor 類(lèi),是我們最常使用的線(xiàn)程池類(lèi)。
ThreadPoolExecutor構(gòu)造函數(shù)
參數(shù)名 | 作用 |
corePoolSize | 隊(duì)列沒(méi)滿(mǎn)時(shí),線(xiàn)程最大并發(fā)數(shù) |
maximumPoolSizes | 隊(duì)列滿(mǎn)后線(xiàn)程能夠達(dá)到的最大并發(fā)數(shù) |
keepAliveTime | 空閑線(xiàn)程過(guò)多久被回收的時(shí)間限制 |
unit | keepAliveTime 的時(shí)間單位 |
workQueue | 阻塞的隊(duì)列類(lèi)型 |
threadPoolFactory | 改變線(xiàn)程的名稱(chēng)、線(xiàn)程組、優(yōu)先級(jí)、守護(hù)進(jìn)程狀態(tài) |
RejectedExecutionHandler | 超出 maximumPoolSizes + workQueue 時(shí),任務(wù)會(huì)交給RejectedExecutionHandler來(lái)處理 |
任務(wù)的調(diào)度通過(guò)執(zhí)行 execute方法完成,方法的核心流程如下:
- 如果 workerCount < corePoolSize,創(chuàng)建并啟動(dòng)一個(gè)線(xiàn)程來(lái)執(zhí)行新提交的任務(wù)。
- 如果 workerCount >= corePoolSize,且線(xiàn)程池內(nèi)的阻塞隊(duì)列未滿(mǎn),則將任務(wù)添加到該阻塞隊(duì)列中。
- 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線(xiàn)程池內(nèi)的阻塞隊(duì)列已滿(mǎn),則創(chuàng)建并啟動(dòng)一個(gè)線(xiàn)程來(lái)執(zhí)行新提交的任務(wù)。
- 如果 workerCount >= maximumPoolSize,并且線(xiàn)程池內(nèi)的阻塞隊(duì)列已滿(mǎn), 則根據(jù)拒絕策略來(lái)處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。
四、線(xiàn)程池封裝
在 RocketMQ 里 ,網(wǎng)絡(luò)請(qǐng)求都會(huì)攜帶命令編碼,每種命令映射對(duì)應(yīng)的處理器,而處理器又會(huì)注冊(cè)對(duì)應(yīng)的線(xiàn)程池。
當(dāng)服務(wù)端 Broker 接收到發(fā)送消息命令時(shí),都會(huì)有單獨(dú)的線(xiàn)程池 sendMessageExecutor 來(lái)處理這種命令請(qǐng)求。
基于 ThreadPoolExecutor 做了一個(gè)簡(jiǎn)單的封裝 ,BrokerFixedThreadPoolExecutor 構(gòu)造函數(shù)包含六個(gè)核心參數(shù):
- 核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)相同 ,數(shù)量是:cpu核數(shù)和4比較后的最小值;
- 空閑線(xiàn)程的回收的時(shí)間限制,默認(rèn)1分鐘;
- 發(fā)送消息隊(duì)列,有界隊(duì)列,默認(rèn)10000;
- 線(xiàn)程工廠 ThreadFactoryImpl ,定義了線(xiàn)程名前綴:SendMessageThread_ 。
RocketMQ 實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的線(xiàn)程工廠:ThreadFactoryImpl,線(xiàn)程工廠可以定義線(xiàn)程名稱(chēng),以及是否是守護(hù)線(xiàn)程 。
線(xiàn)程工廠
開(kāi)源項(xiàng)目 Cobar ,Xmemcached,Metamorphosis 中都有類(lèi)似線(xiàn)程工廠的實(shí)現(xiàn) 。
五、線(xiàn)程名很重要
線(xiàn)程名很重要,線(xiàn)程名很重要,線(xiàn)程名很重要 ,重要的事情說(shuō)三遍。
我們看到 RocketMQ 中,無(wú)論是單線(xiàn)程抽象類(lèi)還是多線(xiàn)程的封裝都會(huì)配置線(xiàn)程名 ,因?yàn)橥ㄟ^(guò)線(xiàn)程名,非常容易定位問(wèn)題,從而大大提升解決問(wèn)題的效率。
定位的媒介常見(jiàn)有兩種:日志文件和堆棧記錄。
1.日志文件
經(jīng)常處理業(yè)務(wù)問(wèn)題的同學(xué),一定都經(jīng)常與日志打交道。
查看 ERROR 日志,追溯到執(zhí)行線(xiàn)程, 要是線(xiàn)程池隔離做的好,基本可以判斷出哪種業(yè)務(wù)場(chǎng)景出了問(wèn)題;
通過(guò)查看線(xiàn)程打印的日志,推斷線(xiàn)程調(diào)度是否正常,比如有的定時(shí)任務(wù)線(xiàn)程打印了開(kāi)始,沒(méi)有打印結(jié)束,推論當(dāng)前線(xiàn)程可能已經(jīng)掛掉或者阻塞。
2.堆棧記錄
jstack 是 java 虛擬機(jī)自帶的一種堆棧跟蹤工具 ,主要用來(lái)查看 Java 線(xiàn)程的調(diào)用堆棧,線(xiàn)程快照包含當(dāng)前 java 虛擬機(jī)內(nèi)每一條線(xiàn)程正在執(zhí)行的方法堆棧的集合,可以用來(lái)分析線(xiàn)程問(wèn)題。
jstack -l 進(jìn)程pid
筆者查看線(xiàn)程堆棧,一般關(guān)注如下幾點(diǎn):
當(dāng)前 jvm 進(jìn)程中的線(xiàn)程數(shù)量和線(xiàn)程分類(lèi)是否在預(yù)期的范圍內(nèi);
系統(tǒng)接口超時(shí)或者定時(shí)任務(wù)停止的異常場(chǎng)景下 ,分析堆棧中是否有鎖未釋放,或者線(xiàn)程一直等待網(wǎng)絡(luò)通訊響應(yīng);
分析 jvm 進(jìn)程中哪個(gè)線(xiàn)程占用的 CPU 最高。
六、總結(jié)
本文是RocketMQ 系列文章的開(kāi)篇,和朋友們簡(jiǎn)單聊聊 RocketMQ 源碼里創(chuàng)建線(xiàn)程的技巧。
單線(xiàn)程抽象類(lèi) ServiceThread使用者只需要實(shí)現(xiàn)業(yè)務(wù)邏輯以及定義線(xiàn)程名即可 ,不需要寫(xiě)冗余的代碼。
線(xiàn)程池封裝適當(dāng)封裝,定義線(xiàn)程工廠,并合理配置線(xiàn)程池參數(shù)。
線(xiàn)程名很重要文件日志,堆棧記錄配合線(xiàn)程名能大大提升解決問(wèn)題的效率。
RocketMQ 的多線(xiàn)程編程技巧很多,比如線(xiàn)程通訊,并發(fā)控制,線(xiàn)程模型等等,后續(xù)的文章會(huì)一一為大家展現(xiàn)。