自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

并發(fā)容器——BlockingQueue相關(guān)類

開發(fā) 后端

java.util.concurrent提供了多種并發(fā)容器,總體上來說有4類

Queue類:BlockingQueue ConcurrentLinkedQueue

Map類:ConcurrentMap

Set類:ConcurrentSkipListSet CopyOnWriteArraySet

List類:CopyOnWriteArrayList

接下來一系列文章,我會(huì)對(duì)每一類的源碼進(jìn)行分析,試圖讓它們的實(shí)現(xiàn)機(jī)制完全暴露在大家面前。這篇主要是BlockingQueue及其相關(guān)類。

先給出結(jié)構(gòu)圖:

 

 

下面我按這樣的順序來展開:

1、BlockingQueue

2、ArrayBlockingQueue 2.1 添加新元素的方法:add/put/offer

2.2 該類的幾個(gè)實(shí)例變量:takeIndex/putIndex/count/

2.3 Condition實(shí)現(xiàn)

3、LinkedBlockingQueue

4、PriorityBlockingQueue

5、DelayQueue

6、BlockingDque+LinkedBlockingQueue

其中前兩個(gè)分析的盡量詳細(xì),為了方便大家看,基本貼出了所有相關(guān)源碼。后面幾個(gè)就用盡量用文字論述,如果看得吃力,建議對(duì)著jdk的源碼看。

1、BlockingQueue

BlockingQueue繼承了Queue,Queu是先入先出(FIFO),BlockingQueue是JDK 5.0新引入的。

根據(jù)隊(duì)列null/full時(shí)的表現(xiàn),BlockingQueue的方法分為以下幾類:

 

 

至于為什么要使用并發(fā)容器,一個(gè)典型的例子就是生產(chǎn)者-消費(fèi)者的例子,為了精簡本文篇幅,放到附件中見附件:“生產(chǎn)者-消費(fèi)者 測試.rar”。

另外,BlockingQueue接口定義的所有方法實(shí)現(xiàn)都是線程安全的,它的實(shí)現(xiàn)類里面都會(huì)用鎖和其他控制并發(fā)的手段保證這種線程安全,但是這些類同時(shí)也實(shí)現(xiàn)了Collection接口(主要是AbstractQueue實(shí)現(xiàn)),所以會(huì)出現(xiàn)BlockingQueue的實(shí)現(xiàn)類也能同時(shí)使用Conllection接口方法,而這時(shí)會(huì)出現(xiàn)的問題就是像addAll,containsAll,retainAll和removeAll這類批量方法的實(shí)現(xiàn)不保證線程安全,舉個(gè)例子就是addAll 10個(gè)items到一個(gè)ArrayBlockingQueue,可能中途失敗但是卻有幾個(gè)item已經(jīng)被放進(jìn)這個(gè)隊(duì)列里面了。

2、ArrayBlockingQueue

ArrayBlockingQueue創(chuàng)建的時(shí)候需要指定容量capacity(可以存儲(chǔ)的最大的元素個(gè)數(shù),因?yàn)樗粫?huì)自動(dòng)擴(kuò)容)以及是否為公平鎖(fair參數(shù))。

在創(chuàng)建ArrayBlockingQueue的時(shí)候默認(rèn)創(chuàng)建的是非公平鎖,不過我們可以在它的構(gòu)造函數(shù)里指定。這里調(diào)用ReentrantLock的構(gòu)造函數(shù)創(chuàng)建鎖的時(shí)候,調(diào)用了:

public ReentrantLock(boolean fair) {

sync = (fair)? new FairSync() : new NonfairSync();

}

FairSync/ NonfairSync是ReentrantLock的內(nèi)部類:

線程按順序請(qǐng)求獲得公平鎖,而一個(gè)非公平鎖可以闖入,如果鎖的狀態(tài)可用,請(qǐng)求非公平鎖的線程可在等待隊(duì)列中向前跳躍,獲得該鎖。內(nèi)部鎖synchronized沒有提供確定的公平性保證。

分三點(diǎn)來講這個(gè)類:

2.1 添加新元素的方法:add/put/offer

2.2 該類的幾個(gè)實(shí)例變量:takeIndex/putIndex/count/

2.3 Condition實(shí)現(xiàn)

2.1 添加新元素的方法:add/put/offer

首先,談到添加元素的方法,首先得分析以下該類同步機(jī)制中用到的鎖:

