16圖,一個State竟然搞出了這么多并發(fā)鎖
本文轉載自微信公眾號「程序員jinjunzhu」,作者jinjunzhu。轉載本文請聯(lián)系程序員jinjunzhu公眾號。
上篇文章扔掉源碼,15張圖帶你徹底理解java AQS通過15張圖講解了AQS管程模型中入口等待隊列原理。AQS使用FIFO隊列實現(xiàn)了一個鎖相關的并發(fā)器模板,可以基于這個模板來實現(xiàn)各種鎖。JDK建議并發(fā)鎖工具類使用內部類實現(xiàn)AQS的同步屬性。
今天我們就來聊一聊基于AQS實現(xiàn)的各種鎖。
1 ReentrantLock
我們先來看一下UML類圖:
從圖中可以看到,ReentrantLock使用抽象內部類Sync來實現(xiàn)了AQS的方法,然后基于Sync這個同步器實現(xiàn)了公平鎖和非公平鎖。主要實現(xiàn)了下面3個方法:
- tryAcquire(int arg):獲取獨占鎖
- tryRelease(int arg):釋放獨占鎖
- isHeldExclusively:當前線程是否占有獨占鎖
ReentrantLock默認實現(xiàn)的是非公平鎖,可以在構造函數(shù)指定。
從實現(xiàn)的方法可以看到,ReentrantLock中獲取的鎖是獨占鎖,我們再來看一下獲取和釋放獨占鎖的代碼:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
獨占鎖的特點是調用上面acquire方法,傳入的參數(shù)是1。
1.1 獲取公平鎖
獲取鎖首先判斷同步狀態(tài)(state)的值。
1.1.1 state等于0
這說明沒有線程占用鎖,當前線程如果符合下面兩個條件,就可以獲取到鎖:
- 沒有前任節(jié)點,如下圖:
- CAS的方式更新state值(把0更新成1)成功。
如果獲取獨占鎖成功,會更新AQS中exclusiveOwnerThread為當前線程,這個很容易理解。
1.1.2 state不等于0
這說明已經有線程占有鎖,判斷占有鎖的線程是不是當前線程,如下圖:
state += 1值如果小于0,會拋出異常。
如果獲取鎖失敗,則進入AQS隊列等待喚醒。
1.2 獲取非公平鎖
跟公平鎖相比,非公平鎖的唯一不同是如果判斷到state等于0,不用判斷有沒有前任節(jié)點,只要CAS設置state值(把0更新成1)成功,就獲取到了鎖。
1.3 釋放鎖
公平鎖和非公平鎖,釋放邏輯完全一樣,都是在內部類Sync中實現(xiàn)的。釋放鎖需要注意兩點,如下圖:
為什么state會大于1,因為是可以重入的,占有鎖的線程可以多次獲取鎖。
1.4 總結
公平鎖的特點是每個線程都要進行排隊,不用擔心線程永遠獲取不到鎖,但有個缺點是每個線程入隊后都需要阻塞和被喚醒,這一定程度上影響了效率。非公平鎖的特點是每個線程入隊前都會先嘗試獲取鎖,如果獲取成功就不會入隊了,這比公平鎖效率高。但也有一個缺點,隊列中的線程有可能等待很長時間,高并發(fā)下甚至可能永遠獲取不到鎖。
2 ReentrantReadWriteLock
我們先來看一下UML類圖:
從圖中可以看到,ReentrantReadWriteLock使用抽象內部類Sync來實現(xiàn)了AQS的方法,然后基于Sync這個同步器實現(xiàn)了公平鎖和非公平鎖。主要實現(xiàn)了下面3個方法:
- tryAcquire(int arg):獲取獨占鎖
- tryRelease(int arg):釋放獨占鎖
- tryAcquireShared(int arg):獲取共享鎖
- tryReleaseShared(int arg):釋放共享鎖
- isHeldExclusively:當前線程是否占有獨占鎖
可見ReentrantReadWriteLock里面同時用到了共享鎖和獨占鎖。
下圖是定義的幾個常用變量:
下面這2個方法用戶獲取共享鎖和獨占鎖的數(shù)量:
- static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
- static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
從sharedCount可以看到,共享鎖的數(shù)量要右移16位獲取,也就是說共享鎖占了高16位。從上圖EXCLUSIVE_MASK的定義看到,跟EXCLUSIVE_MASK進行與運算,得到的是低16位的值,所以獨占鎖占了低16位。如下圖:
這樣上面獲取鎖數(shù)量的方法就很好理解了。參考1[1]
2.1 讀鎖
讀鎖的實現(xiàn)對應內部類ReadLock。
2.1.1 獲取讀鎖
獲取讀鎖實際上是ReadLock調用了AQS的下面方法,傳入參數(shù)是1:
- public final void acquireShared(int arg) {
- if (tryAcquireShared(arg) < 0)
- doAcquireShared(arg);
- }
ReentrantReadWriteLock內部類Sync實現(xiàn)了tryAcquireShared方法,主要包括如下三種情況:
- 使用exclusiveCount方法查看state中是否有獨占鎖,如果有并且獨占線程不是當前線程,返回-1,獲取失敗。
- 使用sharedCount查看state中共享鎖數(shù)量,如果讀鎖數(shù)量小于最大值(MAX_COUNT=65535),則再滿足下面3個條件就可以獲取成功并返回1:
- 當前線程不需要阻塞(readerShouldBlock)。在公平鎖中,需要判斷是否有前置節(jié)點,如下圖就需要阻塞:
在非公平鎖中,則是判斷第一個節(jié)點是不是有獨占鎖,如下圖就需要阻塞:
- 使用CAS把state的值加SHARED_UNIT(65536)。
這里是不是就更理解讀鎖占高位的說法了,獲取一個讀鎖,state的值就要加SHARED_UNIT這么多個。
- 給當前線程的holdCount加1。
如果2失敗,自旋,重復上面的步驟直到獲取到鎖。
tryAcquireShared(獲取共享鎖)會返回一個整數(shù),如下:
- 返回負數(shù):獲取鎖失敗。
- 返回0:獲取鎖成功但是之后再由線程來獲取共享鎖時就會失敗。
- 返回正數(shù):獲取鎖成功而且之后再有線程來獲取共享鎖時也可能會成功。
2.1.2 釋放讀鎖
ReentrantReadWriteLock釋放讀鎖是在ReadLock中調用了AQS下面方法,傳入的參數(shù)是1:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
ReentrantReadWriteLock內部類Sync實現(xiàn)了releaseShared方法,具體邏輯分為下面兩步:
當前線程holdCounter值減1。
CAS的方式將state的值減去SHARED_UNIT。
2.2 寫鎖
寫鎖的實現(xiàn)對應內部類WriteLock。
2.2.1 獲取寫鎖
ReentrantReadWriteLock獲取寫鎖其實是在WriteLock中調用了AQS的下面方法,傳入參數(shù)1:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
在ReentrantReadWriteLock內部類Sync實現(xiàn)了tryAcquire方法,首先獲取state值和獨占鎖數(shù)量(exclusiveCount),之后分如下兩種情況,如下圖:
state不等于0:
- 獨占鎖數(shù)量等于0,這時說明有線程占用了共享鎖,如果當前線程不是獨占線程,獲取鎖失敗。
- 獨占鎖數(shù)量不等于0,獨占鎖數(shù)量加1后大于MAX_COUNT,獲取鎖失敗。
- 上面2種情況不符合,獲取鎖成功,state值加1。
state等于0,判斷當前線程是否需要阻塞(writerShouldBlock)。
在公平鎖中,跟readerShouldBlock的邏輯完全一樣,就是判斷隊列中head節(jié)點的后繼節(jié)點是不是當前線程。在非公平鎖中,直接返回false,即可以直接嘗試獲取鎖。
如果當前線程不需要阻塞,并且給state賦值成功,使用CAS方式把state值加1,把獨占線程置為當前線程。
2.2.2 釋放寫鎖
ReentrantReadWriteLock釋放寫鎖其實是在WriteLock中調用了AQS的下面方法,傳入參數(shù)1:
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
ReentrantReadWriteLock在Sync中實現(xiàn)了tryRelease(arg)方法,邏輯如下:
- 判斷當前線程是不是獨占線程,如果不是,拋出異常。
- state值減1后,用新state值判斷獨占鎖數(shù)量是否等于0
- 如果等于0,則把獨占線程置為空,返回true,這樣上面的代碼就可以喚醒隊列中的后置節(jié)點了
- 如果不等于0,返回false,不喚醒后繼節(jié)點。
3 CountDownLatch
我們先來看一下UML類圖:
從上面的圖中看出,CountDownLatch的內部類Sync實現(xiàn)了獲取共享鎖和釋放共享鎖的邏輯。
使用CountDownLatch時,構造函數(shù)會傳入一個int類型的參數(shù)count,表示調動count次的countDown后主線程才可以被喚醒。
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
上面的Sync(count)就是將AQS中的state賦值為count。
3.1 await
CountDownLatch的await方法調用了AQS中的acquireSharedInterruptibly(int arg),傳入參數(shù)1,不過這個參數(shù)并沒有用。代碼如下:
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
Sync中實現(xiàn)了tryAcquireShared方法,await邏輯如下圖:
上面的自旋過程就是等待state的值不斷減小,只有state值成為0的時候,主線程才會跳出自旋執(zhí)行之后的邏輯。
3.2 countDown
CountDownLatch的countDown方法調用了AQS的releaseShared(int arg),傳入參數(shù)1,不過這個參數(shù)并沒有用。內部類Sync實現(xiàn)了tryReleaseShared方法,邏輯如下圖:
3.3 總結
CountDownLatch的構造函數(shù)入參值會賦值給state變量,入隊操作是主線程入隊,每個子線程調用了countDown后state值減1,當state值成為0后喚醒主線程。
4 Semaphore
Semaphore是一個信號量,用來保護共享資源。如果線程要訪問共享資源,首先從Semaphore獲取鎖(信號量),如果信號量的計數(shù)器等于0,則當前線程進入AQS隊列阻塞等待。否則,線程獲取鎖成功,信號量減1。使用完共享資源后,釋放鎖(信號量加1)。
Semaphore跟管程模型不一樣的是,允許多個(構造函數(shù)的permits)線程進入管程內部,因此也常用它來做限流。
UML類圖如下:
Semaphore的構造函數(shù)會傳入一個int類型參數(shù),用來初始化state的值。
4.1 acquire
獲取鎖的操作調用了AQS中的acquireSharedInterruptibly方法,傳入參數(shù)1,代碼見CountDownLatch中await小節(jié)。Semaphore在公平鎖和非公平鎖中分別實現(xiàn)了tryAcquireShared方法。
4.1.1 公平鎖
Semaphore默認使用非公平鎖,如果使用公平鎖,需要在構造函數(shù)指定。獲取公平鎖邏輯比較簡單,如下圖:
4.1.2 非公平鎖
acquire在非公平的鎖唯一的區(qū)別就是不會判斷AQS隊列是否有前置節(jié)點(hasQueuedPredecessors),而是直接嘗試獲取鎖。
除了acquire方法外,還有其他幾個獲取鎖的方法,原理類似,只是調用了AQS中的不同方法。
4.2 release
釋放鎖的操作調用了AQS中的releaseShared(int arg)方法,傳入參數(shù)1,在內部類Sync中實現(xiàn)了tryReleaseShared方法,邏輯很簡單:使用CAS的方式將state的值加1,之后喚醒隊列中的后繼節(jié)點。
5 ThreadPoolExecutor
ThreadPoolExecutor中也用到了AQS,看下面的UML類圖:
Worker主要在ThreadPoolExecutor中斷線程的時候使用。Worker自己實現(xiàn)了獨占鎖,在中斷線程時首先進行加鎖,中斷操作后釋放鎖。按照官方說法,這里不直接使用ReentrantLock的原因是防止調用控制線程池的方法(類似setCorePoolSize)時能夠重新獲取到鎖,
5.1 tryAcquire
使用CAS的方式把AQS中state從0改為1,把當前線程置為獨占線程。
5.2 tryRelease
把獨占線程置為空,把AQS中state改為0。
Worker初始化的時候會把state置為-1,這樣是不能獲取鎖成功的。只有調用了runWorker方法,才會通過釋放鎖操作把state更為0。這樣保證了只中斷運行中的線程,而不會中斷等待中的線程。
6 總結
AQS基于雙向隊列實現(xiàn)了入口等待隊列,基于state變量實現(xiàn)了各種并發(fā)鎖,上篇文章講了入口等待隊列,而這篇文章主要講了基于AQS的并發(fā)鎖原理。
在管程模型中,還有一塊兒沒有介紹,就是條件等待隊列,請看下篇。