自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java高并發(fā)編程基礎之AQS

開發(fā) 后端
大多數(shù)人應該都可以說出 CountDownLatch、CyclicBarrier、Sempahore多線程并發(fā)三大利器。這三大利器都是通過AbstractQueuedSynchronizer抽象類(下面簡寫AQS)來實現(xiàn)的,所以學習三大利器之前我們有必要先來學習下AQS。

[[383836]]

 引言

曾經(jīng)有一道比較比較經(jīng)典的面試題“你能夠說說java的并發(fā)包下面有哪些常見的類?”大多數(shù)人應該都可以說出 CountDownLatch、CyclicBarrier、Sempahore多線程并發(fā)三大利器。這三大利器都是通過AbstractQueuedSynchronizer抽象類(下面簡寫AQS)來實現(xiàn)的,所以學習三大利器之前我們有必要先來學習下AQS。

AQS是一種提供了原子式管理同步狀態(tài)、阻塞和喚醒線程功能以及隊列模型的簡單框架”

AQS結構

說到同步我們如何來保證同步?大家第一印象肯定是加鎖了,說到鎖的話大家肯定首先會想到的是Synchronized。Synchronized大家應該基本上都會使用,加鎖和釋放鎖都是jvm 來幫我們實現(xiàn)的,我們只需要簡單的加個 Synchronized關鍵字就可以了。用起來超級方便。但是有沒有一種情況我們設置一個鎖的超時時間Synchronized就有點實現(xiàn)不了,這時候我們就可以用ReentrantLock來實現(xiàn),ReentrantLock是通過aqs來實現(xiàn)的,今天我們就通過ReentrantLock來學習一下aqs。

CAS && 公平鎖和非公平鎖

AQS里面用到了大量的CAS學習AQS之前我們還是有必要簡單的先了解下CAS、公平鎖和非公平鎖。

CAS

  • CAS 全稱是 compare and swap,是一種用于在多線程環(huán)境下實現(xiàn)同步功能的機制。CAS 操作包含三個操作數(shù) :內存位置、預期數(shù)值和新值。CAS 的實現(xiàn)邏輯是將內存位置處的數(shù)值與預期數(shù)值相比較,若相等,則將內存位置處的值替換為新值。若不相等,則不做任何操作,這個操作是個原子性操作,java里面的AtomicInteger等類都是通過cas來實現(xiàn)的。

公平鎖和非公平鎖

  • 公平鎖:多個線程按照申請鎖的順序去獲得鎖,線程會直接進入隊列去排隊,隊列中第一個才能獲得到鎖。優(yōu)點:等待鎖的線程不會餓死,每個線程都可以獲取到鎖。缺點:整體吞吐效率相對非公平鎖要低,等待隊列中除第一個線程以外的所有線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。
  • 非公平鎖:多個線程去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待隊列,如果能獲取到,就直接獲取到鎖。優(yōu)點:可以減少CPU喚醒線程的開銷,整體的吞吐效率會高點,CPU也不必喚醒所有線程,會減少喚起線程的數(shù)量。缺點:處于等待隊列中的線程可能會餓死,或者等很久才會獲得鎖。文字有點拗口,我們來個實際的例子說明下。比如我們去食堂就餐的時候都要排隊,大家都按照先來后到的順序排隊打飯,這就是公平鎖。如果等到你準備拿盤子打飯的時候 直接蹦出了一個五大三粗的胖子插隊到你前面,你看打不贏他只能忍氣吞聲讓他插隊,等胖子打完飯了又來個小個子也來插你隊,這時候你沒法忍了,直接大吼一聲讓他滾,這個 小個子只能屁顛屁顛到隊尾去排隊了這就是非公平鎖。我們先來看看AQS有哪些屬性
  1. // 頭節(jié)點 
  2. private transient volatile Node head; 
  3.  
  4. // 阻塞的尾節(jié)點,每個新的節(jié)點進來,都插入到最后,也就形成了一個鏈表 
  5. private transient volatile Node tail; 
  6.  
  7. // 這個是最重要的,代表當前鎖的狀態(tài),0代表沒有被占用,大于 0 代表有線程持有當前鎖 
  8. // 這個值可以大于 1,是因為鎖可以重入,每次重入都加上 1 
  9. private volatile int state; 
  10.  
  11. // 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入 
  12. // reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經(jīng)擁有了鎖 
  13. // if (currentThread == getExclusiveOwnerThread()) {state++} 
  14. private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer 

