哈嘍大家好,我是阿Q!
??20張圖圖解ReentrantLock加鎖解鎖原理???文章一發(fā),便引發(fā)了大家激烈的討論,更有小伙伴前來彈窗:平時(shí)加解鎖都是直接使用Synchronized?關(guān)鍵字來實(shí)現(xiàn)的,簡單好用,為啥還要引用ReentrantLock呢?
為了解決小伙伴的疑問,我們來對兩者做個(gè)簡單的比較吧:
相同點(diǎn)
兩者都是“可重入鎖”,即當(dāng)前線程獲取到鎖對象之后,如果想繼續(xù)獲取鎖對象還是可以繼續(xù)獲取的,只不過鎖對象的計(jì)數(shù)器進(jìn)行“+1”操作就可以了。
不同點(diǎn)
- ReentrantLock?是基于API?實(shí)現(xiàn)的,Synchronized?是依賴于JVM實(shí)現(xiàn)的;
- ReentrantLock?可以響應(yīng)中斷,Synchronized是不可以的;
- ReentrantLock?可以指定是公平鎖還是非公平鎖,而Synchronized只能是非公平鎖;
- ReentrantLock的lock?是同步非阻塞,采用的是樂觀并發(fā)策略,Synchronized是同步阻塞的,使用的是悲觀并發(fā)策略;
- ReentrantLock?借助Condition?可以實(shí)現(xiàn)多路選擇通知,Synchronized?通過wait()和notify()/notifyAll()方法可以實(shí)現(xiàn)等待/通知機(jī)制(單路通知);
綜上所述,ReentrantLock?還是有區(qū)別于Synchronized的使用場景的,今天我們就來聊一聊它的多路選擇通知功能。
實(shí)戰(zhàn)
沒有實(shí)戰(zhàn)的“紙上談兵”都是扯淡,今天我們反其道而行,先拋出實(shí)戰(zhàn)Demo。
場景描述
加油站為了吸引更多的車主前來加油,在加油站投放了自動(dòng)洗車機(jī)來為加油的汽車提供免費(fèi)洗車服務(wù)。我們規(guī)定汽車必須按照“加油->洗車->駛離”的流程來加油,等前一輛汽車駛離之后才允許下一輛車進(jìn)來加油。
代碼實(shí)現(xiàn)
首先創(chuàng)建鎖對象并生成三個(gè)Condition
/**
* 控制線程喚醒的標(biāo)志
*/
private int flag = 1;
/**
* 創(chuàng)建鎖對象
*/
private Lock lock = new ReentrantLock();
/**
* 等待隊(duì)列
* c1對應(yīng)加油
* c2對應(yīng)洗車
* c3對應(yīng)開車
*/
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();
然后聲明加油、清洗、駛離的方法,并規(guī)定加完油之后去洗車并駛離加油站
/**
* 汽車加油
*/
public void fuelUp(int num){
lock.lock();
try {
while (flag!=1){
c1.await();
}
System.out.println("第"+num+"輛車開始加油");
flag = 2;
c2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 汽車清洗
*/
public void carWash(int num){
lock.lock();
try {
while (flag!=2){
c2.await();
}
System.out.println("第"+num+"輛車開始清洗");
flag = 3;
c3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 駛離
*/
public void drive(int num){
lock.lock();
try {
while (flag!=3){
c3.await();
}
System.out.println("第"+num+"輛車已經(jīng)駛離加油站");
flag = 1;
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
其中await?為等待方法,signal為喚醒方法。
最后我們來定義main方法,模擬一下3輛車同時(shí)到達(dá)加油站的場景
public static void main(String[] args){
CarOperation carOperation = new CarOperation();
//汽車加油
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.fuelUp(i);
}
},"fuelUp").start();
//汽車清洗
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.carWash(i);
}
},"carRepair").start();
//駛離
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.drive(i);
}
},"drive").start();
}
使用是不是很絲滑?為了加深大家對Condition?的理解,接下來我們用圖解的方式分析一波Condition的原理~
圖解
大家都看到了,上邊的案例都是圍繞Condition?來操作的,那什么是Condition?呢?Condition是一個(gè)接口,里邊定義了線程等待和喚醒的方法。

