自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

源碼級(jí)深挖AQS隊(duì)列同步器

開(kāi)發(fā) 后端
在java中提供了兩類(lèi)鎖的實(shí)現(xiàn),一種是在jvm層級(jí)上實(shí)現(xiàn)的synchrinized隱式鎖,另一類(lèi)是jdk在代碼層級(jí)實(shí)現(xiàn)的,juc包下的Lock顯示鎖,而提到Lock就不得不提一下它的核心隊(duì)列同步器(AQS)了,它的全稱(chēng)是AbstractQueuedSynchronizer。

[[392393]]

我們知道,在java中提供了兩類(lèi)鎖的實(shí)現(xiàn),一種是在jvm層級(jí)上實(shí)現(xiàn)的synchrinized隱式鎖,另一類(lèi)是jdk在代碼層級(jí)實(shí)現(xiàn)的,juc包下的Lock顯示鎖,而提到Lock就不得不提一下它的核心隊(duì)列同步器(AQS)了,它的全稱(chēng)是AbstractQueuedSynchronizer,是用來(lái)構(gòu)建鎖或者其他一些同步組件的基礎(chǔ),除了ReentrantLock、ReentrantReadWriteLock外,它還在CountDownLatch、Semaphore以及ThreadPoolExecutor中被使用,通過(guò)理解隊(duì)列同步器的工作原理,對(duì)我們了解和使用這些工具類(lèi)會(huì)有很大的幫助。

1、AQS 基礎(chǔ)

為了便于理解AQS的概念,這里首先摘錄部分AbstractQueuedSynchronizer的注釋進(jìn)行了簡(jiǎn)要翻譯:

  • 它提供了一個(gè)框架,對(duì)于依賴(lài)先進(jìn)先出等待隊(duì)列的阻塞鎖和同步器(例如信號(hào)量和事件),可以用它來(lái)實(shí)現(xiàn)。這個(gè)類(lèi)的設(shè)計(jì),對(duì)于大多數(shù)依賴(lài)于單個(gè)原子值來(lái)表示狀態(tài)(state)的同步器,可以提供有力的基礎(chǔ)。子類(lèi)需要重寫(xiě)被protected修飾的方法,例如更改狀態(tài)(state),定義在獲取或釋放對(duì)象時(shí)這些狀態(tài)表示的含義?;谶@些,類(lèi)中的其他方法實(shí)現(xiàn)了隊(duì)列和阻塞機(jī)制。在子類(lèi)中可以維護(hù)其他的狀態(tài)字段,但是只有使用getState,setState,compareAndSetState方法原子更新的狀態(tài)值變量,才與同步有關(guān)。
  • 子類(lèi)被推薦定義為非public的內(nèi)部類(lèi),用來(lái)實(shí)現(xiàn)封閉類(lèi)的屬性同步。同步器本身沒(méi)有實(shí)現(xiàn)任何同步接口,它僅僅定義了一些方法,供具體的鎖和同步組件中的public方法調(diào)用。
  • 隊(duì)列同步器支持獨(dú)占模式和共享模式,當(dāng)一個(gè)線程在獨(dú)占模式下獲取時(shí),其他線程不能獲取成功,在共享模式下多線程的獲取可能成功。在不同模式下,等待的線程使用的是相同的先進(jìn)先出隊(duì)列。通常,實(shí)現(xiàn)子類(lèi)只支持其中的一種模式,但是在ReadWriteLock中兩者都可以發(fā)揮作用。只支持一種模式的子類(lèi)在實(shí)現(xiàn)時(shí)不需要重寫(xiě)另一種模式中的方法。

閱讀這些注釋?zhuān)梢灾繟bstractQueuedSynchronizer是一個(gè)抽象類(lèi),它基于內(nèi)部先進(jìn)先出(FIFO)的雙向隊(duì)列、以及內(nèi)置的一些protected方法來(lái)實(shí)現(xiàn)同步器,完成同步狀態(tài)的管理,并且我們可以通過(guò)子類(lèi)繼承AQS抽象類(lèi)的方式,在共享模式或獨(dú)占模式下,實(shí)現(xiàn)自定義的同步組件。

