自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

阻塞隊(duì)列BlockingQueue,看完就會(huì)了

開(kāi)發(fā) 前端
當(dāng)生產(chǎn)者嘗試向已滿(mǎn)的有界隊(duì)列添加元素時(shí),添加方法(比如put()),操作可能會(huì)阻塞,直到隊(duì)列中有可用空間。這種特性使得有界隊(duì)列在某些場(chǎng)景下,能自動(dòng)實(shí)現(xiàn)限流,避免系統(tǒng)資源過(guò)度消耗。

在Java并發(fā)編程中,生產(chǎn)者-消費(fèi)者問(wèn)題是一個(gè)常見(jiàn)的需求。java.util.concurrent.BlockingQueue接口為解決這類(lèi)問(wèn)題提供了強(qiáng)大便捷的支持。

不僅提供了在多線(xiàn)程環(huán)境下安全添加和獲取元素的方法,還通過(guò)阻塞機(jī)制確保了生產(chǎn)者和消費(fèi)者之間的協(xié)調(diào)。

接下來(lái),我們一起看看BlockingQueue的特性、方法以及如何使用它構(gòu)建高效的多線(xiàn)程應(yīng)用程序。

一、BlockingQueue類(lèi)型

(一)無(wú)界隊(duì)列

無(wú)界隊(duì)列可以在理論上無(wú)限增長(zhǎng),在Java中創(chuàng)建無(wú)界BlockingQueue非常簡(jiǎn)便:

BlockingQueue<String> unboundedQueue = new LinkedBlockingDeque<>();

此隊(duì)列的容量默認(rèn)是Integer.MAX_VALUE,雖然有默認(rèn)邊界,但在實(shí)際應(yīng)用中,若生產(chǎn)者持續(xù)快速生產(chǎn)元素,而消費(fèi)者無(wú)法及時(shí)消費(fèi),可能導(dǎo)致內(nèi)存占用不斷增加,最終引發(fā)OOM。所以說(shuō),雖然默認(rèn)有界,實(shí)際相當(dāng)于無(wú)界。

(二)有界隊(duì)列

有界隊(duì)列則具有明確的最大容量限制,創(chuàng)建方式如下:

BlockingQueue<Integer> boundedQueue = new LinkedBlockingDeque<>(10);

這里創(chuàng)建了一個(gè)容量為10的BlockingQueue。

當(dāng)生產(chǎn)者嘗試向已滿(mǎn)的有界隊(duì)列添加元素時(shí),添加方法(比如put()),操作可能會(huì)阻塞,直到隊(duì)列中有可用空間。這種特性使得有界隊(duì)列在某些場(chǎng)景下,能自動(dòng)實(shí)現(xiàn)限流,避免系統(tǒng)資源過(guò)度消耗。

二、BlockingQueue的方法

(一)添加元素

add(E e)

嘗試將指定元素添加到隊(duì)列中。若添加成功,返回true;若隊(duì)列已滿(mǎn),則拋出IllegalStateException異常。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    boolean success = queue.add(10);
    if (success) {
        System.out.println("元素添加成功");
    }
} catch (IllegalStateException e) {
    System.out.println("隊(duì)列已滿(mǎn),添加元素失敗");
}

put(E e)

將元素插入隊(duì)列,如果隊(duì)列已滿(mǎn),則阻塞當(dāng)前線(xiàn)程,直到有可用空間。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    queue.put(20);
    System.out.println("元素已放入隊(duì)列");
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線(xiàn)程被中斷,添加元素失敗");
}

offer(E e)

嘗試將元素添加到隊(duì)列中。若添加成功,返回true;若隊(duì)列已滿(mǎn),則返回false。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
boolean result = queue.offer(30);
if (result) {
    System.out.println("元素添加成功");
} else {
    System.out.println("隊(duì)列已滿(mǎn),添加元素失敗");
}

offer(E e, long timeout, TimeUnit unit)

在指定的超時(shí)時(shí)間內(nèi)嘗試將元素插入隊(duì)列。若在超時(shí)時(shí)間內(nèi)成功插入,返回true;否則返回false。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    boolean success = queue.offer(40, 2, TimeUnit.SECONDS);
    if (success) {
        System.out.println("元素在超時(shí)時(shí)間內(nèi)添加成功");
    } else {
        System.out.println("在超時(shí)時(shí)間內(nèi)未能添加元素,隊(duì)列可能已滿(mǎn)");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線(xiàn)程被中斷,添加元素失敗");
}

(二)檢索元素

take()

從隊(duì)列中獲取并移除頭部元素。如果隊(duì)列為空,當(dāng)前線(xiàn)程將被阻塞,直到有元素可用。

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
// 假設(shè)隊(duì)列中已有元素
try {
    Integer element = queue.take();
    System.out.println("取出的元素為: " + element);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線(xiàn)程被中斷,獲取元素失敗");
}

poll(long timeout, TimeUnit unit)

檢索并移除隊(duì)列頭部元素。若在指定的超時(shí)時(shí)間內(nèi)有元素可用,則返回該元素;若超時(shí)時(shí)間內(nèi)仍無(wú)元素可用,則返回null。

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    Integer element = queue.poll(1, TimeUnit.SECONDS);
    if (element!= null) {
        System.out.println("在超時(shí)時(shí)間內(nèi)取出的元素為: " + element);
    } else {
        System.out.println("在超時(shí)時(shí)間內(nèi)未獲取到元素,隊(duì)列可能為空");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線(xiàn)程被中斷,獲取元素失敗");
}

三、多線(xiàn)程生產(chǎn)者-消費(fèi)者

