深入理解Java線程池,剖析LinkedBlockingQueue源碼實現(xiàn)
引言
上篇文章我們講解了ArrayBlockingQueue源碼,這篇文章開始講解LinkedBlockingQueue源碼。從名字上就能看到ArrayBlockingQueue是基于數(shù)組實現(xiàn)的,而LinkedBlockingQueue是基于鏈表實現(xiàn)。
那么,LinkedBlockingQueue底層源碼實現(xiàn)是什么樣的?跟ArrayBlockingQueue有何不同?
LinkedBlockingQueue的應(yīng)用場景跟ArrayBlockingQueue有什么不一樣?
看完這篇文章,可以輕松解答這些問題。
由于LinkedBlockingQueue實現(xiàn)了BlockingQueue接口,而BlockingQueue接口中定義了幾組放數(shù)據(jù)和取數(shù)據(jù)的方法,來滿足不同的場景。
操作 | 拋出異常 | 返回特定值 | 一直阻塞 | 阻塞指定時間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
取數(shù)據(jù)(同時刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
這四組方法的區(qū)別是:
- 當(dāng)隊列滿的時候,再次添加數(shù)據(jù),add()會拋出異常,offer()會返回false,put()會一直阻塞,offer(e, time, unit)會阻塞指定時間,然后返回false。
- 當(dāng)隊列為空的時候,再次取數(shù)據(jù),remove()會拋出異常,poll()會返回null,take()會一直阻塞,poll(time, unit)會阻塞指定時間,然后返回null。
LinkedBlockingQueue也會有針對這幾組放數(shù)據(jù)和取數(shù)據(jù)方法的具體實現(xiàn)。 Java線程池中的固定大小線程池就是基于LinkedBlockingQueue實現(xiàn)的:
# 創(chuàng)建固定大小的線程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
對應(yīng)的源碼實現(xiàn):
# 底層使用LinkedBlockingQueue隊列存儲任務(wù)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
類結(jié)構(gòu)
先看一下LinkedBlockingQueue類里面有哪些屬性:
public class LinkedBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 容量大小
*/
private final int capacity;
/**
* 元素個數(shù)
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 頭節(jié)點
*/
transient Node<E> head;
/**
* 尾節(jié)點
*/
private transient Node<E> last;
/**
* 取數(shù)據(jù)的鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 取數(shù)據(jù)的條件(隊列非空)
*/
private final Condition notEmpty = takeLock.newCondition();
/**
* 放數(shù)據(jù)的鎖
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 放數(shù)據(jù)的條件(隊列非滿)
*/
private final Condition notFull = putLock.newCondition();
/**
* 鏈表節(jié)點類
*/
static class Node<E> {
/**
* 節(jié)點元素
*/
E item;
/**
* 后繼節(jié)點
*/
Node<E> next;
Node(E x) {
item = x;
}
}
}
圖片
可以看出LinkedBlockingQueue底層是基于鏈表實現(xiàn)的,定義了頭節(jié)點head和尾節(jié)點last,由鏈表節(jié)點類Node可以看出是個單鏈表。 發(fā)現(xiàn)個問題,ArrayBlockingQueue中只使用了一把鎖,入隊出隊操作共用這把鎖。而LinkedBlockingQueue則使用了兩把鎖,分別是出隊鎖takeLock和入隊鎖putLock,為什么要這么設(shè)計呢?
LinkedBlockingQueue把兩把鎖分開,性能更好,為什么ArrayBlockingQueue不這樣設(shè)計呢?
原因是ArrayBlockingQueue是基于數(shù)組實現(xiàn)的,所有數(shù)據(jù)都存儲在同一個數(shù)組對象里面,對同一個對象沒辦法使用兩把鎖,會有數(shù)據(jù)可見性的問題。而LinkedBlockingQueue底層是基于鏈表實現(xiàn)的,從頭節(jié)點刪除,尾節(jié)點插入,頭尾節(jié)點分別是兩個對象,可以分別使用兩把鎖,提升操作性能。
另外也定義了兩個條件notEmpty和notFull,當(dāng)條件滿足的時候才允許放數(shù)據(jù)或者取數(shù)據(jù),下面會詳細(xì)講。
初始化
LinkedBlockingQueue常用的初始化方法有兩個:
- 無參構(gòu)造方法
- 指定容量大小的有參構(gòu)造方法
/**
* 無參構(gòu)造方法
*/
BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingQueue<>();
/**
* 指定容量大小的構(gòu)造方法
*/
BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingQueue<>(10);
再看一下對應(yīng)的源碼實現(xiàn):
/**
* 無參構(gòu)造方法
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 指定容量大小的構(gòu)造方法
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
// 設(shè)置容量大小,初始化頭尾結(jié)點
this.capacity = capacity;
last = head = new Node<E>(null);
}
可以看出LinkedBlockingQueue的無參構(gòu)造方法使用的鏈表容量是Integer的最大值,存儲大量數(shù)據(jù)的時候,會有內(nèi)存溢出的風(fēng)險,建議使用有參構(gòu)造方法,指定容量大小。
有參構(gòu)造方法還會初始化頭尾節(jié)點,節(jié)點值為null。
LinkedBlockingQueue初始化的時候,不支持指定是否使用公平鎖,只能使用非公平鎖,而ArrayBlockingQueue是支持指定的。
放數(shù)據(jù)源碼
放數(shù)據(jù)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
offer方法源碼
先看一下offer()方法源碼,其他放數(shù)據(jù)方法邏輯也是大同小異,都是在鏈表尾部插入。 offer()方法在隊列滿的時候,會直接返回false,表示插入失敗。
/**
* offer方法入口
*
* @param e 元素
* @return 是否插入成功
*/
public boolean offer(E e) {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
// 2. 如果隊列已滿,則直接返回false,表示插入失敗
final AtomicInteger count = this.count;
if (count.get() == capacity) {
return false;
}
int c = -1;
Node<E> node = new Node<E>(e);
// 3. 獲取put鎖,并加鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 4. 加鎖后,再次判斷隊列是否已滿,如果未滿,則入隊
if (count.get() < capacity) {
enqueue(node);
// 5. 隊列個數(shù)加一
c = count.getAndIncrement();
// 6. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補(bǔ)償,不加也行)
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
// 7. 釋放鎖
putLock.unlock();
}
// 8. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程
if (c == 0) {
signalNotEmpty();
}
// 9. 返回是否插入成功
return c >= 0;
}
/**
* 入隊
*
* @param node 節(jié)點
*/
private void enqueue(LinkedBlockingQueue.Node<E> node) {
// 直接追加到鏈表末尾
last = last.next = node;
}
/**
* 喚醒因為隊列為空而等待取數(shù)據(jù)的線程
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer()方法邏輯也很簡單,追加元素到鏈表末尾,如果是第一次添加元素,就喚醒因為隊列為空而等待取數(shù)據(jù)的線程。
再看一下另外三個添加元素方法源碼:
add方法源碼
add()方法在數(shù)組滿的時候,會拋出異常,底層基于offer()實現(xiàn)。
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
put方法源碼
put()方法在數(shù)組滿的時候,會一直阻塞,直到有其他線程取走數(shù)據(jù),空出位置,才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
int c = -1;
Node<E> node = new Node<E>(e);
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
final AtomicInteger count = this.count;
try {
// 3. 如果隊列已滿,就一直阻塞,直到被喚醒
while (count.get() == capacity) {
notFull.await();
}
// 4. 如果隊列未滿,則直接入隊
enqueue(node);
c = count.getAndIncrement();
// 5. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補(bǔ)償,不加也行)
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 6. 釋放鎖
putLock.unlock();
}
// 7. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程
if (c == 0) {
signalNotEmpty();
}
}
offer(e, time, unit)源碼
再看一下offer(e, time, unit)方法源碼,在數(shù)組滿的時候, offer(e, time, unit)方法會阻塞一段時間。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超時時間
* @param unit 時間單位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
// 2. 把超時時間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
int c = -1;
final AtomicInteger count = this.count;
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
try {
// 4. 循環(huán)判斷隊列是否已滿
while (count.get() == capacity) {
if (nanos <= 0) {
// 6. 如果隊列已滿,且超時時間已過,則返回false
return false;
}
// 5. 如果隊列已滿,則等待指定時間
nanos = notFull.awaitNanos(nanos);
}
// 7. 如果隊列未滿,則入隊
enqueue(new Node<E>(e));
// 8. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補(bǔ)償,不加也行)
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 9. 釋放鎖
putLock.unlock();
}
// 10. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程
if (c == 0) {
signalNotEmpty();
}
return true;
}
彈出數(shù)據(jù)源碼
彈出數(shù)據(jù)(取出數(shù)據(jù)并刪除)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數(shù)據(jù)(同時刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
poll方法源碼
看一下poll()方法源碼,其他方取數(shù)據(jù)法邏輯大同小異,都是從鏈表頭部彈出元素。 poll()方法在彈出元素的時候,如果隊列為空,直接返回null,表示彈出失敗。
/**
* poll方法入口
*/
public E poll() {
// 如果隊列為空,則返回null
final AtomicInteger count = this.count;
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
// 2. 加鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 3. 如果隊列不為空,則取出隊頭元素
if (count.get() > 0) {
x = dequeue();
// 4. 元素個數(shù)減一
c = count.getAndDecrement();
// 5. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程
if (c > 1) {
notEmpty.signal();
}
}
} finally {
// 6. 釋放鎖
takeLock.unlock();
}
// 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
if (c == capacity) {
signalNotFull();
}
return x;
}
/**
* 取出隊頭元素
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* 喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
remove方法源碼
再看一下remove()方法源碼,如果隊列為空,remove()會拋出異常。
/**
* remove方法入口
*/
public E remove() {
// 1. 直接調(diào)用poll方法
E x = poll();
// 2. 如果取到數(shù)據(jù),直接返回,否則拋出異常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}
take方法源碼
再看一下take()方法源碼,如果隊列為空,take()方法就一直阻塞,直到被喚醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 1. 加可中斷的鎖,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 2. 如果隊列為空,就一直阻塞,直到被喚醒
while (count.get() == 0) {
notEmpty.await();
}
// 3. 如果隊列不為空,則取出隊頭元素
x = dequeue();
// 4. 隊列元素個數(shù)減一
c = count.getAndDecrement();
// 5. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 6. 釋放鎖
takeLock.unlock();
}
// 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
if (c == capacity) {
signalNotFull();
}
return x;
}
poll(time, unit)源碼
再看一下poll(time, unit)方法源碼,在隊列滿的時候, poll(time, unit)方法會阻塞指定時間,然后然后null。
/**
* poll方法入口
*
* @param timeout 超時時間
* @param unit 時間單位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 1. 把超時時間轉(zhuǎn)換成納秒
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 3. 循環(huán)判斷隊列是否為空
while (count.get() == 0) {
if (nanos <= 0) {
// 5. 如果隊列為空,且超時時間已過,則返回null
return null;
}
// 4. 阻塞到到指定時間
nanos = notEmpty.awaitNanos(nanos);
}
// 6. 如果隊列不為空,則取出隊頭元素
x = dequeue();
// 7. 隊列元素個數(shù)減一
c = count.getAndDecrement();
// 8. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 9. 釋放鎖
takeLock.unlock();
}
// 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程
if (c == capacity) {
signalNotFull();
}
return x;
}
查看數(shù)據(jù)源碼
再看一下查看數(shù)據(jù)源碼,查看數(shù)據(jù),并不刪除數(shù)據(jù)。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
peek方法源碼
先看一下peek()方法源碼,如果數(shù)組為空,直接返回null。
/**
* peek方法入口
*/
public E peek() {
// 1. 如果隊列為空,則返回null
if (count.get() == 0) {
return null;
}
// 2. 加鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 3. 取出隊頭元素
Node<E> first = head.next;
if (first == null) {
return null;
} else {
return first.item;
}
} finally {
// 4. 釋放鎖
takeLock.unlock();
}
}
element方法源碼
再看一下element()方法源碼,如果隊列為空,則拋出異常。
/**
* element方法入口
*/
public E element() {
// 1. 調(diào)用peek方法查詢數(shù)據(jù)
E x = peek();
// 2. 如果查到數(shù)據(jù),直接返回
if (x != null) {
return x;
} else {
// 3. 如果沒找到,則拋出異常
throw new NoSuchElementException();
}
}
總結(jié)
這篇文章講解了LinkedBlockingQueue阻塞隊列的核心源碼,了解到LinkedBlockingQueue隊列具有以下特點:
- LinkedBlockingQueue實現(xiàn)了BlockingQueue接口,提供了四組放數(shù)據(jù)和讀數(shù)據(jù)的方法,來滿足不同的場景。
- LinkedBlockingQueue底層基于鏈表實現(xiàn),支持從頭部彈出數(shù)據(jù),從尾部添加數(shù)據(jù)。
- LinkedBlockingQueue初始化的時候,如果不指定隊列長度,默認(rèn)長度是Integer最大值,有內(nèi)存溢出風(fēng)險,建議初始化的時候指定隊列長度。
- LinkedBlockingQueue的方法是線程安全的,分別使用了讀寫兩把鎖,比ArrayBlockingQueue性能更好。
那么ArrayBlockingQueue與LinkedBlockingQueue區(qū)別是什么?相同點:
- 都是繼承自AbstractQueue抽象類,并實現(xiàn)了BlockingQueue接口,所以兩者擁有相同的讀寫方法,出現(xiàn)的地方可以相互替換。
不同點:
- 底層結(jié)構(gòu)不同,ArrayBlockingQueue底層基于數(shù)組實現(xiàn),初始化的時候必須指定數(shù)組長度,無法擴(kuò)容。LinkedBlockingQueue底層基于鏈表實現(xiàn),鏈表最大長度是Integer最大值。
- 占用內(nèi)存大小不同,ArrayBlockingQueue一旦初始化,數(shù)組長度就確定了,不會隨著元素增加而改變。LinkedBlockingQueue會隨著元素越多,鏈表越長,占用內(nèi)存越大。
- 性能不同,ArrayBlockingQueue的入隊和出隊共用一把鎖,并發(fā)較低。LinkedBlockingQueue入隊和出隊使用兩把獨立的鎖,并發(fā)情況下性能更高。
- 公平鎖選項,ArrayBlockingQueue初始化的時候,可以指定使用公平鎖或者非公平鎖,公平鎖模式下,可以按照線程等待的順序來操作隊列。LinkedBlockingQueue只支持非公平鎖。
- 適用場景不同,ArrayBlockingQueue適用于明確限制隊列大小的場景,防止生產(chǎn)速度大于消費速度的時候,造成內(nèi)存溢出、資源耗盡。LinkedBlockingQueue適用于業(yè)務(wù)高峰期可以自動擴(kuò)展消費速度的場景。
今天一起分析了LinkedBlockingQueue隊列的源碼,可以看到LinkedBlockingQueue的源碼非常簡單,沒有什么神秘復(fù)雜的東西,下篇文章再一起接著分析其他的阻塞隊列源碼。