自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java并發(fā):juc Executor框架詳解

開發(fā) 后端
Executor 框架是 juc 里提供的線程池的實現。前兩天看了下 Executor 框架的一些源碼,做個簡單的總結。

Executor 框架是 juc 里提供的線程池的實現。前兩天看了下 Executor 框架的一些源碼,做個簡單的總結。

線程池大概的思路是維護一個的線程池用于執(zhí)行提交的任務。我理解池的技術的主要意義有兩個:

1. 資源的控制,如并發(fā)量限制。像連接池這種是對數據庫資源的保護。

2. 資源的有效利用,如線程復用,避免頻繁創(chuàng)建線程和線程上下文切換。

那么想象中設計一個線程池就需要有線程池大小、線程生命周期管理、等待隊列等等功能,下面結合代碼看看原理。

Excutor 整體結構如下:

 

Executor 接口定義了最基本的 execute 方法,用于接收用戶提交任務。 ExecutorService 定義了線程池終止和創(chuàng)建及提交 futureTask 任務支持的方法。

AbstractExecutorService 是抽象類,主要實現了 ExecutorService 和 futureTask 相關的一些任務創(chuàng)建和提交的方法。

ThreadPoolExecutor 是最核心的一個類,是線程池的內部實現。線程池的功能都在這里實現了,平時用的最多的基本就是這個了。其源碼很精練,遠沒當時想象的多。

ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基礎上提供了支持定時調度的功能。線程任務可以在一定延時時間后才被觸發(fā)執(zhí)行。

1、ThreadPoolExecutor 原理

1.1 ThreadPoolExecutor內部的幾個重要屬性

1.線程池本身的狀態(tài)

  1. volatile int runState;   
  2. static final int RUNNING = 0;   
  3. static final int SHUTDOWN = 1;   
  4. static final int STOP = 2;   
  5. static final int TERMINATED = 3;  

2.等待任務隊列和工作集

  1. private final BlockingQueue<Runnable> workQueue; //等待被執(zhí)行的Runnable任務   
  2. private final HashSet<Worker> workers = new HashSet<Worker>(); //正在被執(zhí)行的Worker任務集  

 3.線程池的主要狀態(tài)鎖。線程池內部的狀態(tài)變化 ( 如線程大小 ) 都需要基于此鎖。

  1. private final ReentrantLock mainLock = new ReentrantLock(); 

4.線程的存活時間和大小

  1. private volatile long keepAliveTime;// 線程存活時間   
  2. private volatile boolean allowCoreThreadTimeOut;// 是否允許核心線程存活   
  3. private volatile int corePoolSize;// 核心池大小   
  4. private volatile int maximumPoolSize; // 最大池大小   
  5. private volatile int poolSize; //當前池大小   
  6. private int largestPoolSize; //最大池大小,區(qū)別于maximumPoolSize,是用于記錄線程池曾經達到過的最大并發(fā),理論上小于等于maximumPoolSize。   

5.線程工廠和拒絕策略

  1. private volatile RejectedExecutionHandler handler;// 拒絕策略,用于當線程池無法承載新線程是的處理策略。  
  2.  private volatile ThreadFactory threadFactory;// 線程工廠,用于在線程池需要新創(chuàng)建線程的時候創(chuàng)建線程 

6.線程池完成任務數

  1. private long completedTaskCount;//線程池運行到當前完成的任務數總和  

1.2 ThreadPoolExecutor 的內部工作原理

有了以上定義好的數據,下面來看看內部是如何實現的 。 Doug Lea 的整個思路總結起來就是 5 句話:

  1. 如果當前池大小 poolSize 小于 corePoolSize ,則創(chuàng)建新線程執(zhí)行任務。
  2. 如果當前池大小 poolSize 大于 corePoolSize ,且等待隊列未滿,則進入等待隊列
  3. 如果當前池大小 poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待隊列已滿,則創(chuàng)建新線程執(zhí)行任務。
  4. 如果當前池大小 poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待隊列已滿,則調用拒絕策略來處理該任務。
  5. 線程池里的每個線程執(zhí)行完任務后不會立刻退出,而是會去檢查下等待隊列里是否還有線程任務需要執(zhí)行,如果在 keepAliveTime 里等不到新的任務了,那么線程就會退出。

