自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java線程進(jìn)階之ThreadPoolExecutor線程池執(zhí)行原理機(jī)制詳解

開發(fā) 前端
線程池中,把線程的狀態(tài)和數(shù)量通過int類型進(jìn)行維護(hù),高三位表示狀態(tài),低29位表示線程數(shù)量。這樣可以保證線程的狀態(tài)和數(shù)量的一致性。

[[423243]]

前言

線程池有很多優(yōu)點(diǎn):

降低資源消耗;

提高響應(yīng)速度;

提高線程的可管理性等等;

今天我們就來分析探討下原理實(shí)現(xiàn)

一、線程池接口簡(jiǎn)單分析

1、Executor接口

  1. public interface Executor { 
  2. // 執(zhí)行一個(gè)任務(wù)。任務(wù)都被封裝成Runnable的實(shí)現(xiàn) 
  3.     void execute(Runnable command); 

2、 ExecutorService接口

  1. public interface ExecutorService extends Executor { 
  2. // 啟動(dòng)有序的關(guān)閉,之前提交的任務(wù)將會(huì)被執(zhí)行,但不會(huì)接受新的任務(wù)。 
  3.     void shutdown(); 
  4. // 嘗試停止所有正在執(zhí)行的任務(wù),停止等待處理的任務(wù),病返回任務(wù)列表 
  5.     List<Runnable> shutdownNow(); 
  6. // 判斷線程池是否已經(jīng)關(guān)閉 
  7.     boolean isShutdown(); 
  8. // 如果關(guān)閉后所有任務(wù)都已完成。但是前提是必須先執(zhí)行:shutdown 或者 shutdownNow 
  9.     boolean isTerminated(); 
  10. // 在開啟shutdown之后,阻止所有的任務(wù)知道執(zhí)行完成 
  11.     boolean awaitTermination(long timeout, TimeUnit unit) 
  12.         throws InterruptedException; 
  13. // 提交任務(wù),帶返回結(jié)果的 
  14.     <T> Future<T> submit(Callable<T> task); 
  15. // 提交任務(wù),封裝返回結(jié)果為T 
  16.     <T> Future<T> submit(Runnable task, T result); 
  17.  // 提交一個(gè)普通任務(wù),返回結(jié)果任意 
  18.     Future<?> submit(Runnable task); 
  19. // 執(zhí)行一批任務(wù),返回結(jié)果為 List<Future<T>> 
  20.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
  21.         throws InterruptedException; 
  22.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 
  23.                                   long timeout, TimeUnit unit) 
  24.         throws InterruptedException; 
  25.     <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
  26.         throws InterruptedException, ExecutionException; 
  27.     <T> T invokeAny(Collection<? extends Callable<T>> tasks, 
  28.                     long timeout, TimeUnit unit) 
  29.         throws InterruptedException, ExecutionException, TimeoutException; 

其具有5個(gè)核心的內(nèi)部類。其中4內(nèi)部類對(duì)應(yīng)的是拒絕策略。Worker是核心的執(zhí)行代碼;

3、 RejectedExecutionHandler

  1. public interface RejectedExecutionHandler { 
  2. // 拒絕執(zhí)行策略 
  3.     void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 

4、 AbortPolicy 策略

Java線程池默認(rèn)的阻塞策略,不執(zhí)行此任務(wù),而且直接拋出一個(gè)運(yùn)行時(shí)異常

  1. public static class AbortPolicy implements RejectedExecutionHandler { 
  2.         public AbortPolicy() { } 
  3.         // 直接拋出異常,描述前線程的基本信息 
  4.         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  5.             throw new RejectedExecutionException("Task " + r.toString() + 
  6.                                                  " rejected from " + 
  7.                                                  e.toString()); 
  8.         } 
  9.     } 

5、DiscardPolicy策略

空方法,不做任何處理

  1. public static class DiscardPolicy implements RejectedExecutionHandler { 
  2.         public DiscardPolicy() { } 
  3.         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  4.         } 
  5.     } 

6、DiscardOldestPolicy 策略

