10分鐘手?jǐn)]Java線(xiàn)程池,yyds??!
最近有不少小伙伴私信我說(shuō):看了我在【精通高并發(fā)系列】文章中寫(xiě)的深度解析線(xiàn)程池源碼部分的文章,但是還是有些不明白線(xiàn)程池的實(shí)現(xiàn)原理。問(wèn)我能不能手寫(xiě)一個(gè)簡(jiǎn)單的線(xiàn)程池,幫助讀者深刻理解線(xiàn)程池的原理。
這不,我熬夜肝了這篇文章。
在【精通高并發(fā)系列】的文章中,我們?cè)?jīng)深度解析過(guò)線(xiàn)程池的源碼,從源碼層面深度解析了線(xiàn)程池的實(shí)現(xiàn)原理。
其實(shí),源碼是原理落地的最直接體現(xiàn),看懂源碼對(duì)于深刻理解原理有著很大的幫助。但是不少小伙伴看源碼時(shí),總覺(jué)得源碼太枯燥了,看不懂。
那今天,我們就一起花10分鐘手?jǐn)]一個(gè)極簡(jiǎn)版的Java線(xiàn)程池,讓小伙伴們更好的理解線(xiàn)程池的核心原理。
本文的整體結(jié)構(gòu)如下所示。
Java線(xiàn)程池核心原理
看過(guò)Java線(xiàn)程池源碼的小伙伴都知道,在Java線(xiàn)程池中最核心的類(lèi)就是ThreadPoolExecutor,而在ThreadPoolExecutor類(lèi)中最核心的構(gòu)造方法就是帶有7個(gè)參數(shù)的構(gòu)造方法,如下所示。
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
各參數(shù)的含義如下所示。
- corePoolSize:線(xiàn)程池中的常駐核心線(xiàn)程數(shù)。
- maximumPoolSize:線(xiàn)程池能夠容納同時(shí)執(zhí)行的最大線(xiàn)程數(shù),此值大于等于1。
- keepAliveTime:多余的空閑線(xiàn)程存活時(shí)間,當(dāng)空間時(shí)間達(dá)到keepAliveTime值時(shí),多余的線(xiàn)程會(huì)被銷(xiāo)毀直到只剩下corePoolSize個(gè)線(xiàn)程為止。
- unit:keepAliveTime的單位。
- workQueue:任務(wù)隊(duì)列,被提交但尚未被執(zhí)行的任務(wù)。
- threadFactory:表示生成線(xiàn)程池中工作線(xiàn)程的線(xiàn)程工廠(chǎng),用戶(hù)創(chuàng)建新線(xiàn)程,一般用默認(rèn)即可。
- handler:拒絕策略,表示當(dāng)線(xiàn)程隊(duì)列滿(mǎn)了并且工作線(xiàn)程大于等于線(xiàn)程池的最大顯示數(shù)(maxnumPoolSize)時(shí),如何來(lái)拒絕請(qǐng)求執(zhí)行的runnable的策略。
并且Java的線(xiàn)程池是通過(guò) 生產(chǎn)者-消費(fèi)者模式 實(shí)現(xiàn)的,線(xiàn)程池的使用方是生產(chǎn)者,而線(xiàn)程池本身就是消費(fèi)者。
Java線(xiàn)程池的核心工作流程如下圖所示。
手?jǐn)]Java線(xiàn)程池
我們自己手動(dòng)實(shí)現(xiàn)的線(xiàn)程池要比Java自身的線(xiàn)程池簡(jiǎn)單的多,我們?nèi)サ袅烁鞣N復(fù)雜的處理方式,只保留了最核心的原理:線(xiàn)程池的使用者向任務(wù)隊(duì)列中添加任務(wù),而線(xiàn)程池本身從任務(wù)隊(duì)列中消費(fèi)任務(wù)并執(zhí)行任務(wù)。
只要理解了這個(gè)核心原理,接下來(lái)的代碼就簡(jiǎn)單多了。在實(shí)現(xiàn)這個(gè)簡(jiǎn)單的線(xiàn)程池時(shí),我們可以將整個(gè)實(shí)現(xiàn)過(guò)程進(jìn)行拆解。拆解后的實(shí)現(xiàn)流程為:定義核心字段、創(chuàng)建內(nèi)部類(lèi)WorkThread、創(chuàng)建ThreadPool類(lèi)的構(gòu)造方法和創(chuàng)建執(zhí)行任務(wù)的方法。
定義核心字段
首先,我們創(chuàng)建一個(gè)名稱(chēng)為T(mén)hreadPool的Java類(lèi),并在這個(gè)類(lèi)中定義如下核心字段。
- DEFAULT_WORKQUEUE_SIZE:靜態(tài)常量,表示默認(rèn)的阻塞隊(duì)列大小。
- workQueue:模擬實(shí)際的線(xiàn)程池使用阻塞隊(duì)列來(lái)實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式。
- workThreads:模擬實(shí)際的線(xiàn)程池使用List集合保存線(xiàn)程池內(nèi)部的工作線(xiàn)程。
核心代碼如下所示。
- //默認(rèn)阻塞隊(duì)列大小
- private static final int DEFAULT_WORKQUEUE_SIZE = 5;
- //模擬實(shí)際的線(xiàn)程池使用阻塞隊(duì)列來(lái)實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式
- private BlockingQueue<Runnable> workQueue;
- //模擬實(shí)際的線(xiàn)程池使用List集合保存線(xiàn)程池內(nèi)部的工作線(xiàn)程
- private List<WorkThread> workThreads = new ArrayList<WorkThread>();
創(chuàng)建內(nèi)部類(lèi)WordThread
在ThreadPool類(lèi)中創(chuàng)建一個(gè)內(nèi)部類(lèi)WorkThread,模擬線(xiàn)程池中的工作線(xiàn)程。主要的作用就是消費(fèi)workQueue中的任務(wù),并執(zhí)行任務(wù)。由于工作線(xiàn)程需要不斷從workQueue中獲取任務(wù),所以,這里使用了while(true)循環(huán)不斷嘗試消費(fèi)隊(duì)列中的任務(wù)。
核心代碼如下所示。
- //內(nèi)部類(lèi)WorkThread,模擬線(xiàn)程池中的工作線(xiàn)程
- //主要的作用就是消費(fèi)workQueue中的任務(wù),并執(zhí)行
- //由于工作線(xiàn)程需要不斷從workQueue中獲取任務(wù),使用了while(true)循環(huán)不斷嘗試消費(fèi)隊(duì)列中的任務(wù)
- class WorkThread extends Thread{
- @Override
- public void run() {
- //不斷循環(huán)獲取隊(duì)列中的任務(wù)
- while (true){
- //當(dāng)沒(méi)有任務(wù)時(shí),會(huì)阻塞
- try {
- Runnable workTask = workQueue.take();
- workTask.run();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
創(chuàng)建ThreadPool類(lèi)的構(gòu)造方法
這里,我們?yōu)門(mén)hreadPool類(lèi)創(chuàng)建兩個(gè)構(gòu)造方法,一個(gè)構(gòu)造方法中傳入線(xiàn)程池的容量大小和阻塞隊(duì)列,另一個(gè)構(gòu)造方法中只傳入線(xiàn)程池的容量大小。
核心代碼如下所示。
- //在ThreadPool的構(gòu)造方法中傳入線(xiàn)程池的大小和阻塞隊(duì)列
- public ThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){
- this.workQueue = workQueue;
- //創(chuàng)建poolSize個(gè)工作線(xiàn)程并將其加入到workThreads集合中
- IntStream.range(0, poolSize).forEach((i) -> {
- WorkThread workThread = new WorkThread();
- workThread.start();
- workThreads.add(workThread);
- });
- }
- //在ThreadPool的構(gòu)造方法中傳入線(xiàn)程池的大小
- public ThreadPool(int poolSize){
- this(poolSize, new LinkedBlockingQueue<>(DEFAULT_WORKQUEUE_SIZE));
- }
創(chuàng)建執(zhí)行任務(wù)的方法
在ThreadPool類(lèi)中創(chuàng)建執(zhí)行任務(wù)的方法execute(),execute()方法的實(shí)現(xiàn)比較簡(jiǎn)單,就是將方法接收到的Runnable任務(wù)加入到workQueue隊(duì)列中。
核心代碼如下所示。
- //通過(guò)線(xiàn)程池執(zhí)行任務(wù)
- public void execute(Runnable task){
- try {
- workQueue.put(task);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
完整源碼
這里,我們給出手動(dòng)實(shí)現(xiàn)的ThreadPool線(xiàn)程池的完整源代碼,如下所示。
- package io.binghe.thread.pool;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.stream.IntStream;
- /**
- * @author binghe
- * @version 1.0.0
- * @description 自定義線(xiàn)程池
- */
- public class ThreadPool {
- //默認(rèn)阻塞隊(duì)列大小
- private static final int DEFAULT_WORKQUEUE_SIZE = 5;
- //模擬實(shí)際的線(xiàn)程池使用阻塞隊(duì)列來(lái)實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式
- private BlockingQueue<Runnable> workQueue;
- //模擬實(shí)際的線(xiàn)程池使用List集合保存線(xiàn)程池內(nèi)部的工作線(xiàn)程
- private List<WorkThread> workThreads = new ArrayList<WorkThread>();
- //在ThreadPool的構(gòu)造方法中傳入線(xiàn)程池的大小和阻塞隊(duì)列
- public ThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){
- this.workQueue = workQueue;
- //創(chuàng)建poolSize個(gè)工作線(xiàn)程并將其加入到workThreads集合中
- IntStream.range(0, poolSize).forEach((i) -> {
- WorkThread workThread = new WorkThread();
- workThread.start();
- workThreads.add(workThread);
- });
- }
- //在ThreadPool的構(gòu)造方法中傳入線(xiàn)程池的大小
- public ThreadPool(int poolSize){
- this(poolSize, new LinkedBlockingQueue<>(DEFAULT_WORKQUEUE_SIZE));
- }
- //通過(guò)線(xiàn)程池執(zhí)行任務(wù)
- public void execute(Runnable task){
- try {
- workQueue.put(task);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //內(nèi)部類(lèi)WorkThread,模擬線(xiàn)程池中的工作線(xiàn)程
- //主要的作用就是消費(fèi)workQueue中的任務(wù),并執(zhí)行
- //由于工作線(xiàn)程需要不斷從workQueue中獲取任務(wù),使用了while(true)循環(huán)不斷嘗試消費(fèi)隊(duì)列中的任務(wù)
- class WorkThread extends Thread{
- @Override
- public void run() {
- //不斷循環(huán)獲取隊(duì)列中的任務(wù)
- while (true){
- //當(dāng)沒(méi)有任務(wù)時(shí),會(huì)阻塞
- try {
- Runnable workTask = workQueue.take();
- workTask.run();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
沒(méi)錯(cuò),我們僅僅用了幾十行Java代碼就實(shí)現(xiàn)了一個(gè)極簡(jiǎn)版的Java線(xiàn)程池,沒(méi)錯(cuò),這個(gè)極簡(jiǎn)版的Java線(xiàn)程池的代碼卻體現(xiàn)了Java線(xiàn)程池的核心原理。
接下來(lái),我們測(cè)試下這個(gè)極簡(jiǎn)版的Java線(xiàn)程池。
編寫(xiě)測(cè)試程序
測(cè)試程序也比較簡(jiǎn)單,就是通過(guò)在main()方法中調(diào)用ThreadPool類(lèi)的構(gòu)造方法,傳入線(xiàn)程池的大小,創(chuàng)建一個(gè)ThreadPool類(lèi)的實(shí)例,然后循環(huán)10次調(diào)用ThreadPool類(lèi)的execute()方法,向線(xiàn)程池中提交的任務(wù)為:打印當(dāng)前線(xiàn)程的名稱(chēng)--->> Hello ThreadPool。
整體測(cè)試代碼如下所示。
- package io.binghe.thread.pool.test;
- import io.binghe.thread.pool.ThreadPool;
- import java.util.stream.IntStream;
- /**
- * @author binghe
- * @version 1.0.0
- * @description 測(cè)試自定義線(xiàn)程池
- */
- public class ThreadPoolTest {
- public static void main(String[] args){
- ThreadPool threadPool = new ThreadPool(10);
- IntStream.range(0, 10).forEach((i) -> {
- threadPool.execute(() -> {
- System.out.println(Thread.currentThread().getName() + "--->> Hello ThreadPool");
- });
- });
- }
- }
接下來(lái),運(yùn)行ThreadPoolTest類(lèi)的main()方法,會(huì)輸出如下信息。
- Thread-0--->> Hello ThreadPool
- Thread-9--->> Hello ThreadPool
- Thread-5--->> Hello ThreadPool
- Thread-8--->> Hello ThreadPool
- Thread-4--->> Hello ThreadPool
- Thread-1--->> Hello ThreadPool
- Thread-2--->> Hello ThreadPool
- Thread-5--->> Hello ThreadPool
- Thread-9--->> Hello ThreadPool
- Thread-0--->> Hello ThreadPool
至此,我們自定義的Java線(xiàn)程池就開(kāi)發(fā)完成了。
總結(jié)
線(xiàn)程池的核心原理其實(shí)并不復(fù)雜,只要我們耐心的分析,深入其源碼理解線(xiàn)程池的核心本質(zhì),你就會(huì)發(fā)現(xiàn)線(xiàn)程池的設(shè)計(jì)原來(lái)是如此的優(yōu)雅。希望通過(guò)這個(gè)手寫(xiě)線(xiàn)程池的小例子,能夠讓你更好的理解線(xiàn)程池的核心原理。
本文轉(zhuǎn)載自微信公眾號(hào)「冰河技術(shù)」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系冰河技術(shù)公眾號(hào)。