自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

1.1w字,10圖徹底掌握阻塞隊(duì)列(并發(fā)必備)

開發(fā) 前端
隊(duì)列是一種 先進(jìn)先出的特殊線性表,簡稱 FIFO。特殊之處在于只允許在一端插入,在另一端刪除,進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭。隊(duì)列中沒有元素時(shí),稱為空隊(duì)列。

[[374753]]

本文轉(zhuǎn)載自微信公眾號(hào)「源碼興趣圈」,作者malt  。轉(zhuǎn)載本文請聯(lián)系源碼興趣圈公眾號(hào)。

什么是隊(duì)列

隊(duì)列是一種 先進(jìn)先出的特殊線性表,簡稱 FIFO。特殊之處在于只允許在一端插入,在另一端刪除

進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭。隊(duì)列中沒有元素時(shí),稱為空隊(duì)列

隊(duì)列在程序設(shè)計(jì)中使用非常的多,包括一些中間件底層數(shù)據(jù)結(jié)構(gòu)就是隊(duì)列(基礎(chǔ)內(nèi)容沒有過多講解)

 

什么是阻塞隊(duì)列

隊(duì)列就隊(duì)列唄,阻塞隊(duì)列又是什么鬼

阻塞隊(duì)列是在隊(duì)列的基礎(chǔ)上額外添加兩個(gè)操作的隊(duì)列,分別是:

 

  1. 支持阻塞的插入方法:隊(duì)列容量滿時(shí),插入元素線程會(huì)被阻塞,直到隊(duì)列有多余容量為止
  2. 支持阻塞的移除方法:當(dāng)隊(duì)列中無元素時(shí),移除元素的線程會(huì)被阻塞,直到隊(duì)列有元素可被移除

文章以 LinkedBlockingQueue 為例,講述隊(duì)列之間實(shí)現(xiàn)的不同點(diǎn),為方便小伙伴閱讀,LinkedBlockingQueue 取別名 LBQ

因?yàn)槭窃创a解析文章,建議小伙伴們在 PC 端觀看。當(dāng)然,如果屏足夠大當(dāng)我沒說~

阻塞隊(duì)列繼承關(guān)系

阻塞隊(duì)列是一個(gè)抽象的叫法,阻塞隊(duì)列底層數(shù)據(jù)結(jié)構(gòu) 可以是數(shù)組,可以是單向鏈表,亦或者是雙向鏈表...

LBQ 是一個(gè)以 單向鏈表組成的隊(duì)列,下圖為 LBQ 上下繼承關(guān)系圖

從圖中得知,LBQ 實(shí)現(xiàn)了 BlockingQueue 接口,BlockingQueue 實(shí)現(xiàn)了 Queue 接口

 

Queue 接口分析

我們以自上而下的方式,先分析一波 Queue 接口里都定義了哪些方法

  1. // 如果隊(duì)列容量允許,立即將元素插入隊(duì)列,成功后返回 
  2. // 🌟如果隊(duì)列容量已滿,則拋出異常 
  3. boolean add(E e); 
  4.  
  5. //  如果隊(duì)列容量允許,立即將元素插入隊(duì)列,成功后返回 
  6. // 🌟如果隊(duì)列容量已滿,則返回 false 
  7. // 當(dāng)使用有界隊(duì)列時(shí),offer 比 add 方法更何時(shí) 
  8. boolean offer(E e); 
  9.  
  10. // 檢索并刪除隊(duì)列的頭節(jié)點(diǎn),返回值為刪除的隊(duì)列頭節(jié)點(diǎn) 
  11. // 🌟如果隊(duì)列為空則拋出異常 
  12. E remove(); 
  13.  
  14. // 檢索并刪除隊(duì)列的頭節(jié)點(diǎn),返回值為刪除的隊(duì)列頭節(jié)點(diǎn) 
  15. // 🌟如果隊(duì)列為空則返回 null 
  16. E poll(); 
  17.  
  18. // 檢查但不刪除隊(duì)列頭節(jié)點(diǎn) 
  19. // 🌟如果隊(duì)列為空則拋出異常 
  20. E element(); 
  21.  
  22. // 檢查但不刪除隊(duì)列頭節(jié)點(diǎn) 
  23. // 🌟如果隊(duì)列為空則返回 null 
  24. E peek(); 