下面我們來寫一個demo分析下lock 加鎖和釋放鎖的過程

  1. final void lock() { 
  2.            // 上來先試試直接把狀態(tài)置位1,如果此時沒人獲取鎖就直接 
  3.            if (compareAndSetState(0, 1)) 
  4.                 // 爭搶成功則修改獲得鎖狀態(tài)的線程 
  5.                setExclusiveOwnerThread(Thread.currentThread()); 
  6.            else 
  7.                acquire(1); 
  8.        } 

cas嘗試失敗,說明已經(jīng)有人再持有鎖,所以進入acquire方法

  1. public final void acquire(int arg) { 
  2.        if (!tryAcquire(arg) && 
  3.            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
  4.            selfInterrupt(); 
  5.    } 

tryAcquire方法,看名字大概能猜出什么意思,就是試一試。tryAcquire實際上是調用了父類Sync的nonfairTryAcquire方法

  1. final boolean nonfairTryAcquire(int acquires) { 
  2.           final Thread current = Thread.currentThread(); 
  3.            // 獲取下當前鎖的狀態(tài) 
  4.           int c = getState(); 
  5.           // 這個if 邏輯跟前面一進來就獲取鎖的邏輯一樣都是通過cas嘗試獲取下鎖 
  6.           if (c == 0) { 
  7.               if (compareAndSetState(0, acquires)) { 
  8.                   setExclusiveOwnerThread(current); 
  9.                   return true
  10.               } 
  11.           } 
  12.           // 進入這個判斷說明 鎖重入了 狀態(tài)需要進行+1 
  13.           else if (current == getExclusiveOwnerThread()) { 
  14.               int nextc = c + acquires; 
  15.                // 如果鎖的重入次數(shù)大于int的最大值,直接就拋出異常了,正常情況應該不存在這種情況,不過jdk還是嚴謹?shù)?nbsp;
  16.               if (nextc < 0) // overflow 
  17.                   throw new Error("Maximum lock count exceeded"); 
  18.               setState(nextc); 
  19.               return true
  20.           } 
  21.           // 返回false 說明嘗試獲取鎖失敗了,失敗了就要進行acquireQueued方法了 
  22.           return false
  23.       } 

tryAcquire方法如果獲取鎖失敗了,那么肯定就要排隊等待獲取鎖。排隊的線程需要待在哪里等待獲取鎖?這個就跟我們線程池執(zhí)行任務一樣,線程池把任務都封裝成一個work,然后當線程處理任務不過來的時候,就把任務放到隊列里面。AQS同樣也是類似的,把排隊等待獲取鎖的線程封裝成一個NODE。然后再把NODE放入到一個隊列里面。隊列如下所示,不過需要注意一點head是不存NODE的。


 

 

接下來我們繼續(xù)分析源碼,看下獲取鎖失敗是如何被加入隊列的。就要執(zhí)行acquireQueued方法,執(zhí)行acquireQueued方法之前需要先執(zhí)行addWaiter方法

  1. private Node addWaiter(Node mode) { 
  2.        Node node = new Node(Thread.currentThread(), mode); 
  3.        // Try the fast path of enq; backup to full enq on failure 
  4.        Node pred = tail; 
  5.        if (pred != null) { 
  6.            node.prev = pred; 
  7.            // cas 加入隊列隊尾 
  8.            if (compareAndSetTail(pred, node)) { 
  9.                pred.next = node; 
  10.                return node; 
  11.            } 
  12.        } 
  13.        // 尾結點不為空 || cas 加入尾結點失敗 
  14.        enq(node); 
  15.        return node; 
  16.    } 

enq

接下來再看看enq方法

  1. // 通過自旋和CAS一定要當前node加入隊尾 
  2. private Node enq(final Node node) { 
  3.         for (;;) { 
  4.             Node t = tail; 
  5.             // 尾結點為空說明隊列還是空的,還沒有被初始化,所以初始化頭結點,可以看到頭結點的node 是沒有綁定線程的也就是不存數(shù)據(jù)的 
  6.             if (t == null) { // Must initialize 
  7.                 if (compareAndSetHead(new Node())) 
  8.                     tail = head; 
  9.             } else { 
  10.                 node.prev = t; 
  11.                 if (compareAndSetTail(t, node)) { 
  12.                     t.next = node; 
  13.                     return t; 
  14.                 } 
  15.             } 
  16.         } 
  17.     } 

通過addWaiter方法已經(jīng)把獲取鎖的線程通過封裝成一個NODE加入對列。上述方法的一個執(zhí)行流程圖如下:

接下來就是繼續(xù)執(zhí)行acquireQueued方法

 

acquireQueued

  1. final boolean acquireQueued(final Node node, int arg) { 
  2.     boolean failed = true
  3.     try { 
  4.         boolean interrupted = false
  5.         for (;;) { 
  6.              // 通過自旋去獲取鎖 前驅節(jié)點==head的時候去嘗試獲取鎖,這個方法在前面已經(jīng)分析過了。 
  7.             final Node p = node.predecessor(); 
  8.             if (p == head && tryAcquire(arg)) { 
  9.                 setHead(node); 
  10.                 p.next = null; // help GC 
  11.                 failed = false
  12.                 return interrupted; 
  13.             } 
  14.            // 進入這個if說明node的前驅節(jié)點不等于head 或者嘗試獲取鎖失敗了 
  15.            // 判斷是否需要掛起當前線程 
  16.             if (shouldParkAfterFailedAcquire(p, node) && 
  17.                 parkAndCheckInterrupt()) 
  18.                 interrupted = true
  19.         } 
  20.     } finally { 
  21.            // 異常情況進入cancelAcquire,在jdk11的時候這個源碼直接是catch (Throwable e){ cancelAcquire(node);} 簡單明了 
  22.         if (failed) 
  23.             cancelAcquire(node); 
  24.     } 

setHead

這個方法每當有一個node獲取到鎖了,就把當前node節(jié)點設置為頭節(jié)點,可以簡單的看做當前節(jié)點獲取到鎖了就把當前節(jié)點”移除“(變?yōu)轭^結點)隊列。

shouldParkAfterFailedAcquire

說到這個方法我們就要先看下NODE可能會有哪些狀態(tài)在源碼里面我們可以看到總共會有四種狀態(tài)

  • CANCELLED:值為1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態(tài),進入該狀態(tài)后的結點將不會再變化。
  • SIGNAL:值為-1,被標識為該等待喚醒狀態(tài)的后繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該后繼結點的線程執(zhí)行。說白了,就是處于喚醒狀態(tài),只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態(tài)的后繼結點的線程執(zhí)行。
  • CONDITION:值為-2,與Condition相關,該標識的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態(tài)的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
  • PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態(tài)標識結點的線程處于可運行狀態(tài)。
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 
  2.         int ws = pred.waitStatus; 
  3.         // 前驅節(jié)點狀態(tài) 如果這個狀態(tài)為-1 則返回true,把當前線程掛起 
  4.         if (ws == Node.SIGNAL) 
  5.             return true
  6.         // 大于0,說明狀態(tài)為CANCELLED  
  7.         if (ws > 0) { 
  8.             do { 
  9.                // 刪除被取消的node(讓被取消的node成為一個沒有引用的node等著下次GC被回收) 
  10.                 node.prev = pred = pred.prev; 
  11.             } while (pred.waitStatus > 0); 
  12.             pred.next = node; 
  13.         } else { 
  14.             // 進入這里只能是 0,-2,-3。NODE節(jié)點初始化的時候waitStatus默認值是0,所以只有這里才有修改waitStatus的地方 
  15.             // 通過cas 把前驅節(jié)點的狀態(tài)設置為-1,然后返回false ,外面調用這個方法的是個循環(huán),又會調用一次這個方法 
  16.             compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
  17.         } 
  18.         return false
  19.     } 