下面看看代碼實現:

線程池最重要的方法是由 Executor 接口定義的 execute 方法 , 是任務提交的入口。

我們看看 ThreadPoolExecutor.execute(Runnable cmd) 的實現:

  1. public void execute(Runnable command) {  
  2.         if (command == null)  
  3.             throw new NullPointerException();  
  4.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
  5.             if (runState == RUNNING && workQueue.offer(command)) {  
  6.                 if (runState != RUNNING || poolSize == 0)  
  7.                     ensureQueuedTaskHandled(command);  
  8.             }  
  9.             else if (!addIfUnderMaximumPoolSize(command))  
  10.                 reject(command); // is shutdown or saturated  
  11.         }  
  12. }  

解釋如下:

當提交一個新的 Runnable 任務:

分支1 : 如果當前池大小小于 corePoolSize, 執(zhí)行 addIfUnderCorePoolSize(command) , 如果線程池處于運行狀態(tài)且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 會做如下事情,將 Runnable 任務封裝成 Worker 任務 , 創(chuàng)建新的 Thread ,執(zhí)行 Worker 任務。如果不滿足條件,則返回 false 。

代碼如下:

  1.  private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
  2.         Thread t = null;  
  3.         final ReentrantLock mainLock = this.mainLock;  
  4.         mainLock.lock();  
  5.         try {  
  6.             if (poolSize < corePoolSize && runState == RUNNING)  
  7.                 t = addThread(firstTask);  
  8.         } finally {  
  9.             mainLock.unlock();  
  10.         }  
  11.         if (t == null)  
  12.             return false;  
  13.         t.start();  
  14.         return true;  
  15. }  

分支2 : 如果大于 corePoolSize 或 1 失敗失敗,則:

  • 如果等待隊列未滿,把 Runnable 任務加入到 workQueue 等待隊列
    workQueue .offer(command)
  • 如多等待隊列已經滿了,調用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本類似,只不過判斷條件是 poolSize < maximumPoolSize 。如果大于 maximumPoolSize ,則把 Runnable 任務交由 RejectedExecutionHandler 來處理。

問題:如何實現線程的復用?

Doug Lea 的實現思路是 線程池里的每個線程執(zhí)行完任務后不立刻退出,而是去檢查下等待隊列里是否還有線程任務需要執(zhí)行,如果在 keepAliveTime 里等不到新的任務了,那么線程就會退出。這個功能的實現 關鍵在于 Worker 。線程池在執(zhí)行 Runnable 任務的時候,并不單純把 Runnable 任務交給創(chuàng)建一個 Thread 。而是會把 Runnable 任務封裝成 Worker 任務。

下面看看 Worker 的實現:

代碼很簡單,可以看出, worker 里面包裝了 firstTask 屬性,在構造worker 的時候傳進來的那個 Runnable 任務就是 firstTask 。 同時也實現了Runnable接口,所以是個代理模式,看看代理增加了哪些功能。 關鍵看 woker 的 run 方法:

  1. public void run() {  
  2.            try {  
  3.                Runnable task = firstTask;  
  4.                firstTask = null;  
  5.                while (task != null || (task = getTask()) != null) {  
  6.                    runTask(task);  
  7.                    task = null;  
  8.                }  
  9.            } finally {  
  10.                workerDone(this);  
  11.            }  
  12.        }  

可以看出 worker 的 run 方法是一個循環(huán),第一次循環(huán)運行的必然是 firstTask ,在運行完 firstTask 之后,并不會立刻結束,而是會調用 getTask 獲取新的任務( getTask 會從等待隊列里獲取等待中的任務),如果 keepAliveTime 時間內得到新任務則繼續(xù)執(zhí)行,得不到新任務則那么線程才會退出。這樣就保證了多個任務可以復用一個線程,而不是每次都創(chuàng)建新任務。 keepAliveTime 的邏輯在哪里實現的呢?主要是利用了 BlockingQueue 的 poll 方法支持等待??煽?getTask 的代碼段:

  1. if (state == SHUTDOWN)  // Help drain queue  
  2.     r = workQueue.poll();  
  3. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
  4.     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
  5. else 
  6.     r = workQueue.take();  

