靈魂拷問,AQS 是個啥???
啥?你連 AQS 是啥都不知道?
如果想要精通 Java 并發(fā)的話, AQS 是一定要掌握的。今天跟著阿粉一起搞一搞。
基本概念
AQS 是 AbstractQueuedSynchronizer 的簡稱,翻譯成中文就是 抽象隊(duì)列同步器 ,這三個單詞分開來看:
- Abstract (抽象):也就是說, AQS 是一個抽象類,只實(shí)現(xiàn)一些主要的邏輯,有些方法推遲到子類實(shí)現(xiàn)
- Queued (隊(duì)列):隊(duì)列有啥特征呢?先進(jìn)先出( FIFO )對吧?也就是說, AQS 是用先進(jìn)先出隊(duì)列來存儲數(shù)據(jù)的
- Synchronizer (同步):即 AQS 實(shí)現(xiàn)同步功能
以上概括一下, AQS 是一個用來構(gòu)建鎖和同步器的框架,使用 AQS 能簡單而又高效地構(gòu)造出同步器。
AQS 內(nèi)部實(shí)現(xiàn)
AQS 隊(duì)列在內(nèi)部維護(hù)了一個 FIFO 的雙向鏈表,如果對數(shù)據(jù)結(jié)構(gòu)比較熟的話,應(yīng)該很容易就能想到,在雙向鏈表中,每個節(jié)點(diǎn)都有兩個指針,分別指向直接前驅(qū)節(jié)點(diǎn)和直接后繼節(jié)點(diǎn)。使用雙向鏈表的優(yōu)點(diǎn)之一,就是從任意一個節(jié)點(diǎn)開始都很容易訪問它的前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)。
在 AQS 中,每個 Node 其實(shí)就是一個線程封裝,當(dāng)線程在競爭鎖失敗之后,會封裝成 Node 加入到 AQS 隊(duì)列中;獲取鎖的線程釋放鎖之后,會從隊(duì)列中喚醒一個阻塞的 Node (也就是線程)
AQS 使用 volatile 的變量 state 來作為資源的標(biāo)識:
- private volatile int state;
關(guān)于 state 狀態(tài)的讀取與修改,子類可以通過覆蓋 getState() 和 setState() 方法來實(shí)現(xiàn)自己的邏輯,其中比較重要的是:
- // 傳入期望值 expect ,想要修改的值 update ,然后通過 Unsafe 的 compareAndSwapInt() 即 CAS 操作來實(shí)現(xiàn)
- protected final boolean compareAndSetState(int expect, int update) {
- // See below for intrinsics setup to support this
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
下面是 AQS 中兩個重要的成員變量:
- private transient volatile Node head; // 頭結(jié)點(diǎn)
- private transient volatile Node tail; // 尾節(jié)點(diǎn)
關(guān)于 AQS 維護(hù)的雙向鏈表,在源碼中是這樣解釋的:
- The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node.
也就是 AQS 的等待隊(duì)列是 “CLH” 鎖定隊(duì)列的變體
直接來一張圖會更形象一些:
Node 節(jié)點(diǎn)維護(hù)的是線程,控制線程的一些操作,具體來看看是 Node 是怎么做的:
- static final class Node {
- /** Marker to indicate a node is waiting in shared mode */
- // 標(biāo)記一個節(jié)點(diǎn),在 共享模式 下等待
- static final Node SHARED = new Node();
- /** Marker to indicate a node is waiting in exclusive mode */
- // 標(biāo)記一個節(jié)點(diǎn),在 獨(dú)占模式 下等待
- static final Node EXCLUSIVE = null;
- /** waitStatus value to indicate thread has cancelled */
- // waitStatus 的值,表示該節(jié)點(diǎn)從隊(duì)列中取消
- static final int CANCELLED = 1;
- /** waitStatus value to indicate successor's thread needs unparking */
- // waitStatus 的值,表示后繼節(jié)點(diǎn)在等待喚醒
- // 只有處于 signal 狀態(tài)的節(jié)點(diǎn),才能被喚醒
- static final int SIGNAL = -1;
- /** waitStatus value to indicate thread is waiting on condition */
- // waitStatus 的值,表示該節(jié)點(diǎn)在等待一些條件
- static final int CONDITION = -2;
- /**
- * waitStatus value to indicate the next acquireShared should
- * unconditionally propagate
- */
- // waitStatus 的值,表示有資源可以使用,新 head 節(jié)點(diǎn)需要喚醒后繼節(jié)點(diǎn)
- // 如果是在共享模式下,同步狀態(tài)應(yīng)該無條件傳播下去
- static final int PROPAGATE = -3;
- // 節(jié)點(diǎn)狀態(tài),取值為 -3,-2,-1,0,1
- volatile int waitStatus;
- // 前驅(qū)節(jié)點(diǎn)
- volatile Node prev;
- // 后繼節(jié)點(diǎn)
- volatile Node next;
- // 節(jié)點(diǎn)所對應(yīng)的線程
- volatile Thread thread;
- // condition 隊(duì)列中的后繼節(jié)點(diǎn)
- Node nextWaiter;
- // 判斷是否是共享模式
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
- /**
- * 返回前驅(qū)節(jié)點(diǎn)
- */
- final Node predecessor() throws NullPointerException {
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
- }
- Node() { // Used to establish initial head or SHARED marker
- }
- /**
- * 將線程構(gòu)造成一個 Node 節(jié)點(diǎn),然后添加到 condition 隊(duì)列中
- */
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
- /**
- * 等待隊(duì)列用到的方法
- */
- Node(Thread thread, int waitStatus) { // Used by Condition
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
AQS 如何獲取資源
在 AQS 中,獲取資源的入口是 acquire(int arg) 方法,其中 arg 是獲取資源的個數(shù),來看下代碼:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
在獲取資源時,會首先調(diào)用 tryAcquire 方法,這個方法是在子類中具體實(shí)現(xiàn)的
如果通過 tryAcquire 獲取資源失敗,接下來會通過 addWaiter(Node.EXCLUSIVE) 方法,將這個線程插入到等待隊(duì)列中,具體代碼:
- private Node addWaiter(Node mode) {
- // 生成該線程所對應(yīng)的 Node 節(jié)點(diǎn)
- Node node = new Node(Thread.currentThread(), mode);
- // 將 Node 插入到隊(duì)列中
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- // 使用 CAS 操作,如果成功就返回
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 如果 pred == null 或者 CAS 操作失敗,則調(diào)用 enq 方法再次自旋插入
- enq(node);
- return node;
- }
- // 自旋 CAS 插入等待隊(duì)列
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
在上面能夠看到使用的是 CAS 自旋插入,這是因?yàn)樵?AQS 中會存在多個線程同時競爭資源的情況,進(jìn)而一定會出現(xiàn)多個線程同時插入節(jié)點(diǎn)的操作,這里使用 CAS 自旋插入是為了保證操作的線程安全性
現(xiàn)在呢,申請 acquire(int arg) 方法,然后通過調(diào)用 addWaiter 方法,將一個 Node 插入到了隊(duì)列尾部。處于等待隊(duì)列節(jié)點(diǎn)是從頭結(jié)點(diǎn)開始一個一個的去獲取資源,獲取資源方式如下:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 如果 Node 的前驅(qū)節(jié)點(diǎn) p 是 head,說明 Node 是第二個節(jié)點(diǎn),那么它就可以嘗試獲取資源
- if (p == head && tryAcquire(arg)) {
- // 如果資源獲取成功,則將 head 指向自己
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 節(jié)點(diǎn)進(jìn)入等待隊(duì)列后,調(diào)用 shouldParkAfterFailedAcquire 或者 parkAndCheckInterrupt 方法
- // 進(jìn)入阻塞狀態(tài),即只有頭結(jié)點(diǎn)的線程處于活躍狀態(tài)
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
在獲取資源時,除了 acquire 之外,還有三個方法:
- acquireInterruptibly :申請可中斷的資源(獨(dú)占模式)
- acquireShared :申請共享模式的資源
- acquireSharedInterruptibly :申請可中斷的資源(共享模式)
到這里,關(guān)于 AQS 如何獲取資源就說的差不多了,接下來看看 AQS 是如何釋放資源的
AQS 如何釋放資源
釋放資源相對于獲取資源來說,簡單了很多。源碼如下:
- public final boolean release(int arg) {
- // 如果釋放鎖成功
- if (tryRelease(arg)) {
- // 獲取 AQS 隊(duì)列中的頭結(jié)點(diǎn)
- Node h = head;
- // 如果頭結(jié)點(diǎn)不為空,且狀態(tài) != 0
- if (h != null && h.waitStatus != 0)
- // 調(diào)用 unparkSuccessor(h) 方法,喚醒后續(xù)節(jié)點(diǎn)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- // 如果狀態(tài)是負(fù)數(shù),嘗試將它改為 0
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- // 得到頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)
- Node s = node.next;
- // 如果 waitStatus 大于 0 ,說明這個節(jié)點(diǎn)被取消
- if (s == null || s.waitStatus > 0) {
- s = null;
- // 那就從尾節(jié)點(diǎn)開始,找到距離 head 最近的一個 waitStatus<=0 的節(jié)點(diǎn)進(jìn)行喚醒
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- // 如果后繼節(jié)點(diǎn)不為空,則將其從阻塞狀態(tài)變?yōu)榉亲枞麪顟B(tài)
- if (s != null)
- LockSupport.unpark(s.thread);
- }
AQS 兩種資源共享模式
資源有兩種共享模式:
- 獨(dú)占模式( Exclusive ):資源是獨(dú)占的,也就是一次只能被一個線程占有,比如 ReentrantLock
- 共享模式( Share ):同時可以被多個線程獲取,具體的資源個數(shù)可以通過參數(shù)來確定,比如 Semaphore/CountDownLatch
看到這里, AQS 你 get 了嘛?