Java高并發(fā)編程基礎之AQS
引言
曾經(jīng)有一道比較比較經(jīng)典的面試題“你能夠說說java的并發(fā)包下面有哪些常見的類?”大多數(shù)人應該都可以說出 CountDownLatch、CyclicBarrier、Sempahore多線程并發(fā)三大利器。這三大利器都是通過AbstractQueuedSynchronizer抽象類(下面簡寫AQS)來實現(xiàn)的,所以學習三大利器之前我們有必要先來學習下AQS。
AQS是一種提供了原子式管理同步狀態(tài)、阻塞和喚醒線程功能以及隊列模型的簡單框架”
AQS結構
說到同步我們如何來保證同步?大家第一印象肯定是加鎖了,說到鎖的話大家肯定首先會想到的是Synchronized。Synchronized大家應該基本上都會使用,加鎖和釋放鎖都是jvm 來幫我們實現(xiàn)的,我們只需要簡單的加個 Synchronized關鍵字就可以了。用起來超級方便。但是有沒有一種情況我們設置一個鎖的超時時間Synchronized就有點實現(xiàn)不了,這時候我們就可以用ReentrantLock來實現(xiàn),ReentrantLock是通過aqs來實現(xiàn)的,今天我們就通過ReentrantLock來學習一下aqs。
CAS && 公平鎖和非公平鎖
AQS里面用到了大量的CAS學習AQS之前我們還是有必要簡單的先了解下CAS、公平鎖和非公平鎖。
CAS
- CAS 全稱是 compare and swap,是一種用于在多線程環(huán)境下實現(xiàn)同步功能的機制。CAS 操作包含三個操作數(shù) :內存位置、預期數(shù)值和新值。CAS 的實現(xiàn)邏輯是將內存位置處的數(shù)值與預期數(shù)值相比較,若相等,則將內存位置處的值替換為新值。若不相等,則不做任何操作,這個操作是個原子性操作,java里面的AtomicInteger等類都是通過cas來實現(xiàn)的。
公平鎖和非公平鎖
- 公平鎖:多個線程按照申請鎖的順序去獲得鎖,線程會直接進入隊列去排隊,隊列中第一個才能獲得到鎖。優(yōu)點:等待鎖的線程不會餓死,每個線程都可以獲取到鎖。缺點:整體吞吐效率相對非公平鎖要低,等待隊列中除第一個線程以外的所有線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。
- 非公平鎖:多個線程去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待隊列,如果能獲取到,就直接獲取到鎖。優(yōu)點:可以減少CPU喚醒線程的開銷,整體的吞吐效率會高點,CPU也不必喚醒所有線程,會減少喚起線程的數(shù)量。缺點:處于等待隊列中的線程可能會餓死,或者等很久才會獲得鎖。文字有點拗口,我們來個實際的例子說明下。比如我們去食堂就餐的時候都要排隊,大家都按照先來后到的順序排隊打飯,這就是公平鎖。如果等到你準備拿盤子打飯的時候 直接蹦出了一個五大三粗的胖子插隊到你前面,你看打不贏他只能忍氣吞聲讓他插隊,等胖子打完飯了又來個小個子也來插你隊,這時候你沒法忍了,直接大吼一聲讓他滾,這個 小個子只能屁顛屁顛到隊尾去排隊了這就是非公平鎖。我們先來看看AQS有哪些屬性
- // 頭節(jié)點
- private transient volatile Node head;
- // 阻塞的尾節(jié)點,每個新的節(jié)點進來,都插入到最后,也就形成了一個鏈表
- private transient volatile Node tail;
- // 這個是最重要的,代表當前鎖的狀態(tài),0代表沒有被占用,大于 0 代表有線程持有當前鎖
- // 這個值可以大于 1,是因為鎖可以重入,每次重入都加上 1
- private volatile int state;
- // 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入
- // reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經(jīng)擁有了鎖
- // if (currentThread == getExclusiveOwnerThread()) {state++}
- private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
下面我們來寫一個demo分析下lock 加鎖和釋放鎖的過程
- final void lock() {
- // 上來先試試直接把狀態(tài)置位1,如果此時沒人獲取鎖就直接
- if (compareAndSetState(0, 1))
- // 爭搶成功則修改獲得鎖狀態(tài)的線程
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
cas嘗試失敗,說明已經(jīng)有人再持有鎖,所以進入acquire方法
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
tryAcquire方法,看名字大概能猜出什么意思,就是試一試。tryAcquire實際上是調用了父類Sync的nonfairTryAcquire方法
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- // 獲取下當前鎖的狀態(tài)
- int c = getState();
- // 這個if 邏輯跟前面一進來就獲取鎖的邏輯一樣都是通過cas嘗試獲取下鎖
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- // 進入這個判斷說明 鎖重入了 狀態(tài)需要進行+1
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- // 如果鎖的重入次數(shù)大于int的最大值,直接就拋出異常了,正常情況應該不存在這種情況,不過jdk還是嚴謹?shù)?nbsp;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- // 返回false 說明嘗試獲取鎖失敗了,失敗了就要進行acquireQueued方法了
- return false;
- }
tryAcquire方法如果獲取鎖失敗了,那么肯定就要排隊等待獲取鎖。排隊的線程需要待在哪里等待獲取鎖?這個就跟我們線程池執(zhí)行任務一樣,線程池把任務都封裝成一個work,然后當線程處理任務不過來的時候,就把任務放到隊列里面。AQS同樣也是類似的,把排隊等待獲取鎖的線程封裝成一個NODE。然后再把NODE放入到一個隊列里面。隊列如下所示,不過需要注意一點head是不存NODE的。
接下來我們繼續(xù)分析源碼,看下獲取鎖失敗是如何被加入隊列的。就要執(zhí)行acquireQueued方法,執(zhí)行acquireQueued方法之前需要先執(zhí)行addWaiter方法
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- // cas 加入隊列隊尾
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 尾結點不為空 || cas 加入尾結點失敗
- enq(node);
- return node;
- }
enq
接下來再看看enq方法
- // 通過自旋和CAS一定要當前node加入隊尾
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- // 尾結點為空說明隊列還是空的,還沒有被初始化,所以初始化頭結點,可以看到頭結點的node 是沒有綁定線程的也就是不存數(shù)據(jù)的
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
通過addWaiter方法已經(jīng)把獲取鎖的線程通過封裝成一個NODE加入對列。上述方法的一個執(zhí)行流程圖如下:
接下來就是繼續(xù)執(zhí)行acquireQueued方法
acquireQueued
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- // 通過自旋去獲取鎖 前驅節(jié)點==head的時候去嘗試獲取鎖,這個方法在前面已經(jīng)分析過了。
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 進入這個if說明node的前驅節(jié)點不等于head 或者嘗試獲取鎖失敗了
- // 判斷是否需要掛起當前線程
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- // 異常情況進入cancelAcquire,在jdk11的時候這個源碼直接是catch (Throwable e){ cancelAcquire(node);} 簡單明了
- if (failed)
- cancelAcquire(node);
- }
- }
setHead
這個方法每當有一個node獲取到鎖了,就把當前node節(jié)點設置為頭節(jié)點,可以簡單的看做當前節(jié)點獲取到鎖了就把當前節(jié)點”移除“(變?yōu)轭^結點)隊列。
shouldParkAfterFailedAcquire
說到這個方法我們就要先看下NODE可能會有哪些狀態(tài)在源碼里面我們可以看到總共會有四種狀態(tài)
- CANCELLED:值為1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態(tài),進入該狀態(tài)后的結點將不會再變化。
- SIGNAL:值為-1,被標識為該等待喚醒狀態(tài)的后繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該后繼結點的線程執(zhí)行。說白了,就是處于喚醒狀態(tài),只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態(tài)的后繼結點的線程執(zhí)行。
- CONDITION:值為-2,與Condition相關,該標識的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態(tài)的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
- PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態(tài)標識結點的線程處于可運行狀態(tài)。
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- // 前驅節(jié)點狀態(tài) 如果這個狀態(tài)為-1 則返回true,把當前線程掛起
- if (ws == Node.SIGNAL)
- return true;
- // 大于0,說明狀態(tài)為CANCELLED
- if (ws > 0) {
- do {
- // 刪除被取消的node(讓被取消的node成為一個沒有引用的node等著下次GC被回收)
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- // 進入這里只能是 0,-2,-3。NODE節(jié)點初始化的時候waitStatus默認值是0,所以只有這里才有修改waitStatus的地方
- // 通過cas 把前驅節(jié)點的狀態(tài)設置為-1,然后返回false ,外面調用這個方法的是個循環(huán),又會調用一次這個方法
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
parkAndCheckInterrupt
掛起當前線程,并且阻塞
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this); // 掛起當前線程,阻塞
- return Thread.interrupted();
- }
在這里插入圖片描述
解鎖
加鎖成功了,那鎖用完了就應該釋放鎖了,釋放鎖重點看下unparkSuccessor這個方法就好了
- private void unparkSuccessor(Node node) {
- // 頭結點狀態(tài)
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- // s==null head的successor節(jié)點獲取鎖成功后,執(zhí)行了head.next=null的操作后,解鎖線程讀取了head.next,因此s==null
- // head的successor節(jié)點被取消(cancelAcquire)時,執(zhí)行了如下操作:successor.waitStatus=1 ; successor.next = successor;
- if (s == null || s.waitStatus > 0) {
- s = null;
- // 從尾節(jié)點開始往前找,找到最前面的非取消的節(jié)點 這里沒有break 哦
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- // 喚醒線程 ,喚醒的線程會從acquireQueued去獲取鎖
- LockSupport.unpark(s.thread);
- }
釋放鎖代碼比較簡單,基本都寫在代碼注釋里面了,流程如下:
這段代碼里面有一個比較經(jīng)典的面試題:如果頭結點的下一個節(jié)點為空或者頭結點的下一個節(jié)點的狀態(tài)為取消的時候為什么要從后往前找,找到最前面非取消的節(jié)點?
- node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執(zhí)行,如果這個時候執(zhí)行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找。
- 在產(chǎn)生CANCELLED狀態(tài)節(jié)點的時候,先斷開的是Next指針,Prev指針并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node
總結
reentrantLock的獲取鎖和釋放鎖基本就講完了,里面還涉及多比較多的細節(jié),感興趣的同學可以對著源碼一行一行去debug試試。
適當?shù)牧私鈇qs才能更好的學習CountDownLatch、CyclicBarrier、Sempahore,因為這三個利器都是基于aqs來實現(xiàn)的。
本文轉載自微信公眾號「java金融」,可以通過以下二維碼關注。轉載本文請聯(lián)系java金融公眾號。