自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

java 生產(chǎn)者消費者問題

開發(fā) 后端 前端
  生產(chǎn)者和消費者問題是線程模型中的經(jīng)典問題:生產(chǎn)者和消費者在同一時間段內(nèi)共用同一個存儲空間,如下圖所示,生產(chǎn)者向空間里存放數(shù)據(jù),而消費者取用數(shù)據(jù),如果不加以協(xié)調(diào)可能會出現(xiàn)以下情況:

引言

  生產(chǎn)者和消費者問題是線程模型中的經(jīng)典問題:生產(chǎn)者和消費者在同一時間內(nèi)共用同一個存儲空間,如下圖所示,生產(chǎn)者向空間里存放數(shù)據(jù),而消費者取用數(shù)據(jù),如果不加以協(xié)調(diào)可能會出現(xiàn)以下情況:

生產(chǎn)者消費者圖

  存儲空間已滿,而生產(chǎn)者占用著它,消費者等著生產(chǎn)者讓出空間從而去除產(chǎn)品,生產(chǎn)者等著消費者消費產(chǎn)品,從而向空間中添加產(chǎn)品?;ハ嗟却?,從而發(fā)生死鎖

  生產(chǎn)者消費者問題是研究多線程程序時繞不開的經(jīng)典問題之一,它描述是有一塊緩沖區(qū)作為倉庫,生產(chǎn)者可以將產(chǎn)品放入倉庫,消費者則可以從倉庫中取走產(chǎn)品。解決生產(chǎn)者/消費者問題的方法可分為兩類:

  (1)采用某種機制保護生產(chǎn)者和消費者之間的同步;

 ?。?)在生產(chǎn)者和消費者之間建立一個管道。

  ***種方式有較高的效率,并且易于實現(xiàn),代碼的可控制性較好,屬于常用的模式。第二種管道緩沖區(qū)不易控制,被傳輸數(shù)據(jù)對象不易于封裝等,實用性不強。因此本文只介紹同步機制實現(xiàn)的生產(chǎn)者/消費者問題。

  同步問題核心在于:如何保證同一資源被多個線程并發(fā)訪問時的完整性。常用的同步方法是采用信號或加鎖機制, 保證資源在任意時刻至多被一個線程訪問。Java語言在多線程編程上實現(xiàn)了完全對象化,提供了對同步機制的良好支持。在Java中一共有四種方法支持同 步,其中前三個是同步方法,一個是管道方法。

(1)wait() / notify()方法

(2)await() / signal()方法

(3)BlockingQueue阻塞隊列方法

(4)PipedInputStream / PipedOutputStream

本文只介紹最常用的前兩種種,第三、四種暫不做討論,有興趣的讀者可以自己去網(wǎng)上找答案。

一、wait() / notify()方法

  wait() / nofity()方法是基類Object的兩個方法,也就意味著所有Java類都會擁有這兩個方法,這樣,我們就可以為任何對象實現(xiàn)同步機制。

  wait()方法:當(dāng)緩沖區(qū)已滿/空時,生產(chǎn)者/消費者線程停止自己的執(zhí)行,放棄鎖,使自己處于等等狀態(tài),讓其他線程執(zhí)行。

  notify()方法:當(dāng)生產(chǎn)者/消費者向緩沖區(qū)放入/取出一個產(chǎn)品時,向其他等待的線程發(fā)出可執(zhí)行的通知,同時放棄鎖,使自己處于等待狀態(tài)。

代碼實現(xiàn):

