并發(fā)編程之Semaphore原理與應(yīng)用
前言
控制并發(fā)流程的工具類,作用就是幫助我們程序員更容易的讓線程之間合作,讓線程之間相互配合來滿足業(yè)務(wù)邏輯。比如讓線程A等待線程B執(zhí)行完畢后再執(zhí)行等合作策略。
控制并發(fā)流程的工具類主要有:

簡介
Semaphore 信號量,許可,用于控制在一段時間內(nèi),可并發(fā)訪問執(zhí)行的線程數(shù)量。它的作用是控制訪問特定資源的線程數(shù)目,底層依賴AQS的狀態(tài)State,是在生產(chǎn)當(dāng)中比較常用的一個工具類。
關(guān)于 AQS,可以查看《并發(fā)編程之抽象隊(duì)列同步器AQS應(yīng)用ReentrantLock》
一個信號量有且僅有 3 種操作,且它們?nèi)渴窃拥摹?/p>
- 初始化、增加和減少。
- 增加可以為一個進(jìn)程解除阻塞。
- 減少可以讓一個進(jìn)程進(jìn)入阻塞。
Semaphore 管理一系列許可證。
- 每個 acquire() 方法阻塞,直到有一個許可證可以獲得然后拿走一個許可證。
- 每個 release() 方法增加一個許可證,這可能會釋放一個阻塞的 acquire() 方法。
- 不使用實(shí)際的許可對象,Semaphore 只對可用許可的號碼進(jìn)行計(jì)數(shù),并采取相應(yīng)的行動。
Semaphore 在計(jì)數(shù)器不為 0 的時候?qū)€程就放行,一旦達(dá)到 0,那么所有請求資源的新線程都會被阻塞,包括增加請求到許可的線程,Semaphore 是不可重入的。
- 每一次請求一個許可都會導(dǎo)致計(jì)數(shù)器減少 1,同樣每次釋放一個許可都會導(dǎo)致計(jì)數(shù)器增加 1,一旦達(dá)到 0,新的許可請求線程將被掛起。
Semaphore 有兩種模式,公平模式 和 非公平模式 ,默認(rèn)是非公平模式。
- 公平模式就是調(diào)用 acquire 的順序就是獲取許可證的順序,遵循 FIFO。
- 非公平模式是搶占式的,也就是有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程。
應(yīng)用場景
Semaphore可以用來做流量限制,特別是公共資源有限的應(yīng)用場景,比如說數(shù)據(jù)庫連接。
由于 release() 釋放許可時,未對釋放許可數(shù)做限制,所有可以通過該方法增加總的許可數(shù)量; reducePermits() 方法可以減少總的許可數(shù)量,通過這兩個方法可以到達(dá)動態(tài)調(diào)整許可的
分析:假如有一個需求,需讀取幾個萬個文件的數(shù)據(jù),因?yàn)槎际荌O密集型,我們可以啟動幾十個線程并發(fā)的讀取,但是如果讀取到內(nèi)存后,還需要存儲到數(shù)據(jù)庫,而數(shù)據(jù)庫的連接數(shù)只有10個,這時候我們就必須要控制只有10個線程同時獲取到數(shù)據(jù)庫連接,否則會拋出異常提示無法連接數(shù)據(jù)庫。針對這種情況,我們就可以使用Semaphore來做流量控制。
代碼如下:
- package com.niuh.tools;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Semaphore;
- import java.util.concurrent.TimeUnit;
- /**
- * <p>
- * Semaphore示例
- * </p>
- */
- public class SemaphoreRunner {
- /**
- * 線程數(shù)量
- */
- private static final int THREAD_COUNT = 30;
- /**
- * 線程池
- */
- private static ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
- private static Semaphore semaphore = new Semaphore(10);
- public static void main(String[] args) {
- for (int i = 0; i < THREAD_COUNT; i++) {
- executor.execute(new Runnable() {
- public void run() {
- try {
- // 獲取一個"許可證"
- semaphore.acquire();
- // 模擬數(shù)據(jù)保存
- TimeUnit.SECONDS.sleep(2);
- System.out.println("save date...");
- // 執(zhí)行完后,歸還"許可證"
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- executor.shutdown();
- }
- }
源碼分析
Semaphore 類圖
- Semaphore 通過使用內(nèi)部類 Syn 繼承 AQS 實(shí)現(xiàn)。

其內(nèi)部主要變量和方法如下:
框架流程圖如下:

構(gòu)造函數(shù)
- permits 表示許可線程的數(shù)量
- fair 表示公平性,如果這個設(shè)為 true 的話,下次執(zhí)行的線程會是等待最久的線程
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- /**
- * @param permits 總許可數(shù)
- * @param fair fair=true 公平鎖 fair=false 非公平鎖
- */
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
內(nèi)部類同步器
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 1192457210091910933L;
- // 賦值setState為總許可數(shù)
- Sync(int permits) {
- setState(permits);
- }
- // 剩余許可數(shù)
- final int getPermits() {
- return getState();
- }
- // 自旋 + CAS 非公平獲取
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- // 剩余可用許可數(shù)
- int available = getState();
- // 本次獲取許可后,剩余許可
- int remaining = available - acquires;
- // 如果獲取后,剩余許可大于0,則CAS更新剩余許可,否則獲取更新失敗
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- // 自旋 + CAS 釋放許可
- // 由于未對釋放許可數(shù)做限制,所以可以通過release動態(tài)增加許可數(shù)量
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- // 當(dāng)前剩余許可
- int current = getState();
- // 許可可更新值
- int next = current + releases;
- // 如果許可更新值為負(fù)數(shù),說明許可數(shù)量益處,拋出錯誤
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- // CAS更新許可數(shù)量
- if (compareAndSetState(current, next))
- return true;
- }
- }
- // 自旋 + CAS 減少許可數(shù)量
- final void reducePermits(int reductions) {
- for (;;) {
- // 當(dāng)前剩余許可
- int current = getState();
- // 更新值
- int next = current - reductions;
- // 如果更新值比當(dāng)前剩余許可大,拋出益處
- if (next > current) // underflow
- throw new Error("Permit count underflow");
- // CAS 更新許可數(shù)
- if (compareAndSetState(current, next))
- return;
- }
- }
- // 丟棄所有許可
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
- }
非公平模式
- /**
- * 非公平模式
- */
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = -2694183684443567898L;
- NonfairSync(int permits) {
- super(permits);
- }
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- }
公平模式
- /**
- * 公平模式
- */
- static final class FairSync extends Sync {
- private static final long serialVersionUID = 2014338818796000944L;
- FairSync(int permits) {
- super(permits);
- }
- // 公平模式獲取許可
- // 公平模式不論許可是否充足,都會判斷同步隊(duì)列中是否線程在等待,如果有,獲取失敗,排隊(duì)阻塞
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- // 如果有線程在排隊(duì),立即返回
- if (hasQueuedPredecessors())
- return -1;
- // 自旋 + CAS獲取許可
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- }
獲取許可
Semaphore 提供了兩種獲取資源的方式。
- 響應(yīng)中斷 和 不響應(yīng)中斷。
響應(yīng)中斷獲取資源
兩個方法支持 Interrupt 中斷機(jī)制,可使用 acquire() 方法每次獲取一個信號量,也可以使用 acquire(int permits) 方法獲取指定數(shù)量的信號量 。
從semaphore中獲取一個許可,線程會一直被阻塞直到獲取一個許可或是被中斷,獲取一個許可后立即返回,并把許可數(shù)減1,如果沒有可用的許可,當(dāng)前線程會處于休眠狀態(tài)直到:
- 某些其他線程調(diào)用release方法,并且當(dāng)前線程是下一個要被分配許可的線程
- 某些其他線程中斷當(dāng)前線程
如果當(dāng)前線程被acquire方法使得中斷狀態(tài)設(shè)置為on或者在等待許可時被中斷則拋出InterruptedException,并且清除當(dāng)前線程的已中斷狀態(tài)。
acquire執(zhí)行流程:

- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public void acquire(int permits) throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireSharedInterruptibly(permits);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 獲取許可,剩余許可 >= 0,則獲取許可成功 <0 獲取許可失敗,進(jìn)入排隊(duì)
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
- /**
- * 獲取許可失敗,當(dāng)前線程進(jìn)入同步隊(duì)列,排隊(duì)阻塞
- * @param arg the acquire argument
- */
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- // 創(chuàng)建同步隊(duì)列節(jié)點(diǎn),并入隊(duì)列
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- // 如果當(dāng)前節(jié)點(diǎn)是第二個節(jié)點(diǎn),嘗試獲取鎖
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- // 阻塞當(dāng)前線程
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
代碼的執(zhí)行步驟如下:

AQS 子類使用共享模式,需要實(shí)現(xiàn) tryAcquireShared() 方法。
- 在公平鎖中還是與ReentrantLock中的操作一樣,先判斷同步隊(duì)列中是不是還有其他的等待線程,否則直接返回失敗。否則對 state 值進(jìn)行減操作并返回剩下的信號量。
- 非公平鎖直接調(diào)用了父類中的 nonfairTryAcquireShared 和 ReentrantLock 一樣。
- // 非公平鎖的獲取方式
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();//獲取去中的信號量數(shù)
- int remaining = available - acquires;//剩余信號量數(shù)
- //1.信號量數(shù)大于0,獲取共享鎖,并設(shè)置執(zhí)行compareAndSetState(available, remaining),返回剩余信號量數(shù)
- //2.信號量數(shù)小于等于0,直接返回負(fù)數(shù)
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
- // 公平鎖獲取
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- if (hasQueuedPredecessors())
- return -1;
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
變量 state 采用 volatile 可見修飾。
- /**
- * The synchronization state.
- */
- private volatile int state;
- /**
- * Returns the current value of synchronization state.
- * This operation has memory semantics of a <tt>volatile</tt> read.
- * @return current state value
- */
- protected final int getState() {
- return state;
- }
不響應(yīng)中斷獲取資源
兩個方法不響應(yīng) Interrupt 中斷機(jī)制,其它功能與 acquire() 方法一致。
從semaphore中獲取一個許可,線程會一直被阻塞直到獲取一個許可或是被中斷,獲取一個許可后立即返回,并把許可數(shù)減1,如果沒有可用的許可,當(dāng)前線程會處于休眠狀態(tài)直到:
- 某些其他線程調(diào)用release方法,并且當(dāng)前線程是下一個要被分配許可的線程;
- 如果當(dāng)前線程在等待許可時被中斷,那么它會接著等待,但是與沒有發(fā)生中斷相比,為線程分配許可的時間可能改變。
- public void acquireUninterruptibly() {
- sync.acquireShared(1);
- }
- public void acquireUninterruptibly(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireShared(permits);
- }
嘗試獲得信號量
嘗試獲得信號量有三個方法。
- 嘗試獲取信號量,如果獲取成功則返回 true,否則馬上返回 false,不會阻塞當(dāng)前線程。
- 嘗試獲取信號量,如果在指定的時間內(nèi)獲得信號量,則返回 true,否則返回 false。
- 嘗試獲取指定數(shù)量的信號量,如果在指定的時間內(nèi)獲得信號量,則返回 true,否則返回 false。
- public boolean tryAcquire() {
- return sync.nonfairTryAcquireShared(1) >= 0;
- }
- public boolean tryAcquire(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
- public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
- }
釋放歸還許可
release 方法,主要作用是釋放資源,需要保證 release 的執(zhí)行,否則線程退出但是資源沒有釋放。
- 一般代碼寫在 finally 中是最好的。
- 并且獲取多少資源就要釋放多少資源,否則還是資源沒被正確釋放,如果一開始執(zhí)行了 acquire(10) 最后釋放的時候不能只寫一個 release() 而是 release(10) 才對。
- // 嘗試釋放鎖
- public final boolean release(int arg) {
- // 如果釋放鎖成功 喚醒同步隊(duì)列中的后繼節(jié)點(diǎn)
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- // 為了方便對比把兩個代碼放在一塊 可以看到 release 中的結(jié)構(gòu)完全一樣
- // 區(qū)別就在于 doReleaseShared 中有更多的判斷操作
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared(); //在里面執(zhí)行的 unparkSuccessor(h)
- return true;
- }
- return false;
- }
子類實(shí)現(xiàn)共享模式的類需要實(shí)現(xiàn) tryReleaseShared() 方法判斷是否釋放成功。
- 這個方法是一個 CAS 自旋,原因是因?yàn)?Semaphore 是一個共享鎖,可能有多個線程同時釋放資源,因此 CAS 操作可能失敗。
- // 由于未對釋放許可數(shù)做限制,所以可以通過release動態(tài)增加許可數(shù)量
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- //獲取當(dāng)前許可數(shù)量
- int current = getState();
- //計(jì)算回收后的數(shù)量
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- //CAS改變許可數(shù)量成功,返回true
- if (compareAndSetState(current, next))
- return true;
- }
- }
一旦 CAS 改變許可數(shù)量成功,就調(diào)用 doReleaseShared() 方法釋放阻塞的線程。
- private void doReleaseShared() {
- // 自旋,喚醒等待的第一個線程(其它線程將由第一個線程向后傳遞喚醒)
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- // 喚醒第一個等待線程
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
其他方法
獲取當(dāng)前剩余的信號量數(shù)量
- 該方法返回 AQS 中 state 變量的值,當(dāng)前剩余的信號量個數(shù)。
- public int availablePermits() {
- return sync.getPermits();
- }
- // Sync
- final int getPermits() {
- return getState();
- }
耗盡許可數(shù)量
- 獲取并返回立即可用的所有許可。
- Sync 類的drainPermits()方法,獲取 1 個信號量后將可用的信號量個數(shù)置為 0。例如總共有 10 個信號量,已經(jīng)使用了 5 個,再調(diào)用 drainPermits() 方法后,可以獲得一個信號量,剩余 4 個信號量就消失了,總共可用的信號量就變成 6 個了。用 CAS 自旋將剩余資源清空。
- public int drainPermits() {
- return sync.drainPermits();
- }
- // Sync
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
縮減許可數(shù)量
- 縮減必須是單向的,即只能減少不能增加。用 CAS 自旋在剩余共享資源上做縮減。
- protected void reducePermits(int reduction) {
- if (reduction < 0) throw new IllegalArgumentException();
- sync.reducePermits(reduction);
- }
- // Sync
- final void reducePermits(int reductions) {
- for (;;) {
- int current = getState();
- int next = current - reductions;
- if (next > current) // underflow
- throw new Error("Permit count underflow");
- if (compareAndSetState(current, next))
- return;
- }
- }
上述兩個方法對共享資源數(shù)量的修改操作有兩點(diǎn)需要注意
- 是不可逆的
- 是對剩余資源的操作而不是全部資源,當(dāng)剩余資源數(shù)目不足或已經(jīng)為 0 時,方法就返回。
- 正在被占用的資源不參與。
判斷 AQS 同步隊(duì)列中是否還有 Node
- public final boolean hasQueuedThreads() {
- return sync.hasQueuedThreads();
- }
- // AbstractQueuedSynchronizer
- public final boolean hasQueuedThreads() {
- //頭結(jié)點(diǎn)不等于尾節(jié)點(diǎn)就說明鏈表中還有元素
- return head != tail;
- }
總結(jié)
Semaphore 的內(nèi)部工作流程也是基于 AQS,不同于 CyclicBarrier 和 ReentrantLock,不會使用到 AQS 的條件隊(duì)列,都是在同步隊(duì)列中操作,只是當(dāng)前線程會被 park。
Semaphore 是 JUC 包提供的一個典型的共享鎖,它通過自定義兩種不同的同步器(FairSync 和 NonfairSync)提供了公平和非公平兩種工作模式,兩種模式下分別提供了限時/不限時、響應(yīng)中斷/不響應(yīng)中斷的獲取資源的方法(限時獲取總是及時響應(yīng)中斷的),而所有的釋放資源的 release() 操作是統(tǒng)一的。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git