自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

有圖解有案例,我終于把Condition的原理講透徹了

開發(fā) 前端
加油站為了吸引更多的車主前來加油,在加油站投放了自動(dòng)洗車機(jī)來為加油的汽車提供免費(fèi)洗車服務(wù)。我們規(guī)定汽車必須按照“加油->洗車->駛離”的流程來加油,等前一輛汽車駛離之后才允許下一輛車進(jìn)來加油。

哈嘍大家好,我是阿Q!

??20張圖圖解ReentrantLock加鎖解鎖原理???文章一發(fā),便引發(fā)了大家激烈的討論,更有小伙伴前來彈窗:平時(shí)加解鎖都是直接使用Synchronized?關(guān)鍵字來實(shí)現(xiàn)的,簡單好用,為啥還要引用ReentrantLock呢?

為了解決小伙伴的疑問,我們來對兩者做個(gè)簡單的比較吧:

相同點(diǎn)

兩者都是“可重入鎖”,即當(dāng)前線程獲取到鎖對象之后,如果想繼續(xù)獲取鎖對象還是可以繼續(xù)獲取的,只不過鎖對象的計(jì)數(shù)器進(jìn)行“+1”操作就可以了。

不同點(diǎn)

  • ReentrantLock?是基于API?實(shí)現(xiàn)的,Synchronized?是依賴于JVM實(shí)現(xiàn)的;
  • ReentrantLock?可以響應(yīng)中斷,Synchronized是不可以的;
  • ReentrantLock?可以指定是公平鎖還是非公平鎖,而Synchronized只能是非公平鎖;
  • ReentrantLock的lock?是同步非阻塞,采用的是樂觀并發(fā)策略,Synchronized是同步阻塞的,使用的是悲觀并發(fā)策略;
  • ReentrantLock?借助Condition?可以實(shí)現(xiàn)多路選擇通知,Synchronized?通過wait()和notify()/notifyAll()方法可以實(shí)現(xiàn)等待/通知機(jī)制(單路通知);

綜上所述,ReentrantLock?還是有區(qū)別于Synchronized的使用場景的,今天我們就來聊一聊它的多路選擇通知功能。

實(shí)戰(zhàn)

沒有實(shí)戰(zhàn)的“紙上談兵”都是扯淡,今天我們反其道而行,先拋出實(shí)戰(zhàn)Demo。

場景描述

加油站為了吸引更多的車主前來加油,在加油站投放了自動(dòng)洗車機(jī)來為加油的汽車提供免費(fèi)洗車服務(wù)。我們規(guī)定汽車必須按照“加油->洗車->駛離”的流程來加油,等前一輛汽車駛離之后才允許下一輛車進(jìn)來加油。

代碼實(shí)現(xiàn)

首先創(chuàng)建鎖對象并生成三個(gè)Condition

/**
* 控制線程喚醒的標(biāo)志
*/
private int flag = 1;

/**
* 創(chuàng)建鎖對象
*/
private Lock lock = new ReentrantLock();

/**
* 等待隊(duì)列
* c1對應(yīng)加油
* c2對應(yīng)洗車
* c3對應(yīng)開車
*/
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();

然后聲明加油、清洗、駛離的方法,并規(guī)定加完油之后去洗車并駛離加油站

/**
* 汽車加油
*/
public void fuelUp(int num){
lock.lock();
try {
while (flag!=1){
c1.await();
}
System.out.println("第"+num+"輛車開始加油");
flag = 2;
c2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

/**
* 汽車清洗
*/
public void carWash(int num){
lock.lock();
try {
while (flag!=2){
c2.await();
}
System.out.println("第"+num+"輛車開始清洗");
flag = 3;
c3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

/**
* 駛離
*/
public void drive(int num){
lock.lock();
try {
while (flag!=3){
c3.await();
}
System.out.println("第"+num+"輛車已經(jīng)駛離加油站");
flag = 1;
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

其中await?為等待方法,signal為喚醒方法。

最后我們來定義main方法,模擬一下3輛車同時(shí)到達(dá)加油站的場景

public static void main(String[] args){
CarOperation carOperation = new CarOperation();
//汽車加油
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.fuelUp(i);
}
},"fuelUp").start();

//汽車清洗
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.carWash(i);
}
},"carRepair").start();

//駛離
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.drive(i);
}
},"drive").start();
}

使用是不是很絲滑?為了加深大家對Condition?的理解,接下來我們用圖解的方式分析一波Condition的原理~

