為了帶你精通 Java AQS,我畫(huà)了 40 張圖,從管程模型講起!
大家好,我是君哥。
Java中 AQS 是 AbstractQueuedSynchronizer 類(lèi),AQS 依賴(lài) FIFO 隊(duì)列來(lái)提供一個(gè)框架,這個(gè)框架用于實(shí)現(xiàn)鎖以及鎖相關(guān)的同步器,比如信號(hào)量、事件等。
在 AQS 中,主要有兩部分功能,一部分是操作 state 變量,第二部分是實(shí)現(xiàn)排隊(duì)和阻塞機(jī)制。
注意,AQS 并沒(méi)有實(shí)現(xiàn)任何同步接口,它只是提供了類(lèi)似 acquireInterruptible 的方法,調(diào)用這些方法可以實(shí)現(xiàn)鎖和同步器。
1、管程模型
Java 使用 MESA 管程模型來(lái)管理類(lèi)的成員變量和方法,讓這個(gè)類(lèi)的成員變量和方法的操作是線程安全的。下圖是 MESA 管程模型,里面除了定義共享變量外,還定義了條件變量和條件變量等待隊(duì)列:
上圖中有三個(gè)知識(shí)點(diǎn):
- MESA 管程模型封裝了共享變量和對(duì)共享變量的操作,線程要進(jìn)入管程內(nèi)部,必須獲取到鎖,如果獲取鎖失敗就進(jìn)入入口等待隊(duì)列阻塞等待。
- 如果線程獲取到鎖,就進(jìn)入到管程內(nèi)部。但是進(jìn)入到管程內(nèi)部,也不一定能立刻操作共享變量,而是要看條件變量是否滿足,如果不滿足,只能進(jìn)入條件變量等待隊(duì)列阻塞等待。
- 在條件變量等待隊(duì)列中,如果被其他線程喚醒,也不一定能立刻操作共享變量,而是需要去入口等待隊(duì)列重新排隊(duì)等待獲取鎖。
Java 中的 MESA 管程模型有一點(diǎn)改進(jìn),就是管程內(nèi)部只有一個(gè)條件變量和一個(gè)等待隊(duì)列。下圖是 AQS 的管程模型:
AQS 的管程模型依賴(lài) AQS 中的 FIFO 隊(duì)列實(shí)現(xiàn)入口等待隊(duì)列,要進(jìn)入管程內(nèi)部,就由各種并發(fā)鎖的限制。而 ConditionObject 則實(shí)現(xiàn)了條件隊(duì)列,這個(gè)隊(duì)列可以創(chuàng)建多個(gè)。
下面就從入口等待隊(duì)列、并發(fā)鎖、條件等待隊(duì)列三個(gè)方面來(lái)帶你徹底理解 AQS。
2、入口等待隊(duì)列
2.1 獲取獨(dú)占鎖
獨(dú)占, 忽略 interrupts
這里的 tryAcquire 是抽象方法,由 AQS 的子類(lèi)來(lái)實(shí)現(xiàn),因?yàn)槊總€(gè)子類(lèi)實(shí)現(xiàn)的鎖是不一樣的。
2.1.1 入隊(duì)
上面的代碼可以看到,獲取鎖失敗后,會(huì)先執(zhí)行 addWaiter 方法加入隊(duì)列,然后執(zhí)行 acquireQueued 方法自旋地獲取鎖直到成功。
addWaiter 代碼邏輯如下圖,簡(jiǎn)單說(shuō)就是把 node 入隊(duì),入隊(duì)后返回 node 參數(shù)給 acquireQueued 方法:
這里有一個(gè)點(diǎn)需要注意,如果隊(duì)列為空,則新建一個(gè) Node 作為隊(duì)頭。
2.1.2 入隊(duì)后獲取鎖
acquireQueued 自旋獲取鎖邏輯如下圖:
這里有幾個(gè)細(xì)節(jié):
(1)waitStatus
- CANCELLED(1):當(dāng)前節(jié)點(diǎn)取消獲取鎖。當(dāng)?shù)却瑫r(shí)或被中斷(響應(yīng)中斷),會(huì)觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后節(jié)點(diǎn)狀態(tài)不再變化;
- SIGNAL(-1):后面節(jié)點(diǎn)等待當(dāng)前節(jié)點(diǎn)喚醒;
- CONDITION(-2):Condition 中使用,當(dāng)前線程阻塞在 Condition,如果其他線程調(diào)用了 Condition 的 signal 方法,這個(gè)結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列隊(duì)尾,等待獲取同步鎖;
- PROPAGATE(-3):共享模式,前置節(jié)點(diǎn)喚醒后面節(jié)點(diǎn)后,喚醒操作無(wú)條件傳播下去;
- 0:中間狀態(tài),當(dāng)前節(jié)點(diǎn)后面的節(jié)點(diǎn)已經(jīng)喚醒,但是當(dāng)前節(jié)點(diǎn)線程還沒(méi)有執(zhí)行完成。
(2)獲取鎖失敗后掛起
如果前置節(jié)點(diǎn)不是頭節(jié)點(diǎn),或者前置節(jié)點(diǎn)是頭節(jié)點(diǎn)但當(dāng)前節(jié)點(diǎn)獲取鎖失敗,這時(shí)當(dāng)前節(jié)點(diǎn)需要掛起,分三種情況:
- 前置節(jié)點(diǎn) waitStatus=-1,如下圖:
- 前置節(jié)點(diǎn) waitStatus > 0,如下圖:
- 前置節(jié)點(diǎn) waitStatus < 0 但不等于 -1,如下圖:
(3)取消獲取鎖
如果獲取鎖拋出異常,則取消獲取鎖,如果當(dāng)前節(jié)點(diǎn)是 tail 節(jié)點(diǎn),分兩種情況如下圖:
如果當(dāng)前節(jié)點(diǎn)不是 tail 節(jié)點(diǎn),也分兩種情況,如下圖:
4.對(duì)中斷狀態(tài)忽略
5.如果前置節(jié)點(diǎn)的狀態(tài)是 0 或 PROPAGATE,會(huì)被當(dāng)前節(jié)點(diǎn)自旋過(guò)程中更新成 -1,以便之后通知當(dāng)前節(jié)點(diǎn)。
2.1.3 獨(dú)占 + 響應(yīng)中斷
對(duì)應(yīng)方法 acquireInterruptibly(int arg)。
跟忽略中斷(acquire方法)不同的是要響應(yīng)中斷,下面兩個(gè)地方響應(yīng)中斷:
- 獲取鎖之前會(huì)檢查當(dāng)前線程是否中斷。
- 獲取鎖失敗入隊(duì),在隊(duì)列中自旋獲取鎖的過(guò)程中也會(huì)檢查當(dāng)前線程是否中斷。如果檢查到當(dāng)前線程已經(jīng)中斷,則拋出 InterruptedException,當(dāng)前線程退出。
2.1.4 獨(dú)占 + 響應(yīng)中斷 + 考慮超時(shí)
對(duì)應(yīng)方法 tryAcquireNanos(int arg, long nanosTimeout)。
這個(gè)方法具備了獨(dú)占 + 響應(yīng)中斷 + 超時(shí)的功能,下面2個(gè)地方要判斷是否超時(shí):
- 自旋獲取鎖的過(guò)程中每次獲取鎖失敗都要判斷是否超時(shí);
- 獲取鎖失敗 park 之前要判斷超時(shí)時(shí)間是否大于自旋的閾值時(shí)間 (spinForTimeoutThreshold = 1ns) 另外,park 線程的操作使用 parkNanos 傳入阻塞時(shí)間。
2.2 釋放獨(dú)占鎖
獨(dú)占鎖釋放分兩步:釋放鎖,喚醒后繼節(jié)點(diǎn)。
釋放鎖的方法 tryRelease 是抽象的,由子類(lèi)去實(shí)現(xiàn)。
我們看一下喚醒后繼節(jié)點(diǎn)的邏輯,首先需要滿足兩個(gè)條件:
- head 節(jié)點(diǎn)不等于 null;
- head 節(jié)點(diǎn) waitStatus 不等于 0。這里有兩種情況(在方法 unparkSuccessor):
- 情況一,后繼節(jié)點(diǎn) waitStatus <= 0,直接喚醒后繼節(jié)點(diǎn),如下圖:
- 情況二:后繼節(jié)點(diǎn)為空或者 waitStatus > 0,從后往前查找最接近當(dāng)前節(jié)點(diǎn)的節(jié)點(diǎn)進(jìn)行喚醒,如下圖:
2.3 獲取共享鎖
之前我們講了獨(dú)占鎖,這一小節(jié)我們談共享鎖,有什么不同呢?
2.3.1 共享,忽略 interrupts
對(duì)應(yīng)方法 acquireShared,代碼如下:
2.3.2 tryAcquireShared
這里獲取鎖使用的方法是 tryAcquireShared,獲取的是共享鎖。獲取共享鎖跟獲取獨(dú)占鎖不同的是,會(huì)返回一個(gè)整數(shù)值,說(shuō)明如下:
- 返回負(fù)數(shù):獲取鎖失敗。
- 返回 0:獲取鎖成功但是之后再由線程來(lái)獲取共享鎖時(shí)就會(huì)失敗。
- 返回正數(shù):獲取鎖成功而且之后再有線程來(lái)獲取共享鎖時(shí)也可能會(huì)成功。所以需要把喚醒操作傳播下去。tryAcquireShared 獲取鎖失敗后(返回負(fù)數(shù)),就需要入隊(duì)后自旋獲取,也就是執(zhí)行方法 doAcquireShared。
2.3.3 doAcquireShared
怎么判斷隊(duì)列中等待節(jié)點(diǎn)是在等待共享鎖呢?nextWaiter == SHARED,這個(gè)參數(shù)值是入隊(duì)新建節(jié)點(diǎn)的時(shí)候構(gòu)造函數(shù)傳入的。
自旋過(guò)程中,如果獲取鎖成功(返回正數(shù)),首先把自己設(shè)置成新的 head 節(jié)點(diǎn),然后把通知傳播下去。如下圖:
之后會(huì)喚醒后面節(jié)點(diǎn)并保證喚醒操作可以傳播下去。但是需要滿足四個(gè)條件中的一個(gè):
- tryAcquireShared 返回值大于0,有多余的鎖,可以繼續(xù)喚醒后繼節(jié)點(diǎn)。
- 舊的 head 節(jié)點(diǎn) waitStatus < 0,應(yīng)該是其他線程釋放共享鎖過(guò)程中把它的狀態(tài)更新成了 -3。
- 新的 hade 節(jié)點(diǎn) waitStatus < 0,只要不是 tail 節(jié)點(diǎn),就可能是 -1。這里會(huì)造成不必要的喚醒,因?yàn)閱拘押螳@取不到鎖只能繼續(xù)入隊(duì)等待。
- 當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)是空或者非空但正在等待共享鎖。
喚醒后面節(jié)點(diǎn)的操作,其實(shí)就是釋放共享鎖,對(duì)應(yīng)方法是 doReleaseShared,見(jiàn)釋放共享鎖一節(jié)。
2.3.4 共享 + 響應(yīng)中斷
對(duì)應(yīng)方法 acquireSharedInterruptibly(int arg)。
跟共享忽略中斷(acquireShared 方法)不同的是要響應(yīng)中斷,下面兩個(gè)地方響應(yīng)中斷:
- 獲取鎖之前會(huì)檢查當(dāng)前線程是否中斷。
- 獲取鎖失敗入隊(duì),在隊(duì)列中自旋獲取鎖的過(guò)程中也會(huì)檢查當(dāng)前線程是否中斷。
如果檢查到當(dāng)前線程已經(jīng)中斷,則拋出 InterruptedException,當(dāng)前線程退出。
2.3.5 共享 + 響應(yīng)中斷 + 考慮超時(shí)
對(duì)應(yīng)方法 tryAcquireSharedNanos(int arg, long nanosTimeout)。
這個(gè)方法具備了共享 + 響應(yīng)中斷 + 超時(shí)的功能,下面兩個(gè)個(gè)地方要判斷是否超時(shí):
- 自旋獲取鎖的過(guò)程中每次獲取鎖失敗都要判斷是否超時(shí)。
- 獲取鎖失敗 park 之前要判斷超時(shí)時(shí)間是否大于自旋的閾值時(shí)間(spinForTimeoutThreshold = 1ns)。
另外,park 線程的操作使用 parkNanos 傳入阻塞時(shí)間。
2.4 釋放共享鎖
釋放共享鎖代碼如下:
首先嘗試釋放共享鎖,tryReleaseShared 代碼由子類(lèi)來(lái)實(shí)現(xiàn)。釋放成功后執(zhí)行AQS中的 doReleaseShared 方法,是一個(gè)自旋操作。
自旋的條件是隊(duì)列中至少有兩個(gè)節(jié)點(diǎn),這里分三種情況。
情況一:當(dāng)前節(jié)點(diǎn) waitStatus 是 -1,如下圖:
情況二:當(dāng)前節(jié)點(diǎn) waitStatus 是 0(被其他線程更新新成了中間狀態(tài)),如下圖:
情況三:當(dāng)前節(jié)點(diǎn) waitStatus 是 -3,為什么會(huì)這樣呢?需要解釋一下,head節(jié)點(diǎn)喚醒后繼節(jié)點(diǎn)之前 waitStatus 已經(jīng)被更新中間態(tài) 0 了,喚醒后繼節(jié)點(diǎn)動(dòng)作還沒(méi)有執(zhí)行,又被其他線程更成了 -3,也就是其他線程釋放鎖執(zhí)行了上面情況二。這時(shí)需要先把 waitStatus 再更成 0 (在方法 unparkSuccessor),如下圖:
2.5 抽象方法
上面的講解可以看出,如果要基于 AQS 來(lái)實(shí)現(xiàn)并發(fā)鎖,可以根據(jù)需求重寫(xiě)下面四個(gè)方法來(lái)實(shí)現(xiàn),這四個(gè)方法在 AQS 中沒(méi)有具體實(shí)現(xiàn):
- tryAcquire(int arg):獲取獨(dú)占鎖
- tryRelease(int arg):釋放獨(dú)占鎖
- tryAcquireShared(int arg):獲取共享鎖
- tryReleaseShared(int arg):釋放共享鎖
AQS 的子類(lèi)需要重寫(xiě)上面的方法來(lái)修改 state 值,并且定義獲取鎖或者釋放鎖時(shí) state 值的變化。子類(lèi)也可以定義自己的 state 變量,但是只有更新 AQS 中的 state變量才會(huì)對(duì)同步起作用。
還有一個(gè)判斷當(dāng)前線程是否持有獨(dú)占鎖的方法 isHeldExclusively,也可以供子類(lèi)重寫(xiě)后使用。
獲取/釋放鎖的具體實(shí)現(xiàn)放到下篇文章講解。
2.6 總結(jié)
AQS 使用 FIFO 隊(duì)列實(shí)現(xiàn)了一個(gè)鎖相關(guān)的并發(fā)器模板,可以基于這個(gè)模板來(lái)實(shí)現(xiàn)各種鎖,包括獨(dú)占鎖、共享鎖、信號(hào)量等。
AQS 中,有一個(gè)核心狀態(tài)是 waitStatus,這個(gè)代表節(jié)點(diǎn)的狀態(tài),決定了當(dāng)前節(jié)點(diǎn)的后續(xù)操作,比如是否等待喚醒,是否要喚醒后繼節(jié)點(diǎn)。
3 并發(fā)鎖
這一章節(jié)講解 Java AQS 中的并發(fā)鎖。其實(shí) Java AQS 中的并發(fā)鎖主要是基于 state 這個(gè)變量值來(lái)實(shí)現(xiàn)的。
3.1 ReentrantLock
我們先來(lái)看一下 UML 類(lèi)圖:
從圖中可以看到,ReentrantLock 使用抽象內(nèi)部類(lèi) Sync 來(lái)實(shí)現(xiàn)了 AQS 的方法,然后基于 Sync 這個(gè)同步器實(shí)現(xiàn)了公平鎖和非公平鎖。主要實(shí)現(xiàn)了下面 3 個(gè)方法:
- tryAcquire(int arg):獲取獨(dú)占鎖
- tryRelease(int arg):釋放獨(dú)占鎖
- isHeldExclusively:當(dāng)前線程是否占有獨(dú)占鎖。ReentrantLock 默認(rèn)實(shí)現(xiàn)的是非公平鎖,可以在構(gòu)造函數(shù)指定。
從實(shí)現(xiàn)的方法可以看到,ReentrantLock 中獲取的鎖是獨(dú)占鎖,我們?cè)賮?lái)看一下獲取和釋放獨(dú)占鎖的代碼:
獨(dú)占鎖的特點(diǎn)是調(diào)用上面 acquire 方法,傳入的參數(shù)是 1。
3.1.1 獲取公平鎖
獲取鎖首先判斷同步狀態(tài)(state)的值。
3.1.1.1 state 等于 0
這說(shuō)明沒(méi)有線程占用鎖,當(dāng)前線程如果符合下面兩個(gè)條件,就可以獲取到鎖:
沒(méi)有前任節(jié)點(diǎn),如下圖:
CAS 的方式更新 state 值(把 0 更新成 1)成功。如果獲取獨(dú)占鎖成功,會(huì)更新 AQS 中 exclusiveOwnerThread 為當(dāng)前線程,這個(gè)很容易理解。
3.1.1.2 state 不等于 0
這說(shuō)明已經(jīng)有線程占有鎖,判斷占有鎖的線程是不是當(dāng)前線程,如下圖:
state += 1 值如果小于 0,會(huì)拋出異常。
如果獲取鎖失敗,則進(jìn)入 AQS 隊(duì)列等待喚醒。
3.1.2 獲取非公平鎖
跟公平鎖相比,非公平鎖的唯一不同是如果判斷到 state 等于 0,不用判斷有沒(méi)有前任節(jié)點(diǎn),只要 CAS 設(shè)置 state 值(把 0 更新成 1)成功,就獲取到了鎖。
3.1.3 釋放鎖
公平鎖和非公平鎖,釋放邏輯完全一樣,都是在內(nèi)部類(lèi) Sync 中實(shí)現(xiàn)的。釋放鎖需要注意兩點(diǎn),如下圖:
為什么 state 會(huì)大于 1,因?yàn)槭强梢灾厝氲?,占有鎖的線程可以多次獲取鎖。
3.1.4 總結(jié)
公平鎖的特點(diǎn)是每個(gè)線程都要進(jìn)行排隊(duì),不用擔(dān)心線程永遠(yuǎn)獲取不到鎖,但有個(gè)缺點(diǎn)是每個(gè)線程入隊(duì)后都需要阻塞和被喚醒,這一定程度上影響了效率。非公平鎖的特點(diǎn)是每個(gè)線程入隊(duì)前都會(huì)先嘗試獲取鎖,如果獲取成功就不會(huì)入隊(duì)了,這比公平鎖效率高。但也有一個(gè)缺點(diǎn),隊(duì)列中的線程有可能等待很長(zhǎng)時(shí)間,高并發(fā)下甚至可能永遠(yuǎn)獲取不到鎖。
3.2 ReentrantReadWriteLock
我們先來(lái)看一下 UML 類(lèi)圖:
從圖中可以看到,ReentrantReadWriteLock 使用抽象內(nèi)部類(lèi)Sync來(lái)實(shí)現(xiàn)了 AQS 的方法,然后基于 Sync 這個(gè)同步器實(shí)現(xiàn)了公平鎖和非公平鎖。主要實(shí)現(xiàn)了下面 3 個(gè)方法:
- tryAcquire(int arg):獲取獨(dú)占鎖
- tryRelease(int arg):釋放獨(dú)占鎖
- tryAcquireShared(int arg):獲取共享鎖
- tryReleaseShared(int arg):釋放共享鎖
- isHeldExclusively:當(dāng)前線程是否占有獨(dú)占鎖 可見(jiàn)ReentrantReadWriteLock里面同時(shí)用到了共享鎖和獨(dú)占鎖。
下圖是定義的幾個(gè)常用變量:
下面這 2 個(gè)方法用戶獲取共享鎖和獨(dú)占鎖的數(shù)量:
從sharedCount 可以看到,共享鎖的數(shù)量要右移 16 位獲取,也就是說(shuō)共享鎖占了高 16 位。從上圖 EXCLUSIVE_MASK 的定義看到,跟 EXCLUSIVE_MASK 進(jìn)行與運(yùn)算,得到的是低 16 位的值,所以獨(dú)占鎖占了低 16 位。如下圖:
這樣上面獲取鎖數(shù)量的方法就很好理解了。
3.2.1 讀鎖
讀鎖的實(shí)現(xiàn)對(duì)應(yīng)內(nèi)部類(lèi) ReadLock。
3.2.1.1 獲取讀鎖
獲取讀鎖實(shí)際上是 ReadLock 調(diào)用了 AQS 的下面方法,傳入?yún)?shù)是 1:
ReentrantReadWriteLock 內(nèi)部類(lèi) Sync 實(shí)現(xiàn)了 tryAcquireShared 方法,主要包括如下三種情況:
- 使用 exclusiveCount 方法查看 state 中是否有獨(dú)占鎖,如果有并且獨(dú)占線程不是當(dāng)前線程,返回 -1,獲取失敗;
- 使用 sharedCount 查看 state 中共享鎖數(shù)量,如果讀鎖數(shù)量小于最大值(MAX_COUNT=65535),則再滿足下面 3 個(gè)條件就可以獲取成功并返回 1:
a.當(dāng)前線程不需要阻塞(readerShouldBlock)。在公平鎖中,需要判斷是否有前置節(jié)點(diǎn),如下圖就需要阻塞:
在非公平鎖中,則是判斷第一個(gè)節(jié)點(diǎn)是不是有獨(dú)占鎖,如下圖就需要阻塞:
b.使用 CAS 把 state 的值加 SHARED_UNIT(65536)。這里是不是就更理解讀鎖占高位的說(shuō)法了,獲取一個(gè)讀鎖,state 的值就要加 SHARED_UNIT 這么多個(gè)。
c.給當(dāng)前線程的 holdCount 加 1。
- 如果 2 失敗,自旋,重復(fù)上面的步驟直到獲取到鎖。tryAcquireShared (獲取共享鎖)會(huì)返回一個(gè)整數(shù),如下:
- 返回負(fù)數(shù):獲取鎖失敗。
- 返回 0:獲取鎖成功但是之后再由線程來(lái)獲取共享鎖時(shí)就會(huì)失敗。
- 返回正數(shù):獲取鎖成功而且之后再有線程來(lái)獲取共享鎖時(shí)也可能會(huì)成功。
3.2.1.2 釋放讀鎖
ReentrantReadWriteLock 釋放讀鎖是在 ReadLock 中調(diào)用了 AQS 下面方法,傳入的參數(shù)是1:
ReentrantReadWriteLock 內(nèi)部類(lèi) Sync 實(shí)現(xiàn)了 releaseShared 方法,具體邏輯分為下面兩步:
- 當(dāng)前線程 holdCounter 值減 1。
- CAS的方式將 state 的值減去 SHARED_UNIT。
3.2.2 寫(xiě)鎖
寫(xiě)鎖的實(shí)現(xiàn)對(duì)應(yīng)內(nèi)部類(lèi) WriteLock。
3.2.2.1 獲取寫(xiě)鎖
ReentrantReadWriteLock 獲取寫(xiě)鎖其實(shí)是在 WriteLock 中調(diào)用了 AQS 的下面方法,傳入?yún)?shù) 1:
在ReentrantReadWriteLock 內(nèi)部類(lèi) Sync 實(shí)現(xiàn)了 tryAcquire 方法,首先獲取 state 值和獨(dú)占鎖數(shù)量(exclusiveCount),之后分如下兩種情況,如下圖:
1.state 不等于 0:
- 獨(dú)占鎖數(shù)量等于 0,這時(shí)說(shuō)明有線程占用了共享鎖,如果當(dāng)前線程不是獨(dú)占線程,獲取鎖失敗。
- 獨(dú)占鎖數(shù)量不等于 0,獨(dú)占鎖數(shù)量加 1 后大于 MAX_COUNT,獲取鎖失敗。
- 上面 2 種情況不符合,獲取鎖成功,state 值加 1。2.state 等于 0,判斷當(dāng)前線程是否需要阻塞(writerShouldBlock)。在公平鎖中,跟 readerShouldBlock 的邏輯完全一樣,就是判斷隊(duì)列中 head 節(jié)點(diǎn)的后繼節(jié)點(diǎn)是不是當(dāng)前線程。在非公平鎖中,直接返回 false,即可以直接嘗試獲取鎖。
如果當(dāng)前線程不需要阻塞,并且給 state 賦值成功,使用 CAS 方式把 state 值加 1,把獨(dú)占線程置為當(dāng)前線程。
3.2.2.2 釋放寫(xiě)鎖
ReentrantReadWriteLock 釋放寫(xiě)鎖其實(shí)是在 WriteLock 中調(diào)用了 AQS 的下面方法,傳入?yún)?shù) 1:
ReentrantReadWriteLock 在 Sync 中實(shí)現(xiàn)了 tryRelease(arg) 方法,邏輯如下:
- 判斷當(dāng)前線程是不是獨(dú)占線程,如果不是,拋出異常。
- state值減1后,用新state值判斷獨(dú)占鎖數(shù)量是否等于0
- 如果等于0,則把獨(dú)占線程置為空,返回true,這樣上面的代碼就可以喚醒隊(duì)列中的后置節(jié)點(diǎn)了
- 如果不等于0,返回false,不喚醒后繼節(jié)點(diǎn)。
3.3 CountDownLatch
我們先來(lái)看一下UML類(lèi)圖:
從上面的圖中看出,CountDownLatch 的內(nèi)部類(lèi) Sync 實(shí)現(xiàn)了獲取共享鎖和釋放共享鎖的邏輯。
使用 CountDownLatch 時(shí),構(gòu)造函數(shù)會(huì)傳入一個(gè) int 類(lèi)型的參數(shù) count,表示調(diào)動(dòng) count 次的 countDown 后主線程才可以被喚醒。
上面的 Sync(count) 就是將 AQS 中的 state 賦值為 count。
3.3.1 await
CountDownLatch 的 await 方法調(diào)用了 AQS 中的 acquireSharedInterruptibly(int arg),傳入?yún)?shù) 1,不過(guò)這個(gè)參數(shù)并沒(méi)有用。代碼如下:
Sync 中實(shí)現(xiàn)了 tryAcquireShared 方法,await 邏輯如下圖:
上面的自旋過(guò)程就是等待 state 的值不斷減小,只有 state 值成為 0 的時(shí)候,主線程才會(huì)跳出自旋執(zhí)行之后的邏輯。
3.3.2 countDown
CountDownLatch 的 countDown 方法調(diào)用了 AQS 的 releaseShared(int arg),傳入?yún)?shù) 1,不過(guò)這個(gè)參數(shù)并沒(méi)有用。內(nèi)部類(lèi) Sync 實(shí)現(xiàn)了 tryReleaseShared 方法,邏輯如下圖:
3.3.3 總結(jié)
CountDownLatch 的構(gòu)造函數(shù)入?yún)⒅禃?huì)賦值給 state 變量,入隊(duì)操作是主線程入隊(duì),每個(gè)子線程調(diào)用了countDown 后 state 值減 1,當(dāng) state 值成為 0 后喚醒主線程。
3.4 Semaphore
Semaphore 是一個(gè)信號(hào)量,用來(lái)保護(hù)共享資源。如果線程要訪問(wèn)共享資源,首先從 Semaphore 獲取鎖(信號(hào)量),如果信號(hào)量的計(jì)數(shù)器等于 0,則當(dāng)前線程進(jìn)入 AQS 隊(duì)列阻塞等待。否則,線程獲取鎖成功,信號(hào)量減 1。使用完共享資源后,釋放鎖(信號(hào)量加 1)。
Semaphore 跟管程模型不一樣的是,允許多個(gè)(構(gòu)造函數(shù)的 permits)線程進(jìn)入管程內(nèi)部,因此也常用它來(lái)做限流。
UML 類(lèi)圖如下:
Semaphore的構(gòu)造函數(shù)會(huì)傳入一個(gè)int類(lèi)型參數(shù),用來(lái)初始化state的值。
3.4.1 acquire
獲取鎖的操作調(diào)用了 AQS 中的 acquireSharedInterruptibly 方法,傳入?yún)?shù) 1,代碼見(jiàn) CountDownLatch 中 await 小節(jié)。Semaphore 在公平鎖和非公平鎖中分別實(shí)現(xiàn)了 tryAcquireShared 方法。
3.4.1.1 公平鎖
Semaphore 默認(rèn)使用非公平鎖,如果使用公平鎖,需要在構(gòu)造函數(shù)指定。獲取公平鎖邏輯比較簡(jiǎn)單,如下圖:
3.4.1.2 非公平鎖
acquire 在非公平的鎖唯一的區(qū)別就是不會(huì)判斷 AQS 隊(duì)列是否有前置節(jié)點(diǎn)(hasQueuedPredecessors),而是直接嘗試獲取鎖。
除了 acquire 方法外,還有其他幾個(gè)獲取鎖的方法,原理類(lèi)似,只是調(diào)用了 AQS 中的不同方法。
3.4.2 release
釋放鎖的操作調(diào)用了 AQS 中的 releaseShared(int arg) 方法,傳入?yún)?shù) 1,在內(nèi)部類(lèi) Sync 中實(shí)現(xiàn)了 tryReleaseShared 方法,邏輯很簡(jiǎn)單:使用 CAS 的方式將 state 的值加 1,之后喚醒隊(duì)列中的后繼節(jié)點(diǎn)。
3.5 ThreadPoolExecutor
ThreadPoolExecutor 中也用到了 AQS,看下面的 UML 類(lèi)圖:
Worker 主要在 ThreadPoolExecutor 中斷線程的時(shí)候使用。Worker 自己實(shí)現(xiàn)了獨(dú)占鎖,在中斷線程時(shí)首先進(jìn)行加鎖,中斷操作后釋放鎖。按照官方說(shuō)法,這里不直接使用 ReentrantLock 的原因是防止調(diào)用控制線程池的方法(類(lèi)似 setCorePoolSize)時(shí)能夠重新獲取到鎖,
3.5.1 tryAcquire
使用 CAS 的方式把 AQS 中 state 從 0 改為 1,把當(dāng)前線程置為獨(dú)占線程。
3.5.2 tryRelease
把獨(dú)占線程置為空,把 AQS 中 state 改為 0。
Worker 初始化的時(shí)候會(huì)把 state 置為 -1,這樣是不能獲取鎖成功的。只有調(diào)用了 runWorker 方法,才會(huì)通過(guò)釋放鎖操作把 state 更為 0。這樣保證了只中斷運(yùn)行中的線程,而不會(huì)中斷等待中的線程。
3.6 總結(jié)
AQS 基于雙向隊(duì)列實(shí)現(xiàn)了入口等待隊(duì)列,基于 state 變量實(shí)現(xiàn)了各種并發(fā)鎖,上篇文章講了入口等待隊(duì)列,而這篇文章主要講了基于 AQS 的并發(fā)鎖原理。
4、條件變量等待隊(duì)列
本章節(jié)主要講解管程模型中條件變量等待隊(duì)列。
4.1 官方示例
首先我們看一下官方給出的示例代碼:
這個(gè)代碼定義了兩個(gè)條件變量,notFull 和 notEmpty,說(shuō)明如下:
- 如果 items 數(shù)組已經(jīng)滿了,則 notFull 變量不滿足,線程需要進(jìn)入 notFull 條件等待隊(duì)列進(jìn)行等待。當(dāng) take 方法取走一個(gè)數(shù)組元素時(shí),notFull 條件滿足了,喚醒 notFull 條件等待隊(duì)列中等待線程。
- 如果 items 數(shù)組為空,則 notEmpty 變量不滿足,線程需要進(jìn)入 notEmpty 條件等待隊(duì)列進(jìn)行等待。當(dāng) put 方法加入一個(gè)數(shù)組元素時(shí),notEmpty 條件滿足了,喚醒 notEmpty 條件等待隊(duì)列中等待線程。
- 條件變量是綁定在 Lock 上的,示例代碼使用了 ReentrantLock。在執(zhí)行 await 和 signal 方法時(shí)首先要獲取到鎖。
4.2 原理簡(jiǎn)介
Java AQS 的條件變量等待隊(duì)列是基于接口 Condition 和 ConditionObject 來(lái)實(shí)現(xiàn)的,URM 類(lèi)圖如下:
Condition 接口主要定義了下面3個(gè)方法:
- await:進(jìn)入條件等待隊(duì)列
- signal:?jiǎn)拘褩l件等待隊(duì)列中的元素
- signalAll:?jiǎn)拘褩l件等待隊(duì)列中的所有元素
4.3 await
條件等待隊(duì)列跟入口等待隊(duì)列有兩個(gè)不同:
- 雖然二者共用了 Node 類(lèi),但是條件等待隊(duì)列是單向隊(duì)列,入口等待隊(duì)列是雙向隊(duì)列,條件隊(duì)列中下一個(gè)節(jié)點(diǎn)的引用是 nextWaiter,入口等待隊(duì)列中下一個(gè)節(jié)點(diǎn)的引用是 next。
- 條件等待隊(duì)列中元素的 waitStatus 必須是 -2。await 方法的流程如下圖:
4.3.1 進(jìn)入條件等待隊(duì)列
入隊(duì)方法對(duì)應(yīng)方法 addConditionWaiter,這里有三種情況:
- 隊(duì)列為空,則新建一個(gè)節(jié)點(diǎn),如下圖:
- 隊(duì)列非空,最后一個(gè)元素的 waitStatus 是 -2,如下圖:
- 隊(duì)列非空,最后一個(gè)元素的 waitStatus 不是 -2,如下圖:
可以看到,這種情況會(huì)從隊(duì)列第一個(gè)元素開(kāi)始檢查 waitStatus 不是 -2 的元素,并從隊(duì)列中移除。
4.3.2 釋放鎖
AQS 的并發(fā)鎖是基于 state 變量實(shí)現(xiàn)的,線程進(jìn)入條件等待隊(duì)列后,要釋放鎖,即 state 會(huì)變?yōu)?0,釋放操作會(huì)喚醒入口等待隊(duì)列中的線程。對(duì)應(yīng)方法 fullyRelease,返回值是釋放鎖減掉的 state 值 savedState。
4.3.3 阻塞等待
釋放鎖后,線程阻塞,自旋等待被喚醒。
4.3.4 喚醒之后
喚醒之后,當(dāng)前線程主要有四個(gè)動(dòng)作:
- 轉(zhuǎn)入入口等待隊(duì)列,并把 waitStatus 改為 0。waitStatus 等于 0 表示中間狀態(tài),當(dāng)前節(jié)點(diǎn)后面的節(jié)點(diǎn)已經(jīng)喚醒,但是當(dāng)前節(jié)點(diǎn)線程還沒(méi)有執(zhí)行完成。
- 重新獲取鎖,如果獲取成功,則當(dāng)前線程成為入口等待隊(duì)列頭結(jié)點(diǎn),interruptMode 置為 1。
- 如果當(dāng)前節(jié)點(diǎn)在條件等待隊(duì)列中有后繼節(jié)點(diǎn),則剔除條件等待隊(duì)列中 waitStatus!=-2 的節(jié)點(diǎn),即隊(duì)列中狀態(tài)為取消的節(jié)點(diǎn)。
- interruptMode 如果不等于 0,則處理中斷。
4.3.5 一個(gè)細(xì)節(jié)
上面提到了 interruptMode,這個(gè)屬性有三個(gè)值:
- 0:沒(méi)有被中斷
- -1:中斷后拋出 InterruptedException,這種情況是當(dāng)前線程阻塞,沒(méi)有被 signal 之前發(fā)生了中斷
- 1:重新進(jìn)入中斷狀態(tài),這種情況是指當(dāng)前線程阻塞,被 signal 之后發(fā)生了中斷
4.3.6 擴(kuò)展
AQS 還提供了其他幾個(gè) await 方法,如下:
- awaitUninterruptibly:不用處理中斷。
- awaitNanos:自旋等待喚醒過(guò)程中有超時(shí)時(shí)間限制,超時(shí)則轉(zhuǎn)入入口等待隊(duì)列。
- awaitUntil:自旋等待喚醒過(guò)程中有截止時(shí)間,時(shí)間到則轉(zhuǎn)入入口等待隊(duì)列。
4.4 signal
喚醒條件等待隊(duì)列中的元素,首先判斷當(dāng)前線程是否持有獨(dú)占鎖,如果沒(méi)有,拋出異常。
喚醒條件隊(duì)列中的元素,會(huì)從第一個(gè)元素也就是 firstWaiter 開(kāi)始,根據(jù) firstWaiter 的 waitStatus 是不是 -2,分兩種情況。
4.4.1 waitStatus==-2
條件隊(duì)列第一個(gè)節(jié)點(diǎn)進(jìn)入入口等待隊(duì)列,等待獲取鎖,如下圖:
這里有兩個(gè)注意點(diǎn):
- 如果入口等待隊(duì)列中 tail 節(jié)點(diǎn)的 waitStatus 小于等于 0,則 firstWaiter 加入后需要把舊 tail 節(jié)點(diǎn)置為 -1 (表示后面節(jié)點(diǎn)等待當(dāng)前節(jié)點(diǎn)喚醒),如下圖:
如果重置 waitStatus 狀態(tài)失敗,則 unpark 節(jié)點(diǎn) firstWaiter。
- 如果入口等待隊(duì)列中 tail 節(jié)點(diǎn)的 waitStatus 大于 0,則 unpark 節(jié)點(diǎn) firstWaiter。
4.4.2 waitStatus!=-2
如果 firstWaiter 的 waitStatus 不等于 -2,則查找 firstWaiter 的 nextWaiter,直到找到一個(gè) waitStatus 等于 -2 的節(jié)點(diǎn),然后將這個(gè)節(jié)點(diǎn)加入入口等待隊(duì)列隊(duì)尾,如下圖:
4.4.3 waitStatus 修改
上面的兩種情況無(wú)論哪種,進(jìn)入入口等待隊(duì)列之前都要用 CAS 的方式把 waitStatus 改為 0。
4.5 signalAll
理解了 signal 的邏輯,signalAll 的邏輯就非常容易理解了。首先判斷當(dāng)前線程是否持有獨(dú)占鎖,如果沒(méi)有,拋出異常。
將條件等待隊(duì)列中的所有節(jié)點(diǎn)依次加入入口等待隊(duì)列。如下圖:
4.6 使用案例
4.6.1 示例代碼
Java 并發(fā)包下有很多類(lèi)使用到了 AQS 中的 Condition,如下圖:
這里我們以 CyclicBarrier 為例來(lái)講解。CyclicBarrier 是讓一組線程相互等待共同達(dá)到一個(gè)屏障點(diǎn)。從 Cyclic 可以看出 Barrier 可以循環(huán)利用,也就是當(dāng)線程釋放之后可以繼續(xù)使用。
看下面這段示例代碼:
執(zhí)行結(jié)果:
4.6.2 原理講解
CyclicBarrier 初始化的時(shí)候,會(huì)指定線程的數(shù)量 count,每個(gè)線程執(zhí)行完邏輯后,調(diào)用 CyclicBarrier 的 await 方法,這個(gè)方法首先將 count 減 1,然后調(diào)用 Condition的 await,讓當(dāng)前線程進(jìn)入條件等待隊(duì)列。當(dāng)最后一個(gè)線程將 count 減 1 后,count 數(shù)量等于 0,這時(shí)就會(huì)調(diào)用 Condition 的 signalAll 方法喚醒所有線程。
4.7 總結(jié)
Java 的管程模型使用了 MESA 模型,基于 AQS 實(shí)現(xiàn)的 MESA 模型中,使用雙向隊(duì)列實(shí)現(xiàn)了入口等待隊(duì)列,使用變量 state 實(shí)現(xiàn)了并發(fā)鎖,使用 Condition 實(shí)現(xiàn)了條件等待隊(duì)列。
在 AQS 的實(shí)現(xiàn)中,使用同步隊(duì)列這個(gè)術(shù)語(yǔ)來(lái)表示雙向隊(duì)列,本文中使用入口等待隊(duì)列來(lái)描述是為了更好的配合管程模型來(lái)講解。
AQS 的 Condition 中,使用 await 方法將當(dāng)前線程放入條件變量等待隊(duì)列阻塞等待,使用 notify 來(lái)喚醒條件等待隊(duì)列中的線程,被喚醒之后,線程并不能立刻執(zhí)行,而是進(jìn)入入口等待隊(duì)列等待獲取鎖。