自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

阻塞隊(duì)列—LinkedBlockingQueue源碼分析

開(kāi)發(fā) 前端
LinkedBlockingQueue 由鏈接節(jié)點(diǎn)支持的可選有界隊(duì)列,是一個(gè)基于鏈表的無(wú)界隊(duì)列(理論上有界),隊(duì)列按照先進(jìn)先出的順序進(jìn)行排序。LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默認(rèn)為 Integer.MAX_VALUE,也就是無(wú)界隊(duì)列。

 前言

 LinkedBlockingQueue 由鏈接節(jié)點(diǎn)支持的可選有界隊(duì)列,是一個(gè)基于鏈表的無(wú)界隊(duì)列(理論上有界),隊(duì)列按照先進(jìn)先出的順序進(jìn)行排序。LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默認(rèn)為 Integer.MAX_VALUE,也就是無(wú)界隊(duì)列。所以為了避免隊(duì)列過(guò)大造成機(jī)器負(fù)載或者內(nèi)存爆滿的情況出現(xiàn),我們?cè)谑褂玫臅r(shí)候建議手動(dòng)傳一個(gè)隊(duì)列的大小。

隊(duì)列創(chuàng)建 

  1. BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); 

上面這段代碼中,blockingQueue 的容量將設(shè)置為 Integer.MAX_VALUE 。

應(yīng)用場(chǎng)景

多用于任務(wù)隊(duì)列,單線程發(fā)布任務(wù),任務(wù)滿了就停止等待阻塞,當(dāng)任務(wù)被完成消費(fèi)少了又開(kāi)始負(fù)責(zé)發(fā)布任務(wù)。