從隊(duì)列里面拋棄一個(gè)最老的任務(wù),并再次execute 此task

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler { 
  2.         public DiscardOldestPolicy() { } 
  3.         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  4.             if (!e.isShutdown()) { 
  5.             // 從隊(duì)列里面取出最老的一個(gè)任務(wù) 
  6.                 e.getQueue().poll(); 
  7.                 // 手動(dòng)調(diào)用execute方法執(zhí)行,將任務(wù)添加到隊(duì)列中 
  8.                 e.execute(r); 
  9.             } 
  10.         } 
  11.     } 

7、CallerRunsPolicy 策略

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler { 
  2.         /** 
  3.          * Creates a {@code CallerRunsPolicy}. 
  4.          */ 
  5.         public CallerRunsPolicy() { } 
  6.       // 如果當(dāng)前線程池沒有關(guān)閉,則調(diào)用線程的run方法 
  7.         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  8.             if (!e.isShutdown()) { 
  9.                 r.run(); 
  10.             } 
  11.         } 
  12.     } 

8、ThreadPoolExecutor

構(gòu)造函數(shù)詳解

  1. public class ThreadPoolExecutor extends AbstractExecutorService { 
  2. public ThreadPoolExecutor(int corePoolSize, 
  3.                               int maximumPoolSize, 
  4.                               long keepAliveTime, 
  5.                               TimeUnit unit, 
  6.                               BlockingQueue<Runnable> workQueue) { 
  7.         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
  8.              Executors.defaultThreadFactory(), defaultHandler); 
  9.     } 

構(gòu)造函數(shù)參數(shù)說明:

  • corePoolSize:線程池中的核心線程數(shù),空閑時(shí)候線程也不會(huì)回收,除非把a(bǔ)llowCoreThreadTimeOut設(shè)置為 true,這時(shí)核心線程才會(huì)被回收;
  • maximumPoolSize:線程池中可以創(chuàng)建的最大線程數(shù),限定為2^29-1;
  • keepAliveTime:當(dāng)線程池中創(chuàng)建的線程超過了核心線程數(shù)的時(shí)候,在沒有新任務(wù)加入的等待時(shí)間;
  • workQueue:存放任務(wù)的隊(duì)列,只有當(dāng)線程數(shù) > 核心線程數(shù),才會(huì)把其他的任務(wù)放入queue,一般常用的是queue就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue;
  • threadFactory:創(chuàng)建線程的工廠類;
  • handler:當(dāng)queue滿了和線程數(shù)達(dá)到最大限制,對(duì)于繼續(xù)到達(dá)的任務(wù)采取的策略。默認(rèn)采取AbortPolicy , 也就是拒絕策略,直接拋出異常;

9、核心成員變量分析

  • 線程池中設(shè)計(jì)非常巧妙的一個(gè)地方是把線程池的狀態(tài)和運(yùn)行的線程數(shù)量用一個(gè)int類型進(jìn)行存儲(chǔ);這樣一來可以保持線程池狀態(tài)和線程池活躍線程數(shù)量的一致性。因?yàn)锳tomicInteger是線程安全的;
  • workerCount:線程池中當(dāng)前活動(dòng)的線程數(shù)量,占據(jù)ctl的低29位;
  • runState:線程池運(yùn)行狀態(tài),占據(jù)ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀態(tài);
  • 為了將線程池的狀態(tài)和線程池中的工作線程的數(shù)量放到一個(gè)int里面進(jìn)行管理。他們利用了二進(jìn)制數(shù)據(jù)進(jìn)行位運(yùn)算。其中int類型有4個(gè)字節(jié),一個(gè)字節(jié)8位??偣灿?2位。其中高的3位表示線程的狀態(tài)。低29位代表線程的數(shù)量;

其中32位中,高三位代表的是狀態(tài):

  1. 111 > RUNNING 
  2. 000 > SHUTDOWN 
  3. 001 > STOP 
  4. 010 > TIDYING 
  5. 110 > TERMINATED 

低29位代表線程的數(shù)量。所以最大的線程數(shù)為 2^29 -1 = 536870911;