圖解

大家都看到了,上邊的案例都是圍繞Condition?來操作的,那什么是Condition?呢?Condition是一個(gè)接口,里邊定義了線程等待和喚醒的方法。

圖片

代碼中調(diào)用的lock.newCondition()?實(shí)際調(diào)用的是Sync?類中的newCondition?方法,而ConditionObject?就是Condition的實(shí)現(xiàn)類。

final ConditionObject newCondition(){
return new ConditionObject();
}

我們發(fā)現(xiàn)它處于AQS?的內(nèi)部,沒法直接實(shí)例化,所以需要配合ReentrantLock來使用。

ConditionObject

圖片

ConditionObject?內(nèi)部維護(hù)了一個(gè)基于Node的FIFO?單向隊(duì)列,我們把它稱為等待隊(duì)列。firstWaiter?指向首節(jié)點(diǎn),lastWaiter?指向尾節(jié)點(diǎn),Node?中的nextWaiter?指向隊(duì)列中的下一個(gè)元素,并且等待隊(duì)列中節(jié)點(diǎn)的waitStatus都是-2。

了解了ConditionObject?的數(shù)據(jù)結(jié)構(gòu)之后,我們就從源碼角度來圖解一下ReentrantLock的等待/喚醒機(jī)制。

await

首先找到AQS?類中await的源碼

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//將當(dāng)前線程封裝成node加入等待隊(duì)列尾部
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
//檢測此節(jié)點(diǎn)的線程是否在同步隊(duì)上,如果不在,則說明該線程還不具備競爭鎖的資格,則繼續(xù)等待直到檢測到此節(jié)點(diǎn)在同步隊(duì)列上
while (!isOnSyncQueue(node)) {
//當(dāng)node處于等待隊(duì)列時(shí),掛起當(dāng)前線程。
LockSupport.park(this);
//如果發(fā)生了中斷,則跳出循環(huán),結(jié)束等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒后該節(jié)點(diǎn)一定會在AQS隊(duì)列上,
//之前分析過acquireQueued方法獲取不到鎖會繼續(xù)阻塞
//獲取到了鎖,中斷過返回true,未中斷過返回false
//獲取到鎖存在中斷并且不是中斷喚醒的線程將中斷模式設(shè)置為重新中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//清除條件隊(duì)列中所有狀態(tài)不為 CONDITION 的結(jié)點(diǎn)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

如果線程中斷,清除中斷標(biāo)記并拋出異常。

查看addConditionWaiter

該方法的作用是將當(dāng)前線程封裝成node加入等待隊(duì)列尾部

private Node addConditionWaiter(){
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
//將不處于等待狀態(tài)的結(jié)點(diǎn)從等待隊(duì)列中移除
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾節(jié)點(diǎn)為空
if (t == null)
//將首節(jié)點(diǎn)指向node
firstWaiter = node;
else
//將尾節(jié)點(diǎn)的nextWaiter指向node節(jié)點(diǎn)
t.nextWaiter = node;
//尾節(jié)點(diǎn)指向node
lastWaiter = node;
return node;
}

首先將t指向尾節(jié)點(diǎn),如果尾節(jié)點(diǎn)不為空并且它的waitStatus!=-2,則將不處于等待狀態(tài)的結(jié)點(diǎn)從等待隊(duì)列中移除,并且將t指向新的尾節(jié)點(diǎn)。

將當(dāng)前線程封裝成waitStatus為-2的節(jié)點(diǎn)追加到等待隊(duì)列尾部。

如果尾節(jié)點(diǎn)為空,則隊(duì)列為空,將首尾節(jié)點(diǎn)都指向當(dāng)前節(jié)點(diǎn)。

圖片

如果尾節(jié)點(diǎn)不為空,證明隊(duì)列中有其他節(jié)點(diǎn),則將當(dāng)前尾節(jié)點(diǎn)的nextWaiter指向當(dāng)前節(jié)點(diǎn),將當(dāng)前節(jié)點(diǎn)置為尾節(jié)點(diǎn)。

圖片

接著我們來查看下unlinkCancelledWaiters()方法——將不處于等待狀態(tài)的結(jié)點(diǎn)從等待隊(duì)列中移除。

private void unlinkCancelledWaiters(){
Node t = firstWaiter;
//trail是t的前驅(qū)結(jié)點(diǎn)
Node trail = null;
while (t != null) {
//next為t的后繼結(jié)點(diǎn)
Node next = t.nextWaiter;
//如果t節(jié)點(diǎn)的waitStatus不為-2即失效節(jié)點(diǎn)
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
//如果t的前驅(qū)節(jié)點(diǎn)為空,則將首節(jié)點(diǎn)指向next
if (trail == null)
firstWaiter = next;
else
//t的前驅(qū)結(jié)點(diǎn)不為空,將前驅(qū)節(jié)點(diǎn)的后繼指針指向next
trail.nextWaiter = next;
//如果next為null,則將尾節(jié)點(diǎn)指向t的前驅(qū)節(jié)點(diǎn)
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

t為當(dāng)前節(jié)點(diǎn),trail?為t的前驅(qū)節(jié)點(diǎn),next為t的后繼節(jié)點(diǎn)。

while?方法會從首節(jié)點(diǎn)順著等待隊(duì)列往后尋找waitStatus!=-2?的節(jié)點(diǎn),將當(dāng)前節(jié)點(diǎn)的nextWaiter置為空。

如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為空,代表當(dāng)前節(jié)點(diǎn)為首節(jié)點(diǎn),則將next設(shè)置為首節(jié)點(diǎn);

圖片

如果不為空,則將前驅(qū)節(jié)點(diǎn)的nextWaiter指向后繼節(jié)點(diǎn)。

圖片

如果后繼節(jié)點(diǎn)為空,則直接將前驅(qū)節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)。

圖片

查看fullyRelease

從名字也差不多能明白該方法的作用是徹底釋放鎖資源。

final int fullyRelease(Node node){
//釋放鎖失敗為true,釋放鎖成功為false
boolean failed = true;
try {
//獲取當(dāng)前鎖的state
int savedState = getState();
//釋放鎖成功的話
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//釋放鎖失敗的話將節(jié)點(diǎn)狀態(tài)置為取消
node.waitStatus = Node.CANCELLED;
}
}

最重要的就是release?方法,而我們上文中已經(jīng)講過了,release執(zhí)行成功的話,當(dāng)前線程已經(jīng)釋放了鎖資源。

查看isOnSyncQueue

判斷當(dāng)前線程所在的Node?是否在同步隊(duì)列中(同步隊(duì)列即AQS隊(duì)列)。在這里有必要給大家看一下同步隊(duì)列與等待隊(duì)列的關(guān)系圖了。

圖片

final boolean isOnSyncQueue(Node node){
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
//node節(jié)點(diǎn)的next為null
return findNodeFromTail(node);
}

如果當(dāng)前節(jié)點(diǎn)的waitStatus=-2?,說明它在等待隊(duì)列中,返回false?;如果當(dāng)前節(jié)點(diǎn)有前驅(qū)節(jié)點(diǎn),則證明它在AQS?隊(duì)列中,但是前驅(qū)節(jié)點(diǎn)為空,說明它是頭節(jié)點(diǎn),而頭節(jié)點(diǎn)是不參與鎖競爭的,也返回false。

如果當(dāng)前節(jié)點(diǎn)既不在等待隊(duì)列中,又不是AQS?中的頭結(jié)點(diǎn)且存在next?節(jié)點(diǎn),說明它存在于AQS?中,直接返回true。

接著往下看,如果當(dāng)前節(jié)點(diǎn)的next?為空,該節(jié)點(diǎn)可能是tail?節(jié)點(diǎn),也可能是該節(jié)點(diǎn)的next還未賦值,所以需要從后往前遍歷節(jié)點(diǎn)。

private boolean findNodeFromTail(Node node){
Node t = tail;
for (;;) {
//先用尾節(jié)點(diǎn)來判斷,然后用隊(duì)列中的節(jié)點(diǎn)依次來判斷
if (t == node)
return true;
//節(jié)點(diǎn)為空,說明找到頭也不在AQS隊(duì)列中,返回false
if (t == null)
return false;
t = t.prev;
}
}

在遍歷過程中,如果隊(duì)列中有節(jié)點(diǎn)等于當(dāng)前節(jié)點(diǎn),返回true?;如果找到頭節(jié)點(diǎn)也沒找到,則返回false。

我們回到await的while?循環(huán)處,如果返回false,說明該節(jié)點(diǎn)不在同步隊(duì)列中,進(jìn)入循環(huán)中掛起該線程。

知識點(diǎn)補(bǔ)充

阿Q的理解是線程被喚醒會存在兩種情況:一種是調(diào)用signal/signalAll喚醒線程;一種是通過線程中斷信號,喚醒線程并拋出中斷異常。

查看checkInterruptWhileWaiting(難點(diǎn))

該方法的作用是判斷當(dāng)前線程是否發(fā)生過中斷,如果未發(fā)生中斷返回0?,如果發(fā)生了中斷返回1?或者-1。

private int checkInterruptWhileWaiting(Node node){
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

我們來看看transferAfterCancelledWait?方法是如果區(qū)分1和-1的

final boolean transferAfterCancelledWait(Node node){
//cas嘗試將node的waitStatus設(shè)置為0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//將node節(jié)點(diǎn)由等待隊(duì)列加入AQS隊(duì)列
enq(node);
return true;
}
//cas失敗后,看看隊(duì)列是不是已經(jīng)在AQS隊(duì)列中,如果不在,則通過yield方法給其它線程讓路
while (!isOnSyncQueue(node))
Thread.yield();
//如果已經(jīng)在AQS隊(duì)列中,則返回false
return false;
}

那什么情況下cas操作會成功?什么情況下又會失敗呢?

當(dāng)線程接收到中斷信號時(shí)會被喚醒,此時(shí)node的waitStatus=-2?,所以會cas?成功,同時(shí)會將node?從等待隊(duì)列轉(zhuǎn)移到AQS隊(duì)列中。

當(dāng)線程先通過signal?喚醒后接收到中斷信號,由于signal?已經(jīng)將node的waitStatus?設(shè)置為-2了,所以此時(shí)會cas失敗。

舉例

大家可以用下邊的例子在transferAfterCancelledWait中打斷點(diǎn)測試一下,相信就明了了。

public class CarOperation {
//創(chuàng)建一個(gè)重入鎖
private Lock lock = new ReentrantLock();

//聲明等待隊(duì)列
Condition c1 = lock.newCondition();

/*
* 等待操作
*/
public void await(){
lock.lock();
try {
System.out.println("開始阻塞");
c1.await();
System.out.println("喚醒之后繼續(xù)執(zhí)行");
} catch (InterruptedException e) {
System.out.println("喚醒但是拋出異常了");
e.printStackTrace();
} finally {
lock.unlock();
}
}

/*
* 喚醒操作
*/
public void signal(){
lock.lock();
try {
c1.signal();
System.out.println("喚醒了。。。。。。。。。。。。。。");
} finally {
lock.unlock();
}
}
}

中斷測試

public static void main(String[] args){
CarOperation carOperation = new CarOperation();
Thread t1 = new Thread(()->{
//等待,掛起線程
carOperation.await();
});
t1.start();
try {
//模擬其它線程搶占資源執(zhí)行過程
Thread.sleep(10000);
//發(fā)出線程中斷信號
t1.interrupt();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}

圖片

先喚醒后中斷測試

public static void main(String[] args){
CarOperation carOperation = new CarOperation();
Thread t1 = new Thread(()->{
carOperation.await();
});
t1.start();
try {
Thread.sleep(10000);
//先喚醒線程
carOperation.signal();
//后中斷
t1.interrupt();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}

圖片

查看reportInterruptAfterWait

//要么拋出異常,要么重新中斷。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

以上就是await的全部內(nèi)容了,我們先來做個(gè)簡單的總結(jié)。

總結(jié)

  • 將當(dāng)前線程封裝成node加入等待隊(duì)列尾部;
  • 徹底釋放鎖資源,也就是將它的同步隊(duì)列節(jié)點(diǎn)從同步隊(duì)列隊(duì)首移除;
  • 如果當(dāng)前節(jié)點(diǎn)不在同步隊(duì)列中,掛起當(dāng)前線程;
  • 自旋,直到該線程被中斷或者被喚醒移動(dòng)到同步隊(duì)列中;
  • 阻塞當(dāng)前節(jié)點(diǎn),直到它獲取到鎖資源;

如果你哪個(gè)地方存在疑問可以小窗阿Q!

signal

接下來我們再來捋一捋喚醒的過程

public final void signal(){
//當(dāng)前線程是否是鎖的持有者,不是的話拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//具體的喚醒過程
doSignal(first);
}

private void doSignal(Node first){
do {
//獲取頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)并賦值為頭結(jié)點(diǎn)
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//將之前的頭節(jié)點(diǎn)置為空
first.nextWaiter = null;
//將頭結(jié)點(diǎn)從等待隊(duì)列轉(zhuǎn)移到AQS隊(duì)列中,如果轉(zhuǎn)移失敗,則尋找下一個(gè)節(jié)點(diǎn)繼續(xù)轉(zhuǎn)移
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

首先將等待隊(duì)列的頭結(jié)點(diǎn)從等待隊(duì)列中取出來

圖片

然后執(zhí)行transferForSignal方法進(jìn)行轉(zhuǎn)移

final boolean transferForSignal(Node node){
//將node的waitStatus設(shè)置為0,如果設(shè)置失敗說明node的節(jié)點(diǎn)已經(jīng)不在等待隊(duì)列中了,返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將node從等待隊(duì)列轉(zhuǎn)移到AQS隊(duì)列,并返回node的前驅(qū)節(jié)點(diǎn)
Node p = enq(node);
//獲取node前驅(qū)節(jié)點(diǎn)的狀態(tài)
int ws = p.waitStatus;
//如果該節(jié)點(diǎn)是取消狀態(tài)或者將其設(shè)置為喚醒狀態(tài)失?。ㄕf明本身已經(jīng)是喚醒狀態(tài)了),所以可以去喚醒node節(jié)點(diǎn)所在的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//喚醒當(dāng)前節(jié)點(diǎn)
LockSupport.unpark(node.thread);
return true;
}

將等待隊(duì)列的頭結(jié)點(diǎn)從等待隊(duì)列轉(zhuǎn)移到AQS?隊(duì)列中,如果轉(zhuǎn)移失敗,說明該節(jié)點(diǎn)已被取消,直接返回false?,然后將first指向新的頭結(jié)點(diǎn)重新進(jìn)行轉(zhuǎn)移。如果轉(zhuǎn)移成功則根據(jù)前驅(qū)節(jié)點(diǎn)的狀態(tài)判斷是否直接喚醒當(dāng)前線程。

圖片

怎么樣?喚醒的邏輯是不是超級簡單?我們也按例做個(gè)簡單的總結(jié)。

總結(jié)

從等待隊(duì)列的隊(duì)首開始,嘗試對隊(duì)首節(jié)點(diǎn)執(zhí)行喚醒操作,如果節(jié)點(diǎn)已經(jīng)被取消了,就嘗試喚醒下一個(gè)節(jié)點(diǎn)。

對首節(jié)點(diǎn)執(zhí)行喚醒操作時(shí),首先將節(jié)點(diǎn)轉(zhuǎn)移到同步隊(duì)列,如果前驅(qū)節(jié)點(diǎn)的狀態(tài)為取消狀態(tài)或設(shè)置前驅(qū)節(jié)點(diǎn)的狀態(tài)為喚醒狀態(tài)失敗,那么就立即喚醒當(dāng)前節(jié)點(diǎn)對應(yīng)的線程,否則不執(zhí)行喚醒操作。

以上就是今天的全部內(nèi)容了,我們下期再見。感興趣的可以關(guān)注下公眾號,也可以來技術(shù)群討論問題呦!

責(zé)任編輯:武曉燕 來源: 阿Q說代碼
相關(guān)推薦

2020-06-23 09:10:55

Linux零拷貝場景

2019-04-08 12:14:59

Elasticsear程序員Lucene

2021-03-24 14:32:44

人工智能深度學(xué)習(xí)

2021-11-19 06:50:17

OAuth協(xié)議授權(quán)

2020-06-28 10:52:47

HTTP緩存Web

2020-08-06 16:55:37

虛擬化底層計(jì)算機(jī)

2022-07-04 09:43:46

RabbitMQ消息消息隊(duì)列

2023-01-01 13:45:37

Condition機(jī)制條件

2021-12-13 20:09:33

GoElasticsearJava

2020-03-09 09:13:40

HTTPSTCP網(wǎng)絡(luò)協(xié)議

2021-10-09 00:02:04

DevOps敏捷開發(fā)

2021-06-13 12:03:46

SaaS軟件即服務(wù)

2022-03-27 20:32:28

Knative容器事件模型

2019-06-17 08:21:06

RPC框架服務(wù)

2018-11-23 09:25:00

TCC分布式事務(wù)

2019-11-06 10:36:43

MavenoptionalJava

2022-05-01 22:09:27

數(shù)據(jù)模型大數(shù)據(jù)

2020-07-29 09:21:34

Docker集群部署隔離環(huán)境

2020-11-30 08:34:44

大數(shù)據(jù)數(shù)據(jù)分析技術(shù)

2021-02-14 00:21:37

區(qū)塊鏈數(shù)字貨幣金融
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號