阻塞隊(duì)列—LinkedBlockingQueue源碼分析
前言

LinkedBlockingQueue 由鏈接節(jié)點(diǎn)支持的可選有界隊(duì)列,是一個(gè)基于鏈表的無(wú)界隊(duì)列(理論上有界),隊(duì)列按照先進(jìn)先出的順序進(jìn)行排序。LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默認(rèn)為 Integer.MAX_VALUE,也就是無(wú)界隊(duì)列。所以為了避免隊(duì)列過(guò)大造成機(jī)器負(fù)載或者內(nèi)存爆滿的情況出現(xiàn),我們?cè)谑褂玫臅r(shí)候建議手動(dòng)傳一個(gè)隊(duì)列的大小。
隊(duì)列創(chuàng)建
- BlockingQueue blockingQueue = new LinkedBlockingQueue<>();
上面這段代碼中,blockingQueue 的容量將設(shè)置為 Integer.MAX_VALUE 。
應(yīng)用場(chǎng)景
多用于任務(wù)隊(duì)列,單線程發(fā)布任務(wù),任務(wù)滿了就停止等待阻塞,當(dāng)任務(wù)被完成消費(fèi)少了又開(kāi)始負(fù)責(zé)發(fā)布任務(wù)。
我們來(lái)看一個(gè)例子:
- package com.niuh.queue.linked;
- import org.apache.commons.lang.RandomStringUtils;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicLong;
- public class TestLinkedBlockingQueue {
- private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
- // 線程控制開(kāi)關(guān)
- private final CountDownLatch latch = new CountDownLatch(1);
- // 線程池
- private final ExecutorService pool;
- // AtomicLong 計(jì)數(shù) 生產(chǎn)數(shù)量
- private final AtomicLong output = new AtomicLong(0);
- // AtomicLong 計(jì)數(shù) 銷(xiāo)售數(shù)量
- private final AtomicLong sales = new AtomicLong(0);
- // 是否停止線程
- private final boolean clear;
- public TestLinkedBlockingQueue(boolean clear) {
- this.pool = Executors.newCachedThreadPool();
- this.clear = clear;
- }
- public void service() throws InterruptedException {
- Consumer a = new Consumer(queue, sales, latch, clear);
- pool.submit(a);
- Producer w = new Producer(queue, output, latch);
- pool.submit(w);
- latch.countDown();
- }
- public static void main(String[] args) {
- TestLinkedBlockingQueue t = new TestLinkedBlockingQueue(false);
- try {
- t.service();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 消費(fèi)者(銷(xiāo)售產(chǎn)品)
- */
- class Consumer implements Runnable {
- private final LinkedBlockingQueue<String> queue;
- private final AtomicLong sales;
- private final CountDownLatch latch;
- private final boolean clear;
- public Consumer(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear) {
- this.queue = queue;
- this.sales = sales;
- this.latch = latch;
- this.clear = clear;
- }
- public void run() {
- try {
- latch.await(); // 放閘之前老實(shí)的等待著
- for (; ; ) {
- sale();
- Thread.sleep(500);
- }
- } catch (InterruptedException e) {
- if (clear) { // 響應(yīng)中斷請(qǐng)求后,如果有要求則銷(xiāo)售完隊(duì)列的產(chǎn)品后再終止線程
- cleanWarehouse();
- } else {
- System.out.println("Seller Thread will be interrupted...");
- }
- }
- }
- public void sale() {
- System.out.println("==取take=");
- try {
- String item = queue.poll(50, TimeUnit.MILLISECONDS);
- System.out.println(item);
- if (item != null) {
- sales.incrementAndGet(); // 可以聲明long型的參數(shù)獲得返回值,作為日志的參數(shù)
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /**
- * 銷(xiāo)售完隊(duì)列剩余的產(chǎn)品
- */
- private void cleanWarehouse() {
- try {
- while (queue.size() > 0) {
- sale();
- }
- } catch (Exception ex) {
- System.out.println("Seller Thread will be interrupted...");
- }
- }
- }
- /**
- * 生產(chǎn)者(生產(chǎn)產(chǎn)品)
- *
- */
- class Producer implements Runnable {
- private LinkedBlockingQueue<String> queue;
- private CountDownLatch latch;
- private AtomicLong output;
- public Producer() {
- }
- public Producer(LinkedBlockingQueue<String> queue, AtomicLong output, CountDownLatch latch) {
- this.queue = queue;
- this.latch = latch;
- this.output = output;
- }
- public void run() {
- try {
- latch.await(); // 線程等待
- for (; ; ) {
- work();
- Thread.sleep(100);
- }
- } catch (InterruptedException e) {
- System.out.println("Producer thread will be interrupted...");
- }
- }
- /**
- * 工作
- */
- public void work() {
- try {
- String product = RandomStringUtils.randomAscii(3);
- boolean success = queue.offer(product, 100, TimeUnit.MILLISECONDS);
- if (success) {
- output.incrementAndGet();// 可以聲明long型的參數(shù)獲得返回值,作為日志的參數(shù)
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
工作原理
LinkedBlockingQueue內(nèi)部由單鏈表實(shí)現(xiàn),只能從head取元素,從tail添加元素。添加元素和獲取元素都有獨(dú)立的鎖,也就是說(shuō)LinkedBlockingQueue是讀寫(xiě)分離的,讀寫(xiě)操作可以并行執(zhí)行。LinkedBlockingQueue采用可重入鎖(ReentrantLock)來(lái)保證在并發(fā)情況下的線程安全。
向無(wú)限隊(duì)列添加元素的所有操作都將永遠(yuǎn)不會(huì)阻塞,[注意這里不是說(shuō)不會(huì)加鎖保證線程安全],因此它可以增長(zhǎng)到非常大的容量。
使用無(wú)限 BlockingQueue 設(shè)計(jì)生產(chǎn)者 - 消費(fèi)者模型時(shí)最重要的是 消費(fèi)者應(yīng)該能夠像生產(chǎn)者向隊(duì)列添加消息一樣快地消費(fèi)消息。否則,內(nèi)存可能會(huì)填滿,然后就會(huì)得到一個(gè) OutOfMemory 異常。
源碼分析
定義
LinkedBlockingQueue的類(lèi)繼承關(guān)系如下:
其包含的方法定義如下:
成員屬性
- /**
- * 節(jié)點(diǎn)類(lèi),用于存儲(chǔ)數(shù)據(jù)
- */
- static class Node<E> {
- E item;
- Node<E> next;
- Node(E x) { item = x; }
- }
- /** 阻塞隊(duì)列的大小, 默認(rèn)為Integer.MAX_VALUE */
- private final int capacity;
- /** 當(dāng)前阻塞隊(duì)列中的元素個(gè)數(shù) */
- private final AtomicInteger count = new AtomicInteger();
- /**
- * 阻塞隊(duì)列的頭節(jié)點(diǎn)
- */
- transient Node<E> head;
- /**
- * 阻塞隊(duì)列的尾節(jié)點(diǎn)
- */
- private transient Node<E> last;
- /** 獲取并移除元素時(shí)使用的鎖,如take,poll,etc */
- private final ReentrantLock takeLock = new ReentrantLock();
- /** notEmpty 條件對(duì)象,當(dāng)隊(duì)列沒(méi)有數(shù)據(jù)時(shí)用于掛起執(zhí)行刪除的線程 */
- private final Condition notEmpty = takeLock.newCondition();
- /** 添加元素時(shí)使用的鎖,如 put,offer,etc */
- private final ReentrantLock putLock = new ReentrantLock();
- /** notFull 條件對(duì)象,每當(dāng)隊(duì)列數(shù)據(jù)已滿時(shí)用于掛起執(zhí)行添加的線程 */
- private final Condition notFull = putLock.newCondition();
從上面的屬性我們知道,每個(gè)添加到LinkedBlockingQueue隊(duì)列中的數(shù)據(jù)都將被封裝成Node節(jié)點(diǎn),添加的鏈表隊(duì)列中,其中head和last分別指向隊(duì)列的頭結(jié)點(diǎn)和尾結(jié)點(diǎn)。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內(nèi)部分別使用了takeLock 和 putLock 對(duì)并發(fā)進(jìn)行控制,也就是說(shuō),添加和刪除操作并不是互斥操作,可以同時(shí)進(jìn)行,這樣也就可以大大提高吞吐量。
這里如果不指定隊(duì)列的容量大小,也就是使用默認(rèn)的Integer.MAX_VALUE,如果存在添加速度大于刪除速度時(shí)候,有可能會(huì)內(nèi)存溢出,這點(diǎn)在使用前希望慎重考慮。
另外,LinkedBlockingQueue對(duì)每一個(gè)lock鎖都提供了一個(gè)Condition用來(lái)掛起和喚醒其他線程。
構(gòu)造函數(shù)
默認(rèn)的構(gòu)造函數(shù)和最后一個(gè)構(gòu)造函數(shù)創(chuàng)建的隊(duì)列大小都為 Integer.MAX_VALUE,只有第二個(gè)構(gòu)造函數(shù)用戶可以指定隊(duì)列的大小。第二個(gè)構(gòu)造函數(shù)最后初始化了last和head節(jié)點(diǎn),讓它們都指向了一個(gè)元素為null的節(jié)點(diǎn)。
最后一個(gè)構(gòu)造函數(shù)使用了putLock來(lái)進(jìn)行加鎖,但是這里并不是為了多線程的競(jìng)爭(zhēng)而加鎖,只是為了放入的元素能立即對(duì)其他線程可見(jiàn)。
- public LinkedBlockingQueue() {
- // 默認(rèn)大小為Integer.MAX_VALUE
- this(Integer.MAX_VALUE);
- }
- public LinkedBlockingQueue(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- last = head = new Node<E>(null);
- }
- public LinkedBlockingQueue(Collection<? extends E> c) {
- this(Integer.MAX_VALUE);
- final ReentrantLock putLock = this.putLock;
- putLock.lock(); // Never contended, but necessary for visibility
- try {
- int n = 0;
- for (E e : c) {
- if (e == null)
- throw new NullPointerException();
- if (n == capacity)
- throw new IllegalStateException("Queue full");
- enqueue(new Node<E>(e));
- ++n;
- }
- count.set(n);
- } finally {
- putLock.unlock();
- }
- }
入隊(duì)方法
LinkedBlockingQueue提供了多種入隊(duì)操作的實(shí)現(xiàn)來(lái)滿足不同情況下的需求,入隊(duì)操作有如下幾種:
- void put(E e);
- boolean offer(E e);
- boolean offer(E e, long timeout, TimeUnit unit)。
其中:
- offer方法有兩個(gè)重載版本,只有一個(gè)參數(shù)的版本,如果隊(duì)列滿了就返回false,否則加入到隊(duì)列中,返回true,add方法就是調(diào)用此版本的offer方法;另一個(gè)帶時(shí)間參數(shù)的版本,如果隊(duì)列滿了則等待,可指定等待的時(shí)間,如果這期間中斷了則拋出異常,如果等待超時(shí)了則返回false,否則加入到隊(duì)列中返回true;
- put方法跟帶時(shí)間參數(shù)的offer方法邏輯一樣,不過(guò)沒(méi)有等待的時(shí)間限制,會(huì)一直等待直到隊(duì)列有空余位置了,再插入到隊(duì)列中,返回true。
put(E e)
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- // 獲取鎖中斷
- putLock.lockInterruptibly();
- try {
- //判斷隊(duì)列是否已滿,如果已滿阻塞等待
- while (count.get() == capacity) {
- notFull.await();
- }
- // 把node放入隊(duì)列中
- enqueue(node);
- c = count.getAndIncrement();
- // 再次判斷隊(duì)列是否有可用空間,如果有喚醒下一個(gè)線程進(jìn)行添加操作
- if (c + 1 < capacity)
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- // 如果隊(duì)列中有一條數(shù)據(jù),喚醒消費(fèi)線程進(jìn)行消費(fèi)
- if (c == 0)
- signalNotEmpty();
- }
小結(jié)put方法來(lái)看,它總共做了以下情況的考慮:
- 隊(duì)列已滿,阻塞等待。
- 隊(duì)列未滿,創(chuàng)建一個(gè)node節(jié)點(diǎn)放入隊(duì)列中,如果放完以后隊(duì)列還有剩余空間,繼續(xù)喚醒下一個(gè)添加線程進(jìn)行添加。如果放之前隊(duì)列中沒(méi)有元素,放完以后要喚醒消費(fèi)線程進(jìn)行消費(fèi)。
我們?cè)倏纯磒ut方法中用到的幾個(gè)其他方法,先來(lái)看看 enqueue(Node node) 方法:
- private void enqueue(Node<E> node) {
- last = last.next = node;
- }
用一張圖來(lái)看看往隊(duì)列里依次放入元素A和元素B,如下:
接下來(lái)我們看看signalNotEmpty,順帶著看signalNotFull方法。
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
- private void signalNotFull() {
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- }
為什么要這么寫(xiě)?因?yàn)閟ignal的時(shí)候要獲取到該signal對(duì)應(yīng)的Condition對(duì)象的鎖才行。
offer(E e)
- public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
- final AtomicInteger count = this.count;
- if (count.get() == capacity)
- return false;
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- // 隊(duì)列有可用空間,放入node節(jié)點(diǎn),判斷放入元素后是否還有可用空間,
- // 如果有,喚醒下一個(gè)添加線程進(jìn)行添加操作。
- if (count.get() < capacity) {
- enqueue(node);
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- }
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();
- return c >= 0;
- }
可以看到offer僅僅對(duì)put方法改動(dòng)了一點(diǎn)點(diǎn),當(dāng)隊(duì)列沒(méi)有可用元素的時(shí)候,不同于put方法的阻塞等待,offer方法直接方法false。
offer(E e, long timeout, TimeUnit unit)
- public boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (e == null) throw new NullPointerException();
- long nanos = unit.toNanos(timeout);
- int c = -1;
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- putLock.lockInterruptibly();
- try {
- // 等待超時(shí)時(shí)間nanos,超時(shí)時(shí)間到了返回false
- while (count.get() == capacity) {
- if (nanos <= 0)
- return false;
- nanos = notFull.awaitNanos(nanos);
- }
- enqueue(new Node<E>(e));
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();
- return true;
- }
該方法只是對(duì)offer方法進(jìn)行了阻塞超時(shí)處理,使用了Condition的awaitNanos來(lái)進(jìn)行超時(shí)等待,這里為什么要用while循環(huán)?因?yàn)閍waitNanos方法是可中斷的,為了防止在等待過(guò)程中線程被中斷,這里使用while循環(huán)進(jìn)行等待過(guò)程中中斷的處理,繼續(xù)等待剩下需等待的時(shí)間。
出隊(duì)方法
入隊(duì)列的方法說(shuō)完后,我們來(lái)說(shuō)說(shuō)出隊(duì)列的方法。LinkedBlockingQueue提供了多種出隊(duì)操作的實(shí)現(xiàn)來(lái)滿足不同情況下的需求,如下:
- E take();
- E poll();
- E poll(long timeout, TimeUnit unit);
take()
- public E take() throws InterruptedException {
- E x;
- int c = -1;
- final AtomicInteger count = this.count;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lockInterruptibly();
- try {
- // 隊(duì)列為空,阻塞等待
- while (count.get() == 0) {
- notEmpty.await();
- }
- x = dequeue();
- c = count.getAndDecrement();
- // 隊(duì)列中還有元素,喚醒下一個(gè)消費(fèi)線程進(jìn)行消費(fèi)
- if (c > 1)
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- // 移除元素之前隊(duì)列是滿的,喚醒生產(chǎn)線程進(jìn)行添加元素
- if (c == capacity)
- signalNotFull();
- return x;
- }
take方法看起來(lái)就是put方法的逆向操作,它總共做了以下情況的考慮:
- 隊(duì)列為空,阻塞等待
- 隊(duì)列不為空,從對(duì)首獲取并移除一個(gè)元素,如果消費(fèi)后還有元素在隊(duì)列中,繼續(xù)喚醒下一個(gè)消費(fèi)線程進(jìn)行元素移除。如果放之前隊(duì)列是滿元素的情況,移除完后需要喚醒生產(chǎn)線程進(jìn)行添加元素。
我們來(lái)看看dequeue方法
- private E dequeue() {
- // 獲取到head節(jié)點(diǎn)
- Node<E> h = head;
- // 獲取到head節(jié)點(diǎn)指向的下一個(gè)節(jié)點(diǎn)
- Node<E> first = h.next;
- // head節(jié)點(diǎn)原來(lái)指向的節(jié)點(diǎn)的next指向自己,等待下次gc回收
- h.next = h; // help GC
- // head節(jié)點(diǎn)指向新的節(jié)點(diǎn)
- head = first;
- // 獲取到新的head節(jié)點(diǎn)的item值
- E x = first.item;
- // 新head節(jié)點(diǎn)的item值設(shè)置為null
- first.item = null;
- return x;
- }
我們結(jié)合注釋和圖來(lái)看一下鏈表算法:

其實(shí)這個(gè)寫(xiě)法看起來(lái)很繞,我們其實(shí)也可以這么寫(xiě):
- private E dequeue() {
- // 獲取到head節(jié)點(diǎn)
- Node<E> h = head;
- // 獲取到head節(jié)點(diǎn)指向的下一個(gè)節(jié)點(diǎn),也就是節(jié)點(diǎn)A
- Node<E> first = h.next;
- // 獲取到下下個(gè)節(jié)點(diǎn),也就是節(jié)點(diǎn)B
- Node<E> next = first.next;
- // head的next指向下下個(gè)節(jié)點(diǎn),也就是圖中的B節(jié)點(diǎn)
- h.next = next;
- // 得到節(jié)點(diǎn)A的值
- E x = first.item;
- first.item = null; // help GC
- first.next = first; // help GC
- return x;
- }
poll()
- public E poll() {
- final AtomicInteger count = this.count;
- if (count.get() == 0)
- return null;
- E x = null;
- int c = -1;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- if (count.get() > 0) {
- x = dequeue();
- c = count.getAndDecrement();
- if (c > 1)
- notEmpty.signal();
- }
- } finally {
- takeLock.unlock();
- }
- if (c == capacity)
- signalNotFull();
- return x;
- }
poll方法去除了take方法中元素為空后阻塞等待這一步驟,這里也就不詳細(xì)說(shuō)了。同理,poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一樣,利用了Condition的awaitNanos方法來(lái)進(jìn)行阻塞等待直至超時(shí)。這里就不列出來(lái)說(shuō)了。
獲取元素方法
- public E peek() {
- if (count.get() == 0)
- return null;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- Node<E> first = head.next;
- if (first == null)
- return null;
- else
- return first.item;
- } finally {
- takeLock.unlock();
- }
- }
加鎖后,獲取到head節(jié)點(diǎn)的next節(jié)點(diǎn),如果為空返回null,如果不為空,返回next節(jié)點(diǎn)的item值。
刪除元素方法
- public boolean remove(Object o) {
- if (o == null) return false;
- // 兩個(gè)lock全部上鎖
- fullyLock();
- try {
- // 從head開(kāi)始遍歷元素,直到最后一個(gè)元素
- for (Node<E> trail = head, p = trail.next;
- p != null;
- trail = p, p = p.next) {
- // 如果找到相等的元素,調(diào)用unlink方法刪除元素
- if (o.equals(p.item)) {
- unlink(p, trail);
- return true;
- }
- }
- return false;
- } finally {
- // 兩個(gè)lock全部解鎖
- fullyUnlock();
- }
- }
- void fullyLock() {
- putLock.lock();
- takeLock.lock();
- }
- void fullyUnlock() {
- takeLock.unlock();
- putLock.unlock();
- }
因?yàn)閞emove方法使用兩個(gè)鎖全部上鎖,所以其他操作都需要等待它完成,而該方法需要從head節(jié)點(diǎn)遍歷到尾節(jié)點(diǎn),所以時(shí)間復(fù)雜度為O(n)。我們來(lái)看看unlink方法。
- void unlink(Node<E> p, Node<E> trail) {
- // p的元素置為null
- p.item = null;
- // p的前一個(gè)節(jié)點(diǎn)的next指向p的next,也就是把p從鏈表中去除了
- trail.next = p.next;
- // 如果last指向p,刪除p后讓last指向trail
- if (last == p)
- last = trail;
- // 如果刪除之前元素是滿的,刪除之后就有空間了,喚醒生產(chǎn)線程放入元素
- if (count.getAndDecrement() == capacity)
- notFull.signal();
- }
總結(jié)
LinkedBlockingQueue是一個(gè)阻塞隊(duì)列,內(nèi)部由兩個(gè)ReentrantLock來(lái)實(shí)現(xiàn)出入隊(duì)列的線程安全,由各自的Condition對(duì)象的await和signal來(lái)實(shí)現(xiàn)等待和喚醒功能。它和ArrayBlockingQueue的不同點(diǎn)在于:
- 隊(duì)列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無(wú)界的(Integer.MAX_VALUE),對(duì)于后者而言,當(dāng)添加速度大于移除速度時(shí),在無(wú)界的情況下,可能會(huì)造成內(nèi)存溢出等問(wèn)題。
- 數(shù)據(jù)存儲(chǔ)容器不同,ArrayBlockingQueue采用的是數(shù)組作為數(shù)據(jù)存儲(chǔ)容器,而LinkedBlockingQueue采用的則是以Node節(jié)點(diǎn)作為連接對(duì)象的鏈表。
- 由于ArrayBlockingQueue采用的是數(shù)組的存儲(chǔ)容器,因此在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷(xiāo)毀任何額外的對(duì)象實(shí)例,而LinkedBlockingQueue則會(huì)生成一個(gè)額外的Node對(duì)象。這可能在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的時(shí),對(duì)于GC可能存在較大影響。
- 兩者的實(shí)現(xiàn)隊(duì)列添加或移除的鎖不一樣,ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是沒(méi)有分離的,即添加操作和移除操作采用的同一個(gè)ReenterLock鎖,而LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊(duì)列的吞吐量,也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git
文章持續(xù)更新,可以公眾號(hào)搜一搜「 一角錢(qián)技術(shù) 」第一時(shí)間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star。