// 記錄線程池狀態(tài)和線程數(shù)量(總共32位,前三位表示線程池狀態(tài),后29位表示線程數(shù)量),保證線程安全性

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  2. // int 字節(jié)32位,COUNT_BITS代表的是29位 
  3.     private static final int COUNT_BITS = Integer.SIZE - 3; 
  4. // 線程的最大容量:000 11111111111111111111111111111 
  5.     private static final int CAPACITY   = (1 << COUNT_BITS) - 1; 
  6. // 運(yùn)行狀態(tài):111 00000000000000000000000000000 
  7.     private static final int RUNNING    = -1 << COUNT_BITS; 
  8. // 關(guān)閉狀態(tài):000 00000000000000000000000000000 
  9.     private static final int SHUTDOWN   =  0 << COUNT_BITS; 
  10. // 停止?fàn)顟B(tài):001 00000000000000000000000000000 
  11.     private static final int STOP       =  1 << COUNT_BITS; 
  12. // 整理狀態(tài):010 00000000000000000000000000000 
  13.     private static final int TIDYING    =  2 << COUNT_BITS; 
  14. // 終止?fàn)顟B(tài):011 00000000000000000000000000000 
  15.     private static final int TERMINATED =  3 << COUNT_BITS; 
  1. /** 
  2. * 是按位取反的意思,CAPACITY表示的是高位的3個(gè)0,和低位的29個(gè)1,而~CAPACITY則表示高位的3個(gè)1,2低位的9個(gè)0, 
  3. * 然后再與入?yún)執(zhí)行按位與操作,即高3位保持原樣,低29位全部設(shè)置為0,也就獲取了線程池的運(yùn)行狀態(tài)runState 
  4. */ 
  5.     private static int runStateOf(int c)     { return c & ~CAPACITY; } 
  6. /** 
  7. * 返回當(dāng)前線程的數(shù)量。其中c代表線程池的狀態(tài),即是高三位。: 
  8. * 而CAPACITY 代表的是線程的容量,即000 11111111111111111111111111111 
  9. * c & CAPACITY ,只有當(dāng)都為1的時(shí)候,才為真,這樣直接舍棄高位 
  10. */ 
  11.     private static int workerCountOf(int c)  { return c & CAPACITY; } 
  12. /** 
  13. * 傳入的rs表示線程池運(yùn)行狀態(tài)runState,其是高3位有值,低29位全部為0的int, 
  14. * 而wc則代表線程池中有效線程的數(shù)量workerCount,其為高3位全部為0,而低29位有值得int, 
  1. * 將runState和workerCount做或操作|處理,即用runState的高3位,workerCount的低29位填充的數(shù)字,而默認(rèn)傳入的 
  2.  
  3. */ 
  4.     private static int ctlOf(int rs, int wc) { return rs | wc; } 

線程池的狀態(tài)轉(zhuǎn)換:

  1. // 調(diào)用了shutdown()方法  
  2. RUNNING -> SHUTDOWN  
  3. // 調(diào)用了shutdownNow()  
  4. (RUNNING 或 SHUTDOWN) -> STOP  
  5. // 當(dāng)隊(duì)列和線程池為空  
  6. SHUTDOWN -> TIDYING  
  7. // 當(dāng)線程池為空  
  8. STOP -> TIDYING  
  9. // 當(dāng)terminated()鉤子方法執(zhí)行完成  
  10. TIDYING -> TERMINATED  

二、線程池執(zhí)行流程源碼分析