2.ThreadFactory 和R ejectedExecutionHandler

ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的兩個屬性,也 可以認為是兩個簡單的擴展點 . ThreadFactory 是創(chuàng)建線程的工廠。

默認的線程工廠會創(chuàng)建一個帶有“ pool-poolNumber-thread-threadNumber ”為名字的線程,如果我們有特別的需要,如線程組命名、優(yōu)先級等,可以定制自己的 ThreadFactory 。

RejectedExecutionHandler 是拒絕的策略。常見有以下幾種:

AbortPolicy :不執(zhí)行,會拋出 RejectedExecutionException 異常。

CallerRunsPolicy :由調用者(調用線程池的主線程)執(zhí)行。

DiscardOldestPolicy :拋棄等待隊列中最老的。

DiscardPolicy: 不做任何處理,即拋棄當前任務。

3.ScheduledThreadPoolExecutor

ScheduleThreadPoolExecutor 是對ThreadPoolExecutor的集成。增加了定時觸發(fā)線程任務的功能。需要注意

從內部實現看, ScheduleThreadPoolExecutor 使用的是 corePoolSize 線程和一個無界隊列的固定大小的池,所以調整 maximumPoolSize 沒有效果。無界隊列是一個內部自定義的 DelayedWorkQueue 。

ScheduleThreadPoolExecutor 線程池接收定時任務的方法是 schedule ,看看內部實現:

  1. public ScheduledFuture<?> schedule(Runnable command,  
  2.                                    long delay,  
  3.                                    TimeUnit unit) {  
  4.     if (command == null || unit == null)  
  5.         throw new NullPointerException();  
  6.     RunnableScheduledFuture<?> t = decorateTask(command,  
  7.         new ScheduledFutureTask<Void>(command, null,  
  8.                                       triggerTime(delay, unit)));  
  9.  
  10.     delayedExecute(t);  
  11.     return t;  
  12. }  

以上代碼會初始化一個 RunnableScheduledFuture 類型的任務 t, 并交給 delayedExecute 方法。 delayedExecute(t) 方法實現如下:

  1.     private void delayedExecute(Runnable command) {  
  2.         if (isShutdown()) {  
  3.             reject(command);  
  4.             return;  
  5.         }  
  6.         if (getPoolSize() < getCorePoolSize())  
  7.             prestartCoreThread();  
  8.  
  9.         super.getQueue().add(command);  
  10. }  

如果當前線程池大小 poolSize 小于 CorePoolSize ,則創(chuàng)建一個新的線程,注意這里創(chuàng)建的線程是空的,不會把任務直接交給線程來做,而是把線程任務放到隊列里。因為任務是要定時觸發(fā)的,所以不能直接交給線程去執(zhí)行。

問題: 那如何做到定時觸發(fā)呢?

關鍵在于DelayedWorkQueue,它代理了 DelayQueue 。可以認為 DelayQueue 是這樣一個隊列(具體可以去看下源碼,不詳細分析):

1. 隊列里的元素按照任務的 delay 時間長短升序排序, delay 時間短的在隊頭, delay 時間長的在隊尾。

