1.5w字,30圖帶你徹底掌握 AQS!
前言
AQS( AbstractQueuedSynchronizer )是一個用來構(gòu)建鎖和同步器(所謂同步,是指線程之間的通信、協(xié)作)的框架,Lock 包中的各種鎖(如常見的 ReentrantLock, ReadWriteLock), concurrent 包中的各種同步器(如 CountDownLatch, Semaphore, CyclicBarrier)都是基于 AQS 來構(gòu)建,所以理解 AQS 的實現(xiàn)原理至關(guān)重要,AQS 也是面試中區(qū)分侯選人的常見考點,我們務(wù)必要掌握,本文將用循序漸近地介紹 AQS,相信大家看完一定有收獲。文章目錄如下
- 鎖原理 - 信號量 vs 管程
- AQS 實現(xiàn)原理
- AQS 源碼剖析
- 如何利用 AQS 自定義一個互斥鎖
鎖原理 - 信號量 vs 管程
在并發(fā)編程領(lǐng)域,有兩大核心問題:互斥與同步,互斥即同一時刻只允許一個線程訪問共享資源,同步,即線程之間如何通信、協(xié)作,一般這兩大問題可以通過信號量和管程來解決。
信號量
信號量(Semaphore)是操作系統(tǒng)提供的一種進程間常見的通信方式,主要用來協(xié)調(diào)并發(fā)程序?qū)蚕碣Y源的訪問,操作系統(tǒng)可以保證對信號量操作的原子性。它是怎么實現(xiàn)的呢。
- 信號量由一個共享整型變量 S 和兩個原子操作 PV 組成,S 只能通過 P 和 V 操作來改變
- P 操作:即請求資源,意味著 S 要減 1,如果 S < 0, 則表示沒有資源了,此時線程要進入等待隊列(同步隊列)等待
- V 操作: 即釋放資源,意味著 S 要加 1, 如果 S 小于等于 0,說明等待隊列里有線程,此時就需要喚醒線程。
示意圖如下
信號量機制的引入解決了進程同步和互斥問題,但信號量的大量同步操作分散在各個進程中不便于管理,還有可能導(dǎo)致系統(tǒng)死鎖。如:生產(chǎn)者消費者問題中將P、V顛倒可能死鎖(見文末參考鏈接),另外條件越多,需要的信號量就越多,需要更加謹慎地處理信號量之間的處理順序,否則很容易造成死鎖現(xiàn)象。
基于信號量給編程帶來的隱患,于是有了提出了對開發(fā)者更加友好的并發(fā)編程模型-管程
管程
Dijkstra 于 1971 年提出:把所有進程對某一種臨界資源的同步操作都集中起來,構(gòu)成一個所謂的秘書進程。凡要訪問該臨界資源的進程,都需先報告秘書,由秘書來實現(xiàn)諸進程對同一臨界資源的互斥使用,這種機制就是管程。
管程是一種在信號量機制上進行改進的并發(fā)編程模型,解決了信號量在臨界區(qū)的 PV 操作上配對的麻煩,把配對的 PV 操作集中在一起而形成的并發(fā)編程方法理論,極大降低了使用和理解成本。
- 管程由四部分組成:
- 管程內(nèi)部的共享變量。
- 管程內(nèi)部的條件變量。
- 管程內(nèi)部并行執(zhí)行的進程。
對于局部與管程內(nèi)部的共享數(shù)據(jù)設(shè)置初始值的語句。
由此可見,管程就是一個對象監(jiān)視器。任何線程想要訪問該資源(共享變量),就要排隊進入監(jiān)控范圍。進入之后,接受檢查,不符合條件,則要繼續(xù)等待,直到被通知,然后繼續(xù)進入監(jiān)視器。
需要注意的事,信號量和管程兩者是等價的,信號量可以實現(xiàn)管程,管程也可以實現(xiàn)信號量,只是兩者的表現(xiàn)形式不同而已,管程對開發(fā)者更加友好。
兩者的區(qū)別如下
管程為了解決信號量在臨界區(qū)的 PV 操作上的配對的麻煩,把配對的 PV 操作集中在一起,并且加入了條件變量的概念,使得在多條件下線程間的同步實現(xiàn)變得更加簡單。
怎么理解管程中的入口等待隊列,共享變量,條件變量等概念,有時候技術(shù)上的概念較難理解,我們可以借助生活中的場景來幫助我們理解,就以我們的就醫(yī)場景為例來簡單說明一下,正常的就醫(yī)流程如下:
- 病人去掛號后,去侯診室等待叫號
- 叫到自己時,就可以進入就診室就診了
- 就診時,有兩種情況,一種是醫(yī)生很快就確定病人的病,并作出診斷,診斷完成后,就通知下一位病人進來就診,一種是醫(yī)生無法確定病因,需要病人去做個驗血 / CT 檢查才能確定病情,于是病人就先去驗個血 / CT
- 病人驗完血 / 做完 CT 后,重新取號,等待叫號(進入入口等待隊列)
- 病人等到自己的號,病人又重新拿著驗血 / CT 報告去找醫(yī)生就診
整個流程如下
那么管程是如何解決互斥和同步的呢
首先來看互斥,上文中醫(yī)生即共享資源(也即共享變量),就診室即為臨界區(qū),病人即線程,任何病人如果想要訪問臨界區(qū),必須首先獲取共享資源(即醫(yī)生),入口一次只允許一個線程經(jīng)過,在共享資源被占有的情況下,如果再有線程想占有共享資源,就需要到等待隊列去等候,等到獲取共享資源的線程釋放資源后,等待隊列中的線程就可以去競爭共享資源了,這樣就解決了互斥問題,所以本質(zhì)上管程是通過將共享資源及其對共享資源的操作(線程安全地獲取和釋放)封裝起來來保證互斥性的。
再來看同步,同步是通過文中的條件變量及其等待隊列實現(xiàn)的,同步的實現(xiàn)分兩種情況
- 病人進入就診室后,無需做驗血 / CT 等操作,于是醫(yī)生診斷完成后,就會釋放共享資源(解鎖)去通知(notify,notifyAll)入口等待隊列的下一個病人,下一個病人聽到叫號后就能看醫(yī)生了。
- 如果病人進入就診室后需要做驗血 / CT 等操作,會去驗血 / CT 隊列(條件隊列)排隊, 同時釋放共享變量(醫(yī)生),通知入口等待隊列的其他病人(線程)去獲取共享變量(醫(yī)生),獲得許可的線程執(zhí)行完臨界區(qū)的邏輯后會喚醒條件變量等待隊列中的線程,將它放到入口等待隊列中 ,等到其獲取共享變量(醫(yī)生)時,即可進入入口(臨界區(qū))處理。
在 Java 里,鎖大多是依賴于管程來實現(xiàn)的,以大家熟悉的內(nèi)置鎖 synchronized 為例,它的實現(xiàn)原理如下。
可以看到 synchronized 鎖也是基于管程實現(xiàn)的,只不過它只有且只有一個條件變量(就是鎖對象本身)而已,這也是為什么JDK 要實現(xiàn) Lock 鎖的原因之一,Lock 支持多個條件變量。
通過這樣的類比,相信大家對管程的工作機制有了比較清晰的認識,為啥要花這么大的力氣介紹管程呢,一來管程是解決并發(fā)問題的萬能鑰匙,二來 AQS 是基于 Java 并發(fā)包中管程的一種實現(xiàn),所以理解管程對我們理解 AQS 會大有幫助,接下來我們就來看看 AQS 是如何工作的。
AQS 實現(xiàn)原理
AQS 全稱是 AbstractQueuedSynchronizer,是一個用來構(gòu)建鎖和同步器的框架,它維護了一個共享資源 state 和一個 FIFO 的等待隊列(即上文中管程的入口等待隊列),底層利用了 CAS 機制來保證操作的原子性。
AQS 實現(xiàn)鎖的主要原理如下:
以實現(xiàn)獨占鎖為例(即當前資源只能被一個線程占有),其實現(xiàn)原理如下:state 初始化 0,在多線程條件下,線程要執(zhí)行臨界區(qū)的代碼,必須首先獲取 state,某個線程獲取成功之后, state 加 1,其他線程再獲取的話由于共享資源已被占用,所以會到 FIFO 等待隊列去等待,等占有 state 的線程執(zhí)行完臨界區(qū)的代碼釋放資源( state 減 1)后,會喚醒 FIFO 中的下一個等待線程(head 中的下一個結(jié)點)去獲取 state。
state 由于是多線程共享變量,所以必須定義成 volatile,以保證 state 的可見性, 同時雖然 volatile 能保證可見性,但不能保證原子性,所以 AQS 提供了對 state 的原子操作方法,保證了線程安全。
另外 AQS 中實現(xiàn)的 FIFO 隊列(CLH 隊列)其實是雙向鏈表實現(xiàn)的,由 head, tail 節(jié)點表示,head 結(jié)點代表當前占用的線程,其他節(jié)點由于暫時獲取不到鎖所以依次排隊等待鎖釋放。
所以我們不難明白 AQS 的如下定義:
- public abstract class AbstractQueuedSynchronizer
- extends AbstractOwnableSynchronizer
- implements java.io.Serializable {
- // 以下為雙向鏈表的首尾結(jié)點,代表入口等待隊列
- private transient volatile Node head;
- private transient volatile Node tail;
- // 共享變量 state
- private volatile int state;
- // cas 獲取 / 釋放 state,保證線程安全地獲取鎖
- protected final boolean compareAndSetState(int expect, int update) {
- // See below for intrinsics setup to support this
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
- // ...
- }
AQS 源碼
剖析ReentrantLock 是我們比較常用的一種鎖,也是基于 AQS 實現(xiàn)的,所以接下來我們就來分析一下 ReentrantLock 鎖的實現(xiàn)來一探 AQS 究竟。本文將會采用圖文并茂的方式讓大家理解 AQS 的實現(xiàn)原理,大家在學(xué)習(xí)過程中,可以多類比一下上文中就診的例子,相信會有助于理解。
首先我們要知道 ReentrantLock 是獨占鎖,也有公平和非公平兩種鎖模式,什么是獨占與有共享模式,什么又是公平鎖與非公平鎖
與獨占鎖對應(yīng)的是共享鎖,這兩者有什么區(qū)別呢
獨占鎖:即其他線程只有在占有鎖的線程釋放后才能競爭鎖,有且只有一個線程能競爭成功(醫(yī)生只有一個,一次只能看一個病人)
共享鎖:即共享資源可以被多個線程同時占有,直到共享資源被占用完畢(多個醫(yī)生,可以看多個病人),常見的有讀寫鎖 ReadWriteLock, CountdownLatch,兩者的區(qū)別如下
什么是公平鎖與非公平鎖
還是以就醫(yī)為例,所謂公平鎖即大家取號后老老實實按照先來后到的順序在侯診室依次等待叫號,如果是非公平鎖呢,新來的病人(線程)很霸道,不取號排隊 ,直接去搶先看病,占有醫(yī)生(不一定成功)
公平鎖與非公平鎖
本文我們將會重點分析獨占,非公平模式的源碼實現(xiàn),不分析共享模式與 Condition 的實現(xiàn),因為剖析了獨占鎖的實現(xiàn),由于原理都是相似的,再分析共享與 Condition 就不難了。
首先我們先來看下 ReentrantLock 的使用方法
- // 1. 初始化可重入鎖
- private ReentrantLock lock = new ReentrantLock();
- public void run() {
- // 加鎖
- lock.lock();
- try {
- // 2. 執(zhí)行臨界區(qū)代碼
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- // 3. 解鎖
- lock.unlock();
- }
- }
第一步是初始化可重入鎖,可以看到默認使用的是非公平鎖機制
- public ReentrantLock() {
- sync = new NonfairSync();
- }
當然你也可以用如下構(gòu)造方法來指定使用公平鎖:
- public ReentrantLock(boolean fair) {
- sync = fair ? new FairSync() : new NonfairSync();
- }
畫外音: FairSync 和 NonfairSync 是 ReentrantLock 實現(xiàn)的內(nèi)部類,分別指公平和非公平模式,ReentrantLock ReentrantLock 的加鎖(lock),解鎖(unlock)在內(nèi)部具體都是調(diào)用的 FairSync,NonfairSync 的加鎖和解鎖方法。
幾個類的關(guān)系如下:
我們先來剖析下非公平鎖(NonfairSync)的實現(xiàn)方式,來看上述示例代碼的第二步:加鎖,由于默認的是非公平鎖的加鎖,所以我們來分析下非公平鎖是如何加鎖的
可以看到 lock 方法主要有兩步
- 使用 CAS 來獲取 state 資源,如果成功設(shè)置 1,代表 state 資源獲取鎖成功,此時記錄下當前占用 state 的線程 setExclusiveOwnerThread(Thread.currentThread());
- 如果 CAS 設(shè)置 state 為 1 失敗(代表獲取鎖失敗),則執(zhí)行 acquire(1) 方法,這個方法是 AQS 提供的方法,如下
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
tryAcquire 剖析
首先 調(diào)用 tryAcquire 嘗試著獲取 state,如果成功,則跳過后面的步驟。如果失敗,則執(zhí)行 acquireQueued 將線程加入 CLH 等待隊列中。
先來看下 tryAcquire 方法,這個方法是 AQS 提供的一個模板方法,最終由其 AQS 具體的實現(xiàn)類(Sync)實現(xiàn),由于執(zhí)行的是非公平鎖邏輯,執(zhí)行的代碼如下:
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- // 如果 c 等于0,表示此時資源是空閑的(即鎖是釋放的),再用 CAS 獲取鎖
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- // 此條件表示之前已有線程獲得鎖,且此線程再一次獲得了鎖,獲取資源次數(shù)再加 1,這也映證了 ReentrantLock 為可重入鎖
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
此段代碼可知鎖的獲取主要分兩種情況
- state 為 0 時,代表鎖已經(jīng)被釋放,可以去獲取,于是使用 CAS 去重新獲取鎖資源,如果獲取成功,則代表競爭鎖成功,使用 setExclusiveOwnerThread(current) 記錄下此時占有鎖的線程,看到這里的 CAS,大家應(yīng)該不難理解為啥當前實現(xiàn)是非公平鎖了,因為隊列中的線程與新線程都可以 CAS 獲取鎖啊,新來的線程不需要排隊
- 如果 state 不為 0,代表之前已有線程占有了鎖,如果此時的線程依然是之前占有鎖的線程(current == getExclusiveOwnerThread() 為 true),代表此線程再一次占有了鎖(可重入鎖),此時更新 state,記錄下鎖被占有的次數(shù)(鎖的重入次數(shù)),這里的 setState 方法不需要使用 CAS 更新,因為此時的鎖就是當前線程占有的,其他線程沒有機會進入這段代碼執(zhí)行。所以此時更新 state 是線程安全的。
假設(shè)當前 state = 0,即鎖不被占用,現(xiàn)在有 T1, T2, T3 這三個線程要去競爭鎖
假設(shè)現(xiàn)在 T1 獲取鎖成功,則兩種情況分別為 1、 T1 首次獲取鎖成功
2、 T1 再次獲取鎖成功,state 再加 1,表示鎖被重入了兩次,當前如果 T1一直申請占用鎖成功,state 會一直累加
acquireQueued 剖析
如果 tryAcquire(arg) 執(zhí)行失敗,代表獲取鎖失敗,則執(zhí)行 acquireQueued 方法,將線程加入 FIFO 等待隊列
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
所以接下來我們來看看 acquireQueued 的執(zhí)行邏輯,首先會調(diào)用 addWaiter(Node.EXCLUSIVE) 將包含有當前線程的 Node 節(jié)點入隊, Node.EXCLUSIVE 代表此結(jié)點為獨占模式
再來看下 addWaiter 是如何實現(xiàn)的
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- Node pred = tail;
- // 如果尾結(jié)點不為空,則用 CAS 將獲取鎖失敗的線程入隊
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 如果結(jié)點為空,執(zhí)行 enq 方法
- enq(node);
- return node;
- }
這段邏輯比較清楚,首先是獲取 FIFO 隊列的尾結(jié)點,如果尾結(jié)點存在,則采用 CAS 的方式將等待線程入隊,如果尾結(jié)點為空則執(zhí)行 enq 方法
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) {
- // 尾結(jié)點為空,說明 FIFO 隊列未初始化,所以先初始化其頭結(jié)點
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- // 尾結(jié)點不為空,則將等待線程入隊
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
首先判斷 tail 是否為空,如果為空說明 FIFO 隊列的 head,tail 還未構(gòu)建,此時先構(gòu)建頭結(jié)點,構(gòu)建之后再用 CAS 的方式將此線程結(jié)點入隊
使用 CAS 創(chuàng)建 head 節(jié)點的時候只是簡單調(diào)用了 new Node() 方法,并不像其他節(jié)點那樣記錄 thread,這是為啥
因為 head 結(jié)點為虛結(jié)點,它只代表當前有線程占用了 state,至于占用 state 的是哪個線程,其實是調(diào)用了上文的 setExclusiveOwnerThread(current) ,即記錄在 exclusiveOwnerThread 屬性里。
執(zhí)行完 addWaiter 后,線程入隊成功,現(xiàn)在就要看最后一個最關(guān)鍵的方法 acquireQueued 了,這個方法有點難以理解,先不急,我們先用三個線程來模擬一下之前的代碼對應(yīng)的步驟
1、假設(shè) T1 獲取鎖成功,由于此時 FIFO 未初始化,所以先創(chuàng)建 head 結(jié)點
2、此時 T2 或 T3 再去競爭 state 失敗,入隊,如下圖:
好了,現(xiàn)在問題來了, T2,T3 入隊后怎么處理,是馬上阻塞嗎,馬上阻塞意味著線程從運行態(tài)轉(zhuǎn)為阻塞態(tài) ,涉及到用戶態(tài)向內(nèi)核態(tài)的切換,而且喚醒后也要從內(nèi)核態(tài)轉(zhuǎn)為用戶態(tài),開銷相對比較大,所以 AQS 對這種入隊的線程采用的方式是讓它們自旋來競爭鎖,如下圖示
不過聰明的你可能發(fā)現(xiàn)了一個問題,如果當前鎖是獨占鎖,如果鎖一直被被 T1 占有, T2,T3 一直自旋沒太大意義,反而會占用 CPU,影響性能,所以更合適的方式是它們自旋一兩次競爭不到鎖后識趣地阻塞以等待前置節(jié)點釋放鎖后再來喚醒它。
另外如果鎖在自旋過程中被中斷了,或者自旋超時了,應(yīng)該處于「取消」狀態(tài)。
基于每個 Node 可能所處的狀態(tài),AQS 為其定義了一個變量 waitStatus,根據(jù)這個變量值對相應(yīng)節(jié)點進行相關(guān)的操作,我們一起來看這看這個變量都有哪些值,是時候看一個 Node 結(jié)點的屬性定義了
- static final class Node {
- static final Node SHARED = new Node();//標識等待節(jié)點處于共享模式
- static final Node EXCLUSIVE = null;//標識等待節(jié)點處于獨占模式
- static final int CANCELLED = 1; //由于超時或中斷,節(jié)點已被取消
- static final int SIGNAL = -1; // 節(jié)點阻塞(park)必須在其前驅(qū)結(jié)點為 SIGNAL 的狀態(tài)下才能進行,如果結(jié)點為 SIGNAL,則其釋放鎖或取消后,可以通過 unpark 喚醒下一個節(jié)點,
- static final int CONDITION = -2;//表示線程在等待條件變量(先獲取鎖,加入到條件等待隊列,然后釋放鎖,等待條件變量滿足條件;只有重新獲取鎖之后才能返回)
- static final int PROPAGATE = -3;//表示后續(xù)結(jié)點會傳播喚醒的操作,共享模式下起作用
- //等待狀態(tài):對于condition節(jié)點,初始化為CONDITION;其它情況,默認為0,通過CAS操作原子更新
- volatile int waitStatus;
通過狀態(tài)的定義,我們可以猜測一下 AQS 對線程自旋的處理:如果當前節(jié)點的上一個節(jié)點不為 head,且它的狀態(tài)為 SIGNAL,則結(jié)點進入阻塞狀態(tài)。來看下代碼以映證我們的猜測:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 如果前一個節(jié)點是 head,則嘗試自旋獲取鎖
- if (p == head && tryAcquire(arg)) {
- // 將 head 結(jié)點指向當前節(jié)點,原 head 結(jié)點出隊
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 如果前一個節(jié)點不是 head 或者競爭鎖失敗,則進入阻塞狀態(tài)
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- // 如果線程自旋中因為異常等原因獲取鎖最終失敗,則調(diào)用此方法
- cancelAcquire(node);
- }
- }
先來看第一種情況,如果當前結(jié)點的前一個節(jié)點是 head 結(jié)點,且獲取鎖(tryAcquire)成功的處理
可以看到主要的處理就是把 head 指向當前節(jié)點,并且讓原 head 結(jié)點出隊,這樣由于原 head 不可達, 會被垃圾回收。
注意其中 setHead 的處理
- private void setHead(Node node) {
- head = node;
- node.thread = null;
- node.prev = null;
- }
將 head 設(shè)置成當前結(jié)點后,要把節(jié)點的 thread, pre 設(shè)置成 null,因為之前分析過了,head 是虛節(jié)點,不保存除 waitStatus(結(jié)點狀態(tài))的其他信息,所以這里把 thread ,pre 置為空,因為占有鎖的線程由 exclusiveThread 記錄了,如果 head 再記錄 thread 不僅多此一舉,反而在釋放鎖的時候要多操作一遍 head 的 thread 釋放。
如果前一個節(jié)點不是 head 或者競爭鎖失敗,則首先調(diào)用 shouldParkAfterFailedAcquire 方法判斷鎖是否應(yīng)該停止自旋進入阻塞狀態(tài):
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- // 1. 如果前置頂點的狀態(tài)為 SIGNAL,表示當前節(jié)點可以阻塞了
- return true;
- if (ws > 0) {
- // 2. 移除取消狀態(tài)的結(jié)點
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- // 3. 如果前置節(jié)點的 ws 不為 0,則其設(shè)置為 SIGNAL,
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
這一段代碼有點繞,需要稍微動點腦子,按以上步驟一步步來看
1、 首先我們要明白,根據(jù)之前 Node 類的注釋,如果前驅(qū)節(jié)點為 SIGNAL,則當前節(jié)點可以進入阻塞狀態(tài)。
如圖示:T2,T3 的前驅(qū)節(jié)點的 waitStatus 都為 SIGNAL,所以 T2,T3 此時都可以阻塞。
2、如果前驅(qū)節(jié)點為取消狀態(tài),則前驅(qū)節(jié)點需要移除,這些采用了一個更巧妙的方法,把所有當前節(jié)點之前所有 waitStatus 為取消狀態(tài)的節(jié)點全部移除,假設(shè)有四個線程如下,T2,T3 為取消狀態(tài),則執(zhí)行邏輯后如下圖所示,T2, T3 節(jié)點會被 GC。
3、如果前驅(qū)節(jié)點小于等于 0,則需要首先將其前驅(qū)節(jié)點置為 SIGNAL,因為前文我們分析過,當前節(jié)點進入阻塞的一個條件是前驅(qū)節(jié)點必須為 SIGNAL,這樣下一次自旋后發(fā)現(xiàn)前驅(qū)節(jié)點為 SIGNAL,就會返回 true(即步驟 1)
shouldParkAfterFailedAcquire 返回 true 代表線程可以進入阻塞中斷,那么下一步 parkAndCheckInterrupt 就該讓線程阻塞了
- private final boolean parkAndCheckInterrupt() {
- // 阻塞線程
- LockSupport.park(this);
- // 返回線程是否中斷過,并且清除中斷狀態(tài)(在獲得鎖后會補一次中斷)
- return Thread.interrupted();
- }
這里的阻塞線程很容易理解,但為啥要判斷線程是否中斷過呢,因為如果線程在阻塞期間收到了中斷,喚醒(轉(zhuǎn)為運行態(tài))獲取鎖后(acquireQueued 為 true)需要補一個中斷,如下所示:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- // 如果是因為中斷喚醒的線程,獲取鎖后需要補一下中斷
- selfInterrupt();
- }
至此,獲取鎖的流程已經(jīng)分析完畢,不過還有一個疑惑我們還沒解開:前文不是說 Node 狀態(tài)為取消狀態(tài)會被取消嗎,那 Node 什么時候會被設(shè)置為取消狀態(tài)呢。
回頭看 acquireQueued
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- // 省略自旋獲取鎖代碼
- } finally {
- if (failed)
- // 如果線程自旋中因為異常等原因獲取鎖最終失敗,則調(diào)用此方法
- cancelAcquire(node);
- }
- }
看最后一個 cancelAcquire 方法,如果線程自旋中因為異常等原因獲取鎖最終失敗,則會調(diào)用此方法
- private void cancelAcquire(Node node) {
- // 如果節(jié)點為空,直接返回
- if (node == null)
- return;
- // 由于線程要被取消了,所以將 thread 線程清掉
- node.thread = null;
- // 下面這步表示將 node 的 pre 指向之前第一個非取消狀態(tài)的結(jié)點(即跳過所有取消狀態(tài)的結(jié)點),waitStatus > 0 表示當前結(jié)點狀態(tài)為取消狀態(tài)
- Node pred = node.prev;
- while (pred.waitStatus > 0)
- node.prev = pred = pred.prev;
- // 獲取經(jīng)過過濾后的 pre 的 next 結(jié)點,這一步主要用在后面的 CAS 設(shè)置 pre 的 next 節(jié)點上
- Node predNext = pred.next;
- // 將當前結(jié)點設(shè)置為取消狀態(tài)
- node.waitStatus = Node.CANCELLED;
- // 如果當前取消結(jié)點為尾結(jié)點,使用 CAS 則將尾結(jié)點設(shè)置為其前驅(qū)節(jié)點,如果設(shè)置成功,則尾結(jié)點的 next 指針設(shè)置為空
- if (node == tail && compareAndSetTail(node, pred)) {
- compareAndSetNext(pred, predNext, null);
- } else {
- // 這一步看得有點繞,我們想想,如果當前節(jié)點取消了,那是不是要把當前節(jié)點的前驅(qū)節(jié)點指向當前節(jié)點的后繼節(jié)點,但是我們之前也說了,要喚醒或阻塞結(jié)點,須在其前驅(qū)節(jié)點的狀態(tài)為 SIGNAL 的條件才能操作,所以在設(shè)置 pre 的 next 節(jié)點時要保證 pre 結(jié)點的狀態(tài)為 SIGNAL,想通了這一點相信你不難理解以下代碼。
- int ws;
- if (pred != head &&
- ((ws = pred.waitStatus) == Node.SIGNAL ||
- (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
- pred.thread != null) {
- Node next = node.next;
- if (next != null && next.waitStatus <= 0)
- compareAndSetNext(pred, predNext, next);
- } else {
- // 如果 pre 為 head,或者 pre 的狀態(tài)設(shè)置 SIGNAL 失敗,則直接喚醒后繼結(jié)點去競爭鎖,之前我們說過, SIGNAL 的結(jié)點取消(或釋放鎖)后可以喚醒后繼結(jié)點
- unparkSuccessor(node);
- }
- node.next = node; // help GC
- }
- }
這一段代碼有點繞,我們一個個來看,首先考慮以下情況
1、首先第一步當前節(jié)點之前有取消結(jié)點時,則邏輯如下
2、如果當前結(jié)點既非頭結(jié)點的后繼結(jié)點,也非尾結(jié)點,即步驟 1 所示,則最終結(jié)果如下
這里肯定有人問,這種情況下當前節(jié)點與它的前驅(qū)結(jié)點無法被 GC 啊,還記得我們上文分析鎖自旋時的處理嗎,再看下以下代碼
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- // 省略無關(guān)代碼
- if (ws > 0) {
- // 移除取消狀態(tài)的結(jié)點
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- }
- return false;
- }
這段代碼會將 node 的 pre 指向之前 waitStatus 為非 CANCEL 的節(jié)點
所以當 T4 執(zhí)行這段代碼時,會變成如下情況
可以看到此時中間的兩個 CANCEL 節(jié)點不可達了,會被 GC
3、如果當前結(jié)點為 tail 結(jié)點,則結(jié)果如下,這種情況下當前結(jié)點不可達,會被 GC
4、如果當前結(jié)點為 head 的后繼結(jié)點時,如下
結(jié)果中的 CANCEL 結(jié)點同樣會在 tail 結(jié)點自旋調(diào)用 shouldParkAfterFailedAcquire 后不可達,如下
自此我們終于分析完了鎖的獲取流程,接下來我們來看看鎖是如何釋放的。
鎖釋放
不管是公平鎖還是非公平鎖,最終都是調(diào)的 AQS 的如下模板方法來釋放鎖
- // java.util.concurrent.locks.AbstractQueuedSynchronizer
- public final boolean release(int arg) {
- // 鎖釋放是否成功
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
tryRelease 方法定義在了 AQS 的子類 Sync 方法里
- // java.util.concurrent.locks.ReentrantLock.Sync
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- // 只有持有鎖的線程才能釋放鎖,所以如果當前鎖不是持有鎖的線程,則拋異常
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- // 說明線程持有的鎖全部釋放了,需要釋放 exclusiveOwnerThread 的持有線程
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
鎖釋放成功后該干嘛,顯然是喚醒之后 head 之后節(jié)點,讓它來競爭鎖
- // java.util.concurrent.locks.AbstractQueuedSynchronizer
- public final boolean release(int arg) {
- // 鎖釋放是否成功
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- // 鎖釋放成功后,喚醒 head 之后的節(jié)點,讓它來競爭鎖
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
這里釋放鎖的條件為啥是 h != null && h.waitStatus != 0 呢。
如果 h == null, 這有兩種可能,一種是一個線程在競爭鎖,現(xiàn)在它釋放了,當然沒有所謂的喚醒后繼節(jié)點,一種是其他線程正在運行競爭鎖,只是還未初始化頭節(jié)點,既然其他線程正在運行,也就無需執(zhí)行喚醒操作
如果 h != null 且 h.waitStatus == 0,說明 head 的后繼節(jié)點正在自旋競爭鎖,也就是說線程是運行狀態(tài)的,無需喚醒。
如果 h != null 且 h.waitStatus < 0, 此時 waitStatus 值可能為 SIGNAL,或 PROPAGATE,這兩種情況說明后繼結(jié)點阻塞需要被喚醒
來看一下喚醒方法 unparkSuccessor:
- private void unparkSuccessor(Node node) {
- // 獲取 head 的 waitStatus(假設(shè)其為 SIGNAL),并用 CAS 將其置為 0,為啥要做這一步呢,之前我們分析過多次,其實 waitStatus = SIGNAL(< -1)或 PROPAGATE(-·3) 只是一個標志,代表在此狀態(tài)下,后繼節(jié)點可以喚醒,既然正在喚醒后繼節(jié)點,自然可以將其重置為 0,當然如果失敗了也不影響其喚醒后繼結(jié)點
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- // 以下操作為獲取隊列第一個非取消狀態(tài)的結(jié)點,并將其喚醒
- Node s = node.next;
- // s 狀態(tài)為非空,或者其為取消狀態(tài),說明 s 是無效節(jié)點,此時需要執(zhí)行 if 里的邏輯
- if (s == null || s.waitStatus > 0) {
- s = null;
- // 以下操作為從尾向前獲取最后一個非取消狀態(tài)的結(jié)點
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
這里的尋找隊列的第一個非取消狀態(tài)的節(jié)點為啥要從后往前找呢,因為節(jié)點入隊并不是原子操作,如下
線程自旋時時是先執(zhí)行 node.pre = pred, 然后再執(zhí)行 pred.next = node,如果 unparkSuccessor 剛好在這兩者之間執(zhí)行,此時是找不到 head 的后繼節(jié)點的,如下
如何利用 AQS 自定義一個互斥鎖
AQS 通過提供 state 及 FIFO 隊列的管理,為我們提供了一套通用的實現(xiàn)鎖的底層方法,基本上定義一個鎖,都是轉(zhuǎn)為在其內(nèi)部定義 AQS 的子類,調(diào)用 AQS 的底層方法來實現(xiàn)的,由于 AQS 在底層已經(jīng)為了定義好了這些獲取 state 及 FIFO 隊列的管理工作,我們要實現(xiàn)一個鎖就比較簡單了,我們可以基于 AQS 來實現(xiàn)一個非可重入的互斥鎖,如下所示
- public class Mutex {
- private Sync sync = new Sync();
- public void lock () {
- sync.acquire(1);
- }
- public void unlock () {
- sync.release(1);
- }
- private static class Sync extends AbstractQueuedSynchronizer {
- @Override
- protected boolean tryAcquire (int arg) {
- return compareAndSetState(0, 1);
- }
- @Override
- protected boolean tryRelease (int arg) {
- setState(0);
- return true;
- }
- @Override
- protected boolean isHeldExclusively () {
- return getState() == 1;
- }
- }
- }
可以看到區(qū)區(qū)幾行代碼就實現(xiàn)了,確實很方便。
總結(jié)本文通過圖文并茂的方式幫助大家梳理了一遍 AQS 的實現(xiàn)方式,相信大家看完對 AQS 應(yīng)該有了比較深入的認識,首先要明白鎖的實現(xiàn)原理,信號量及管程,理解了管程的設(shè)計思路對 AQS 有了一個概念上的認識,再去讀源碼就會用管程的概念去套,也就更容易理解了,另外大家可以多類比一下生活中的場景,如就醫(yī)場景,通過類似的方式學(xué)習(xí)能讓我們更好地理解相關(guān)技術(shù)的設(shè)計思路。
本文轉(zhuǎn)載自微信公眾號「碼海」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系碼海公眾號。