自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

JUC解析-AQS抽象隊(duì)列同步器

開發(fā) 前端
tryRelease方法 這個(gè)動(dòng)作可以認(rèn)為就是一個(gè)設(shè)置鎖狀態(tài)的操作,而且是將狀態(tài)減掉傳入的參數(shù)值(參數(shù)是1),如果結(jié)果狀態(tài)為0,就將排它鎖的Owner設(shè)置為null,以使得其它的線程有機(jī)會(huì)進(jìn)行執(zhí)行。

 [[392882]]

抽象隊(duì)列同步器(AQS-AbstractQueuedSynchronizer)

從名字上來理解:

  • 抽象:是抽象類,具體由子類實(shí)現(xiàn)
  • 隊(duì)列:數(shù)據(jù)結(jié)構(gòu)是隊(duì)列,使用隊(duì)列存儲(chǔ)數(shù)據(jù)
  • 同步:基于它可以實(shí)現(xiàn)同步功能

我們就從這幾個(gè)方面來入手解讀,但首先,我們得先知道以下幾個(gè)它的特點(diǎn),以便于理解

AbstractQueuedSynchronizer特點(diǎn)

1.AQS可以實(shí)現(xiàn)獨(dú)占鎖和共享鎖。

2.獨(dú)占鎖exclusive是一個(gè)悲觀鎖。保證只有一個(gè)線程經(jīng)過一個(gè)阻塞點(diǎn),只有一個(gè)線程可以獲得鎖。

3.共享鎖shared是一個(gè)樂觀鎖??梢栽试S多個(gè)線程阻塞點(diǎn),可以多個(gè)線程同時(shí)獲取到鎖。它允許一個(gè)資源可以被多個(gè)讀操作,或者被一個(gè)寫操作訪問,但是兩個(gè)操作不能同時(shí)訪問。

4.AQS使用一個(gè)int類型的成員變量state來表示同步狀態(tài),當(dāng)state>0時(shí)表示已經(jīng)獲取了鎖,當(dāng)state = 0無鎖。它提供了三個(gè)方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對(duì)同步狀態(tài)state進(jìn)行操作,可以確保對(duì)state的操作是安全的。

5.AQS是通過一個(gè)CLH隊(duì)列實(shí)現(xiàn)的(CLH鎖即Craig, Landin, and Hagersten (CLH) locks,CLH鎖是一個(gè)自旋鎖,能確保無饑餓性,提供先來先服務(wù)的公平性。CLH鎖也是一種基于鏈表的可擴(kuò)展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋。)

抽象

我們來扒一扒源碼可以看到它繼承于AbstractOwnableSynchronizer它是一個(gè)抽象類.

  1. public abstract class AbstractQueuedSynchronizer 
  2.     extends AbstractOwnableSynchronizer 
  3.     implements java.io.Serializable 

AQS內(nèi)部使用了一個(gè)volatile的變量state來作為資源的標(biāo)識(shí)。同時(shí)定義了幾個(gè)獲取和改變state的protected方法,子類可以覆蓋這些方法來實(shí)現(xiàn)自己的邏輯.

