Java并發(fā)之同步器設(shè)計
在 Java并發(fā)之內(nèi)存模型了解到多進程(線程)讀取共享資源的時候存在競爭條件。
計算機中通過設(shè)計同步器來協(xié)調(diào)進程(線程)之間執(zhí)行順序。同步器作用就像登機安檢人員一樣可以協(xié)調(diào)旅客按順序通過。
在Java中,同步器可以理解為一個對象,它根據(jù)自身狀態(tài)協(xié)調(diào)線程的執(zhí)行順序。比如鎖(Lock),信號量(Semaphore),屏障(CyclicBarrier),阻塞隊列(Blocking Queue)。
這些同步器在功能設(shè)計上有所不同,但是內(nèi)部實現(xiàn)上有共通的地方。
同步器
同步器的設(shè)計一般包含幾個方面:狀態(tài)變量設(shè)計(同步器內(nèi)部狀態(tài)),訪問條件設(shè)定,狀態(tài)更新,等待方式,通知策略。
訪問條件是控制線程是否能執(zhí)行(訪問共享對象)的條件,它往往與狀態(tài)變量緊密相關(guān)。而通知策略是線程釋放鎖定狀態(tài)后通知其它等待線程的方式,一般有以下幾種情況
- 通知所有等待的線程。
- 通知1個隨機的N個等待線程。
- 通知1個特定的N個等待線程
看下面例子,通過鎖方式的同步器
- public class Lock{
- // 狀態(tài)變量 isLocked
- private boolean isLocked = false;
- public synchronized void lock() throws InterruptedException{
- // 訪問條件 當(dāng)isLocked=false 時獲得訪問權(quán)限否則等待
- while(isLocked){
- // 阻塞等待
- wait();
- }
- //狀態(tài)更新 線程獲得訪問權(quán)限
- isLocked = true;
- }
- public synchronized void unlock(){
- //狀態(tài)更新 線程釋放訪問權(quán)限
- isLocked = false;
- // 通知策略 object.notify | object.notifyAll
- notify();
- }
- }
我們用計數(shù)信號量控制同時執(zhí)行操作活動數(shù)。這里模擬一個連接池。
- public class PoolSemaphore {
- // 狀態(tài)變量 actives 計數(shù)器
- private int actives = 0;
- private int max;
- public PoolSemaphore(int max) {
- this.max = max;
- }
- public synchronized void acquire() throws InterruptedException {
- //訪問條件 激活數(shù)小于最大限制時,獲得訪問權(quán)限否則等待
- while (this.actives == max) wait();
- //狀態(tài)更新 線程獲得訪問權(quán)限
- this.actives++;
- // 通知策略 object.notify | object.notifyAll
- this.notify();
- }
- public synchronized void release() throws InterruptedException {
- //訪問條件 激活數(shù)不為0時,獲得訪問權(quán)限否則等待
- while (this.actives == 0) wait();
- //狀態(tài)更新 線程獲得訪問權(quán)限
- this.actives--;
- // 通知策略 object.notify | object.notifyAll
- this.notify();
- }
- }
原子指令
同步器設(shè)計里面,最重要的操作邏輯是“如果滿足條件,以更新狀態(tài)變量來標(biāo)志線程獲得或釋放訪問權(quán)限”,該操作應(yīng)具備原子性。
比如test-and-set 計算機原子指令,意思是進行條件判斷滿足則設(shè)置新值。
- function Lock(boolean *lock) {
- while (test_and_set(lock) == 1);
- }
另外還有很多原子指令 fetch-and-add compare-and-swap,注意這些指令需硬件支持才有效。
同步操作中,利用計算機原子指令,可以避開鎖,提升效率。java中沒有 test-and-set 的支持,不過 java.util.concurrent.atomic 給我們提供了很多原子類API,里面支持了 getAndSet 和compareAndSet 操作。
看下面例子,主要在區(qū)別是等待方式不一樣,上面是通過wait()阻塞等待,下面是無阻塞循環(huán)。
- public class Lock{
- // 狀態(tài)變量 isLocked
- private AtomicBoolean isLocked = new AtomicBoolean(false);
- public void lock() throws InterruptedException{
- // 等待方式 變?yōu)樽孕却?nbsp;
- while(!isLocked.compareAndSet(false, true));
- //狀態(tài)更新 線程獲得訪問權(quán)限
- isLocked.set(true);
- }
- public synchronized void unlock(){
- //狀態(tài)更新 線程釋放訪問權(quán)限
- isLocked.set(false);
- }
- }
關(guān)于阻塞擴展說明
阻塞意味著需要將進程或線程狀態(tài)進行轉(zhuǎn)存,以便還原后恢復(fù)執(zhí)行。這種操作是昂貴繁重,而線程基于進程之上相對比較輕量。線程的阻塞在不同編程平臺實現(xiàn)方式也有所不同,像Java是基于JVM運行,所以它由JVM完成實現(xiàn)。
在《Java Concurrency in Practice》中,作者提到
競爭性同步可能需要OS活動,這增加了成本。當(dāng)爭用鎖時,未獲取鎖的線程必須阻塞。 JVM可以通過旋轉(zhuǎn)等待(反復(fù)嘗試獲取鎖直到成功)來實現(xiàn)阻塞,也可以通過操作系統(tǒng)掛起阻塞的線程來實現(xiàn)阻塞。哪種效率更高取決于上下文切換開銷與鎖定可用之前的時間之間的關(guān)系。對于短暫的等待,最好使用自旋等待;對于長時間的等待,最好使用暫停。一些JVM基于對過去等待時間的分析數(shù)據(jù)來自適應(yīng)地在這兩者之間進行選擇,但是大多數(shù)JVM只是掛起線程等待鎖定。
從上面可以看出JVM實現(xiàn)阻塞兩種方式
- 旋轉(zhuǎn)等待(spin-waiting),簡單理解是不暫停執(zhí)行,以循環(huán)的方式等待,適合短時間場景。
- 通過操作系統(tǒng)掛起線程。
JVM中通過 -XX: +UseSpinning 開啟旋轉(zhuǎn)等待, -XX: PreBlockSpi =10指定最大旋轉(zhuǎn)次數(shù)。
AQS
AQS是AbstractQueuedSynchronizer簡稱。本節(jié)對AQS只做簡單闡述,并不全面。
java.util.concurrent包中的 ReentrantLock,CountDownLatch,Semaphore,CyclicBarrier等都是基于是AQS同步器實現(xiàn)。
狀態(tài)變量 是用 int state 來表示,狀態(tài)的獲取與更新通過以下API操作。
- int getState()
- void setState(int newState)
- boolean compareAndSetState(int expect, int update)
該狀態(tài)值在不同API中有不同表示意義。比如ReentrantLock中表示持有鎖的線程獲取鎖的次數(shù),Semaphore表示剩余許可數(shù)。
關(guān)于等待方式和通知策略的設(shè)計
AQS通過維護一個FIFO同步隊列(Sync queue)來進行同步管理。當(dāng)多線程爭用共享資源時被阻塞入隊。而線程阻塞與喚醒是通過 LockSupport.park/unpark API實現(xiàn)。
它定義了兩種資源共享方式。
- Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)
- Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)
每個節(jié)點包含waitStatus(節(jié)點狀態(tài)),prev(前繼),next(后繼),thread(入隊時線程),nextWaiter(condition隊列的后繼節(jié)點)
waitStatus 有以下取值。
- CANCELLED(1) 表示線程已取消。當(dāng)發(fā)生超時或中斷,節(jié)點狀態(tài)變?yōu)槿∠鬆顟B(tài)不再改變。
- SIGNAL(-1) 表示后繼節(jié)點等待前繼的喚醒。后繼節(jié)點入隊時,會將前繼狀態(tài)更新為SIGNAL。
- CONDITION(-2) 表示線程在Condition queue 里面等待。當(dāng)其他線程調(diào)用了Condition.signal()方法后,CONDITION狀態(tài)的節(jié)點將從 Condition queue 轉(zhuǎn)移到 Sync queue,等待獲取鎖。
- PROPAGATE(-3) 在共享模式下,當(dāng)前節(jié)點釋放后,確保有效通知后繼節(jié)點。
- (0) 節(jié)點加入隊列時的默認狀態(tài)。
AQS 幾個關(guān)鍵 API
- tryAcquire(int) 獨占方式下,嘗試去獲取資源。成功返回true,否則false。
- tryRelease(int) 獨占方式下,嘗試釋放資源,成功返回true,否則false。
- tryAcquireShared(int) 共享方式下,嘗試獲取資源。返回負數(shù)為失敗,零和正數(shù)為成功并表示剩余資源。
- tryReleaseShared(int) 共享方式下,嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待節(jié)點返回true,否則false。
- isHeldExclusively() 判斷線程是否正在獨占資源。
acquire(int arg)
- public final void acquire(int arg) {
- if (
- // 嘗試直接去獲取資源,如果成功則直接返回
- !tryAcquire(arg)
- &&
- //線程阻塞在同步隊列等待獲取資源。等待過程中被中斷,則返回true,否則false
- acquireQueued(
- // 標(biāo)記該線程為獨占方式,并加入同步隊列尾部。
- addWaiter(Node.EXCLUSIVE), arg)
- )
- selfInterrupt();
- }
release(int arg)
- public final boolean release(int arg) {
- // 嘗試釋放資源
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- // 喚醒下一個線程(后繼節(jié)點)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- private void unparkSuccessor(Node node) {
- ....
- Node s = node.next; // 找到后繼節(jié)點
- if (s == null || s.waitStatus > 0) {//無后繼或節(jié)點已取消
- s = null;
- // 找到有效的等待節(jié)點
- for (Node t = tail; t != null && t != node; tt = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread); // 喚醒
- }
總結(jié)
本文記錄并發(fā)編程中同步器設(shè)計的一些共性特征。并簡單介紹了Java中的AQS。