自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一文吃透生產(chǎn)者和消費(fèi)者模型!

開(kāi)發(fā) 前端
簡(jiǎn)單的說(shuō),生產(chǎn)者和消費(fèi)者之間不直接進(jìn)行交互,而是通過(guò)一個(gè)緩沖區(qū)來(lái)進(jìn)行交互,生產(chǎn)者負(fù)責(zé)生成數(shù)據(jù),然后存入緩沖區(qū);消費(fèi)者則負(fù)責(zé)處理數(shù)據(jù),從緩沖區(qū)獲取。

01、背景介紹

在 Java 多線程編程中,還有一個(gè)非常重要的設(shè)計(jì)模式,它就是:生產(chǎn)者和消費(fèi)者模型。

這種模型可以充分發(fā)揮 cpu 的多線程特性,通過(guò)一些平衡手段能有效的提升系統(tǒng)整體處理數(shù)據(jù)的速度,減輕系統(tǒng)負(fù)載,提高程序的效率和穩(wěn)定性,同時(shí)實(shí)現(xiàn)模塊之間的解耦。

那什么是生產(chǎn)者和消費(fèi)者模型呢?

簡(jiǎn)單的說(shuō),生產(chǎn)者和消費(fèi)者之間不直接進(jìn)行交互,而是通過(guò)一個(gè)緩沖區(qū)來(lái)進(jìn)行交互,生產(chǎn)者負(fù)責(zé)生成數(shù)據(jù),然后存入緩沖區(qū);消費(fèi)者則負(fù)責(zé)處理數(shù)據(jù),從緩沖區(qū)獲取。

大致流程圖如下:

圖片圖片

對(duì)于最簡(jiǎn)單的生產(chǎn)者和消費(fèi)者模型,總結(jié)下來(lái),大概有以下幾個(gè)特點(diǎn):

  • 緩沖區(qū)為空的時(shí)候,消費(fèi)者不能消費(fèi),會(huì)進(jìn)入休眠狀態(tài),直到有新數(shù)據(jù)進(jìn)入緩沖區(qū),再次被喚醒
  • 緩沖區(qū)填滿的時(shí)候,生產(chǎn)者不能生產(chǎn),也會(huì)進(jìn)入休眠狀態(tài),直到緩沖區(qū)有空間,再次被喚醒

生產(chǎn)者和消費(fèi)者模型作為一個(gè)非常重要的設(shè)計(jì)模型,它的優(yōu)點(diǎn)在于:

  • 解耦:生產(chǎn)者和消費(fèi)者之間不直接進(jìn)行交互,即使生產(chǎn)者和消費(fèi)者的代碼發(fā)生變化,也不會(huì)對(duì)對(duì)方產(chǎn)生影響
  • 消峰:例如在某項(xiàng)工作中,假如 A 操作生產(chǎn)數(shù)據(jù)的速度很快,B 操作處理速度很慢,那么 A 操作就必須等待 B 操作完成才能結(jié)束,反之亦然。如果將 A 操作和B 操作進(jìn)行解耦,中間插入一個(gè)緩沖區(qū),這樣 A 操作將生產(chǎn)的數(shù)據(jù)存入緩沖區(qū),就接受了;B 操作從緩沖區(qū)獲取數(shù)據(jù)并進(jìn)行處理,平衡好 A 操作和 B 操作之間的緩沖區(qū),可以顯著提升系統(tǒng)的數(shù)據(jù)處理能力

生產(chǎn)者和消費(fèi)者模型的應(yīng)用場(chǎng)景非常多,例如 Java 的線程池任務(wù)執(zhí)行框架、消息中間件 rabbitMQ 等,因此掌握生產(chǎn)者和消費(fèi)者模型,對(duì)于開(kāi)發(fā)者至關(guān)重要。

下面我們通過(guò)幾個(gè)案例,一起來(lái)了解一下生產(chǎn)者和消費(fèi)者設(shè)計(jì)模型的實(shí)踐思路。

02、代碼實(shí)踐

2.1、利用 wait / notify 方法實(shí)現(xiàn)思路

生產(chǎn)者和消費(fèi)者模型,最簡(jiǎn)單的一種技術(shù)實(shí)踐方案就是基于線程的 wait() / notify() 方法,也就是通知和喚醒機(jī)制,可以將兩個(gè)操作實(shí)現(xiàn)解耦,具體代碼實(shí)踐如下。