1、程序入口:execute 方法

  1. public void execute(Runnable command) { 
  2.    if (command == null
  3.        throw new NullPointerException(); 
  4.    //獲取當(dāng)前線程池的狀態(tài)+線程個(gè)數(shù)變量 
  5.    int c = ctl.get(); 
  6.    //當(dāng)前線程池線程個(gè)數(shù)是否小于corePoolSize,小于則開啟新線程運(yùn)行 
  7.    if (workerCountOf(c) < corePoolSize) { 
  8.        if (addWorker(command, true)) 
  9.            return
  10.        c = ctl.get(); 
  11.    } 
  12.    //如果線程池處于RUNNING狀態(tài),則添加任務(wù)到阻塞隊(duì)列 
  13.    if (isRunning(c) && workQueue.offer(command)) { 
  14.        //二次檢查 
  15.        int recheck = ctl.get(); 
  16.        //如果當(dāng)前線程池狀態(tài)不是RUNNING則從隊(duì)列刪除任務(wù),并執(zhí)行拒絕策略 
  17.        if (! isRunning(recheck) && remove(command)) 
  18.            reject(command); 
  19.        //否者如果當(dāng)前線程池線程空,則添加一個(gè)線程 
  20.        else if (workerCountOf(recheck) == 0) 
  21.            addWorker(nullfalse); 
  22.    } 
  23.    //如果隊(duì)列滿了,則新增線程,新增失敗則執(zhí)行拒絕策略 
  24.    else if (!addWorker(command, false)) 
  25.        reject(command); 
  • 如果當(dāng)前線程池線程個(gè)數(shù)小于corePoolSize則開啟新線程;
  • 否則添加任務(wù)到任務(wù)隊(duì)列;
  • 如果任務(wù)隊(duì)列滿了,則嘗試新開啟線程執(zhí)行任務(wù),如果線程個(gè)數(shù)>maximumPoolSize則執(zhí)行拒絕策略;

重點(diǎn)看addWorkder方法:

  1. private boolean addWorker(Runnable firstTask, boolean core) { 
  2.    retry: 
  3.    for (;;) { 
  4.        int c = ctl.get(); 
  5.        int rs = runStateOf(c); 
  6.        //1、 檢查隊(duì)列是否只在必要時(shí)為空. 
  7.        if (rs >= SHUTDOWN && 
  8.            ! (rs == SHUTDOWN && 
  9.               firstTask == null && 
  10.               ! workQueue.isEmpty())) 
  11.            return false
  12.        //循環(huán)cas增加線程個(gè)數(shù) 
  13.        for (;;) { 
  14.            int wc = workerCountOf(c); 
  15.            //如果線程個(gè)數(shù)超限則返回false 
  16.            if (wc >= CAPACITY || 
  17.                wc >= (core ? corePoolSize : maximumPoolSize)) 
  18.                return false
  19.            //cas增加線程個(gè)數(shù),同時(shí)只有一個(gè)線程成功 
  20.            if (compareAndIncrementWorkerCount(c)) 
  21.                break retry; 
  22.            //cas失敗了,則看線程池狀態(tài)是否變化了,變化則跳到外層循環(huán)重試重新獲取線程池狀態(tài),否者內(nèi)層循環(huán)重新cas。 
  23.            c = ctl.get();  // Re-read ctl 
  24.            if (runStateOf(c) != rs) 
  25.                continue retry; 
  26.        } 
  27.    } 
  28.    //2、到這里說明cas成功了 
  29.    boolean workerStarted = false
  30.    boolean workerAdded = false
  31.    Worker w = null
  32.    try { 
  33.        //創(chuàng)建worker 
  34.        final ReentrantLock mainLock = this.mainLock; 
  35.        w = new Worker(firstTask); 
  36.        final Thread t = w.thread; 
  37.        if (t != null) { 
  38.            //加獨(dú)占鎖,為了workers同步,因?yàn)榭赡芏鄠€(gè)線程調(diào)用了線程池的execute方法。 
  39.            mainLock.lock(); 
  40.            try { 
  41.                //重新檢查線程池狀態(tài),為了避免在獲取鎖前調(diào)用了shutdown接口 
  42.                int c = ctl.get(); 
  43.                int rs = runStateOf(c); 
  44.                if (rs < SHUTDOWN || 
  45.                    (rs == SHUTDOWN && firstTask == null)) { 
  46.                    if (t.isAlive()) // precheck that t is startable 
  47.                        throw new IllegalThreadStateException(); 
  48.                    //添加任務(wù) 
  49.                    workers.add(w); 
  50.                    int s = workers.size(); 
  51.                    if (s > largestPoolSize) 
  52.                        largestPoolSize = s; 
  53.                    workerAdded = true
  54.                } 
  55.            } finally { 
  56.                mainLock.unlock(); 
  57.            } 
  58.            //添加成功則啟動(dòng)任務(wù) 
  59.            if (workerAdded) { 
  60.                t.start(); 
  61.                workerStarted = true
  62.            } 
  63.        } 
  64.    } finally { 
  65.        if (! workerStarted) 
  66.            addWorkerFailed(w); 
  67.    } 
  68.    return workerStarted; 
  • 第(1)雙重循環(huán)目的是通過cas增加線程池線程個(gè)數(shù);
  • 第(2)主要是并發(fā)安全的把任務(wù)添加到workers里面,并且啟動(dòng)任務(wù)執(zhí)行;
  1. rs >= SHUTDOWN && 
  2.               ! (rs == SHUTDOWN && 
  3.                   firstTask == null && 
  4.                   ! workQueue.isEmpty()) 

也就是說下面幾種情況下會(huì)返回false:

  • 當(dāng)前線程池狀態(tài)為STOP,TIDYING,TERMINATED;
  • 當(dāng)前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個(gè)任務(wù);
  • 當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空;
  • 內(nèi)層循環(huán)作用是使用cas增加線程個(gè)數(shù),如果線程個(gè)數(shù)超限則返回false,否者進(jìn)行cas,cas成功則退出雙循環(huán),否者cas失敗了,要看當(dāng)前線程池的狀態(tài)是否變化了,如果變了,則重新進(jìn)入外層循環(huán)重新獲取線程池狀態(tài),否者進(jìn)入內(nèi)層循環(huán)繼續(xù)進(jìn)行cas嘗試;
  • 到了第(2)說明CAS成功了,也就是說線程個(gè)數(shù)加一了,但是現(xiàn)在任務(wù)還沒開始執(zhí)行,這里使用全局的獨(dú)占鎖來控制workers里面添加任務(wù),其實(shí)也可以使用并發(fā)安全的set,但是性能沒有獨(dú)占鎖好;
  • 這里需要注意的是要在獲取鎖后重新檢查線程池的狀態(tài),這是因?yàn)槠渌€程可可能在本方法獲取鎖前改變了線程池的狀態(tài),比如調(diào)用了shutdown方法。添加成功則啟動(dòng)任務(wù)執(zhí)行;

2、 工作線程Worker

先看下構(gòu)造函數(shù):

  1. Worker(Runnable firstTask) { 
  2.    setState(-1); // 在調(diào)用runWorker前禁止中斷 
  3.    this.firstTask = firstTask; 
  4.    this.thread = getThreadFactory().newThread(this);//創(chuàng)建一個(gè)線程 
  • 這里添加一個(gè)新狀態(tài)-1是為了避免當(dāng)前線程worker線程被中斷;
  • 這里設(shè)置了-1所以條件不滿足就不會(huì)中斷該線程了;
  • 運(yùn)行runWorker時(shí)候會(huì)調(diào)用unlock方法,該方法吧status變?yōu)榱?,所以這時(shí)候調(diào)用shutdownNow會(huì)中斷worker線程;
  1. final void runWorker(Worker w) { 
  2.        Thread wt = Thread.currentThread(); 
  3.        Runnable task = w.firstTask; 
  4.        w.firstTask = null
  5.        w.unlock(); // status設(shè)置為0,允許中斷 
  6.        boolean completedAbruptly = true
  7.        try { 
  8.            while (task != null || (task = getTask()) != null) { 
  9.                w.lock(); 
  10.                // 如果線程池當(dāng)前狀態(tài)至少是stop,則設(shè)置中斷標(biāo)志; 
  11.                // 如果線程池當(dāng)前狀態(tài)是RUNNININ,則重置中斷標(biāo)志,重置后需要重新 
  12.                //檢查下線程池狀態(tài),因?yàn)楫?dāng)重置中斷標(biāo)志時(shí)候,可能調(diào)用了線程池的shutdown方法 
  13.                //改變了線程池狀態(tài)。 
  14.                if ((runStateAtLeast(ctl.get(), STOP) || 
  15.                     (Thread.interrupted() && 
  16.                      runStateAtLeast(ctl.get(), STOP))) && 
  17.                    !wt.isInterrupted()) 
  18.                    wt.interrupt(); 
  19.                try { 
  20.                    //任務(wù)執(zhí)行前干一些事情 
  21.                    beforeExecute(wt, task); 
  22.                    Throwable thrown = null
  23.                    try { 
  24.                        task.run();//執(zhí)行任務(wù) 
  25.                    } catch (RuntimeException x) { 
  26.                        thrown = x; throw x; 
  27.                    } catch (Error x) { 
  28.                        thrown = x; throw x; 
  29.                    } catch (Throwable x) { 
  30.                        thrown = x; throw new Error(x); 
  31.                    } finally { 
  32.                        //任務(wù)執(zhí)行完畢后干一些事情 
  33.                        afterExecute(task, thrown); 
  34.                    } 
  35.                } finally { 
  36.                    task = null
  37.                    //統(tǒng)計(jì)當(dāng)前worker完成了多少個(gè)任務(wù) 
  38.                    w.completedTasks++; 
  39.                    w.unlock(); 
  40.                } 
  41.            } 
  42.            completedAbruptly = false
  43.        } finally { 
  44.            //執(zhí)行清了工作 
  45.            processWorkerExit(w, completedAbruptly); 
  46.        } 
  47.    } 
  • 如果當(dāng)前task為空,則直接執(zhí)行;
  • 否者調(diào)用getTask從任務(wù)隊(duì)列獲取一個(gè)任務(wù)執(zhí)行,如果任務(wù)隊(duì)列為空,則worker退出;
  1. private Runnable getTask() { 
  2.    boolean timedOut = false; // Did the last poll() time out
  3.    retry: 
  4.    for (;;) { 
  5.        int c = ctl.get(); 
  6.        int rs = runStateOf(c); 
  7.        // 如果當(dāng)前線程池狀態(tài)>=STOP 或者線程池狀態(tài)為shutdown并且工作隊(duì)列為空則,減少工作線程個(gè)數(shù) 
  8.        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 
  9.            decrementWorkerCount(); 
  10.            return null
  11.        } 
  12.        boolean timed;      // Are workers subject to culling? 
  13.        for (;;) { 
  14.            int wc = workerCountOf(c); 
  15.            timed = allowCoreThreadTimeOut || wc > corePoolSize; 
  16.            if (wc <= maximumPoolSize && ! (timedOut && timed)) 
  17.                break; 
  18.            if (compareAndDecrementWorkerCount(c)) 
  19.                return null
  20.            c = ctl.get();  // Re-read ctl 
  21.            if (runStateOf(c) != rs) 
  22.                continue retry; 
  23.            // else CAS failed due to workerCount change; retry inner loop 
  24.        } 
  25.        try { 
  26.            //根據(jù)timed選擇調(diào)用poll還是阻塞的take 
  27.            Runnable r = timed ? 
  28.                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
  29.                workQueue.take(); 
  30.            if (r != null
  31.                return r; 
  32.            timedOut = true
  33.        } catch (InterruptedException retry) { 
  34.            timedOut = false
  35.        } 
  36.    } 
  37. private void processWorkerExit(Worker w, boolean completedAbruptly) { 
  38.    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 
  39.        decrementWorkerCount(); 
  40.    //統(tǒng)計(jì)整個(gè)線程池完成的任務(wù)個(gè)數(shù) 
  41.    final ReentrantLock mainLock = this.mainLock; 
  42.    mainLock.lock(); 
  43.    try { 
  44.        completedTaskCount += w.completedTasks; 
  45.        workers.remove(w); 
  46.    } finally { 
  47.        mainLock.unlock(); 
  48.    } 
  49.    //嘗試設(shè)置線程池狀態(tài)為TERMINATED,如果當(dāng)前是shutdonw狀態(tài)并且工作隊(duì)列為空 
  50.    //或者當(dāng)前是stop狀態(tài)當(dāng)前線程池里面沒有活動(dòng)線程 
  51.    tryTerminate(); 
  52.    //如果當(dāng)前線程個(gè)數(shù)小于核心個(gè)數(shù),則增加 
  53.    int c = ctl.get(); 
  54.    if (runStateLessThan(c, STOP)) { 
  55.        if (!completedAbruptly) { 
  56.            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 
  57.            if (min == 0 && ! workQueue.isEmpty()) 
  58.                min = 1; 
  59.            if (workerCountOf(c) >= min
  60.                return; // replacement not needed 
  61.        } 
  62.        addWorker(nullfalse); 
  63.    } 

3、 shutdown操作

  • 調(diào)用shutdown后,線程池就不會(huì)在接受新的任務(wù)了;
  • 但是工作隊(duì)列里面的任務(wù)還是要執(zhí)行的,但是該方法立刻返回的,并不等待隊(duì)列任務(wù)完成在返回;
  1. public void shutdown() { 
  2.    final ReentrantLock mainLock = this.mainLock; 
  3.    mainLock.lock(); 
  4.    try { 
  5.        //權(quán)限檢查 
  6.        checkShutdownAccess(); 
  7.        //設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是SHUTDOWN則直接返回 
  8.        advanceRunState(SHUTDOWN); 
  9.        //設(shè)置中斷標(biāo)志 
  10.        interruptIdleWorkers(); 
  11.        onShutdown(); // hook for ScheduledThreadPoolExecutor 
  12.    } finally { 
  13.        mainLock.unlock(); 
  14.    } 
  15.    //嘗試狀態(tài)變?yōu)門ERMINATED 
  16.    tryTerminate(); 

如果當(dāng)前狀態(tài)>=targetState則直接返回,否者設(shè)置當(dāng)前狀態(tài)為targetState;

  1. private void advanceRunState(int targetState) { 
  2.    for (;;) { 
  3.        int c = ctl.get(); 
  4.        if (runStateAtLeast(c, targetState) || 
  5.            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 
  6.            break; 
  7.    } 
  8. private void interruptIdleWorkers() { 
  9.    interruptIdleWorkers(false); 
  • 設(shè)置所有線程的中斷標(biāo)志,主要這里首先加了全局鎖;
  • 同時(shí)只有一個(gè)線程可以調(diào)用shutdown時(shí)候設(shè)置中斷標(biāo)志,然后嘗試獲取worker自己的鎖,獲取成功則設(shè)置中斷標(biāo)示;
  1. private void interruptIdleWorkers(boolean onlyOne) { 
  2.    final ReentrantLock mainLock = this.mainLock; 
  3.    mainLock.lock(); 
  4.    try { 
  5.        for (Worker w : workers) { 
  6.            Thread t = w.thread; 
  7.            if (!t.isInterrupted() && w.tryLock()) { 
  8.                try { 
  9.                    t.interrupt(); 
  10.                } catch (SecurityException ignore) { 
  11.                } finally { 
  12.                    w.unlock(); 
  13.                } 
  14.            } 
  15.            if (onlyOne) 
  16.                break; 
  17.        } 
  18.    } finally { 
  19.        mainLock.unlock(); 
  20.    } 

4、shutdownNow操作

調(diào)用shutdown后,線程池就不會(huì)在接受新的任務(wù)了,并且丟棄工作隊(duì)列里面里面的任務(wù),正在執(zhí)行的任務(wù)會(huì)被中斷,但是該方法立刻返回的,并不等待激活的任務(wù)執(zhí)行完成在返回。返回隊(duì)列里面的任務(wù)列表;

  1. public List<Runnable> shutdownNow() { 
  2.    List<Runnable> tasks; 
  3.    final ReentrantLock mainLock = this.mainLock; 
  4.    mainLock.lock(); 
  5.    try { 
  6.        checkShutdownAccess();//權(quán)限檢查 
  7.        advanceRunState(STOP);// 設(shè)置線程池狀態(tài)為stop 
  8.        interruptWorkers();//中斷線程 
  9.        tasks = drainQueue();//移動(dòng)隊(duì)列任務(wù)到tasks 
  10.    } finally { 
  11.        mainLock.unlock(); 
  12.    } 
  13.    tryTerminate(); 
  14.    return tasks; 
  • 調(diào)用隊(duì)列的drainTo一次當(dāng)前隊(duì)列的元素到taskList;
  • 可能失敗,如果調(diào)用drainTo后隊(duì)列海不為空,則循環(huán)刪除,并添加到taskList;
  1. private List<Runnable> drainQueue() { 
  2.    BlockingQueue<Runnable> q = workQueue; 
  3.    List<Runnable> taskList = new ArrayList<Runnable>(); 
  4.    q.drainTo(taskList); 
  5.    if (!q.isEmpty()) { 
  6.        for (Runnable r : q.toArray(new Runnable[0])) { 
  7.            if (q.remove(r)) 
  8.                taskList.add(r); 
  9.        } 
  10.    } 
  11.    return taskList; 

5、 awaitTermination操作

  • 等待線程池狀態(tài)變?yōu)門ERMINATED則返回,或者時(shí)間超時(shí);
  • 由于整個(gè)過程獨(dú)占鎖,所以一般調(diào)用shutdown或者shutdownNow后使用;
  1. public boolean awaitTermination(long timeout, TimeUnit unit) 
  2.        throws InterruptedException { 
  3.        long nanos = unit.toNanos(timeout); 
  4.        final ReentrantLock mainLock = this.mainLock; 
  5.        mainLock.lock(); 
  6.        try { 
  7.            for (;;) { 
  8.                if (runStateAtLeast(ctl.get(), TERMINATED)) 
  9.                    return true
  10.                if (nanos <= 0) 
  11.                    return false
  12.                nanos = termination.awaitNanos(nanos); 
  13.            } 
  14.        } finally { 
  15.            mainLock.unlock(); 
  16.        } 
  17.    } 

總結(jié)

當(dāng)往線程池中添加任務(wù)的時(shí)候,每次添加一個(gè)任務(wù)都回去新增一個(gè)線程。直到不滿足 wc < corePoolSize;

當(dāng)前線程池的大小已經(jīng)達(dá)到了corePoolSize的時(shí)候,每次添加任務(wù)會(huì)被存放到阻塞任務(wù)隊(duì)列中。等待執(zhí)行;

等等待任務(wù)隊(duì)列也滿的時(shí)候,且添加失敗。此時(shí)在來新的任務(wù),就會(huì)接著增加線程的個(gè)數(shù),直到滿足:wc >= maximumPoolSize ,添加線程失敗執(zhí)行拒絕策略;

線程池中,把線程的狀態(tài)和數(shù)量通過int類型進(jìn)行維護(hù),高三位表示狀態(tài),低29位表示線程數(shù)量。這樣可以保證線程的狀態(tài)和數(shù)量的一致性;

線程池巧妙的使用一個(gè)Integer類型原子變量來記錄線程池狀態(tài)和線程池線程個(gè)數(shù);

 

責(zé)任編輯:武曉燕 來源: Android開發(fā)編程
相關(guān)推薦

2020-12-10 08:24:40

線程池線程方法

2015-10-10 09:39:42

Java線程池源碼解析

2021-07-30 19:44:51

AndroidJava線程

2021-09-18 06:56:01

JavaCAS機(jī)制

2020-12-08 08:53:53

編程ThreadPoolE線程池

2020-07-08 12:05:55

Java線程池策略

2020-03-05 15:34:16

線程池C語言局域網(wǎng)

2012-05-15 02:18:31

Java線程池

2020-09-04 10:29:47

Java線程池并發(fā)

2023-11-29 16:38:12

線程池阻塞隊(duì)列開發(fā)

2021-09-06 13:12:05

前端JavaScript編程

2021-09-01 06:48:16

AndroidGlide緩存

2013-05-23 15:59:00

線程池

2012-02-01 11:20:23

Java線程

2023-06-07 13:49:00

多線程編程C#

2024-11-21 07:00:00

線程池Java開發(fā)

2022-03-22 09:20:57

應(yīng)用線程池技術(shù)

2021-07-16 11:35:20

Java線程池代碼

2024-07-15 08:20:24

2023-10-31 08:22:31

線程類型.NET
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)