沒研究過SynchronousQueue源碼,就別寫精通線程池
引言
前面文章我們講解了ArrayBlockingQueue和LinkedBlockingQueue源碼,這篇文章開始講解SynchronousQueue源碼。從名字上就能看到ArrayBlockingQueue是基于數(shù)組實現(xiàn)的,而LinkedBlockingQueue是基于鏈表實現(xiàn),而SynchronousQueue是基于什么數(shù)據(jù)結(jié)構(gòu)實現(xiàn)的,看不來。
無論是ArrayBlockingQueue還是LinkedBlockingQueue都是起到緩沖隊列的作用,當消費者的消費速度跟不上時,任務就在隊列中堆積,需要等待消費者慢慢消費。
如果我們想要自己的任務快速執(zhí)行,不要積壓在隊列中,該怎么辦? 今天的主角SynchronousQueue就派上用場了。
SynchronousQueue被稱為同步隊列,當生產(chǎn)者往隊列中放元素的時候,必須等待消費者把這個元素取走,否則一直阻塞。消費者取元素的時候,同理也必須等待生產(chǎn)者放隊列中放元素。
由于SynchronousQueue實現(xiàn)了BlockingQueue接口,而BlockingQueue接口中定義了幾組放數(shù)據(jù)和取數(shù)據(jù)的方法,來滿足不同的場景。
操作 | 拋出異常 | 返回特定值 | 一直阻塞 | 阻塞指定時間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
取數(shù)據(jù)(同時刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
SynchronousQueue也會有針對這幾組放數(shù)據(jù)和取數(shù)據(jù)方法的具體實現(xiàn)。
Java線程池中的帶緩存的線程池就是基于SynchronousQueue實現(xiàn)的:
// 創(chuàng)建帶緩存的線程池
ExecutorService executorService = Executors.newCachedThreadPool();
對應的源碼實現(xiàn):
// 底層使用SynchronousQueue隊列處理任務
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
類結(jié)構(gòu)
先看一下SynchronousQueue類里面有哪些屬性:
public class SynchronousQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 轉(zhuǎn)接器(棧和隊列的父類)
*/
abstract static class Transferer<E> {
/**
* 轉(zhuǎn)移(put和take都用這一個方法)
*
* @param e 元素
* @param timed 是否超時
* @param nanos 納秒
*/
abstract E transfer(E e, boolean timed, long nanos);
}
/**
* 棧實現(xiàn)類
*/
static final class TransferStack<E> extends Transferer<E> {
}
/**
* 隊列實現(xiàn)類
*/
static final class TransferQueue<E> extends Transferer<E> {
}
}
SynchronousQueue底層是基于Transferer抽象類實現(xiàn)的,放數(shù)據(jù)和取數(shù)據(jù)的邏輯都耦合在transfer()方法中。而Transferer抽象類又有兩個實現(xiàn)類,分別是基于棧結(jié)構(gòu)實現(xiàn)和基于隊列實現(xiàn)。
初始化
SynchronousQueue常用的初始化方法有兩個:
- 無參構(gòu)造方法
- 指定容量大小的有參構(gòu)造方法
/**
* 無參構(gòu)造方法
*/
BlockingQueue<Integer> blockingQueue1 = new SynchronousQueue<>();
/**
* 有參構(gòu)造方法,指定是否使用公平鎖(默認使用非公平鎖)
*/
BlockingQueue<Integer> blockingQueue2 = new SynchronousQueue<>(true);
再看一下對應的源碼實現(xiàn):
/**
* 無參構(gòu)造方法
*/
public SynchronousQueue() {
this(false);
}
/**
* 有參構(gòu)造方法,指定是否使用公平鎖
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
可以看出SynchronousQueue的無參構(gòu)造方法默認使用的非公平策略,有參構(gòu)造方法可以指定使用公平策略。操作策略:
- 公平策略,基于隊列實現(xiàn)的是公平策略,先進先出。
- 非公平策略,基于棧實現(xiàn)的是非公平策略,先進后出。
棧實現(xiàn)
棧的類結(jié)構(gòu)
/**
* 棧實現(xiàn)
*/
static final class TransferStack<E> extends Transferer<E> {
/**
* 頭節(jié)點(也是棧頂節(jié)點)
*/
volatile SNode head;
/**
* 棧節(jié)點類
*/
static final class SNode {
/**
* 當前操作的線程
*/
volatile Thread waiter;
/**
* 節(jié)點值(取數(shù)據(jù)的時候,該字段為null)
*/
Object item;
/**
* 節(jié)點模式(也叫操作類型)
*/
int mode;
/**
* 后繼節(jié)點
*/
volatile SNode next;
/**
* 匹配到的節(jié)點
*/
volatile SNode match;
}
}
節(jié)點模式有以下三種:
類型值 | 類型描述 | 作用 |
0 | REQUEST | 表示取數(shù)據(jù) |
1 | DATA | 表示放數(shù)據(jù) |
2 | FULFILLING | 表示正在執(zhí)行中(比如取數(shù)據(jù)的線程正在匹配放數(shù)據(jù)的線程) |
圖片
棧的transfer方法實現(xiàn)
transfer()方法中,把放數(shù)據(jù)和取數(shù)據(jù)的邏輯耦合在一塊了,邏輯有點繞,不過核心邏輯就四點,把握住就能豁然開朗。其實就是從棧頂壓入,從棧頂彈出。
詳細流程如下:
- 首先判斷當前線程的操作類型與棧頂節(jié)點的操作類型是否一致,比如都是放數(shù)據(jù),或者都是取數(shù)據(jù)。
- 如果是一致,把當前操作包裝成SNode節(jié)點,壓入棧頂,并掛起當前線程。
- 如果不一致,表示相互匹配(比如當前操作是放數(shù)據(jù),而棧頂節(jié)點是取數(shù)據(jù),或者相反)。然后也把當前操作包裝成SNode節(jié)點壓入棧頂,并使用tryMatch()方法匹配兩個節(jié)點,匹配成功后,彈出兩個這兩個節(jié)點,并喚醒棧頂節(jié)點線程,同時把數(shù)據(jù)傳遞給棧頂節(jié)點線程,最后返回。
- 棧頂節(jié)點線程被喚醒,繼續(xù)執(zhí)行,然后返回傳遞過來的數(shù)據(jù)。
/**
* 轉(zhuǎn)移(put和take都用這一個方法)
*
* @param e 元素(取數(shù)據(jù)的時候,元素為null)
* @param timed 是否超時
* @param nanos 納秒
*/
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
// 1. e為null,表示要取數(shù)據(jù),否則是放數(shù)據(jù)
int mode = (e == null) ? REQUEST : DATA;
for (; ; ) {
SNode h = head;
// 2. 如果本次操作跟棧頂節(jié)點模式相同(都是取數(shù)據(jù),或者都是放數(shù)據(jù)),就把本次操作包裝成SNode,壓入棧頂
if (h == null || h.mode == mode) {
if (timed && nanos <= 0) {
if (h != null && h.isCancelled()) {
casHead(h, h.next);
} else {
return null;
}
// 3. 把本次操作包裝成SNode,壓入棧頂,并掛起當前線程
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 4. 掛起當前線程
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) {
clean(s);
return null;
}
// 5. 當前線程被喚醒后,如果棧頂有了新節(jié)點,就刪除當前節(jié)點
if ((h = head) != null && h.next == s) {
casHead(h, s.next);
}
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 6. 如果棧頂節(jié)點類型跟本次操作不同,并且模式不是FULFILLING類型
} else if (!isFulfilling(h.mode)) {
if (h.isCancelled()) {
casHead(h, h.next);
}
// 7. 把本次操作包裝成SNode(類型是FULFILLING),壓入棧頂
else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
// 8. 使用死循環(huán),直到匹配到對應的節(jié)點
for (; ; ) {
// 9. 遍歷下個節(jié)點
SNode m = s.next;
// 10. 如果節(jié)點是null,表示遍歷到末尾,設置棧頂節(jié)點是null,結(jié)束。
if (m == null) {
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
// 11. 如果棧頂?shù)暮罄^節(jié)點跟棧頂節(jié)點匹配成功,就刪除這兩個節(jié)點,結(jié)束。
if (m.tryMatch(s)) {
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else {
// 12. 如果沒有匹配成功,就刪除棧頂?shù)暮罄^節(jié)點,繼續(xù)匹配
s.casNext(m, mn);
}
}
}
} else {
// 13. 如果棧頂節(jié)點類型跟本次操作不同,并且是FULFILLING類型,
// 就再執(zhí)行一遍上面第8步for循環(huán)中的邏輯(很少概率出現(xiàn))
SNode m = h.next;
if (m == null) {
casHead(h, null);
} else {
SNode mn = m.next;
if (m.tryMatch(h)) {
casHead(h, mn);
} else {
h.casNext(m, mn);
}
}
}
}
}
不用關心細枝末節(jié),把握住代碼核心邏輯即可。 再看一下第4步,掛起線程的代碼邏輯: 核心邏輯就兩條:
- 第6步,掛起當前線程
- 第3步,當前線程被喚醒后,直接返回傳遞過來的match節(jié)點
/**
* 等待執(zhí)行
*
* @param s 節(jié)點
* @param timed 是否超時
* @param nanos 超時時間
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 1. 計算超時時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 2. 計算自旋次數(shù)
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (; ; ) {
if (w.isInterrupted())
s.tryCancel();
// 3. 如果已經(jīng)匹配到其他節(jié)點,直接返回
SNode m = s.match;
if (m != null)
return m;
if (timed) {
// 4. 超時時間遞減
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 5. 自旋次數(shù)減一
if (spins > 0)
spins = shouldSpin(s) ? (spins - 1) : 0;
else if (s.waiter == null)
s.waiter = w;
// 6. 開始掛起當前線程
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
再看一下匹配節(jié)點的tryMatch()方法邏輯: 作用就是喚醒棧頂節(jié)點,并當前節(jié)點傳遞給棧頂節(jié)點。
/**
* 匹配節(jié)點
*
* @param s 當前節(jié)點
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) {
waiter = null;
// 1. 喚醒棧頂節(jié)點
LockSupport.unpark(w);
}
return true;
}
// 2. 把當前節(jié)點傳遞給棧頂節(jié)點
return match == s;
}
隊列實現(xiàn)
隊列的類結(jié)構(gòu)
/**
* 隊列實現(xiàn)
*/
static final class TransferQueue<E> extends Transferer<E> {
/**
* 頭節(jié)點
*/
transient volatile QNode head;
/**
* 尾節(jié)點
*/
transient volatile QNode tail;
/**
* 隊列節(jié)點類
*/
static final class QNode {
/**
* 當前操作的線程
*/
volatile Thread waiter;
/**
* 節(jié)點值
*/
volatile Object item;
/**
* 后繼節(jié)點
*/
volatile QNode next;
/**
* 當前節(jié)點是否為數(shù)據(jù)節(jié)點
*/
final boolean isData;
}
}
可以看出TransferQueue隊列是使用帶有頭尾節(jié)點的單鏈表實現(xiàn)的。 還有一點需要提一下,TransferQueue默認構(gòu)造方法,會初始化頭尾節(jié)點,默認是空節(jié)點。
/**
* TransferQueue默認的構(gòu)造方法
*/
TransferQueue() {
QNode h = new QNode(null, false);
head = h;
tail = h;
}
隊列的transfer方法實現(xiàn)
隊列使用的公平策略,體現(xiàn)在,每次操作的時候,都是從隊尾壓入,從隊頭彈出。 詳細流程如下:
- 首先判斷當前線程的操作類型與隊尾節(jié)點的操作類型是否一致,比如都是放數(shù)據(jù),或者都是取數(shù)據(jù)。
- 如果是一致,把當前操作包裝成QNode節(jié)點,壓入隊尾,并掛起當前線程。
- 如果不一致,表示相互匹配(比如當前操作是放數(shù)據(jù),而隊尾節(jié)點是取數(shù)據(jù),或者相反)。然后在隊頭節(jié)點開始遍歷,找到與當前操作類型相匹配的節(jié)點,把當前操作的節(jié)點值傳遞給這個節(jié)點,并彈出這個節(jié)點,喚醒這個節(jié)點的線程,最后返回。
- 隊頭節(jié)點線程被喚醒,繼續(xù)執(zhí)行,然后返回傳遞過來的數(shù)據(jù)。
/**
* 轉(zhuǎn)移(put和take都用這一個方法)
*
* @param e 元素(取數(shù)據(jù)的時候,元素為null)
* @param timed 是否超時
* @param nanos 超時時間
*/
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
// 1. e不為null,表示要放數(shù)據(jù),否則是取數(shù)據(jù)
boolean isData = (e != null);
for (; ; ) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) {
continue;
}
// 2. 如果本次操作跟隊尾節(jié)點模式相同(都是取數(shù)據(jù),或者都是放數(shù)據(jù)),就把本次操作包裝成QNode,壓入隊尾
if (h == t || t.isData == isData) {
QNode tn = t.next;
if (t != tail) {
continue;
}
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) {
return null;
}
// 3. 把本次操作包裝成QNode,壓入隊尾
if (s == null) {
s = new QNode(e, isData);
}
if (!t.casNext(null, s)) {
continue;
}
advanceTail(t, s);
// 4. 掛起當前線程
Object x = awaitFulfill(s, e, timed, nanos);
// 5. 當前線程被喚醒后,返回返回傳遞過來的節(jié)點值
if (x == s) {
clean(t, s);
return null;
}
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null) {
s.item = s;
}
s.waiter = null;
}
return (x != null) ? (E) x : e;
} else {
// 6. 如果本次操作跟隊尾節(jié)點模式不同,就從隊頭結(jié)點開始遍歷,找到模式相匹配的節(jié)點
QNode m = h.next;
if (t != tail || m == null || h != head) {
continue;
}
Object x = m.item;
// 7. 把當前節(jié)點值e傳遞給匹配到的節(jié)點m
if (isData == (x != null) || x == m ||
!m.casItem(x, e)) {
advanceHead(h, m);
continue;
}
// 8. 彈出隊頭節(jié)點,并喚醒節(jié)點m
advanceHead(h, m);
LockSupport.unpark(m.waiter);
return (x != null) ? (E) x : e;
}
}
}
看完了底層源碼,再看一下上層包裝好的工具方法。
放數(shù)據(jù)源碼
放數(shù)據(jù)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
offer方法源碼
先看一下offer()方法源碼,其他放數(shù)據(jù)方法邏輯也是大同小異,底層都是調(diào)用的transfer()方法實現(xiàn)。 如果沒有匹配到合適的節(jié)點,offer()方法會直接返回false,表示插入失敗。
/**
* offer方法入口
*
* @param e 元素
* @return 是否插入成功
*/
public boolean offer(E e) {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
// 2. 調(diào)用底層transfer方法
return transferer.transfer(e, true, 0) != null;
}
再看一下另外三個添加元素方法源碼:
add方法源碼
如果沒有匹配到合適的節(jié)點,add()方法會拋出異常,底層基于offer()實現(xiàn)。
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
put方法源碼
如果沒有匹配到合適的節(jié)點,put()方法會一直阻塞,直到有其他線程取走數(shù)據(jù),才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
// 2. 調(diào)用底層transfer方法
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
offer(e, time, unit)源碼
再看一下offer(e, time, unit)方法源碼,如果沒有匹配到合適的節(jié)點, offer(e, time, unit)方法會阻塞一段時間,然后返回false。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超時時間
* @param unit 時間單位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
// 2. 調(diào)用底層transfer方法
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) {
return true;
}
if (!Thread.interrupted()) {
return false;
}
throw new InterruptedException();
}
彈出數(shù)據(jù)源碼
彈出數(shù)據(jù)(取出數(shù)據(jù)并刪除)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數(shù)據(jù)(同時刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
poll方法源碼
看一下poll()方法源碼,其他方取數(shù)據(jù)法邏輯大同小異,底層都是調(diào)用的transfer方法實現(xiàn)。 poll()方法在彈出元素的時候,如果沒有匹配到合適的節(jié)點,直接返回null,表示彈出失敗。
/**
* poll方法入口
*/
public E poll() {
// 調(diào)用底層transfer方法
return transferer.transfer(null, true, 0);
}
remove方法源碼
再看一下remove()方法源碼,如果沒有匹配到合適的節(jié)點,remove()會拋出異常。
/**
* remove方法入口
*/
public E remove() {
// 1. 直接調(diào)用poll方法
E x = poll();
// 2. 如果取到數(shù)據(jù),直接返回,否則拋出異常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}
take方法源碼
再看一下take()方法源碼,如果沒有匹配到合適的節(jié)點,take()方法就一直阻塞,直到被喚醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
// 調(diào)用底層transfer方法
E e = transferer.transfer(null, false, 0);
if (e != null) {
return e;
}
Thread.interrupted();
throw new InterruptedException();
}
poll(time, unit)源碼
再看一下poll(time, unit)方法源碼,如果沒有匹配到合適的節(jié)點, poll(time, unit)方法會阻塞指定時間,然后然后null。
/**
* poll方法入口
*
* @param timeout 超時時間
* @param unit 時間單位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 調(diào)用底層transfer方法
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted()) {
return e;
}
throw new InterruptedException();
}
查看數(shù)據(jù)源碼
再看一下查看數(shù)據(jù)源碼,查看數(shù)據(jù),并不刪除數(shù)據(jù)。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
peek方法源碼
先看一下peek()方法源碼,直接返回null,SynchronousQueue不支持這種操作。
/**
* peek方法入口
*/
public E peek() {
return null;
}
element方法源碼
再看一下element()方法源碼,底層調(diào)用的也是peek()方法,也是不支持這種操作。
/**
* element方法入口
*/
public E element() {
// 1. 調(diào)用peek方法查詢數(shù)據(jù)
E x = peek();
// 2. 如果查到數(shù)據(jù),直接返回
if (x != null) {
return x;
} else {
// 3. 如果沒找到,則拋出異常
throw new NoSuchElementException();
}
}
總結(jié)
這篇文章講解了SynchronousQueue阻塞隊列的核心源碼,了解到SynchronousQueue隊列具有以下特點:
- SynchronousQueue實現(xiàn)了BlockingQueue接口,提供了四組放數(shù)據(jù)和讀數(shù)據(jù)的方法,來滿足不同的場景。
- SynchronousQueue底層有兩種實現(xiàn)方式,分別是基于棧實現(xiàn)非公平策略,以及基于隊列實現(xiàn)的公平策略。
- SynchronousQueue初始化的時候,可以指定使用公平策略還是非公平策略。
- SynchronousQueue不存儲元素,不適合作為緩存隊列使用。適用于生產(chǎn)者與消費者速度相匹配的場景,可減少任務執(zhí)行的等待時間。