新手也能看懂的線程池學習總結(jié)
一 使用線程池的好處
池化技術(shù)相比大家已經(jīng)屢見不鮮了,線程池、數(shù)據(jù)庫連接池、Http 連接池等等都是對這個思想的應(yīng)用。池化技術(shù)的思想主要是為了減少每次獲取資源的消耗,提高對資源的利用率。
線程池提供了一種限制和管理資源(包括執(zhí)行一個任務(wù))。每個線程池還維護一些基本統(tǒng)計信息,例如已完成任務(wù)的數(shù)量。
這里借用《Java 并發(fā)編程的藝術(shù)》提到的來說一下使用線程池的好處:
- 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應(yīng)速度。當任務(wù)到達時,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。
二 Executor 框架
2.1 簡介
Executor 框架是 Java5 之后引進的,在 Java 5 之后,通過 Executor 來啟動線程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用線程池實現(xiàn),節(jié)約開銷)外,還有關(guān)鍵的一點:有助于避免 this 逃逸問題。
補充:this 逃逸是指在構(gòu)造函數(shù)返回之前其他線程就持有該對象的引用. 調(diào)用尚未構(gòu)造完全的對象的方法可能引發(fā)令人疑惑的錯誤。
Executor 框架不僅包括了線程池的管理,還提供了線程工廠、隊列以及拒絕策略等,Executor 框架讓并發(fā)編程變得更加簡單。
2.2 Executor 框架結(jié)構(gòu)(主要由三大部分組成)
1) 任務(wù)(Runnable /Callable)
執(zhí)行任務(wù)需要實現(xiàn)的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口實現(xiàn)類都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執(zhí)行。
2) 任務(wù)的執(zhí)行(Executor)
如下圖所示,包括任務(wù)執(zhí)行機制的核心接口 Executor ,以及繼承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 這兩個關(guān)鍵類實現(xiàn)了 ExecutorService 接口。
這里提了很多底層的類關(guān)系,但是,實際上我們需要更多關(guān)注的是 ThreadPoolExecutor 這個類,這個類在我們實際使用線程池的過程中,使用頻率還是非常高的。
注意: 通過查看 ScheduledThreadPoolExecutor 源代碼我們發(fā)現(xiàn) ScheduledThreadPoolExecutor 實際上是繼承了 ThreadPoolExecutor 并實現(xiàn)了 ScheduledExecutorService ,而 ScheduledExecutorService 又實現(xiàn)了 ExecutorService,正如我們下面給出的類關(guān)系圖顯示的一樣。
ThreadPoolExecutor 類描述:
- //AbstractExecutorService實現(xiàn)了ExecutorService接口
- public class ThreadPoolExecutor extends AbstractExecutorService
ScheduledThreadPoolExecutor 類描述:
- //ScheduledExecutorService實現(xiàn)了ExecutorService接口
- public class ScheduledThreadPoolExecutor
- extends ThreadPoolExecutor
- implements ScheduledExecutorService
任務(wù)的執(zhí)行相關(guān)接口
3) 異步計算的結(jié)果(Future)
Future 接口以及 Future 接口的實現(xiàn)類 FutureTask 類都可以代表異步計算的結(jié)果。
當我們把 Runnable接口 或 Callable 接口 的實現(xiàn)類提交給 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執(zhí)行。(調(diào)用 submit() 方法時會返回一個 FutureTask 對象)
2.3 Executor 框架的使用示意圖
Executor 框架的使用示意圖
主線程首先要創(chuàng)建實現(xiàn) Runnable 或者 Callable 接口的任務(wù)對象。
把創(chuàng)建完成的實現(xiàn) Runnable/Callable接口的 對象直接交給 ExecutorService 執(zhí)行:ExecutorService.execute(Runnable command))或者也可以把 Runnable 對象或Callable 對象提交給 ExecutorService 執(zhí)行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable
如果執(zhí)行 ExecutorService.submit(…),ExecutorService 將返回一個實現(xiàn)Future接口的對象(我們剛剛也提到過了執(zhí)行 execute()方法和 submit()方法的區(qū)別,submit()會返回一個 FutureTask 對象)。由于 FutureTask 實現(xiàn)了 Runnable,我們也可以創(chuàng)建 FutureTask,然后直接交給 ExecutorService 執(zhí)行。
最后,主線程可以執(zhí)行 FutureTask.get()方法來等待任務(wù)執(zhí)行完成。主線程也可以執(zhí)行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務(wù)的執(zhí)行。
三 (重要)ThreadPoolExecutor 類簡單介紹
線程池實現(xiàn)類 ThreadPoolExecutor 是 Executor 框架最核心的類。
3.1 ThreadPoolExecutor 類分析
ThreadPoolExecutor 類中提供的四個構(gòu)造方法。我們來看最長的那個,其余三個都是在這個構(gòu)造方法的基礎(chǔ)上產(chǎn)生(其他幾個構(gòu)造方法說白點都是給定某些默認參數(shù)的構(gòu)造方法比如默認制定拒絕策略是什么),這里就不貼代碼講了,比較簡單。
- /**
- * 用給定的初始參數(shù)創(chuàng)建一個新的ThreadPoolExecutor。
- */
- public ThreadPoolExecutor(int corePoolSize,//線程池的核心線程數(shù)量
- int maximumPoolSize,//線程池的最大線程數(shù)
- long keepAliveTime,//當線程數(shù)大于核心線程數(shù)時,多余的空閑線程存活的最長時間
- TimeUnit unit,//時間單位
- BlockingQueue<Runnable> workQueue,//任務(wù)隊列,用來儲存等待執(zhí)行任務(wù)的隊列
- ThreadFactory threadFactory,//線程工廠,用來創(chuàng)建線程,一般默認即可
- RejectedExecutionHandler handler//拒絕策略,當提交的任務(wù)過多而不能及時處理時,我們可以定制策略來處理任務(wù)
- ) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
下面這些對創(chuàng)建 非常重要,在后面使用線程池的過程中你一定會用到!所以,務(wù)必拿著小本本記清楚。
- ThreadPoolExecutor 3 個最重要的參數(shù):
- corePoolSize : 核心線程數(shù)線程數(shù)定義了最小可以同時運行的線程數(shù)量。
- maximumPoolSize : 當隊列中存放的任務(wù)達到隊列容量的時候,當前可以同時運行的線程數(shù)量變?yōu)樽畲缶€程數(shù)。
workQueue: 當新任務(wù)來的時候會先判斷當前運行的線程數(shù)量是否達到核心線程數(shù),如果達到的話,信任就會被存放在隊列中。
ThreadPoolExecutor其他常見參數(shù):
- keepAliveTime:當線程池中的線程數(shù)量大于 corePoolSize 的時候,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了 keepAliveTime才會被回收銷毀;
- unit : keepAliveTime 參數(shù)的時間單位。
- threadFactory :executor 創(chuàng)建新線程的時候會用到。
- handler :飽和策略。關(guān)于飽和策略下面單獨介紹一下。
下面這張圖可以加深你對線程池中各個參數(shù)的相互關(guān)系的理解(圖片來源:《Java性能調(diào)優(yōu)實戰(zhàn)》):
線程池各個參數(shù)的關(guān)系
ThreadPoolExecutor 飽和策略定義:
如果當前同時運行的線程數(shù)量達到最大線程數(shù)量并且隊列也已經(jīng)被放滿了任時,ThreadPoolTaskExecutor 定義一些策略:
- ThreadPoolExecutor.AbortPolicy:拋出 RejectedExecutionException來拒絕新任務(wù)的處理。
- ThreadPoolExecutor.CallerRunsPolicy:調(diào)用執(zhí)行自己的線程運行任務(wù)。您不會任務(wù)請求。但是這種策略會降低對于新任務(wù)提交速度,影響程序的整體性能。另外,這個策略喜歡增加隊列容量。如果您的應(yīng)用程序可以承受此延遲并且你不能任務(wù)丟棄任何一個任務(wù)請求的話,你可以選擇這個策略。
- ThreadPoolExecutor.DiscardPolicy: 不處理新任務(wù),直接丟棄掉。
- ThreadPoolExecutor.DiscardOldestPolicy: 此策略將丟棄最早的未處理的任務(wù)請求。
舉個例子:
Spring 通過 ThreadPoolTaskExecutor 或者我們直接通過 ThreadPoolExecutor 的構(gòu)造函數(shù)創(chuàng)建線程池的時候,當我們不指定 RejectedExecutionHandler 飽和策略的話來配置線程池的時候默認使用的是 ThreadPoolExecutor.AbortPolicy。在默認情況下,ThreadPoolExecutor 將拋出 RejectedExecutionException 來拒絕新來的任務(wù) ,這代表你將丟失對這個任務(wù)的處理。對于可伸縮的應(yīng)用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy。當最大池被填滿時,此策略為我們提供可伸縮隊列。(這個直接查看 ThreadPoolExecutor 的構(gòu)造函數(shù)源碼就可以看出,比較簡單的原因,這里就不貼代碼了。)
3.2 推薦使用 ThreadPoolExecutor 構(gòu)造函數(shù)創(chuàng)建線程池
在《阿里巴巴 Java 開發(fā)手冊》“并發(fā)處理”這一章節(jié),明確指出線程資源必須通過線程池提供,不允許在應(yīng)用中自行顯示創(chuàng)建線程。
為什么呢?
使用線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時間以及系統(tǒng)資源開銷,解決資源不足的問題。如果不使用線程池,有可能會造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗完內(nèi)存或者“過度切換”的問題。
另外《阿里巴巴 Java 開發(fā)手冊》中強制線程池不允許使用 Executors 去創(chuàng)建,而是通過 ThreadPoolExecutor 構(gòu)造函數(shù)的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險
Executors 返回線程池對象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor :允許請求的隊列長度為 Integer.MAX_VALUE,可能堆積大量的請求,從而導(dǎo)致 OOM。
CachedThreadPool 和 ScheduledThreadPool :允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE ,可能會創(chuàng)建大量線程,從而導(dǎo)致 OOM。
方式一:通過ThreadPoolExecutor構(gòu)造函數(shù)實現(xiàn)(推薦)
方式二:通過 Executor 框架的工具類 Executors 來實現(xiàn)我們可以創(chuàng)建三種類型的 ThreadPoolExecutor:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
對應(yīng) Executors 工具類中的方法如圖所示:
四 (重要)ThreadPoolExecutor 使用示例我們上面講解了 Executor框架以及 ThreadPoolExecutor 類,下面讓我們實戰(zhàn)一下,來通過寫一個 ThreadPoolExecutor 的小 Demo 來回顧上面的內(nèi)容。
4.1 示例代碼:Runnable+ThreadPoolExecutor
首先創(chuàng)建一個 Runnable 接口的實現(xiàn)類(當然也可以是 Callable 接口,我們上面也說了兩者的區(qū)別。)
MyRunnable.java
- import java.util.Date;
- /**
- * 這是一個簡單的Runnable類,需要大約5秒鐘來執(zhí)行其任務(wù)。
- * @author shuang.kou
- */
- public class MyRunnable implements Runnable {
- private String command;
- public MyRunnable(String s) {
- this.command = s;
- }
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
- processCommand();
- System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
- }
- private void processCommand() {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @Override
- public String toString() {
- return this.command;
- }
- }
編寫測試程序,我們這里以阿里巴巴推薦的使用 ThreadPoolExecutor 構(gòu)造函數(shù)自定義參數(shù)的方式來創(chuàng)建線程池。
ThreadPoolExecutorDemo.java
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class ThreadPoolExecutorDemo {
- private static final int CORE_POOL_SIZE = 5;
- private static final int MAX_POOL_SIZE = 10;
- private static final int QUEUE_CAPACITY = 100;
- private static final Long KEEP_ALIVE_TIME = 1L;
- public static void main(String[] args) {
- //使用阿里巴巴推薦的創(chuàng)建線程池的方式
- //通過ThreadPoolExecutor構(gòu)造函數(shù)自定義參數(shù)創(chuàng)建
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- CORE_POOL_SIZE,
- MAX_POOL_SIZE,
- KEEP_ALIVE_TIME,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(QUEUE_CAPACITY),
- new ThreadPoolExecutor.CallerRunsPolicy());
- for (int i = 0; i < 10; i++) {
- //創(chuàng)建WorkerThread對象(WorkerThread類實現(xiàn)了Runnable 接口)
- Runnable worker = new MyRunnable("" + i);
- //執(zhí)行Runnable
- executor.execute(worker);
- }
- //終止線程池
- executor.shutdown();
- while (!executor.isTerminated()) {
- }
- System.out.println("Finished all threads");
- }
- }
可以看到我們上面的代碼指定了:
- corePoolSize: 核心線程數(shù)為 5。
- maximumPoolSize :最大線程數(shù) 10
- keepAliveTime : 等待時間為 1L。
- unit: 等待時間的單位為 TimeUnit.SECONDS。
- workQueue:任務(wù)隊列為 ArrayBlockingQueue,并且容量為 100;
- handler:飽和策略為 CallerRunsPolicy。
Output:
- pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019
- pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019
- pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019
- pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019
- pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019
- pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019
- pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019
- pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019
- pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019
- pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019
- pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019
4.2 線程池原理分析
承接 5.1 節(jié),我們通過代碼輸出結(jié)果可以看出:線程池每次會同時執(zhí)行 5 個任務(wù),這 5 個任務(wù)執(zhí)行完之后,剩余的 5 個任務(wù)才會被執(zhí)行。 大家可以先通過上面講解的內(nèi)容,分析一下到底是咋回事?(自己獨立思考一會)
現(xiàn)在,我們就分析上面的輸出內(nèi)容來簡單分析一下線程池原理。
**為了搞懂線程池的原理,我們需要首先分析一下 execute方法。**在 5.1 節(jié)中的 Demo 中我們使用 executor.execute(worker)來提交一個任務(wù)到線程池中去,這個方法非常重要,下面我們來看看它的源碼:
- // 存放線程池的運行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount)
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static int workerCountOf(int c) {
- return c & CAPACITY;
- }
- private final BlockingQueue<Runnable> workQueue;
- public void execute(Runnable command) {
- // 如果任務(wù)為null,則拋出異常。
- if (command == null)
- throw new NullPointerException();
- // ctl 中保存的線程池當前的一些狀態(tài)信息
- int c = ctl.get();
- // 下面會涉及到 3 步 操作
- // 1.首先判斷當前線程池中之行的任務(wù)數(shù)量是否小于 corePoolSize
- // 如果小于的話,通過addWorker(command, true)新建一個線程,并將任務(wù)(command)添加到該線程中;然后,啟動該線程從而執(zhí)行任務(wù)。
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 2.如果當前之行的任務(wù)數(shù)量大于等于 corePoolSize 的時候就會走到這里
- // 通過 isRunning 方法判斷線程池狀態(tài),線程池處于 RUNNING 狀態(tài)才會被并且隊列可以加入任務(wù),該任務(wù)才會被加入進去
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- // 再次獲取線程池狀態(tài),如果線程池狀態(tài)不是 RUNNING 狀態(tài)就需要從任務(wù)隊列中移除任務(wù),并嘗試判斷線程是否全部執(zhí)行完畢。同時執(zhí)行拒絕策略。
- if (!isRunning(recheck) && remove(command))
- reject(command);
- // 如果當前線程池為空就新創(chuàng)建一個線程并執(zhí)行。
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- //3. 通過addWorker(command, false)新建一個線程,并將任務(wù)(command)添加到該線程中;然后,啟動該線程從而執(zhí)行任務(wù)。
- //如果addWorker(command, false)執(zhí)行失敗,則通過reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容。
- else if (!addWorker(command, false))
- reject(command);
- }
通過下圖可以更好的對上面這 3 步做一個展示,下圖是我為了省事直接從網(wǎng)上找到,原地址不明。
圖解線程池實現(xiàn)原理
現(xiàn)在,讓我們在回到 5.1 節(jié)我們寫的 Demo, 現(xiàn)在應(yīng)該是不是很容易就可以搞懂它的原理了呢?
沒搞懂的話,也沒關(guān)系,可以看看我的分析:
我們在代碼中模擬了 10 個任務(wù),我們配置的核心線程數(shù)為 5 、等待隊列容量為 100 ,所以每次只可能存在 5 個任務(wù)同時執(zhí)行,剩下的 5 個任務(wù)會被放到等待隊列中去。當前的 5 個任務(wù)之行完成后,才會之行剩下的 5 個任務(wù)。
4.3 幾個常見的對比
4.3.1 Runnable vs Callable
Runnable自 Java 1.0 以來一直存在,但Callable僅在 Java 1.5 中引入,目的就是為了來處理Runnable不支持的用例。Runnable 接口不會返回結(jié)果或拋出檢查異常,但是**Callable 接口**可以。所以,如果任務(wù)不需要返回結(jié)果或拋出異常推薦使用 Runnable 接口,這樣代碼看起來會更加簡潔。
工具類 Executors 可以實現(xiàn) Runnable 對象和 Callable 對象之間的相互轉(zhuǎn)換。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。
Runnable.java
- @FunctionalInterface
- public interface Runnable {
- /**
- * 被線程執(zhí)行,沒有返回值也無法拋出異常
- */
- public abstract void run();
- }
Callable.java
- @FunctionalInterface
- public interface Callable<V> {
- /**
- * 計算結(jié)果,或在無法這樣做時拋出異常。
- * @return 計算得出的結(jié)果
- * @throws 如果無法計算結(jié)果,則拋出異常
- */
- V call() throws Exception;
- }
4.3.2 execute() vs submit()
- execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功與否;
- submit()方法用于提交需要返回值的任務(wù)。線程池會返回一個 Future 類型的對象,通過這個 Future 對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過 Future 的 get()方法來獲取返回值,get()方法會阻塞當前線程直到任務(wù)完成,而使用 get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務(wù)沒有執(zhí)行完。
我們以**AbstractExecutorService**接口中的一個 submit 方法為例子來看看源代碼:
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
上面方法調(diào)用的 newTaskFor 方法返回了一個 FutureTask 對象。
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
我們再來看看execute()方法:
- public void execute(Runnable command) {
- ...
- }
4.3.3 shutdown()VSshutdownNow()
shutdown() :關(guān)閉線程池,線程池的狀態(tài)變?yōu)?SHUTDOWN。線程池不再接受新任務(wù)了,但是隊列里的任務(wù)得執(zhí)行完畢。
shutdownNow() :關(guān)閉線程池,線程的狀態(tài)變?yōu)?STOP。線程池會終止當前正在運行的任務(wù),并停止處理排隊的任務(wù)并返回正在等待執(zhí)行的 List。
4.3.2 isTerminated() VS isShutdown()
- isShutDown 當調(diào)用 shutdown() 方法后返回為 true。
- isTerminated 當調(diào)用 shutdown() 方法后,并且所有提交的任務(wù)完成后返回為 true
4.4 加餐:Callable+ThreadPoolExecutor示例代碼
MyCallable.java
- import java.util.concurrent.Callable;
- public class MyCallable implements Callable<String> {
- @Override
- public String call() throws Exception {
- Thread.sleep(1000);
- //返回執(zhí)行當前 Callable 的線程名字
- return Thread.currentThread().getName();
- }
- }
CallableDemo.java
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class CallableDemo {
- private static final int CORE_POOL_SIZE = 5;
- private static final int MAX_POOL_SIZE = 10;
- private static final int QUEUE_CAPACITY = 100;
- private static final Long KEEP_ALIVE_TIME = 1L;
- public static void main(String[] args) {
- //使用阿里巴巴推薦的創(chuàng)建線程池的方式
- //通過ThreadPoolExecutor構(gòu)造函數(shù)自定義參數(shù)創(chuàng)建
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- CORE_POOL_SIZE,
- MAX_POOL_SIZE,
- KEEP_ALIVE_TIME,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(QUEUE_CAPACITY),
- new ThreadPoolExecutor.CallerRunsPolicy());
- List<Future<String>> futureList = new ArrayList<>();
- Callable<String> callable = new MyCallable();
- for (int i = 0; i < 10; i++) {
- //提交任務(wù)到線程池
- Future<String> future = executor.submit(callable);
- //將返回值 future 添加到 list,我們可以通過 future 獲得 執(zhí)行 Callable 得到的返回值
- futureList.add(future);
- }
- for (Future<String> fut : futureList) {
- try {
- System.out.println(new Date() + "::" + fut.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- //關(guān)閉線程池
- executor.shutdown();
- }
- }
Output:
- Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1
- Wed Nov 13 13:40:42 CST 2019::pool-1-thread-2
- Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
- Wed Nov 13 13:40:42 CST 2019::pool-1-thread-4
- Wed Nov 13 13:40:42 CST 2019::pool-1-thread-5
- Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
- Wed Nov 13 13:40:43 CST 2019::pool-1-thread-2
- Wed Nov 13 13:40:43 CST 2019::pool-1-thread-1
- Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4
- Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
##五 幾種常見的線程池詳解
5.1 FixedThreadPool
5.1.1 介紹
FixedThreadPool 被稱為可重用固定線程數(shù)的線程池。通過 Executors 類中的相關(guān)源代碼來看一下相關(guān)實現(xiàn):
- /**
- * 創(chuàng)建一個可重用固定數(shù)量線程的線程池
- */
- public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- threadFactory);
- }
另外還有一個 FixedThreadPool 的實現(xiàn)方法,和上面的類似,所以這里不多做闡述:
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
從上面源代碼可以看出新創(chuàng)建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 nThreads,這個 nThreads 參數(shù)是我們使用的時候自己傳遞的。
5.1.2 執(zhí)行任務(wù)過程介紹
FixedThreadPool 的 execute() 方法運行示意圖(該圖片來源:《Java 并發(fā)編程的藝術(shù)》):
FixedThreadPool的execute()方法運行示意圖
上圖說明:
- 如果當前運行的線程數(shù)小于 corePoolSize, 如果再來新任務(wù)的話,就創(chuàng)建新的線程來執(zhí)行任務(wù);
- 當前運行的線程數(shù)等于 corePoolSize 后, 如果再來新任務(wù)的話,會將任務(wù)加入 LinkedBlockingQueue;
- 線程池中的線程執(zhí)行完 手頭的任務(wù)后,會在循環(huán)中反復(fù)從 LinkedBlockingQueue 中獲取任務(wù)來執(zhí)行;
5.1.3 為什么不推薦使用FixedThreadPool?
FixedThreadPool 使用無界隊列 LinkedBlockingQueue(隊列的容量為 Intger.MAX_VALUE)作為線程池的工作隊列會對線程池帶來如下影響 :
- 當線程池中的線程數(shù)達到 corePoolSize 后,新任務(wù)將在無界隊列中等待,因此線程池中的線程數(shù)不會超過 corePoolSize;
- 由于使用無界隊列時 maximumPoolSize 將是一個無效參數(shù),因為不可能存在任務(wù)隊列滿的情況。所以,通過創(chuàng)建 FixedThreadPool的源碼可以看出創(chuàng)建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被設(shè)置為同一個值。
- 由于 1 和 2,使用無界隊列時 keepAliveTime 將是一個無效參數(shù);
- 運行中的 FixedThreadPool(未執(zhí)行 shutdown()或 shutdownNow())不會拒絕任務(wù),在任務(wù)比較多的時候會導(dǎo)致 OOM(內(nèi)存溢出)。
5.2 SingleThreadExecutor 詳解
5.2.1 介紹
SingleThreadExecutor 是只有一個線程的線程池。下面看看SingleThreadExecutor 的實現(xiàn):
- /**
- *返回只有一個線程的線程池
- */
- public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- threadFactory));
- }
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
從上面源代碼可以看出新創(chuàng)建的 SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 1.其他參數(shù)和 FixedThreadPool 相同。
5.2.2 執(zhí)行任務(wù)過程介紹
SingleThreadExecutor 的運行示意圖(該圖片來源:《Java 并發(fā)編程的藝術(shù)》):
上圖說明;
如果當前運行的線程數(shù)少于 corePoolSize,則創(chuàng)建一個新的線程執(zhí)行任務(wù);
當前線程池中有一個運行的線程后,將任務(wù)加入 LinkedBlockingQueue
線程執(zhí)行完當前的任務(wù)后,會在循環(huán)中反復(fù)從LinkedBlockingQueue 中獲取任務(wù)來執(zhí)行;
5.2.3 為什么不推薦使用FixedThreadPool?
SingleThreadExecutor 使用無界隊列 LinkedBlockingQueue 作為線程池的工作隊列(隊列的容量為 Intger.MAX_VALUE)。SingleThreadExecutor 使用無界隊列作為線程池的工作隊列會對線程池帶來的影響與 FixedThreadPool 相同。說簡單點就是可能會導(dǎo)致 OOM,
5.3 CachedThreadPool 詳解
5.3.1 介紹
CachedThreadPool 是一個會根據(jù)需要創(chuàng)建新線程的線程池。下面通過源碼來看看 CachedThreadPool 的實現(xiàn):
- /**
- * 創(chuàng)建一個線程池,根據(jù)需要創(chuàng)建新線程,但會在先前構(gòu)建的線程可用時重用它。
- */
- public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- threadFactory);
- }
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
CachedThreadPool 的corePoolSize 被設(shè)置為空(0),maximumPoolSize被設(shè)置為 Integer.MAX.VALUE,即它是無界的,這也就意味著如果主線程提交任務(wù)的速度高于 maximumPool 中線程處理任務(wù)的速度時,CachedThreadPool 會不斷創(chuàng)建新的線程。極端情況下,這樣會導(dǎo)致耗盡 cpu 和內(nèi)存資源。
5.3.2 執(zhí)行任務(wù)過程介紹
CachedThreadPool 的 execute()方法的執(zhí)行示意圖(該圖片來源:《Java 并發(fā)編程的藝術(shù)》):
上圖說明:
首先執(zhí)行 SynchronousQueue.offer(Runnable task) 提交任務(wù)到任務(wù)隊列。如果當前 maximumPool 中有閑線程正在執(zhí)行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執(zhí)行 offer 操作與空閑線程執(zhí)行的 poll 操作配對成功,主線程把任務(wù)交給空閑線程執(zhí)行,execute()方法執(zhí)行完成,否則執(zhí)行下面的步驟 2;
當初始 maximumPool 為空,或者 maximumPool 中沒有空閑線程時,將沒有線程執(zhí)行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟 1 將失敗,此時 CachedThreadPool 會創(chuàng)建新線程執(zhí)行任務(wù),execute 方法執(zhí)行完成;
5.3.3 為什么不推薦使用CachedThreadPool?
CachedThreadPool允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE ,可能會創(chuàng)建大量線程,從而導(dǎo)致 OOM。
六 ScheduledThreadPoolExecutor 詳解
ScheduledThreadPoolExecutor 主要用來在給定的延遲后運行任務(wù),或者定期執(zhí)行任務(wù)。 這個在實際項目中基本不會被用到,所以對這部分大家只需要簡單了解一下它的思想。關(guān)于如何在Spring Boot 中 實現(xiàn)定時任務(wù),可以查看這篇文章《5分鐘搞懂如何在Spring Boot中Schedule Tasks》。
6.1 簡介
ScheduledThreadPoolExecutor 使用的任務(wù)隊列 DelayQueue 封裝了一個PriorityQueue,PriorityQueue 會對隊列中的任務(wù)進行排序,執(zhí)行所需時間短的放在前面先被執(zhí)行(ScheduledFutureTask 的 time 變量小的先執(zhí)行),如果執(zhí)行所需時間相同則先提交的任務(wù)將被先執(zhí)行(ScheduledFutureTask 的 squenceNumber 變量小的先執(zhí)行)。
ScheduledThreadPoolExecutor 和 Timer 的比較:
- Timer 對系統(tǒng)時鐘的變化敏感,ScheduledThreadPoolExecutor不是;
- Timer 只有一個執(zhí)行線程,因此長時間運行的任務(wù)可以延遲其他任務(wù)。ScheduledThreadPoolExecutor 可以配置任意數(shù)量的線程。此外,如果你想(通過提供 ThreadFactory),你可以完全控制創(chuàng)建的線程;
- 在TimerTask 中拋出的運行時異常會殺死一個線程,從而導(dǎo)致 Timer 死機:-( ...即計劃任務(wù)將不再運行。ScheduledThreadExecutor 不僅捕獲運行時異常,還允許您在需要時處理它們(通過重寫 afterExecute 方法ThreadPoolExecutor)。拋出異常的任務(wù)將被取消,但其他任務(wù)將繼續(xù)運行。
綜上,在 JDK1.5 之后,你沒有理由再使用 Timer 進行任務(wù)調(diào)度了。
備注: Quartz 是一個由 java 編寫的任務(wù)調(diào)度庫,由 OpenSymphony 組織開源出來。在實際項目開發(fā)中使用 Quartz 的還是居多,比較推薦使用 Quartz。因為 Quartz 理論上能夠同時對上萬個任務(wù)進行調(diào)度,擁有豐富的功能特性,包括任務(wù)調(diào)度、任務(wù)持久化、可集群化、插件等等。
6.2 運行機制
ScheduledThreadPoolExecutor運行機制
ScheduledThreadPoolExecutor 的執(zhí)行主要分為兩大部分:
當調(diào)用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者**scheduleWirhFixedDelay()** 方法時,會向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一個實現(xiàn)了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。
線程池中的線程從 DelayQueue 中獲取 ScheduledFutureTask,然后執(zhí)行任務(wù)。
ScheduledThreadPoolExecutor 為了實現(xiàn)周期性的執(zhí)行任務(wù),對 ThreadPoolExecutor做了如下修改:
- 使用 DelayQueue 作為任務(wù)隊列;
- 獲取任務(wù)的方不同
- 執(zhí)行周期任務(wù)后,增加了額外的處理
6.3 ScheduledThreadPoolExecutor 執(zhí)行周期任務(wù)的步驟
ScheduledThreadPoolExecutor執(zhí)行周期任務(wù)的步驟
- 線程 1 從 DelayQueue 中獲取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任務(wù)是指 ScheduledFutureTask的 time 大于等于當前系統(tǒng)的時間;
- 線程 1 執(zhí)行這個 ScheduledFutureTask;
- 線程 1 修改 ScheduledFutureTask 的 time 變量為下次將要被執(zhí)行的時間;
- 線程 1 把這個修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。
七 線程池大小確定
線程池數(shù)量的確定一直是困擾著程序員的一個難題,大部分程序員在設(shè)定線程池大小的時候就是隨心而定。我們并沒有考慮過這樣大小的配置是否會帶來什么問題,我自己就是這大部分程序員中的一個代表。
由于筆主對如何確定線程池大小也沒有什么實際經(jīng)驗,所以,這部分內(nèi)容參考了網(wǎng)上很多文章/書籍。
首先,可以肯定的一點是線程池大小設(shè)置過大或者過小都會有問題。合適的才是最好,貌似在 95 % 的場景下都是合適的。
如果閱讀過我的上一篇關(guān)于線程池的文章的話,你一定知道:
如果我們設(shè)置的線程池數(shù)量太小的話,如果同一時間有大量任務(wù)/請求需要處理,可能會導(dǎo)致大量的請求/任務(wù)在任務(wù)隊列中排隊等待執(zhí)行,甚至會出現(xiàn)任務(wù)隊列滿了之后任務(wù)/請求無法處理的情況,或者大量任務(wù)堆積在任務(wù)隊列導(dǎo)致 OOM。這樣很明顯是有問題的!CPU 根本沒有得到充分利用。
但是,如果我們設(shè)置線程數(shù)量太大,大量線程可能會同時在爭取 CPU 資源,這樣會導(dǎo)致大量的上下文切換,從而增加線程的執(zhí)行時間,影響了整體執(zhí)行效率。
上下文切換:
多線程編程中一般線程的個數(shù)都大于 CPU 核心的個數(shù),而一個 CPU 核心在任意時刻只能被一個線程使用,為了讓這些線程都能得到有效執(zhí)行,CPU 采取的策略是為每個線程分配時間片并輪轉(zhuǎn)的形式。當一個線程的時間片用完的時候就會重新處于就緒狀態(tài)讓給其他線程使用,這個過程就屬于一次上下文切換。概括來說就是:當前任務(wù)在執(zhí)行完 CPU 時間片切換到另一個任務(wù)之前會先保存自己的狀態(tài),以便下次再切換回這個任務(wù)時,可以再加載這個任務(wù)的狀態(tài)。任務(wù)從保存到再加載的過程就是一次上下文切換。
上下文切換通常是計算密集型的。也就是說,它需要相當可觀的處理器時間,在每秒幾十上百次的切換中,每次切換都需要納秒量級的時間。所以,上下文切換對系統(tǒng)來說意味著消耗大量的 CPU 時間,事實上,可能是操作系統(tǒng)中時間消耗最大的操作。
Linux 相比與其他操作系統(tǒng)(包括其他類 Unix 系統(tǒng))有很多的優(yōu)點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。
有一個簡單并且適用面比較廣的公式:
- CPU 密集型任務(wù)(N+1): 這種任務(wù)消耗的主要是 CPU 資源,可以將線程數(shù)設(shè)置為 N(CPU 核心數(shù))+1,比 CPU 核心數(shù)多出來的一個線程是為了防止線程偶發(fā)的缺頁中斷,或者其它原因?qū)е碌娜蝿?wù)暫停而帶來的影響。一旦任務(wù)暫停,CPU 就會處于空閑狀態(tài),而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。
- I/O 密集型任務(wù)(2N): 這種任務(wù)應(yīng)用起來,系統(tǒng)會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內(nèi)不會占用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務(wù)的應(yīng)用中,我們可以多配置一些線程,具體的計算方法是 2N。
八 參考《Java 并發(fā)編程的藝術(shù)》
Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example[1]
java.util.concurrent.ScheduledThreadPoolExecutor Example[2]
ThreadPoolExecutor – Java Thread Pool Example[3]