阻塞隊(duì)列BlockingQueue,看完就會(huì)了
在Java并發(fā)編程中,生產(chǎn)者-消費(fèi)者問(wèn)題是一個(gè)常見(jiàn)的需求。java.util.concurrent.BlockingQueue接口為解決這類(lèi)問(wèn)題提供了強(qiáng)大便捷的支持。
不僅提供了在多線(xiàn)程環(huán)境下安全添加和獲取元素的方法,還通過(guò)阻塞機(jī)制確保了生產(chǎn)者和消費(fèi)者之間的協(xié)調(diào)。
接下來(lái),我們一起看看BlockingQueue的特性、方法以及如何使用它構(gòu)建高效的多線(xiàn)程應(yīng)用程序。
一、BlockingQueue類(lèi)型
(一)無(wú)界隊(duì)列
無(wú)界隊(duì)列可以在理論上無(wú)限增長(zhǎng),在Java中創(chuàng)建無(wú)界BlockingQueue非常簡(jiǎn)便:
BlockingQueue<String> unboundedQueue = new LinkedBlockingDeque<>();
此隊(duì)列的容量默認(rèn)是Integer.MAX_VALUE,雖然有默認(rèn)邊界,但在實(shí)際應(yīng)用中,若生產(chǎn)者持續(xù)快速生產(chǎn)元素,而消費(fèi)者無(wú)法及時(shí)消費(fèi),可能導(dǎo)致內(nèi)存占用不斷增加,最終引發(fā)OOM。所以說(shuō),雖然默認(rèn)有界,實(shí)際相當(dāng)于無(wú)界。
(二)有界隊(duì)列
有界隊(duì)列則具有明確的最大容量限制,創(chuàng)建方式如下:
BlockingQueue<Integer> boundedQueue = new LinkedBlockingDeque<>(10);
這里創(chuàng)建了一個(gè)容量為10的BlockingQueue。
當(dāng)生產(chǎn)者嘗試向已滿(mǎn)的有界隊(duì)列添加元素時(shí),添加方法(比如put()),操作可能會(huì)阻塞,直到隊(duì)列中有可用空間。這種特性使得有界隊(duì)列在某些場(chǎng)景下,能自動(dòng)實(shí)現(xiàn)限流,避免系統(tǒng)資源過(guò)度消耗。
二、BlockingQueue的方法
(一)添加元素
add(E e)
嘗試將指定元素添加到隊(duì)列中。若添加成功,返回true;若隊(duì)列已滿(mǎn),則拋出IllegalStateException異常。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
boolean success = queue.add(10);
if (success) {
System.out.println("元素添加成功");
}
} catch (IllegalStateException e) {
System.out.println("隊(duì)列已滿(mǎn),添加元素失敗");
}
put(E e)
將元素插入隊(duì)列,如果隊(duì)列已滿(mǎn),則阻塞當(dāng)前線(xiàn)程,直到有可用空間。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
queue.put(20);
System.out.println("元素已放入隊(duì)列");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線(xiàn)程被中斷,添加元素失敗");
}
offer(E e)
嘗試將元素添加到隊(duì)列中。若添加成功,返回true;若隊(duì)列已滿(mǎn),則返回false。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
boolean result = queue.offer(30);
if (result) {
System.out.println("元素添加成功");
} else {
System.out.println("隊(duì)列已滿(mǎn),添加元素失敗");
}
offer(E e, long timeout, TimeUnit unit)
在指定的超時(shí)時(shí)間內(nèi)嘗試將元素插入隊(duì)列。若在超時(shí)時(shí)間內(nèi)成功插入,返回true;否則返回false。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
boolean success = queue.offer(40, 2, TimeUnit.SECONDS);
if (success) {
System.out.println("元素在超時(shí)時(shí)間內(nèi)添加成功");
} else {
System.out.println("在超時(shí)時(shí)間內(nèi)未能添加元素,隊(duì)列可能已滿(mǎn)");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線(xiàn)程被中斷,添加元素失敗");
}
(二)檢索元素
take()
從隊(duì)列中獲取并移除頭部元素。如果隊(duì)列為空,當(dāng)前線(xiàn)程將被阻塞,直到有元素可用。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
// 假設(shè)隊(duì)列中已有元素
try {
Integer element = queue.take();
System.out.println("取出的元素為: " + element);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線(xiàn)程被中斷,獲取元素失敗");
}
poll(long timeout, TimeUnit unit)
檢索并移除隊(duì)列頭部元素。若在指定的超時(shí)時(shí)間內(nèi)有元素可用,則返回該元素;若超時(shí)時(shí)間內(nèi)仍無(wú)元素可用,則返回null。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
Integer element = queue.poll(1, TimeUnit.SECONDS);
if (element!= null) {
System.out.println("在超時(shí)時(shí)間內(nèi)取出的元素為: " + element);
} else {
System.out.println("在超時(shí)時(shí)間內(nèi)未獲取到元素,隊(duì)列可能為空");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線(xiàn)程被中斷,獲取元素失敗");
}
三、多線(xiàn)程生產(chǎn)者-消費(fèi)者
我們將模擬一個(gè)簡(jiǎn)單的生產(chǎn)-消費(fèi)場(chǎng)景,假設(shè)有一個(gè)消息隊(duì)列,多個(gè)生產(chǎn)者線(xiàn)程不斷向隊(duì)列中生產(chǎn)消息(這里簡(jiǎn)化為隨機(jī)整數(shù)),多個(gè)消費(fèi)者線(xiàn)程從隊(duì)列中獲取消息并進(jìn)行處理(這里簡(jiǎn)化為打印消息和線(xiàn)程名)。為了能夠有效結(jié)束,增加poisonPill參數(shù)。
首先是生產(chǎn)者:
public class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int poisonPill;
private final int poisonPillPerProducer;
public Producer(BlockingQueue<Integer> queue, int poisonPill, int poisonPillPerProducer) {
this.queue = queue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
@Override
public void run() {
try {
produceMessages();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void produceMessages() throws InterruptedException {
Random random = new Random();
for (int i = 0; i < 100; i++) {
int message = random.nextInt(100);
queue.put(message);
}
for (int j = 0; j < poisonPillPerProducer; j++) {
queue.put(poisonPill);
}
}
}
生產(chǎn)者構(gòu)造函數(shù)接受一個(gè)BlockingQueue用于與消費(fèi)者通信,還接受poisonPill和每個(gè)生產(chǎn)者應(yīng)發(fā)送poisonPill值的數(shù)量。在produceMessages方法中,先生產(chǎn)100個(gè)隨機(jī)消息放入隊(duì)列,然后發(fā)送指定數(shù)量的毒丸。
接著是消費(fèi)者:
public class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int poisonPill;
public Consumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
@Override
public void run() {
try {
consumeMessages();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void consumeMessages() throws InterruptedException {
while (true) {
Integer message = queue.take();
if (message.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " 消費(fèi)消息: " + message);
}
}
}
消費(fèi)者構(gòu)造函數(shù)接受BlockingQueue和poisonPill值。在consumeMessages方法中,不斷從隊(duì)列獲取消息,如果是等于poisonPill則結(jié)束消費(fèi),否則打印消息和線(xiàn)程名。
最后是主程序類(lèi)來(lái)啟動(dòng)生產(chǎn)者和消費(fèi)者線(xiàn)程:
final int BOUND = 10;
final int N_PRODUCERS = 3;
final int N_CONSUMERS = 2;
final int poisonPill = Integer.MAX_VALUE;
final int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
final int mod = N_CONSUMERS % N_PRODUCERS;
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 0; i < N_PRODUCERS; i++) {
new Thread(new Producer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new Consumer(queue, poisonPill)).start();
}
new Thread(new Producer(queue, poisonPill, poisonPillPerProducer + mod)).start();
在主程序中,定義了隊(duì)列容量、生產(chǎn)者和消費(fèi)者數(shù)量,以及poisonPill值相關(guān)參數(shù),創(chuàng)建了數(shù)量為10的有界BlockingQueue,啟動(dòng)了指定數(shù)量的生產(chǎn)者和消費(fèi)者線(xiàn)程。
當(dāng)運(yùn)行上述程序時(shí),生產(chǎn)者線(xiàn)程會(huì)不斷向隊(duì)列中放入隨機(jī)整數(shù),消費(fèi)者線(xiàn)程會(huì)從隊(duì)列中取出并打印這些整數(shù),同時(shí)每個(gè)消費(fèi)者接收到poisonPill后會(huì)結(jié)束執(zhí)行。
運(yùn)行結(jié)果如下(因?yàn)橛昧穗S機(jī)數(shù),每次效果不同):
Thread-3 消費(fèi)消息: 47
Thread-4 消費(fèi)消息: 3
Thread-3 消費(fèi)消息: 35
Thread-4 消費(fèi)消息: 83
Thread-4 消費(fèi)消息: 68
Thread-4 消費(fèi)消息: 40
Thread-4 消費(fèi)消息: 73
Thread-4 消費(fèi)消息: 56
Thread-4 消費(fèi)消息: 56
...
隨著生產(chǎn)和消費(fèi)的進(jìn)行,最終所有消費(fèi)者線(xiàn)程在接收到poisonPill后停止。