我們將模擬一個(gè)簡(jiǎn)單的生產(chǎn)-消費(fèi)場(chǎng)景,假設(shè)有一個(gè)消息隊(duì)列,多個(gè)生產(chǎn)者線(xiàn)程不斷向隊(duì)列中生產(chǎn)消息(這里簡(jiǎn)化為隨機(jī)整數(shù)),多個(gè)消費(fèi)者線(xiàn)程從隊(duì)列中獲取消息并進(jìn)行處理(這里簡(jiǎn)化為打印消息和線(xiàn)程名)。為了能夠有效結(jié)束,增加poisonPill參數(shù)。

首先是生產(chǎn)者:

public class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public Producer(BlockingQueue<Integer> queue, int poisonPill, int poisonPillPerProducer) {
        this.queue = queue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }

    @Override
    public void run() {
        try {
            produceMessages();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void produceMessages() throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            int message = random.nextInt(100);
            queue.put(message);
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            queue.put(poisonPill);
        }
    }
}

生產(chǎn)者構(gòu)造函數(shù)接受一個(gè)BlockingQueue用于與消費(fèi)者通信,還接受poisonPill和每個(gè)生產(chǎn)者應(yīng)發(fā)送poisonPill值的數(shù)量。在produceMessages方法中,先生產(chǎn)100個(gè)隨機(jī)消息放入隊(duì)列,然后發(fā)送指定數(shù)量的毒丸。

接著是消費(fèi)者:

public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int poisonPill;

    public Consumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    @Override
    public void run() {
        try {
            consumeMessages();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void consumeMessages() throws InterruptedException {
        while (true) {
            Integer message = queue.take();
            if (message.equals(poisonPill)) {
                return;
            }
            System.out.println(Thread.currentThread().getName() + " 消費(fèi)消息: " + message);
        }
    }
}

消費(fèi)者構(gòu)造函數(shù)接受BlockingQueue和poisonPill值。在consumeMessages方法中,不斷從隊(duì)列獲取消息,如果是等于poisonPill則結(jié)束消費(fèi),否則打印消息和線(xiàn)程名。

最后是主程序類(lèi)來(lái)啟動(dòng)生產(chǎn)者和消費(fèi)者線(xiàn)程:

final int BOUND = 10;
final int N_PRODUCERS = 3;
final int N_CONSUMERS = 2;
final int poisonPill = Integer.MAX_VALUE;
final int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
final int mod = N_CONSUMERS % N_PRODUCERS;

final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 0; i < N_PRODUCERS; i++) {
    new Thread(new Producer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new Consumer(queue, poisonPill)).start();
}

new Thread(new Producer(queue, poisonPill, poisonPillPerProducer + mod)).start();

在主程序中,定義了隊(duì)列容量、生產(chǎn)者和消費(fèi)者數(shù)量,以及poisonPill值相關(guān)參數(shù),創(chuàng)建了數(shù)量為10的有界BlockingQueue,啟動(dòng)了指定數(shù)量的生產(chǎn)者和消費(fèi)者線(xiàn)程。

當(dāng)運(yùn)行上述程序時(shí),生產(chǎn)者線(xiàn)程會(huì)不斷向隊(duì)列中放入隨機(jī)整數(shù),消費(fèi)者線(xiàn)程會(huì)從隊(duì)列中取出并打印這些整數(shù),同時(shí)每個(gè)消費(fèi)者接收到poisonPill后會(huì)結(jié)束執(zhí)行。

運(yùn)行結(jié)果如下(因?yàn)橛昧穗S機(jī)數(shù),每次效果不同):

Thread-3 消費(fèi)消息: 47
Thread-4 消費(fèi)消息: 3
Thread-3 消費(fèi)消息: 35
Thread-4 消費(fèi)消息: 83
Thread-4 消費(fèi)消息: 68
Thread-4 消費(fèi)消息: 40
Thread-4 消費(fèi)消息: 73
Thread-4 消費(fèi)消息: 56
Thread-4 消費(fèi)消息: 56
...

隨著生產(chǎn)和消費(fèi)的進(jìn)行,最終所有消費(fèi)者線(xiàn)程在接收到poisonPill后停止。

責(zé)任編輯:武曉燕 來(lái)源: 看山的小屋
相關(guān)推薦

2022-05-17 08:24:58

查詢(xún)?nèi)罩?/a>MySQL

2021-08-13 07:56:13

Python虛擬環(huán)境

2017-12-12 13:27:20

主板跳線(xiàn)USB

2020-11-27 09:16:21

BlockingQue

2018-04-27 15:33:59

Python裝飾器

2017-02-09 19:45:07

Linux系統(tǒng)Linux 發(fā)行版

2023-06-30 08:27:20

2022-10-21 08:02:40

reduce?初始值循環(huán)

2020-06-05 18:09:14

TomcatWeb應(yīng)用服務(wù)器

2024-02-20 08:16:10

阻塞隊(duì)列源碼

2020-11-19 07:41:51

ArrayBlocki

2020-11-25 14:28:56

DelayedWork

2020-11-24 09:04:55

PriorityBlo

2020-11-20 06:22:02

LinkedBlock

2023-12-06 07:28:47

阻塞IO異步IO

2022-07-14 08:22:48

Computedvue3

2017-04-12 10:02:21

Java阻塞隊(duì)列原理分析

2024-12-26 07:49:57

Java隊(duì)列線(xiàn)程

2012-06-14 10:34:40

Java阻塞搜索實(shí)例

2020-07-14 07:46:55

NginxIPIP段
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)