1、倉庫類

 

  1. import java.util.LinkedList;   
  2.  
  3. /**  
  4.  * 倉庫類Storage實現(xiàn)緩沖區(qū)  
  5.  *   
  6.  * @author zcr  
  7.  */   
  8. public class Storage   
  9. {   
  10.     // 倉庫***存儲量   
  11.     private final int MAX_SIZE = 100;   
  12.    
  13.     // 倉庫存儲的載體   
  14.     private LinkedList<Object> list = new LinkedList<Object>();   
  15.    
  16.     /** 
  17.      * 生產(chǎn)num個產(chǎn)品 
  18.      * @param num 生產(chǎn)產(chǎn)品的數(shù)量 
  19.      */ 
  20.     public void produce(int num)   
  21.     {   
  22.         // 同步代碼段   
  23.         synchronized (list)   
  24.         {   
  25.             // 如果倉庫剩余容量不足   
  26.             while (list.size() + num > MAX_SIZE)   
  27.             {   
  28.                 System.out.println("【要生產(chǎn)的產(chǎn)品數(shù)量】:" + num + " \t 【庫存量】:"   
  29.                         + list.size() + "\t 暫時不能執(zhí)行生產(chǎn)任務(wù)!");   
  30.                 try   
  31.                 {   
  32.                     // 由于條件不滿足,生產(chǎn)阻塞   
  33.                     list.wait();   
  34.                 }   
  35.                 catch (InterruptedException e)   
  36.                 {   
  37.                     e.printStackTrace();   
  38.                 }   
  39.             }   
  40.    
  41.             // 生產(chǎn)條件滿足情況下,生產(chǎn)num個產(chǎn)品   
  42.             for (int i = 1; i <= num; ++i)   
  43.             {   
  44.                 list.add(new Object());   
  45.             }   
  46.    
  47.             System.out.println("【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:" + num + "\t 【現(xiàn)倉儲量為】:" + list.size());   
  48.    
  49.             list.notifyAll();   
  50.         }   
  51.     }   
  52.    
  53.     /** 
  54.      * 消費num個產(chǎn)品 
  55.      * @param num 消費產(chǎn)品數(shù)量 
  56.      */ 
  57.     public void consume(int num)   
  58.     {   
  59.         // 同步代碼段   
  60.         synchronized (list)   
  61.         {   
  62.             // 如果倉庫存儲量不足   
  63.             while (list.size() < num)   
  64.             {   
  65.                 System.out.println("【要消費的產(chǎn)品數(shù)量】:" + num + " \t【庫存量】:"   
  66.                         + list.size() + " \t 暫時不能執(zhí)行生產(chǎn)任務(wù)!");   
  67.                 try   
  68.                 {   
  69.                     // 由于條件不滿足,消費阻塞   
  70.                     list.wait();   
  71.                 }   
  72.                 catch (InterruptedException e)   
  73.                 {   
  74.                     e.printStackTrace();   
  75.                 }   
  76.             }   
  77.    
  78.             // 消費條件滿足情況下,消費num個產(chǎn)品   
  79.             for (int i = 1; i <= num; ++i)   
  80.             {   
  81.                 list.remove();   
  82.             }   
  83.    
  84.             System.out.println("【已經(jīng)消費產(chǎn)品數(shù)】:" + num + " \t 【現(xiàn)倉儲量為】:" + list.size());   
  85.    
  86.             list.notifyAll();   
  87.         }   
  88.     }   
  89.    
  90.     // get/set方法   
  91.     public LinkedList<Object> getList()   
  92.     {   
  93.         return list;   
  94.     }   
  95.    
  96.     public void setList(LinkedList<Object> list)   
  97.     {   
  98.         this.list = list;   
  99.     }   
  100.    
  101.     public int getMAX_SIZE()   
  102.     {   
  103.         return MAX_SIZE;   
  104.     }   

 

2、生產(chǎn)者

  1. /**  
  2.  * 生產(chǎn)者類Producer繼承線程類Thread  
  3.  *   
  4.  *   
  5.  * @author zcr 
  6.  *   
  7.  */   
  8. public class Producer extends Thread   
  9. {   
  10.     // 每次生產(chǎn)的產(chǎn)品數(shù)量   
  11.     private int num;   
  12.    
  13.     // 所在放置的倉庫   
  14.     private Storage storage;   
  15.    
  16.     // 構(gòu)造函數(shù),設(shè)置倉庫   
  17.     public Producer(Storage storage)   
  18.     {   
  19.         this.storage = storage;   
  20.     }   
  21.    
  22.     // 線程run函數(shù)   
  23.     public void run()   
  24.     {   
  25.         produce(num);   
  26.     }   
  27.    
  28.     // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)   
  29.     public void produce(int num)   
  30.     {   
  31.         storage.produce(num);   
  32.     }   
  33.    
  34.     // get/set方法   
  35.     public int getNum()   
  36.     {   
  37.         return num;   
  38.     }   
  39.    
  40.     public void setNum(int num)   
  41.     {   
  42.         this.num = num;   
  43.     }   
  44.    
  45.     public Storage getStorage()   
  46.     {   
  47.         return storage;   
  48.     }   
  49.    
  50.     public void setStorage(Storage storage)   
  51.     {   
  52.         this.storage = storage;   
  53.     }   

 

3、消費者

 

  1. /**  
  2.  * 消費者類Consumer繼承線程類Thread  
  3.  *   
  4.  *   
  5.  * @author zcr 
  6.  *   
  7.  */   
  8. public class Consumer extends Thread   
  9. {   
  10.     // 每次消費的產(chǎn)品數(shù)量   
  11.     private int num;   
  12.    
  13.     // 所在放置的倉庫   
  14.     private Storage storage;   
  15.    
  16.     // 構(gòu)造函數(shù),設(shè)置倉庫   
  17.     public Consumer(Storage storage)   
  18.     {   
  19.         this.storage = storage;   
  20.     }   
  21.    
  22.     // 線程run函數(shù)   
  23.     public void run()   
  24.     {   
  25.         consume(num);   
  26.     }   
  27.    
  28.     // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)   
  29.     public void consume(int num)   
  30.     {   
  31.         storage.consume(num);   
  32.     }   
  33.    
  34.     // get/set方法   
  35.     public int getNum()   
  36.     {   
  37.         return num;   
  38.     }   
  39.    
  40.     public void setNum(int num)   
  41.     {   
  42.         this.num = num;   
  43.     }   
  44.    
  45.     public Storage getStorage()   
  46.     {   
  47.         return storage;   
  48.     }   
  49.    
  50.     public void setStorage(Storage storage)   
  51.     {   
  52.         this.storage = storage;   
  53.     }   

4、測試類

  1. /**  
  2.  * 測試類Test  
  3.  * @author zcr 
  4.  *   
  5.  */   
  6. public class Test   
  7. {   
  8.     public static void main(String[] args)   
  9.     {   
  10.         // 倉庫對象   
  11.         Storage storage = new Storage();   
  12.    
  13.         // 生產(chǎn)者對象   
  14.         Producer p1 = new Producer(storage);   
  15.         Producer p2 = new Producer(storage);   
  16.         Producer p3 = new Producer(storage);   
  17.         Producer p4 = new Producer(storage);   
  18.         Producer p5 = new Producer(storage);   
  19.         Producer p6 = new Producer(storage);   
  20.         Producer p7 = new Producer(storage);   
  21.    
  22.         // 消費者對象   
  23.         Consumer c1 = new Consumer(storage);   
  24.         Consumer c2 = new Consumer(storage);   
  25.         Consumer c3 = new Consumer(storage);   
  26.    
  27.         // 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量   
  28.         p1.setNum(10);   
  29.         p2.setNum(10);   
  30.         p3.setNum(10);   
  31.         p4.setNum(10);   
  32.         p5.setNum(10);   
  33.         p6.setNum(10);   
  34.         p7.setNum(80);   
  35.    
  36.         // 設(shè)置消費者產(chǎn)品消費數(shù)量   
  37.         c1.setNum(50);   
  38.         c2.setNum(20);   
  39.         c3.setNum(30);   
  40.    
  41.         // 線程開始執(zhí)行   
  42.         c1.start();   
  43.         c2.start();   
  44.         c3.start();   
  45.         p1.start();   
  46.         p2.start();   
  47.         p3.start();   
  48.         p4.start();   
  49.         p5.start();   
  50.         p6.start();   
  51.         p7.start();   
  52.     }   

 

5、結(jié)果:

  1. 【要消費的產(chǎn)品數(shù)量】:50     【庫存量】:0      暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  2. 【要消費的產(chǎn)品數(shù)量】:20     【庫存量】:0      暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  3. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10     【現(xiàn)倉儲量為】:10 
  4. 【要消費的產(chǎn)品數(shù)量】:20     【庫存量】:10      暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  5. 【要消費的產(chǎn)品數(shù)量】:50     【庫存量】:10      暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  6. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10     【現(xiàn)倉儲量為】:20 
  7. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10     【現(xiàn)倉儲量為】:30 
  8. 【要消費的產(chǎn)品數(shù)量】:50     【庫存量】:30      暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  9. 【已經(jīng)消費產(chǎn)品數(shù)】:20      【現(xiàn)倉儲量為】:10 
  10. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10     【現(xiàn)倉儲量為】:20 
  11. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10     【現(xiàn)倉儲量為】:30 
  12. 【要消費的產(chǎn)品數(shù)量】:50     【庫存量】:30      暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  13. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10     【現(xiàn)倉儲量為】:40 
  14. 【已經(jīng)消費產(chǎn)品數(shù)】:30      【現(xiàn)倉儲量為】:10 
  15. 【要消費的產(chǎn)品數(shù)量】:50     【庫存量】:10      暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  16. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:80     【現(xiàn)倉儲量為】:90 
  17. 【已經(jīng)消費產(chǎn)品數(shù)】:50      【現(xiàn)倉儲量為】:40 

 

  看完上述代碼,對wait() / notify()方法實現(xiàn)的同步有了了解。你可能會對Storage類中為什么要定義public void produce(int num);和public void consume(int num);方法感到不解,為什么不直接在生產(chǎn)者類Producer和消費者類Consumer中實現(xiàn)這兩個方法,卻要調(diào)用Storage類中的實現(xiàn)呢?淡定,后文會有解釋。我們先往下走。


 二、await() / signal()方法

  在JDK5.0之后,Java提供了更加健壯的線程處理機制,包括同步、鎖定、線程池等,它們可以實現(xiàn)更細粒度的線程控制。await()和 signal()就是其中用來做同步的兩種方法,它們的功能基本上和wait() / nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。通過在Lock對象上調(diào)用newCondition()方法,將條件變量和一個鎖對象進行綁定,進而控制并發(fā)程序訪問競爭資源的安全。

  下面來看代碼:  

  只需更新倉庫類Storage的代碼即可,生產(chǎn)者Producer、消費者Consumer、測試類Test的代碼均不需要進行任何更改。

倉庫類

  1. import java.util.LinkedList; 
  2. import java.util.concurrent.locks.Condition; 
  3. import java.util.concurrent.locks.Lock; 
  4. import java.util.concurrent.locks.ReentrantLock; 
  5.  
  6. /** 
  7.  * 倉庫類Storage實現(xiàn)緩沖區(qū) 
  8.  *  
  9.  *  
  10.  * @author MONKEY.D.MENG 2011-03-15 
  11.  *  
  12.  */ 
  13. public class Storage 
  14.     // 倉庫***存儲量 
  15.     private final int MAX_SIZE = 100
  16.  
  17.     // 倉庫存儲的載體 
  18.     private LinkedList<Object> list = new LinkedList<Object>(); 
  19.  
  20.     // 鎖 
  21.     private final Lock lock = new ReentrantLock(); 
  22.  
  23.     // 倉庫滿的條件變量 
  24.     private final Condition full = lock.newCondition(); 
  25.  
  26.     // 倉庫空的條件變量 
  27.     private final Condition empty = lock.newCondition(); 
  28.  
  29.     // 生產(chǎn)num個產(chǎn)品 
  30.     public void produce(int num) 
  31.     { 
  32.         // 獲得鎖 
  33.         lock.lock(); 
  34.  
  35.         // 如果倉庫剩余容量不足 
  36.         while (list.size() + num > MAX_SIZE) 
  37.         { 
  38.             System.out.println("【要生產(chǎn)的產(chǎn)品數(shù)量】:" + num + "/t【庫存量】:" + list.size() 
  39.                     + "/t暫時不能執(zhí)行生產(chǎn)任務(wù)!"); 
  40.             try 
  41.             { 
  42.                 // 由于條件不滿足,生產(chǎn)阻塞 
  43.                 full.await(); 
  44.             } 
  45.             catch (InterruptedException e) 
  46.             { 
  47.                 e.printStackTrace(); 
  48.             } 
  49.         } 
  50.  
  51.         // 生產(chǎn)條件滿足情況下,生產(chǎn)num個產(chǎn)品 
  52.         for (int i = 1; i <= num; ++i) 
  53.         { 
  54.             list.add(new Object()); 
  55.         } 
  56.  
  57.         System.out.println("【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:" + num + "/t【現(xiàn)倉儲量為】:" + list.size()); 
  58.  
  59.         // 喚醒其他所有線程 
  60.         full.signalAll(); 
  61.         empty.signalAll(); 
  62.  
  63.         // 釋放鎖 
  64.         lock.unlock(); 
  65.     } 
  66.  
  67.     // 消費num個產(chǎn)品 
  68.     public void consume(int num) 
  69.     { 
  70.         // 獲得鎖 
  71.         lock.lock(); 
  72.  
  73.         // 如果倉庫存儲量不足 
  74.         while (list.size() < num) 
  75.         { 
  76.             System.out.println("【要消費的產(chǎn)品數(shù)量】:" + num + "/t【庫存量】:" + list.size() 
  77.                     + "/t暫時不能執(zhí)行生產(chǎn)任務(wù)!"); 
  78.             try 
  79.             { 
  80.                 // 由于條件不滿足,消費阻塞 
  81.                 empty.await(); 
  82.             } 
  83.             catch (InterruptedException e) 
  84.             { 
  85.                 e.printStackTrace(); 
  86.             } 
  87.         } 
  88.  
  89.         // 消費條件滿足情況下,消費num個產(chǎn)品 
  90.         for (int i = 1; i <= num; ++i) 
  91.         { 
  92.             list.remove(); 
  93.         } 
  94.  
  95.         System.out.println("【已經(jīng)消費產(chǎn)品數(shù)】:" + num + "/t【現(xiàn)倉儲量為】:" + list.size()); 
  96.  
  97.         // 喚醒其他所有線程 
  98.         full.signalAll(); 
  99.         empty.signalAll(); 
  100.  
  101.         // 釋放鎖 
  102.         lock.unlock(); 
  103.     } 
  104.  
  105.     // set/get方法 
  106.     public int getMAX_SIZE() 
  107.     { 
  108.         return MAX_SIZE; 
  109.     } 
  110.  
  111.     public LinkedList<Object> getList() 
  112.     { 
  113.         return list; 
  114.     } 
  115.  
  116.     public void setList(LinkedList<Object> list) 
  117.     { 
  118.         this.list = list; 
  119.     } 

 

結(jié)果:

 

  1. 【要消費的產(chǎn)品數(shù)量】:50/t【庫存量】:0/t暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  2. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10/t【現(xiàn)倉儲量為】:10 
  3. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10/t【現(xiàn)倉儲量為】:20 
  4. 【要消費的產(chǎn)品數(shù)量】:30/t【庫存量】:20/t暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  5. 【已經(jīng)消費產(chǎn)品數(shù)】:20/t【現(xiàn)倉儲量為】:0 
  6. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10/t【現(xiàn)倉儲量為】:10 
  7. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10/t【現(xiàn)倉儲量為】:20 
  8. 【要消費的產(chǎn)品數(shù)量】:50/t【庫存量】:20/t暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  9. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10/t【現(xiàn)倉儲量為】:30 
  10. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:10/t【現(xiàn)倉儲量為】:40 
  11. 【要生產(chǎn)的產(chǎn)品數(shù)量】:80/t【庫存量】:40/t暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  12. 【已經(jīng)消費產(chǎn)品數(shù)】:30/t【現(xiàn)倉儲量為】:10 
  13. 【要消費的產(chǎn)品數(shù)量】:50/t【庫存量】:10/t暫時不能執(zhí)行生產(chǎn)任務(wù)! 
  14. 【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:80/t【現(xiàn)倉儲量為】:90 
  15. 【已經(jīng)消費產(chǎn)品數(shù)】:50/t【現(xiàn)倉儲量為】:40

  這樣我們就知道為神馬我要在Storage類中定義public void produce(int num);和public void consume(int num);方法,并在生產(chǎn)者類Producer和消費者類Consumer中調(diào)用Storage類中的實現(xiàn)了吧。將可能發(fā)生的變化集中到一個類中,不影響原有的構(gòu)架設(shè)計,同時無需修改其他業(yè)務(wù)層代碼。

總結(jié)

  兩種方式原理一致,都是對獨占空間加鎖,阻塞和喚醒線程,***種方式比較傳統(tǒng),第二種方式速度比較快。

  致謝:感謝您的耐心閱讀!

 
責(zé)任編輯:王雪燕 來源: 博客園
相關(guān)推薦

2009-08-13 13:14:31

C#生產(chǎn)者和消費者

2012-02-14 12:31:27

Java

2021-12-22 11:00:05

模型Golang語言

2021-08-31 10:26:24

存儲

2017-05-16 12:30:21

Python多線程生產(chǎn)者消費者模式

2024-03-14 11:58:43

2024-10-11 09:27:52

2024-08-27 10:19:31

2021-04-20 08:32:51

消息MQ隊列

2021-12-28 12:01:59

Kafka 消費者機制

2020-09-14 08:45:58

多線程模型面試

2023-06-01 08:08:38

kafka消費者分區(qū)策略

2015-06-15 11:29:34

數(shù)據(jù)中心綠色數(shù)據(jù)中心

2022-07-07 09:00:49

RocketMQ消費者消息消費

2011-07-22 16:25:38

CA TechnoloIT消費化

2011-08-05 16:21:24

2019-02-18 08:28:31

2011-11-15 10:05:29

Kindle Fire平板市場

2018-07-13 04:27:18

2009-04-15 11:17:23

點贊
收藏

51CTO技術(shù)棧公眾號