2. 從 DelayQueue 里 FIFO 的獲取一個元素的時候,不會直接返回 head ??赡軙枞鹊?head 節(jié)點到達 delay 時間后才能被獲取??梢钥聪?DelayQueue 的 take 方法實現:

  1. public E take() throws InterruptedException {  
  2.     final ReentrantLock lock = this.lock;  
  3.     lock.lockInterruptibly();  
  4.     try {  
  5.         for (;;) {  
  6.             E first = q.peek();  
  7.             if (first == null) {  
  8.                 available.await();  
  9.             } else {  
  10.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
  11.                 if (delay > 0) {  
  12.                     long tl = available.awaitNanos(delay);//等待delay時間  
  13.                 } else {  
  14.                     E x = q.poll();  
  15.                     assert x != null;  
  16.                     if (q.size() != 0)  
  17.                         available.signalAll(); // wake up other takers  
  18.                     return x;  
  19.                 }  
  20.             }  
  21.         }  
  22.     } finally {  
  23.         lock.unlock();  
  24.     }  

4.線程池使用策略

通過以上的詳解基本上能夠定制出自己需要的策略了,下面簡單介紹下Executors里面提供的一些常見線程池策略:

1.FixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.     return new ThreadPoolExecutor(nThreads, nThreads,  
  3.                                   0L, TimeUnit.MILLISECONDS,  
  4.                                   new LinkedBlockingQueue<Runnable>());  

實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize相等的線程池。

2.SingleThreadExecutor

  1. public static ExecutorService newSingleThreadExecutor() {  
  2.     return new FinalizableDelegatedExecutorService  
  3.         (new ThreadPoolExecutor(11,  
  4.                                 0L, TimeUnit.MILLISECONDS,  
  5.                                 new LinkedBlockingQueue<Runnable>()));  

實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的線程池。

3.CachedThreadPool

  1. public static ExecutorService newCachedThreadPool() {  
  2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3.                                   60L, TimeUnit.SECONDS,  
  4.                                   new SynchronousQueue<Runnable>());  

實際上就是個支持keepalivetime時間是60秒(線程空閑存活時間),且corePoolSize為0,maximumPoolSize無窮大的線程池。

4.SingleThreadScheduledExecutor

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {  
  2.     return new DelegatedScheduledExecutorService  
  3.         (new ScheduledThreadPoolExecutor(1, threadFactory));  
  4. }  

實際上是個corePoolSize為1的ScheduledExecutor。上文說過ScheduledExecutor采用無界等待隊列,所以maximumPoolSize沒有作用。

5.ScheduledThreadPool

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
  2.     return new ScheduledThreadPoolExecutor(corePoolSize);  

實際上是corePoolSize課設定的ScheduledExecutor。上文說過ScheduledExecutor采用無界等待隊列,所以maximumPoolSize沒有作用。

以上還不一定滿足你的需要,完全可以根據自己需要去定制。

原文鏈接:http://singleant.iteye.com/blog/1423931

【編輯推薦】

  1. 設計Java應用程序的平滑停止
  2. 深入Java探索:Java內存區(qū)域
  3. 深入Java虛擬機:Class文件實例解析
  4. 同一段程序在Java和C中的不同結果
  5. Java初學者都必須理解的六大問題
責任編輯:林師授 來源: singleant的博客
相關推薦

2017-02-14 10:00:19

Java開發(fā)Lock

2015-11-06 10:26:53

JavaExecutor框架

2015-12-24 10:13:29

JavaExecutor框架

2019-11-19 09:00:38

JavaAND信號量

2019-07-18 11:08:09

Java并發(fā)框架

2023-12-14 07:36:16

Java并發(fā)原子類

2024-11-13 15:09:57

Java線程開發(fā)

2017-08-04 11:41:53

Javathreadpool框架

2017-08-07 20:50:27

JavaForkJoin

2023-03-24 15:44:52

Java多線程工具

2016-08-18 13:56:33

AndroidExecutorsubmit

2025-01-03 08:40:53

Java并發(fā)編程Guava庫

2012-02-13 09:57:51

JavaDisruptor

2010-04-27 09:17:23

內存屏障JVM

2021-02-03 06:15:26

工具postManHttp

2023-10-05 11:12:06

JUCUnsafe安全

2012-08-08 09:32:26

C++多進程并發(fā)框架

2010-05-04 08:44:42

Java并發(fā)模型

2012-05-10 10:18:14

JavaDisruptor

2020-11-03 09:10:18

JUC-Future
點贊
收藏

51CTO技術棧公眾號