一文帶你徹底掌握阻塞隊(duì)列!
一、摘要
在之前的文章中,我們介紹了生產(chǎn)者和消費(fèi)者模型的最基本實(shí)現(xiàn)思路,相信大家對(duì)它已經(jīng)有一個(gè)初步的認(rèn)識(shí)。
在 Java 的并發(fā)包里面還有一個(gè)非常重要的接口:BlockingQueue。
BlockingQueue是一個(gè)阻塞隊(duì)列,更為準(zhǔn)確的解釋是:BlockingQueue是一個(gè)基于阻塞機(jī)制實(shí)現(xiàn)的線程安全的隊(duì)列。通過(guò)它也可以實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型,并且效率更高、安全可靠,相比之前介紹的生產(chǎn)者和消費(fèi)者模型,它可以同時(shí)實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者并行運(yùn)行。
那什么是阻塞隊(duì)列呢?
簡(jiǎn)單的說(shuō),就是當(dāng)參數(shù)在入隊(duì)和出隊(duì)時(shí),通過(guò)加鎖的方式來(lái)避免線程并發(fā)操作時(shí)導(dǎo)致的數(shù)據(jù)異常問(wèn)題。
在 Java 中,能對(duì)線程并發(fā)執(zhí)行進(jìn)行加鎖的方式主要有synchronized和ReentrantLock,其中BlockingQueue采用的是ReentrantLock方式實(shí)現(xiàn)。
與此對(duì)應(yīng)的還有非阻塞機(jī)制的隊(duì)列,主要是采用 CAS 方式來(lái)控制并發(fā)操作,例如:ConcurrentLinkedQueue,這個(gè)我們?cè)诤竺娴奈恼略龠M(jìn)行分享介紹。
今天我們主要介紹BlockingQueue相關(guān)的知識(shí)和用法,廢話不多說(shuō)了,進(jìn)入正題!
二、BlockingQueue 方法介紹
打開BlockingQueue的源碼,你會(huì)發(fā)現(xiàn)它繼承自Queue,正如上文提到的,它本質(zhì)是一個(gè)隊(duì)列接口。
public interface BlockingQueue<E> extends Queue<E> {
//...省略
}
關(guān)于隊(duì)列,我們?cè)谥暗募舷盗形恼轮袑?duì)此有過(guò)深入的介紹,本篇就再次簡(jiǎn)單的介紹一下。
隊(duì)列其實(shí)是一個(gè)數(shù)據(jù)結(jié)構(gòu),元素遵循先進(jìn)先出的原則,所有新元素的插入,也被稱為入隊(duì)操作,會(huì)插入到隊(duì)列的尾部;元素的移除,也被稱為出隊(duì)操作,會(huì)從隊(duì)列的頭部開始移除,從而保證先進(jìn)先出的原則。
在Queue接口中,總共有 6 個(gè)方法,可以分為 3 類,分別是:插入、移除、查詢,內(nèi)容如下:
方法描述add(e)插入元素,如果插入失敗,就拋異常offer(e)插入元素,如果插入成功,就返回 true;反之 falseremove()移除元素,如果移除失敗,就拋異常poll()移除元素,如果移除成功,返回 true;反之 falseelement()獲取隊(duì)首元素,如果獲取結(jié)果為空,就拋異常peek()獲取隊(duì)首元素,如果獲取結(jié)果為空,返回空對(duì)象
因?yàn)锽lockingQueue是Queue的子接口,了解Queue接口里面的方法,有助于我們對(duì)BlockingQueue的理解。
除此之外,BlockingQueue還單獨(dú)擴(kuò)展了一些特有的方法,內(nèi)容如下:
方法描述put(e)插入元素,如果沒(méi)有插入成功,線程會(huì)一直阻塞,直到隊(duì)列中有空間再繼續(xù)offer(e, time, unit)插入元素,如果在指定的時(shí)間內(nèi)沒(méi)有插入成功,就返回 false;反之 truetake()移除元素,如果沒(méi)有移除成功,線程會(huì)一直阻塞,直到隊(duì)列中新的數(shù)據(jù)被加入poll(time, unit)移除元素,如果在指定的時(shí)間內(nèi)沒(méi)有移除成功,就返回 false;反之 truedrainTo(Collection c, int maxElements)一次性取走隊(duì)列中的數(shù)據(jù)到 c 中,可以指定取的個(gè)數(shù)。該方法可以提升獲取數(shù)據(jù)效率,不需要多次分批加鎖或釋放鎖
分析源碼,你會(huì)發(fā)現(xiàn)相比普通的Queue子類,BlockingQueue子類主要有以下幾個(gè)明顯的不同點(diǎn):
- 1.元素插入和移除時(shí)線程安全:主要是通過(guò)在入隊(duì)和出隊(duì)時(shí)進(jìn)行加鎖,保證了隊(duì)列線程安全,加鎖邏輯采用ReentrantLock實(shí)現(xiàn)
- 2.支持阻塞的入隊(duì)和出隊(duì)方法:當(dāng)隊(duì)列滿時(shí),會(huì)阻塞入隊(duì)的線程,直到隊(duì)列不滿;當(dāng)隊(duì)列為空時(shí),會(huì)阻塞出隊(duì)的線程,直到隊(duì)列中有元素;同時(shí)支持超時(shí)機(jī)制,防止線程一直阻塞
三、BlockingQueue 用法詳解
打開源碼,BlockingQueue接口的實(shí)現(xiàn)類非常多,我們重點(diǎn)講解一下其中的 5 個(gè)非常重要的實(shí)現(xiàn)類,分別如下表所示。
實(shí)現(xiàn)類功能ArrayBlockingQueue基于數(shù)組的阻塞隊(duì)列,使用數(shù)組存儲(chǔ)數(shù)據(jù),需要指定長(zhǎng)度,所以是一個(gè)有界隊(duì)列LinkedBlockingQueue基于鏈表的阻塞隊(duì)列,使用鏈表存儲(chǔ)數(shù)據(jù),默認(rèn)是一個(gè)無(wú)界隊(duì)列;也可以通過(guò)構(gòu)造方法中的capacity設(shè)置最大元素?cái)?shù)量,所以也可以作為有界隊(duì)列SynchronousQueue一種沒(méi)有緩沖的隊(duì)列
生產(chǎn)者產(chǎn)生的數(shù)據(jù)直接會(huì)被消費(fèi)者獲取并且立刻消費(fèi)PriorityBlockingQueue基于優(yōu)先級(jí)別的阻塞隊(duì)列,底層基于數(shù)組實(shí)現(xiàn),是一個(gè)無(wú)界隊(duì)列DelayQueue延遲隊(duì)列,其中的元素只有到了其指定的延遲時(shí)間,才能夠從隊(duì)列中出隊(duì)
下面我們對(duì)以上實(shí)現(xiàn)類的用法,進(jìn)行一一介紹。
3.1、ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)基于數(shù)組的阻塞隊(duì)列,初始化的時(shí)候必須指定隊(duì)列大小,源碼實(shí)現(xiàn)比較簡(jiǎn)單,采用的是ReentrantLock和Condition實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型,部分核心源碼如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 使用數(shù)組存儲(chǔ)隊(duì)列中的元素 */
final Object[] items;
/** 使用獨(dú)占鎖ReetrantLock */
final ReentrantLock lock;
/** 等待出隊(duì)的條件 */
private final Condition notEmpty;
/** 等待入隊(duì)的條件 */
private final Condition notFull;
/** 初始化時(shí),需要指定隊(duì)列大小 */
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/** 初始化時(shí),也指出指定是否為公平鎖, */
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**入隊(duì)操作*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
/**出隊(duì)操作*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
}
ArrayBlockingQueue采用ReentrantLock進(jìn)行加鎖,只有一個(gè)ReentrantLock對(duì)象,這意味著生產(chǎn)者和消費(fèi)者無(wú)法并行運(yùn)行。
我們看一個(gè)簡(jiǎn)單的示例代碼如下:
public class Container {
/**
* 初始化阻塞隊(duì)列
*/
private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
/**
* 添加數(shù)據(jù)到阻塞隊(duì)列
* @param value
*/
public void add(Integer value) {
try {
queue.put(value);
System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 從阻塞隊(duì)列獲取數(shù)據(jù)
*/
public void get() {
try {
Integer value = queue.take();
System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 生產(chǎn)者
*/
public class Producer extends Thread {
private Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
container.add(i);
}
}
}
/**
* 消費(fèi)者
*/
public class Consumer extends Thread {
private Container container;
public Consumer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
container.get();
}
}
}
/**
* 測(cè)試類
*/
public class MyThreadTest {
public static void main(String[] args) {
Container container = new Container();
Producer producer = new Producer(container);
Consumer consumer = new Consumer(container);
producer.start();
consumer.start();
}
}
運(yùn)行結(jié)果如下:
生產(chǎn)者:Thread-0,add:0
生產(chǎn)者:Thread-0,add:1
生產(chǎn)者:Thread-0,add:2
生產(chǎn)者:Thread-0,add:3
生產(chǎn)者:Thread-0,add:4
生產(chǎn)者:Thread-0,add:5
消費(fèi)者:Thread-1,value:0
消費(fèi)者:Thread-1,value:1
消費(fèi)者:Thread-1,value:2
消費(fèi)者:Thread-1,value:3
消費(fèi)者:Thread-1,value:4
消費(fèi)者:Thread-1,value:5
可以很清晰的看到,生產(chǎn)者線程執(zhí)行完畢之后,消費(fèi)者線程才開始消費(fèi)。
3.2、LinkedBlockingQueue
LinkedBlockingQueue是一個(gè)基于鏈表的阻塞隊(duì)列,初始化的時(shí)候無(wú)須指定隊(duì)列大小,默認(rèn)隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,也就是 int 型最大值。
同樣的,采用的是ReentrantLock和Condition實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型,不同的是它使用了兩個(gè)lock,這意味著生產(chǎn)者和消費(fèi)者可以并行運(yùn)行,程序執(zhí)行效率進(jìn)一步得到提升。
部分核心源碼如下:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 使用出隊(duì)獨(dú)占鎖ReetrantLock */
private final ReentrantLock takeLock = new ReentrantLock();
/** 等待出隊(duì)的條件 */
private final Condition notEmpty = takeLock.newCondition();
/** 使用入隊(duì)獨(dú)占鎖ReetrantLock */
private final ReentrantLock putLock = new ReentrantLock();
/** 等待入隊(duì)的條件 */
private final Condition notFull = putLock.newCondition();
/**入隊(duì)操作*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
/**出隊(duì)操作*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
}
把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成LinkedBlockingQueue,調(diào)整如下:
/**
* 初始化阻塞隊(duì)列
*/
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
再次運(yùn)行結(jié)果如下:
生產(chǎn)者:Thread-0,add:0
消費(fèi)者:Thread-1,value:0
生產(chǎn)者:Thread-0,add:1
消費(fèi)者:Thread-1,value:1
生產(chǎn)者:Thread-0,add:2
消費(fèi)者:Thread-1,value:2
生產(chǎn)者:Thread-0,add:3
生產(chǎn)者:Thread-0,add:4
生產(chǎn)者:Thread-0,add:5
消費(fèi)者:Thread-1,value:3
消費(fèi)者:Thread-1,value:4
消費(fèi)者:Thread-1,value:5
可以很清晰的看到,生產(chǎn)者線程和消費(fèi)者線程,交替并行執(zhí)行。
3.3、SynchronousQueue
SynchronousQueue是一個(gè)沒(méi)有緩沖的隊(duì)列,生產(chǎn)者產(chǎn)生的數(shù)據(jù)直接會(huì)被消費(fèi)者獲取并且立刻消費(fèi),相當(dāng)于傳統(tǒng)的一個(gè)請(qǐng)求對(duì)應(yīng)一個(gè)應(yīng)答模式。
相比ArrayBlockingQueue和LinkedBlockingQueue,SynchronousQueue實(shí)現(xiàn)機(jī)制也不同,它主要采用隊(duì)列和棧來(lái)實(shí)現(xiàn)數(shù)據(jù)的傳遞,中間不存儲(chǔ)任何數(shù)據(jù),生產(chǎn)的數(shù)據(jù)必須得消費(fèi)者處理,線程阻塞方式采用 JDK 提供的LockSupport park/unpark函數(shù)來(lái)完成,也支持公平和非公平兩種模式。
- 當(dāng)采用公平模式時(shí):使用一個(gè) FIFO 隊(duì)列來(lái)管理多余的生產(chǎn)者和消費(fèi)者
- 當(dāng)采用非公平模式時(shí):使用一個(gè) LIFO 棧來(lái)管理多余的生產(chǎn)者和消費(fèi)者,這也是SynchronousQueue默認(rèn)的模式
部分核心源碼如下:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**不同的策略實(shí)現(xiàn)*/
private transient volatile Transferer<E> transferer;
/**默認(rèn)非公平模式*/
public SynchronousQueue() {
this(false);
}
/**可以選策略,也可以采用公平模式*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
/**入隊(duì)操作*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
/**出隊(duì)操作*/
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
}
同樣的,把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成SynchronousQueue,代碼如下:
public class Container {
/**
* 初始化阻塞隊(duì)列
*/
private final BlockingQueue<Integer> queue = new SynchronousQueue<>();
/**
* 添加數(shù)據(jù)到阻塞隊(duì)列
* @param value
*/
public void add(Integer value) {
try {
queue.put(value);
Thread.sleep(100);
System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 從阻塞隊(duì)列獲取數(shù)據(jù)
*/
public void get() {
try {
Integer value = queue.take();
Thread.sleep(200);
System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
再次運(yùn)行結(jié)果如下:
生產(chǎn)者:Thread-0,add:0
消費(fèi)者:Thread-1,value:0
生產(chǎn)者:Thread-0,add:1
消費(fèi)者:Thread-1,value:1
生產(chǎn)者:Thread-0,add:2
消費(fèi)者:Thread-1,value:2
生產(chǎn)者:Thread-0,add:3
消費(fèi)者:Thread-1,value:3
生產(chǎn)者:Thread-0,add:4
消費(fèi)者:Thread-1,value:4
生產(chǎn)者:Thread-0,add:5
消費(fèi)者:Thread-1,value:5
可以很清晰的看到,生產(chǎn)者線程和消費(fèi)者線程,交替串行執(zhí)行,生產(chǎn)者每投遞一條數(shù)據(jù),消費(fèi)者處理一條數(shù)據(jù)。
3.4、PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)基于優(yōu)先級(jí)別的阻塞隊(duì)列,底層基于數(shù)組實(shí)現(xiàn),可以認(rèn)為是一個(gè)無(wú)界隊(duì)列。
PriorityBlockingQueue與ArrayBlockingQueue的實(shí)現(xiàn)邏輯,基本相似,也是采用ReentrantLock來(lái)實(shí)現(xiàn)加鎖的操作。
最大不同點(diǎn)在于:
- 1.PriorityBlockingQueue內(nèi)部基于數(shù)組實(shí)現(xiàn)的最小二叉堆算法,可以對(duì)隊(duì)列中的元素進(jìn)行排序,插入隊(duì)列的元素需要實(shí)現(xiàn)Comparator或者Comparable接口,以便對(duì)元素進(jìn)行排序
- 2.其次,隊(duì)列的長(zhǎng)度是可擴(kuò)展的,不需要顯式指定長(zhǎng)度,上限為Integer.MAX_VALUE - 8
部分核心源碼如下:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**隊(duì)列元素*/
private transient Object[] queue;
/**比較器*/
private transient Comparator<? super E> comparator;
/**采用ReentrantLock進(jìn)行加鎖*/
private final ReentrantLock lock;
/**條件等待與通知*/
private final Condition notEmpty;
/**入隊(duì)操作*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
/**出隊(duì)操作*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
}
同樣的,把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成PriorityBlockingQueue,調(diào)整如下:
/**
* 初始化阻塞隊(duì)列
*/
private final BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
生產(chǎn)者插入數(shù)據(jù)的內(nèi)容,我們改下插入順序。
/**
* 生產(chǎn)者
*/
public class Producer extends Thread {
private Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
container.add(5);
container.add(3);
container.add(1);
container.add(2);
container.add(0);
container.add(4);
}
}
最后運(yùn)行結(jié)果如下:
生產(chǎn)者:Thread-0,add:5
生產(chǎn)者:Thread-0,add:3
生產(chǎn)者:Thread-0,add:1
生產(chǎn)者:Thread-0,add:2
生產(chǎn)者:Thread-0,add:0
生產(chǎn)者:Thread-0,add:4
消費(fèi)者:Thread-1,value:0
消費(fèi)者:Thread-1,value:1
消費(fèi)者:Thread-1,value:2
消費(fèi)者:Thread-1,value:3
消費(fèi)者:Thread-1,value:4
消費(fèi)者:Thread-1,value:5
從日志上可以很明顯看出,對(duì)于整數(shù),默認(rèn)情況下,按照升序排序,消費(fèi)者默認(rèn)從 0 開始處理。
3.5、DelayQueue
DelayQueue是一個(gè)線程安全的延遲隊(duì)列,存入隊(duì)列的元素不會(huì)立刻被消費(fèi),只有到了其指定的延遲時(shí)間,才能夠從隊(duì)列中出隊(duì)。
底層采用的是PriorityQueue來(lái)存儲(chǔ)元素,DelayQueue的特點(diǎn)在于:插入隊(duì)列中的數(shù)據(jù)可以按照自定義的delay時(shí)間進(jìn)行排序,快到期的元素會(huì)排列在前面,只有delay時(shí)間小于 0 的元素才能夠被取出。
部分核心源碼如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
/**采用ReentrantLock進(jìn)行加鎖*/
private final transient ReentrantLock lock = new ReentrantLock();
/**采用PriorityQueue進(jìn)行存儲(chǔ)數(shù)據(jù)*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**條件等待與通知*/
private final Condition available = lock.newCondition();
/**入隊(duì)操作*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
/**出隊(duì)操作*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
}
同樣的,把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成DelayQueue,代碼如下:
public class Container {
/**
* 初始化阻塞隊(duì)列
*/
private final BlockingQueue<DelayedUser> queue = new DelayQueue<DelayedUser>();
/**
* 添加數(shù)據(jù)到阻塞隊(duì)列
* @param value
*/
public void add(DelayedUser value) {
try {
queue.put(value);
System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 從阻塞隊(duì)列獲取數(shù)據(jù)
*/
public void get() {
try {
DelayedUser value = queue.take();
String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println(time + " 消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
DelayQueue隊(duì)列中的元素需要顯式實(shí)現(xiàn)Delayed接口,定義一個(gè)DelayedUser類,代碼如下:
public class DelayedUser implements Delayed {
/**
* 當(dāng)前時(shí)間戳
*/
private long start;
/**
* 延遲時(shí)間(單位:毫秒)
*/
private long delayedTime;
/**
* 名稱
*/
private String name;
public DelayedUser(long delayedTime, String name) {
this.start = System.currentTimeMillis();
this.delayedTime = delayedTime;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
// 獲取當(dāng)前延遲的時(shí)間
long diffTime = (start + delayedTime) - System.currentTimeMillis();
return unit.convert(diffTime,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
// 判斷當(dāng)前對(duì)象的延遲時(shí)間是否大于目標(biāo)對(duì)象的延遲時(shí)間
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayedUser{" +
"delayedTime=" + delayedTime +
", name='" + name + '\'' +
'}';
}
}
生產(chǎn)者插入數(shù)據(jù)的內(nèi)容,做如下調(diào)整。
/**
* 生產(chǎn)者
*/
public class Producer extends Thread {
private Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
container.add(new DelayedUser(1000 * i, "張三" + i));
}
}
}
最后運(yùn)行結(jié)果如下:
生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=0, name='張三0'}
生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=1000, name='張三1'}
生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=2000, name='張三2'}
生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=3000, name='張三3'}
生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=4000, name='張三4'}
生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=5000, name='張三5'}
2023-11-03 14:55:33 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=0, name='張三0'}
2023-11-03 14:55:34 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=1000, name='張三1'}
2023-11-03 14:55:35 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=2000, name='張三2'}
2023-11-03 14:55:36 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=3000, name='張三3'}
2023-11-03 14:55:37 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=4000, name='張三4'}
2023-11-03 14:55:38 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=5000, name='張三5'}
可以很清晰的看到,延遲時(shí)間最低的排在最前面。
四、小結(jié)
最后我們來(lái)總結(jié)一下BlockingQueue阻塞隊(duì)列接口,它提供了很多非常豐富的生產(chǎn)者和消費(fèi)者模型的編程實(shí)現(xiàn),同時(shí)兼顧了線程安全和執(zhí)行效率的特點(diǎn)。
開發(fā)者可以通過(guò)BlockingQueue阻塞隊(duì)列接口,簡(jiǎn)單的代碼編程即可實(shí)現(xiàn)多線程中數(shù)據(jù)高效安全傳輸?shù)哪康?,確切的說(shuō),它幫助開發(fā)者減輕了不少的編程難度。
在實(shí)際的業(yè)務(wù)開發(fā)中,其中LinkedBlockingQueue使用的是最廣泛的,因?yàn)樗膱?zhí)行效率最高,在使用的時(shí)候,需要平衡好隊(duì)列長(zhǎng)度,防止過(guò)大導(dǎo)致內(nèi)存溢出。
舉個(gè)最簡(jiǎn)單的例子,比如某個(gè)功能上線之后,需要做下壓力測(cè)試,總共需要請(qǐng)求 10000 次,采用 100 個(gè)線程去執(zhí)行,測(cè)試服務(wù)是否能正常工作。如何實(shí)現(xiàn)呢?
可能有的同學(xué)想到,每個(gè)線程執(zhí)行 100 次請(qǐng)求,啟動(dòng) 100 個(gè)線程去執(zhí)行,可以是可以,就是有點(diǎn)笨拙。
其實(shí)還有另一個(gè)辦法,就是將 10000 個(gè)請(qǐng)求對(duì)象,存入到阻塞隊(duì)列中,然后采用 100 個(gè)線程去消費(fèi)執(zhí)行,這種編程模型會(huì)更佳靈活。
具體示例代碼如下:
public static void main(String[] args) throws InterruptedException {
// 將每個(gè)用戶訪問(wèn)百度服務(wù)的請(qǐng)求任務(wù),存入阻塞隊(duì)列中
// 也可以也采用多線程寫入
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int i = 0; i < 10000; i++) {
queue.put("https://www.baidu.com?paramKey=" + i);
}
// 模擬100個(gè)線程,執(zhí)行10000次請(qǐng)求訪問(wèn)百度
final int threadNum = 100;
for (int i = 0; i < threadNum; i++) {
final int threadCount = i + 1;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread " + threadCount + " start");
boolean over = false;
while (!over) {
String url = queue.poll();
if(Objects.nonNull(url)) {
// 發(fā)起請(qǐng)求
String result =HttpUtils.getUrl(url);
System.out.println("thread " + threadCount + " run result:" + result);
}else {
// 任務(wù)結(jié)束
over = true;
System.out.println("thread " + threadCount + " final");
}
}
}
}).start();
}
}
本文主要圍繞BlockingQueue阻塞隊(duì)列接口,從方法介紹到用法詳解,做了一次知識(shí)總結(jié),如果有描述不對(duì)的地方,歡迎留言指出!