可以看到類中為我們提供了幾個(gè)protected級(jí)別的方法,它們分別是:

  1. //創(chuàng)建一個(gè)隊(duì)列同步器實(shí)例,初始state是0 
  2. protected AbstractQueuedSynchronizer() { } 
  3.  
  4. //返回同步狀態(tài)的當(dāng)前值。 
  5. protected final int getState() { 
  6.         return state; 
  7.  
  8. //設(shè)置同步狀態(tài)的值 
  9. protected final void setState(int newState) { 
  10.         state = newState; 
  11.     } 
  12.  
  13. //獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。 
  14. protected boolean tryAcquire(int arg) { 
  15.         throw new UnsupportedOperationException(); 
  16.     }     
  17.      
  18.      
  19. //獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。 
  20. protected boolean tryRelease(int arg) { 
  21.         throw new UnsupportedOperationException(); 
  22.     } 
  23.      
  24.      
  25. //共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源 
  26. protected int tryAcquireShared(int arg) { 
  27.         throw new UnsupportedOperationException(); 
  28.     }     
  29.      
  30.      
  31. //共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。 
  32. protected boolean tryReleaseShared(int arg) { 
  33.         throw new UnsupportedOperationException(); 
  34.     } 
  35.      

這些方法雖然都是protected方法,但是它們并沒有在AQS具體實(shí)現(xiàn),而是直接拋出異常,AQS實(shí)現(xiàn)了一系列主要的邏輯 由此可知,AQS是一個(gè)抽象的用于構(gòu)建鎖和同步器的框架,使用AQS能簡單且高效地構(gòu)造出應(yīng)用廣泛的同步器,比如我們提到的ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,F(xiàn)utureTask等等皆是基于AQS的。

我們自己也能利用AQS非常輕松容易地構(gòu)造出自定義的同步器,只要子類實(shí)現(xiàn)它的幾個(gè)protected方法就可以了.

隊(duì)列

AQS類本身實(shí)現(xiàn)的是具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等)。它內(nèi)部使用了一個(gè)先進(jìn)先出(FIFO)的雙端隊(duì)列(CLH),并使用了兩個(gè)指針head和tail用于標(biāo)識(shí)隊(duì)列的頭部和尾部。其數(shù)據(jù)結(jié)構(gòu)如圖:

隊(duì)列并不是直接儲(chǔ)存線程,而是儲(chǔ)存擁有線程的Node節(jié)點(diǎn)。

