Java并發(fā)編程(JUC)模擬AND型信號量
AND型信號量可能大家都聽說過并可能都有一定的理解,但是你有使用過么?今天就使用Java來模擬實現(xiàn)!
本文是對上篇文章(進程同步機制)的一次實踐,通過JUC提供的一些機制來模擬一些OS中的AND型信號量,因為記錄型型信號量可以等價于JUC中提供的Semaphore(信號量),但是對于AND型信號量因為一些原因(主要是過時了),JUC沒有提供,今天就手動的來寫一個AND型信號量對應(yīng)的Swait操作和Ssignal操作(這里不明白的可以看前面的理論篇)。通過本篇博文讓你對進程同步機制有個更好的理解。
1.一個錯誤示例
在這里,首先解釋一下,為了滿足線程申請信號量不成功后將進程阻塞,并插入到對應(yīng)的隊列中,所以使用了ReentrantLock+Condition來實現(xiàn)Swait方法。廢話不多說,直接上代碼:
- //數(shù)據(jù)定義
- static Lock lock = new ReentrantLock();
- static Condition condition1 = lock.newCondition();
- static Condition condition2 = lock.newCondition();
- public static void Swait(String id, Semaphore s1, Semaphore s2) throws InterruptedException {
- lock.tryLock(1, TimeUnit.SECONDS);
- log.info("當前的兩個信號量的狀態(tài):【{},{}】", s1.availablePermits(), s2.availablePermits());
- //availablePermits可獲取到信號量中還剩余的值
- if(s1.availablePermits() < 1 || s2.availablePermits() < 1){
- if (s1.availablePermits() < 1) {
- log.info("線程【{}】被掛起到信號量【{}】中", id, s1);
- //阻塞,并插入到condition1的阻塞隊列中
- condition1.await();
- } else {
- log.info("線程【{}】被掛起到信號量【{}】中", id, s2);
- //阻塞,并插入到condition2的阻塞隊列中
- condition2.await();
- }
- log.info("被掛起的線程【{}】被喚醒執(zhí)行。", id);
- } else {
- log.info("為線程【{}】分配資源!", id);
- s1.acquire();
- s2.acquire();
- }
- lock.unlock();
- }
- public static void Ssignal(Semaphore s1, Semaphore s2) throws InterruptedException {
- log.info("線程【{}】執(zhí)行了釋放資源", id);
- lock.tryLock(1, TimeUnit.SECONDS);
- s1.release();
- s2.release();
- //喚醒等待隊列中的線程
- condition.signal();
- lock.unlock();
- }
大家仔細看上面的代碼,這個也是我剛開始寫的代碼,第一眼看似乎是沒什么問題,但是里面隱藏著一個坑,在Swait方法中,調(diào)用condition1.await(),此時線程被阻塞在這一行中,但是當被別的線程(調(diào)用Ssignal)喚醒時,在被阻塞的下一行開始繼續(xù)執(zhí)行,但是在后續(xù)的代碼里,是沒有去申請信號量的,而是直接就Swait成功了,這樣在執(zhí)行Ssignal時就會導(dǎo)致信號量憑空的增加了,也就無法正確的表征系統(tǒng)中的資源數(shù)量了。
2.一個簡單的示例
下面我們就對代碼進行優(yōu)化,大家可以回顧一下AND型信號量,當其因為資源不足時,需要將線程插入到第一個無法滿足條件(即Si<1)的信號量對應(yīng)的等待隊列中,并且將程序計數(shù)器放置到Swait操作的開始處,所以我們對Swait代碼進行修改如下:
- public static void Swait(String id, Semaphore s1, Semaphore s2) throws InterruptedException {
- lock.tryLock(1, TimeUnit.SECONDS);
- log.info("當前的兩個信號量的狀態(tài):【{},{}】", s1.availablePermits(), s2.availablePermits());
- //如果申請不到,就掛起線程,并將線程插入到condition的隊列中
- while (s1.availablePermits() < 1 || s2.availablePermits() < 1) {
- if (s1.availablePermits() < 1) {
- log.info("線程【{}】被掛起到信號量【{}】中", id, s1);
- condition1.await();
- } else {
- log.info("線程【{}】被掛起到信號量【{}】中", id, s2);
- condition2.await();
- }
- log.info("被掛起的線程【{}】被喚醒執(zhí)行。", id);
- }
- log.info("為線程【{}】分配資源!", id);
- s1.acquire();
- s2.acquire();
- lock.unlock();
- }
在上面的代碼中,我們將請求的資源放到一個循環(huán)條件中,以滿足將程序計數(shù)器放置到Swait操作的開始處,在每次被喚醒后都要重新判斷資源是否足夠,如果足夠才跳出循環(huán),否則就再次自我阻塞。
3.一個可以同時申請N個的Swait操作
如果你知道了信號量的種類數(shù)(系統(tǒng)中的資源類型),其實上面的代碼已經(jīng)可以滿足一定的需要了,只需要我們將所有的信號量寫入到參數(shù)列表中即可。但是對于致力于代碼的復(fù)用,這里就有些差強人意了,因此我們再次對代碼進行改進,代碼如下所示:
- public static void Swait(String id, Semaphore... list) throws InterruptedException {
- lock.lock();
- //如果資源不足,就掛起線程,并將線程插入到condition的隊列中
- while (true) {
- int count=0;
- //循環(huán)判斷參數(shù)列表中信號量的可用值
- for (Semaphore semaphore:list){
- if(semaphore.availablePermits()>0){
- count++;
- }
- }
- //如果資源都滿足,則跳出循環(huán),進行資源分配
- if(count == list.length){
- break;
- }
- log.info("線程【{}】被掛起-----", id);
- //將當前線程阻塞
- condition1.await();
- log.info("被掛起的線程【{}】被喚醒執(zhí)行。", id);
- }
- log.info("為線程【{}】分配資源!", id);
- //分配資源
- for (Semaphore semaphore:list){
- semaphore.acquire();
- }
- lock.unlock();
- }
- public static void Ssignal(String id, Semaphore... list) throws InterruptedException {
- log.info("線程【{}】執(zhí)行了釋放資源", id);
- lock.tryLock(1, TimeUnit.SECONDS);
- //循環(huán)釋放信號量
- for (Semaphore semaphore:list){
- semaphore.release();
- }
- //喚醒等待隊列中的線程
- condition.signal();
- lock.unlock();
- }
為此,我們將方法中的信號量列表改為可變的參數(shù)列表,這樣在傳參的時候就可以方便的進行了,但是也會存才一些問題,比如無法約束“借出”與“歸還”的信號量的數(shù)量是否一致。并且因為信號量的數(shù)量不定,所以無法為每個信號量新建一個條件變量(Condition),因此在上面的代碼中所有的信號量公用一個條件變量,所有阻塞的線程都插入在其阻塞隊列中。
4.一個完整的例子
這里我們使用一個經(jīng)典的進程同步問題來演示我們使用Java模擬的AND型信號量,在這里,我們采用生產(chǎn)者–消費者問題來演示,完整的代碼如下:
- //用來保證互斥的訪問臨界區(qū)(緩存區(qū))
- static final Semaphore mutex = new Semaphore(1);
- //緩沖區(qū),最大容量為50
- static List<Integer> buffer = new ArrayList<>();
- //緩沖區(qū)中還可放入的消息數(shù)量
- static final Semaphore empty = new Semaphore(50);
- //緩沖區(qū)中的消息數(shù)量
- static final Semaphore full = new Semaphore(0);
- //可重入鎖和條件變量
- static Lock lock = new ReentrantLock();
- static Condition condition = lock.newCondition();
- //用與輔助的簡單的生成消息
- static Integer count = 0;
- //生產(chǎn)者
- static class Producer extends Thread {
- Producer(String name) {
- super.setName(name);
- }
- @Override
- public void run() {
- do {
- try {
- Swait(this.getName(), mutex, empty);
- log.info("生產(chǎn)了一條消息:【{}】", count);
- buffer.add(count++);
- Thread.sleep(1000);
- Ssignal(this.getName(), mutex, full);
- } catch (InterruptedException e) {
- log.error("生產(chǎn)消息時產(chǎn)生異常!");
- }
- } while (true);
- }
- }
- //消費者
- static class Consumer extends Thread {
- Consumer(String name) {
- super.setName(name);
- }
- @Override
- public void run() {
- do {
- try {
- Swait(this.getName(), mutex, full);
- log.info("消費了一條消息:【{}】", buffer.remove(0));
- Thread.sleep(1000);
- Ssignal(this.getName(), mutex, empty);
- } catch (InterruptedException e) {
- log.error("消費消息時產(chǎn)生異常!");
- }
- } while (true);
- }
- }
- public static void Swait(String id, Semaphore... list) throws InterruptedException {
- lock.lock();
- //如果資源不足,就掛起線程,并將線程插入到condition的隊列中
- while (true) {
- int count=0;
- for (Semaphore semaphore:list){
- if(semaphore.availablePermits()>0){
- count++;
- }
- }
- if(count == list.length){
- break;
- }
- log.info("線程【{}】被掛起", id);
- condition.await();
- log.info("被掛起的線程【{}】被喚醒執(zhí)行。", id);
- }
- log.info("為線程【{}】分配資源!", id);
- for (Semaphore semaphore:list){
- semaphore.acquire();
- }
- lock.unlock();
- }
- public static void Ssignal(String id, Semaphore... list) throws InterruptedException {
- log.info("線程【{}】執(zhí)行了釋放資源", id);
- lock.tryLock(1, TimeUnit.SECONDS);
- for (Semaphore semaphore:list){
- semaphore.release();
- }
- //喚醒等待隊列中的一個線程
- condition.signal();
- lock.unlock();
- }
- public static void main(String[] args) {
- Producer p1 = new Producer("p1");
- Consumer c1 = new Consumer("c1");
- p1.start();
- c1.start();
- }
上面代碼都是可以直接執(zhí)行的,如果不需要使用參數(shù)列表,可以將上面的Swait方法進行替換即可(記得創(chuàng)建對應(yīng)的條件變量)。
下圖是部分的執(zhí)行結(jié)果:

又到了分隔線以下,本文到此就結(jié)束了,本文內(nèi)容全部都是由博主自己進行整理并結(jié)合自身的理解并且進行的代碼編寫,如果有什么錯誤,還請批評指正。
本文的所有java代碼都已通過測試,對其中有什么疑惑的,可以評論區(qū)留言,歡迎你的留言與討論;另外原創(chuàng)不易,如果本文對你有所幫助,還請留下個贊,以表支持。
希望本文可以幫助你理解加深理解進程同步,也可以幫助你理解Java并發(fā)編程.