通過(guò)上面的描述,可以看出AQS中的兩大核心是同步狀態(tài)和雙向的同步隊(duì)列,來(lái)看一下源碼中是如何對(duì)它們進(jìn)行定義的:

 

  1. public abstract class AbstractQueuedSynchronizer 
  2.     extends AbstractOwnableSynchronizer implements java.io.Serializable { 
  3.  static final class Node { 
  4.   volatile int waitStatus; 
  5.   volatile Node prev; 
  6.   volatile Node next
  7.   volatile Thread thread; 
  8.   //... 
  9.  } 
  10.  private transient volatile Node head; 
  11.  private transient volatile Node tail; 
  12.  private volatile int state; 
  13.  //... 

下面針對(duì)這兩個(gè)核心內(nèi)容分別進(jìn)行研究。

同步隊(duì)列

AQS內(nèi)部靜態(tài)類(lèi)Node用于表示同步隊(duì)列中的節(jié)點(diǎn),變量表示的意義如下:

  • prev:當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果當(dāng)前節(jié)點(diǎn)是同步隊(duì)列的頭節(jié)點(diǎn),那么prev為null
  • next:當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),如果當(dāng)前節(jié)點(diǎn)是同步隊(duì)列的尾節(jié)點(diǎn),那么next為null
  • thread:獲取同步狀態(tài)的線程
  • waitStatus:等待狀態(tài),取值可為以下情況
  1. CANCELLED(1):表示當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的線程被取消,當(dāng)線程等待超時(shí)或被中斷時(shí)會(huì)被修改為此狀態(tài)
  2. SIGNAL(-1):當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)的線程被阻塞,當(dāng)前線程在釋放同步狀態(tài)或取消時(shí),需要喚醒后繼節(jié)點(diǎn)的線程
  3. CONDITION(-2):節(jié)點(diǎn)處于等待隊(duì)列中,節(jié)點(diǎn)線程等待在Condition上,當(dāng)其他線程調(diào)用Condition的signal方法后,會(huì)將節(jié)點(diǎn)從等待隊(duì)列移到同步隊(duì)列
  4. PROPAGATE(-3):表示下一次共享式同步狀態(tài)獲取能夠被執(zhí)行,即同步狀態(tài)的獲取可以向后繼節(jié)點(diǎn)的后繼進(jìn)行無(wú)條件的傳播
  5. 0:初始值,表示當(dāng)前節(jié)點(diǎn)等待獲取同步狀態(tài)

每個(gè)節(jié)點(diǎn)的prev和next指針在加入隊(duì)列的時(shí)候進(jìn)行賦值,通過(guò)這些指針就形成了一個(gè)雙向列表,另外AQS還保存了同步隊(duì)列的頭節(jié)點(diǎn)head和尾節(jié)點(diǎn)tail,通過(guò)這樣的結(jié)構(gòu),就能夠通過(guò)頭節(jié)點(diǎn)或尾節(jié)點(diǎn),找到隊(duì)列中的任何一個(gè)節(jié)點(diǎn)。使用圖來(lái)表示同步隊(duì)列的結(jié)構(gòu)如下:

另外可以看到,在源碼中為了保證可見(jiàn)性,同步器中的head、tail、state,以及節(jié)點(diǎn)中的prev,next屬性都加了關(guān)鍵字volatile修飾。

同步狀態(tài)

AQS的另一核心同步狀態(tài),在代碼中是使用int類(lèi)型的變量state來(lái)表示的,通過(guò)原子操作修改同步狀態(tài)的值,來(lái)實(shí)現(xiàn)對(duì)同步組件的狀態(tài)進(jìn)行修改。在子類(lèi)中,主要通過(guò)AQS提供的下面3個(gè)方法對(duì)同步狀態(tài)的訪問(wèn)和轉(zhuǎn)換進(jìn)行操作:

  • getState():獲取當(dāng)前的同步狀態(tài)
  • setState(int newState):設(shè)置新的同步狀態(tài)
  • compareAndSetState(int expect,int update):調(diào)用Unsafe類(lèi)的compareAndSwapInt方法,使用CAS操作更新同步狀態(tài),保證了狀態(tài)修改的原子性

線程會(huì)試圖修改state的值,如果修改成功那么表示線程得到或釋放了同步狀態(tài),如果失敗就會(huì)將當(dāng)前線程封裝成一個(gè)Node節(jié)點(diǎn),然后將其加入到同步隊(duì)列中,并阻塞當(dāng)前線程。

設(shè)計(jì)思想

AQS的設(shè)計(jì)使用了模板方法的設(shè)計(jì)模式,模板方法一般在父類(lèi)中封裝不變的部分(如算法骨架),把擴(kuò)展的可變部分交給子類(lèi)進(jìn)行擴(kuò)展,子類(lèi)的執(zhí)行結(jié)果會(huì)影響父類(lèi)的結(jié)果,是一種反向的控制結(jié)構(gòu)。AQS中應(yīng)用了這種設(shè)計(jì)模式,將一部分方法交給子類(lèi)進(jìn)行重寫(xiě),而自定義的同步組件在調(diào)用同步器提供的模板方法(父類(lèi)中的方法)時(shí),又會(huì)調(diào)用子類(lèi)重寫(xiě)的方法。