我們來看看Node的結(jié)構(gòu):

  1. static final class Node { 
  2.     // 標(biāo)記一個(gè)結(jié)點(diǎn)(對(duì)應(yīng)的線程)在共享模式下等待 
  3.     static final Node SHARED = new Node(); 
  4.     // 標(biāo)記一個(gè)結(jié)點(diǎn)(對(duì)應(yīng)的線程)在獨(dú)占模式下等待 
  5.     static final Node EXCLUSIVE = null;  
  6.  
  7.     // waitStatus的值,表示該結(jié)點(diǎn)(對(duì)應(yīng)的線程)已被取消 
  8.     static final int CANCELLED = 1;  
  9.     // waitStatus的值,表示后繼結(jié)點(diǎn)(對(duì)應(yīng)的線程)需要被喚醒 
  10.     static final int SIGNAL = -1; 
  11.     // waitStatus的值,表示該結(jié)點(diǎn)(對(duì)應(yīng)的線程)在等待某一條件 
  12.     static final int CONDITION = -2; 
  13.      
  14.     //waitStatus的值,表示有資源可用,新head結(jié)點(diǎn)需要繼續(xù)喚醒后繼結(jié)點(diǎn) 
  15.     //(共享模式下,多線程并發(fā)釋放資源,而head喚醒其后繼結(jié)點(diǎn)后, 
  16.     //需要把多出來的資源留給后面的結(jié)點(diǎn);設(shè)置新的head結(jié)點(diǎn)時(shí),會(huì)繼續(xù)喚醒其后繼結(jié)點(diǎn)) 
  17.     static final int PROPAGATE = -3; 
  18.  
  19.     // 等待狀態(tài),取值范圍,-3,-2,-1,0,1 
  20.     volatile int waitStatus; 
  21.     volatile Node prev; // 前驅(qū)結(jié)點(diǎn) 
  22.     volatile Node next; // 后繼結(jié)點(diǎn) 
  23.     volatile Thread thread; // 結(jié)點(diǎn)對(duì)應(yīng)的線程 
  24.     Node nextWaiter; // 等待隊(duì)列里下一個(gè)等待條件的結(jié)點(diǎn) 
  25.  
  26.  
  27.     // 判斷共享模式的方法 
  28.     final boolean isShared() { 
  29.         return nextWaiter == SHARED; 
  30.     } 
  31.  
  32.     Node(Thread thread, Node mode) {     // Used by addWaiter 
  33.         this.nextWaiter = mode; 
  34.         this.thread = thread; 
  35.     } 
  36.  
  37.     // 其它方法忽略,可以參考具體的源碼 
  38.  
  39. // AQS里面的addWaiter私有方法 
  40. private Node addWaiter(Node mode) { 
  41.     // 使用了Node的這個(gè)構(gòu)造函數(shù) 
  42.     Node node = new Node(Thread.currentThread(), mode); 
  43.     // 其它代碼省略 

過Node我們可以實(shí)現(xiàn)兩個(gè)隊(duì)列,一是通過prev和next實(shí)現(xiàn)CLH隊(duì)列(線程同步隊(duì)列,雙向隊(duì)列),二是nextWaiter實(shí)現(xiàn)Condition條件上的等待線程隊(duì)列(單向隊(duì)列),這個(gè)Condition主要用在ReentrantLock類中

同步

兩種同步方式:

  • 獨(dú)占模式(Exclusive):資源是獨(dú)占的,一次只能一個(gè)線程獲取。如ReentrantLock。
  • 共享模式(Share):同時(shí)可以被多個(gè)線程獲取,具體的資源個(gè)數(shù)可以通過參數(shù)指定。如Semaphore/CountDownLatch。

同時(shí)實(shí)現(xiàn)兩種模式的同步類,如ReadWriteLock

獲取資源

獲取資源的入口是acquire(int arg)方法。arg是要獲取的資源的個(gè)數(shù),在獨(dú)占模式下始終為1。

  1. public final void acquire(int arg) { 
  2.     if (!tryAcquire(arg) && 
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
  4.         selfInterrupt(); 

首先調(diào)用tryAcquire(arg)嘗試去獲取資源。前面提到了這個(gè)方法是在子類具體實(shí)現(xiàn)的如果獲取資源失敗,就通過addWaiter(Node.EXCLUSIVE)方法把這個(gè)線程插入到等待隊(duì)列中。其中傳入的參數(shù)代表要插入的Node是獨(dú)占式的。這個(gè)方法的具體實(shí)現(xiàn):

  1. private Node addWaiter(Node mode) { 
  2.     // 生成該線程對(duì)應(yīng)的Node節(jié)點(diǎn) 
  3.     Node node = new Node(Thread.currentThread(), mode); 
  4.     // 將Node插入隊(duì)列中 
  5.     Node pred = tail; 
  6.     if (pred != null) { 
  7.         node.prev = pred; 
  8.         // 使用CAS嘗試,如果成功就返回 
  9.         if (compareAndSetTail(pred, node)) { 
  10.             pred.next = node; 
  11.             return node; 
  12.         } 
  13.     } 
  14.     // 如果等待隊(duì)列為空或者上述CAS失敗,再自旋CAS插入 
  15.     enq(node); 
  16.     return node; 
  17.  
  18. //AQS中會(huì)存在多個(gè)線程同時(shí)爭奪資源的情況, 
  19. //因此肯定會(huì)出現(xiàn)多個(gè)線程同時(shí)插入節(jié)點(diǎn)的操作, 
  20. //在這里是通過CAS自旋的方式保證了操作的線程安全性。 
  21.  
  22. // 自旋CAS插入等待隊(duì)列 
  23. private Node enq(final Node node) { 
  24.     for (;;) { 
  25.         Node t = tail; 
  26.         if (t == null) { // Must initialize 
  27.             if (compareAndSetHead(new Node())) 
  28.                 tail = head; 
  29.         } else { 
  30.             node.prev = t; 
  31.             if (compareAndSetTail(t, node)) { 
  32.                 t.next = node; 
  33.                 return t; 
  34.             } 
  35.         } 
  36.     } 

若設(shè)置成功就代表自己獲取到了鎖,返回true。狀態(tài)為0設(shè)置1的動(dòng)作在外部就有做過一次,內(nèi)部再一次做只是提升概率,而且這樣的操作相對(duì)鎖來講不占開銷。如果狀態(tài)不是0,則判定當(dāng)前線程是否為排它鎖的Owner,如果是Owner則嘗試將狀態(tài)增加acquires(也就是增加1),如果這個(gè)狀態(tài)值越界,則會(huì)拋出異常提示,若沒有越界,將狀態(tài)設(shè)置進(jìn)去后返回true(實(shí)現(xiàn)了類似于偏向的功能,可重入,但是無需進(jìn)一步征用)。如果狀態(tài)不是0,且自身不是owner,則返回false。

現(xiàn)在通過addWaiter方法,已經(jīng)把一個(gè)Node放到等待隊(duì)列尾部了。而處于等待隊(duì)列的結(jié)點(diǎn)是從頭結(jié)點(diǎn)一個(gè)一個(gè)去獲取資源的。具體的實(shí)現(xiàn)我們來看看acquireQueued方法:

  1. final boolean acquireQueued(final Node node, int arg) { 
  2.     boolean failed = true
  3.     try { 
  4.         boolean interrupted = false
  5.         // 自旋 
  6.         for (;;) { 
  7.             final Node p = node.predecessor(); 
  8.             // 如果node的前驅(qū)結(jié)點(diǎn)p是head,表示node是第二個(gè)結(jié)點(diǎn),就可以嘗試去獲取資源了 
  9.             if (p == head && tryAcquire(arg)) { 
  10.                 // 拿到資源后,將head指向該結(jié)點(diǎn)。 
  11.                 // 所以head所指的結(jié)點(diǎn),就是當(dāng)前獲取到資源的那個(gè)結(jié)點(diǎn)或null。 
  12.                 setHead(node);  
  13.                 p.next = null; // help GC 
  14.                 failed = false
  15.                 return interrupted; 
  16.             } 
  17.             // 如果自己可以休息了,就進(jìn)入waiting狀態(tài),直到被unpark() 
  18.             if (shouldParkAfterFailedAcquire(p, node) && 
  19.                 parkAndCheckInterrupt()) 
  20.                 interrupted = true
  21.         } 
  22.     } finally { 
  23.         if (failed) 
  24.             cancelAcquire(node); 
  25.     } 

這里parkAndCheckInterrupt方法內(nèi)部使用到了LockSupport.park(this),順便簡單介紹一下park。LockSupport類是Java 6 引入的一個(gè)類,提供了基本的線程同步原語。LockSupport實(shí)際上是調(diào)用了Unsafe類里的函數(shù),歸結(jié)到Unsafe里,只有兩個(gè)函數(shù):park(boolean isAbsolute, long time):阻塞當(dāng)前線程 unpark(Thread jthread):使給定的線程停止阻塞

所以結(jié)點(diǎn)進(jìn)入等待隊(duì)列后,是調(diào)用park使它進(jìn)入阻塞狀態(tài)的。只有頭結(jié)點(diǎn)的線程是處于活躍狀態(tài)的。

acquire方法 獲取資源的流程:

當(dāng)然,獲取資源的方法除了acquire外,還有以下三個(gè):

  • acquireInterruptibly:申請可中斷的資源(獨(dú)占模式)
  • acquireShared:申請共享模式的資源
  • acquireSharedInterruptibly:申請可中斷的資源(共享模式)

可中斷的意思是,在線程中斷時(shí)可能會(huì)拋出InterruptedException

釋放資源

釋放資源相比于獲取資源來說,會(huì)簡單許多。在AQS中只有一小段實(shí)現(xiàn)。

源碼:

  1. public final boolean release(int arg) { 
  2.     if (tryRelease(arg)) { 
  3.         Node h = head; 
  4.         if (h != null && h.waitStatus != 0) 
  5.             unparkSuccessor(h); 
  6.         return true
  7.     } 
  8.     return false

tryRelease方法 這個(gè)動(dòng)作可以認(rèn)為就是一個(gè)設(shè)置鎖狀態(tài)的操作,而且是將狀態(tài)減掉傳入的參數(shù)值(參數(shù)是1),如果結(jié)果狀態(tài)為0,就將排它鎖的Owner設(shè)置為null,以使得其它的線程有機(jī)會(huì)進(jìn)行執(zhí)行。

在排它鎖中,加鎖的時(shí)候狀態(tài)會(huì)增加1(當(dāng)然可以自己修改這個(gè)值),在解鎖的時(shí)候減掉1,同一個(gè)鎖,在可以重入后,可能會(huì)被疊加為2、3、4這些值,只有unlock()的次數(shù)與lock()的次數(shù)對(duì)應(yīng)才會(huì)將Owner線程設(shè)置為空,而且也只有這種情況下才會(huì)返回true。這一點(diǎn)大家寫代碼要注意,如果是在循環(huán)體中l(wèi)ock()或故意使用兩次以上的lock(),而最終只有一次unlock(),最終可能無法釋放鎖。導(dǎo)致死鎖.

  1. private void unparkSuccessor(Node node) { 
  2.     // 如果狀態(tài)是負(fù)數(shù),嘗試把它設(shè)置為0 
  3.     int ws = node.waitStatus; 
  4.     if (ws < 0) 
  5.         compareAndSetWaitStatus(node, ws, 0); 
  6.     // 得到頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)head.next 
  7.     Node s = node.next
  8.     // 如果這個(gè)后繼結(jié)點(diǎn)為空或者狀態(tài)大于0 
  9.     // 通過前面的定義我們知道,大于0只有一種可能,就是這個(gè)結(jié)點(diǎn)已被取消 
  10.     if (s == null || s.waitStatus > 0) { 
  11.         s = null
  12.         // 等待隊(duì)列中所有還有用的結(jié)點(diǎn),都向前移動(dòng) 
  13.         for (Node t = tail; t != null && t != node; t = t.prev) 
  14.             if (t.waitStatus <= 0) 
  15.                 s = t; 
  16.     } 
  17.     // 如果后繼結(jié)點(diǎn)不為空, 
  18.     if (s != null
  19.         LockSupport.unpark(s.thread); 

方法unparkSuccessor(Node),意味著真正要釋放鎖了,它傳入的是head節(jié)點(diǎn),內(nèi)部首先會(huì)發(fā)生的動(dòng)作是獲取head節(jié)點(diǎn)的next節(jié)點(diǎn),如果獲取到的節(jié)點(diǎn)不為空,則直接通過:“LockSupport.unpark()”方法來釋放對(duì)應(yīng)的被掛起的線程.

責(zé)任編輯:武曉燕 來源: java寶典
相關(guān)推薦

2020-11-16 08:11:32

ReentrantLo

2021-04-12 08:21:48

AQSjavajvm

2025-01-13 09:24:32

2024-01-12 07:38:38

AQS原理JUC

2020-05-06 09:10:46

AQS同步器CAS

2020-06-18 10:50:56

Java并發(fā)同步器

2021-05-18 06:55:07

Java AQS源碼

2024-01-22 08:52:00

AQS雙異步數(shù)據(jù)一致性

2012-06-05 02:12:55

Java多線程

2020-10-16 08:26:38

AQS通信協(xié)作

2024-10-18 11:29:15

2021-05-11 10:40:29

JUCAQSJava

2021-05-12 15:16:17

JUCAQSJava

2023-04-14 08:39:01

AQS方法JDK5

2010-03-03 17:58:16

Python同步隊(duì)列

2024-02-01 13:03:00

AI模型

2025-02-06 08:24:25

AQS開發(fā)Java

2022-11-11 10:48:55

AQS源碼架構(gòu)

2021-05-27 07:54:21

JavaStateAQS

2024-06-14 09:53:02

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)