代碼中調(diào)用的lock.newCondition()?實(shí)際調(diào)用的是Sync?類中的newCondition?方法,而ConditionObject?就是Condition的實(shí)現(xiàn)類。
final ConditionObject newCondition(){
return new ConditionObject();
}
我們發(fā)現(xiàn)它處于AQS?的內(nèi)部,沒法直接實(shí)例化,所以需要配合ReentrantLock來使用。
ConditionObject

ConditionObject?內(nèi)部維護(hù)了一個(gè)基于Node的FIFO?單向隊(duì)列,我們把它稱為等待隊(duì)列。firstWaiter?指向首節(jié)點(diǎn),lastWaiter?指向尾節(jié)點(diǎn),Node?中的nextWaiter?指向隊(duì)列中的下一個(gè)元素,并且等待隊(duì)列中節(jié)點(diǎn)的waitStatus都是-2。
了解了ConditionObject?的數(shù)據(jù)結(jié)構(gòu)之后,我們就從源碼角度來圖解一下ReentrantLock的等待/喚醒機(jī)制。
await
首先找到AQS?類中await的源碼
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//將當(dāng)前線程封裝成node加入等待隊(duì)列尾部
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
//檢測此節(jié)點(diǎn)的線程是否在同步隊(duì)上,如果不在,則說明該線程還不具備競爭鎖的資格,則繼續(xù)等待直到檢測到此節(jié)點(diǎn)在同步隊(duì)列上
while (!isOnSyncQueue(node)) {
//當(dāng)node處于等待隊(duì)列時(shí),掛起當(dāng)前線程。
LockSupport.park(this);
//如果發(fā)生了中斷,則跳出循環(huán),結(jié)束等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒后該節(jié)點(diǎn)一定會在AQS隊(duì)列上,
//之前分析過acquireQueued方法獲取不到鎖會繼續(xù)阻塞
//獲取到了鎖,中斷過返回true,未中斷過返回false
//獲取到鎖存在中斷并且不是中斷喚醒的線程將中斷模式設(shè)置為重新中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//清除條件隊(duì)列中所有狀態(tài)不為 CONDITION 的結(jié)點(diǎn)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
如果線程中斷,清除中斷標(biāo)記并拋出異常。
查看addConditionWaiter
該方法的作用是將當(dāng)前線程封裝成node加入等待隊(duì)列尾部
private Node addConditionWaiter(){
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
//將不處于等待狀態(tài)的結(jié)點(diǎn)從等待隊(duì)列中移除
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾節(jié)點(diǎn)為空
if (t == null)
//將首節(jié)點(diǎn)指向node
firstWaiter = node;
else
//將尾節(jié)點(diǎn)的nextWaiter指向node節(jié)點(diǎn)
t.nextWaiter = node;
//尾節(jié)點(diǎn)指向node
lastWaiter = node;
return node;
}
首先將t指向尾節(jié)點(diǎn),如果尾節(jié)點(diǎn)不為空并且它的waitStatus!=-2,則將不處于等待狀態(tài)的結(jié)點(diǎn)從等待隊(duì)列中移除,并且將t指向新的尾節(jié)點(diǎn)。
將當(dāng)前線程封裝成waitStatus為-2的節(jié)點(diǎn)追加到等待隊(duì)列尾部。
如果尾節(jié)點(diǎn)為空,則隊(duì)列為空,將首尾節(jié)點(diǎn)都指向當(dāng)前節(jié)點(diǎn)。

如果尾節(jié)點(diǎn)不為空,證明隊(duì)列中有其他節(jié)點(diǎn),則將當(dāng)前尾節(jié)點(diǎn)的nextWaiter指向當(dāng)前節(jié)點(diǎn),將當(dāng)前節(jié)點(diǎn)置為尾節(jié)點(diǎn)。

接著我們來查看下unlinkCancelledWaiters()方法——將不處于等待狀態(tài)的結(jié)點(diǎn)從等待隊(duì)列中移除。
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
//trail是t的前驅(qū)結(jié)點(diǎn)
Node trail = null;
while (t != null) {
//next為t的后繼結(jié)點(diǎn)
Node next = t.nextWaiter;
//如果t節(jié)點(diǎn)的waitStatus不為-2即失效節(jié)點(diǎn)
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
//如果t的前驅(qū)節(jié)點(diǎn)為空,則將首節(jié)點(diǎn)指向next
if (trail == null)
firstWaiter = next;
else
//t的前驅(qū)結(jié)點(diǎn)不為空,將前驅(qū)節(jié)點(diǎn)的后繼指針指向next
trail.nextWaiter = next;
//如果next為null,則將尾節(jié)點(diǎn)指向t的前驅(qū)節(jié)點(diǎn)
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
t為當(dāng)前節(jié)點(diǎn),trail?為t的前驅(qū)節(jié)點(diǎn),next為t的后繼節(jié)點(diǎn)。
while?方法會從首節(jié)點(diǎn)順著等待隊(duì)列往后尋找waitStatus!=-2?的節(jié)點(diǎn),將當(dāng)前節(jié)點(diǎn)的nextWaiter置為空。
如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為空,代表當(dāng)前節(jié)點(diǎn)為首節(jié)點(diǎn),則將next設(shè)置為首節(jié)點(diǎn);

如果不為空,則將前驅(qū)節(jié)點(diǎn)的nextWaiter指向后繼節(jié)點(diǎn)。

如果后繼節(jié)點(diǎn)為空,則直接將前驅(qū)節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)。

查看fullyRelease
從名字也差不多能明白該方法的作用是徹底釋放鎖資源。
final int fullyRelease(Node node){
//釋放鎖失敗為true,釋放鎖成功為false
boolean failed = true;
try {
//獲取當(dāng)前鎖的state
int savedState = getState();
//釋放鎖成功的話
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//釋放鎖失敗的話將節(jié)點(diǎn)狀態(tài)置為取消
node.waitStatus = Node.CANCELLED;
}
}
最重要的就是release?方法,而我們上文中已經(jīng)講過了,release執(zhí)行成功的話,當(dāng)前線程已經(jīng)釋放了鎖資源。
查看isOnSyncQueue
判斷當(dāng)前線程所在的Node?是否在同步隊(duì)列中(同步隊(duì)列即AQS隊(duì)列)。在這里有必要給大家看一下同步隊(duì)列與等待隊(duì)列的關(guān)系圖了。

final boolean isOnSyncQueue(Node node){
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
//node節(jié)點(diǎn)的next為null
return findNodeFromTail(node);
}
如果當(dāng)前節(jié)點(diǎn)的waitStatus=-2?,說明它在等待隊(duì)列中,返回false?;如果當(dāng)前節(jié)點(diǎn)有前驅(qū)節(jié)點(diǎn),則證明它在AQS?隊(duì)列中,但是前驅(qū)節(jié)點(diǎn)為空,說明它是頭節(jié)點(diǎn),而頭節(jié)點(diǎn)是不參與鎖競爭的,也返回false。
如果當(dāng)前節(jié)點(diǎn)既不在等待隊(duì)列中,又不是AQS?中的頭結(jié)點(diǎn)且存在next?節(jié)點(diǎn),說明它存在于AQS?中,直接返回true。
接著往下看,如果當(dāng)前節(jié)點(diǎn)的next?為空,該節(jié)點(diǎn)可能是tail?節(jié)點(diǎn),也可能是該節(jié)點(diǎn)的next還未賦值,所以需要從后往前遍歷節(jié)點(diǎn)。
private boolean findNodeFromTail(Node node){
Node t = tail;
for (;;) {
//先用尾節(jié)點(diǎn)來判斷,然后用隊(duì)列中的節(jié)點(diǎn)依次來判斷
if (t == node)
return true;
//節(jié)點(diǎn)為空,說明找到頭也不在AQS隊(duì)列中,返回false
if (t == null)
return false;
t = t.prev;
}
}
在遍歷過程中,如果隊(duì)列中有節(jié)點(diǎn)等于當(dāng)前節(jié)點(diǎn),返回true?;如果找到頭節(jié)點(diǎn)也沒找到,則返回false。
我們回到await的while?循環(huán)處,如果返回false,說明該節(jié)點(diǎn)不在同步隊(duì)列中,進(jìn)入循環(huán)中掛起該線程。
知識點(diǎn)補(bǔ)充
阿Q的理解是線程被喚醒會存在兩種情況:一種是調(diào)用signal/signalAll喚醒線程;一種是通過線程中斷信號,喚醒線程并拋出中斷異常。
查看checkInterruptWhileWaiting(難點(diǎn))
該方法的作用是判斷當(dāng)前線程是否發(fā)生過中斷,如果未發(fā)生中斷返回0?,如果發(fā)生了中斷返回1?或者-1。
private int checkInterruptWhileWaiting(Node node){
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
我們來看看transferAfterCancelledWait?方法是如果區(qū)分1和-1的
final boolean transferAfterCancelledWait(Node node){
//cas嘗試將node的waitStatus設(shè)置為0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//將node節(jié)點(diǎn)由等待隊(duì)列加入AQS隊(duì)列
enq(node);
return true;
}
//cas失敗后,看看隊(duì)列是不是已經(jīng)在AQS隊(duì)列中,如果不在,則通過yield方法給其它線程讓路
while (!isOnSyncQueue(node))
Thread.yield();
//如果已經(jīng)在AQS隊(duì)列中,則返回false
return false;
}
那什么情況下cas操作會成功?什么情況下又會失敗呢?
當(dāng)線程接收到中斷信號時(shí)會被喚醒,此時(shí)node的waitStatus=-2?,所以會cas?成功,同時(shí)會將node?從等待隊(duì)列轉(zhuǎn)移到AQS隊(duì)列中。
當(dāng)線程先通過signal?喚醒后接收到中斷信號,由于signal?已經(jīng)將node的waitStatus?設(shè)置為-2了,所以此時(shí)會cas失敗。
舉例
大家可以用下邊的例子在transferAfterCancelledWait中打斷點(diǎn)測試一下,相信就明了了。
public class CarOperation {
//創(chuàng)建一個(gè)重入鎖
private Lock lock = new ReentrantLock();
//聲明等待隊(duì)列
Condition c1 = lock.newCondition();
/*
* 等待操作
*/
public void await(){
lock.lock();
try {
System.out.println("開始阻塞");
c1.await();
System.out.println("喚醒之后繼續(xù)執(zhí)行");
} catch (InterruptedException e) {
System.out.println("喚醒但是拋出異常了");
e.printStackTrace();
} finally {
lock.unlock();
}
}
/*
* 喚醒操作
*/
public void signal(){
lock.lock();
try {
c1.signal();
System.out.println("喚醒了。。。。。。。。。。。。。。");
} finally {
lock.unlock();
}
}
}
中斷測試
public static void main(String[] args){
CarOperation carOperation = new CarOperation();
Thread t1 = new Thread(()->{
//等待,掛起線程
carOperation.await();
});
t1.start();
try {
//模擬其它線程搶占資源執(zhí)行過程
Thread.sleep(10000);
//發(fā)出線程中斷信號
t1.interrupt();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}

先喚醒后中斷測試
public static void main(String[] args){
CarOperation carOperation = new CarOperation();
Thread t1 = new Thread(()->{
carOperation.await();
});
t1.start();
try {
Thread.sleep(10000);
//先喚醒線程
carOperation.signal();
//后中斷
t1.interrupt();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}

查看reportInterruptAfterWait
//要么拋出異常,要么重新中斷。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
以上就是await的全部內(nèi)容了,我們先來做個(gè)簡單的總結(jié)。
總結(jié)
- 將當(dāng)前線程封裝成node加入等待隊(duì)列尾部;
- 徹底釋放鎖資源,也就是將它的同步隊(duì)列節(jié)點(diǎn)從同步隊(duì)列隊(duì)首移除;
- 如果當(dāng)前節(jié)點(diǎn)不在同步隊(duì)列中,掛起當(dāng)前線程;
- 自旋,直到該線程被中斷或者被喚醒移動(dòng)到同步隊(duì)列中;
- 阻塞當(dāng)前節(jié)點(diǎn),直到它獲取到鎖資源;
如果你哪個(gè)地方存在疑問可以小窗阿Q!
signal
接下來我們再來捋一捋喚醒的過程
public final void signal(){
//當(dāng)前線程是否是鎖的持有者,不是的話拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//具體的喚醒過程
doSignal(first);
}
private void doSignal(Node first){
do {
//獲取頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)并賦值為頭結(jié)點(diǎn)
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//將之前的頭節(jié)點(diǎn)置為空
first.nextWaiter = null;
//將頭結(jié)點(diǎn)從等待隊(duì)列轉(zhuǎn)移到AQS隊(duì)列中,如果轉(zhuǎn)移失敗,則尋找下一個(gè)節(jié)點(diǎn)繼續(xù)轉(zhuǎn)移
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
首先將等待隊(duì)列的頭結(jié)點(diǎn)從等待隊(duì)列中取出來

然后執(zhí)行transferForSignal方法進(jìn)行轉(zhuǎn)移
final boolean transferForSignal(Node node){
//將node的waitStatus設(shè)置為0,如果設(shè)置失敗說明node的節(jié)點(diǎn)已經(jīng)不在等待隊(duì)列中了,返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將node從等待隊(duì)列轉(zhuǎn)移到AQS隊(duì)列,并返回node的前驅(qū)節(jié)點(diǎn)
Node p = enq(node);
//獲取node前驅(qū)節(jié)點(diǎn)的狀態(tài)
int ws = p.waitStatus;
//如果該節(jié)點(diǎn)是取消狀態(tài)或者將其設(shè)置為喚醒狀態(tài)失?。ㄕf明本身已經(jīng)是喚醒狀態(tài)了),所以可以去喚醒node節(jié)點(diǎn)所在的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//喚醒當(dāng)前節(jié)點(diǎn)
LockSupport.unpark(node.thread);
return true;
}
將等待隊(duì)列的頭結(jié)點(diǎn)從等待隊(duì)列轉(zhuǎn)移到AQS?隊(duì)列中,如果轉(zhuǎn)移失敗,說明該節(jié)點(diǎn)已被取消,直接返回false?,然后將first指向新的頭結(jié)點(diǎn)重新進(jìn)行轉(zhuǎn)移。如果轉(zhuǎn)移成功則根據(jù)前驅(qū)節(jié)點(diǎn)的狀態(tài)判斷是否直接喚醒當(dāng)前線程。

怎么樣?喚醒的邏輯是不是超級簡單?我們也按例做個(gè)簡單的總結(jié)。
總結(jié)
從等待隊(duì)列的隊(duì)首開始,嘗試對隊(duì)首節(jié)點(diǎn)執(zhí)行喚醒操作,如果節(jié)點(diǎn)已經(jīng)被取消了,就嘗試喚醒下一個(gè)節(jié)點(diǎn)。
對首節(jié)點(diǎn)執(zhí)行喚醒操作時(shí),首先將節(jié)點(diǎn)轉(zhuǎn)移到同步隊(duì)列,如果前驅(qū)節(jié)點(diǎn)的狀態(tài)為取消狀態(tài)或設(shè)置前驅(qū)節(jié)點(diǎn)的狀態(tài)為喚醒狀態(tài)失敗,那么就立即喚醒當(dāng)前節(jié)點(diǎn)對應(yīng)的線程,否則不執(zhí)行喚醒操作。
以上就是今天的全部內(nèi)容了,我們下期再見。感興趣的可以關(guān)注下公眾號,也可以來技術(shù)群討論問題呦!