Java代碼

  1. lock = new ReentrantLock(fair);     
  2. notEmpty = lock.newCondition();//Condition Variable 1     
  3. notFull =  lock.newCondition();//Condition Variable 2    

 

這三個(gè)都是該類的實(shí)例變量,只有一個(gè)鎖lock,然后lock實(shí)例化出兩個(gè)Condition,notEmpty/noFull分別用來協(xié)調(diào)多線程的讀寫操作。

Java代碼

  1. 1、     
  2. public boolean offer(E e) {     
  3.         if (e == nullthrow new NullPointerException();     
  4.         final ReentrantLock lock = this.lock;//每個(gè)對(duì)象對(duì)應(yīng)一個(gè)顯示的鎖     
  5.         lock.lock();//請(qǐng)求鎖直到獲得鎖(不可以被interrupte)     
  6.         try {     
  7.             if (count == items.length)//如果隊(duì)列已經(jīng)滿了     
  8.                 return false;     
  9.             else {     
  10.                 insert(e);     
  11.                 return true;     
  12.             }     
  13.         } finally {     
  14.             lock.unlock();//     
  15.         }     
  16. }     
  17. 看insert方法:     
  18. private void insert(E x) {     
  19.         items[putIndex] = x;     
  20.         //增加全局index的值。     
  21.         /*    
  22.         Inc方法體內(nèi)部:    
  23.         final int inc(int i) {    
  24.         return (++i == items.length)? 0 : i;    
  25.             }    
  26.         這里可以看出ArrayBlockingQueue采用從前到后向內(nèi)部數(shù)組插入的方式插入新元素的。如果插完了,putIndex可能重新變?yōu)?(在已經(jīng)執(zhí)行了移除操作的前提下,否則在之前的判斷中隊(duì)列為滿)    
  27.         */    
  28.         putIndex = inc(putIndex);      
  29.         ++count;     
  30.         notEmpty.signal();//wake up one waiting thread     
  31. }    

 

Java代碼

  1. public void put(E e) throws InterruptedException {     
  2.         if (e == nullthrow new NullPointerException();     
  3.         final E[] items = this.items;     
  4.         final ReentrantLock lock = this.lock;     
  5.         lock.lockInterruptibly();//請(qǐng)求鎖直到得到鎖或者變?yōu)閕nterrupted     
  6.         try {     
  7.             try {     
  8.                 while (count == items.length)//如果滿了,當(dāng)前線程進(jìn)入noFull對(duì)應(yīng)的等waiting狀態(tài)     
  9.                     notFull.await();     
  10.             } catch (InterruptedException ie) {     
  11.                 notFull.signal(); // propagate to non-interrupted thread     
  12.                 throw ie;     
  13.             }     
  14.             insert(e);     
  15.         } finally {     
  16.             lock.unlock();     
  17.         }     
  18. }    

 

Java代碼

  1. public boolean offer(E e, long timeout, TimeUnit unit)     
  2.         throws InterruptedException {     
  3.     
  4.         if (e == nullthrow new NullPointerException();     
  5.     long nanos = unit.toNanos(timeout);     
  6.         final ReentrantLock lock = this.lock;     
  7.         lock.lockInterruptibly();     
  8.         try {     
  9.             for (;;) {     
  10.                 if (count != items.length) {     
  11.                     insert(e);     
  12.                     return true;     
  13.                 }     
  14.                 if (nanos <= 0)     
  15.                     return false;     
  16.                 try {     
  17.                 //如果沒有被 signal/interruptes,需要等待nanos時(shí)間才返回     
  18.                     nanos = notFull.awaitNanos(nanos);     
  19.                 } catch (InterruptedException ie) {     
  20.                     notFull.signal(); // propagate to non-interrupted thread     
  21.                     throw ie;     
  22.                 }     
  23.             }     
  24.         } finally {     
  25.             lock.unlock();     
  26.         }     
  27.     }    

 

Java代碼

  1. public boolean add(E e) {     
  2.     return super.add(e);     
  3.     }     
  4. 父類:     
  5. public boolean add(E e) {     
  6.         if (offer(e))     
  7.             return true;     
  8.         else    
  9.             throw new IllegalStateException("Queue full");     
  10.     }    

 

2.2 該類的幾個(gè)實(shí)例變量:takeIndex/putIndex/count

Java代碼

  1. 用三個(gè)數(shù)字來維護(hù)這個(gè)隊(duì)列中的數(shù)據(jù)變更:     
  2. /** items index for next take, poll or remove */    
  3.     private int takeIndex;     
  4.     /** items index for next put, offer, or add. */    
  5.     private int putIndex;     
  6.     /** Number of items in the queue */    
  7.     private int count;    

 

提取元素的三個(gè)方法take/poll/remove內(nèi)部都調(diào)用了這個(gè)方法:

Java代碼

  1. private E extract() {     
  2.         final E[] items = this.items;     
  3.         E x = items[takeIndex];     
  4.         items[takeIndex] = null;//移除已經(jīng)被提取出的元素     
  5.         takeIndex = inc(takeIndex);//策略和添加元素時(shí)相同     
  6.         --count;     
  7.         notFull.signal();//提醒其他在notFull這個(gè)Condition上waiting的線程可以嘗試工作了     
  8.         return x;     
  9.     }   

 

從這個(gè)方法里可見,tabkeIndex維護(hù)一個(gè)可以提取/移除元素的索引位置,因?yàn)閠akeIndex是從0遞增的,所以這個(gè)類是FIFO隊(duì)列。

putIndex維護(hù)一個(gè)可以插入的元素的位置索引。

count顯然是維護(hù)隊(duì)列中已經(jīng)存在的元素總數(shù)。

2.3 Condition實(shí)現(xiàn)

Condition現(xiàn)在的實(shí)現(xiàn)只有java.util.concurrent.locks.AbstractQueueSynchoronizer內(nèi)部的ConditionObject,并且通過ReentranLock的newCondition()方法暴露出來,這是因?yàn)镃ondition的await()/sinal()一般在lock.lock()與lock.unlock()之間執(zhí)行,當(dāng)執(zhí)行condition.await()方法時(shí),它會(huì)首先釋放掉本線程持有的鎖,然后自己進(jìn)入等待隊(duì)列。直到sinal(),喚醒后又會(huì)重新試圖去拿到鎖,拿到后執(zhí)行await()下的代碼,其中釋放當(dāng)前鎖和得到當(dāng)前鎖都需要ReentranLock的tryAcquire(int arg)方法來判定,并且享受ReentranLock的重進(jìn)入特性。

Java代碼

  1. public final void await() throws InterruptedException {     
  2.             if (Thread.interrupted())     
  3.                 throw new InterruptedException();     
  4.            //加一個(gè)新的condition等待節(jié)點(diǎn)     
  5.  Node node = addConditionWaiter();     
  6. //釋放自己的鎖     
  7.             int savedState = fullyRelease(node);      
  8.             int interruptMode = 0;     
  9.             while (!isOnSyncQueue(node)) {     
  10.             //如果當(dāng)前線程 等待狀態(tài)時(shí)CONDITION,park住當(dāng)前線程,等待condition的signal來解除     
  11.                 LockSupport.park(this);     
  12.                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)     
  13.                     break;     
  14.             }     
  15.             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)     
  16.                 interruptMode = REINTERRUPT;     
  17.             if (node.nextWaiter != null)     
  18.                 unlinkCancelledWaiters();     
  19.             if (interruptMode != 0)     
  20.                 reportInterruptAfterWait(interruptMode);     
  21.         }   

 

