聊聊Semaphore 信號量源碼分析
本文轉(zhuǎn)載自微信公眾號「運維開發(fā)故事」,作者老鄭。轉(zhuǎn)載本文請聯(lián)系運維開發(fā)故事公眾號。
概述
Semaphore 信號量, 信號量維護了一組許可。如果有必要每個采集模塊都會阻塞,直到有許可可用。然后獲取許可證。每次發(fā)布都會添加一個許可證,可能會釋放一個阻塞資源。但是,沒有使用實際的許可對象;信號量可用數(shù)量的計數(shù),并且進行操作。 信號量通常可以用于限制訪問某些(物理或者邏輯)資源的線程數(shù)。例如下面是一個使用信號量控制對線程池訪問。
- class Pool {
- private static final int MAX_AVAILABLE = 100;
- private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
- public Object getItem() throws InterruptedException {
- available.acquire();
- return getNextAvailableItem();
- }
- public void putItem(Object x) {
- if (markAsUnused(x))
- available.release();
- }
- // Not a particularly efficient data structure; just for demo
- protected Object[] items = ... whatever kinds of items being managed
- protected boolean[] used = new boolean[MAX_AVAILABLE];
- protected synchronized Object getNextAvailableItem() {
- for (int i = 0; i < MAX_AVAILABLE; ++i) {
- if (!used[i]) {
- used[i] = true;
- return items[i];
- }
- }
- return null; // not reached
- }
- protected synchronized boolean markAsUnused(Object item) {
- for (int i = 0; i < MAX_AVAILABLE; ++i) {
- if (item == items[i]) {
- if (used[i]) {
- used[i] = false;
- return true;
- } else
- return false;
- }
- }
- return false;
- }
- }
在獲取項目之前,每個線程必須從信號量獲取一個許可證,以確保項目可用。當線程處理完該項后,它將返回到池中,并向信號量返回一個許可證,允許另一個線程獲取該項。請注意,在調(diào)用acquire時不會保持同步鎖,因為這會阻止項目返回池。信號量封裝了限制對池的訪問所需的同步,與維護池本身一致性所需的任何同步分開。
初始化為1的信號量,其使用方式是最多只有一個可用的許可證,可以用作互斥鎖。這通常被稱為二進制信號量,因為它只有兩個狀態(tài):一個許可證可用,或者零個許可證可用。以這種方式使用時,二進制信號量的屬性(與許多java.util.concurrent.locks.Lock實現(xiàn)不同)是“鎖”可以由所有者以外的線程釋放(因為信號量沒有所有權(quán)的概念)。這在某些特定的上下文中非常有用,例如死鎖恢復。
此類的構(gòu)造函數(shù)可以選擇接受公平性參數(shù)。當設置為false時,此類不保證線程獲取許可的順序。特別是,允許bargging,也就是說,調(diào)用acquire的線程可以在一直在等待的線程之前分配一個許可證-從邏輯上講,新線程將自己置于等待線程隊列的頭部。當公平性設置為true時,信號量保證選擇調(diào)用任何acquire方法的線程,以按照其調(diào)用這些方法的處理順序(先進先出;先進先出)。請注意,F(xiàn)IFO排序必然適用于這些方法中的特定內(nèi)部執(zhí)行點。因此,一個線程可以在另一個線程之前調(diào)用acquire,但在另一個線程之后到達排序點,類似地,從方法返回時也是如此。還請注意,untimed tryAcquire方法不支持公平性設置,但將接受任何可用的許可。
通常,用于控制資源訪問的信號量應該初始化為公平,以確保沒有線程因訪問資源而耗盡。當將信號量用于其他類型的同步控制時,非公平排序的吞吐量優(yōu)勢往往超過公平性考慮。
此類還提供了方便的方法,可以一次獲取和發(fā)布多個許可證。當使用這些方法時,如果沒有將公平設置為真,則要小心無限期延遲的風險增加。
內(nèi)存一致性影響:在調(diào)用“release”方法(如release())之前的線程中的操作發(fā)生在另一個線程中成功的“acquire”方法(如acquire()之后的操作)之前。
原理分析
Semaphore 信號量,是控制并發(fā)的有效手段。它底層通過 AQS 實現(xiàn)。如下圖所示:
構(gòu)造方法
Semaphore 構(gòu)造方法有兩個 Semaphore(int permits) 和 Semaphore(int permits, boolean fair) 后者有兩個參數(shù):第一個參數(shù)是許可數(shù)量初始化,第二個參數(shù)定義信號量是否公平鎖同步(默認為非公平)。
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
acquire 方法
acquire 方法可以為理解獲取許可,如果存在剩余許可那么就可以進入后續(xù)代碼塊,如果沒有獲取線程進入阻塞。在共享模式下獲取,如果中斷將中止。通過首先檢查中斷狀態(tài),然后調(diào)用至少一次tryAcquireShared,并在成功時返回來實現(xiàn)。否則線程將排隊,可能會重復阻塞和取消阻塞,調(diào)用tryAcquireShared,直到成功或線程中斷。
release 方法
acquire 方法可以為理解釋放許可,其他等待許可的線程進入資源競爭階段。然后去查找等待隊列隊頭有效的等待節(jié)點進行喚醒。
整體流程
Semaphore 信號量原理.png
舉個例子
場景描述
對于控制流量,或者控制并發(fā)我們可以使用 Semaphore 信號量來完成。例子:有100 個人需要過橋,但是橋上最多同時能夠承受 5 個人的重量。如果我們需要有序的過橋那么就可以采用信號量的方式來控制。
初始化 5 個許可。
上橋之前先去獲取 許可,如果有剩余許可就上橋。
如果沒有 許可,就等待許可。
image.png
模擬代碼
首先定義橋?qū)ο螅胂滤荆?/p>
- public class Bridge {
- private String name;
- private String address;
- private Integer max;
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public String getAddress() {
- return address;
- }
- public void setAddress(String address) {
- this.address = address;
- }
- public Integer getMax() {
- return max;
- }
- public void setMax(Integer max) {
- this.max = max;
- }
- }
然后定義遷徙者對象,就是過橋的人,然后他有個動作就是過橋。代碼如下所示。
- public class Migrator {
- private String name;
- public void gapBridge() {
- System.out.println("Migrator: " + this.name + ", time:" + System.currentTimeMillis());
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- }
調(diào)用代碼如下:
- public class MainTest {
- public static void main(String[] args) {
- Bridge bridge = new Bridge();
- bridge.setAddress("云南");
- bridge.setName("XX 橋");
- bridge.setMax(5);
- Semaphore semaphore = new Semaphore(bridge.getMax());
- for (int i=0; i< 100; i++) {
- int idx = i;
- new Thread(()-> {
- try {
- Migrator migrator = new Migrator();
- migrator.setName("name-" + idx);
- semaphore.acquire();
- TimeUnit.SECONDS.sleep(1);
- migrator.gapBridge();
- System.out.println("name " + migrator.getName() + " 通過");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- semaphore.release();
- }
- }).start();
- }
- }
- }
輸出日志如下:我們可以看到剛開始的時候有 5 個線程獲取到 "許可" 幾乎同時過橋,后面逐漸就是釋放一個許可,另外一個線程繼續(xù)執(zhí)行。
- Migrator: name-7, time:1630495912011
- name name-7 通過
- Migrator: name-2, time:1630495912011
- name name-2 通過
- Migrator: name-4, time:1630495912011
- Migrator: name-8, time:1630495912011
- Migrator: name-3, time:1630495912011
- name name-3 通過
- name name-8 通過
- name name-4 通過
- Migrator: name-5, time:1630495913012
- name name-5 通過
- Migrator: name-0, time:1630495913012
- name name-0 通過
- Migrator: name-6, time:1630495913013
參考文檔
https://www.cnblogs.com/leesf456/p/5414778.html