面試官:啥?SynchronousQueue是鐘?點?房?
今天這篇文章,我們繼續(xù)講架構(gòu)師大劉的故事。
大劉有段時間經(jīng)常會給一些程序員講課。這一方面是由于團隊培訓(xùn)的需要,一方面也是大劉自身想搞搞凡爾賽,嘚瑟一下自身的實力。
大劉講課是允許公司任何一個人進去聽的。提前一個星期把主題公布在公司群里,有人想聽到日子直接去就是了。
有一次,大劉在聊并發(fā)話題的時候,為了彰顯自己確實是個并發(fā)達人,用了個 SynchronousQueue 舉例子。他說這個隊列其實沒有容積的概念,就是線程持有數(shù)據(jù)互相匹配。
嗯,談到這里還是要說一下,大劉其實也不太懂 SynchronousQueue。只是一來這東西沒人用,自然就沒人懂;二來它的概念也比較晦澀,有些時候比較違背直覺,所以,即使隨口說的一些話可能不太對,也未必會被發(fā)現(xiàn),還能給人一種不明覺厲的感覺。
大劉用過幾次,感覺良好。因此沒事兒就要秀一下 SynchronousQueue,表示自己這么生僻的也懂,并發(fā)達人的名頭是沒有叫錯的。
也就那一次,恰恰被人拆了臺。
當(dāng)時課上來了個新入職的技術(shù),此人長得中等身材,相貌平平,只是臉卻長的像種地多年的老農(nóng)的巴掌。臉上的疙瘩如同老農(nóng)巴掌上的老繭。這人姓張,這里由于他臉長得像個大巴掌,那就暫且叫他巴掌張。
這個巴掌張打斷了大劉的話,言之鑿鑿說大劉說的是錯的,說他看過這個 SynchronousQueue,并不是大劉說的這樣。
大劉有點心虛,脖子滲出了一圈汗,但是并發(fā)達人的稱呼大劉并不想丟掉。于是說了一大堆云里霧里的廢話,把話題帶偏了開去。并告訴巴掌張,下回要和他在這個舞臺上 PK 一二, 要好好看看誰是真正的 SynchronousQueue 的知心朋友。
由于大劉感覺被巴掌張的巴掌糊了臉,便就此下了決心要研究透 SynchronousQueue。
Google 和百度一起查,東西合璧,洋為中用,搞了好是一陣子。最后有個犄角旮旯的小破網(wǎng)站,有人說了這么一句話:
SynchronousQueue 的目的就是為了接頭,為了匹配,當(dāng)接上頭了就雙方合作愉快,整個工作完成。但是一旦在接頭中,任何一方還沒到達,那么另一方就必須阻塞著等待。
這句話一下子就敲開了大劉的腦殼,讓聰明的智商重新占領(lǐng)了高地。
為啥這句話就點亮了大劉那本來已經(jīng)像燈泡的腦袋了呢?因為大劉想起了他每次的面試經(jīng)歷,就和這個接頭是一樣的。
大劉每次去面試,都很規(guī)矩的提前趕到新公司。但是大部分情況,時間到了之后都需要等很長時間才開始面試。大劉那時候也年輕,只是以為領(lǐng)導(dǎo)忙,所以倒也恭恭敬敬的等著。
直到大劉自己當(dāng)了領(lǐng)導(dǎo),去面試別人的時候,被 HR 委婉的提醒了下,要讓候選人等一會兒再過去,顯的公司業(yè)務(wù)很忙,讓候選人對公司保持一定的敬畏。那時候,大劉才知道這是一種 PUA 術(shù)……
大劉對照著自己的面試經(jīng)歷,一下就理解了 SynchronousQueue 的概念。
SynchronousQueue 本身是為了交接、匹配而存在的。當(dāng)一個線程往 SynchronousQueue 放東西,發(fā)現(xiàn)沒線程在等著拿,就給阻塞掉——這就像面試者來早了等面試官。
當(dāng)一個線程去 SynchronousQueue 拿東西,發(fā)現(xiàn)沒東西,就去等的時候——就像面試官來早了等面試者。
搞懂 SynchronousQueue 的時候,正是一個冬天,屋外面的寒風(fēng)在虎虎生威,屋里面的大劉在熠熠生輝。
只是一個堂而皇之?dāng)[在 JDK 底層并發(fā)包中的隊列結(jié)構(gòu),SynchronousQueue 當(dāng)然沒那么簡單,里面還存在著億點點細(xì)節(jié)。
所以,大劉在整體方向搞懂之后,開始研究起了細(xì)節(jié)。他要奮發(fā),狠狠把巴掌張的囂張氣焰壓下去,大劉要當(dāng)公司技術(shù)的頭牌。
回到現(xiàn)實里,SynchronousQueue 真正的目的就是為了讓兩個線程的工作結(jié)果進行交接。這沒什么問題。但是,在這個交接中是需要嚴(yán)格保密的,沒有人可以窺視。
嗯,沒錯,就和你約了女朋友去鐘點房那樣的不能被窺視。
好,圍繞這個 SynchronousQueue 的鐘點房,咱們通過源代碼,來看這億點點細(xì)節(jié)。
首先,鐘點房嚴(yán)格保密,里面是多少人,就不能讓人知道。所以,就不能讓別人通過方法得到具體的數(shù)據(jù)。對于 SynchronousQueue 來說,自然就是通過 size() 你得不到什么信息。
- /**
- * Always returns zero.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @return zero
- */
- public int size() {
- return 0;
- }
- /**
- * Always returns {@code true}.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @return {@code true}
- */
- public boolean isEmpty() {
- return true;
- }
其次,鐘點房也不能隨便進去查房,看看都是誰。所以,自然就不能迭代。
- /**
- * Returns an empty iterator in which {@code hasNext} always returns
- * {@code false}.
- *
- * @return an empty iterator
- */
- public Iterator<E> iterator() {
- return Collections.emptyIterator();
- }
再次,鐘點房保護隱私,它也不能讓你鉆了漏子,不告訴你 XXX 是不是躲在了鐘點房里。所以,你也不能知道鐘點房里有沒有這個人。
- /**
- * Always returns {@code false}.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @param o the element
- * @return {@code false}
- */
- public boolean contains(Object o) {
- return false;
- }
- /**
- * Returns {@code false} unless the given collection is empty.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @param c the collection
- * @return {@code false} unless given collection is empty
- */
- public boolean containsAll(Collection<?> c) {
- return c.isEmpty();
- }
自然,鐘點房也沒什么權(quán)力趕人出去。
- /**
- * Always returns {@code false}.
- * A {@code SynchronousQueue} has no internal capacity.
- *
- * @param o the element to remove
- * @return {@code false}
- */
- public boolean remove(Object o) {
- return false;
- }
當(dāng)然,作為一個商業(yè)化的鐘點房,SynchronousQueue 還是很注意安全的,它貼心的提供了緊急轉(zhuǎn)移的手段。
- /**
- * @throws UnsupportedOperationException {@inheritDoc}
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- * @throws IllegalArgumentException {@inheritDoc}
- */
- public int drainTo(Collection<? super E> c) {
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- int n = 0;
- for (E e; (e = poll()) != null;) {
- c.add(e);
- ++n;
- }
- return n;
- }
- /**
- * @throws UnsupportedOperationException {@inheritDoc}
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- * @throws IllegalArgumentException {@inheritDoc}
- */
- public int drainTo(Collection<? super E> c, int maxElements) {
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- int n = 0;
- for (E e; n < maxElements && (e = poll()) != null;) {
- c.add(e);
- ++n;
- }
- return n;
- }
最后,鐘點房就只能搞搞交接工作了。交接嗎,自然是有交有接的,交的就得帶東西。
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- // put:帶著東西進屋子
- if (transferer.transfer(e, false, 0) == null) {
- Thread.interrupted();
- throw new InterruptedException();
- }
- }
接的肯定不會帶著東西,得留地方拿東西。
- public E take() throws InterruptedException {
- // take:從屋子里把東西拿出來
- E e = transferer.transfer(null, false, 0);
- if (e != null)
- return e;
- Thread.interrupted();
- throw new InterruptedException();
- }
但是呢,這交接工作啊,得在專人安排下進行。
為什么需要專人來幫忙?因為有時候我們的鐘點房太受歡迎了,客人多,得排隊管管。管這些排隊的就是 Transfer,它是鐘點房的經(jīng)理。
- /**
- * The transferer. Set only in constructor, but cannot be declared
- * as final without further complicating serialization. Since
- * this is accessed only at most once per public method, there
- * isn't a noticeable performance penalty for using volatile
- * instead of final here.
- */
- private transient volatile Transferer<E> transferer;
- /**
- * Shared internal API for dual stacks and queues.
- */
- abstract static class Transferer<E> {
- /**
- * Performs a put or take.
- *
- * @param e if non-null, the item to be handed to a consumer;
- * if null, requests that transfer return an item
- * offered by producer.
- * @param timed if this operation should timeout
- * @param nanos the timeout, in nanoseconds
- * @return if non-null, the item provided or received; if null,
- * the operation failed due to timeout or interrupt --
- * the caller can distinguish which of these occurred
- * by checking Thread.interrupted.
- */
- abstract E transfer(E e, boolean timed, long nanos);
- }
Transfer 經(jīng)理每次開門營業(yè)的時候,會收到總部給的牌子,告訴他管理工作要注意方式方法,比如公平有效,比如優(yōu)先服務(wù) VIP 客人之類的。
- /**
- * 默認(rèn)給vip客人開點后門
- */
- public SynchronousQueue() {
- this(false);
- }
- /**
- * 總部遞牌子,告訴Transfer到底是公平還是不公平,
- */
- public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
- }
先看看適合勞苦大眾的公平模式,先來先享受,晚來沒折扣。
- static final class TransferQueue<E> extends Transferer<E> {
- static final class QNode{...}
- transient volatile QNode head;
- transient volatile QNode tail;
- transient volatile QNode cleanMe;
- TransferQueue() {
- //經(jīng)典的鏈表套路,先搞個虛擬的頭結(jié)點
- QNode h = new QNode(null, false);
- head = h;
- tail = h;
- }
- ……
- ……
QNode 就是 Transfer 經(jīng)理需要的牌子,上面記錄點信息,別到時候弄錯了。
- static final class QNode {
- volatile QNode next; // 下一個排隊的哥們兒
- volatile Object item; // 這次哥們帶來的要交接的東西
- volatile Thread waiter; // 交接的線程
- final boolean isData; // isData == true表示帶著東西
- QNode(Object item, boolean isData) {
- this.item = item;
- this.isData = isData;
- }
- // ...省略一系列CAS方法
- }
怎么搞,秘密都在 transfer() 里。
- @SuppressWarnings("unchecked")
- E transfer(E e, boolean timed, long nanos) {
- //...先省略細(xì)節(jié)
- }
transfer 本質(zhì)就是一直在等待交接完成或者交接被中斷,被取消,或者等待超時。
- for (;;) {
- QNode t = tail;
- QNode h = head;
- //因為初始化是在構(gòu)造函數(shù)里搞得,可能構(gòu)造函數(shù)沒有執(zhí)行完,就被用上了,就會出現(xiàn)t或者h(yuǎn)為null的情況
- if (t == null || h == null)
- continue; //啥也不能做
- //h==t表示沒人,t.isData == isData表示過來的哥們和前面的哥們目的一樣,那就只能考慮排隊等著了。
- if (h == t || t.isData == isData) {
- QNode tn = t.next;
- //線程不安全需要考慮的,現(xiàn)在的尾巴不對,指錯了,重新確認(rèn)下
- if (t != tail)
- continue;
- //隊尾確定了,發(fā)現(xiàn)又來了人,把尾巴指向新來的人
- if (tn != null) {
- advanceTail(t, tn);
- continue;
- }
- //超時了,別等了
- if (timed && nanos <= 0)
- return null;
- //總算沒事兒了,哥們可以登記進屋了
- if (s == null)
- s = new QNode(e, isData);
- //中間可能有人插隊,只能再等等
- if (!t.casNext(null, s))
- continue;
- //準(zhǔn)備進屋等著約的人
- advanceTail(t, s);
- Object x = awaitFulfill(s, e, timed, nanos);
- //同一個人出來,那就是任務(wù)失敗了
- if (x == s) {
- //清理下
- clean(t, s);
- return null;
- }
- if (!s.isOffList()) { //還沒脫隊
- advanceHead(t, s); //排前面單獨處理
- if (x != null) //交接成功設(shè)一下標(biāo)記
- s.item = s;
- s.waiter = null;
- }
- return (x != null) ? (E)x : e;
這段是不是看著很頭痛?其實 Transfer 這小子也頭痛。
它首先要面臨的第一個問題:資源競爭的問題。
客人源源不斷的來,由于 Transfer 強迫癥,他想每次必須從絕對的隊頭或者隊尾巴開始,所以,每次都要判斷下,到底他看到的隊頭或者隊尾,是不是真正的隊頭、隊尾。
確定沒問題了,新來的客人就開始被打造成真正的隊尾。
然后,成為隊尾的哥們就可以等著屬于自己的 Mr.Right 過來交接了。等著交接一直到成功或者失敗的方法就是 awaitFulfill(t, tn)。
這邊有人在等待,同時另外一邊,交接的人們也開始陸續(xù)過來了。
- else { // complementary-mode
- QNode m = h.next; // node to fulfill
- if (t != tail || m == null || h != head)
- continue; // inconsistent read
- Object x = m.item;
- if (isData == (x != null) || // m already fulfilled
- x == m || // m cancelled
- !m.casItem(x, e)) { // 交接的核心語句
- advanceHead(h, m); // dequeue and retry
- continue;
- }
- advanceHead(h, m); // successfully fulfilled
- LockSupport.unpark(m.waiter);
- return (x != null) ? (E)x : e;
- }
交接最核心的其實就是 m.casItem(x, e)。交接成功,大家各回各家了。
整體的流程如下:
開始就是個經(jīng)典鏈表開局,head = tail
陸續(xù)開始有節(jié)點鏈接,put 的時候,isData = true;take 的時候,isData = false
可能會同時有很多的 put 操作,沒有對應(yīng)的 take 操作,他們就按照次序一個個鏈接起來,形成鏈表,并通過 awaitFulfill 方法等著對應(yīng)的 take
也可能同時會有很多的 take 操作,而沒有對應(yīng)的 put 操作,會形成鏈表,并通過 awaitFulfill 方法等著對應(yīng)的 put
take 操作會從鏈表頭開始找匹配的 put,然后通過 casItem 方法交接
put 操作會從鏈表頭開始找匹配的 take,然后通過 casItem 方法交接
所以,SynchronousQueue 你可以看到了,專門就是搞交接任務(wù)。
- put 的哥們發(fā)現(xiàn)沒人 take,就等在那里,等著take操作。
- take的哥們兒發(fā)現(xiàn)沒人put,也會等在那里,等著put操作。
這就是我們的 SynchronousQueue 鐘點房做的事情。
OK,鐘點房既然開門做生意,它也要賺錢的嘛。所以,它還得搞搞 VIP 客戶收費,也得為 VIP 客戶搞一些優(yōu)待。
對于這些 VIP 客人,我們的 Transfer 經(jīng)理會特意安排下,以棧的形式來安排客人,越后來的客人越大牌兒。所以,自然是后來的客人會優(yōu)先搞定交接了。這里簡短的介紹下,就不再贅述了。
Transfer 化身成 TransferStack,后來的優(yōu)先服務(wù)。
開始自然是鏈表開局,一個無意義的鏈表頭指向了 null
發(fā)現(xiàn)鏈表是空了,二話不說,客官,您進來先啦
和 TransferQueue 一樣,如果都是 take 過來,模式就是 REQUEST,就得排隊了
交接人出現(xiàn),哥們可以收攤兒了
其余的不說了,一樣的,說多了沒勁
話說,大劉搞清楚了這些細(xì)節(jié)之后,次日,當(dāng)巴掌張再次進行挑釁時,大劉徹底穩(wěn)下來了。
當(dāng)挨個把細(xì)節(jié)講的一清二楚之后,看著巴掌張那張落寞的巴掌臉,瞬間也不覺得像巴掌了,而是像是在猜拳中出的石頭剪刀布中的布。大劉沒忍住,對著這個布比劃出了個剪刀,光榮的結(jié)束了戰(zhàn)斗。
大劉依然在技術(shù)流中獨占鰲頭。
我們下篇大劉的故事見。
本文轉(zhuǎn)載自微信公眾號「四猿外」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系四猿外公眾號