如何在 Asyncio 中使用 Socket
楔子
本次我們來聊一聊 Socket,以及它如何與 asyncio 搭配使用。
阻塞 Socket
Socket 是對(duì) TCP/IP 協(xié)議的一個(gè)封裝,可以讓我們更方便地使用 TCP/IP 協(xié)議,而不用關(guān)注背后的原理。并且我們經(jīng)常使用的 Web 框架,本質(zhì)上也是一個(gè) Socket。
所以 Socket 是操作系統(tǒng)對(duì) TCP/IP 網(wǎng)絡(luò)協(xié)議棧的封裝,并提供了一系列的接口,我們通過這些接口可以實(shí)現(xiàn)網(wǎng)絡(luò)通信,而不用關(guān)注網(wǎng)絡(luò)協(xié)議的具體細(xì)節(jié)。
圖片
按照現(xiàn)有的網(wǎng)絡(luò)模型,Socket 并不屬于其中的任何一層,但我們可以簡(jiǎn)單地將 Socket 理解為傳輸層之上的抽象層,負(fù)責(zé)連接應(yīng)用層和傳輸層。Socket 提供了大量的 API,基于這些 API 我們可以非常方便地使用網(wǎng)絡(luò)協(xié)議棧,在不同主機(jī)間進(jìn)行網(wǎng)絡(luò)通信。
Linux 一切皆文件,Socket 也不例外,它被稱為套接字文件,在使用上和普通文件是類似的。
Socket 是什么我們已經(jīng)知道了,下面來看看如何使用 Socket 進(jìn)行編程。
圖片
整個(gè)過程如下:
- 服務(wù)端初始化 socket,此時(shí)會(huì)得到「主動(dòng)套接字」;
- 服務(wù)端調(diào)用 bind 方法,將套接字綁定在某個(gè) IP 和端口上;
- 服務(wù)端調(diào)用 listen 進(jìn)行監(jiān)聽,此時(shí)「主動(dòng)套接字」會(huì)變成「監(jiān)聽套接字」;
- 服務(wù)端調(diào)用 accept,等待客戶端連接,此時(shí)服務(wù)端會(huì)阻塞在這里(調(diào)用的是阻塞的 API);
- 客戶端同樣初始化 socket,得到主動(dòng)套接字;
- 客戶端調(diào)用主動(dòng)套接字的 connect,向服務(wù)器端發(fā)起連接請(qǐng)求,如果連接成功,后續(xù)客戶端就用這個(gè)主動(dòng)套接字進(jìn)行數(shù)據(jù)的傳輸;
- 當(dāng)客戶端來連接時(shí),那么服務(wù)端的 accept 將不再阻塞,并返回「已連接套接字」,后續(xù)服務(wù)端便用這個(gè)已連接套接字和客戶端進(jìn)行數(shù)據(jù)傳輸;
- 當(dāng)客戶端來連接時(shí),那么服務(wù)端的 accept 將不再阻塞,并返回「已連接套接字」,后續(xù)服務(wù)端便用這個(gè)已連接套接字和客戶端進(jìn)行數(shù)據(jù)傳輸;
我們使用來編寫代碼演示一下這個(gè)過程,首先是服務(wù)端:
import socket
# socket.socket() 會(huì)返回一個(gè)「主動(dòng)套接字」
server = socket.socket(
# 表示使用 IPv4,如果是 socket.AF_INET6
# 則表示使用 IPv6
socket.AF_INET,
# 表示建立 TCP 連接,如果是 socket.SOCK_DGRAM
# 則表示建立 UDP 連接
socket.SOCK_STREAM
)
# 當(dāng)然這兩個(gè)參數(shù)也可以不傳,因?yàn)槟J(rèn)就是它
# 設(shè)置套接字屬性,這里讓端口釋放后立刻就能再次使用
server.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, True)
# 將「主動(dòng)套接字」綁定在某個(gè) IP 和端口上
server.bind(("localhost", 12345))
# 監(jiān)聽,此時(shí)「主動(dòng)套接字」會(huì)變成「監(jiān)聽套接字」
server.listen(5)
# 調(diào)用 accept,等待客戶端連接,此時(shí)會(huì)阻塞在這里
# 如果客戶端連接到來,那么會(huì)返回「已連接套接字」,也就是這里的 conn
# 至于 addr 則是一個(gè)元組,保存了客戶端連接的信息(IP 和端口)
conn, addr = server.accept()
# 下面我們通過「已連接套接字」conn 和客戶端進(jìn)行消息的收發(fā)
# 收消息使用 recv、發(fā)消息使用 send,和 read、write 本質(zhì)是一樣的
while True:
msg = conn.recv(1024)
# 當(dāng)客戶端斷開連接時(shí),msg 會(huì)收到一個(gè)空字節(jié)串
if not msg:
print("客戶端已經(jīng)斷開連接")
conn.close()
break
print("客戶端發(fā)來消息:", msg.decode("utf-8"))
# 然后我們加點(diǎn)內(nèi)容之后,再給客戶端發(fā)過去
conn.send("服務(wù)端收到, 你發(fā)的消息是: ".encode("utf-8") + msg)
接下來編寫客戶端:
import socket
# 返回主動(dòng)套接字
client = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
# 連接服務(wù)端
client.connect(("localhost", 12345))
while True:
# 發(fā)送消息
data = input("請(qǐng)輸入內(nèi)容: ")
if data.strip().lower() in ("q", "quit", "exit"):
client.close()
print("Bye~~~")
break
client.send(data.encode("utf-8"))
print(client.recv(1024).decode("utf-8"))
啟動(dòng)服務(wù)端和客戶端進(jìn)行測(cè)試:
圖片
還是比較簡(jiǎn)單的,當(dāng)然我們這里的服務(wù)端每次只能和一個(gè)客戶端通信,如果想服務(wù)多個(gè)客戶端的話,那么需要為已連接套接字單獨(dú)開一個(gè)線程和客戶端進(jìn)行通信,然后主線程繼續(xù)調(diào)用 accept 方法等待下一個(gè)客戶端。
下面來編寫一下多線程的版本,這里只需要編寫服務(wù)端即可,客戶端代碼不變。
import socket
import threading
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, True)
server.bind(("localhost", 12345))
server.listen(5)
def handle_message(conn, addr):
while True:
msg = conn.recv(1024)
if not msg:
print(f"客戶端(ip: {addr[0]}, port: {addr[1]}) 已經(jīng)斷開連接")
conn.close()
break
print(f"客戶端(ip: {addr[0]}, port: {addr[1]}) 發(fā)來消息:",
msg.decode("utf-8"))
conn.send("服務(wù)端收到, 你發(fā)的消息是: ".encode("utf-8") + msg)
while True:
conn, addr = server.accept()
threading.Thread(
target=handle_message,
args=(conn, addr)
).start()
代碼很簡(jiǎn)單,就是把已連接套接字和客戶端的通信邏輯寫在了單獨(dú)的函數(shù)中,每來一個(gè)客戶端,服務(wù)端都會(huì)啟動(dòng)一個(gè)新的線程去執(zhí)行該函數(shù),然后繼續(xù)監(jiān)聽,等待下一個(gè)客戶端連接到來。
然后客戶端代碼不變,我們啟動(dòng)三個(gè)客戶端去和服務(wù)端通信,看看結(jié)果如何。
圖片
結(jié)果一切正常,當(dāng)然我們這里的代碼比較簡(jiǎn)單,就是普通的消息收發(fā)。你也可以實(shí)現(xiàn)一個(gè)更復(fù)雜的功能,比如文件下載器,把服務(wù)端當(dāng)成網(wǎng)盤,支持客戶端上傳和下載文件,并不難。
非阻塞 Socket
先回顧一下 socket 模型:
圖片
但是注意:我們說在 listen() 這一步,會(huì)將主動(dòng)套接字轉(zhuǎn)化為監(jiān)聽套接字,但此時(shí)的監(jiān)聽套接字的類型是阻塞的。阻塞類型的監(jiān)聽套接字在調(diào)用 accept() 方法時(shí),如果沒有客戶端來連接的話,就會(huì)一直處于阻塞狀態(tài),那么此時(shí)主線程就沒法干其它事情了。
所以要設(shè)置為非阻塞,而非阻塞的監(jiān)聽套接字在調(diào)用 accept() 時(shí),如果沒有客戶端來連接,那么主線程不會(huì)傻傻地等待,而是會(huì)直接返回,然后去做其它的事情。
類似的,我們?cè)趧?chuàng)建已連接套接字的時(shí)候默認(rèn)也是阻塞的,阻塞類型的已連接套接字在調(diào)用 send() 和 recv() 的時(shí)候也會(huì)處于阻塞狀態(tài)。比如當(dāng)客戶端一直不發(fā)數(shù)據(jù)的時(shí)候,已連接套接字就會(huì)一直阻塞在 recv() 這一步。如果是非阻塞類型的已連接套接字,那么當(dāng)調(diào)用 recv() 但卻收不到數(shù)據(jù)時(shí),也不用處于阻塞狀態(tài),同樣可以直接返回去做其它事情。
import socket
server = socket.socket()
server.bind(("localhost", 12345))
# 調(diào)用 setblocking 方法,傳入 False
# 表示將監(jiān)聽套接字和已連接套接字的類型設(shè)置為非阻塞
server.setblocking(False)
server.listen(5)
while True:
try:
# 非阻塞的監(jiān)聽套接字調(diào)用 accept() 時(shí)
# 如果發(fā)現(xiàn)沒有客戶端連接,則會(huì)立刻拋出 BlockingIOError
# 因此這里寫了個(gè)死循環(huán)
conn, addr = server.accept()
except BlockingIOError:
pass
else:
break
while True:
try:
# 同理,非阻塞的已連接套接字在調(diào)用 recv() 時(shí)
# 如果發(fā)現(xiàn)客戶端沒有發(fā)數(shù)據(jù),那么同樣會(huì)報(bào)錯(cuò)
msg = conn.recv(1024)
except BlockingIOError:
pass
else:
print(msg.decode("utf-8"))
conn.send(b"data from server")
很明顯,雖然上面的代碼在運(yùn)行的時(shí)候正常,但存在兩個(gè)問題:
1)雖然 accept() 不阻塞了,在沒有客戶端連接時(shí)主線程可以去做其它事情,但如果后續(xù)有客戶端連接,主線程要如何得知呢?因此必須要有一種機(jī)制,能夠繼續(xù)在監(jiān)聽套接字上等待后續(xù)連接請(qǐng)求,并在請(qǐng)求到來時(shí)通知主線程。我們上面的做法是寫了一個(gè)死循環(huán),但很明顯這是沒有意義的,這種做法還不如使用阻塞的套接字。
2)send() / recv() 不阻塞了,相當(dāng)于 I/O 讀寫流程不再是阻塞的,讀寫方法都會(huì)瞬間完成并返回,也就是說它會(huì)采用能讀多少就讀多少、能寫多少就寫多少的策略來執(zhí)行 I/O 操作,這顯然更符合我們對(duì)性能的追求。
圖片
顯然對(duì)于非阻塞套接字而言,會(huì)面臨一個(gè)問題,那就是當(dāng)我們執(zhí)行讀取操作時(shí),有可能只讀了一部分?jǐn)?shù)據(jù),剩余的數(shù)據(jù)客戶端還沒發(fā)過來,那么這些數(shù)據(jù)何時(shí)可讀呢?同理寫數(shù)據(jù)也是這種情況,當(dāng)緩沖區(qū)滿了,而我們的數(shù)據(jù)還沒有寫完,那么剩下的數(shù)據(jù)又何時(shí)可寫呢?因此同樣要有一種機(jī)制,能夠在主線程做別的事情的時(shí)候繼續(xù)監(jiān)聽已連接套接字,并且在有數(shù)據(jù)可讀寫的時(shí)候通知主線程。
這樣才能保證主線程既不會(huì)像基本 IO 模型一樣,一直在阻塞點(diǎn)等待,也不會(huì)無法處理實(shí)際到達(dá)的客戶端連接請(qǐng)求和可讀寫的數(shù)據(jù),而上面所提到的機(jī)制便是 I/O 多路復(fù)用。
早期的所有框架都是非阻塞 + 回調(diào) + 基于 IO 多路復(fù)用的事件循環(huán),這種模式的性能也非常高,Redis 和 Nginx 都是基于這種方式實(shí)現(xiàn)了高并發(fā)。只是這種編碼方式非常痛苦,它將好端端的自上而下的邏輯分割的四分五裂,而且也不好維護(hù),它使得開發(fā)人員在編寫業(yè)務(wù)邏輯的同時(shí),還要關(guān)注并發(fā)細(xì)節(jié)。
因此使用多路復(fù)用 + 回調(diào)的方式編寫異步化代碼,雖然并發(fā)量能上去,但是對(duì)開發(fā)者很不友好;而使用同步的方式編寫同步代碼,雖然很容易理解,可并發(fā)量卻又上不去。那么問題來了,有沒有一種辦法,能夠讓我們?cè)谙硎墚惒交瘞淼母卟l(fā)的同時(shí),又能以同步的方式去編寫代碼呢?也就是我們能不能以同步的方式去編寫異步化的代碼呢?
答案是可以的,使用「協(xié)程」便可以辦到。協(xié)程在這種模式的基礎(chǔ)之上又批了一層外衣,兼顧了開發(fā)效率與運(yùn)行效率。
在 asyncio 中使用 Socket
asyncio 的事件循環(huán)提供了處理套接字的一些方法,我們主要會(huì)用到三個(gè):
- sock_accept()
- sock_recv()
- sock_sendall()
這些方法類似于前面使用的套接字方法,但不同之處在于,它們需要接收非阻塞套接字作為參數(shù),然后返回協(xié)程。我們可以等待協(xié)程,直到有數(shù)據(jù)可供操作。
先來看一下 sock_accept(),它類似于 server.accept()。
conn,add = await loop.sock_accept(sock)
然后 sock_recv 和 sock_sendall 的調(diào)用方式與 sock_accept 類似,它們接收一個(gè)套接字,然后返回協(xié)程對(duì)象。通過 await 表達(dá)式,sock_recv 將會(huì)阻塞,直到套接字有可以處理的字節(jié);sock_sendall 接收一個(gè)套接字和要發(fā)送的數(shù)據(jù),同樣會(huì)陷入阻塞,直到要發(fā)送給套接字的所有數(shù)據(jù)都發(fā)送完畢,成功時(shí)返回 None。
data = await loop.sock_recv(sock)
await loop.sock_sendall(sock, data)
下面我們就基于 asyncio 設(shè)計(jì)一個(gè)回顯服務(wù)器。
import asyncio
import socket
async def echo(conn: socket.socket):
loop = asyncio.get_running_loop()
# 無限循環(huán)等待來自客戶端連接的數(shù)據(jù)
try:
while data := await loop.sock_recv(conn, 1024):
# 收到數(shù)據(jù)之后再將其發(fā)送給客戶端
# 為了區(qū)分,我們發(fā)送的時(shí)候在結(jié)尾加一個(gè) b"~"
await loop.sock_sendall(conn, data + b"~")
except Exception as e:
print(f"服務(wù)出錯(cuò): {e}")
finally:
conn.close()
async def listen_for_conn(server: socket.socket):
loop = asyncio.get_running_loop()
while True:
conn, addr = await loop.sock_accept(server)
conn.setblocking(False)
print(f"收到客戶端 {addr} 的連接")
# 每次連接時(shí),都創(chuàng)建一個(gè)任務(wù)來監(jiān)聽客戶端的數(shù)據(jù)
asyncio.create_task(echo(conn))
async def main():
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
server.setblocking(False)
server.bind(("localhost", 12345))
server.listen()
await listen_for_conn(server)
asyncio.run(main())
運(yùn)行這個(gè)應(yīng)用程序可以同時(shí)服務(wù)多個(gè)客戶端,它里面同樣使用了 IO 多路復(fù)用,只不過事件循環(huán)將它封裝起來了,我們不需要直接面對(duì)。所以這種編程模式就簡(jiǎn)單多了。
小結(jié)
如果使用阻塞套接字創(chuàng)建應(yīng)用程序,那么阻塞套接字將在等待數(shù)據(jù)時(shí)停止整個(gè)線程。這阻止了我們實(shí)現(xiàn)并發(fā),因?yàn)橐淮沃荒軓囊粋€(gè)客戶端獲取數(shù)據(jù)。
使用非阻塞套接字構(gòu)建應(yīng)用程序,這些套接字總是會(huì)立即返回,而結(jié)果有兩種:要么已經(jīng)準(zhǔn)備好了數(shù)據(jù),要么因?yàn)闆]有數(shù)據(jù)而出現(xiàn)異常。
使用 asyncio 的事件循環(huán)方法來構(gòu)建具有非阻塞套接字的應(yīng)用程序,這些方法接收一個(gè)套接字并返回一個(gè)協(xié)程,然后可在 await 表達(dá)式中使用它。這將暫停父協(xié)程,直到套接字帶有數(shù)據(jù)。事件循環(huán)就是基于 IO 多路復(fù)用做的一個(gè)封裝,而 IO 多路復(fù)用能夠?qū)崿F(xiàn)的前提之一就是:套接字必須是非阻塞的。