10張圖詳解管程內(nèi)部,進(jìn)去看看
java對共享變量的操作管理使用了MESA管程模型。下圖是Java基于AQS實(shí)現(xiàn)的MESA管程模型:
上圖中有三個知識點(diǎn):
- MESA管程模型封裝了共享變量和對共享變量的操作,線程要進(jìn)入管程內(nèi)部,必須獲取到鎖,如果獲取鎖失敗就進(jìn)入入口等待隊(duì)列阻塞等待。
- 如果線程獲取到鎖,就進(jìn)入到管程內(nèi)部。但是進(jìn)入到管程內(nèi)部,也不一定能立刻操作共享變量,而是要看條件變量是否滿足,如果不滿足,只能進(jìn)入條件變量等待隊(duì)列阻塞等待。
- 在條件變量等待隊(duì)列中,如果被其他線程喚醒,也不一定能立刻操作共享變量,而是需要去入口等待隊(duì)列重新排隊(duì)等待獲取鎖。
本文主要講解管程模型中條件變量等待隊(duì)列。
1 官方示例
首先我們看一下官方給出的示例代碼:
- public class BoundedBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Object[] items = new Object[100];
- int putptr, takeptr, count;
- public void put(Object x) throws InterruptedException {
- lock.lock();
- try {
- while (count == items.length)
- notFull.await();
- items[putptr] = x;
- if (++putptr == items.length) putptr = 0;
- ++count;
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- public Object take() throws InterruptedException {
- lock.lock();
- try {
- while (count == 0)
- notEmpty.await();
- Object x = items[takeptr];
- if (++takeptr == items.length) takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
- }
這個代碼定義了兩個條件變量,notFull和notEmpty,說明如下:
- 如果items數(shù)組已經(jīng)滿了,則notFull變量不滿足,線程需要進(jìn)入notFull條件等待隊(duì)列進(jìn)行等待。當(dāng)take方法取走一個數(shù)組元素時,notFull條件滿足了,喚醒notFull條件等待隊(duì)列中等待線程。
- 如果items數(shù)組為空,則notEmpty變量不滿足,線程需要進(jìn)入notEmpty條件等待隊(duì)列進(jìn)行等待。當(dāng)put方法加入一個數(shù)組元素時,notEmpty條件滿足了,喚醒notEmpty條件等待隊(duì)列中等待線程。
- 條件變量是綁定在Lock上的,示例代碼使用了ReentrantLock。在執(zhí)行await和signal方法時首先要獲取到鎖。
2 原理簡介
Java AQS的條件變量等待隊(duì)列是基于接口Condition和ConditionObject來實(shí)現(xiàn)的,URM類圖如下:
Condition接口主要定義了下面3個方法:
- await:進(jìn)入條件等待隊(duì)列
- signal:喚醒條件等待隊(duì)列中的元素
- signalAll:喚醒條件等待隊(duì)列中的所有元素
3 await
條件等待隊(duì)列跟入口等待隊(duì)列有兩個不同:
- 雖然二者共用了Node類,但是條件等待隊(duì)列是單向隊(duì)列,入口等待隊(duì)列是雙向隊(duì)列,條件隊(duì)列中下一個節(jié)點(diǎn)的引用是nextWaiter,入口等待隊(duì)列中下一個節(jié)點(diǎn)的引用是next。
- 條件等待隊(duì)列中元素的waitStatus必須是-2。
await方法的流程如下圖:
3.1 進(jìn)入條件等待隊(duì)列
入隊(duì)方法對應(yīng)方法addConditionWaiter,這里有三種情況:
- 隊(duì)列為空,則新建一個節(jié)點(diǎn),如下圖:
- 隊(duì)列非空,最后一個元素的waitStatus是-2,如下圖:
- 隊(duì)列非空,最后一個元素的waitStatus不是-2,如下圖:
可以看到,這種情況會從隊(duì)列第一個元素開始檢查waitStatus不是-2的元素,并從隊(duì)列中移除。
3.2 釋放鎖
AQS的并發(fā)鎖是基于state變量實(shí)現(xiàn)的,線程進(jìn)入條件等待隊(duì)列后,要釋放鎖,即state會變?yōu)?,釋放操作會喚醒入口等待隊(duì)列中的線程。對應(yīng)方法fullyRelease,返回值是釋放鎖減掉的state值savedState。
3.3 阻塞等待
釋放鎖后,線程阻塞,自旋等待被喚醒。
3.4 喚醒之后
喚醒之后,當(dāng)前線程主要有四個動作:
- 轉(zhuǎn)入入口等待隊(duì)列,并把waitStatus改為0。
waitStatus等于0表示中間狀態(tài),當(dāng)前節(jié)點(diǎn)后面的節(jié)點(diǎn)已經(jīng)喚醒,但是當(dāng)前節(jié)點(diǎn)線程還沒有執(zhí)行完成。
- 重新獲取鎖,如果獲取成功,則當(dāng)前線程成為入口等待隊(duì)列頭結(jié)點(diǎn),interruptMode置為1。
- 如果當(dāng)前節(jié)點(diǎn)在條件等待隊(duì)列中有后繼節(jié)點(diǎn),則剔除條件等待隊(duì)列中waitStatus!=-2的節(jié)點(diǎn),即隊(duì)列中狀態(tài)為取消的節(jié)點(diǎn)。
- interruptMode如果不等于0,則處理中斷。
3.5 一個細(xì)節(jié)
上面提到了interruptMode,這個屬性有三個值:
- 0:沒有被中斷
- -1:中斷后拋出InterruptedException,這種情況是當(dāng)前線程阻塞,沒有被signal之前發(fā)生了中斷
- 1:重新進(jìn)入中斷狀態(tài),這種情況是指當(dāng)前線程阻塞,被signal之后發(fā)生了中斷
3.6 擴(kuò)展
AQS還提供了其他幾個await方法,如下:
- awaitUninterruptibly:不用處理中斷。
- awaitNanos:自旋等待喚醒過程中有超時時間限制,超時則轉(zhuǎn)入入口等待隊(duì)列。
- awaitUntil:自旋等待喚醒過程中有截止時間,時間到則轉(zhuǎn)入入口等待隊(duì)列。
4 signal
喚醒條件等待隊(duì)列中的元素,首先判斷當(dāng)前線程是否持有獨(dú)占鎖,如果沒有,拋出異常。
喚醒條件隊(duì)列中的元素,會從第一個元素也就是firstWaiter開始,根據(jù)firstWaiter的waitStatus是不是-2,分兩種情況。
4.1 waitStatus==-2
條件隊(duì)列第一個節(jié)點(diǎn)進(jìn)入入口等待隊(duì)列,等待獲取鎖,如下圖:
這里有兩個注意點(diǎn):
- 如果入口等待隊(duì)列中tail節(jié)點(diǎn)的waitStatus小于等于0,則firstWaiter加入后需要把舊tail節(jié)點(diǎn)置為-1(表示后面節(jié)點(diǎn)等待當(dāng)前節(jié)點(diǎn)喚醒),如下圖:
如果重置waitStatus狀態(tài)失敗,則unpark節(jié)點(diǎn)firstWaiter。
- 如果入口等待隊(duì)列中tail節(jié)點(diǎn)的waitStatus大于0,則unpark節(jié)點(diǎn)firstWaiter。
4.2 waitStatus!=-2
如果firstWaiter的waitStatus不等于-2,則查找firstWaiter的nextWaiter,直到找到一個waitStatus等于-2的節(jié)點(diǎn),然后將這個節(jié)點(diǎn)加入入口等待隊(duì)列隊(duì)尾,如下圖:
4.3 waitStatus修改
上面的兩種情況無論哪種,進(jìn)入入口等待隊(duì)列之前都要用CAS的方式把waitStatus改為0。
5 signalAll
理解了signal的邏輯,signalAll的邏輯就非常容易理解了。首先判斷當(dāng)前線程是否持有獨(dú)占鎖,如果沒有,拋出異常。
將條件等待隊(duì)列中的所有節(jié)點(diǎn)依次加入入口等待隊(duì)列。如下圖:
6 使用案例
6.1 示例代碼
java并發(fā)包下有很多類使用到了AQS中的Condition,如下圖:
這里我們以CyclicBarrier為例來講解。CyclicBarrier是讓一組線程相互等待共同達(dá)到一個屏障點(diǎn)。從Cyclic可以看出Barrier可以循環(huán)利用,也就是當(dāng)線程釋放之后可以繼續(xù)使用。
看下面這段示例代碼:
- public static void main(String[] args) {
- CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
- System.out.println("柵欄中的線程執(zhí)行完成");
- });
- ExecutorService executorService = Executors.newFixedThreadPool(2);
- executorService.submit(() -> {
- try {
- System.out.println("線程1:" + Thread.currentThread().getName());
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- executorService.submit(() -> {
- try {
- System.out.println("線程2:" + Thread.currentThread().getName());
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- executorService.shutdown();
- }
執(zhí)行結(jié)果:
- 線程1:pool-1-thread-1
- 線程2:pool-1-thread-2
- 柵欄中的線程執(zhí)行完成
6.2 原理講解
CyclicBarrier初始化的時候,會指定線程的數(shù)量count,每個線程執(zhí)行完邏輯后,調(diào)用CyclicBarrier的await方法,這個方法首先將count減1,然后調(diào)用Condition的await,讓當(dāng)前線程進(jìn)入條件等待隊(duì)列。當(dāng)最后一個線程將count減1后,count數(shù)量等于0,這時就會調(diào)用Condition的signalAll方法喚醒所有線程。
7 總結(jié)
java的管程模型使用了MESA模型,基于AQS實(shí)現(xiàn)的MESA模型中,使用雙向隊(duì)列實(shí)現(xiàn)了入口等待隊(duì)列,使用變量state實(shí)現(xiàn)了并發(fā)鎖,使用Condition實(shí)現(xiàn)了條件等待隊(duì)列。
在AQS的實(shí)現(xiàn)中,使用同步隊(duì)列這個術(shù)語來表示雙向隊(duì)列,本文中使用入口等待隊(duì)列來描述是為了更好的配合管程模型來講解。
AQS的Condition中,使用await方法將當(dāng)前線程放入條件等待隊(duì)列阻塞等待,使用notify來喚醒條件等待隊(duì)列中的線程,被喚醒之后,線程并不能立刻執(zhí)行,而是進(jìn)入入口等待隊(duì)列等待獲取鎖。