以AQS類(lèi)中常用于獲取鎖的acquire方法為例,它的代碼如下:

  1. public final void acquire(int arg) { 
  2.     if (!tryAcquire(arg) && 
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
  4.         selfInterrupt(); 

acquire方法被final修飾,不可以在子類(lèi)中重寫(xiě),因?yàn)樗菍?duì)外提供的模板方法,有相對(duì)具體和固定的執(zhí)行邏輯。在acquire方法中調(diào)用了tryAcquire方法:

  1. protected boolean tryAcquire(int arg) { 
  2.     throw new UnsupportedOperationException(); 

可以看到帶有protected修飾的tryAcquire方法是一個(gè)空殼方法,并沒(méi)有定義實(shí)際獲取同步狀態(tài)的邏輯,這就需要我們?cè)诶^承AQS的子類(lèi)中對(duì)齊進(jìn)行重寫(xiě),從而達(dá)到擴(kuò)展目的。在重寫(xiě)過(guò)程中,就會(huì)用到上面提到的獲取和修改同步狀態(tài)的3個(gè)方法getState、setState和compareAndSetState。

以ReentrantLock中的方法調(diào)用為例,當(dāng)調(diào)用ReentrantLock中的lock方法時(shí),會(huì)調(diào)用繼承了AQS的內(nèi)部類(lèi)Sync的父類(lèi)中的acquire方法,acquire方法再調(diào)用子類(lèi)Sync的tryAcquire方法并返回boolean類(lèi)型結(jié)果。

除了tryAcquire方法外,子類(lèi)中還提供了其他可以重寫(xiě)的方法,列出如下:

  • tryAcquire:獨(dú)占式獲取同步狀態(tài)
  • tryRelease:獨(dú)占式釋放同步狀態(tài)
  • tryAcquireShared:共享式獲取同步狀態(tài)
  • tryReleaseShared:共享式釋放同步狀態(tài)
  • isHeldExclusively:當(dāng)前線程是否獨(dú)占式的占用同步狀態(tài)

而我們?cè)趯?shí)現(xiàn)自定義的同步組件時(shí),可以直接調(diào)用AQS提供的下面這些模板方法:

  • acquire:獨(dú)占式獲取同步狀態(tài),如果線程獲取同步狀態(tài)成功那么方法返回,否則線程阻塞,進(jìn)入同步隊(duì)列中
  • acquireInterruptibly:在acquire基礎(chǔ)上,添加了響應(yīng)中斷功能
  • tryAcquireNanos:在acquireInterruptibly基礎(chǔ)上,添加了超時(shí)限制,超時(shí)會(huì)返回false
  • acquireShared:共享式獲取同步狀態(tài),如果線程獲取同步狀態(tài)成功那么方法返回,否則線程進(jìn)入同步隊(duì)列中阻塞。與acquire不同,該方法允許多個(gè)線程同時(shí)獲取鎖
  • acquireSharedInterruptibly:在acquireShared基礎(chǔ)上,可響應(yīng)中斷
  • tryAcquireSharedNanos:在acquireSharedInterruptibly基礎(chǔ)上,添加了超時(shí)限制
  • release:獨(dú)占式釋放同步狀態(tài),將喚醒同步隊(duì)列中第一個(gè)節(jié)點(diǎn)的線程
  • releaseShared:共享式釋放同步狀態(tài)
  • getQueuedThreads:獲取等待在同步隊(duì)列上的線程集合

從模板方法中可以看出,大多方法都是獨(dú)占模式和共享模式對(duì)稱(chēng)出現(xiàn)的,除去查詢(xún)等待線程方法外,可以將他們分為兩類(lèi):獨(dú)占式獲取或釋放同步狀態(tài)、共享式獲取或釋放同步狀態(tài),并且它們的核心都是acquire與release方法,其他方法只是在它們實(shí)現(xiàn)的基礎(chǔ)上做了部分的邏輯改動(dòng),增加了中斷和超時(shí)功能的支持。下面對(duì)主要的4個(gè)方法進(jìn)行分析。

2、源碼分析

acquire

分析上面acquire方法中源碼的執(zhí)行流程:

1.首先調(diào)用tryAcquire嘗試獲取同步狀態(tài),如果獲取成功,那么直接返回

2.如果獲取同步狀態(tài)失敗,調(diào)用addWaiter方法生成新Node節(jié)點(diǎn)并加入同步隊(duì)列:

  1. private Node addWaiter(Node mode) { 
  2.     Node node = new Node(Thread.currentThread(), mode); 
  3.     // Try the fast path of enq; backup to full enq on failure 
  4.     Node pred = tail; 
  5.     if (pred != null) { 
  6.         node.prev = pred; 
  7.         if (compareAndSetTail(pred, node)) { 
  8.             pred.next = node; 
  9.             return node; 
  10.         } 
  11.     } 
  12.     enq(node); 
  13.     return node; 

方法中使用當(dāng)前線程和等待狀態(tài)構(gòu)造了一個(gè)新的Node節(jié)點(diǎn),在同步隊(duì)列的隊(duì)尾節(jié)點(diǎn)不為空的情況下(說(shuō)明同步隊(duì)列非空),調(diào)用compareAndSetTail方法以CAS的方式把新節(jié)點(diǎn)設(shè)置為同步隊(duì)列的隊(duì)尾節(jié)點(diǎn)。如果隊(duì)尾節(jié)點(diǎn)為空或添加新節(jié)點(diǎn)失敗,則調(diào)用enq方法:

  1. private Node enq(final Node node) { 
  2.     for (;;) { 
  3.         Node t = tail; 
  4.         if (t == null) { // Must initialize 
  5.             if (compareAndSetHead(new Node())) 
  6.                 tail = head; 
  7.         } else { 
  8.             node.prev = t; 
  9.             if (compareAndSetTail(t, node)) { 
  10.                 t.next = node; 
  11.                 return t; 
  12.             } 
  13.         } 
  14.     } 

在同步隊(duì)列為空的情況下,會(huì)先創(chuàng)建一個(gè)新的空節(jié)點(diǎn)作為頭節(jié)點(diǎn),然后通過(guò)CAS的方式將當(dāng)前線程創(chuàng)建的Node設(shè)為尾節(jié)點(diǎn)。在for循環(huán)中,只有通過(guò)CAS將節(jié)點(diǎn)插入到隊(duì)尾后才會(huì)返回,否則就會(huì)重復(fù)循環(huán),通過(guò)這樣的方式,能夠?qū)⒉l(fā)添加節(jié)點(diǎn)的操作變?yōu)榇刑砑?,保證了線程的安全性。這一過(guò)程可以使用下圖表示:

3.添加新節(jié)點(diǎn)完成后,調(diào)用acquireQueued方法,嘗試以自旋的方式獲取同步狀態(tài):

  1. final boolean acquireQueued(final Node node, int arg) { 
  2.     boolean failed = true
  3.     try { 
  4.         boolean interrupted = false
  5.         for (;;) { 
  6.             final Node p = node.predecessor(); 
  7.             if (p == head && tryAcquire(arg)) { 
  8.                 setHead(node); 
  9.                 p.next = null; // help GC 
  10.                 failed = false
  11.                 return interrupted; 
  12.             } 
  13.             if (shouldParkAfterFailedAcquire(p, node) && 
  14.                 parkAndCheckInterrupt()) 
  15.                 interrupted = true
  16.         } 
  17.     } finally { 
  18.         if (failed) 
  19.             cancelAcquire(node); 
  20.     } 

當(dāng)新添加的Node的前驅(qū)節(jié)點(diǎn)是同步隊(duì)列的頭節(jié)點(diǎn)并且嘗試獲取同步狀態(tài)成功時(shí),線程將Node設(shè)為頭節(jié)點(diǎn)并從自旋中退出,否則調(diào)用shouldParkAfterFailedAcquire方法判斷是否需要掛起:

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 
  2.     int ws = pred.waitStatus; 
  3.     if (ws == Node.SIGNAL) 
  4.         return true
  5.     if (ws > 0) { 
  6.         do { 
  7.             node.prev = pred = pred.prev; 
  8.         } while (pred.waitStatus > 0); 
  9.         pred.next = node; 
  10.     } else { 
  11.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
  12.     } 
  13.     return false

在該方法中,傳入的第一個(gè)Node類(lèi)型的參數(shù)是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),對(duì)其等待狀態(tài)進(jìn)行判斷:

  • 如果為SIGNAL狀態(tài),那么前驅(qū)節(jié)點(diǎn)釋放同步狀態(tài)或取消時(shí)都會(huì)通知后繼節(jié)點(diǎn),因此可以將當(dāng)前線程阻塞,返回true
  • 如果大于0,那么為CANCEL狀態(tài),表示前驅(qū)節(jié)點(diǎn)被取消,那么一直向前回溯,找到一個(gè)不為CANCEL狀態(tài)的節(jié)點(diǎn),并將當(dāng)前節(jié)點(diǎn)的前驅(qū)指向它
  • 如果不是上面的兩種情況,那么將前驅(qū)節(jié)點(diǎn)的等待狀態(tài)設(shè)為SIGNAL。這里的目的是在每個(gè)節(jié)點(diǎn)進(jìn)入阻塞狀態(tài)前將前驅(qū)節(jié)點(diǎn)的等待狀態(tài)設(shè)為SIGNAL,否則節(jié)點(diǎn)將無(wú)法被喚醒
  • 在后兩種情況下,都會(huì)返回false,然后在acquireQueued方法中進(jìn)行循環(huán),直到進(jìn)入shouldParkAfterFailedAcquire方法時(shí)為第一種情況,阻塞線程

