我畫(huà)了35張圖就是為了讓你深入 AQS
前言
談到并發(fā),我們不得不說(shuō)AQS(AbstractQueuedSynchronizer),所謂的AQS即是抽象的隊(duì)列式的同步器,內(nèi)部定義了很多鎖相關(guān)的方法,我們熟知的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等都是基于AQS來(lái)實(shí)現(xiàn)的。
我們先看下AQS相關(guān)的UML圖:
1.AQS實(shí)現(xiàn)原理
AQS中 維護(hù)了一個(gè)volatile int state(代表共享資源)和一個(gè)FIFO線程等待隊(duì)列(多線程爭(zhēng)用資源被阻塞時(shí)會(huì)進(jìn)入此隊(duì)列)。
這里volatile能夠保證多線程下的可見(jiàn)性,當(dāng)state=1則代表當(dāng)前對(duì)象鎖已經(jīng)被占有,其他線程來(lái)加鎖時(shí)則會(huì)失敗,加鎖失敗的線程會(huì)被放入一個(gè)FIFO的等待隊(duì)列中,比列會(huì)被UNSAFE.park()操作掛起,等待其他獲取鎖的線程釋放鎖才能夠被喚醒。
另外state的操作都是通過(guò)CAS來(lái)保證其并發(fā)修改的安全性。
具體原理我們可以用一張圖來(lái)簡(jiǎn)單概括:
AQS 中提供了很多關(guān)于鎖的實(shí)現(xiàn)方法,
- getState():獲取鎖的標(biāo)志state值
- setState():設(shè)置鎖的標(biāo)志state值
- tryAcquire(int):獨(dú)占方式獲取鎖。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨(dú)占方式釋放鎖。嘗試釋放資源,成功則返回true,失敗則返回false。
這里還有一些方法并沒(méi)有列出來(lái),接下來(lái)我們以ReentrantLock作為突破點(diǎn)通過(guò)源碼和畫(huà)圖的形式一步步了解AQS內(nèi)部實(shí)現(xiàn)原理。
2.目錄結(jié)構(gòu)
文章準(zhǔn)備模擬多線程競(jìng)爭(zhēng)鎖、釋放鎖的場(chǎng)景來(lái)進(jìn)行分析AQS源碼:
三個(gè)線程(線程一、線程二、線程三)同時(shí)來(lái)加鎖/釋放鎖
目錄如下:
- 線程一加鎖成功時(shí)AQS內(nèi)部實(shí)現(xiàn)
- 線程二/三加鎖失敗時(shí)AQS中等待隊(duì)列的數(shù)據(jù)模型
- 線程一釋放鎖及線程二獲取鎖實(shí)現(xiàn)原理
- 通過(guò)線程場(chǎng)景來(lái)講解公平鎖具體實(shí)現(xiàn)原理
- 通過(guò)線程場(chǎng)景來(lái)講解Condition中await()和signal()實(shí)現(xiàn)原理
這里會(huì)通過(guò)畫(huà)圖來(lái)分析每個(gè)線程加鎖、釋放鎖后AQS內(nèi)部的數(shù)據(jù)結(jié)構(gòu)和實(shí)現(xiàn)原理
3.場(chǎng)景分析
線程一加鎖成功
如果同時(shí)有三個(gè)線程并發(fā)搶占鎖,此時(shí)線程一搶占鎖成功,線程二和線程三搶占鎖失敗,具體執(zhí)行流程如下:
此時(shí)AQS內(nèi)部數(shù)據(jù)為:

線程二、線程三加鎖失?。?/p>
有圖可以看出,等待隊(duì)列中的節(jié)點(diǎn)Node是一個(gè)雙向鏈表,這里SIGNAL是Node中waitStatus屬性,Node中還有一個(gè)nextWaiter屬性,這個(gè)并未在圖中畫(huà)出來(lái),這個(gè)到后面Condition會(huì)具體講解的。
具體看下?lián)屨兼i代碼實(shí)現(xiàn):
java.util.concurrent.locks.ReentrantLock .NonfairSync:
- static final class NonfairSync extends Sync {
- final void lock() {
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
- }
這里使用的ReentrantLock非公平鎖,線程進(jìn)來(lái)直接利用CAS嘗試搶占鎖,如果搶占成功state值回被改為1,且設(shè)置對(duì)象獨(dú)占鎖線程為當(dāng)前線程。如下所示:
- protected final boolean compareAndSetState(int expect, int update) {
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
- protected final void setExclusiveOwnerThread(Thread thread) {
- exclusiveOwnerThread = thread;
- }
線程二搶占鎖失敗
我們按照真實(shí)場(chǎng)景來(lái)分析,線程一搶占鎖成功后,state變?yōu)?,線程二通過(guò)CAS修改state變量必然會(huì)失敗。此時(shí)AQS中FIFO(First In First Out 先進(jìn)先出)隊(duì)列中數(shù)據(jù)如圖所示:
我們將線程二執(zhí)行的邏輯一步步拆解來(lái)看:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
先看看tryAcquire()的具體實(shí)現(xiàn):java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire():
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
nonfairTryAcquire()方法中首先會(huì)獲取state的值,如果不為0則說(shuō)明當(dāng)前對(duì)象的鎖已經(jīng)被其他線程所占有,接著判斷占有鎖的線程是否為當(dāng)前線程,如果是則累加state值,這就是可重入鎖的具體實(shí)現(xiàn),累加state值,釋放鎖的時(shí)候也要依次遞減state值。
如果state為0,則執(zhí)行CAS操作,嘗試更新state值為1,如果更新成功則代表當(dāng)前線程加鎖成功。
以線程二為例,因?yàn)榫€程一已經(jīng)將state修改為1,所以線程二通過(guò)CAS修改state的值不會(huì)成功。加鎖失敗。
線程二執(zhí)行tryAcquire()后會(huì)返回false,接著執(zhí)行addWaiter(Node.EXCLUSIVE)邏輯,將自己加入到一個(gè)FIFO等待隊(duì)列中,代碼實(shí)現(xiàn)如下:
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter():
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
這段代碼首先會(huì)創(chuàng)建一個(gè)和當(dāng)前線程綁定的Node節(jié)點(diǎn),Node為雙向鏈表。此時(shí)等待對(duì)內(nèi)中的tail指針為空,直接調(diào)用enq(node)方法將當(dāng)前線程加入等待隊(duì)列尾部:
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) {
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
第一遍循環(huán)時(shí)tail指針為空,進(jìn)入if邏輯,使用CAS操作設(shè)置head指針,將head指向一個(gè)新創(chuàng)建的Node節(jié)點(diǎn)。此時(shí)AQS中數(shù)據(jù):
執(zhí)行完成之后,head、tail、t都指向第一個(gè)Node元素。
接著執(zhí)行第二遍循環(huán),進(jìn)入else邏輯,此時(shí)已經(jīng)有了head節(jié)點(diǎn),這里要操作的就是將線程二對(duì)應(yīng)的Node節(jié)點(diǎn)掛到head節(jié)點(diǎn)后面。此時(shí)隊(duì)列中就有了兩個(gè)Node節(jié)點(diǎn):
addWaiter()方法執(zhí)行完后,會(huì)返回當(dāng)前線程創(chuàng)建的節(jié)點(diǎn)信息。繼續(xù)往后執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)邏輯,此時(shí)傳入的參數(shù)為線程二對(duì)應(yīng)的Node節(jié)點(diǎn)信息:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued():
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndChecknIterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- return true;
- if (ws > 0) {
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
acquireQueued()這個(gè)方法會(huì)先判斷當(dāng)前傳入的Node對(duì)應(yīng)的前置節(jié)點(diǎn)是否為head,如果是則嘗試加鎖。加鎖成功過(guò)則將當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn),然后空置之前的head節(jié)點(diǎn),方便后續(xù)被垃圾回收掉。
如果加鎖失敗或者Node的前置節(jié)點(diǎn)不是head節(jié)點(diǎn),就會(huì)通過(guò)shouldParkAfterFailedAcquire方法 將head節(jié)點(diǎn)的waitStatus變?yōu)榱薙IGNAL=-1,最后執(zhí)行parkAndChecknIterrupt方法,調(diào)用LockSupport.park()掛起當(dāng)前線程。
此時(shí)AQS中的數(shù)據(jù)如下圖:
此時(shí)線程二就靜靜的待在AQS的等待隊(duì)列里面了,等著其他線程釋放鎖來(lái)喚醒它。
線程三搶占鎖失敗
看完了線程二搶占鎖失敗的分析,那么再來(lái)分析線程三搶占鎖失敗就很簡(jiǎn)單了,先看看addWaiter(Node mode)方法:
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
此時(shí)等待隊(duì)列的tail節(jié)點(diǎn)指向線程二,進(jìn)入if邏輯后,通過(guò)CAS指令將tail節(jié)點(diǎn)重新指向線程三。接著線程三調(diào)用enq()方法執(zhí)行入隊(duì)操作,和上面線程二執(zhí)行方式是一致的,入隊(duì)后會(huì)修改線程二對(duì)應(yīng)的Node中的waitStatus=SIGNAL。最后線程三也會(huì)被掛起。此時(shí)等待隊(duì)列的數(shù)據(jù)如圖:
線程一釋放鎖
現(xiàn)在來(lái)分析下釋放鎖的過(guò)程,首先是線程一釋放鎖,釋放鎖后會(huì)喚醒head節(jié)點(diǎn)的后置節(jié)點(diǎn),也就是我們現(xiàn)在的線程二,具體操作流程如下:
執(zhí)行完后等待隊(duì)列數(shù)據(jù)如下:
此時(shí)線程二已經(jīng)被喚醒,繼續(xù)嘗試獲取鎖,如果獲取鎖失敗,則會(huì)繼續(xù)被掛起。如果獲取鎖成功,則AQS中數(shù)據(jù)如圖:
接著還是一步步拆解來(lái)看,先看看線程一釋放鎖的代碼:
java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
這里首先會(huì)執(zhí)行tryRelease()方法,這個(gè)方法具體實(shí)現(xiàn)在ReentrantLock中,如果tryRelease執(zhí)行成功,則繼續(xù)判斷head節(jié)點(diǎn)的waitStatus是否為0,前面我們已經(jīng)看到過(guò),head的waitStatue為SIGNAL(-1),這里就會(huì)執(zhí)行unparkSuccessor()方法來(lái)喚醒head的后置節(jié)點(diǎn),也就是我們上面圖中線程二對(duì)應(yīng)的Node節(jié)點(diǎn)。
此時(shí)看ReentrantLock.tryRelease()中的具體實(shí)現(xiàn):
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
執(zhí)行完ReentrantLock.tryRelease()后,state被設(shè)置成0,Lock對(duì)象的獨(dú)占鎖被設(shè)置為null。此時(shí)看下AQS中的數(shù)據(jù):
接著執(zhí)行java.util.concurrent.locks.AbstractQueuedSynchronizer.unparkSuccessor()方法,喚醒head的后置節(jié)點(diǎn):
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
這里主要是將head節(jié)點(diǎn)的waitStatus設(shè)置為0,然后解除head節(jié)點(diǎn)next的指向,使head節(jié)點(diǎn)空置,等待著被垃圾回收。
此時(shí)重新將head指針指向線程二對(duì)應(yīng)的Node節(jié)點(diǎn),且使用LockSupport.unpark方法來(lái)喚醒線程二。
被喚醒的線程二會(huì)接著嘗試獲取鎖,用CAS指令修改state數(shù)據(jù)。執(zhí)行完成后可以查看AQS中數(shù)據(jù):
此時(shí)線程二被喚醒,線程二接著之前被park的地方繼續(xù)執(zhí)行,繼續(xù)執(zhí)行acquireQueued()方法。
線程二喚醒繼續(xù)加鎖
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
此時(shí)線程二被喚醒,繼續(xù)執(zhí)行for循環(huán),判斷線程二的前置節(jié)點(diǎn)是否為head,如果是則繼續(xù)使用tryAcquire()方法來(lái)嘗試獲取鎖,其實(shí)就是使用CAS操作來(lái)修改state值,如果修改成功則代表獲取鎖成功。接著將線程二設(shè)置為head節(jié)點(diǎn),然后空置之前的head節(jié)點(diǎn)數(shù)據(jù),被空置的節(jié)點(diǎn)數(shù)據(jù)等著被垃圾回收。
此時(shí)線程三獲取鎖成功,AQS中隊(duì)列數(shù)據(jù)如下:
等待隊(duì)列中的數(shù)據(jù)都等待著被垃圾回收。
線程二釋放鎖/線程三加鎖
當(dāng)線程二釋放鎖時(shí),會(huì)喚醒被掛起的線程三,流程和上面大致相同,被喚醒的線程三會(huì)再次嘗試加鎖,具體代碼可以參考上面內(nèi)容。具體流程圖如下:
此時(shí)AQS中隊(duì)列數(shù)據(jù)如圖:
4公平鎖實(shí)現(xiàn)原理
上面所有的加鎖場(chǎng)景都是基于非公平鎖來(lái)實(shí)現(xiàn)的,非公平鎖是ReentrantLock的默認(rèn)實(shí)現(xiàn),那我們接著來(lái)看一下公平鎖的實(shí)現(xiàn)原理,這里先用一張圖來(lái)解釋公平鎖和非公平鎖的區(qū)別:
非公平鎖執(zhí)行流程:

這里我們還是用之前的線程模型來(lái)舉例子,當(dāng)線程二釋放鎖的時(shí)候,喚醒被掛起的線程三,線程三執(zhí)行tryAcquire()方法使用CAS操作來(lái)嘗試修改state值,如果此時(shí)又來(lái)了一個(gè)線程四也來(lái)執(zhí)行加鎖操作,同樣會(huì)執(zhí)行tryAcquire()方法。
這種情況就會(huì)出現(xiàn)競(jìng)爭(zhēng),線程四如果獲取鎖成功,線程三仍然需要待在等待隊(duì)列中被掛起。這就是所謂的非公平鎖,線程三辛辛苦苦排隊(duì)等到自己獲取鎖,卻眼巴巴的看到線程四插隊(duì)獲取到了鎖。
公平鎖執(zhí)行流程:
公平鎖在加鎖的時(shí)候,會(huì)先判斷AQS等待隊(duì)列中是存在節(jié)點(diǎn),如果存在節(jié)點(diǎn)則會(huì)直接入隊(duì)等待,具體代碼如下.
公平鎖在獲取鎖是也是首先會(huì)執(zhí)行acquire()方法,只不過(guò)公平鎖單獨(dú)實(shí)現(xiàn)了tryAcquire()方法:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
這里會(huì)執(zhí)行ReentrantLock中公平鎖的tryAcquire()方法
#java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire():
- static final class FairSync extends Sync {
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
- }
這里會(huì)先判斷state值,如果不為0且獲取鎖的線程不是當(dāng)前線程,直接返回false代表獲取鎖失敗,被加入等待隊(duì)列。如果是當(dāng)前線程則可重入獲取鎖。
如果state=0則代表此時(shí)沒(méi)有線程持有鎖,執(zhí)行hasQueuedPredecessors()判斷AQS等待隊(duì)列中是否有元素存在,如果存在其他等待線程,那么自己也會(huì)加入到等待隊(duì)列尾部,做到真正的先來(lái)后到,有序加鎖。具體代碼如下:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors():
- public final boolean hasQueuedPredecessors() {
- Node t = tail;
- Node h = head;
- Node s;
- return h != t &&
- ((s = h.next) == null || s.thread != Thread.currentThread());
- }
這段代碼很有意思,返回false代表隊(duì)列中沒(méi)有節(jié)點(diǎn)或者僅有一個(gè)節(jié)點(diǎn)是當(dāng)前線程創(chuàng)建的節(jié)點(diǎn)。返回true則代表隊(duì)列中存在等待節(jié)點(diǎn),當(dāng)前線程需要入隊(duì)等待。
先判斷head是否等于tail,如果隊(duì)列中只有一個(gè)Node節(jié)點(diǎn),那么head會(huì)等于tail,接著判斷head的后置節(jié)點(diǎn),這里肯定會(huì)是null,如果此Node節(jié)點(diǎn)對(duì)應(yīng)的線程和當(dāng)前的線程是同一個(gè)線程,那么則會(huì)返回false,代表沒(méi)有等待節(jié)點(diǎn)或者等待節(jié)點(diǎn)就是當(dāng)前線程創(chuàng)建的Node節(jié)點(diǎn)。此時(shí)當(dāng)前線程會(huì)嘗試獲取鎖。
如果head和tail不相等,說(shuō)明隊(duì)列中有等待線程創(chuàng)建的節(jié)點(diǎn),此時(shí)直接返回true,如果只有一個(gè)節(jié)點(diǎn),而此節(jié)點(diǎn)的線程和當(dāng)前線程不一致,也會(huì)返回true
非公平鎖和公平鎖的區(qū)別:非公平鎖性能高于公平鎖性能。非公平鎖可以減少CPU喚醒線程的開(kāi)銷,整體的吞吐效率會(huì)高點(diǎn),CPU也不必取喚醒所有線程,會(huì)減少喚起線程的數(shù)量
非公平鎖性能雖然優(yōu)于公平鎖,但是會(huì)存在導(dǎo)致線程饑餓的情況。在最壞的情況下,可能存在某個(gè)線程一直獲取不到鎖。不過(guò)相比性能而言,饑餓問(wèn)題可以暫時(shí)忽略,這可能就是ReentrantLock默認(rèn)創(chuàng)建非公平鎖的原因之一了。
5.Condition實(shí)現(xiàn)原理
Condition 簡(jiǎn)介
上面已經(jīng)介紹了AQS所提供的核心功能,當(dāng)然它還有很多其他的特性,這里我們來(lái)繼續(xù)說(shuō)下Condition這個(gè)組件。
Condition是在java 1.5中才出現(xiàn)的,它用來(lái)替代傳統(tǒng)的Object的wait()、notify()實(shí)現(xiàn)線程間的協(xié)作,相比使用Object的wait()、notify(),使用Condition中的await()、signal()這種方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效。因此通常來(lái)說(shuō)比較推薦使用Condition
其中AbstractQueueSynchronizer中實(shí)現(xiàn)了Condition中的方法,主要對(duì)外提供awaite(Object.wait())和signal(Object.notify())調(diào)用。
Condition Demo示例
使用示例代碼:
- /**
- * ReentrantLock 實(shí)現(xiàn)源碼學(xué)習(xí)
- * @author 一枝花算不算浪漫
- * @date 2020/4/28 7:20
- */
- public class ReentrantLockDemo {
- static ReentrantLock lock = new ReentrantLock();
- public static void main(String[] args) {
- Condition condition = lock.newCondition();
- new Thread(() -> {
- lock.lock();
- try {
- System.out.println("線程一加鎖成功");
- System.out.println("線程一執(zhí)行await被掛起");
- condition.await();
- System.out.println("線程一被喚醒成功");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- System.out.println("線程一釋放鎖成功");
- }
- }).start();
- new Thread(() -> {
- lock.lock();
- try {
- System.out.println("線程二加鎖成功");
- condition.signal();
- System.out.println("線程二喚醒線程一");
- } finally {
- lock.unlock();
- System.out.println("線程二釋放鎖成功");
- }
- }).start();
- }
- }
執(zhí)行結(jié)果如下圖:
這里線程一先獲取鎖,然后使用await()方法掛起當(dāng)前線程并釋放鎖,線程二獲取鎖后使用signal喚醒線程一。
Condition實(shí)現(xiàn)原理圖解
我們還是用上面的demo作為實(shí)例,執(zhí)行的流程如下:
線程一執(zhí)行await()方法:
先看下具體的代碼實(shí)現(xiàn),#java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject.await():
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
await()方法中首先調(diào)用addConditionWaiter()將當(dāng)前線程加入到Condition隊(duì)列中。
執(zhí)行完后我們可以看下Condition隊(duì)列中的數(shù)據(jù):
具體實(shí)現(xiàn)代碼為:
- private Node addConditionWaiter() {
- Node t = lastWaiter;
- if (t != null && t.waitStatus != Node.CONDITION) {
- unlinkCancelledWaiters();
- t = lastWaiter;
- }
- Node node = new Node(Thread.currentThread(), Node.CONDITION);
- if (t == null)
- firstWaiter = node;
- else
- t.nextWaiter = node;
- lastWaiter = node;
- return node;
- }
這里會(huì)用當(dāng)前線程創(chuàng)建一個(gè)Node節(jié)點(diǎn),waitStatus為CONDITION。接著會(huì)釋放該節(jié)點(diǎn)的鎖,調(diào)用之前解析過(guò)的release()方法,釋放鎖后此時(shí)會(huì)喚醒被掛起的線程二,線程二會(huì)繼續(xù)嘗試獲取鎖。
接著調(diào)用isOnSyncQueue()方法判斷當(dāng)前節(jié)點(diǎn)是否為Condition隊(duì)列中的頭部節(jié)點(diǎn),如果是則調(diào)用LockSupport.park(this)掛起Condition中當(dāng)前線程。此時(shí)線程一被掛起,線程二獲取鎖成功。
具體流程如下圖:
線程二執(zhí)行signal()方法:
首先我們考慮下線程二已經(jīng)獲取到鎖,此時(shí)AQS等待隊(duì)列中已經(jīng)沒(méi)有了數(shù)據(jù)。
接著就來(lái)看看線程二喚醒線程一的具體執(zhí)行流程:
- public final void signal() {
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- Node first = firstWaiter;
- if (first != null)
- doSignal(first);
- }
先判斷當(dāng)前線程是否為獲取鎖的線程,如果不是則直接拋出異常。接著調(diào)用doSignal()方法來(lái)喚醒線程。
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null;
- first.nextWaiter = null;
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
- final boolean transferForSignal(Node node) {
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- return false;
- Node p = enq(node);
- int ws = p.waitStatus;
- if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
- /**
- * Inserts node into queue, initializing if necessary. See picture above.
- * @param node the node to insert
- * @return node's predecessor
- */
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
這里先從transferForSignal()方法來(lái)看,通過(guò)上面的分析我們知道Condition隊(duì)列中只有線程一創(chuàng)建的一個(gè)Node節(jié)點(diǎn),且waitStatue為CONDITION,先通過(guò)CAS修改當(dāng)前節(jié)點(diǎn)waitStatus為0,然后執(zhí)行enq()方法將當(dāng)前線程加入到等待隊(duì)列中,并返回當(dāng)前線程的前置節(jié)點(diǎn)。
加入等待隊(duì)列的代碼在上面也已經(jīng)分析過(guò),此時(shí)等待隊(duì)列中數(shù)據(jù)如下圖:
接著開(kāi)始通過(guò)CAS修改當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)waitStatus為SIGNAL,并且喚醒當(dāng)前線程。此時(shí)AQS中等待隊(duì)列數(shù)據(jù)為:
線程一被喚醒后,繼續(xù)執(zhí)行await()方法中的 while 循環(huán)。
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
因?yàn)榇藭r(shí)線程一的waitStatus已經(jīng)被修改為0,所以執(zhí)行isOnSyncQueue()方法會(huì)返回false。跳出while循環(huán)。
接著執(zhí)行acquireQueued()方法,這里之前也有講過(guò),嘗試重新獲取鎖,如果獲取鎖失敗繼續(xù)會(huì)被掛起。直到另外線程釋放鎖才被喚醒。
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
此時(shí)線程一的流程都已經(jīng)分析完了,等線程二釋放鎖后,線程一會(huì)繼續(xù)重試獲取鎖,流程到此終結(jié)。
Condition總結(jié)
我們總結(jié)下 Condition 和 wait/notify 的比較:
- Condition 可以精準(zhǔn)的對(duì)多個(gè)不同條件進(jìn)行控制,wait/notify 只能和 synchronized 關(guān)鍵字一起使用,并且只能喚醒一個(gè)或者全部的等待隊(duì)列;
- Condition 需要使用 Lock 進(jìn)行控制,使用的時(shí)候要注意 lock() 后及時(shí)的 unlock(),Condition 有類似于 await 的機(jī)制,因此不會(huì)產(chǎn)生加鎖方式而產(chǎn)生的死鎖出現(xiàn),同時(shí)底層實(shí)現(xiàn)的是 park/unpark 的機(jī)制,因此也不會(huì)產(chǎn)生先喚醒再掛起的死鎖,一句話就是不會(huì)產(chǎn)生死鎖,但是 wait/notify 會(huì)產(chǎn)生先喚醒再掛起的死鎖。
6.總結(jié)
這里用了一步一圖的方式結(jié)合三個(gè)線程依次加鎖/釋放鎖來(lái)展示了ReentrantLock的實(shí)現(xiàn)方式和實(shí)現(xiàn)原理,而ReentrantLock底層就是基于AQS實(shí)現(xiàn)的,所以我們也對(duì)AQS有了深刻的理解。
另外還介紹了公平鎖與非公平鎖的實(shí)現(xiàn)原理,Condition的實(shí)現(xiàn)原理,基本上都是使用源碼+繪圖的講解方式,盡量讓大家更容易去理解。