Java線程池架構(gòu)(一)原理和源碼解析
在前面介紹JUC的文章中,提到了關(guān)于線程池Execotors的創(chuàng)建介紹,在文章:《java之JUC系列-外部Tools》中第一部分有詳細(xì)的說(shuō)明,請(qǐng)參閱;
文章中其實(shí)說(shuō)明了外部的使用方式,但是沒有說(shuō)內(nèi)部是如何實(shí)現(xiàn)的,為了加深對(duì)實(shí)現(xiàn)的理解,在使用中可以放心,我們這里將做源碼解析以及反饋到原理 上,Executors工具可以創(chuàng)建普通的線程池以及schedule調(diào)度任務(wù)的調(diào)度池,其實(shí)兩者實(shí)現(xiàn)上還是有一些區(qū)別,但是理解了ThreadPoolExecutor,在看ScheduledThreadPoolExecutor就非常輕松了,后面的文章中也會(huì)專門介紹這塊,但是需要先看這篇文章。
使用Executors最常用的莫過(guò)于是使用:Executors.newFixedThreadPool(int)這個(gè)方法,因?yàn)樗瓤梢韵拗茢?shù)量,而且線程用完后不會(huì)一直被cache??;那么就通過(guò)它來(lái)看看源碼,回過(guò)頭來(lái)再看其他構(gòu)造方法的區(qū)別:
在《java之JUC系列-外部Tools》文章中提到了構(gòu)造方法,為了和本文對(duì)接,再貼下代碼。
- public static ExecutorService <strong>newFixedThreadPool</strong>(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue());
- }
其實(shí)你可以自己new一個(gè)ThreadPoolExecutor,來(lái)達(dá)到自己的參數(shù)可控的程度,例如,可以將LinkedBlockingQueue換成其它的(如:SynchronousQueue),只是可讀性會(huì)降低,這里只是使用了一種設(shè)計(jì)模式。
我們現(xiàn)在來(lái)看看ThreadPoolExecutor的源碼是怎么樣的,也許你剛開始看他的源碼會(huì)很痛苦,因?yàn)槟悴恢雷髡邽槭裁词沁@樣設(shè)計(jì)的,所以本文就我看到的思想會(huì)給你做一個(gè)介紹,此時(shí)也許你通過(guò)知道了一些作者的思想,你也許就知道應(yīng)該該如何去操作了。
這里來(lái)看下構(gòu)造方法中對(duì)那些屬性做了賦值:
源碼段1:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
這里你可以看到最終賦值的過(guò)程,可以先大概知道下參數(shù)的意思:
corePoolSize:核心運(yùn)行的poolSize,也就是當(dāng)超過(guò)這個(gè)范圍的時(shí)候,就需要將新的Runnable放入到等待隊(duì)列workQueue中了,我們把這些Runnable就叫做要去執(zhí)行的任務(wù)吧。
maximumPoolSize:一般你用不到,當(dāng)大于了這個(gè)值就會(huì)將任務(wù)由一個(gè)丟棄處理機(jī)制來(lái)處理,但是當(dāng)你發(fā)生:newFixedThreadPool的時(shí)候,corePoolSize和maximumPoolSize是一樣的,而corePoolSize是先執(zhí)行的,所以他會(huì)先被放入等待隊(duì)列,而不會(huì)執(zhí)行到下面的丟棄處理中,看了后面的代碼你就知道了。
workQueue:等待隊(duì)列,當(dāng)達(dá)到corePoolSize的時(shí)候,就向該等待隊(duì)列放入線程信息(默認(rèn)為一個(gè)LinkedBlockingQueue),運(yùn)行中的線程屬性為:workers,為一個(gè)HashSet;我們的Runnable內(nèi)部被包裝了一層,后面會(huì)看到這部分代碼;這個(gè)隊(duì)列默認(rèn)是一個(gè)無(wú)界隊(duì)列(你也可以設(shè)定一個(gè)有界隊(duì)列),所以在生產(chǎn)者瘋狂生產(chǎn)的時(shí)候,考慮如何控制的問(wèn)題。
keepAliveTime:默認(rèn)都是0,當(dāng)線程沒有任務(wù)處理后,保持多長(zhǎng)時(shí)間,當(dāng)你使用:newCachedThreadPool(),它將是60s的時(shí)間。這個(gè)參數(shù)在運(yùn)行中的線程從workQueue獲取任務(wù)時(shí),當(dāng)(poolSize >corePoolSize || allowCoreThreadTimeOut)會(huì)用到,當(dāng)然allowCoreThreadTimeOut要設(shè)置為true,也會(huì)先判定keepAliveTime是大于0的,不過(guò)由于它在corePoolSize上采用了Integer.MAX_VALUE,當(dāng)遇到系統(tǒng)遇到瞬間沖擊,workers就會(huì)迅速膨脹,所以這個(gè)地方就不要去設(shè)置allowCoreThreadTimeOut=true,否則結(jié)果是這些運(yùn)行中的線程會(huì)持續(xù)60s以上;另外,如果corePoolSize的值還沒到Integer.MAX_VALUE,當(dāng)超過(guò)那個(gè)值以后,這些運(yùn)行中的線程,也是
threadFactory:是構(gòu)造Thread的方法,你可以自己去包裝和傳遞,主要實(shí)現(xiàn)newThread方法即可;
handler:也就是參數(shù)maximumPoolSize達(dá)到后丟棄處理的方法,java提供了5種丟棄處理的方法,當(dāng)然你也可以自己根據(jù)實(shí)際情況去重寫,主要是要實(shí)現(xiàn)接口:RejectedExecutionHandler中的方法: public void rejectedExecution(Runnabler, ThreadPoolExecutor e) java默認(rèn)的是使用:AbortPolicy,他的作用是當(dāng)出現(xiàn)這中情況的時(shí)候會(huì)拋出一個(gè)異常;
其余的還包含:
- CallerRunsPolicy:如果發(fā)現(xiàn)線程池還在運(yùn)行,就直接運(yùn)行這個(gè)線程
- DiscardOldestPolicy:在線程池的等待隊(duì)列中,將頭取出一個(gè)拋棄,然后將當(dāng)前線程放進(jìn)去。
- DiscardPolicy:什么也不做
- AbortPolicy:java默認(rèn),拋出一個(gè)異常:RejectedExecutionException。
你可以自己寫一個(gè),例如我們想在這個(gè)處理中,既不是完全丟棄,也不是完全啟動(dòng),也不是拋異常,而是控制生產(chǎn)者的線程,那么你就可以嘗試某種方式將生產(chǎn)者的線程blocking住,其實(shí)就有點(diǎn)類似提到的Semaphor的功能了。
通常你得到線程池后,會(huì)調(diào)用其中的:submit方法或execute方法 去操作;其實(shí)你會(huì)發(fā)現(xiàn),submit方法最終會(huì)調(diào)用execute方法來(lái)進(jìn)行操作,只是他提供了一個(gè)Future來(lái)托管返回值的處理而已,當(dāng)你調(diào)用需要有 返回值的信息時(shí),你用它來(lái)處理是比較好的;這個(gè)Future會(huì)包裝對(duì)Callable信息,并定義一個(gè)Sync對(duì)象(),當(dāng)你發(fā)生讀取返回值的操作的時(shí) 候,會(huì)通過(guò)Sync對(duì)象進(jìn)入鎖,直到有返回值的數(shù)據(jù)通知,具體細(xì)節(jié)先不要看太多。
繼續(xù)向下,來(lái)看看execute最為核心的方法吧:
源碼段2:
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command); // is shutdown or saturated
- }
這段代碼看似簡(jiǎn)單,其實(shí)有點(diǎn)難懂,很多人也是這里沒看懂,沒事,我一個(gè)if一個(gè)if說(shuō):
首先第一個(gè)判定空操作就不用說(shuō)了,下面判定的poolSize >= corePoolSize成立時(shí)候會(huì)進(jìn)入if的區(qū)域,當(dāng)然它不成立也有可能會(huì)進(jìn)入,他會(huì)判定addIfUnderCorePoolSize是否返回false,如果返回false就會(huì)進(jìn)去;
我們先來(lái)看下addIfUnderCorePoolSize方法的源碼是什么:
源碼段3:
- private boolean addIfUnderCorePoolSize(Runnable firstTask) {
- Thread t = null;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (poolSize < corePoolSize && runState == RUNNING)
- t = addThread(firstTask);
- } finally {
- mainLock.unlock();
- }
- if (t == null)
- return false;
- t.start();
- return true;
- }
可以發(fā)現(xiàn),這段源碼是如果發(fā)現(xiàn)小雨corePoolSize就會(huì)創(chuàng)建一個(gè)新的線程,并且調(diào)用線程的start()方法將線程運(yùn)行起來(lái):這個(gè)addThread()方法,我們先不考慮細(xì)節(jié),因?yàn)槲覀冞€要先看到前面是怎么進(jìn)去的,這里可以發(fā)信啊,只有沒有創(chuàng)建成功Thread才會(huì)返回false,也就是當(dāng)當(dāng)前的poolSize > corePoolSize的時(shí)候,或線程池已經(jīng)不是在running狀態(tài)的時(shí)候才會(huì)出現(xiàn);
注意:這里在外部判定一次poolSize和corePoolSize只是初步判定,內(nèi)部是加鎖后判定的,以得到更為準(zhǔn)確的結(jié)果,而外部初步判定如果是大于了,就沒有必要進(jìn)入這段有鎖的代碼了。
此時(shí)我們知道了,當(dāng)前線程數(shù)量大于corePoolSize的時(shí)候,就會(huì)進(jìn)入【代碼段2】的第一個(gè)if語(yǔ)句中,回到【源碼段2】,繼續(xù)看if語(yǔ)句中的內(nèi)容:
這里標(biāo)記為
源碼段4:
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command); // is shutdown or saturated
第一個(gè)if,也就是當(dāng)當(dāng)前狀態(tài)為running的時(shí)候,就會(huì)去執(zhí)行workQueue.offer(command),這個(gè)workQueue其實(shí) 就是一個(gè)BlockingQueue,offer()操作就是在隊(duì)列的尾部寫入一個(gè)對(duì)象,此時(shí)寫入的對(duì)象為線程的對(duì)象而已;所以你可以認(rèn)為只有線程池在 RUNNING狀態(tài),才會(huì)在隊(duì)列尾部插入數(shù)據(jù),否則就執(zhí)行else if,其實(shí)else if可以看出是要做一個(gè)是否大于MaximumPoolSize的判定,如果大于這個(gè)值,就會(huì)做reject的操作,關(guān)于reject的說(shuō)明,我們?cè)凇驹?碼段1】的解釋中已經(jīng)非常明確的說(shuō)明,這里可以簡(jiǎn)單看下源碼,以應(yīng)征結(jié)果:
源碼段5:
- private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
- Thread t = null;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (poolSize < maximumPoolSize && runState == RUNNING)
- //在corePoolSize = maximumPoolSize下,該代碼幾乎不可能運(yùn)行
- t = addThread(firstTask);
- } finally {
- mainLock.unlock(); }
- if (t == null)
- return false;
- t.start();
- return true; }
- void reject(Runnable command) {
- handler.rejectedExecution(command, this); }
也就是如果線程池滿了,而且線程池調(diào)用了shutdown后,還在調(diào)用execute方法時(shí),就會(huì)拋出上面說(shuō)明的異常:RejectedExecutionException 再回頭來(lái)看下【代碼段4】中進(jìn)入到等待隊(duì)列后的操作:
- if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command);
這段代碼是要在線程池運(yùn)行狀態(tài)不是RUNNING或poolSize == 0才會(huì)調(diào)用,他是干啥呢? 他為什么會(huì)不等于RUNNING呢?外面那一層不是判定了他== RUNNING了么,其實(shí)有時(shí)間差就是了,如果是poolSize == 0也會(huì)執(zhí)行這段代碼,但是里面的判定條件是如果不是RUNNING,就做reject操作,在第一個(gè)線程進(jìn)去的時(shí)候,會(huì)將第一個(gè)線程直接啟動(dòng)起來(lái);很多人 也是看這段代碼很繞,因?yàn)椴粩嗟难h(huán)判定類似的判定條件,你主要記住他們之間有時(shí)間差,要取最新的就好了。 此時(shí)貌似代碼看完了?咦,此時(shí)有問(wèn)題了: 1、 等待中的線程在后來(lái)是如何跑起來(lái)的呢?線程池是不是有類似Timer一樣的守護(hù)進(jìn)程不斷掃描線程隊(duì)列和等待隊(duì)列?還是利用某種鎖機(jī)制,實(shí)現(xiàn)類似wait和 notify實(shí)現(xiàn)的? 2、 線程池的運(yùn)行隊(duì)列和等待隊(duì)列是如何管理的呢?這里還沒看出影子呢! NO,NO,NO! Java在實(shí)現(xiàn)這部分的時(shí)候,使用了怪異的手段,神馬手段呢,還要再看一部分代碼才曉得。 在前面【源碼段3】中,我們看到了一個(gè)方法叫:addThread(),也許很少有人會(huì)想到關(guān)鍵在這里,其實(shí)關(guān)鍵就是在這里: 我們看看addThread()方法到底做了什么。
源碼段6:
- private Thread addThread(Runnable firstTask) {
- Worker w = new Worker(firstTask);
- Thread t = threadFactory.newThread(w);
- if (t != null) {
- w.thread = t;
- workers.add(w);
- int nt = ++poolSize;
- if (nt > largestPoolSize)
- largestPoolSize = nt;
- }
- return t;
- }
這里創(chuàng)建了一個(gè)Worker,其余的操作,就是將poolSize++的操作,然后將將其放入workers的運(yùn)行的HashSet中等操作;
我們主要關(guān)心Worker是干什么的,因?yàn)檫@個(gè)threadFactory對(duì) 我們用途不大,只是做了Thread的命名處理;而Worker你會(huì)發(fā)現(xiàn)它的定義也是一個(gè)Runnable,外部開始在代碼段中發(fā)現(xiàn)了調(diào)用哪個(gè)這個(gè) Worker的start()方法,也就是線程的啟動(dòng)方法,其實(shí)也就是調(diào)用了Worker的run()方法,那么我們重點(diǎn)要關(guān)心run方法是如何處理的
源碼段7:
- public void run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null;
- }
- } finally {
- workerDone(this);
- }
- }
FirstTask其實(shí)就是開始在創(chuàng)建work的時(shí)候,由外部傳入的Runnable對(duì)象,也就是你自己的Thread,你會(huì)發(fā)現(xiàn)它如果發(fā)現(xiàn)task為空,就會(huì)調(diào)用getTask()方法再判定,直到兩者為空,并且是一個(gè)while循環(huán)體。
那么看看getTask()方法的實(shí)現(xiàn)為:
源碼段8:
- Runnable getTask() {
- for (;;) {
- try {
- int state = runState;
- if (state > SHUTDOWN)
- return null;
- Runnable r;
- if (state == SHUTDOWN) // Help drain queue
- r = workQueue.poll();
- else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
- r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
- else
- r = workQueue.take();
- if (r != null)
- return r;
- if (workerCanExit()) {
- if (runState >= SHUTDOWN) // Wake up others
- interruptIdleWorkers();
- return null;
- }
- // Else retry
- } catch (InterruptedException ie) {
- // On interruption, re-check runState
- }
- }
你會(huì)發(fā)現(xiàn)它是從workQueue隊(duì)列中,也就是等待隊(duì)列中獲取一個(gè)元素出來(lái)并返回!
回過(guò)頭來(lái)根據(jù)代碼段6理解下:
當(dāng)前線程運(yùn)行完后,在到workQueue中去獲取一個(gè)task出來(lái),繼續(xù)運(yùn)行,這樣就保證了線程池中有一定的線程一直在運(yùn)行;此時(shí)若跳出了 while循環(huán),只有workQueue隊(duì)列為空才會(huì)出現(xiàn)或出現(xiàn)了類似于shutdown的操作,自然運(yùn)行隊(duì)列會(huì)減少1,當(dāng)再有新的線程進(jìn)來(lái)的時(shí)候,就又 開始向worker里面放數(shù)據(jù)了,這樣以此類推,實(shí)現(xiàn)了線程池的功能。
這里可以看下run方法的finally中調(diào)用的workerDone方法為:
源碼段9:
- void workerDone(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- if (--poolSize == 0)
- tryTerminate();
- } finally {
- mainLock.unlock();
- }
- }
注意這里將workers.remove(w)掉,并且調(diào)用了—poolSize來(lái)做操作。
至于tryTerminate是做了更多關(guān)于回收方面的操作。
最后我們還要看一段代碼就是在【源碼段6】中出現(xiàn)的代碼調(diào)用為:runTask(task);這個(gè)方法也是運(yùn)行的關(guān)鍵。
源碼段10:
- private void runTask(Runnable task) {
- final ReentrantLock runLock = this.runLock;
- runLock.lock();
- try {
- if (runState < STOP && Thread.interrupted() && runState >= STOP)
- thread.interrupt();
- boolean ran = false;
- beforeExecute(thread, task);
- try {
- task.run();
- ran = true;
- afterExecute(task, null);
- ++completedTasks;
- } catch (RuntimeException ex) {
- if (!ran)
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- runLock.unlock();
- }
- }
你可以看到,這里面的task為傳入的task信息,調(diào)用的不是start方法,而是run方法,因?yàn)閞un方法直接調(diào)用不會(huì)啟動(dòng)新的線程,也是因?yàn)檫@樣,導(dǎo)致了你無(wú)法獲取到你自己的線程的狀態(tài),因?yàn)榫€程池是直接調(diào)用的run方法,而不是start方法來(lái)運(yùn)行。
這里有個(gè)beforeExecute和afterExecute方法,分別代表在執(zhí)行前和執(zhí)行后,你可以做一段操作,在這個(gè)類中,這兩個(gè)方法都是【空body】的,因?yàn)槠胀ň€程池?zé)o需做更多的操作。
如果你要實(shí)現(xiàn)類似暫停等待通知的或其他的操作,可以自己extends后進(jìn)行重寫構(gòu)造;
本文沒有介紹關(guān)于ScheduledThreadPoolExecutor調(diào)用的細(xì)節(jié),下一篇文章會(huì)詳細(xì)說(shuō)明,因?yàn)榇蟛糠执a和本文一致,區(qū)別在于一些細(xì)節(jié),在介紹:ScheduledThreadPoolExecutor的時(shí)候,會(huì)明確的介紹它與Timer和TimerTask的巨大區(qū)別,區(qū)別不在于使用,而是在于本身內(nèi)在的處理細(xì)節(jié)。