教大家如何手寫一個AQS?
手寫一個AQS
AQS即AbstractQueuedSynchronizer,是用來實現(xiàn)鎖和線程同步的一個工具類。大部分操作基于CAS和FIFO隊列來實現(xiàn)。
如果讓我們自己基于API來實現(xiàn)一個鎖,實現(xiàn)可以分為幾個大部分
- 加鎖
- 解鎖
- 入隊
- 出隊
- 阻塞
- 喚醒
我們來想一下這幾個部分的實現(xiàn)
加鎖
1.用一個變量state作為鎖的標志位,默認是0,表示此時所有線程都可以加鎖,加鎖的時候通過cas將state從0變?yōu)?,cas執(zhí)行成功表示加鎖成功
2.當有線程占有了鎖,這時候有其他線程來加鎖,判斷當前來搶鎖的線程是不是占用鎖的線程?是:重入鎖,state+1,當釋放的時候state-1,用state表示加鎖的次數(shù) 否:加鎖失敗,將線程放入等待隊列,并且阻塞
3.有沒有什么其他可以優(yōu)化的地方?當放入等待隊列的時候,看看有沒有其他線程?有,鎖被占用了,并且輪不到當前線程來搶,直接阻塞就行了 在放入隊列時候,通過cas再嘗試獲取一波鎖,如果獲取成功,就不用阻塞了,提高了效率
解鎖
1.通過cas對state-1,如果是重入鎖,釋放一次減一次,當state=0時表示鎖被釋放。2.喚醒等待隊列中的線程
入隊
入隊這個過程和我們平常使用的隊列不同。我們平常使用的隊列每次生成一個節(jié)點放入即可。
而AQS隊列,當隊列為空時,第一次生成兩個節(jié)點,第一個節(jié)點代表當前占有鎖的線程,第二個節(jié)點為搶鎖失敗的節(jié)點。不為空的時候,每次生成一個節(jié)點放入隊尾。
「當把線程放入隊列中時,后續(xù)應(yīng)該做哪些操作呢?」
如果讓你寫是不是直接放入隊列中就完事了?但Doug Lea是這樣做的
- 如果當前線程是隊列中的第二個節(jié)點則再嘗試搶一下鎖(不是第二個節(jié)點就不用搶來,輪不到),這樣避免了頻繁的阻塞和喚醒線程,提高了效率
- 上鬧鐘,讓上一個線程來喚醒自己(后續(xù)會說到,即更改上一個節(jié)點的waitStatus)
- 阻塞
出隊
當A線程釋放鎖,喚醒隊列中的B線程,A線程會從隊列中刪除
那出隊這個事情由誰來做?是由被喚醒的線程來做,即B線程
阻塞和喚醒
阻塞和喚醒線程調(diào)用api即可
- // 阻塞線程
- LockSupport.park(this)
- // 喚醒線程
- LockSupport.unpark(this)
獨占鎖的獲取和釋放
JUC中的許多并發(fā)工具類ReentrantLock,CountDownLatch等的實現(xiàn)都依賴AbstractQueuedSynchronizer
AbstractQueuedSynchronizer定義了一個鎖實現(xiàn)的內(nèi)部流程,而如何加鎖和解鎖則在各個子類中實現(xiàn),典型的模板方法模式
AQS內(nèi)部維護了一個FIFO的隊列(底層實現(xiàn)就是雙向鏈表),通過該隊列來實現(xiàn)線程的并發(fā)訪問控制,隊列中的元素是一個Node節(jié)點
- static final class Node {
- //表示當前線程以共享模式持有鎖
- static final Node SHARED = new Node();
- //表示當前線程以獨占模式持有鎖
- static final Node EXCLUSIVE = null;
- static final int CANCELLED = 1;
- static final int SIGNAL = -1;
- static final int CONDITION = -2;
- static final int PROPAGATE = -3;
- //當前節(jié)點的狀態(tài)
- volatile int waitStatus;
- //前繼節(jié)點
- volatile Node prev;
- //后繼節(jié)點
- volatile Node next;
- //當前線程
- volatile Thread thread;
- //存儲在condition隊列中的后繼節(jié)點
- Node nextWaiter;
- }
waitStatus(默認是0)表示節(jié)點的狀態(tài),包含的狀態(tài)有
狀態(tài) | 值 | 含義 |
---|---|---|
CANCELLED | 1 | 線程獲取鎖的請求已經(jīng)取消 |
SIGNAL | -1 | 表示當前節(jié)點的的后繼節(jié)點將要或者已經(jīng)被阻塞,在當前節(jié)點釋放的時候需要unpark后繼節(jié)點 |
CONDITION | -2 | 表示當前節(jié)點在等待condition,即在condition隊列中 |
PROPAGATE | -3 | 表示狀態(tài)需要向后傳播,僅在共享模式下使用) |
0 | Node被初始化后的默認值,當前節(jié)點在隊列中等待獲取鎖 |
再來看AbstractQueuedSynchronizer這個類的屬性
- //等待隊列的頭節(jié)點
- private transient volatile Node head;
- //等待隊列的尾節(jié)點
- private transient volatile Node tail;
- //加鎖的狀態(tài),在不同子類中有不同的意義
- private volatile int state;
「這個state在不同的子類中有不同的含義」
「ReentrantLock」:state表示加鎖的次數(shù),為0表示沒有被加鎖,為1表示被加鎖1次,為2表示被加鎖2次,因為ReentrantLock是一個可以重入的鎖「CountDownLatch」:state表示一個計數(shù)器,當state>0時,線程調(diào)用await會被阻塞,當state值被減少為0時,線程會被喚醒「Semaphore」:state表示資源的數(shù)量,state>0時,可以獲取資源,并將state-1,當state=0時,獲取不到資源,此時線程會被阻塞。當資源被釋放時,state+1,此時其他線程可以獲得資源
AbstractQueuedSynchronizer中的FIFO隊列是用雙向鏈表來實現(xiàn)的
在這里插入圖片描述
AQS提供了獨占鎖和共享鎖兩種加鎖方式,每種方式都有響應(yīng)中斷和不響應(yīng)中斷的區(qū)別,所以AQS的鎖可以分為如下四類
- 不響應(yīng)中斷的獨占鎖(acquire)
- 響應(yīng)中斷的獨占鎖(acquireInterruptibly)
- 不響應(yīng)中斷的共享鎖(acquireShared)
- 響應(yīng)中斷的共享鎖(acquireSharedInterruptibly)
而釋放鎖的方式只有兩種
- 獨占鎖的釋放(release)
- 共享鎖的釋放(releaseShared)
不響應(yīng)中斷的獨占鎖
以ReentrantLock為例,從加鎖這一部分開始分析
- // 調(diào)用ReentrantLock.FairSync#lock方法其實就是調(diào)用acquire(1);
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//獲取到鎖返回false,否則返回true
- selfInterrupt();//當前線程將自己中斷
- }
- 先嘗試獲取,如果獲取到直接退出,否則進入2
- 獲取鎖失敗,以獨占模式將線程包裝成Node放到隊列中
- 如果放入的節(jié)點是隊列的第二個節(jié)點,則再嘗試獲取鎖,因為此時鎖有可能釋放類,不是第二個節(jié)點就不用嘗試了,因為輪不到。如果獲取到鎖則將當前節(jié)點設(shè)為head節(jié)點,退出,否則進入4
- 設(shè)置好鬧鐘后將自己阻塞
- 線程被喚醒,重新競爭鎖,獲取鎖成功,繼續(xù)執(zhí)行。如果線程發(fā)生過中斷,則最后重置中斷標志位位true,即執(zhí)行selfInterrupt()方法
「從代碼層面詳細分析一波,走起」
tryAcquire是讓子類實現(xiàn)的
- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
這里通過拋出異常來告訴子類要重寫這個方法,為什么不將這個方法定義為abstract方法呢?因為AQS有2種功能,獨占和共享,如果用abstract修飾,則子類需要同時實現(xiàn)兩種功能的方法,對子類不友好
- 當隊列不為空,嘗試將新節(jié)點通過CAS的方式設(shè)置為尾節(jié)點,如果成功,返回附加著當前線程的節(jié)點
- 當隊列為空,或者新節(jié)點通過CAS的方式設(shè)置為尾節(jié)點失敗,進入enq方法
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
- 當隊列不為空,一直CAS,直到把新節(jié)點放入隊尾
- 當隊列為空,先往對列中放入一個節(jié)點,在把傳入的節(jié)點CAS為尾節(jié)點
「前面已經(jīng)說過了哈,AQS隊列為空時,第一次會放入2個節(jié)點」
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- // 隊列為空,進行初始化,
- if (t == null) {
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
放入隊列后還要干什么?
- 如果是第二個節(jié)點再嘗試獲取一波鎖,因為此時有可能鎖已經(jīng)釋放了,其他節(jié)點就不用了,因為還輪不到
- 上鬧鐘,讓別的線程喚醒自己
- 阻塞自己
- // 自旋獲取鎖,直到獲取鎖成功,或者異常退出
- // 但是并不是busy acquire,因為當獲取失敗后會被掛起,由前驅(qū)節(jié)點釋放鎖時將其喚醒
- // 同時由于喚醒的時候可能有其他線程競爭,所以還需要進行嘗試獲取鎖,體現(xiàn)的非公平鎖的精髓。
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- // 獲取前繼節(jié)點
- final Node p = node.predecessor();
- // node節(jié)點的前繼節(jié)點是head節(jié)點,嘗試獲取鎖,如果成功說明head節(jié)點已經(jīng)釋放鎖了
- // 將node設(shè)為head開始運行(head中不包含thread)
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- // 將第一個節(jié)點出隊
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 獲取鎖失敗后是否可以掛起
- // 如果可以掛起,則阻塞當前線程(獲取鎖失敗的節(jié)點)
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
根據(jù)前繼節(jié)點的狀態(tài),是否可以阻塞當前獲取鎖失敗的節(jié)點
一般情況會經(jīng)歷如下2個過程
- 默認情況下上一個節(jié)點的waitStatus=0,所以會進入compareAndSetWaitStatus方法,通過cas將上一個節(jié)點的waitStatus設(shè)置為SIGNAL,然后return false
- shouldParkAfterFailedAcquire方法外面是一個死循環(huán),當再次進入這個方法時,如果上一步cas成功,則會走第一個if,return true。接著執(zhí)行parkAndCheckInterrupt,線程會阻塞
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- // 前繼節(jié)點釋放時會unpark后繼節(jié)點,可以掛起
- if (ws == Node.SIGNAL)
- return true;
- if (ws > 0) {
- //將CANCELLED狀態(tài)的線程清理出隊列
- // 后面會提到為什么會有CANCELLED的節(jié)點
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- // 將前繼節(jié)點的狀態(tài)設(shè)置為SIGNAL,代表釋放鎖時需要喚醒后面的線程
- // cas更新可能失敗,所以不能直接返回true
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
shouldParkAfterFailedAcquire表示上好鬧鐘了,可以阻塞線程了。后續(xù)當線程被喚醒的時候會從return語句出繼續(xù)執(zhí)行,然后進入acquireQueued方法的死循環(huán),重新?lián)屾i。至此,加鎖結(jié)束。
- // 掛起線程,返回是否被中斷過
- private final boolean parkAndCheckInterrupt() {
- // 阻塞線程
- LockSupport.park(this);
- // 返回當前線程是否被調(diào)用過Thread#interrupt方法
- return Thread.interrupted();
- }
最后用一個流程圖來解釋不響應(yīng)中斷的獨占鎖
入隊過程中有異常該怎么辦?
可以看到上面調(diào)用acquireQueued方法發(fā)生異常的時候,會調(diào)用cancelAcquire方法,我們就詳細分析一下這個cancelAcquire方法有哪些作用?
「哪些地方執(zhí)行發(fā)生異常會執(zhí)行cancelAcquire?」
可以看到調(diào)用cancelAcquire方法的有如下幾個部分
「分析這些方法的調(diào)用,發(fā)現(xiàn)基本就是如下2個地方會發(fā)生異?!?/strong>
- 嘗試獲取鎖的方法如tryAcquire,這些一般是交給子類來實現(xiàn)的
- 當線程是被調(diào)用Thread#interrupt方法喚醒,如果要響應(yīng)中斷,會拋出InterruptedException
- //處理異常退出的node
- private void cancelAcquire(Node node) {
- if (node == null)
- return;
- // 設(shè)置該節(jié)點不再關(guān)聯(lián)任何線程
- node.thread = null;
- // 跳過CANCELLED節(jié)點,找到一個有效的前繼節(jié)點
- Node pred = node.prev;
- while (pred.waitStatus > 0)
- node.prev = pred = pred.prev;
- // 獲取過濾后的有效節(jié)點的后繼節(jié)點
- Node predNext = pred.next;
- // 設(shè)置狀態(tài)為取消
- node.waitStatus = Node.CANCELLED;
- // case 1
- if (node == tail && compareAndSetTail(node, pred)) {
- compareAndSetNext(pred, predNext, null);
- } else {
- // case 2
- int ws;
- if (pred != head &&
- ((ws = pred.waitStatus) == Node.SIGNAL ||
- (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
- pred.thread != null) {
- Node next = node.next;
- if (next != null && next.waitStatus <= 0)
- compareAndSetNext(pred, predNext, next);
- } else {
- // case3
- unparkSuccessor(node);
- }
- node.next = node; // help GC
- }
- }
將node出隊有如下三種情況
當前節(jié)點是tail
當前節(jié)點不是head的后繼節(jié)點,也不是tail
當前節(jié)點是head的后繼節(jié)點
「當前節(jié)點是tail」
compareAndSetTail,將tail指向pred compareAndSetNext,將pred的next指向null,也就是把當前節(jié)點移出隊列
在這里插入圖片描述
「當前節(jié)點不是head的后繼節(jié)點,也不是tail」
這里將node的前繼節(jié)點的next指向了node的后繼節(jié)點,即compareAndSetNext(pred, predNext, next),「注意pred和node節(jié)點中間有可能有CANCELLED的節(jié)點,怕亂就沒畫出來」
「當前節(jié)點是head的后繼節(jié)點」
沒有對隊列進行操作,只是進行head后繼節(jié)點的喚醒操作(unparkSuccessor方法,后面會分析這個方法),因為此時他是head的后繼節(jié)點,還是有可能獲取到鎖的,所以喚醒它嘗試獲取一波鎖,當再次調(diào)用到shouldParkAfterFailedAcquire(判斷是否應(yīng)該阻塞的方法時)會把CANCELLED狀態(tài)的節(jié)點從隊列中刪除
獨占鎖的釋放
獨占鎖是釋放其實就是利用cas將state-1,當state=0表示鎖被釋放,需要將阻塞隊列中的線程喚醒
- // 調(diào)用ReentrantLock#unlock方法其實就是調(diào)用release(1)
- public final boolean release(int arg) {
- // 嘗試釋放鎖
- // 當state=0,表示鎖被釋放,tryRelease返回true,此時需要喚醒阻塞隊列中的線程
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
「tryRelease即具體的解鎖邏輯,需要子類自己去實現(xiàn)」
「喚醒同步隊列中的線程,可以看到前面加了判斷h != null && h.waitStatus != 0」
h = null,說明同步同步隊列中沒有數(shù)據(jù),則不需要喚醒 h = null && waitStatus = 0,同步隊列是有了,但是沒有線程給自己上鬧鐘,不用喚醒 h != null && waitStatus < 0,說明頭節(jié)點被人上了鬧鐘,自己需要喚醒阻塞的線程 h != null && waitStatus > 0,頭節(jié)點因為發(fā)生異常被設(shè)置為取消,但還是得喚醒線程
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- // 頭結(jié)點的下一個節(jié)點
- Node s = node.next;
- // 為空或者被取消
- if (s == null || s.waitStatus > 0) {
- s = null;
- // 從隊列尾部向前遍歷找到最前面的一個waitStatus<=0的節(jié)點
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- // 喚醒節(jié)點,但并不表示它持有鎖,要從阻塞的地方開始運行
- LockSupport.unpark(s.thread);
- }
「為什么要從后向前找第一個非CANCELLED的節(jié)點呢?」
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- // 線程在這里掛起了
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
這其實和入隊的邏輯有關(guān)系,假如Node1在圖示位置掛起了,Node1后面又陸續(xù)增加了Node2和Node3,如果此時從前向后遍歷會導(dǎo)致元素丟失,不能正確喚醒線程
分析一下獨占鎖響應(yīng)中斷和不響應(yīng)中斷的區(qū)別
我們之前說過獨占鎖可以響應(yīng)中斷,也可以不響應(yīng)中斷,調(diào)用的方法如下?
- 不響應(yīng)中斷的獨占鎖(acquire)
- 響應(yīng)中斷的獨占鎖(acquireInterruptibly)
所以我們只需要看這2個方法的區(qū)別在哪里就可以,我下面只列出有區(qū)別的部分哈。
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- public final void acquireInterruptibly(int arg)
- throws InterruptedException {
- // 判斷線程是否被中斷
- if (Thread.interrupted())
- throw new InterruptedException();
- if (!tryAcquire(arg))
- doAcquireInterruptibly(arg);
- }
「acquire在嘗試獲取鎖的時候完全不管線程有沒有被中斷,而acquireInterruptibly在嘗試獲取鎖之前會判斷線程是否被中斷,如果被中斷,則直接拋出異常?!?/p>
tryAcquire方法一樣,所以我們只需要對比acquireQueued方法和doAcquireInterruptibly方法的區(qū)別即可
「執(zhí)行acquireQueued方法當線程發(fā)生中斷時,只是將interrupted設(shè)置為true,并且調(diào)用selfInterrupt方法將中斷標志位設(shè)置為true」
「而執(zhí)行doAcquireInterruptibly方法,當線程發(fā)生中斷時,直接拋出異常?!?/p>
最后看一下parkAndCheckInterrupt方法,這個方法中判斷線程是否中斷的邏輯特別巧!
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
「Thread類提供了如下2個方法來判斷線程是否是中斷狀態(tài)」
- isInterrupted
- interrupted
「這里為什么用interrupted而不是isInterrupted的呢?」
演示一下這2個方法的區(qū)別
- @Test
- public void testInterrupt() throws InterruptedException {
- Thread thread = new Thread(() -> {
- while (true) {}
- });
- thread.start();
- TimeUnit.MICROSECONDS.sleep(100);
- thread.interrupt();
- // true
- System.out.println(thread.isInterrupted());
- // true
- System.out.println(thread.isInterrupted());
- // true
- System.out.println(thread.isInterrupted());
- }
- @Test
- public void testInterrupt2() {
- Thread.currentThread().interrupt();
- // true
- System.out.println(Thread.interrupted());
- // false
- System.out.println(Thread.interrupted());
- // false
- System.out.println(Thread.interrupted());
- }
「isInterrupted和interrupted的方法區(qū)別如下」
Thread#isInterrupted:測試線程是否是中斷狀態(tài),執(zhí)行后不更改狀態(tài)標志 Thread#interrupted:測試線程是否是中斷狀態(tài),執(zhí)行后將中斷標志更改為false
接著再寫2個例子
- public static void main(String[] args) {
- LockSupport.park();
- // end被一直阻塞沒有輸出
- System.out.println("end");
- }
- public static void main(String[] args) {
- Thread.currentThread().interrupt();
- LockSupport.park();
- // 輸出end
- System.out.println("end");
- }
可以看到當線程被中斷時,調(diào)用park()方法并不會被阻塞
- public static void main(String[] args) {
- Thread.currentThread().interrupt();
- LockSupport.park();
- // 返回中斷狀態(tài),并且清除中斷狀態(tài)
- Thread.interrupted();
- // 輸出start
- System.out.println("start");
- LockSupport.park();
- // end被阻塞,沒有輸出
- System.out.println("end");
- }
到這我們就能理解為什么要進行中斷的復(fù)位了
- 如果當前線程是非中斷狀態(tài),則在執(zhí)行park時被阻塞,返回中斷狀態(tài)false
- 如果當前線程是中斷狀態(tài),則park方法不起作用,返回中斷狀態(tài)true,interrupted將中斷復(fù)位,變?yōu)閒alse
- 再次執(zhí)行循環(huán)的時候,前一步已經(jīng)在線程的中斷狀態(tài)進行了復(fù)位,則再次調(diào)用park方法時會阻塞
「所以這里要對中斷進行復(fù)位,是為了不讓循環(huán)一直執(zhí)行,讓當前線程進入阻塞狀態(tài),如果不進行復(fù)位,前一個線程在獲取鎖之后執(zhí)行了很耗時的操作,那當前線程豈不是要一直執(zhí)行死循環(huán),造成CPU使用率飆升?」
獨占鎖的獲取和釋放我們已經(jīng)搞清楚了,共享鎖的獲取和釋放我們放到分析CountDownLatch源碼的那一節(jié)來分析
基于AQS自己寫一個鎖
你看AQS已經(jīng)把入隊,出隊,阻塞,喚醒的操作都封裝好了,當我們用AQS來實現(xiàn)自己的鎖時,就非常的方便了,只需要重寫加鎖和解鎖的邏輯即可。我這里演示一個基于AQS實現(xiàn)的非重入的互斥鎖
- public class MyLock {
- private final Sync sync;
- public MyLock() {
- sync = new Sync();
- }
- public class Sync extends AbstractQueuedSynchronizer {
- @Override
- protected boolean tryAcquire(int arg) {
- return compareAndSetState(0, arg);
- }
- @Override
- protected boolean tryRelease(int arg) {
- setState(0);
- return true;
- }
- }
- public void lock() {
- sync.acquire(1);
- }
- public void unLock() {
- sync.release(1);
- }
- }
本文轉(zhuǎn)載自微信公眾號「Java識堂」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Java識堂公眾號。