協(xié)程到底有什么用?六種I/O模式告訴你!
大家好,我是小風哥,今天來聊一聊協(xié)程的作用。
假設磁盤上有10個文件,你需要讀取的內存,那么你該怎么用代碼實現呢?
在接著往下看之前,先自己想一想這個問題,看看自己能想出幾種方法,各自有什么樣的優(yōu)缺點。想清楚了嗎(還在看嗎),想清楚了我們繼續(xù)往下看。
最簡單的方法——串行
這可能是大多數同學都能想到的最簡單方法,那就是一個一個的讀取,讀完一個接著讀下一個。用代碼表示是這樣的:
for file in files:
result = file.read()
process(result)
是不是非常簡單,我們假設每個文件讀取需要1分鐘,那么10個文件總共需要10分鐘才能讀取完成。這種方法有什么問題呢?實際上這種方法只有一個問題,那就是慢。除此之外,其它都是優(yōu)點:
- 代碼簡單,容易理解
- 可維護性好,這代碼交給誰都能維護的了(論程序員的核心競爭力在哪里)
那么慢的問題該怎么解決呢?有的同學可能已經想到了,為啥要一個一個讀取呢?并行讀取不就可以加快速度了嗎。
稍好的方法,并行
那么,該怎么并行讀取文件呢?顯然,地球人都知道,線程就是用來并行的。我們可以同時開啟10個線程,每個線程中讀取一個文件。用代碼實現就是這樣的:
def read_and_process(file):
result = file.read()
process(result)
def main():
files = [fileA,fileB,fileC......]
for file in files:
create_thread(read_and_process,
file).run()
# 等待這些線程執(zhí)行完成
怎么樣,是不是也非常簡單。那么這種方法有什么問題嗎?在開啟10個線程這種問題規(guī)模下沒有問題?,F在我們把問題難度加大,假設有10000個文件,需要處理該怎么辦呢?有的同學可能想10個文件和10000個文件有什么區(qū)別嗎,直接創(chuàng)建10000個線程去讀不可以嗎?實際上這里的問題其實是說創(chuàng)建多個線程有沒有什么問題。我們知道,雖然線程號稱“輕量級進程”,雖然是輕量級但當數量足夠可觀時依然會有性能問題。這里的問題主要有這樣幾個方面:
- 創(chuàng)建線程需要消耗系統(tǒng)資源,像內存等(想一想為什么?)
- 調度開銷,尤其是當線程數量較多且都比較繁忙時(同樣想一想為什么?)
- 創(chuàng)建多個線程不一定能加快I/O(如果此時設備處理能力已經飽和)
既然線程有這樣那樣的問題,那么還有沒有更好的方法?
答案是肯定的,并行編程不一定只能依賴線程這種技術。
這里的答案就是基于事件驅動編程技術。
事件驅動 + 異步
沒錯,即使在單個線程中,使用事件驅動+異步也可以實現IO并行處理,Node.js就是非常典型的例子。為什么單線程也可以做到并行呢?這是基于這樣兩個事實:
- 相對于CPU的處理速度來說,IO是非常慢的
- IO不怎么需要計算資源
因此,當我們發(fā)起IO操作后為什么要一直等著IO執(zhí)行完成呢?在IO執(zhí)行完之前的這段時間處理其它IO難道不香嗎?
這就是為什么單線程也可以并行處理多個IO的本質所在。
回到我們的例子,該怎樣用事件驅動+異步來改造上述程序呢?實際上非常簡單。首先我們需要創(chuàng)建一個event loop,這個非常簡單:
event_loop = EventLoop()
然后,我們需要往event loop中加入原材料,也就是需要監(jiān)控的event,就像這樣:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 文件異步讀取
event_loop.add(file)
注意當執(zhí)行file.asyn_read這行代碼時會立即返回,不會阻塞線程,當這行代碼返回時可能文件還沒有真正開始讀取,這就是所謂的異步。
file.asyn_read這行代碼的真正目的僅僅是發(fā)起IO,而不是等待IO執(zhí)行完成。此后我們將該IO放到event loop中進行監(jiān)控,也就是event_loop.add(file)這行代碼的作用。
一切準備就緒,接下來就可以等待event的到來了:
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
我們可以看到,event_loop會一直等待直到有文件讀取完成(event_loop.wait_one_IO_ready()),這時我們就能得到讀完的文件了,接下來處理即可。全部代碼如下所示:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 文件異步讀取
event_loop.add(file)
def main():
files = [fileA,fileB,fileC ...]
event_loop = EventLoop()
for file in files:
add_to_event_loop(event_loop, file)
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
多線程 VS 單線程 + event loop
接下來我們看下程序執(zhí)行的效果。
在多線程情況下,假設有10個文件,每個文件讀取需要1秒,那么很簡單,并行讀取10個文件需要1秒。
那么對于單線程+event loop呢?我們再次看下event loop + 異步版本的代碼:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 文件異步讀取
event_loop.add(file)
def main():
files = [fileA,fileB,fileC......]
event_loop = EventLoop()
for file in files:
add_to_event_loop(event_loop, file)
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
對于add_to_event_loop,由于文件異步讀取,因此該函數可以瞬間執(zhí)行完成,真正耗時的函數其實就是event loop的等待函數,也就是這樣:
file = event_loop.wait_one_IO_ready()
我們知道,一個文件的讀取耗時是1秒,因此該函數在1s后才能返回,但是,但是,接下來是重點。但是雖然該函數wait_one_IO_ready會等待1s,不要忘了,我們利用這兩行代碼同時發(fā)起了10個IO操作請求。
for file in files: add_to_event_loop(event_loop, file)
因此在event_loop.wait_one_IO_ready等待的1s期間,剩下的9個IO也完成了,也就是說event_loop.wait_one_IO_ready函數只是在第一次循環(huán)時會等待1s,但是此后的9次循環(huán)會直接返回,原因就在于剩下的9個IO也完成了。
因此整個程序的執(zhí)行耗時也是1秒。是不是很神奇,我們只用一個線程就達到了10個線程的效果。
這就是event loop + 異步的威力所在。
一個好聽的名字:Reactors模式
本質上,我們上述給出的event loop簡單代碼片段做的事情本質上和生物一樣:
給出刺激,做出反應。
我們這里的給出event,然后處理event。這本質上就是所謂的Reactors模式。
現在你應該明白所謂的Reactors模式是怎么一回事了吧。
所謂的一些看上去復雜的異步框架其核心不過就是這里給出的代碼片段,只是這些框架可以支持更加復雜的多階段任務處理以及各種類型的IO。而我們這里給出的代碼片段只能處理文件讀取這一類IO。
把回調也加進來
如果我們需要處理各種類型的IO上述代碼片段會有什么問題嗎?
問題就在于上述代碼片段就不會這么簡單了,針對不同類型會有不同的處理方法,因此上述process方法需要判斷IO類型然后有針對性的處理,這會使得代碼越來越復雜,越來越難以維護。
幸好我們也有應對策略,這就是回調。
我們可以把IO完成后的處理任務封裝到回調函數中,然后和IO一并注冊到event loop。就像這樣:
def IO_type_1(event_loop, io):
io.start()
def callback(result):
process_IO_type_1(result)
event_loop.add((io, callback))
這樣,event_loop在檢測到有IO完成后就可以把該IO和關聯(lián)的callback處理函數一并檢索出來,直接調用callback函數就可以了。
while event_loop:
io, callback = event_loop.wait_one_IO_ready()
callback(io.result)
看到了吧,這樣event_loop內部就極其簡潔了,even_loop根本就不關心該怎么處理該IO結果,這是注冊的callback該關心的事情,event_loop需要做的僅僅就是拿到event以及相應的處理函數callback,然后調用該callback函數就可以了。
現在我們可以同單線程來并發(fā)編程了,也使用callback對IO處理進行了抽象,使得代碼更加容易維護,想想看還有沒有什么問題?
回調函數的問題
雖然回調函數使得event loop內部更加簡潔,但依然有其它問題,讓我們來仔細看看回調函數:
def start_IO_type_1(event_loop, io):
io.start()
def callback(result):
process_IO_type_1(result)
event_loop.add((io, callback))
從上述代碼中你能看到什么問題嗎?在上述代碼中,一次IO處理過程被分為了兩部分:
- 發(fā)起IO
- IO處理
其中第2部分放到了回調函數中,這樣的異步處理天然不容易理解,這和我們熟悉的發(fā)起IO,等待IO完成、處理IO結果的同步模塊有很大差別。
這里的給的例子很簡單,所以你可能不以為意,但是當處理的任務非常復雜時,可能會出現回調函數中嵌套回調函數,也就是回調地獄,這樣的代碼維護起來會讓你懷疑為什么要稱為一名苦逼的碼農。
問題出在哪里
讓我們再來仔細的看看問題出在了哪里?
同步編程模式下很簡單,但是同步模式下發(fā)起IO,線程會被阻塞,這樣我們就不得不創(chuàng)建多個線程,但是創(chuàng)建過多線程又會有性能問題。
這樣為了發(fā)起IO后不阻塞當前線程我們就不得不采用異步編程+event loop。在這種模式下,異步發(fā)起IO不會阻塞調用線程,我們可以使用單線程加異步編程的方法來實現多線程效果,但是在這種模式下處理一個IO的流程又不得不被拆分成兩部分,這樣的代碼違反程序員直覺,因此難以維護。
那么很自然的,有沒有一種方法既能有同步編程的簡單理解又會有異步編程的非阻塞呢?
Finally!終于到了協(xié)程
利用協(xié)程我可以以同步的形式來異步編程。
這是什么意思呢?
我們之所以采用異步編程是為了發(fā)起IO后不阻塞當前線程,而是用協(xié)程,程序員可以自行決定在什么時刻掛起當前協(xié)程,這樣也不會阻塞當前線程。
而協(xié)程最棒的一點就在于掛起后可以暫存執(zhí)行狀態(tài),恢復運行后可以在掛起點繼續(xù)運行,這樣我們就不再需要像回調那樣將一個IO的處理流程拆分成兩部分了。因此我們可以在發(fā)起異步IO,這樣不會阻塞當前線程,同時在發(fā)起異步IO后掛起當前協(xié)程,當IO完成后恢復該協(xié)程的運行,這樣我們就可以實現同步的方式來異步編程了。接下來我們就用協(xié)程來改造一下回調版本的IO處理方式:
def start_IO_type_1(io):
io.start() # IO異步請求
yield # 暫停當前協(xié)程
process_IO_type_1(result) # 處理返回結果
此后我們要把該協(xié)程放到event loop中監(jiān)控起來:
def add_to_event_loop(io, event_loop):
coroutine = start_IO_type_1(io)
next(coroutine)
event_loop.add(coroutine)
最后,當IO完成后event loop檢索出相應的協(xié)程并恢復其運行:
while event_loop:
coroutine = event_loop.wait_one_IO_ready()
next(coroutine)
現在你應該看出來了吧,上述代碼中沒有回調,也沒有把處理IO的流程拆成兩部分,整體的代碼都是以同步的方式來編寫,最棒的是依然能達到異步的效果。
實際上你會看到,采用協(xié)程后我們依然需要基于事件編程的event loop,因為本質上協(xié)程并沒有改變IO的異步處理本質,只要IO是異步處理的那么我們就必須依賴event loop來監(jiān)控IO何時完成,只不過我們采用協(xié)程消除了對回調的依賴,整體編程方式上還是采用程序員最熟悉也最容易理解的同步方式。
總結
看上去簡簡單單的IO實際上一點都不簡單吧。為了高效進行IO操作,我們采用的技術是這樣演進的:
- 單線程串行 + 阻塞式IO(同步)
- 多線程并行 + 阻塞式IO(并行)
- 單線程 + 非阻塞式IO(異步) + event loop
- 單線程 + 非阻塞式IO(異步) + event loop + 回調
- Reactor模式(更好的單線程 + 非阻塞式IO+ event loop + 回調)
- 單線程 + 非阻塞式IO(異步) + event loop + 協(xié)程
最終我們采用協(xié)程技術獲取到了異步編程的高效以及同步編程的簡單理解,這也是當今高性能服務器常用的一種技術組合。