源碼解密協(xié)程隊(duì)列和線程隊(duì)列的實(shí)現(xiàn)原理
本次來聊一聊 Python 的隊(duì)列,首先隊(duì)列是一種特殊的線性表,具有先進(jìn)先出(FIFO)的特性,這意味著元素的入隊(duì)順序和出隊(duì)順序是一致的。
隊(duì)列通常用于存儲(chǔ)需要按順序處理的數(shù)據(jù),例如任務(wù)調(diào)度。當(dāng)然隊(duì)列最常見的一個(gè)應(yīng)用場景就是解耦,一個(gè)線程不停地生產(chǎn)數(shù)據(jù),放到隊(duì)列里,另一個(gè)線程從隊(duì)列中取數(shù)據(jù)進(jìn)行消費(fèi)。
而 Python 也提供了隊(duì)列,分別是協(xié)程隊(duì)列和線程隊(duì)列。
import asyncio
import queue
# 協(xié)程隊(duì)列
coroutine_queue = asyncio.Queue()
# 線程隊(duì)列
threading_queue = queue.Queue()
如果你的程序基于 asyncio,那么應(yīng)該使用協(xié)程隊(duì)列,如果你的程序采用了多線程,那么應(yīng)該使用線程隊(duì)列。
下面我們來看一看這兩種隊(duì)列的 API,以及底層實(shí)現(xiàn)原理。
協(xié)程隊(duì)列
協(xié)程隊(duì)列的具體實(shí)現(xiàn)由 asyncio 提供,以下是它的一些用法。
import asyncio
async def main():
# 創(chuàng)建隊(duì)列時(shí)可以指定能夠存儲(chǔ)的最大元素個(gè)數(shù)
# 不指定則沒有容量限制
queue = asyncio.Queue(maxsize=20)
# 返回容量
print(queue.maxsize)
"""
20
"""
# 添加元素,如果隊(duì)列滿了會(huì)阻塞,直到有剩余空間
await queue.put(111)
# 添加元素,如果隊(duì)列滿了會(huì)拋異常
# 因?yàn)椴恍枰枞却?,所?put_nowait 不是協(xié)程函數(shù)
queue.put_nowait(222)
# 隊(duì)列是否已滿
print(queue.full())
"""
False
"""
# 返回隊(duì)列內(nèi)部的元素個(gè)數(shù)
print(queue.qsize())
"""
2
"""
# 從隊(duì)列中獲取元素,如果隊(duì)列為空,會(huì)阻塞,直到隊(duì)列中有可用元素
print(await queue.get())
"""
111
"""
# 從隊(duì)列中獲取元素,如果隊(duì)列為空,會(huì)拋異常
# 因?yàn)椴恍枰枞却?put_nowait 不是協(xié)程函數(shù)
print(queue.get_nowait())
"""
222
"""
# 隊(duì)列是否為空
print(queue.empty())
"""
True
"""
asyncio.run(main())
所以協(xié)程隊(duì)列的 API 很簡單,我們再羅列一下:
圖片
然后,協(xié)程隊(duì)列還有兩個(gè) API,需要單獨(dú)說明,分別是 task_done() 和 join()。
首先在協(xié)程隊(duì)列內(nèi)部有一個(gè) _unfinished_tasks 屬性,初始值為 0,每當(dāng)往隊(duì)列添加一個(gè)元素時(shí),該屬性的值就會(huì)自增 1。但是從隊(duì)列取出元素時(shí),該屬性不會(huì)自動(dòng)減 1,需要手動(dòng)調(diào)用 task_done() 方法。
所以 _unfinished_tasks 記錄了隊(duì)列中有多少個(gè)任務(wù)數(shù)據(jù)需要處理,每來一個(gè)自動(dòng)加 1,但取走一個(gè)不會(huì)自動(dòng)減 1,而是需要 task_done 來實(shí)現(xiàn)。
然后 join() 的作用是,當(dāng) _unfinished_tasks 不為 0 的時(shí)候,await queue.join() 會(huì)阻塞,直到為 0。
import asyncio
async def consumer(queue, n):
print(f"consumer{n} 開始消費(fèi)")
await asyncio.sleep(3)
await queue.get()
# 獲取數(shù)據(jù)后,調(diào)用 task_done
queue.task_done()
print(f"consumer{n} 消費(fèi)完畢")
async def main():
queue = asyncio.Queue()
await queue.put(123)
await queue.put(456)
await queue.put(789)
# 隊(duì)列里面有三個(gè)數(shù)據(jù),開啟三個(gè)消費(fèi)者去消費(fèi)
await asyncio.gather(
consumer(queue, 1),
consumer(queue, 2),
consumer(queue, 3),
)
# 這里會(huì)陷入阻塞,直到 _unfinished_tasks 變?yōu)?0
await queue.join()
print("main 解除阻塞")
asyncio.run(main())
"""
consumer1 開始消費(fèi)
consumer2 開始消費(fèi)
consumer3 開始消費(fèi)
consumer1 消費(fèi)完畢
consumer2 消費(fèi)完畢
consumer3 消費(fèi)完畢
main 解除阻塞
"""
還是比較簡單的,然后我們來看一下協(xié)程隊(duì)列的具體實(shí)現(xiàn)細(xì)節(jié)。
圖片
首先協(xié)程隊(duì)列內(nèi)部有一個(gè) _queue 屬性,它是一個(gè)雙端隊(duì)列,負(fù)責(zé)保存具體的元素。因?yàn)橐WC兩端的操作都是高效的,所以采用雙端隊(duì)列實(shí)現(xiàn)。
然后是 _getters 和 _putters 兩個(gè)屬性,它們是做什么的呢?在隊(duì)列滿了的時(shí)候,協(xié)程往隊(duì)列添加元素時(shí)會(huì)陷入阻塞,等到隊(duì)列有剩余空間時(shí)會(huì)解除阻塞。同理,在隊(duì)列為空時(shí),協(xié)程從隊(duì)列獲取元素時(shí)會(huì)陷入阻塞,等到隊(duì)列有可用元素時(shí)會(huì)解除阻塞。
那么這個(gè)阻塞等待,以及自動(dòng)喚醒并解除阻塞是怎么實(shí)現(xiàn)的呢?在介紹鎖和信號(hào)量的時(shí)候,我們分析過整個(gè)實(shí)現(xiàn)過程,協(xié)程隊(duì)列與之類似。
假設(shè)協(xié)程從隊(duì)列獲取元素,但是隊(duì)列為空,于是會(huì)創(chuàng)建一個(gè) Future 對象,并保存起來,當(dāng)前保存的地方就是 _getters,它也是雙端隊(duì)列。然后 await future,此時(shí)就會(huì)陷入阻塞,當(dāng)其它協(xié)程往隊(duì)列中添加元素時(shí),會(huì)將 _getters 里面的 future 彈出,設(shè)置結(jié)果集。因此 await future 的協(xié)程就會(huì)解除阻塞,因?yàn)殛?duì)列有可用元素了。
同理,協(xié)程往隊(duì)列添加元素也是如此,如果隊(duì)列滿了,同樣創(chuàng)建一個(gè) Future 對象,并保存起來,當(dāng)前保存的地方就是 _putters。然后 await future,陷入阻塞,當(dāng)其它協(xié)程從隊(duì)列中取出元素,會(huì)將 _putters 里面的 future 彈出,設(shè)置結(jié)果集。因此 await future 的協(xié)程就會(huì)解除阻塞,因?yàn)殛?duì)列有可用空間了。
圖片
三個(gè)內(nèi)部調(diào)用的方法,_get 方法負(fù)責(zé)從隊(duì)列的頭部彈出元素,_put 方法負(fù)責(zé)從隊(duì)列的尾部追加元素,比較簡單。然后是 _wakeup_next 方法,它負(fù)責(zé)喚醒阻塞的協(xié)程。參數(shù) waiters 要么是 _getters,要么是 _putters,從里面彈出一個(gè) future,設(shè)置結(jié)果集,讓對應(yīng)的協(xié)程解除阻塞。
圖片
- qsize() 負(fù)責(zé)返回隊(duì)列的元素個(gè)數(shù);
- maxsize 負(fù)責(zé)返回隊(duì)列的容量;
- empty() 負(fù)責(zé)判斷隊(duì)列是否為空;
- full() 負(fù)責(zé)判斷隊(duì)列是否已滿,如果容量小于等于 0,那么表示容量無限,隊(duì)列永遠(yuǎn)不會(huì)滿。否則判斷元素個(gè)數(shù)是否大于等于容量;
圖片
然后看看 put_nowait 和 get_nowait,首先是 put_nowait,往隊(duì)列添加元素。
如果添加時(shí)發(fā)現(xiàn)隊(duì)列已滿,那么拋出異常。如果未滿,則調(diào)用 _put 方法往 _queue 里面添加元素,因?yàn)樵氐膶?shí)際存儲(chǔ)是由 self._queue 這個(gè)雙端隊(duì)列負(fù)責(zé)的。
添加完畢后,將 _unfinished_task 加 1。最后從 _getters 里面彈出 future,設(shè)置結(jié)果集,讓因獲取不到元素而陷入阻塞的協(xié)程解除阻塞(同時(shí)會(huì)將添加的元素取走)。
get_nowait 的邏輯也很簡單,如果隊(duì)列為空,直接拋異常。如果不為空,則調(diào)用 _get 方法從隊(duì)列中彈出元素。最后從 _putters 里面彈出 future,設(shè)置結(jié)果集,讓因隊(duì)列已滿、無法添加元素而陷入阻塞的協(xié)程解除阻塞(同時(shí)會(huì)將元素添加進(jìn)隊(duì)列)。
再來看看 put 方法的實(shí)現(xiàn)細(xì)節(jié):
圖片
結(jié)果和我們之前分析的一樣,只是源碼內(nèi)部多做了一些異常檢測。再來看看 get 方法,它的實(shí)現(xiàn)細(xì)節(jié)和 put 是類似的。
圖片
比較簡單,還是沒什么難度的,最后再來看看 task_done 和 join 兩個(gè)方法。
圖片
協(xié)程隊(duì)列里面使用了 asyncio.Event,它表示事件,如果事件對象沒有調(diào)用 set 方法設(shè)置標(biāo)志位,那么調(diào)用 wait 方法時(shí)會(huì)陷入阻塞。當(dāng)事件對象調(diào)用 set 方法時(shí),wait 會(huì)解除阻塞。
所以協(xié)程隊(duì)列的 join 方法的邏輯就是,當(dāng) _unfinished_tasks 大于 0 時(shí),調(diào)用事件對象的 wait 方法陷入阻塞。
而 task_done 方法的作用就是將 _unfinished_tasks 減 1,當(dāng)它的值屬性為 0 時(shí),調(diào)用事件對象的 set 方法,讓 join 解除阻塞。
以上就是整個(gè)協(xié)程隊(duì)列的實(shí)現(xiàn)細(xì)節(jié),具體的元素存儲(chǔ)是由 collections.deque 來承載的。并在隊(duì)列已滿或者為空時(shí),通過 Future 對象來實(shí)現(xiàn)阻塞等待和自動(dòng)喚醒。
另外除了先進(jìn)先出隊(duì)列之外,還有先進(jìn)后出隊(duì)列,一般稱為 LIFO 隊(duì)列,它的效果類似于棧。
圖片
這個(gè)沒什么好說的,因?yàn)槭窍冗M(jìn)后出,所以添加和彈出都在同一端,直接使用列表實(shí)現(xiàn)即可。并且由于 LifoQueue 繼承 Queue,所以它的 API 和普通的協(xié)程隊(duì)列是一樣的。
除了先進(jìn)先出隊(duì)列,還有一個(gè)優(yōu)先隊(duì)列。
圖片
它的 API 和普通的協(xié)程隊(duì)列也是一致的,只不過優(yōu)先隊(duì)列在添加元素時(shí),需要指定一個(gè)優(yōu)先級(jí):(優(yōu)先級(jí), 元素),優(yōu)先級(jí)的值越低,表示優(yōu)先級(jí)越高。然后在內(nèi)部,會(huì)按照優(yōu)先級(jí)的高低,維護(hù)一個(gè)小根堆,堆頂元素便是優(yōu)先級(jí)最高的元素。
這幾個(gè)隊(duì)列具體使用哪一種,則取決于具體的業(yè)務(wù)場景。
線程隊(duì)列
說完了協(xié)程隊(duì)列,再來看看線程隊(duì)列,它們的 API 是類似的,但實(shí)現(xiàn)細(xì)節(jié)則不同。因?yàn)椴僮飨到y(tǒng)感知不到協(xié)程,所以協(xié)程隊(duì)列的阻塞等待是基于 Future 實(shí)現(xiàn)的,而線程隊(duì)列的阻塞等待是基于條件變量(和互斥鎖)實(shí)現(xiàn)的。
還是先來看看線程隊(duì)列的一些 API,和協(xié)程隊(duì)列是類似的。
from queue import Queue
# 可以指定一個(gè) maxsize 參數(shù),表示隊(duì)列的容量
# 默認(rèn)為 0,表示隊(duì)列的容量無限
queue = Queue(maxsize=20)
# 查看容量
print(queue.maxsize)
"""
20
"""
# 查看隊(duì)列的元素個(gè)數(shù)
print(queue.qsize())
"""
0
"""
# 判斷隊(duì)列是否已滿
print(queue.full())
"""
False
"""
# 判斷隊(duì)列是否為空
print(queue.empty())
"""
True
"""
# 往隊(duì)列中添加元素
# block 參數(shù)表示是否阻塞,默認(rèn)為 True,當(dāng)隊(duì)列已滿時(shí),線程會(huì)阻塞
# timeout 表示超時(shí)時(shí)間,默認(rèn)為 None,表示會(huì)無限等待
# 當(dāng)然也可以給 timeout 傳一個(gè)具體的值
# 如果在規(guī)定時(shí)間內(nèi),沒有將元素放入隊(duì)列,那么拋異常
queue.put(123, block=True, timeout=None)
# 也是往隊(duì)列中添加元素,但是當(dāng)隊(duì)列已滿時(shí),會(huì)直接拋異常
# put_nowait(item) 本質(zhì)上就是 put(item, block=False)
queue.put_nowait(456)
# 從隊(duì)列中取出元素
# 同樣可以傳遞 block 和 timeout 參數(shù)
# block 默認(rèn)為 True,當(dāng)隊(duì)列為空時(shí)會(huì)陷入阻塞
# timeout 默認(rèn)為 None,表示會(huì)無限等待
print(queue.get(block=True, timeout=None))
"""
123
"""
# 也是從隊(duì)列中取出元素,但是當(dāng)隊(duì)列為空時(shí),會(huì)直接拋異常
# get_nowait() 本質(zhì)上就是 get(block=False)
print(queue.get_nowait())
"""
456
"""
# task_done(),將 unfinished_tasks 屬性的值減 1
print(queue.unfinished_tasks)
"""
2
"""
queue.task_done()
queue.task_done()
print(queue.unfinished_tasks)
"""
0
"""
# join(),當(dāng) unfinished_tasks 不為 0 時(shí),陷入阻塞
queue.join()
API 和協(xié)程隊(duì)列是相似的,我們羅列一下:
圖片
線程隊(duì)列的具體使用我們已經(jīng)知道了,下面來看看它的具體實(shí)現(xiàn)。
圖片
線程隊(duì)列的內(nèi)部依舊使用雙端隊(duì)列進(jìn)行元素存儲(chǔ),并且還使用了一個(gè)互斥鎖和三個(gè)條件變量。
為了保證數(shù)據(jù)的一致性和線程安全,當(dāng)隊(duì)列在多線程環(huán)境中被修改(比如添加或刪除元素)時(shí),需要使用互斥鎖。任何需要修改隊(duì)列的操作都必須在獲取到互斥鎖之后進(jìn)行,以防止多個(gè)線程同時(shí)對隊(duì)列進(jìn)行修改,否則會(huì)導(dǎo)致數(shù)據(jù)不一致或其它錯(cuò)誤。同時(shí),一旦對隊(duì)列的修改完成,必須立即釋放互斥鎖,以便其它線程可以訪問隊(duì)列。
然后是 not_empty 條件變量,當(dāng)一個(gè)新元素被添加到隊(duì)列時(shí),應(yīng)該向 not_empty發(fā)送一個(gè)信號(hào)。這個(gè)動(dòng)作會(huì)通知那些想從隊(duì)列中獲取元素,但因隊(duì)列為空而陷入阻塞的線程,現(xiàn)在隊(duì)列中已經(jīng)有了新的元素,它們可以繼續(xù)執(zhí)行獲取元素的操作。
接下來是 not_full 條件變量,當(dāng)從隊(duì)列中取走一個(gè)元素時(shí),應(yīng)該向 not_full 發(fā)送一個(gè)信號(hào)。這個(gè)動(dòng)作通知那些想往隊(duì)列添加元素,但因隊(duì)列已滿而陷入阻塞的線程,現(xiàn)在隊(duì)列中已經(jīng)有了可用空間,它們可以繼續(xù)執(zhí)行添加元素的操作。
最后是 all_tasks_done 條件變量,當(dāng)處理的任務(wù)全部完成,即計(jì)數(shù)器 unfinished_task 為 0 時(shí),應(yīng)該向 all_tasks_done 發(fā)送一個(gè)信號(hào)。這個(gè)動(dòng)作會(huì)通知那些執(zhí)行了 join() 方法而陷入阻塞的線程,它們可以繼續(xù)往下執(zhí)行了。
圖片
因?yàn)榫€程隊(duì)列采用了雙端隊(duì)列存儲(chǔ)元素,所以雙端隊(duì)列的長度就是線程隊(duì)列的元素個(gè)數(shù)。如果元素個(gè)數(shù)為 0,那么隊(duì)列就是空;如果容量大于 0,并且小于等于元素個(gè)數(shù),那么隊(duì)列就滿了。
圖片
前面說了,put_nowait 和 get_nowait 本質(zhì)上就是調(diào)用了 put 和 get,所以我們的重點(diǎn)是 put 和 get 兩個(gè)方法。
圖片
以上就是 put 方法的底層實(shí)現(xiàn),不難理解。說完了 put,再來看看 get。
圖片
最后是 task_done 和 join 方法,看看它們的內(nèi)部邏輯。
圖片
調(diào)用 join 方法,當(dāng) unfinished_task 大于 0 時(shí),會(huì)陷入阻塞。調(diào)用 task_done 方法,會(huì)將未完成任務(wù)數(shù)減 1,如果為 0,那么喚醒阻塞等待的線程。
需要注意的是,喚醒調(diào)用的方法不是 notify,而是 notify_all。對于添加元素和獲取元素,每次顯然只能喚醒一個(gè)線程,此時(shí)調(diào)用 notify。而 unfinished_task 為 0 時(shí),應(yīng)該要喚醒所有等待的線程,因此要調(diào)用 notify_all。
最后線程隊(duì)列也有相應(yīng)的 PriorityQueue 和 LifoQueue,它們的用法、實(shí)現(xiàn)和協(xié)程里面的這兩個(gè)隊(duì)列是一樣的。
小結(jié)
以上便是協(xié)程隊(duì)列和線程隊(duì)列的具體用法和實(shí)現(xiàn)原理,它們本質(zhì)上都是基于雙端隊(duì)列實(shí)現(xiàn)具體的元素存儲(chǔ),并且在隊(duì)列已滿和隊(duì)列為空時(shí),可以阻塞等待。
只不過協(xié)程隊(duì)列是通過 Future 對象實(shí)現(xiàn)的,而線程隊(duì)列是通過條件變量實(shí)現(xiàn)的。
當(dāng)然,除了協(xié)程隊(duì)列和線程隊(duì)列,還有進(jìn)程隊(duì)列,但進(jìn)程隊(duì)列要復(fù)雜的多。因此關(guān)于進(jìn)程隊(duì)列的實(shí)現(xiàn)細(xì)節(jié),我們以后專門花篇幅去介紹。