阻塞隊列—ArrayBlockingQueue源碼分析
前言

ArrayBlockingQueue 由數(shù)組支持的有界阻塞隊列,隊列基于數(shù)組實現(xiàn),容量大小在創(chuàng)建 ArrayBlockingQueue 對象時已經(jīng)定義好。 此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖,默認(rèn)采用非公平鎖。其數(shù)據(jù)結(jié)構(gòu)如下圖:

- 注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程和請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當(dāng)前等待時間最長的線程先獲取鎖
隊列創(chuàng)建
- BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
應(yīng)用場景
在線程池中有比較多的應(yīng)用,生產(chǎn)者消費者場景。
- 先進先出隊列(隊列頭的是最先進隊的元素;隊列尾的是最后進隊的元素)
- 有界隊列(即初始化時指定的容量,就是隊列最大的容量,不會出現(xiàn)擴容,容量滿,則阻塞進隊操作;容量空,則阻塞出隊操作)
- 隊列不支持空元素
- 公平性 (fairness)可以在構(gòu)造函數(shù)中指定。
此類支持對等待的生產(chǎn)者線程和使用者線程進行排序的可選公平策略。默認(rèn)情況下,不保證是這種排序。然而,通過在構(gòu)造函數(shù)將公平性 (fairness) 設(shè)置為 true 而構(gòu)造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
工作原理
ArrayBlockingQueue是對BlockingQueue的一個數(shù)組實現(xiàn),它使用一把全局的鎖并行對queue的讀寫操作,同時使用兩個Condition阻塞容量為空時的取操作和容量滿時的寫操作。
基于 ReentrantLock 保證線程安全,根據(jù) Condition 實現(xiàn)隊列滿時的阻塞。
- final ReentrantLock lock;
- private final Condition notEmpty;
- private final Condition notFull;
Lock的作用是提供獨占鎖機制,來保護競爭資源;而Condition是為了更加精細(xì)地對鎖進行控制,它依賴于Lock,通過某個條件對多線程進行控制。
notEmpty表示“鎖的非空條件”。當(dāng)某線程想從隊列中取數(shù)據(jù)時,而此時又沒有數(shù)據(jù),則該線程通過notEmpty.await()進行等待;當(dāng)其它線程向隊列中插入了元素之后,就調(diào)用notEmpty.signal()喚醒“之前通過notEmpty.await()進入等待狀態(tài)的線程”。 同理,notFull表示“鎖的滿條件”。當(dāng)某線程想向隊列中插入元素,而此時隊列已滿時,該線程等待;當(dāng)其它線程從隊列中取出元素之后,就喚醒該等待的線程。
- 試圖向已滿隊列中放入元素會導(dǎo)致放入操作受阻塞,直到BlockingQueue里有新的喚空間才會被醒繼續(xù)操作; 試圖從空隊列中檢索元素將導(dǎo)致類似阻塞,直到BlocingkQueue進了新貨才會被喚醒。
源碼分析
以下源碼分析基于JDK1.8
定義
ArrayBlockingQueue的類繼承關(guān)系如下:

其包含的方法定義如下:

成員屬性
- /** 真正存入數(shù)據(jù)的數(shù)組 */
- final Object[] items;
- /** take,poll,peek or remove 的下一個索引 */
- int takeIndex;
- /** put,offer,or add 下一個索引 */
- int putIndex;
- /** 隊列中元素個數(shù) */
- int count;
- /** 可重入鎖 */
- final ReentrantLock lock;
- /** 如果數(shù)組是空的,在該Condition上等待 */
- private final Condition notEmpty;
- /** 如果數(shù)組是滿的,在該Condition上等待 */
- private final Condition notFull;
- /** 遍歷器實現(xiàn) */
- transient Itrs itrs = null;
構(gòu)造函數(shù)
- /**
- * 構(gòu)造函數(shù),設(shè)置隊列的初始容量
- */
- public ArrayBlockingQueue(int capacity) {
- this(capacity, false);
- }
- /**
- * 構(gòu)造函數(shù),
- * capacity and the specified access policy.
- *
- * @param capacity 設(shè)置數(shù)組大小
- * @param fair 設(shè)置是否為公平鎖
- * @throws IllegalArgumentException if {@code capacity < 1}
- */
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- this.items = new Object[capacity];
- // 是否為公平鎖,如果是的話,那么先到的線程先獲得鎖對象
- // 否則,由操作系統(tǒng)調(diào)度由哪個線程獲得鎖,一般為false,性能會比較高
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
- /**
- * 構(gòu)造函數(shù),帶有初始內(nèi)容的隊列
- */
- public ArrayBlockingQueue(int capacity, boolean fair,
- Collection<? extends E> c) {
- this(capacity, fair);
- final ReentrantLock lock = this.lock;
- //加鎖的目的是為了其他CPU能夠立即看到修改
- //加鎖和解鎖底層都是CAS,會強制修改寫回主存,對其他CPU可見
- lock.lock(); // 要給數(shù)組設(shè)置內(nèi)容,先上鎖
- try {
- int i = 0;
- try {
- for (E e : c) {
- checkNotNull(e);
- items[i++] = e; // 依次拷貝內(nèi)容
- }
- } catch (ArrayIndexOutOfBoundsException ex) {
- throw new IllegalArgumentException();
- }
- count = i;
- putIndex = (i == capacity) ? 0 : i; // 如果 putIndex大于數(shù)組大小,那么從0重寫開始
- } finally {
- lock.unlock(); // 最后一定要釋放鎖
- }
- }
入隊方法
add / offer / put,這三個方法都是往隊列中添加元素,說明如下:
- add方法依賴于offer方法,如果隊列滿了則拋出異常,否則添加成功返回true;
- offer方法有兩個重載版本,只有一個參數(shù)的版本,如果隊列滿了就返回false,否則加入到隊列中,返回true,add方法就是調(diào)用此版本的offer方法;另一個帶時間參數(shù)的版本,如果隊列滿了則等待,可指定等待的時間,如果這期間中斷了則拋出異常,如果等待超時了則返回false,否則加入到隊列中返回true;
- put方法跟帶時間參數(shù)的offer方法邏輯一樣,不過沒有等待的時間限制,會一直等待直到隊列有空余位置了,再插入到隊列中,返回true
- /**
- * 添加一個元素,其實super.add里面調(diào)用了offer方法
- */
- public boolean add(E e) {
- return super.add(e);
- }
- /**
- * 加入成功返回 true,否則返回 false
- */
- public boolean offer(E e) {
- // 創(chuàng)建插入的元素是否為null,是的話拋出NullPointerException異常
- checkNotNull(e);
- // 獲取“該阻塞隊列的獨占鎖”
- final ReentrantLock lock = this.lock;
- lock.lock(); // 上鎖
- try {
- // 如果隊列已滿,則返回false。
- if (count == items.length) // 超過數(shù)組的容量
- return false;
- else {
- // 如果隊列未滿,則插入e,并返回true。
- enqueue(e);
- return true;
- }
- } finally {
- // 釋放鎖
- lock.unlock();
- }
- }
- /**
- * 如果隊列已滿的話,就會等待
- */
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly(); //和lock方法的區(qū)別是讓它在阻塞時可以拋出異常跳出
- try {
- while (count == items.length)
- notFull.await(); // 這里就是阻塞了,要注意:如果運行到這里,那么它會釋放上面的鎖,一直等到 notify
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }
- /**
- * 帶有超時事件的插入方法,unit 表示是按秒、分、時哪一種
- */
- public boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
- checkNotNull(e);
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == items.length) {
- if (nanos <= 0)
- return false;
- nanos = notFull.awaitNanos(nanos); // 帶有超時等待的阻塞方法
- }
- enqueue(e); // 入隊
- return true;
- } finally {
- lock.unlock();
- }
- }
出隊方法
poll / take / peek,這幾個方法都是獲取隊列頂?shù)脑?,具體說明如下:
- poll方法有兩個重載版本,第一個版本,如果隊列是空的,返回null,否則移除并返回隊列頭部元素;另一個帶時間參數(shù)的版本,如果棧為空則等待,可以指定等待的時間,如果等待超時了則返回null,如果被中斷了則拋出異常,否則移除并返回棧頂元素
- take方法同帶時間參數(shù)的poll方法,但是不能指定等待時間,會一直等待直到隊列中有元素為止,然后移除并返回棧頂元素
- peek方法只是返回隊列頭部元素,不移除
- // 實現(xiàn)的方法,如果當(dāng)前隊列為空,返回null
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return (count == 0) ? null : dequeue();
- } finally {
- lock.unlock();
- }
- }
- // 實現(xiàn)的方法,如果當(dāng)前隊列為空,一直阻塞
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0)
- notEmpty.await(); // 隊列為空,阻塞方法
- return dequeue();
- } finally {
- lock.unlock();
- }
- }
- // 帶有超時事件的取元素方法,否則返回null
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0) {
- if (nanos <= 0)
- return null;
- nanos = notEmpty.awaitNanos(nanos); // 超時等待
- }
- return dequeue(); // 取得元素
- } finally {
- lock.unlock();
- }
- }
- // 只是看一個隊列最前面的元素,取出是不擅長隊列中原來的元素,隊列為空時返回null
- public E peek() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return itemAt(takeIndex); // 隊列為空時返回null
- } finally {
- lock.unlock();
- }
- }
刪除元素方法
remove / clear /drainT,這三個方法用于從隊列中移除元素,具體說明如下:
- remove方法用于移除某個元素,如果棧為空或者沒有找到該元素則返回false,否則從棧中移除該元素;移除時,如果該元素位于棧頂則直接移除,如果位于棧中間,則需要將該元素后面的其他元素往前面挪動,移除后需要喚醒因為棧滿了而阻塞的線程
- clear方法用于整個棧,同時將takeIndex置為putIndex,保證棧中的元素先進先出;最后會喚醒最多count個線程,因為正常一個線程插入一個元素,如果喚醒超過count個線程,可能導(dǎo)致部分線程因為棧滿了又再次被阻塞
- drainTo方法有兩個重載版本,一個是不帶個數(shù),將所有的元素都移除并拷貝到指定的集合中;一個帶個數(shù),將指定個數(shù)的元素移除并拷貝到指定的集合中,兩者的底層實現(xiàn)都是同一個方法。移除后需要重置takeIndex和count,并喚醒最多移除個數(shù)的因為棧滿而阻塞的線程。
- /**
- * 從隊列中刪除一個元素的方法。刪除成功返回true,否則返回false
- */
- public boolean remove(Object o) {
- if (o == null) return false;
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (count > 0) {
- final int putIndex = this.putIndex;
- int i = takeIndex;
- //從takeIndex開始往后遍歷直到等于putIndex
- do {
- if (o.equals(items[i])) {
- removeAt(i); // 真正刪除的方法
- return true;
- }
- //走到數(shù)組末尾了又從頭開始,put時也按照這個規(guī)則來
- if (++i == items.length)
- i = 0;
- } while (i != putIndex); // 一直不斷的循環(huán)取出來做判斷
- }
- //如果數(shù)組為空,返回false
- return false;
- } finally {
- lock.unlock();
- }
- }
- /**
- * 指定刪除索引上的元素.
- */
- void removeAt(final int removeIndex) {
- // assert lock.getHoldCount() == 1;
- // assert items[removeIndex] != null;
- // assert removeIndex >= 0 && removeIndex < items.length;
- final Object[] items = this.items;
- if (removeIndex == takeIndex) {
- //如果移除的就是棧頂?shù)脑?nbsp;
- items[takeIndex] = null;
- if (++takeIndex == items.length)
- takeIndex = 0;
- //元素個數(shù)減1
- count--;
- if (itrs != null)
- itrs.elementDequeued();
- } else {
- // an "interior" remove
- // 如果移除的是棧中間的某個元素,需要將該元素后面的元素往前挪動
- final int putIndex = this.putIndex;
- for (int i = removeIndex;;) {
- int next = i + 1;
- //到數(shù)組末尾了,從頭開始
- if (next == items.length)
- next = 0;
- if (next != putIndex) {
- //將后面一個元素復(fù)制到前面來
- items[i] = items[next];
- i = next;
- } else {
- //如果下一個元素的索引等于putIndex,說明i就是棧中最后一個元素了,直接將該元素置為null
- items[i] = null;
- //重置putIndex為i
- this.putIndex = i;
- break;
- }
- }
- count--;
- if (itrs != null)
- //通知itrs節(jié)點移除了
- itrs.removedAt(removeIndex);
- }
- //喚醒因為棧滿了而等待的線程
- notFull.signal(); // 有一個元素刪除成功,那肯定隊列不滿
- }
- /**
- * 清空隊列
- */
- public void clear() {
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int k = count;
- if (k > 0) {
- final int putIndex = this.putIndex;
- int i = takeIndex;
- //從takeIndex開始遍歷直到i等于putIndex,將數(shù)組元素置為null
- do {
- items[i] = null;
- if (++i == items.length)
- i = 0;
- } while (i != putIndex);
- //注意此處沒有將這兩個index置為0,只是讓他們相等,因為只要相等就可以實現(xiàn)棧先進先出了
- takeIndex = putIndex;
- count = 0;
- if (itrs != null)
- itrs.queueIsEmpty();
- //如果有因為棧滿了而等待的線程,則將其喚醒
- //注意這里沒有使用signalAll而是通過for循環(huán)來signal多次,單純從喚醒線程來看是可以使用signalAll的,效果跟這里的for循環(huán)是一樣的
- //如果有等待的線程,說明count就是當(dāng)前線程的最大容量了,這里清空了,最多只能put count次,一個線程只能put 1次,只喚醒最多count個線程就避免了
- //線程被喚醒后再次因為棧滿了而阻塞
- for (; k > 0 && lock.hasWaiters(notFull); k--)
- notFull.signal();
- }
- } finally {
- lock.unlock();
- }
- }
- /**
- * 取出所有元素到集合
- */
- public int drainTo(Collection<? super E> c) {
- return drainTo(c, Integer.MAX_VALUE);
- }
- /**
- * 取出所有元素到集合
- */
- public int drainTo(Collection<? super E> c, int maxElements) {
- //校驗參數(shù)合法
- checkNotNull(c);
- if (c == this)
- throw new IllegalArgumentException();
- if (maxElements <= 0)
- return 0;
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- //取兩者之間的最小值
- int n = Math.min(maxElements, count);
- int take = takeIndex;
- int i = 0;
- try {
- //從takeIndex開始遍歷,取出元素然后添加到c中,直到滿足個數(shù)要求為止
- while (i < n) {
- @SuppressWarnings("unchecked")
- E x = (E) items[take];
- c.add(x);
- items[take] = null;
- if (++take == items.length)
- take = 0;
- i++;
- }
- return n;
- } finally {
- // Restore invariants even if c.add() threw
- if (i > 0) {
- //取完了,修改count減去i
- count -= i;
- takeIndex = take;
- if (itrs != null) {
- if (count == 0)
- //通知itrs ??樟?nbsp;
- itrs.queueIsEmpty();
- else if (i > take)
- //說明take中間變成0了,通知itrs
- itrs.takeIndexWrapped();
- }
- //喚醒在因為棧滿而等待的線程,最多喚醒i個,同上避免線程被喚醒了因為棧又滿了而阻塞
- for (; i > 0 && lock.hasWaiters(notFull); i--)
- notFull.signal();
- }
- }
- } finally {
- lock.unlock();
- }
- }
iterator / Itr / Itrs
Itr和Itrs都是ArrayBlockingQueue的兩個內(nèi)部類,如下:

iterator方法返回一個迭代器實例,用于實現(xiàn)for循環(huán)遍歷和部分Collection接口,該方法的實現(xiàn)如下:
- public Iterator<E> iterator() {
- return new Itr();
- }
- Itr() {
- // assert lock.getHoldCount() == 0;
- lastRet = NONE;
- final ReentrantLock lock = ArrayBlockingQueue.this.lock;
- lock.lock();
- try {
- if (count == 0) {
- //NONE和DETACHED都是常量
- cursor = NONE;
- nextIndex = NONE;
- prevTakeIndex = DETACHED;
- } else {
- //初始化各屬性
- final int takeIndex = ArrayBlockingQueue.this.takeIndex;
- prevTakeIndex = takeIndex;
- nextItem = itemAt(nextIndex = takeIndex);
- cursor = incCursor(takeIndex);
- if (itrs == null) {
- itrs = new Itrs(this);
- } else {
- //初始化Itrs,將當(dāng)前線程注冊到Itrs
- itrs.register(this); // in this order
- itrs.doSomeSweeping(false);
- }
- prevCycles = itrs.cycles;
- // assert takeIndex >= 0;
- // assert prevTakeIndex == takeIndex;
- // assert nextIndex >= 0;
- // assert nextItem != null;
- }
- } finally {
- lock.unlock();
- }
- }
- Itrs(Itr initial) {
- register(initial);
- }
- //根據(jù)index計算cursor
- private int incCursor(int index) {
- // assert lock.getHoldCount() == 1;
- if (++index == items.length)
- index = 0;
- if (index == putIndex)
- index = NONE;
- return index;
- }
- /**
- * 創(chuàng)建一個新的Itr實例時,會調(diào)用此方法將該實例添加到Node鏈表中
- */
- void register(Itr itr) {
- //創(chuàng)建一個新節(jié)點將其插入到head節(jié)點的前面
- head = new Node(itr, head);
- }
小結(jié)
ArrayBlockingQueue是一個阻塞隊列,內(nèi)部由ReentrantLock來實現(xiàn)線程安全,由Condition的await和signal來實現(xiàn)等待喚醒的功能。它的數(shù)據(jù)結(jié)構(gòu)是數(shù)組,準(zhǔn)確地說是一個循環(huán)數(shù)組(可以類比一個圓環(huán)),所有的下標(biāo)在到達最大長度時自動從0繼續(xù)開始。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git