總結(jié)一下 Queue 接口的方法,分為三個(gè)大類:

  1. 新增元素到隊(duì)列容器中:add、offer
  2. 從隊(duì)列容器中移除元素:remove、poll
  3. 查詢隊(duì)列頭節(jié)點(diǎn)是否為空:element、peek

從接口 API 的程序健壯性考慮,可以分為兩大類:

  1. 健壯 API:offer、poll、peek
  2. 非健壯 API:add、remove、element

接口 API 并無健壯可言,這里說的健壯界限指得是,使用了非健壯性的 API 接口,程序會(huì)出錯(cuò)的幾率大了點(diǎn),所以我們 更應(yīng)該關(guān)注的是如何捕獲可能出現(xiàn)的異常,以及對應(yīng)異常處理

BlockingQueue 接口分析

BlockingQueue 接口繼承自 Queue 接口,所以有些語義相同的 API 接口就沒有放上來解讀

  1. // 將指定元素插入隊(duì)列,如果隊(duì)列已滿,等待直到有空間可用;通過 throws 異常得知,可在等待時(shí)打斷 
  2. // 🌟相對于 Queue 接口而言,是一個(gè)全新的方法 
  3. void put(E e) throws InterruptedException; 
  4.  
  5. // 將指定元素插入隊(duì)列,如果隊(duì)列已滿,在等待指定的時(shí)間內(nèi)等待騰出空間;通過 throws 異常得知,可在等待時(shí)打斷 
  6. // 🌟相當(dāng)于是 offer(E e) 的擴(kuò)展方法 
  7. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 
  8.  
  9. // 檢索并除去此隊(duì)列的頭節(jié)點(diǎn),如有必要,等待直到元素可用;通過 throws 異常得知,可在等待時(shí)打斷 
  10. E take() throws InterruptedException; 
  11.  
  12. // 檢索并刪除此隊(duì)列的頭,如果有必要使元素可用,則等待指定的等待時(shí)間;通過 throws 異常得知,可在等待時(shí)打斷 
  13. // 🌟相當(dāng)于是 poll() 的擴(kuò)展方法 
  14. E poll(long timeout, TimeUnit unit) throws InterruptedException; 
  15.  
  16. // 返回隊(duì)列剩余容量,如果為無界隊(duì)列,返回 Integer.MAX_VALUE 
  17. int remainingCapacity(); 
  18.  
  19. // 如果此隊(duì)列包含指定的元素,則返回 true 
  20. public boolean contains(Object o); 
  21.  
  22. // 從此隊(duì)列中刪除所有可用元素,并將它們添加到給定的集合中 
  23. int drainTo(Collection<? super E> c); 
  24.  
  25. // 從此隊(duì)列中最多移除給定數(shù)量的可用元素,并將它們添加到給定的集合中 
  26. int drainTo(Collection<? super E> c, int maxElements); 

可以看到 BlockingQueue 接口中個(gè)性化的方法還是挺多的。本文的豬腳 LBQ 就是實(shí)現(xiàn)自 BlockingQueue 接口

源碼解析

變量分析LBQ 為了保證并發(fā)添加、移除等操作,使用了 JUC 包下的 ReentrantLock、Condition 控制

  1. // take, poll 等移除操作需要持有的鎖 
  2. private final ReentrantLock takeLock = new ReentrantLock(); 
  3. // 當(dāng)隊(duì)列沒有數(shù)據(jù)時(shí),刪除元素線程被掛起 
  4. private final Condition notEmpty = takeLock.newCondition(); 
  5. // put, offer 等新增操作需要持有的鎖 
  6. private final ReentrantLock putLock = new ReentrantLock(); 
  7. // 當(dāng)隊(duì)列為空時(shí),添加元素線程被掛起 
  8. private final Condition notFull = putLock.newCondition(); 

ArrayBlockingQueue(ABQ)內(nèi)部元素個(gè)數(shù)字段為什么使用的是 int 類型的 count 變量?不擔(dān)心并發(fā)么

  1. 因?yàn)?ABQ 內(nèi)部使用的一把鎖控制入隊(duì)、出隊(duì)操作,同一時(shí)刻只會(huì)有單線程執(zhí)行 count 變量修改
  2. LBQ 使用的兩把鎖,所以會(huì)出現(xiàn)兩個(gè)線程同時(shí)修改 count 數(shù)值,如果像 ABQ 使用 int 類型,兩個(gè)流程同時(shí)執(zhí)行修改 count 個(gè)數(shù),會(huì)造成數(shù)據(jù)不準(zhǔn)確,所以需要使用并發(fā)原子類修飾