當(dāng)返回為true時(shí),調(diào)用parkAndCheckInterrupt方法:

  1. private final boolean parkAndCheckInterrupt() { 
  2.     LockSupport.park(this); 
  3.     return Thread.interrupted(); 

在方法內(nèi)部調(diào)用了LockSupport的park方法,阻塞當(dāng)前線程,并返回當(dāng)前線程是否被中斷的狀態(tài)。

在上面的代碼中,各節(jié)點(diǎn)通過(guò)自旋的方式檢測(cè)自己的前驅(qū)節(jié)點(diǎn)是否頭節(jié)點(diǎn)的過(guò)程,可用下圖表示:

4.當(dāng)滿足條件,返回acquire方法后,調(diào)用selfInterrupt方法。方法內(nèi)部使用interrupt方法,喚醒被阻塞的線程,繼續(xù)向下執(zhí)行:

  1. static void selfInterrupt() { 
  2.     Thread.currentThread().interrupt(); 

最后,使用流程圖的方式總結(jié)acquire方法獨(dú)占式獲取鎖的整體流程:

release

與acquire方法對(duì)應(yīng),release方法負(fù)責(zé)獨(dú)占式釋放同步狀態(tài),流程也相對(duì)簡(jiǎn)單。在ReentrantLock中,unlock方法就是直接調(diào)用的AQS的release方法。先來(lái)直接看一下它的源碼:

  1. public final boolean release(int arg) { 
  2.     if (tryRelease(arg)) { 
  3.         Node h = head; 
  4.         if (h != null && h.waitStatus != 0) 
  5.             unparkSuccessor(h); 
  6.         return true
  7.     } 
  8.     return false

1.方法中首先調(diào)用子類(lèi)重寫(xiě)的tryRelease方法,嘗試釋放當(dāng)前線程持有的同步狀態(tài),如果成功則向下執(zhí)行,失敗返回false

2.如果同步隊(duì)列的頭節(jié)點(diǎn)不為空且等待狀態(tài)不為初始狀態(tài),那么將調(diào)用unparkSuccessor方法喚醒它的后繼節(jié)點(diǎn):

  1. private void unparkSuccessor(Node node) { 
  2.     int ws = node.waitStatus; 
  3.     if (ws < 0) 
  4.         compareAndSetWaitStatus(node, ws, 0); 
  5.     Node s = node.next
  6.     if (s == null || s.waitStatus > 0) { 
  7.         s = null
  8.         for (Node t = tail; t != null && t != node; t = t.prev) 
  9.             if (t.waitStatus <= 0) 
  10.                 s = t; 
  11.     } 
  12.     if (s != null
  13.         LockSupport.unpark(s.thread); 

方法主要實(shí)現(xiàn)的功能有:

  • 如果頭節(jié)點(diǎn)的等待狀態(tài)小于0,使用CAS將它置為0
  • 如果后續(xù)節(jié)點(diǎn)為空、或它的等待狀態(tài)為CANCEL被取消,那么從隊(duì)尾開(kāi)始,向前尋找最靠近隊(duì)列頭部的一個(gè)等待狀態(tài)小于0 的節(jié)點(diǎn)
  • 找到符合條件的節(jié)點(diǎn)后,調(diào)用LockSupport工具類(lèi)的unpark方法,喚醒后繼節(jié)點(diǎn)中對(duì)應(yīng)的線程

同步隊(duì)列新頭節(jié)點(diǎn)的設(shè)置過(guò)程如下圖所示:

在上面的過(guò)程中,采用的是從后向前遍歷尋找未取消節(jié)點(diǎn)的方式,這是因?yàn)锳QS的同步隊(duì)列是一個(gè)弱一致性的雙向列表,在下面的情況中,存在next指針為null的情況:

  • 在enq方法插入新節(jié)點(diǎn)時(shí),可能存在舊尾節(jié)點(diǎn)的next指針還未指向新節(jié)點(diǎn)的情況
  • 在shouldParkAfterFailedAcquire方法中,當(dāng)移除CANCEL狀態(tài)的節(jié)點(diǎn)時(shí),也存在next指針還未指向后續(xù)節(jié)點(diǎn)的情況

acquireShared

在了解了獨(dú)占式獲取同步狀態(tài)后,再來(lái)看一下共享式獲取同步狀態(tài)。在共享模式下,允許多個(gè)線程同時(shí)獲取到同步狀態(tài),來(lái)看一下它的源碼:

  1. public final void acquireShared(int arg) { 
  2.     if (tryAcquireShared(arg) < 0) 
  3.         doAcquireShared(arg); 

首先調(diào)用子類(lèi)重寫(xiě)的tryAcquireShared方法,返回值為int類(lèi)型,如果值大于等于0表示獲取同步狀態(tài)成功,那么直接返回。如果小于0表示獲取失敗,執(zhí)行下面的doAcquireShared方法,將線程放入等待隊(duì)列使用自旋嘗試獲取,直到獲取同步狀態(tài)成功:

  1. private void doAcquireShared(int arg) { 
  2.     final Node node = addWaiter(Node.SHARED); 
  3.     boolean failed = true
  4.     try { 
  5.         boolean interrupted = false
  6.         for (;;) { 
  7.             final Node p = node.predecessor(); 
  8.             if (p == head) { 
  9.                 int r = tryAcquireShared(arg); 
  10.                 if (r >= 0) { 
  11.                     setHeadAndPropagate(node, r); 
  12.                     p.next = null; // help GC 
  13.                     if (interrupted) 
  14.                         selfInterrupt(); 
  15.                     failed = false
  16.                     return
  17.                 } 
  18.             } 
  19.             if (shouldParkAfterFailedAcquire(p, node) && 
  20.                 parkAndCheckInterrupt()) 
  21.                 interrupted = true
  22.         } 
  23.     } finally { 
  24.         if (failed) 
  25.             cancelAcquire(node); 
  26.     } 

對(duì)上面的代碼進(jìn)行簡(jiǎn)要的解釋?zhuān)?/p>

1.調(diào)用addWaiter方法,封裝新節(jié)點(diǎn),并以共享模式(Node.SHARED)將節(jié)點(diǎn)放入同步隊(duì)列中隊(duì)尾

2.在for循環(huán)中,獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果前驅(qū)節(jié)點(diǎn)是同步隊(duì)列的頭節(jié)點(diǎn),那么就以共享模式去嘗試獲取同步狀態(tài),判斷tryAcquireShared的返回值,如果返回值大于等于0,表示獲取同步狀態(tài)成功,修改新的頭節(jié)點(diǎn),并將信息傳播給同步隊(duì)列中的后繼節(jié)點(diǎn),然后檢查中斷標(biāo)志位,如果線程被阻塞,那么進(jìn)行喚醒

3.如果前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)、或獲取同步狀態(tài)失敗時(shí),調(diào)用shouldParkAfterFailedAcquire判斷是否需要阻塞,如果需要?jiǎng)t調(diào)用parkAndCheckInterrupt,在前驅(qū)節(jié)點(diǎn)的等待狀態(tài)為SIGNAL時(shí),將節(jié)點(diǎn)對(duì)應(yīng)的線程阻塞

可以看到,共享式的獲取同步狀態(tài)的調(diào)用過(guò)程和acquire方法非常相似,但不同的是在獲取同步狀態(tài)成功后,會(huì)調(diào)用setHeadAndPropagate方法進(jìn)行共享式同步狀態(tài)的傳播:

  1. private void setHeadAndPropagate(Node node, int propagate) { 
  2.     Node h = head; // Record old head for check below 
  3.     setHead(node); 
  4.     if (propagate > 0 || h == null || h.waitStatus < 0 || 
  5.         (h = head) == null || h.waitStatus < 0) { 
  6.         Node s = node.next
  7.         if (s == null || s.isShared()) 
  8.             doReleaseShared(); 
  9.     } 

因?yàn)楣蚕硎酵綘顟B(tài)是允許多個(gè)線程共享的,所以在一個(gè)線程獲取到同步狀態(tài)后,需要在第一時(shí)間通知后繼節(jié)點(diǎn)的線程可以嘗試獲取同步資源,這樣就可以避免其他線程阻塞時(shí)間過(guò)長(zhǎng)。在方法中,把當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)后,需要根據(jù)情況判斷后繼節(jié)點(diǎn)是否需要釋放:

  • propagate>0:表示還擁有剩余的同步資源,從doAcquireShared方法中執(zhí)行到這時(shí),取值是大于等于0的,在等于0的情況下,會(huì)繼續(xù)下面的判斷
  • h == null:原頭節(jié)點(diǎn)為空,一般情況下不滿足,有可能發(fā)生在原頭節(jié)點(diǎn)被gc回收的情況,此條不滿足情況則向下繼續(xù)判斷
  • h.waitStatus < 0:原頭節(jié)點(diǎn)的等待狀態(tài)可能取值為0或-3
  • 當(dāng)某個(gè)線程釋放同步資源或者前一個(gè)節(jié)點(diǎn)共享式獲取同步狀態(tài)時(shí)(會(huì)執(zhí)行下面的doReleaseShared方法),會(huì)將自己的waitStatus從-1改變?yōu)?
  • 這時(shí)可能后繼節(jié)點(diǎn)還沒(méi)有來(lái)的及將自己更新為頭節(jié)點(diǎn),如果有其他的線程在這個(gè)時(shí)候再調(diào)用doReleaseShared方法,那么取到的還是原頭節(jié)點(diǎn),會(huì)把它的waitStatus從0改變?yōu)?3,在這個(gè)過(guò)程中,說(shuō)明其他線程調(diào)用doReleaseShared釋放了同步資源
  • (h = head) == null:新頭節(jié)點(diǎn)為空,一般情況下不滿足,會(huì)向下繼續(xù)判斷
  • h.waitStatus < 0:新頭節(jié)點(diǎn)的等待狀態(tài)可能取值為0或-3或-1
  1. 如果后繼節(jié)點(diǎn)剛加入隊(duì)列,還沒(méi)有運(yùn)行到shouldParkAfterFailedAcquire方法,修改其前驅(qū)節(jié)點(diǎn)的等待狀態(tài)時(shí),此時(shí)可能為0
  2. 如果節(jié)點(diǎn)被喚醒成為了新的頭節(jié)點(diǎn),并且此時(shí)后繼節(jié)點(diǎn)才剛被加入同步隊(duì)列,又有其他線程釋放鎖調(diào)用了doReleaseShared,會(huì)把頭節(jié)點(diǎn)的狀態(tài)從0改為-3
  3. 隊(duì)列中的節(jié)點(diǎn)已經(jīng)調(diào)用了shouldParkAfterFailedAcquire,會(huì)把waitStatus 從0或-3 改為-1