parkAndCheckInterrupt

掛起當前線程,并且阻塞

  1. private final boolean parkAndCheckInterrupt() { 
  2.     LockSupport.park(this); // 掛起當前線程,阻塞 
  3.     return Thread.interrupted(); 

 

 


在這里插入圖片描述

 

 

解鎖

加鎖成功了,那鎖用完了就應該釋放鎖了,釋放鎖重點看下unparkSuccessor這個方法就好了

  1. private void unparkSuccessor(Node node) { 
  2.          // 頭結點狀態(tài) 
  3.        int ws = node.waitStatus; 
  4.        if (ws < 0) 
  5.            compareAndSetWaitStatus(node, ws, 0); 
  6.        Node s = node.next
  7.        // s==null head的successor節(jié)點獲取鎖成功后,執(zhí)行了head.next=null的操作后,解鎖線程讀取了head.next,因此s==null 
  8.        // head的successor節(jié)點被取消(cancelAcquire)時,執(zhí)行了如下操作:successor.waitStatus=1 ; successor.next = successor; 
  9.        if (s == null || s.waitStatus > 0) { 
  10.            s = null
  11.            // 從尾節(jié)點開始往前找,找到最前面的非取消的節(jié)點 這里沒有break 哦 
  12.            for (Node t = tail; t != null && t != node; t = t.prev) 
  13.                if (t.waitStatus <= 0) 
  14.                    s = t; 
  15.        } 
  16.        if (s != null
  17.             // 喚醒線程 ,喚醒的線程會從acquireQueued去獲取鎖 
  18.            LockSupport.unpark(s.thread); 
  19.    } 

釋放鎖代碼比較簡單,基本都寫在代碼注釋里面了,流程如下:

這段代碼里面有一個比較經(jīng)典的面試題:如果頭結點的下一個節(jié)點為空或者頭結點的下一個節(jié)點的狀態(tài)為取消的時候為什么要從后往前找,找到最前面非取消的節(jié)點?

 

  • node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執(zhí)行,如果這個時候執(zhí)行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找。
  • 在產(chǎn)生CANCELLED狀態(tài)節(jié)點的時候,先斷開的是Next指針,Prev指針并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node

總結

reentrantLock的獲取鎖和釋放鎖基本就講完了,里面還涉及多比較多的細節(jié),感興趣的同學可以對著源碼一行一行去debug試試。

適當?shù)牧私鈇qs才能更好的學習CountDownLatch、CyclicBarrier、Sempahore,因為這三個利器都是基于aqs來實現(xiàn)的。

本文轉載自微信公眾號「java金融」,可以通過以下二維碼關注。轉載本文請聯(lián)系java金融公眾號。

 

責任編輯:武曉燕 來源: java金融
相關推薦

2021-03-11 00:05:55

Java高并發(fā)編程

2021-03-04 07:24:24

JavaSemaphore高并發(fā)

2021-03-18 00:14:29

JavaCyclicBarri高并發(fā)

2024-02-29 09:37:25

Java并發(fā)編程

2019-11-07 09:20:29

Java線程操作系統(tǒng)

2020-11-16 08:11:32

ReentrantLo

2021-08-05 07:58:22

并發(fā)編程包Task

2023-10-29 17:08:38

AQS線程

2011-07-05 14:42:46

java

2016-11-28 09:08:43

java系統(tǒng)異步非阻塞

2011-07-05 17:19:47

元編程

2016-11-28 08:40:17

系統(tǒng)降級服務

2016-11-25 00:45:37

隊列數(shù)據(jù)

2017-09-19 14:53:37

Java并發(fā)編程并發(fā)代碼設計

2016-11-28 09:00:10

瀏覽器瀏覽器緩存服務端

2021-07-03 17:44:34

并發(fā)高并發(fā)原子性

2011-06-13 10:41:17

JAVA

2025-02-17 00:00:25

Java并發(fā)編程

2025-02-19 00:05:18

Java并發(fā)編程

2016-11-28 08:58:43

系統(tǒng)限流算法
點贊
收藏

51CTO技術棧公眾號