/**
 * 緩沖區(qū)容器類
 */
public class Container {

    /**
     * 緩沖區(qū)最大容量
     */
    private int capacity = 3;

    /**
     * 緩沖區(qū)
     */
    private LinkedList<Integer> list = new LinkedList<Integer>();


    /**
     * 添加數(shù)據(jù)到緩沖區(qū)
     * @param value
     */
    public synchronized void add(Integer value) {
        if(list.size() >= capacity){
            System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",緩沖區(qū)已滿,生產(chǎn)者進(jìn)入waiting...");
            try {
                // 進(jìn)入等待狀態(tài)
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);
        list.add(value);

        //喚醒其他所有處于wait()的線程,包括消費(fèi)者和生產(chǎn)者
        notifyAll();
    }


    /**
     * 從緩沖區(qū)獲取數(shù)據(jù)
     */
    public synchronized void get() {
        if(list.size() == 0){
            System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",緩沖區(qū)為空,消費(fèi)者進(jìn)入waiting...");
            try {
                // 進(jìn)入等待狀態(tài)
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 從頭部獲取數(shù)據(jù),并移除元素
        Integer val = list.removeFirst();
        System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + val);

        //喚醒其他所有處于wait()的線程,包括消費(fèi)者和生產(chǎn)者
        notifyAll();
    }
}
/**
 * 生產(chǎn)者類
 */
public class Producer extends Thread{

    private Container container;

    public Producer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            container.add(i);
        }
    }
}
/**
 * 消費(fèi)者類
 */
public class Consumer extends Thread{

    private Container container;

    public Consumer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            container.get();
        }
    }
}
/**
 * 測(cè)試類
 */
public class MyThreadTest {

    public static void main(String[] args) {
        Container container = new Container();
        Producer producer = new Producer(container);
        Consumer consumer = new Consumer(container);

        producer.start();
        consumer.start();
    }
}

運(yùn)行結(jié)果如下:

生產(chǎn)者:Thread-0,add:0
生產(chǎn)者:Thread-0,add:1
生產(chǎn)者:Thread-0,add:2
生產(chǎn)者:Thread-0,緩沖區(qū)已滿,生產(chǎn)者進(jìn)入waiting...
消費(fèi)者:Thread-1,value:0
消費(fèi)者:Thread-1,value:1
消費(fèi)者:Thread-1,value:2
消費(fèi)者:Thread-1,緩沖區(qū)為空,消費(fèi)者進(jìn)入waiting...
生產(chǎn)者:Thread-0,add:3
生產(chǎn)者:Thread-0,add:4
生產(chǎn)者:Thread-0,add:5
消費(fèi)者:Thread-1,value:3
消費(fèi)者:Thread-1,value:4
消費(fèi)者:Thread-1,value:5

從日志上可以很清晰的看到,生產(chǎn)者線程生產(chǎn)一批數(shù)據(jù)之后,當(dāng)緩沖區(qū)已經(jīng)滿了,會(huì)進(jìn)入等待狀態(tài),此時(shí)會(huì)通知消費(fèi)者線程;消費(fèi)者線程處理完數(shù)據(jù)之后,當(dāng)緩沖區(qū)沒(méi)有數(shù)據(jù)時(shí),也會(huì)進(jìn)入等待狀態(tài),再次通知生產(chǎn)者線程。

2.2、利用 await / signal 方法實(shí)現(xiàn)思路

除此之外,我們還可以利用ReentrantLock和Condition類中的 await() / signal() 方法實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型。

緩沖區(qū)容器類,具體代碼實(shí)踐如下。

/**
 * 緩沖區(qū)容器類
 */
public class Container {

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    private int capacity = 3;

    private LinkedList<Integer> list = new LinkedList<Integer>();


    /**
     * 添加數(shù)據(jù)到緩沖區(qū)
     * @param value
     */
    public void add(Integer value) {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() >= capacity){
                System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",緩沖區(qū)已滿,生產(chǎn)者進(jìn)入waiting...");
                // 進(jìn)入等待狀態(tài)
                condition.await();
            }
            System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);
            list.add(value);

            //喚醒其他所有處于wait()的線程,包括消費(fèi)者和生產(chǎn)者
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }


    /**
     * 從緩沖區(qū)獲取數(shù)據(jù)
     */
    public void get() {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() == 0){
                System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",緩沖區(qū)為空,消費(fèi)者進(jìn)入waiting...");
                // 進(jìn)入等待狀態(tài)
                condition.await();
            }
            // 從頭部獲取數(shù)據(jù),并移除元素
            Integer val = list.removeFirst();
            System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + val);

            //喚醒其他所有處于wait()的線程,包括消費(fèi)者和生產(chǎn)者
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }
}