3、LinkedBlockingQueue

單向鏈表結(jié)構(gòu)的隊(duì)列。如果不指定容量默認(rèn)為Integer.MAX_VALUE。通過putLock和takeLock兩個(gè)鎖進(jìn)行同步,兩個(gè)鎖分別實(shí)例化notFull和notEmpty兩個(gè)Condtion,用來協(xié)調(diào)多線程的存取動(dòng)作。其中某些方法(如remove,toArray,toString,clear等)的同步需要同時(shí)獲得這兩個(gè)鎖,并且總是先putLock.lock緊接著takeLock.lock(在同一方法fullyLock中),這樣的順序是為了避免可能出現(xiàn)的死鎖情況(我也想不明白為什么會(huì)是這樣?)

4、PriorityBlockingQueue

看它的三個(gè)屬性,就基本能看懂這個(gè)類了:

Java代碼

  1. private final PriorityQueue q;     
  2.     private final ReentrantLock lock = new ReentrantLock(true);     
  3.     private final Condition notEmpty = lock.newCondition();   

 

q說明,本類內(nèi)部數(shù)據(jù)結(jié)構(gòu)是PriorityQueue,至于PriorityQueue怎么排序看我之前一篇文章:http://jiadongkai-sina-com.iteye.com/blog/825683