如果滿足上面的任何一種狀態(tài),并且它的后繼節(jié)點(diǎn)是SHARED狀態(tài)的,則執(zhí)行doReleaseShared方法釋放后繼節(jié)點(diǎn):

  1. private void doReleaseShared() { 
  2.     for (;;) { 
  3.         Node h = head; 
  4.         if (h != null && h != tail) { 
  5.             int ws = h.waitStatus; 
  6.             if (ws == Node.SIGNAL) { 
  7.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
  8.                     continue; // loop to recheck cases 
  9.                 unparkSuccessor(h); 
  10.             } 
  11.             else if (ws == 0 && 
  12.                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
  13.                 continue; // loop on failed CAS 
  14.         } 
  15.         if (h == head) // loop if head changed 
  16.             break; 
  17.     } 

doReleaseShared方法不僅在這里的共享狀態(tài)傳播的情況下被調(diào)用,還會(huì)在后面介紹的共享式釋放同步狀態(tài)中被調(diào)用。在方法中,當(dāng)頭節(jié)點(diǎn)不為空且不等于尾節(jié)點(diǎn)(意味著沒(méi)有后繼節(jié)點(diǎn)需要等待喚醒)時(shí):

  • 先將頭節(jié)點(diǎn)從SIGNAL狀態(tài)更新為0,然后調(diào)用unparkSuccessor方法喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
  • 將頭節(jié)點(diǎn)的狀態(tài)從0更新為PROPAGATE,表明狀態(tài)需要向后繼節(jié)點(diǎn)傳播
  • 如果頭節(jié)點(diǎn)在更新?tīng)顟B(tài)的時(shí)候沒(méi)有發(fā)生改變,則退出循環(huán)

通過(guò)上面的流程,就實(shí)現(xiàn)了從頭節(jié)點(diǎn)嘗試向后喚醒節(jié)點(diǎn),實(shí)現(xiàn)了共享狀態(tài)的向后傳播。

releaseShared

最后,再來(lái)看一下對(duì)應(yīng)的共享式釋放同步狀態(tài)方法:

  1. public final boolean releaseShared(int arg) { 
  2.     if (tryReleaseShared(arg)) { 
  3.         doReleaseShared(); 
  4.         return true
  5.     } 
  6.     return false

releaseShared方法會(huì)釋放指定量的資源,如果調(diào)用子類(lèi)重寫(xiě)的tryReleaseShared方法返回值為true,表示釋放成功,那么還是執(zhí)行上面介紹過(guò)的doReleaseShared方法喚醒同步隊(duì)列中的等待線程。

3、自定義同步組件

在前面的介紹中說(shuō)過(guò),在使用AQS時(shí),需要定義一個(gè)子類(lèi)繼承AbstractQueuedSynchronizer抽象類(lèi),并實(shí)現(xiàn)它的抽象方法來(lái)管理同步狀態(tài)。接下來(lái)我們就來(lái)手寫(xiě)一個(gè)獨(dú)占式的鎖,按照文檔中的推薦,我們將子類(lèi)定義為自定義同步工具類(lèi)的靜態(tài)內(nèi)部類(lèi):

  1. public class MyLock { 
  2.     private static class AqsHelper extends AbstractQueuedSynchronizer { 
  3.         @Override 
  4.         protected boolean tryAcquire(int arg) { 
  5.             int state = getState(); 
  6.             if (state == 0) { 
  7.                 if (compareAndSetState(0, arg)) { 
  8.                     setExclusiveOwnerThread(Thread.currentThread()); 
  9.                     return true
  10.                 } 
  11.             } else if (getExclusiveOwnerThread() == Thread.currentThread()) { 
  12.                 setState(getState() + arg); 
  13.                 return true
  14.             } 
  15.             return false
  16.         } 
  17.         @Override 
  18.         protected boolean tryRelease(int arg) { 
  19.             int state = getState() - arg; 
  20.             if (state == 0) { 
  21.                 setExclusiveOwnerThread(null); 
  22.                 setState(state); 
  23.                 return true
  24.             } 
  25.             setState(state); 
  26.             return false
  27.         } 
  28.         @Override 
  29.         protected boolean isHeldExclusively() { 
  30.             return getState() == 1; 
  31.         } 
  32.     } 
  33.      
  34.     private final AqsHelper aqsHelper = new AqsHelper(); 
  35.     public void lock() { 
  36.         aqsHelper.acquire(1); 
  37.     } 
  38.     public boolean tryLock() { 
  39.         return aqsHelper.tryAcquire(1); 
  40.     } 
  41.     public void unlock() { 
  42.         aqsHelper.release(1); 
  43.     } 
  44.     public boolean isLocked() { 
  45.         return aqsHelper.isHeldExclusively(); 
  46.     } 

在AQS的子類(lèi)中,首先重寫(xiě)了tryAcquire方法,在方法中利用CAS來(lái)修改state的狀態(tài)值,并在修改成功時(shí)設(shè)置當(dāng)前線程獨(dú)占資源。并且通過(guò)比較嘗試獲取鎖的線程與持有鎖的線程是否相同的方式,來(lái)實(shí)現(xiàn)了鎖的可重入性。在重寫(xiě)的tryRelease方法中,進(jìn)行資源的釋放,如果存在重入的情況,會(huì)一直到所有重入鎖釋放完才會(huì)真正的釋放鎖,并放棄占有狀態(tài)。

可以注意到在自定義的鎖工具類(lèi)中,我們定義了lock和tryLock兩個(gè)方法,分別調(diào)用了acquire和tryAcquire方法,它們的區(qū)別是lock會(huì)等待鎖資源,直到成功時(shí)才會(huì)返回,而tryLock嘗試獲取鎖時(shí),會(huì)立即返回成功或失敗的狀態(tài)。

接下來(lái),我們通過(guò)下面的測(cè)試代碼,驗(yàn)證自定義的鎖的有效性:

  1. public class Test { 
  2.     private MyLock lock=new MyLock(); 
  3.     private int i=0; 
  4.     public void sayHi(){ 
  5.         try { 
  6.             lock.lock(); 
  7.             System.out.println("i am "+i++); 
  8.         }finally { 
  9.             lock.unlock(); 
  10.         } 
  11.     } 
  12.     public static void main(String[] args) { 
  13.         Test test=new Test(); 
  14.         Thread[] th=new Thread[20]; 
  15.         for (int i = 0; i < 20; i++) { 
  16.             new Thread(()->{ 
  17.                 test.sayHi(); 
  18.             }).start(); 
  19.         } 
  20.     } 

運(yùn)行上面的測(cè)試代碼,結(jié)果如下,可以看見(jiàn)通過(guò)加鎖保證了對(duì)變量i的同步訪問(wèn)控制:

接下來(lái)通過(guò)下面的例子測(cè)試鎖的可重入性:

  1. public class Test2 { 
  2.     private MyLock lock=new MyLock(); 
  3.     public void function1(){ 
  4.         lock.lock(); 
  5.         System.out.println("execute function1"); 
  6.         function2(); 
  7.         lock.unlock(); 
  8.     } 
  9.     public void function2(){ 
  10.         lock.lock(); 
  11.         System.out.println("execute function2"); 
  12.         lock.unlock(); 
  13.     } 
  14.     public static void main(String[] args) { 
  15.         Test2 test2=new Test2(); 
  16.         new Thread(()->{ 
  17.             test2.function1(); 
  18.         }).start(); 
  19.     } 

執(zhí)行上面的代碼,可以看到在function1未釋放鎖的情況下,function2對(duì)鎖進(jìn)行了重入并執(zhí)行了后續(xù)的代碼:

 

總結(jié)

通過(guò)上面的學(xué)習(xí),我們了解了AQS的兩大核心同步隊(duì)列和同步狀態(tài),并對(duì)AQS對(duì)資源的管理以及隊(duì)列狀態(tài)的變化有了一定的研究。其實(shí)歸根結(jié)底,AQS只是提供給我們來(lái)開(kāi)發(fā)同步組件的一個(gè)底層框架,在它的層面上,并不關(guān)心子類(lèi)在繼承它時(shí)要實(shí)現(xiàn)什么功能,AQS只是提供了一套維護(hù)同步狀態(tài)的功能,至于要完成什么樣的一個(gè)工具類(lèi),這完全是由我們自己去定義的。

 

責(zé)任編輯:姜華 來(lái)源: 碼農(nóng)參上
相關(guān)推薦

2021-04-13 14:07:22

JUC解析AQS抽象

2020-11-16 08:11:32

ReentrantLo

2025-01-13 09:24:32

2020-06-18 10:50:56

Java并發(fā)同步器

2021-05-18 06:55:07

Java AQS源碼

2024-01-22 08:52:00

AQS雙異步數(shù)據(jù)一致性

2020-05-06 09:10:46

AQS同步器CAS

2024-01-12 07:38:38

AQS原理JUC

2012-06-05 02:12:55

Java多線程

2020-10-16 08:26:38

AQS通信協(xié)作

2024-10-18 11:29:15

2025-02-06 08:24:25

AQS開(kāi)發(fā)Java

2024-02-01 13:03:00

AI模型

2022-11-11 10:48:55

AQS源碼架構(gòu)

2021-05-27 07:54:21

JavaStateAQS

2024-02-29 09:37:25

Java并發(fā)編程

2022-05-23 09:22:20

Go語(yǔ)言調(diào)試器Delve

2022-11-09 10:46:18

AQS加鎖機(jī)制

2021-10-17 19:52:40

Python:源碼編譯器

2022-03-03 18:39:01

Harmonyioremap鴻蒙
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)