生產(chǎn)者、消費(fèi)者、測(cè)試類代碼,跟上文一致,運(yùn)行結(jié)果和上文介紹的也是一樣。

2.3、多生產(chǎn)者和消費(fèi)者的實(shí)現(xiàn)思路

上面介紹的都是一個(gè)生產(chǎn)者線程和一個(gè)消費(fèi)者線程,模型比較簡(jiǎn)單。實(shí)際上,在業(yè)務(wù)開(kāi)發(fā)中,經(jīng)常會(huì)出現(xiàn)多個(gè)生產(chǎn)者線程和多個(gè)消費(fèi)者線程,按照以上的實(shí)現(xiàn)思路,會(huì)出現(xiàn)什么情況呢?

有可能會(huì)出現(xiàn)程序假死現(xiàn)象!下面我們來(lái)分析一下案例,假如有兩個(gè)生產(chǎn)者線程 a1、a2,兩個(gè)消費(fèi)者線程 b1、b2,執(zhí)行過(guò)程如下:

  • 1.生產(chǎn)者線程 a1 執(zhí)行生產(chǎn)數(shù)據(jù)的操作,發(fā)現(xiàn)緩沖區(qū)數(shù)據(jù)已經(jīng)填滿了,然后進(jìn)入等待階段,同時(shí)向外發(fā)起通知,喚醒其它線程
  • 2.因?yàn)榫€程喚醒具有隨機(jī)性,本應(yīng)該喚醒消費(fèi)者線程 b1,結(jié)果可能生產(chǎn)者線程 a2 被喚醒,檢查緩沖區(qū)數(shù)據(jù)已經(jīng)填滿了,又進(jìn)入等待階段,緊接向外發(fā)起通知,消費(fèi)者線程得不到被執(zhí)行的機(jī)會(huì)
  • 3.消費(fèi)者線程 b1、b2,也有可能會(huì)出現(xiàn)這個(gè)現(xiàn)象,本應(yīng)該喚醒生產(chǎn)者線程,結(jié)果喚醒了消費(fèi)者線程

遇到這種情況,應(yīng)該如何解決呢?

因?yàn)镽eentrantLock和Condition的結(jié)合,編程具有高度靈活性,我們可以采用這種組合解決多生產(chǎn)者和多消費(fèi)者中的假死問(wèn)題。

具體實(shí)現(xiàn)邏輯如下:

/**
 * 緩沖區(qū)容器類
 */
public class ContainerDemo {

    private Lock lock = new ReentrantLock();
    private Condition producerCondition = lock.newCondition();
    private Condition consumerCondition = lock.newCondition();

    private int capacity = 3;
    private LinkedList<Integer> list = new LinkedList<Integer>();


    /**
     * 添加數(shù)據(jù)到緩沖區(qū)
     * @param value
     */
    public void add(Integer value) {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() >= capacity){
                System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",緩沖區(qū)已滿,生產(chǎn)者進(jìn)入waiting...");
                // 生產(chǎn)者進(jìn)入等待狀態(tài)
                producerCondition.await();
            }
            System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);
            list.add(value);

            // 喚醒所有消費(fèi)者處于wait()的線程
            consumerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }


    /**
     * 從緩沖區(qū)獲取數(shù)據(jù)
     */
    public void get() {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() == 0){
                System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",緩沖區(qū)為空,消費(fèi)者進(jìn)入waiting...");
                // 消費(fèi)者進(jìn)入等待狀態(tài)
                consumerCondition.await();
            }
            // 從頭部獲取數(shù)據(jù),并移除元素
            Integer val = list.removeFirst();
            System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + val);

            // 喚醒所有生產(chǎn)者處于wait()的線程
            producerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }
}
/**
 * 生產(chǎn)者
 */
public class Producer extends Thread{

    private ContainerDemo container;

    private Integer value;

    public Producer(ContainerDemo container, Integer value) {
        this.container = container;
        this.value = value;
    }