我們來(lái)看一個(gè)例子:

  1. package com.niuh.queue.linked; 
  2.  
  3. import org.apache.commons.lang.RandomStringUtils; 
  4.  
  5. import java.util.concurrent.CountDownLatch; 
  6. import java.util.concurrent.ExecutorService; 
  7. import java.util.concurrent.Executors; 
  8. import java.util.concurrent.LinkedBlockingQueue; 
  9. import java.util.concurrent.TimeUnit; 
  10. import java.util.concurrent.atomic.AtomicLong; 
  11.  
  12. public class TestLinkedBlockingQueue { 
  13.  
  14.     private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(); 
  15.     // 線程控制開(kāi)關(guān) 
  16.     private final CountDownLatch latch = new CountDownLatch(1); 
  17.     // 線程池 
  18.     private final ExecutorService pool; 
  19.     // AtomicLong 計(jì)數(shù) 生產(chǎn)數(shù)量 
  20.     private final AtomicLong output = new AtomicLong(0); 
  21.     // AtomicLong 計(jì)數(shù)  銷(xiāo)售數(shù)量 
  22.     private final AtomicLong sales = new AtomicLong(0); 
  23.     // 是否停止線程 
  24.     private final boolean clear; 
  25.  
  26.     public TestLinkedBlockingQueue(boolean clear) { 
  27.         this.pool = Executors.newCachedThreadPool(); 
  28.         this.clear = clear; 
  29.     } 
  30.  
  31.     public void service() throws InterruptedException { 
  32.         Consumer a = new Consumer(queue, sales, latch, clear); 
  33.         pool.submit(a); 
  34.  
  35.         Producer w = new Producer(queue, output, latch); 
  36.         pool.submit(w); 
  37.         latch.countDown(); 
  38.     } 
  39.  
  40.     public static void main(String[] args) { 
  41.         TestLinkedBlockingQueue t = new TestLinkedBlockingQueue(false); 
  42.         try { 
  43.             t.service(); 
  44.         } catch (InterruptedException e) { 
  45.             e.printStackTrace(); 
  46.         } 
  47.     } 
  48.  
  49. /** 
  50.  * 消費(fèi)者(銷(xiāo)售產(chǎn)品) 
  51.  */ 
  52. class Consumer implements Runnable { 
  53.     private final LinkedBlockingQueue<String> queue; 
  54.     private final AtomicLong sales; 
  55.     private final CountDownLatch latch; 
  56.     private final boolean clear; 
  57.  
  58.     public Consumer(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear) { 
  59.         this.queue = queue; 
  60.         this.sales = sales; 
  61.         this.latch = latch; 
  62.         this.clear = clear; 
  63.     } 
  64.  
  65.     public void run() { 
  66.         try { 
  67.             latch.await(); // 放閘之前老實(shí)的等待著 
  68.             for (; ; ) { 
  69.                 sale(); 
  70.                 Thread.sleep(500); 
  71.             } 
  72.         } catch (InterruptedException e) { 
  73.             if (clear) { // 響應(yīng)中斷請(qǐng)求后,如果有要求則銷(xiāo)售完隊(duì)列的產(chǎn)品后再終止線程 
  74.                 cleanWarehouse(); 
  75.             } else { 
  76.                 System.out.println("Seller Thread will be interrupted..."); 
  77.             } 
  78.         } 
  79.     } 
  80.  
  81.     public void sale() { 
  82.         System.out.println("==取take="); 
  83.         try { 
  84.             String item = queue.poll(50, TimeUnit.MILLISECONDS); 
  85.             System.out.println(item); 
  86.             if (item != null) { 
  87.                 sales.incrementAndGet(); // 可以聲明long型的參數(shù)獲得返回值,作為日志的參數(shù) 
  88.             } 
  89.         } catch (InterruptedException e) { 
  90.             e.printStackTrace(); 
  91.         } 
  92.     } 
  93.  
  94.     /** 
  95.      * 銷(xiāo)售完隊(duì)列剩余的產(chǎn)品 
  96.      */ 
  97.     private void cleanWarehouse() { 
  98.         try { 
  99.             while (queue.size() > 0) { 
  100.                 sale(); 
  101.             } 
  102.         } catch (Exception ex) { 
  103.             System.out.println("Seller Thread will be interrupted..."); 
  104.         } 
  105.     } 
  106.  
  107. /** 
  108.  * 生產(chǎn)者(生產(chǎn)產(chǎn)品) 
  109.  * 
  110.  */ 
  111. class Producer implements Runnable { 
  112.     private LinkedBlockingQueue<String> queue; 
  113.     private CountDownLatch latch; 
  114.     private AtomicLong output
  115.  
  116.     public Producer() { 
  117.  
  118.     } 
  119.  
  120.     public Producer(LinkedBlockingQueue<String> queue, AtomicLong output, CountDownLatch latch) { 
  121.         this.queue = queue; 
  122.         this.latch = latch; 
  123.         this.output = output
  124.     } 
  125.  
  126.     public void run() { 
  127.         try { 
  128.             latch.await(); // 線程等待 
  129.             for (; ; ) { 
  130.                 work(); 
  131.                 Thread.sleep(100); 
  132.             } 
  133.         } catch (InterruptedException e) { 
  134.             System.out.println("Producer thread will be interrupted..."); 
  135.         } 
  136.     } 
  137.  
  138.     /** 
  139.      * 工作 
  140.      */ 
  141.     public void work() { 
  142.         try { 
  143.             String product = RandomStringUtils.randomAscii(3); 
  144.             boolean success = queue.offer(product, 100, TimeUnit.MILLISECONDS); 
  145.             if (success) { 
  146.                 output.incrementAndGet();// 可以聲明long型的參數(shù)獲得返回值,作為日志的參數(shù) 
  147.             } 
  148.         } catch (InterruptedException e) { 
  149.             e.printStackTrace(); 
  150.         } 
  151.     } 
  152.  

 工作原理

LinkedBlockingQueue內(nèi)部由單鏈表實(shí)現(xiàn),只能從head取元素,從tail添加元素。添加元素和獲取元素都有獨(dú)立的鎖,也就是說(shuō)LinkedBlockingQueue是讀寫(xiě)分離的,讀寫(xiě)操作可以并行執(zhí)行。LinkedBlockingQueue采用可重入鎖(ReentrantLock)來(lái)保證在并發(fā)情況下的線程安全。

向無(wú)限隊(duì)列添加元素的所有操作都將永遠(yuǎn)不會(huì)阻塞,[注意這里不是說(shuō)不會(huì)加鎖保證線程安全],因此它可以增長(zhǎng)到非常大的容量。

使用無(wú)限 BlockingQueue 設(shè)計(jì)生產(chǎn)者 - 消費(fèi)者模型時(shí)最重要的是 消費(fèi)者應(yīng)該能夠像生產(chǎn)者向隊(duì)列添加消息一樣快地消費(fèi)消息。否則,內(nèi)存可能會(huì)填滿,然后就會(huì)得到一個(gè) OutOfMemory 異常。

源碼分析

定義

LinkedBlockingQueue的類(lèi)繼承關(guān)系如下:

 

 其包含的方法定義如下:

 

成員屬性

  1. /** 
  2. * 節(jié)點(diǎn)類(lèi),用于存儲(chǔ)數(shù)據(jù) 
  3. */ 
  4. static class Node<E> { 
  5.     E item; 
  6.  
  7.     Node<E> next
  8.  
  9.     Node(E x) { item = x; } 
  10.  
  11. /** 阻塞隊(duì)列的大小, 默認(rèn)為Integer.MAX_VALUE */ 
  12. private final int capacity; 
  13.  
  14. /** 當(dāng)前阻塞隊(duì)列中的元素個(gè)數(shù) */ 
  15. private final AtomicInteger count = new AtomicInteger(); 
  16.  
  17. /** 
  18.  * 阻塞隊(duì)列的頭節(jié)點(diǎn) 
  19.  */ 
  20. transient Node<E> head; 
  21.  
  22. /** 
  23.  * 阻塞隊(duì)列的尾節(jié)點(diǎn) 
  24.  */ 
  25. private transient Node<E> last
  26.  
  27. /** 獲取并移除元素時(shí)使用的鎖,如take,poll,etc */ 
  28. private final ReentrantLock takeLock = new ReentrantLock(); 
  29.  
  30. /** notEmpty 條件對(duì)象,當(dāng)隊(duì)列沒(méi)有數(shù)據(jù)時(shí)用于掛起執(zhí)行刪除的線程 */ 
  31. private final Condition notEmpty = takeLock.newCondition(); 
  32.  
  33. /** 添加元素時(shí)使用的鎖,如 put,offer,etc */ 
  34. private final ReentrantLock putLock = new ReentrantLock(); 
  35.  
  36. /** notFull 條件對(duì)象,每當(dāng)隊(duì)列數(shù)據(jù)已滿時(shí)用于掛起執(zhí)行添加的線程 */ 
  37. private final Condition notFull = putLock.newCondition(); 

 從上面的屬性我們知道,每個(gè)添加到LinkedBlockingQueue隊(duì)列中的數(shù)據(jù)都將被封裝成Node節(jié)點(diǎn),添加的鏈表隊(duì)列中,其中head和last分別指向隊(duì)列的頭結(jié)點(diǎn)和尾結(jié)點(diǎn)。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內(nèi)部分別使用了takeLock 和 putLock 對(duì)并發(fā)進(jìn)行控制,也就是說(shuō),添加和刪除操作并不是互斥操作,可以同時(shí)進(jìn)行,這樣也就可以大大提高吞吐量。

這里如果不指定隊(duì)列的容量大小,也就是使用默認(rèn)的Integer.MAX_VALUE,如果存在添加速度大于刪除速度時(shí)候,有可能會(huì)內(nèi)存溢出,這點(diǎn)在使用前希望慎重考慮。

另外,LinkedBlockingQueue對(duì)每一個(gè)lock鎖都提供了一個(gè)Condition用來(lái)掛起和喚醒其他線程。

構(gòu)造函數(shù)

默認(rèn)的構(gòu)造函數(shù)和最后一個(gè)構(gòu)造函數(shù)創(chuàng)建的隊(duì)列大小都為 Integer.MAX_VALUE,只有第二個(gè)構(gòu)造函數(shù)用戶可以指定隊(duì)列的大小。第二個(gè)構(gòu)造函數(shù)最后初始化了last和head節(jié)點(diǎn),讓它們都指向了一個(gè)元素為null的節(jié)點(diǎn)。

最后一個(gè)構(gòu)造函數(shù)使用了putLock來(lái)進(jìn)行加鎖,但是這里并不是為了多線程的競(jìng)爭(zhēng)而加鎖,只是為了放入的元素能立即對(duì)其他線程可見(jiàn)。

  1. public LinkedBlockingQueue() { 
  2.     // 默認(rèn)大小為Integer.MAX_VALUE 
  3.     this(Integer.MAX_VALUE); 
  4.  
  5.  
  6. public LinkedBlockingQueue(int capacity) { 
  7.     if (capacity <= 0) throw new IllegalArgumentException(); 
  8.     this.capacity = capacity; 
  9.     last = head = new Node<E>(null); 
  10.  
  11.  
  12. public LinkedBlockingQueue(Collection<? extends E> c) { 
  13.     this(Integer.MAX_VALUE); 
  14.     final ReentrantLock putLock = this.putLock; 
  15.     putLock.lock(); // Never contended, but necessary for visibility 
  16.     try { 
  17.         int n = 0; 
  18.         for (E e : c) { 
  19.             if (e == null
  20.                 throw new NullPointerException(); 
  21.             if (n == capacity) 
  22.                 throw new IllegalStateException("Queue full"); 
  23.             enqueue(new Node<E>(e)); 
  24.             ++n; 
  25.         } 
  26.         count.set(n); 
  27.     } finally { 
  28.         putLock.unlock(); 
  29.     } 

 入隊(duì)方法

LinkedBlockingQueue提供了多種入隊(duì)操作的實(shí)現(xiàn)來(lái)滿足不同情況下的需求,入隊(duì)操作有如下幾種:

  • void put(E e);
  • boolean offer(E e);
  • boolean offer(E e, long timeout, TimeUnit unit)。

其中:

  • offer方法有兩個(gè)重載版本,只有一個(gè)參數(shù)的版本,如果隊(duì)列滿了就返回false,否則加入到隊(duì)列中,返回true,add方法就是調(diào)用此版本的offer方法;另一個(gè)帶時(shí)間參數(shù)的版本,如果隊(duì)列滿了則等待,可指定等待的時(shí)間,如果這期間中斷了則拋出異常,如果等待超時(shí)了則返回false,否則加入到隊(duì)列中返回true;
  • put方法跟帶時(shí)間參數(shù)的offer方法邏輯一樣,不過(guò)沒(méi)有等待的時(shí)間限制,會(huì)一直等待直到隊(duì)列有空余位置了,再插入到隊(duì)列中,返回true。

put(E e)

  1. public void put(E e) throws InterruptedException { 
  2.     if (e == null) throw new NullPointerException(); 
  3.     int c = -1; 
  4.     Node<E> node = new Node<E>(e); 
  5.     final ReentrantLock putLock = this.putLock; 
  6.     final AtomicInteger count = this.count
  7.     // 獲取鎖中斷 
  8.     putLock.lockInterruptibly(); 
  9.     try { 
  10.         //判斷隊(duì)列是否已滿,如果已滿阻塞等待 
  11.         while (count.get() == capacity) { 
  12.             notFull.await(); 
  13.         } 
  14.         // 把node放入隊(duì)列中 
  15.         enqueue(node); 
  16.         c = count.getAndIncrement(); 
  17.         // 再次判斷隊(duì)列是否有可用空間,如果有喚醒下一個(gè)線程進(jìn)行添加操作 
  18.         if (c + 1 < capacity) 
  19.             notFull.signal(); 
  20.     } finally { 
  21.         putLock.unlock(); 
  22.     } 
  23.     // 如果隊(duì)列中有一條數(shù)據(jù),喚醒消費(fèi)線程進(jìn)行消費(fèi) 
  24.     if (c == 0) 
  25.         signalNotEmpty(); 

 小結(jié)put方法來(lái)看,它總共做了以下情況的考慮:

  • 隊(duì)列已滿,阻塞等待。
  • 隊(duì)列未滿,創(chuàng)建一個(gè)node節(jié)點(diǎn)放入隊(duì)列中,如果放完以后隊(duì)列還有剩余空間,繼續(xù)喚醒下一個(gè)添加線程進(jìn)行添加。如果放之前隊(duì)列中沒(méi)有元素,放完以后要喚醒消費(fèi)線程進(jìn)行消費(fèi)。

我們?cè)倏纯磒ut方法中用到的幾個(gè)其他方法,先來(lái)看看 enqueue(Node node) 方法:

  1. private void enqueue(Node<E> node) { 
  2.     last = last.next = node; 

 用一張圖來(lái)看看往隊(duì)列里依次放入元素A和元素B,如下:

接下來(lái)我們看看signalNotEmpty,順帶著看signalNotFull方法。

  1. private void signalNotEmpty() { 
  2.     final ReentrantLock takeLock = this.takeLock; 
  3.     takeLock.lock(); 
  4.     try { 
  5.         notEmpty.signal(); 
  6.     } finally { 
  7.         takeLock.unlock(); 
  8.     } 
  9.  
  10. private void signalNotFull() { 
  11.     final ReentrantLock putLock = this.putLock; 
  12.     putLock.lock(); 
  13.     try { 
  14.         notFull.signal(); 
  15.     } finally { 
  16.         putLock.unlock(); 
  17.     } 

 為什么要這么寫(xiě)?因?yàn)閟ignal的時(shí)候要獲取到該signal對(duì)應(yīng)的Condition對(duì)象的鎖才行。

offer(E e)

  1. public boolean offer(E e) { 
  2.     if (e == null) throw new NullPointerException(); 
  3.     final AtomicInteger count = this.count
  4.     if (count.get() == capacity) 
  5.         return false
  6.     int c = -1; 
  7.     Node<E> node = new Node<E>(e); 
  8.     final ReentrantLock putLock = this.putLock; 
  9.     putLock.lock(); 
  10.     try { 
  11.         // 隊(duì)列有可用空間,放入node節(jié)點(diǎn),判斷放入元素后是否還有可用空間, 
  12.         // 如果有,喚醒下一個(gè)添加線程進(jìn)行添加操作。 
  13.         if (count.get() < capacity) { 
  14.             enqueue(node); 
  15.             c = count.getAndIncrement(); 
  16.             if (c + 1 < capacity) 
  17.                 notFull.signal(); 
  18.         } 
  19.     } finally { 
  20.         putLock.unlock(); 
  21.     } 
  22.     if (c == 0) 
  23.         signalNotEmpty(); 
  24.     return c >= 0; 

 可以看到offer僅僅對(duì)put方法改動(dòng)了一點(diǎn)點(diǎn),當(dāng)隊(duì)列沒(méi)有可用元素的時(shí)候,不同于put方法的阻塞等待,offer方法直接方法false。

offer(E e, long timeout, TimeUnit unit)

  1. public boolean offer(E e, long timeout, TimeUnit unit) 
  2.         throws InterruptedException { 
  3.  
  4.     if (e == null) throw new NullPointerException(); 
  5.     long nanos = unit.toNanos(timeout); 
  6.     int c = -1; 
  7.     final ReentrantLock putLock = this.putLock; 
  8.     final AtomicInteger count = this.count
  9.     putLock.lockInterruptibly(); 
  10.     try { 
  11.         // 等待超時(shí)時(shí)間nanos,超時(shí)時(shí)間到了返回false 
  12.         while (count.get() == capacity) { 
  13.             if (nanos <= 0) 
  14.                 return false
  15.             nanos = notFull.awaitNanos(nanos); 
  16.         } 
  17.         enqueue(new Node<E>(e)); 
  18.         c = count.getAndIncrement(); 
  19.         if (c + 1 < capacity) 
  20.             notFull.signal(); 
  21.     } finally { 
  22.         putLock.unlock(); 
  23.     } 
  24.     if (c == 0) 
  25.         signalNotEmpty(); 
  26.     return true

 該方法只是對(duì)offer方法進(jìn)行了阻塞超時(shí)處理,使用了Condition的awaitNanos來(lái)進(jìn)行超時(shí)等待,這里為什么要用while循環(huán)?因?yàn)閍waitNanos方法是可中斷的,為了防止在等待過(guò)程中線程被中斷,這里使用while循環(huán)進(jìn)行等待過(guò)程中中斷的處理,繼續(xù)等待剩下需等待的時(shí)間。

出隊(duì)方法

入隊(duì)列的方法說(shuō)完后,我們來(lái)說(shuō)說(shuō)出隊(duì)列的方法。LinkedBlockingQueue提供了多種出隊(duì)操作的實(shí)現(xiàn)來(lái)滿足不同情況下的需求,如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);

take()

  1. public E take() throws InterruptedException { 
  2.     E x; 
  3.     int c = -1; 
  4.     final AtomicInteger count = this.count
  5.     final ReentrantLock takeLock = this.takeLock; 
  6.     takeLock.lockInterruptibly(); 
  7.     try { 
  8.         // 隊(duì)列為空,阻塞等待 
  9.         while (count.get() == 0) { 
  10.             notEmpty.await(); 
  11.         } 
  12.         x = dequeue(); 
  13.         c = count.getAndDecrement(); 
  14.         // 隊(duì)列中還有元素,喚醒下一個(gè)消費(fèi)線程進(jìn)行消費(fèi) 
  15.         if (c > 1) 
  16.             notEmpty.signal(); 
  17.     } finally { 
  18.         takeLock.unlock(); 
  19.     } 
  20.     // 移除元素之前隊(duì)列是滿的,喚醒生產(chǎn)線程進(jìn)行添加元素 
  21.     if (c == capacity) 
  22.         signalNotFull(); 
  23.     return x; 

 take方法看起來(lái)就是put方法的逆向操作,它總共做了以下情況的考慮:

  • 隊(duì)列為空,阻塞等待
  • 隊(duì)列不為空,從對(duì)首獲取并移除一個(gè)元素,如果消費(fèi)后還有元素在隊(duì)列中,繼續(xù)喚醒下一個(gè)消費(fèi)線程進(jìn)行元素移除。如果放之前隊(duì)列是滿元素的情況,移除完后需要喚醒生產(chǎn)線程進(jìn)行添加元素。

我們來(lái)看看dequeue方法

  1. private E dequeue() { 
  2.     // 獲取到head節(jié)點(diǎn) 
  3.     Node<E> h = head; 
  4.     // 獲取到head節(jié)點(diǎn)指向的下一個(gè)節(jié)點(diǎn) 
  5.     Node<E> first = h.next
  6.     // head節(jié)點(diǎn)原來(lái)指向的節(jié)點(diǎn)的next指向自己,等待下次gc回收 
  7.     h.next = h; // help GC 
  8.     // head節(jié)點(diǎn)指向新的節(jié)點(diǎn) 
  9.     head = first
  10.     // 獲取到新的head節(jié)點(diǎn)的item值 
  11.     E x = first.item; 
  12.     // 新head節(jié)點(diǎn)的item值設(shè)置為null 
  13.     first.item = null
  14.     return x; 

 我們結(jié)合注釋和圖來(lái)看一下鏈表算法: 


其實(shí)這個(gè)寫(xiě)法看起來(lái)很繞,我們其實(shí)也可以這么寫(xiě):

  1. private E dequeue() { 
  2.     // 獲取到head節(jié)點(diǎn) 
  3.     Node<E> h = head; 
  4.     // 獲取到head節(jié)點(diǎn)指向的下一個(gè)節(jié)點(diǎn),也就是節(jié)點(diǎn)A 
  5.     Node<E> first = h.next
  6.     // 獲取到下下個(gè)節(jié)點(diǎn),也就是節(jié)點(diǎn)B 
  7.     Node<E> next = first.next
  8.     // head的next指向下下個(gè)節(jié)點(diǎn),也就是圖中的B節(jié)點(diǎn) 
  9.     h.next = next
  10.     // 得到節(jié)點(diǎn)A的值 
  11.     E x = first.item; 
  12.     first.item = null; // help GC 
  13.     first.next = first; // help GC 
  14.     return x; 

 poll()

  1. public E poll() { 
  2.     final AtomicInteger count = this.count
  3.     if (count.get() == 0) 
  4.         return null
  5.     E x = null
  6.     int c = -1; 
  7.     final ReentrantLock takeLock = this.takeLock; 
  8.     takeLock.lock(); 
  9.     try { 
  10.         if (count.get() > 0) { 
  11.             x = dequeue(); 
  12.             c = count.getAndDecrement(); 
  13.             if (c > 1) 
  14.                 notEmpty.signal(); 
  15.         } 
  16.     } finally { 
  17.         takeLock.unlock(); 
  18.     } 
  19.     if (c == capacity) 
  20.         signalNotFull(); 
  21.     return x; 

 poll方法去除了take方法中元素為空后阻塞等待這一步驟,這里也就不詳細(xì)說(shuō)了。同理,poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一樣,利用了Condition的awaitNanos方法來(lái)進(jìn)行阻塞等待直至超時(shí)。這里就不列出來(lái)說(shuō)了。

獲取元素方法 

  1. public E peek() { 
  2.     if (count.get() == 0) 
  3.         return null
  4.     final ReentrantLock takeLock = this.takeLock; 
  5.     takeLock.lock(); 
  6.     try { 
  7.         Node<E> first = head.next
  8.         if (first == null
  9.             return null
  10.         else 
  11.             return first.item; 
  12.     } finally { 
  13.         takeLock.unlock(); 
  14.     } 

 加鎖后,獲取到head節(jié)點(diǎn)的next節(jié)點(diǎn),如果為空返回null,如果不為空,返回next節(jié)點(diǎn)的item值。

刪除元素方法

  1. public boolean remove(Object o) { 
  2.     if (o == nullreturn false
  3.     // 兩個(gè)lock全部上鎖 
  4.     fullyLock(); 
  5.     try { 
  6.         // 從head開(kāi)始遍歷元素,直到最后一個(gè)元素 
  7.         for (Node<E> trail = head, p = trail.next
  8.              p != null
  9.              trail = p, p = p.next) { 
  10.             // 如果找到相等的元素,調(diào)用unlink方法刪除元素 
  11.             if (o.equals(p.item)) { 
  12.                 unlink(p, trail); 
  13.                 return true
  14.             } 
  15.         } 
  16.         return false
  17.     } finally { 
  18.         // 兩個(gè)lock全部解鎖 
  19.         fullyUnlock(); 
  20.     } 
  21.  
  22. void fullyLock() { 
  23.     putLock.lock(); 
  24.     takeLock.lock(); 
  25.  
  26. void fullyUnlock() { 
  27.     takeLock.unlock(); 
  28.     putLock.unlock(); 

 因?yàn)閞emove方法使用兩個(gè)鎖全部上鎖,所以其他操作都需要等待它完成,而該方法需要從head節(jié)點(diǎn)遍歷到尾節(jié)點(diǎn),所以時(shí)間復(fù)雜度為O(n)。我們來(lái)看看unlink方法。

  1. void unlink(Node<E> p, Node<E> trail) { 
  2.     // p的元素置為null 
  3.     p.item = null
  4.     // p的前一個(gè)節(jié)點(diǎn)的next指向p的next,也就是把p從鏈表中去除了 
  5.     trail.next = p.next
  6.     // 如果last指向p,刪除p后讓last指向trail 
  7.     if (last == p) 
  8.         last = trail; 
  9.     // 如果刪除之前元素是滿的,刪除之后就有空間了,喚醒生產(chǎn)線程放入元素 
  10.     if (count.getAndDecrement() == capacity) 
  11.         notFull.signal(); 

 總結(jié)

LinkedBlockingQueue是一個(gè)阻塞隊(duì)列,內(nèi)部由兩個(gè)ReentrantLock來(lái)實(shí)現(xiàn)出入隊(duì)列的線程安全,由各自的Condition對(duì)象的await和signal來(lái)實(shí)現(xiàn)等待和喚醒功能。它和ArrayBlockingQueue的不同點(diǎn)在于:

  • 隊(duì)列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無(wú)界的(Integer.MAX_VALUE),對(duì)于后者而言,當(dāng)添加速度大于移除速度時(shí),在無(wú)界的情況下,可能會(huì)造成內(nèi)存溢出等問(wèn)題。
  • 數(shù)據(jù)存儲(chǔ)容器不同,ArrayBlockingQueue采用的是數(shù)組作為數(shù)據(jù)存儲(chǔ)容器,而LinkedBlockingQueue采用的則是以Node節(jié)點(diǎn)作為連接對(duì)象的鏈表。
  • 由于ArrayBlockingQueue采用的是數(shù)組的存儲(chǔ)容器,因此在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷(xiāo)毀任何額外的對(duì)象實(shí)例,而LinkedBlockingQueue則會(huì)生成一個(gè)額外的Node對(duì)象。這可能在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的時(shí),對(duì)于GC可能存在較大影響。
  • 兩者的實(shí)現(xiàn)隊(duì)列添加或移除的鎖不一樣,ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是沒(méi)有分離的,即添加操作和移除操作采用的同一個(gè)ReenterLock鎖,而LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊(duì)列的吞吐量,也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新,可以公眾號(hào)搜一搜「 一角錢(qián)技術(shù) 」第一時(shí)間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star。

 

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2020-11-19 07:41:51

ArrayBlocki

2020-11-25 14:28:56

DelayedWork

2020-11-24 09:04:55

PriorityBlo

2017-04-12 10:02:21

Java阻塞隊(duì)列原理分析

2025-01-14 00:00:00

Blocking隊(duì)列元素

2024-01-29 15:54:41

Java線程池公平鎖

2012-06-14 10:34:40

Java阻塞搜索實(shí)例

2023-12-28 07:49:11

線程池源碼應(yīng)用場(chǎng)景

2025-04-03 07:41:55

API阻塞隊(duì)列數(shù)據(jù)

2023-12-15 09:45:21

阻塞接口

2023-10-30 11:40:36

OOM線程池單線程

2025-04-02 01:20:00

阻塞隊(duì)列源碼

2021-06-04 14:15:10

鴻蒙HarmonyOS應(yīng)用

2022-06-30 08:14:05

Java阻塞隊(duì)列

2021-05-23 16:03:42

LinkedBlock面試阻塞隊(duì)列

2024-10-14 12:34:08

2024-02-20 08:16:10

阻塞隊(duì)列源碼

2021-09-22 14:36:32

鴻蒙HarmonyOS應(yīng)用

2014-08-26 11:11:57

AsyncHttpCl源碼分析

2011-03-15 11:33:18

iptables
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)