如果不太明白為什么要用原子類統(tǒng)計(jì)數(shù)量,猛戳這里

接下來從結(jié)構(gòu)體入手,知道它是由什么元素組成,每個(gè)元素是做啥使的。如果數(shù)據(jù)結(jié)構(gòu)還不錯(cuò)的小伙伴,應(yīng)該可以猜出來

  1. // 綁定的容量,如果無界,則為 Integer.MAX_VALUE 
  2. private final int capacity; 
  3. // 當(dāng)前隊(duì)列中元素個(gè)數(shù) 
  4. private final AtomicInteger count = new AtomicInteger(); 
  5. // 當(dāng)前隊(duì)列的頭節(jié)點(diǎn) 
  6. transient Node<E> head; 
  7. // 當(dāng)前隊(duì)列的尾節(jié)點(diǎn) 
  8. private transient Node<E> last

看到 head 和 last 元素,是不是對 LBQ 就有個(gè)大致的雛形了,這個(gè)時(shí)候還差一個(gè)結(jié)構(gòu)體 Node

  1. static class Node<E> { 
  2.     // 節(jié)點(diǎn)存儲(chǔ)的元素 
  3.     E item; 
  4.     // 當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn) 
  5.     LinkedBlockingQueue.Node<E> next
  6.  
  7.     Node(E x) { item = x; } 

構(gòu)造器分析這里畫一張圖來理解下 LBQ 默認(rèn)構(gòu)造方法是如何初始化隊(duì)列的

  1. public LinkedBlockingQueue() { 
  2.     this(Integer.MAX_VALUE); 
  3.  
  4. public LinkedBlockingQueue(int capacity) { 
  5.     if (capacity <= 0) throw new IllegalArgumentException(); 
  6.     this.capacity = capacity; 
  7.     last = head = new Node<E>(null); 

可以看出,默認(rèn)構(gòu)造方法會(huì)將容量設(shè)置為 Integer.MAX_VALUE,也就是大家常說的無界隊(duì)列

內(nèi)部其實(shí)調(diào)用的是重載的有參構(gòu)造,方法內(nèi)部設(shè)置了容量大小,以及初始化了 item 為空的 Node 節(jié)點(diǎn),把 head last 兩節(jié)點(diǎn)進(jìn)行一個(gè)關(guān)聯(lián)

初始化的隊(duì)列 head last 節(jié)點(diǎn)指向的 Node 中 item、next 都為空,此時(shí)添加一條記錄,隊(duì)列會(huì)發(fā)生什么樣的變化

 

節(jié)點(diǎn)入隊(duì)

需要添加的元素會(huì)被封裝為 Node 添加到隊(duì)列中, put 入隊(duì)方法語義,如果隊(duì)列元素已滿,阻塞當(dāng)前插入線程,直到隊(duì)列中有空缺位置被喚醒

  1. public void put(E e) throws InterruptedException { 
  2.     if (e == null) throw new NullPointerException(); 
  3.     int c = -1; 
  4.     Node<E> node = new Node<E>(e);  // 將需要添加的數(shù)據(jù)封裝為 Node 
  5.     final ReentrantLock putLock = this.putLock;  // 獲取添加操作的鎖 
  6.     final AtomicInteger count = this.count;  // 獲取隊(duì)列實(shí)際元素?cái)?shù)量 
  7.     putLock.lockInterruptibly();  // 運(yùn)行可被中斷加鎖 API 
  8.     try { 
  9.         while (count.get() == capacity) {  // 如果隊(duì)列元素?cái)?shù)量 == 隊(duì)列最大值,則將線程放入條件隊(duì)列阻塞 
  10.             notFull.await(); 
  11.         } 
  12.         enqueue(node);  // 執(zhí)行入隊(duì)流程 
  13.         c = count.getAndIncrement();  // 獲取值并且自增,舉例:count = 0,執(zhí)行后結(jié)果值 count+1 = 2,返回 0 
  14.         if (c + 1 < capacity)  // 如果自增過的隊(duì)列元素 +1 小于隊(duì)列容器最大數(shù)量,喚醒一條被阻塞在插入等待隊(duì)列的線程 
  15.             notFull.signal(); 
  16.     } finally { 
  17.         putLock.unlock();  // 解鎖操作 
  18.     } 
  19.     if (c == 0)  // 當(dāng)隊(duì)列中有一條數(shù)據(jù),則喚醒消費(fèi)組線程進(jìn)行消費(fèi) 
  20.         signalNotEmpty(); 

入隊(duì)方法整體流程比較清晰,做了以下幾件事:

  1. 隊(duì)列已滿,則將當(dāng)前線程阻塞
  2. 隊(duì)列中如果有空缺位置,將數(shù)據(jù)封裝的 Node 執(zhí)行入隊(duì)操作
  3. 如果 Node 執(zhí)行入隊(duì)操作后,隊(duì)列還有空余位置,則喚醒等待隊(duì)列中的添加線程
  4. 如果數(shù)據(jù)入隊(duì)前隊(duì)列沒有元素,入隊(duì)成功后喚醒消費(fèi)阻塞隊(duì)列中的線程

繼續(xù)看一下入隊(duì)方法 LBQ#enqueue 都做了什么操作

  1. private void enqueue(Node<E> node) { 
  2.     last = last.next = node; 

代碼比較簡單,先把 node 賦值為當(dāng)前 last 節(jié)點(diǎn)的 next 屬性,然后再把 last 節(jié)點(diǎn)指向 node,就完成了節(jié)點(diǎn)入隊(duì)操作

假設(shè) LBQ 的范型是 String 字符串,首先插入元素 a,隊(duì)列如下圖所示:

什么?一條數(shù)據(jù)不過癮?沒有什么是再來一條解決不了的,元素 b 入隊(duì)如下:

隊(duì)列入隊(duì)如上圖所示,head 中 item 永為空,last 中 next 永為空

 

LBQ#offer 也是入隊(duì)方法,不同的是:如果隊(duì)列元素已滿,則直接返回 false,不阻塞線程

節(jié)點(diǎn)出隊(duì)

LBQ#take 出隊(duì)方法,如果隊(duì)列中元素為空,阻塞當(dāng)前出隊(duì)線程,直到隊(duì)列中有元素為止

  1. public E take() throws InterruptedException { 
  2.     E x; 
  3.     int c = -1; 
  4.     final AtomicInteger count = this.count;  // 獲取當(dāng)前隊(duì)列實(shí)際元素個(gè)數(shù) 
  5.     final ReentrantLock takeLock = this.takeLtakeLocock;  // 獲取 takeLock 鎖實(shí)例 
  6.     takeLock.lockInterruptibly();  // 獲取 takeLock 鎖,獲取不到阻塞過程中,可被中斷 
  7.     try { 
  8.         while (count.get() == 0) {  // 如果當(dāng)前隊(duì)列元素 == 0,當(dāng)前獲取節(jié)點(diǎn)線程加入等待隊(duì)列 
  9.             notEmpty.await(); 
  10.         } 
  11.         x = dequeue();  // 當(dāng)前隊(duì)列元素 > 0,執(zhí)行頭節(jié)點(diǎn)出隊(duì)操作 
  12.         c = count.getAndDecrement();  // 獲取當(dāng)前隊(duì)列元素個(gè)數(shù),并將數(shù)量 - 1 
  13.         if (c > 1)  // 當(dāng)隊(duì)列中還有還有元素時(shí),喚醒下一個(gè)消費(fèi)線程進(jìn)行消費(fèi) 
  14.             notEmpty.signal(); 
  15.     } finally { 
  16.         takeLock.unlock();  // 釋放鎖 
  17.     } 
  18.     if (c == capacity)  // 移除元素之前隊(duì)列是滿的,喚醒生產(chǎn)者線程添加元素 
  19.         signalNotFull(); 
  20.     return x;  // 返回頭節(jié)點(diǎn) 

出隊(duì)操作整體流程清晰明了,和入隊(duì)操作執(zhí)行流程相似

隊(duì)列已滿,則將當(dāng)前出隊(duì)線程阻塞

隊(duì)列中如果有元素可消費(fèi),執(zhí)行節(jié)點(diǎn)出隊(duì)操作

如果節(jié)點(diǎn)出隊(duì)后,隊(duì)列中還有可出隊(duì)元素,則喚醒等待隊(duì)列中的出隊(duì)線程

如果移除元素之前隊(duì)列是滿的,喚醒生產(chǎn)者線程添加元素

LBQ#dequeue 出隊(duì)操作相對于入隊(duì)操作稍顯復(fù)雜一些

  1. private E dequeue() { 
  2.     Node<E> h = head;  // 獲取隊(duì)列頭節(jié)點(diǎn) 
  3.     Node<E> first = h.next;  // 獲取頭節(jié)點(diǎn)的后繼節(jié)點(diǎn) 
  4.     h.next = h; // help GC 
  5.     head = first;  // 相當(dāng)于把頭節(jié)點(diǎn)的后繼節(jié)點(diǎn),設(shè)置為新的頭節(jié)點(diǎn) 
  6.     E x = first.item;  // 獲取到新的頭節(jié)點(diǎn) item 
  7.     first.item = null;  // 因?yàn)轭^節(jié)點(diǎn) item 為空,所以 item 賦值為 null 
  8.     return x; 

出隊(duì)流程中,會(huì)將原頭節(jié)點(diǎn)自己指向自己本身,這么做是為了幫助 GC 回收當(dāng)前節(jié)點(diǎn),接著將原 head 的 next 節(jié)點(diǎn)設(shè)置為新的 head,下圖為一個(gè)完整的出隊(duì)流程

出隊(duì)流程圖如上,流程中沒有特別注意的點(diǎn)。另外一個(gè) LBQ#poll 出隊(duì)方法,如果隊(duì)列中元素為空,返回 null,不會(huì)像 take 一樣阻塞

 

節(jié)點(diǎn)查詢

因?yàn)?element 查找方法在父類 AbstractQueue 里實(shí)現(xiàn)的,LBQ 里只對 peek 方法進(jìn)行了實(shí)現(xiàn),節(jié)點(diǎn)查詢就用 peek 做代表了

peek 和 element 都是獲取隊(duì)列頭節(jié)點(diǎn)數(shù)據(jù),兩者的區(qū)別是,前者如果隊(duì)列為空返回 null,后者拋出相關(guān)異常

  1. public E peek() { 
  2.     if (count.get() == 0)  // 隊(duì)列為空返回 null 
  3.         return null
  4.     final ReentrantLock takeLock = this.takeLock; 
  5.     takeLock.lock();  // 獲取鎖 
  6.     try { 
  7.         LinkedBlockingQueue.Node<E> first = head.next;  // 獲取頭節(jié)點(diǎn)的 next 后繼節(jié)點(diǎn) 
  8.         if (first == null)  // 如果后繼節(jié)點(diǎn)為空,返回 null,否則返回后繼節(jié)點(diǎn)的 item 
  9.             return null
  10.         else 
  11.             return first.item; 
  12.     } finally { 
  13.         takeLock.unlock();  // 解鎖 
  14.     } 

看到這里,能夠得到結(jié)論,雖然 head 節(jié)點(diǎn) item 永遠(yuǎn)為 null,但是 peek 方法獲取的是 head.next 節(jié)點(diǎn) item

節(jié)點(diǎn)刪除

刪除操作需要獲得兩把鎖,所以關(guān)于獲取節(jié)點(diǎn)、節(jié)點(diǎn)出隊(duì)、節(jié)點(diǎn)入隊(duì)等操作都會(huì)被阻塞

  1. public boolean remove(Object o) { 
  2.     if (o == nullreturn false
  3.     fullyLock();  // 獲取兩把鎖 
  4.     try { 
  5.         // 從頭節(jié)點(diǎn)開始,循環(huán)遍歷隊(duì)列 
  6.         for (Node<E> trail = head, p = trail.next
  7.              p != null
  8.              trail = p, p = p.next) { 
  9.             if (o.equals(p.item)) {  // item == o 執(zhí)行刪除操作 
  10.                 unlink(p, trail);  // 刪除操作 
  11.                 return true
  12.             } 
  13.         } 
  14.         return false
  15.     } finally { 
  16.         fullyUnlock();  // 釋放兩把鎖 
  17.     } 

鏈表刪除操作,一般而言都是循環(huán)逐條遍歷,而這種的 遍歷時(shí)間復(fù)雜度為 O(n),最壞情況就是遍歷了鏈表全部節(jié)點(diǎn)

看一下 LBQ#remove 中 unlink 是如何取消節(jié)點(diǎn)關(guān)聯(lián)的

  1. void unlink(Node<E> p, Node<E> trail) { 
  2.     p.item = null;  // 以第一次遍歷而言,trail 是頭節(jié)點(diǎn),p 為頭節(jié)點(diǎn)的后繼節(jié)點(diǎn) 
  3.     trail.next = p.next;  // 把頭節(jié)點(diǎn)的后繼指針,設(shè)置為 p 節(jié)點(diǎn)的后繼指針 
  4.     if (last == p)  // 如果 p == last 設(shè)置 last == trail 
  5.         last = trail; 
  6.     // 如果刪除元素前隊(duì)列是滿的,刪除后就有了空余位置,喚醒生產(chǎn)線程 
  7.     if (count.getAndDecrement() == capacity) 
  8.         notFull.signal(); 

remove 方法和 take 方法是有相似之處,如果 remove 方法的元素是頭節(jié)點(diǎn),效果和 take 一致,頭節(jié)點(diǎn)元素出隊(duì)

為了更好的理解,我們刪除中間元素。畫兩張圖理解下其中原委,代碼如下:

  1. public static void main(String[] args) { 
  2.     BlockingQueue<String> blockingQueue = new LinkedBlockingQueue(); 
  3.     blockingQueue.offer("a"); 
  4.     blockingQueue.offer("b"); 
  5.     blockingQueue.offer("c"); 
  6.     // 刪除隊(duì)列中間元素 
  7.     blockingQueue.remove("b"); 

執(zhí)行完上述代碼中三個(gè) offer 操作,隊(duì)列結(jié)構(gòu)圖如下:

執(zhí)行刪除元素 b 操作后隊(duì)列結(jié)構(gòu)如下圖:

如果 p 節(jié)點(diǎn)就是 last 尾節(jié)點(diǎn),則把 p 的前驅(qū)節(jié)點(diǎn)設(shè)置為新的尾節(jié)點(diǎn)。刪除操作大致如此

 

應(yīng)用場景

上文說了阻塞隊(duì)列被大量業(yè)務(wù)場景所應(yīng)用,這里例舉兩個(gè)實(shí)際工作中的例子幫助大家理解

生產(chǎn)者-消費(fèi)者模式

生產(chǎn)者-消費(fèi)者模式是一個(gè)典型的多線程并發(fā)寫作模式,生產(chǎn)者和消費(fèi)者中間需要一個(gè)容器來解決強(qiáng)耦合關(guān)系,生產(chǎn)者向容器放數(shù)據(jù),消費(fèi)者消費(fèi)容器數(shù)據(jù)

生產(chǎn)者-消費(fèi)者實(shí)現(xiàn)有多種方式

Object 類中的 wait、notify、notifyAll

Lock 中 Condition 的 await、signal、signalAll

BlockingQueue

阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型代碼如下:

  1. @Slf4j 
  2. public class BlockingQueueTest { 
  3.  
  4.     private static final int MAX_NUM = 10; 
  5.     private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_NUM); 
  6.  
  7.     public void produce(String str) { 
  8.         try { 
  9.             QUEUE.put(str); 
  10.             log.info("  🔥🔥🔥 隊(duì)列放入元素 :: {}, 隊(duì)列元素?cái)?shù)量 :: {}", str, QUEUE.size()); 
  11.         } catch (InterruptedException ie) { 
  12.             // ignore 
  13.         } 
  14.     } 
  15.  
  16.     public String consume() { 
  17.         String str = null
  18.         try { 
  19.             str = QUEUE.take(); 
  20.             log.info("  🔥🔥🔥 隊(duì)列移出元素 :: {}, 隊(duì)列元素?cái)?shù)量 :: {}", str, QUEUE.size()); 
  21.         } catch (InterruptedException ie) { 
  22.             // ignore 
  23.         } 
  24.         return str; 
  25.     } 
  26.  
  27.     public static void main(String[] args) { 
  28.         BlockingQueueTest queueTest = new BlockingQueueTest(); 
  29.         for (int i = 0; i < 5; i++) { 
  30.             int finalI = i; 
  31.             new Thread(() -> { 
  32.                 String str = "元素-"
  33.                 while (true) { 
  34.                     queueTest.produce(str + finalI); 
  35.                 } 
  36.             }).start(); 
  37.         } 
  38.         for (int i = 0; i < 5; i++) { 
  39.             new Thread(() -> { 
  40.                 while (true) { 
  41.                     queueTest.consume(); 
  42.                 } 
  43.             }).start(); 
  44.         } 
  45.     } 

線程池應(yīng)用

阻塞隊(duì)列在線程池中的具體應(yīng)用屬于是生產(chǎn)者-消費(fèi)者的實(shí)際場景

  1. 線程池在 Java 應(yīng)用里的重要性不言而喻,這里簡要說下線程池的運(yùn)行原理
  2. 線程池線程數(shù)量小于核心線程數(shù)執(zhí)行新增核心線程操作
  3. 線程池線程數(shù)量大于或等于核心線程數(shù)時(shí),將任務(wù)存放阻塞隊(duì)列

滿足線程池中線程數(shù)大于或等于核心線程數(shù)并且阻塞隊(duì)列已滿, 線程池創(chuàng)建非核心線程

重點(diǎn)在于第二點(diǎn),當(dāng)線程池核心線程都在運(yùn)行任務(wù)時(shí),會(huì)把任務(wù)存放阻塞隊(duì)列中。線程池源碼如下:

  1. if (isRunning(c) && workQueue.offer(command)) {} 

看到使用的 offer 方法,通過上面講述,如果阻塞隊(duì)列已滿返回 false。那何時(shí)進(jìn)行消費(fèi)隊(duì)列中的元素呢。涉及線程池中線程執(zhí)行過程原理,這里簡單說明

線程池內(nèi)線程執(zhí)行任務(wù)有兩種方式,一種是創(chuàng)建核心線程時(shí) 自帶 的任務(wù),另一種就是從阻塞隊(duì)列獲取

當(dāng)核心線程執(zhí)行一次任務(wù)后,其實(shí)和非核心線程就沒什么區(qū)別了

線程池獲取阻塞隊(duì)列任務(wù)使用了兩種 API,分別是 poll 和 take

  1. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); 

Q:為啥要用兩個(gè) API?一個(gè)不香么?

A:take 是為了要維護(hù)線程池內(nèi)核心線程的重要手段,如果獲取不到任務(wù),線程被掛起,等待下一次任務(wù)添加

至于帶時(shí)間的 pool 則是為了回收非核心線程準(zhǔn)備的

結(jié)言LBQ 阻塞隊(duì)列到這里就講解完成了,總結(jié)下文章所講述的 LBQ 基本特征

LBQ 是基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列,可以進(jìn)行讀寫并發(fā)執(zhí)行

LBQ 隊(duì)列容量可以自己設(shè)置,如果不設(shè)置默認(rèn) Integer 最大值,也可以稱為無界隊(duì)列

文章結(jié)合源碼,針對 LBQ 的入隊(duì)、出隊(duì)、查詢、刪除等操作進(jìn)行了詳細(xì)講解

LBQ 只是一個(gè)引子,更希望大家能夠通過文章 掌握阻塞隊(duì)列核心思想,繼而查看其它實(shí)現(xiàn)類的代碼,鞏固知識(shí)

 

小伙伴現(xiàn)在已經(jīng)知道 LBQ 是通過鎖的機(jī)制來實(shí)現(xiàn)并發(fā)安全控制,思考一下 不使用鎖,能否實(shí)現(xiàn)以及如何實(shí)現(xiàn)?

 

責(zé)任編輯:武曉燕 來源: 源碼興趣圈
相關(guān)推薦

2020-10-16 08:26:38

AQS通信協(xié)作

2023-12-15 09:45:21

阻塞接口

2025-04-22 08:32:50

2021-01-22 17:57:31

SQL數(shù)據(jù)庫函數(shù)

2024-10-14 12:34:08

2021-08-11 22:17:48

負(fù)載均衡LVS機(jī)制

2024-08-06 08:13:26

2019-07-23 11:01:57

Python同步異步

2024-06-21 09:27:05

2020-10-14 11:31:41

Docker

2021-01-13 14:42:36

GitHub代碼Java

2020-11-25 14:28:56

DelayedWork

2020-11-19 07:41:51

ArrayBlocki

2020-11-24 09:04:55

PriorityBlo

2020-11-20 06:22:02

LinkedBlock

2021-07-24 11:15:19

開發(fā)技能代碼

2020-07-08 08:07:23

高并發(fā)系統(tǒng)消息隊(duì)列

2020-11-03 10:32:48

回調(diào)函數(shù)模塊

2017-04-12 10:02:21

Java阻塞隊(duì)列原理分析

2025-01-14 00:00:00

Blocking隊(duì)列元素
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)