    @Override
    public void run() {
        container.add(value);
    }
}
/**
 * 消費(fèi)者
 */
public class Consumer extends Thread{

    private ContainerDemo container;

    public Consumer(ContainerDemo container) {
        this.container = container;
    }

    @Override
    public void run() {
        container.get();
    }
}
/**
 * 測(cè)試類
 */
public class MyThreadTest {

    public static void main(String[] args) {
        ContainerDemo container = new ContainerDemo();

        List<Thread> threadList = new ArrayList<>();
        // 初始化6個(gè)生產(chǎn)者線程
        for (int i = 0; i < 6; i++) {
            threadList.add(new Producer(container, i));
        }
        // 初始化6個(gè)消費(fèi)者線程
        for (int i = 0; i < 6; i++) {
            threadList.add(new Consumer(container));
        }

        // 啟動(dòng)線程
        for (Thread thread : threadList) {
            thread.start();
        }
    }
}

運(yùn)行結(jié)果如下:

生產(chǎn)者:Thread-0,add:0
生產(chǎn)者:Thread-1,add:1
生產(chǎn)者:Thread-2,add:2
生產(chǎn)者:Thread-3,緩沖區(qū)已滿,生產(chǎn)者進(jìn)入waiting...
生產(chǎn)者:Thread-4,緩沖區(qū)已滿,生產(chǎn)者進(jìn)入waiting...
生產(chǎn)者:Thread-5,緩沖區(qū)已滿,生產(chǎn)者進(jìn)入waiting...
消費(fèi)者:Thread-6,value:0
消費(fèi)者:Thread-7,value:1
生產(chǎn)者:Thread-3,add:3
生產(chǎn)者:Thread-4,add:4
生產(chǎn)者:Thread-5,add:5
消費(fèi)者:Thread-8,value:2
消費(fèi)者:Thread-9,value:3
消費(fèi)者:Thread-10,value:4
消費(fèi)者:Thread-11,value:5

通過(guò)ReentrantLock定義兩個(gè)Condition,一個(gè)表示生產(chǎn)者的Condition,一個(gè)表示消費(fèi)者的Condition,喚醒的時(shí)候調(diào)用對(duì)應(yīng)的signalAll()方法就可以解決假死現(xiàn)象。

03、小結(jié)

最后我們來(lái)總結(jié)一下,對(duì)于生產(chǎn)者和消費(fèi)者模型,通過(guò)合理的編程實(shí)現(xiàn),可以充分充分發(fā)揮 cpu 多線程的特性,顯著的提升系統(tǒng)處理數(shù)據(jù)的效率。

對(duì)于生產(chǎn)者和消費(fèi)者模型中的假死現(xiàn)象,可以使用ReentrantLock定義兩個(gè)Condition,進(jìn)行交叉喚醒,以解決假死問(wèn)題。

責(zé)任編輯:武曉燕 來(lái)源: 潘志的研發(fā)筆記
相關(guān)推薦

2021-04-20 08:32:51

消息MQ隊(duì)列

2009-08-13 13:14:31

C#生產(chǎn)者和消費(fèi)者

2021-12-22 11:00:05

模型Golang語(yǔ)言

2015-08-26 09:39:30

java消費(fèi)者

2024-03-14 11:58:43

2012-02-14 12:31:27

Java

2017-05-16 12:30:21

Python多線程生產(chǎn)者消費(fèi)者模式

2021-08-31 10:26:24

存儲(chǔ)

2021-09-09 06:55:43

kafka冪等生產(chǎn)者

2020-09-14 08:45:58

多線程模型面試

2024-08-27 10:19:31

2021-12-28 12:01:59

Kafka 消費(fèi)者機(jī)制

2023-06-01 08:08:38

kafka消費(fèi)者分區(qū)策略

2015-06-15 11:29:34

數(shù)據(jù)中心綠色數(shù)據(jù)中心

2022-07-07 09:00:49

RocketMQ消費(fèi)者消息消費(fèi)

2011-07-22 16:25:38

CA TechnoloIT消費(fèi)化

2011-08-05 16:21:24

2011-11-15 10:05:29

Kindle Fire平板市場(chǎng)

2009-04-15 11:17:23

2018-05-16 23:37:55

攜號(hào)轉(zhuǎn)網(wǎng)運(yùn)營(yíng)商網(wǎng)絡(luò)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)