lock說明本類使用一個(gè)lock來同步讀寫等操作。

notEmpty協(xié)調(diào)隊(duì)列是否有新元素提供,而隊(duì)列滿了以后會(huì)調(diào)用PriorityQueue的grow方法來擴(kuò)容。

5、DelayQueue

Delayed接口繼承自Comparable,我們插入的E元素都要實(shí)現(xiàn)這個(gè)接口。

DelayQueue的設(shè)計(jì)目的間API文檔:

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will returnnull. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.

因?yàn)镈elayQueue構(gòu)造函數(shù)了里限定死不允許傳入comparator(之前的PriorityBlockingQueue中沒有限定死),即只能在compare方法里定義優(yōu)先級(jí)的比較規(guī)則。再看上面這段英文,“The head of the queue is that Delayed element whose delay expired furthest in the past.”說明compare方法實(shí)現(xiàn)的時(shí)候要保證最先加入的元素最早結(jié)束延時(shí)。而 “Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.”說明getDelay方法的實(shí)現(xiàn)必須保證延時(shí)到了返回的值變?yōu)?lt;=0的int。

上面這段英文中,還說明了:在poll/take的時(shí)候,隊(duì)列中元素會(huì)判定這個(gè)elment有沒有達(dá)到超時(shí)時(shí)間,如果沒有達(dá)到,poll返回null,而take進(jìn)入等待狀態(tài)。但是,除了這兩個(gè)方法,隊(duì)列中的元素會(huì)被當(dāng)做正常的元素來對(duì)待。例如,size方法返回所有元素的數(shù)量,而不管它們有沒有達(dá)到超時(shí)時(shí)間。而協(xié)調(diào)的Condition available只對(duì)take和poll是有意義的。

另外需要補(bǔ)充的是,在ScheduledThreadPoolExecutor中工作隊(duì)列類型是它的內(nèi)部類DelayedWorkQueue,而DelayedWorkQueue的Task容器是DelayQueue類型,而ScheduledFutureTask作為Delay的實(shí)現(xiàn)類作為Runnable的封裝后的Task類。也就是說ScheduledThreadPoolExecutor是通過DelayQueue優(yōu)先級(jí)判定規(guī)則來執(zhí)行任務(wù)的。

6、BlockingDque+LinkedBlockingQueue

BlockingDque為阻塞雙端隊(duì)列接口,實(shí)現(xiàn)類有LinkedBlockingDque。雙端隊(duì)列特別之處是它首尾都可以操作。LinkedBlockingDque不同于LinkedBlockingQueue,它只用一個(gè)lock來維護(hù)讀寫操作,并由這個(gè)lock實(shí)例化出兩個(gè)Condition notEmpty及notFull,而LinkedBlockingQueue讀和寫分別維護(hù)一個(gè)lock。

【編輯推薦】

  1. Java Web應(yīng)用開發(fā)中的一些概念
  2. Tomcat 7 應(yīng)用實(shí)測:聲明式Servlet 3.0
  3. 探秘Servlet 3.0中的Web安全改進(jìn)
  4. 簡化Web應(yīng)用開發(fā) Servlet 3.0特性詳解
  5. Servlet 3.0的異步處理
責(zé)任編輯:金賀 來源: ITEYE博客
相關(guān)推薦

2023-07-03 09:59:00

并發(fā)編程并發(fā)容器

2020-06-29 07:52:17

Java工具類開發(fā)

2023-06-30 08:27:20

2022-07-04 11:39:21

并發(fā)容器同步容器機(jī)制

2025-01-14 00:00:00

Blocking隊(duì)列元素

2009-08-05 18:39:54

C#異常類

2023-12-07 08:13:58

Java開發(fā)

2020-07-01 07:52:07

Java并發(fā)容器

2010-02-06 15:49:31

刪除C++容器值

2024-05-29 08:49:45

2010-01-05 16:15:05

.NET Framew

2011-07-13 14:58:53

STL容器

2022-10-12 07:53:46

并發(fā)編程同步工具

2024-12-26 07:49:57

Java隊(duì)列線程

2010-02-01 17:31:06

C++類成員

2009-09-01 16:14:08

C# Socket類

2009-12-24 15:42:01

ADO類庫

2011-06-24 14:17:58

Qt 容器類 QVector

2025-02-03 09:10:04

2009-12-21 16:24:24

WCF新到工廠
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)