Node.js 是如何處理請(qǐng)求的
TCP 協(xié)議的核心概念
要了解服務(wù)器的工作原理首先需要了解 TCP 協(xié)議的工作原理。TCP 是一種面向連接的、可靠的、基于字節(jié)流的傳輸層全雙工通信協(xié)議,它有 4 個(gè)特點(diǎn):面向連接、可靠、流式、全雙工。下面詳細(xì)講解這些特性。
面向連接
TCP 中的連接是一個(gè)虛擬的連接,本質(zhì)上是主機(jī)在內(nèi)存里記錄了對(duì)端的信息,我們可以將連接理解為一個(gè)通信的憑證。如下圖所示。
那么如何建立連接呢?TCP 的連接是通過三次握手建立的。
服務(wù)器首先需要監(jiān)聽一個(gè)端口。
客戶端主動(dòng)往服務(wù)器監(jiān)聽的端口發(fā)起一個(gè) syn 包(第一次握手)。
當(dāng)服務(wù)器所在操作系統(tǒng)收到一個(gè) syn 包時(shí),會(huì)先根據(jù) syn 包里的目的 IP 和端口找到對(duì)應(yīng)的監(jiān)聽 socket,如果找不到則回復(fù) rst 包,如果找到則發(fā)送 ack 給客戶端(第二次握手),接著新建一個(gè)通信 socket 并插入到監(jiān)聽 socket 的連接中隊(duì)列(具體的細(xì)節(jié)會(huì)隨著不同版本的操作系統(tǒng)而變化。比如連接中隊(duì)列和連接完成隊(duì)列是一條隊(duì)列還是兩條隊(duì)列,再比如是否使用了 syn cookie 技術(shù)來防止 syn flood 攻擊,如果使用了,收到 syn 包的時(shí)候就不會(huì)創(chuàng)建 socket,而是收到第三次握手的包時(shí)再創(chuàng)建)。
客戶端收到服務(wù)器的 ack 后,再次發(fā)送 ack 給服務(wù)器,客戶端就完成三次握手進(jìn)入連接建立狀態(tài)了。
當(dāng)服務(wù)器所在操作系統(tǒng)收到客戶端的 ack 時(shí)(第三次握手),處于連接中隊(duì)列的 socket 就會(huì)被移到連接完成隊(duì)列中。
當(dāng)操作系統(tǒng)完成了一個(gè) TCP 連接,操作系統(tǒng)就會(huì)通知相應(yīng)的進(jìn)程,進(jìn)程從連接完成隊(duì)列中摘下一個(gè)已完成連接的 socket 結(jié)點(diǎn),然后生成一個(gè)新的 fd,后續(xù)就可以在該 fd 上和對(duì)端通信。具體的流程如下圖所示。
完成三次握手后,客戶端和服務(wù)器就可以進(jìn)行數(shù)據(jù)通信了。操作系統(tǒng)收到數(shù)據(jù)包和收到 syn 包的流程不一樣,操作系統(tǒng)會(huì)根據(jù)報(bào)文中的 IP 和端口找到處理該報(bào)文的通信 socket(而不是監(jiān)聽 socket),然后把數(shù)據(jù)包(操作系統(tǒng)實(shí)現(xiàn)中是一個(gè) skb 結(jié)構(gòu)體)掛到該通信 socket 的數(shù)據(jù)隊(duì)列中。
當(dāng)應(yīng)用層調(diào)用 read 讀取該 socket 的數(shù)據(jù)時(shí),操作系統(tǒng)會(huì)根據(jù)應(yīng)用層所需大小,從一個(gè)或多個(gè) skb 中返回對(duì)應(yīng)的字節(jié)數(shù)。同樣,寫也是類似的流程,當(dāng)應(yīng)用層往 socket 寫入數(shù)據(jù)時(shí),操作系統(tǒng)不一定會(huì)立刻發(fā)送出去,而是會(huì)保存到寫緩沖區(qū)中,然后根據(jù)復(fù)雜的 TCP 算法發(fā)送。
當(dāng)兩端完成通信后需要關(guān)閉連接,否則會(huì)浪費(fèi)內(nèi)存。TCP 通過四次揮手實(shí)現(xiàn)連接的斷開,第一次揮手可以由任意一端發(fā)起。前面講過 TCP 是全雙工的,所以除了通過四次揮手完成整個(gè) TCP 連接的斷開外,也可以實(shí)現(xiàn)半斷開,比如客戶端關(guān)閉寫端表示不會(huì)再發(fā)送數(shù)據(jù),但是仍然可以讀取來自對(duì)端發(fā)送端數(shù)據(jù)。四次揮手的流程如下。
可靠
TCP 發(fā)送數(shù)據(jù)時(shí)會(huì)先緩存一份到已發(fā)送待確認(rèn)隊(duì)列中,并啟動(dòng)一個(gè)超時(shí)重傳計(jì)時(shí)器,如果一定時(shí)間內(nèi)沒有收到對(duì)端到確認(rèn) ack,則觸發(fā)重傳機(jī)制,直到收到 ack 或者重傳次數(shù)達(dá)到閾值才會(huì)結(jié)束流程。
流式
建立連接后,應(yīng)用層就可以調(diào)用發(fā)送接口源源不斷地發(fā)送數(shù)據(jù)。通常情況下,并不是每次調(diào)用發(fā)送接口,操作系統(tǒng)就直接把數(shù)據(jù)發(fā)送出去,這些數(shù)據(jù)的發(fā)送是由操作系統(tǒng)按照一定的算法去發(fā)送的。對(duì)操作系統(tǒng)來說,它看到的是字節(jié)流,它會(huì)按照 TCP 算法打包出一個(gè)個(gè)包發(fā)送到對(duì)端,所以當(dāng)對(duì)端收到數(shù)據(jù)后,需要處理好數(shù)據(jù)邊界的問題。
從上圖中可以看到,假設(shè)應(yīng)用層發(fā)送了兩個(gè) HTTP 請(qǐng)求,操作系統(tǒng)在打包數(shù)據(jù)發(fā)送時(shí)可能的場(chǎng)景是第一個(gè)包里包括了 HTTP 請(qǐng)求 1 的全部數(shù)據(jù)和部分請(qǐng)求 2 的數(shù)據(jù),所以當(dāng)對(duì)端收到數(shù)據(jù)并進(jìn)行解析時(shí),就需要根據(jù) HTTP 協(xié)議準(zhǔn)確地解析出第一個(gè) HTTP 請(qǐng)求對(duì)應(yīng)的數(shù)據(jù)。
因?yàn)?TCP 的流式協(xié)議,所以基于 TCP 的應(yīng)用層通常需要定義一個(gè)應(yīng)用層協(xié)議,然后按照應(yīng)用層協(xié)議實(shí)現(xiàn)對(duì)應(yīng)的解析器,這樣才能完成有效的數(shù)據(jù)通信,比如常用的 HTTP 協(xié)議。對(duì)比來說 UDP 是面向數(shù)據(jù)包的協(xié)議,當(dāng)應(yīng)用層把數(shù)據(jù)傳遞給 UDP 時(shí),操作系統(tǒng)會(huì)直接打包發(fā)送出去(如果數(shù)據(jù)字節(jié)大小超過閾值則會(huì)報(bào)錯(cuò))。
全雙工
剛才提到 TCP 是全雙工的,全雙工就是通信的兩端都有一個(gè)發(fā)送隊(duì)列和接收隊(duì)列,可以同時(shí)發(fā)送和接收,互不影響。另外也可以選擇關(guān)閉讀端或者寫端。
服務(wù)器的工作原理
介紹了 TCP 協(xié)議的概念后,接著看看如何創(chuàng)建一個(gè) TCP 服務(wù)器(偽代碼)。
// 創(chuàng)建一個(gè) socket,拿到一個(gè)文件描述符
int server_fd = socket();
// 綁定地址(IP + 端口)到該 socket 中
bind(server_fd, addressInfo);
// 修改 socket 為監(jiān)聽狀態(tài),這樣就可以接收 TCP 連接了
listen(server_fd);
執(zhí)行完以上步驟,一個(gè)服務(wù)器就啟動(dòng)了。服務(wù)器啟動(dòng)的時(shí)候會(huì)監(jiān)聽一個(gè)端口,如果有連接到來,我們可以通過 accept 系統(tǒng)調(diào)用拿到這個(gè)新連接對(duì)應(yīng)的 socket,那這個(gè) socket 和監(jiān)聽的 socket 是不是同一個(gè)呢?
其實(shí) socket 分為監(jiān)聽型和通信型。表面上服務(wù)器用一個(gè)端口實(shí)現(xiàn)了多個(gè)連接,但是這個(gè)端口是用于監(jiān)聽的,底層用于和客戶端通信的其實(shí)是另一個(gè) socket。每當(dāng)一個(gè)連接到來的時(shí)候,操作系統(tǒng)會(huì)根據(jù)請(qǐng)求包的目的地址信息找到對(duì)應(yīng)的監(jiān)聽 socket,如果找不到就會(huì)回復(fù) RST 包,如果找到就會(huì)生成一個(gè)新的 socket 與之通信(accept 的時(shí)候返回的那個(gè))。監(jiān)聽 socket 里保存了監(jiān)聽的 IP 和端口,通信 socket 首先從監(jiān)聽 socket 中復(fù)制 IP 和端口,然后把客戶端的 IP 和端口也記錄下來。這樣一來,下次再收到一個(gè)數(shù)據(jù)包,操作系統(tǒng)就會(huì)根據(jù)四元組從 socket 池子里找到該 socket,完成數(shù)據(jù)的處理。因此理論上,一個(gè)服務(wù)器能接受多少連接取決于服務(wù)器的硬件配置,比如內(nèi)存大小。
接下來分析各種處理連接的方式。
串行模式
串行模式就是服務(wù)器逐個(gè)處理連接,處理完前面的連接后才能繼續(xù)處理后面的連接,邏輯如下。
while(1) {
int client_fd = accept(server_fd);
read(client_fd);
write(client_fd);
}
上面的處理方式是最樸素的模型,如果沒有連接,則服務(wù)器處于阻塞狀態(tài),如果有連接服務(wù)器就會(huì)不斷地調(diào)用 accept 摘下完成三次握手的連接并處理。假設(shè)此時(shí)有 n 個(gè)請(qǐng)求到來,進(jìn)程會(huì)從 accept 中被喚醒,然后拿到一個(gè)新的 socket 用于通信,結(jié)構(gòu)圖如下。
這種處理模式下,如果處理的過程中調(diào)用了阻塞 API,比如文件 IO,就會(huì)影響后面請(qǐng)求的處理,可想而知,效率是非常低的,而且,并發(fā)量比較大的時(shí)候,監(jiān)聽 socket 對(duì)應(yīng)的隊(duì)列很快就會(huì)被占滿(已完成連接隊(duì)列有一個(gè)最大長(zhǎng)度),導(dǎo)致后面的連接無法完成。這是最簡(jiǎn)單的模式,雖然服務(wù)器的設(shè)計(jì)中肯定不會(huì)使用這種模式,但是它讓我們了解了一個(gè)服務(wù)器處理請(qǐng)求的整體過程。
多進(jìn)程模式
串行模式中,所有請(qǐng)求都在一個(gè)進(jìn)程中排隊(duì)被處理,效率非常低下。為了提高效率,我們可以把請(qǐng)求分給多個(gè)進(jìn)程處理。因?yàn)樵诖刑幚淼哪J街校绻形募?IO 操作就會(huì)阻塞進(jìn)程,繼而阻塞后續(xù)請(qǐng)求的處理。在多進(jìn)程的模式中,即使一個(gè)請(qǐng)求阻塞了進(jìn)程,操作系統(tǒng)還可以調(diào)度其它進(jìn)程繼續(xù)執(zhí)行新的任務(wù)。多進(jìn)程模式分為幾種。
按需 fork
按需 fork 模式是主進(jìn)程監(jiān)聽端口,有連接到來時(shí),主進(jìn)程執(zhí)行 accept 摘取連接,然后通過 fork 創(chuàng)建子進(jìn)程處理連接,邏輯如下。
while(1) {
int client_fd = accept(socket);
// 忽略出錯(cuò)處理
if (fork() > 0) {
continue;
// 父進(jìn)程負(fù)責(zé) accept
} else {
// 子進(jìn)程負(fù)責(zé)處理連接
handle(client_fd);
exit();
}
}
這種模式下,每次來一個(gè)請(qǐng)求,就會(huì)新建一個(gè)進(jìn)程去處理,比串行模式稍微好了一點(diǎn),每個(gè)請(qǐng)求都被獨(dú)立處理。假設(shè) a 請(qǐng)求阻塞在文件 IO,不會(huì)影響 b 請(qǐng)求的處理,盡可能地做到了并發(fā)。它的缺點(diǎn)是
1. 進(jìn)程數(shù)有限,如果有大量的請(qǐng)求,需要排隊(duì)處理。
2. 進(jìn)程的開銷會(huì)很大,對(duì)于系統(tǒng)來說是一個(gè)負(fù)擔(dān)。
3. 創(chuàng)建進(jìn)程需要時(shí)間,實(shí)時(shí)創(chuàng)建會(huì)增加處理請(qǐng)求的時(shí)間。
pre-fork 模式 + 主進(jìn)程 accept
pre-fork 模式就是服務(wù)器啟動(dòng)的時(shí)候,預(yù)先創(chuàng)建一定數(shù)量的進(jìn)程,但是這些進(jìn)程是 worker 進(jìn)程,不負(fù)責(zé)接收連接,只負(fù)責(zé)處理請(qǐng)求。處理過程為主進(jìn)程負(fù)責(zé)接收連接,然后把接收到的連接交給 worker 進(jìn)程處理,流程如下。
邏輯如下:
let fds = [[], [], [], …進(jìn)程個(gè)數(shù)];
let process = [];
for (let i = 0 ; i < 進(jìn)程個(gè)數(shù); i++) {
// 創(chuàng)建管道用于傳遞文件描述符
socketpair(fds[i]);
let pid;
if (pid = fork() > 0) {
// 父進(jìn)程
process.push({pid, 其它字段});
} else {
const index = i;
// 子進(jìn)程處理請(qǐng)求
while(1) {
// 從管道中讀取文件描述符
var client_fd = read(fd[index][1]);
// 處理請(qǐng)求
handle(client_fd);
}
}
}
// 主進(jìn)程 accept
for (;;) {
const clientFd = accept(socket);
// 找出處理該請(qǐng)求的子進(jìn)程
const i = findProcess();
// 傳遞文件描述符
write(fds[i][0], clientFd);
}
和 fork 模式相比,pre-fork 模式相對(duì)比較復(fù)雜,因?yàn)樵谇耙环N模式中,主進(jìn)程收到一個(gè)請(qǐng)求就會(huì)實(shí)時(shí) fork 一個(gè)子進(jìn)程,這個(gè)子進(jìn)程會(huì)繼承主進(jìn)程中新請(qǐng)求對(duì)應(yīng)的 fd,可以直接處理該 fd 對(duì)應(yīng)的請(qǐng)求。但是在進(jìn)程池的模式中,子進(jìn)程是預(yù)先創(chuàng)建的,當(dāng)主進(jìn)程收到一個(gè)請(qǐng)求的時(shí)候,子進(jìn)程中無法拿得到該請(qǐng)求對(duì)應(yīng)的 fd 。這時(shí)候就需要主進(jìn)程使用傳遞文件描述符的技術(shù)把這個(gè)請(qǐng)求對(duì)應(yīng)的 fd 傳給子進(jìn)程。
pre-fork 模式 + 子進(jìn)程 accept
剛才介紹的模式中,是主進(jìn)程接收連接,然后傳遞給子進(jìn)程處理,這樣主進(jìn)程就會(huì)成為系統(tǒng)的瓶頸,它可能來不及接收和分發(fā)請(qǐng)求給子進(jìn)程,而子進(jìn)程卻很空閑。子進(jìn)程 accept 這種模式也是會(huì)預(yù)先創(chuàng)建多個(gè)進(jìn)程,區(qū)別是多個(gè)子進(jìn)程會(huì)調(diào)用 accept 共同處理請(qǐng)求,而不需要父進(jìn)程參與,邏輯如下。
int server_fd = socket();
bind(server_fd);
for (let i = 0 ; i < 進(jìn)程個(gè)數(shù); i++) {
if (fork() > 0) {
// 父進(jìn)程負(fù)責(zé)監(jiān)控子進(jìn)程
} else {
// 子進(jìn)程處理請(qǐng)求
listen(server_fd);
while(1) {
int client_fd = accept(socket);
handle(client_fd);
}
}
}
這種模式下多個(gè)子進(jìn)程都阻塞在 accept,如果這時(shí)候有一個(gè)請(qǐng)求到來,那么所有的子進(jìn)程都會(huì)被喚醒,但是先被調(diào)度的子進(jìn)程會(huì)摘下這個(gè)請(qǐng)求節(jié)點(diǎn),后續(xù)的進(jìn)程被喚醒后可能會(huì)遇到已經(jīng)沒有請(qǐng)求可以處理,而又進(jìn)入睡眠,這種進(jìn)程被無效喚醒的現(xiàn)象就是著名的驚群現(xiàn)象。這種模式的處理流程如下。
Nginx 中解決了驚群這個(gè)問題,它的處理方式是在 accpet 之前先加鎖,拿到鎖的進(jìn)程才進(jìn)行 accept,這樣就保證了只有一個(gè)進(jìn)程會(huì)阻塞在 accept,不會(huì)引起驚群?jiǎn)栴},但是新版操作系統(tǒng)已經(jīng)在內(nèi)核層面解決了這個(gè)問題,每次只會(huì)喚醒一個(gè)進(jìn)程。
多線程模式
除了使用多進(jìn)程外,也可以使用多線程技術(shù)處理連接,多線程模式和多進(jìn)程模式類似,區(qū)別是在進(jìn)程模式中,每個(gè)子進(jìn)程都有自己的 task_struct,這就意味著在 fork 之后,每個(gè)進(jìn)程負(fù)責(zé)維護(hù)自己的數(shù)據(jù)、資源。線程則不一樣,線程共享進(jìn)程的數(shù)據(jù)和資源,所以連接可以在多個(gè)線程中共享,不需要通過文件描述符傳遞的方式進(jìn)行處理,比如如下架構(gòu)。
上圖中,主線程負(fù)責(zé) accept 請(qǐng)求,然后通過互斥的方式插入一個(gè)任務(wù)到共享隊(duì)列中,線程池中的子線程同樣是通過互斥的方式,從共享隊(duì)列中摘取節(jié)點(diǎn)進(jìn)行處理。
事件驅(qū)動(dòng)
從之前的處理模式中我們知道,為了應(yīng)對(duì)大量的請(qǐng)求,服務(wù)器通常需要大量的進(jìn)程 / 線程,這是個(gè)非常大的開銷。現(xiàn)在很多服務(wù)器(Nginx、Nodejs、Redis)都開始使用單進(jìn)程 + 事件驅(qū)動(dòng)模式去設(shè)計(jì),這種模式可以在單個(gè)進(jìn)程中輕松處理成千上萬的請(qǐng)求。
但也正因?yàn)閱芜M(jìn)程模式下,再多的請(qǐng)求也只在一個(gè)進(jìn)程里處理,這樣一個(gè)任務(wù)會(huì)一直在占據(jù) CPU,后續(xù)的任務(wù)就無法執(zhí)行了。因此,事件驅(qū)動(dòng)模式不適合 CPU 密集型的場(chǎng)景,更適合 IO 密集的場(chǎng)景(一般都會(huì)提供線程 / 線程池,負(fù)責(zé)處理 CPU 或者阻塞型的任務(wù))。大部分操作系統(tǒng)都提供了事件驅(qū)動(dòng)的 API,但是事件驅(qū)動(dòng)在不同系統(tǒng)中實(shí)現(xiàn)不一樣,所以一般都會(huì)有一層抽象層抹平這個(gè)差異。這里以 Linux 的 epoll 為例子。
// 創(chuàng)建一個(gè) epoll 實(shí)例
int epoll_fd = epoll_create();
/*
在 epoll 給某個(gè)文件描述符注冊(cè)感興趣的事件,這里是監(jiān)聽的 socket,注冊(cè)可讀事件,即連接到來
event = {
event: 可讀
fd:監(jiān)聽 socket
// 一些上下文
}
*/
epoll_ctl(epoll_fd , EPOLL_CTL_ADD , socket, event);
while(1) {
// 阻塞等待事件就緒,events 保存就緒事件的信息,total 是個(gè)數(shù)
int total= epoll_wait(epoll_fd , 保存就緒事件的結(jié)構(gòu)events, 事件個(gè)數(shù), timeout);
for (let i = 0; i < total; i++) {
if (events[fd] === 監(jiān)聽 socket) {
int client_fd = accpet(socket);
// 把新的 socket 也注冊(cè)到 epoll,等待可讀,即可讀取客戶端數(shù)據(jù)
epoll_ctl(epoll_fd , EPOLL_CTL_ADD , client_fd, event);
} else {
// 從events[i] 中拿到一些上下文,執(zhí)行相應(yīng)的回調(diào)
}
}
}
事件驅(qū)動(dòng)模式的處理流程為服務(wù)器注冊(cè)文件描述符和事件到 epoll 中,然后 epoll 開始阻塞,當(dāng)有事件觸發(fā)時(shí) epoll 就會(huì)返回哪些 fd 的哪些事件觸發(fā)了,接著服務(wù)器遍歷就緒事件并執(zhí)行對(duì)應(yīng)的回調(diào),在回調(diào)里可以再次注冊(cè) / 刪除事件,就這樣不斷驅(qū)動(dòng)著進(jìn)程的運(yùn)行。epoll 的原理其實(shí)也類似事件驅(qū)動(dòng),它底層維護(hù)用戶注冊(cè)的事件和文件描述符,本身也會(huì)在文件描述符對(duì)應(yīng)的文件 / socket / 管道處注冊(cè)一個(gè)回調(diào),等被通知有事件發(fā)生的時(shí)候,就會(huì)把 fd 和事件返回給用戶,大致原理如下。
function epoll_wait() {
for 事件個(gè)數(shù)
// 調(diào)用文件系統(tǒng)的函數(shù)判斷
if (事件 [i] 中對(duì)應(yīng)的文件描述符中有某個(gè)用戶感興趣的事件發(fā)生 ?) {
插入就緒事件隊(duì)列
} else {
/*
在事件 [i] 中的文件描述符所對(duì)應(yīng)的文件 / socke / 管道等資源中注冊(cè)回調(diào)。
感興趣的事件觸發(fā)后回調(diào) epoll,回調(diào) epoll 后,epoll 把該 event[i] 插入
就緒事件隊(duì)列返回給用戶
*/
}
}
SO_REUSEPORT 端口復(fù)用
新版 Linux 支持 SO_REUSEPORT 特性后,使得服務(wù)器性能有了很大的提升。SO_REUSEPORT 之前,一個(gè) socket 是無法綁定到同一個(gè)地址的,通常的做法是主進(jìn)程接收連接然后傳遞給子進(jìn)程處理,或者主進(jìn)程 bind 后 fork 子進(jìn)程,然后子進(jìn)程執(zhí)行 listen,但底層是共享同一個(gè) socket,所以連接到來時(shí)所有子進(jìn)程都會(huì)被喚醒,但是只有一個(gè)連接可以處理這個(gè)請(qǐng)求,其他的進(jìn)程被無效喚醒。SO_REUSEPORT 特性支持多個(gè)子進(jìn)程對(duì)應(yīng)多個(gè)監(jiān)聽 socket,多個(gè) socket 綁定到同一個(gè)地址,當(dāng)連接到來時(shí),操作系統(tǒng)會(huì)根據(jù)地址信息找到一組 socket,然后根據(jù)策略選擇一個(gè) socket 并喚醒阻塞在該 socket 的進(jìn)程,被 socket 喚醒的進(jìn)程處理自己的監(jiān)聽 socket 下的連接就行,架構(gòu)如下。
除了前面介紹的模式外,還有基于協(xié)程的模式,服務(wù)器技術(shù)繁多,就不一一介紹了。
IO 模型
IO 模型是服務(wù)器中非常重要的部分,操作系統(tǒng)通常會(huì)提供了多種 IO 模型,常見的如下。
阻塞 IO
當(dāng)線程執(zhí)行一個(gè) IO 操作時(shí),如果不滿足條件,當(dāng)前線程會(huì)被阻塞,然后操作系統(tǒng)會(huì)調(diào)度其他線程執(zhí)行。
非阻塞 IO
非阻塞 IO 在不滿足條件的情況下直接返回一個(gè)錯(cuò)誤碼給線程,而不是阻塞線程。
那么這個(gè)阻塞是什么意思呢?直接看一段操作系統(tǒng)的代碼。
// 沒有空間可寫了
while(!(space = UN_BUF_SPACE(pupd)))
{
// 非阻塞模式,直接返回錯(cuò)誤碼
if (nonblock)
return(-EAGAIN);
// 阻塞模式,進(jìn)入阻塞狀態(tài)
interruptible_sleep_on(sock->wait);
}
void interruptible_sleep_on(struct wait_queue **p)
{
// 修改線程狀態(tài)為阻塞狀態(tài)
__sleep_on(p,TASK_INTERRUPTIBLE);
}
static inline void __sleep_on(struct wait_queue **p, int state)
{
unsigned long flags;
// current 代表當(dāng)前執(zhí)行的線程
struct wait_queue wait = { current, NULL };
// 修改線程狀態(tài)為阻塞狀態(tài)
current->state = state;
// 當(dāng)前線程加入到資源的阻塞隊(duì)列,資源就緒后喚醒線程
add_wait_queue(p, &wait);
// 重新調(diào)度其他線程執(zhí)行,即從就緒的線程中選擇一個(gè)來執(zhí)行
schedule();
}
通過這段代碼,我們就可以非常明確地了解到阻塞和非阻塞到底是指什么。
多路復(fù)用 IO
在阻塞式 IO 中,我們需要通過阻塞進(jìn)程來感知 IO 是否就緒,在非阻塞式 IO 中,我們需要通過輪詢來感知 IO 是否就緒,這些都不是合適的方式。為了更好感知 IO 是否就緒,操作系統(tǒng)實(shí)現(xiàn)了訂閱發(fā)布機(jī)制,我們只需要注冊(cè)感興趣的 fd 和事件,當(dāng)事件發(fā)生時(shí)我們就可以感知到。多路復(fù)用 IO 可以同時(shí)訂閱多個(gè) fd 的多個(gè)事件,是現(xiàn)在高性能服務(wù)器的基石??匆粋€(gè)例子。
#include <sys/event.h>
#include <fcntl.h>
#include <stdio.h>
int main(int argc, char **argv)
{
// 用于注冊(cè)事件到 kqueue
struct kevent event;
// 用于接收從 kqueue 返回到事件,表示哪個(gè) fd 觸發(fā)了哪些事件
struct kevent emit_event;
int kqueue_fd, file_fd, result;
// 打開需要監(jiān)控的文件,拿到一個(gè) fd
file_fd = open(argv[1], O_RDONLY);
if (file_fd == -1) {
printf("Fail to open %s", argv[1]);
return 1;
}
// 創(chuàng)建 kqueue 實(shí)例
kqueue_fd = kqueue();
// 設(shè)置需要監(jiān)聽的事件,文件被寫入時(shí)觸發(fā)
EV_SET(&event,file_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_RENAME, 0, NULL);
// 注冊(cè)到操作系統(tǒng)
result = kevent(kqueue_fd, &event, 1, NULL, 0, NULL);
// 不斷阻塞等待,直到文件被寫入
while(1) {
// result 返回觸發(fā)事件的 fd 個(gè)數(shù),這里是一個(gè)
result = kevent(kqueue_fd, NULL, 0, &emit_event, 1, NULL);
if (result > 0) {
printf("%s have been renamed\n", argv[1]);
}
}
}
異步 IO
前面介紹的幾種 IO 模型中,當(dāng) IO 就緒時(shí)需要自己執(zhí)行讀寫操作,而異步 IO 是 IO 就緒時(shí),操作系統(tǒng)幫助線程完成 IO 操作,然后再通知線程操作完成了。下面以 io_uring(Linux 中的異步 IO 框架) 為例了解下具體的情況。
uv_loop_t* loop;
napi_get_uv_event_loop(env, &loop);
struct io_uring_info *io_uring_data = (io_uring_info *)loop->data;
// 申請(qǐng)內(nèi)存
struct request *req = (struct request *)malloc(sizeof(*req) + (sizeof(struct iovec) * 1));
req->fd = fd;
req->offset = offset;
// 保存回調(diào)
napi_create_reference(env, args[2], 1, &req->func);
req->env = env;
req->nvecs = 1;
// 記錄buffer大小
req->iovecs[0].iov_len = bufferLength;
// 記錄內(nèi)存地址
req->iovecs[0].iov_base = bufferData;
// 提交給操作系統(tǒng),操作系統(tǒng)讀完后通知線程,op 為 IORING_OP_READV 表示讀操作
submit_request(op, req, &io_uring_data->ring);
上面的代碼就是我們提交了一個(gè)讀請(qǐng)求給操作系統(tǒng),然后操作系統(tǒng)在文件可讀并且讀完成后通知我們。
Libuv 雖然寫著是異步 IO 庫,但是它并不是真正的異步 IO。它的意思是,你提交一個(gè) IO 請(qǐng)求時(shí),可以注冊(cè)一個(gè)回調(diào),然后就可以去做其他事情了,等操作完成后它會(huì)通知你,它的底層實(shí)現(xiàn)是線程池 + 多路復(fù)用 IO。
Node.js TCP 服務(wù)器的實(shí)現(xiàn)
Node.js 服務(wù)器的底層是 IO 多路復(fù)用 + 非阻塞 IO,所以可以輕松處理成千上萬的請(qǐng)求,但是因?yàn)?Node.js 是單線程的,所以更適合處理 IO 密集型的任務(wù)。下面看看 Node.js 中服務(wù)器是如何實(shí)現(xiàn)的。
啟動(dòng)服務(wù)器
在 Node.js 中,我們通常使用以下方式創(chuàng)建一個(gè)服務(wù)器。
// 創(chuàng)建一個(gè) TCP Server
const server = net.createServer((socket) => {
// 處理連接
});
// 監(jiān)聽端口,啟動(dòng)服務(wù)器
server.listen(8888);
使用 net.createServer 可以創(chuàng)建一個(gè)服務(wù)器,然后拿到一個(gè) Server 對(duì)象,接著調(diào)用 Server 對(duì)象的 listen 函數(shù)就可以啟動(dòng)一個(gè) TCP 服務(wù)器了。下面來看一下具體的實(shí)現(xiàn)。
function createServer(options, connectionListener) {
return new Server(options, connectionListener);
}
function Server(options, connectionListener) {
EventEmitter.call(this);
// 服務(wù)器收到的連接數(shù),可以通過 maxConnections 限制并發(fā)連接數(shù)
this._connections = 0;
// C++ 層的對(duì)象,真正實(shí)現(xiàn) TCP 功能的地方
this._handle = null;
// 服務(wù)器下的連接是否允許半關(guān)閉
this.allowHalfOpen = options.allowHalfOpen || false;
// 有連接時(shí)是否注冊(cè)可讀事件,如果該 socket 是交給其他進(jìn)程處理的話可以設(shè)置為 true
this.pauseOnConnect = !!options.pauseOnConnect;
}
createServer 返回的是一個(gè)一般的 JS 對(duì)象,繼續(xù)看一下 listen 函數(shù)的邏輯,listen 函數(shù)邏輯很繁瑣,但是原理大致是一樣的,所以只講解常用的情況。
Server.prototype.listen = function(...args) {
/*
處理入?yún)?,listen 可以接收很多種格式的參數(shù),
假設(shè)我們這里只傳了 8888 端口號(hào)
*/
const normalized = normalizeArgs(args);
// normalized = [{port: 8888}, null];
const options = normalized[0];
// 監(jiān)聽成功后的回調(diào)
const cb = normalized[1];
// listen 成功后執(zhí)行的回調(diào)
if (cb !== null) {
this.once('listening', cb);
}
listenIncluster(this,
null,
options.port | 0,
4,
...);
return this;
};
listen 處理了入?yún)⒑?,接著調(diào)用了 listenIncluster。
function listenIncluster(server,
address,
port,
addressType,
backlog,
fd,
exclusive) {
exclusive = !!exclusive;
if (cluster === null) cluster = require('cluster');
if (cluster.isMaster || exclusive) {
server._listen2(address, port, addressType, backlog, fd);
return;
}
}
這里只分析在主進(jìn)程創(chuàng)建服務(wù)器的情況,listenIncluster 中執(zhí)行了 _listen2,_listen2 對(duì)應(yīng)的函數(shù)是 setupListenHandle。
function setupListenHandle(address, port, addressType, backlog, fd) {
// 通過 C++ 層導(dǎo)出的 API 創(chuàng)建一個(gè)對(duì)象,該對(duì)象關(guān)聯(lián)了 C++ 層的 TCPWrap 對(duì)象
this._handle = new TCP(TCPConstants.SERVER);
// 創(chuàng)建 socket 并綁定地址到 socket 中
this._handle.bind(address, port);
// 有完成三次握手的連接時(shí)執(zhí)行的回調(diào)
this._handle.onconnection = onconnection;
// 互相關(guān)聯(lián)
this._handle.owner = this;
// 執(zhí)行 C++ 層 listen
this._handle.listen(backlog || 511);
// 觸發(fā) listen 回調(diào)
nextTick(this[async_id_symbol], emitListeningNT, this);
}
setupListenHandle 的邏輯如下。
1. 調(diào)用 new TCP 創(chuàng)建一個(gè) handle(new TCP 對(duì)象關(guān)聯(lián)了 C++ 層的 TCPWrap 對(duì)象)。
2. 保存處理連接的函數(shù) onconnection,當(dāng)有連接時(shí)被執(zhí)行。
3. 調(diào)用了 bind 綁定地址到 socket。
4. 調(diào)用 listen 函數(shù)修改 socket 狀態(tài)為監(jiān)聽狀態(tài)。首先看看 new TCP 做了什么。
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
new TCPWrap(env, args.This(), ...);
}
TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
: ConnectionWrap(env, object, provider) {
// 初始化一個(gè) tcp handle
int r = uv_tcp_init(env->event_loop(), &handle_);
}
new TCP 本質(zhì)上是創(chuàng)建一個(gè) TCP 層的 TCPWrap 對(duì)象,并初始化了 Libuv 的數(shù)據(jù)結(jié)構(gòu) uv_tcp_t(TCPWrap 是對(duì) Libuv uv_tcp_t 的封裝)。接著看 bind。
template <typename T>
void TCPWrap::Bind(...) {
// 通過 JS 對(duì)象拿到關(guān)聯(lián)的 C++ TCPWrap 對(duì)象
TCPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
// 通過 JS 傳入的地址信息直接調(diào)用 Libuv
uv_tcp_bind(&wrap->handle_,
reinterpret_cast<const sockaddr*>(&addr),
flags);
}
Bind 函數(shù)的邏輯很簡(jiǎn)單,直接調(diào)用了 Libuv 函數(shù)。
int uv_tcp_bind(...) {
return uv__tcp_bind(handle, addr, addrlen, flags);
}
int uv__tcp_bind(uv_tcp_t* tcp,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
// 創(chuàng)建一個(gè) socket,并把返回的 fd 保存到 tcp 結(jié)構(gòu)體中
maybe_new_socket(tcp, addr->sa_family, 0);
on = 1;
// 默認(rèn)設(shè)置了 SO_REUSEADDR 屬性,后面具體分析
setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
// 綁定地址信息到 socket
bind(tcp->io_watcher.fd, addr, addrlen);
return 0;
}
uv__tcp_bind 創(chuàng)建了一個(gè) TCP socket 然后把地址信息保存到該 socket 中,執(zhí)行 bind 綁定了地址信息后就繼續(xù)調(diào)用 listen 把 socket 變成監(jiān)聽狀態(tài),C++ 層代碼和 Bind 的差不多,就不再分析,直接看 Libuv 的代碼。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
}
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
static int single_accept = -1;
unsigned long flags;
int err;
// 已廢棄
if (single_accept == -1) {
const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
single_accept = (val != NULL && atoi(val) != 0);
}
// 有連接時(shí)是否連續(xù)接收,或者間歇性處理,見后面分析
if (single_accept)
tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
flags = 0;
// 設(shè)置 flags 到 handle 上,因?yàn)橐呀?jīng)創(chuàng)建了 socket
maybe_new_socket(tcp, AF_INET, flags);
listen(tcp->io_watcher.fd, backlog)
// 保存回調(diào),有連接到來時(shí)被 Libuv 執(zhí)行
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
// 有連接來時(shí)的處理函數(shù),該函數(shù)再執(zhí)行上面的 connection_cb
tcp->io_watcher.cb = uv__server_io;
// 注冊(cè)可讀事件,等待連接到來
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;
}
uv_tcp_listen 首先調(diào)用了 listen 函數(shù)修改 socket 狀態(tài)為監(jiān)聽狀態(tài),這樣才能接收 TCP 連接,接著保存了 C++ 層的回調(diào),并設(shè)置 Libuv 層的回調(diào),最后注冊(cè)可讀事件等待 TCP 連接的到來。這里需要注意兩個(gè)回調(diào)函數(shù)的執(zhí)行順序,當(dāng)有 TCP 連接到來時(shí) Libuv 會(huì)執(zhí)行 uvserver_io,在 uvserver_io 里再執(zhí)行 C++ 層的回調(diào) cb。至此,服務(wù)器就啟動(dòng)了。其中 uv__io_start 最終會(huì)把服務(wù)器對(duì)應(yīng)的文件描述符注冊(cè)到 IO多路 復(fù)用模塊中。
處理連接
當(dāng)有三次握手的連接完成時(shí),操作系統(tǒng)會(huì)新建一個(gè)通信的 socket,并通知 Libuv,Libuv 會(huì)執(zhí)行 uv__server_io。
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
int err;
stream = container_of(w, uv_stream_t, io_watcher);
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
// 回調(diào)了可能關(guān)閉了 server,所以需要實(shí)時(shí)判斷
while (uv__stream_fd(stream) != -1) {
// 摘取一個(gè) TCP 連接,成功的話,err 保存了對(duì)應(yīng)的 fd
err = uv__accept(uv__stream_fd(stream));
// 保存 fd 在 accepted_fd,等待處理
stream->accepted_fd = err;
// 執(zhí)行回調(diào)
stream->connection_cb(stream, 0);
// 如果回調(diào)里沒有處理該 accepted_fd,則注銷可讀事件、先不處理新的連接
if (stream->accepted_fd != -1) {
uv__io_stop(loop, &stream->io_watcher, POLLIN);
return;
}
// 設(shè)置了 UV_HANDLE_TCP_SINGLE_ACCEPT 則進(jìn)入睡眠,讓其他進(jìn)程有機(jī)會(huì)參與處理
if (stream->type == UV_TCP &&
(stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
struct timespec timeout = { 0, 1 };
nanosleep(&timeout, NULL);
}
}
}
uvserver_io 中通過 uvaccept 從操作系統(tǒng)中摘取一個(gè)完成連接的 TCP socket 并拿到一個(gè) fd ,接著保存到 accepted_fd 中并執(zhí)行 connection_cb 回調(diào)。
此外,我們需要注意 UV_HANDLE_TCP_SINGLE_ACCEPT 標(biāo)記。因?yàn)榭赡苡卸鄠€(gè)進(jìn)程監(jiān)聽同一個(gè)端口,當(dāng)多個(gè)連接到來時(shí),多個(gè)進(jìn)程可能會(huì)競(jìng)爭(zhēng)處理這些連接(驚群?jiǎn)栴})。這樣一來,首先被調(diào)度的進(jìn)程可能會(huì)直接處理所有的連接,導(dǎo)致負(fù)載不均衡。通過 UV_HANDLE_TCP_SINGLE_ACCEPT 標(biāo)記,可以在通知進(jìn)程接收連接時(shí),每接收到一個(gè)后先睡眠一段時(shí)間,讓其他進(jìn)程也有機(jī)會(huì)接收連接,一定程度解決負(fù)載不均衡的問題,不過這個(gè)邏輯最近被去掉了,Libuv 維護(hù)者 bnoordhuis 的理由是,第二次調(diào)用 uvaccept 時(shí)有 99.9% 的概念會(huì)返回 EAGAIN,那就是沒有更多的連接可以處理,這樣額外調(diào)用 uvaccept 帶來的系統(tǒng)調(diào)用開銷是比較可觀的。
接著看 connection_cb,connection_cb 對(duì)應(yīng)的是 C++ 層的 OnConnection。
// WrapType 為 TCPWrap,UVType 為 uv_tcp_t
template <typename WrapType, typename UVType>
void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle, int status) {
// HandleWrap 中保存了 handle 和 TCPWrap 的關(guān)系,這里取出來使用
WrapType* wrap_data = static_cast<WrapType*>(handle->data);
Environment* env = wrap_data->env();
Local<Value> argv[] = {
Integer::New(env->isolate(), status),
Undefined(env->isolate())
};
// 新建一個(gè)表示和客戶端通信的對(duì)象,和 JS 層執(zhí)行 new TCP 一樣
Local<Object> client_obj = WrapType::Instantiate(env,wrap_data,WrapType::SOCKET);
WrapType* wrap;
// 從 client_obj 中取出關(guān)聯(lián)的 TCPWrap 對(duì)象存到 wrap 中
ASSIGN_OR_RETURN_UNWRAP(&wrap, client_obj);
// 拿到 TCPWrap 中的 uv_tcp_t 結(jié)構(gòu)體,再轉(zhuǎn)成 uv_stream_t,因?yàn)樗鼈冾愃聘割惡妥宇惖年P(guān)系
uv_stream_t* client_handle = reinterpret_cast<uv_stream_t*>(&wrap->handle_);
// 把通信 fd 存儲(chǔ)到 client_handle 中
uv_accept(handle, client_handle);
argv[1] = client_obj;
// 回調(diào)上層的 onconnection 函數(shù)
wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);
}
當(dāng)建立了新連接時(shí),操作系統(tǒng)會(huì)新建一個(gè) socket。同樣,在 Node.js 層,也會(huì)通過 Instantiate 函數(shù)新建一個(gè)對(duì)應(yīng)的對(duì)象表示和客戶端的通信。結(jié)構(gòu)如下所示。
Instantiate 代碼如下所示。
MaybeLocal<Object> TCPWrap::Instantiate(Environment* env,
AsyncWrap* parent,
TCPWrap::SocketType type) {
// 拿到導(dǎo)出到 JS 層的 TCP 構(gòu)造函數(shù)(緩存在env中)
Local<Function> constructor = env->tcp_constructor_template()
->GetFunction(env->context())
.ToLocalChecked();
Local<Value> type_value = Int32::New(env->isolate(), type);
// 相當(dāng)于我們?cè)?JS 層調(diào)用 new TCP() 時(shí)拿到的對(duì)象
return handle_scope.EscapeMaybe(
constructor->NewInstance(env->context(), 1, &type_value));
}
新建完和對(duì)端通信的對(duì)象后,接著調(diào)用 uv_accept 消費(fèi)剛才保存在 accepted_fd 中的 fd,并把對(duì)應(yīng)的 fd 保存到 C++ TCPWrap 對(duì)象的 uv_tcp_t 結(jié)構(gòu)體中。
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
int err;
// 把 accepted_fd 保存到 client 中
uv__stream_open(client,
server->accepted_fd,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
// 處理了,重置該字段
server->accepted_fd = -1;
// 保證注冊(cè)了可讀事件,繼續(xù)處理新的連接
uv__io_start(server->loop, &server->io_watcher, POLLIN);
return err;
}
C++ 層拿到一個(gè)新的對(duì)象并且保存了 fd 到對(duì)象后,接著回調(diào) JS 層的 onconnection。
// clientHandle 代表一個(gè)和客戶端建立 TCP 連接的實(shí)體
function onconnection(err, clientHandle) {
const handle = this;
const self = handle.owner;
// 新建一個(gè) socket 用于通信
const socket = new Socket({
handle: clientHandle,
allowHalfOpen: self.allowHalfOpen,
pauseOnCreate: self.pauseOnConnect
});
// 服務(wù)器的連接數(shù)加一
self._connections++;
// 觸發(fā)用戶層連接事件
self.emit('connection', socket);
}
在 JS 層也會(huì)封裝一個(gè) Socket 對(duì)象用于管理和客戶端的通信,整體的關(guān)系如下。
接著觸發(fā) connection 事件,剩下的事情就是應(yīng)用層處理了,整體流程如下。
Node.js HTTP 服務(wù)器的創(chuàng)建
接著看看 HTTP 服務(wù)器的實(shí)現(xiàn)。下面是 Node.js 中創(chuàng)建服務(wù)器的例子。
const http = require('http');
http.createServer((req, res) => {
res.write('hello');
res.end();
})
.listen(3000);
我們沿著 createServer 開始分析。
function createServer(opts, requestListener) {
return new Server(opts, requestListener);
}
createServer 中創(chuàng)建了一個(gè) Server 對(duì)象,來看看 Server 初始化的邏輯。
function Server(options, requestListener) {
// 可以自定義表示請(qǐng)求的對(duì)象和響應(yīng)的對(duì)象
this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
this[kServerResponse] = options.ServerResponse || ServerResponse;
// HTTP 頭最大字節(jié)數(shù)
this.maxHeaderSize = options.maxHeaderSize;
// 允許半關(guān)閉
net.Server.call(this, { allowHalfOpen: true });
// 有請(qǐng)求時(shí)的回調(diào)
if (requestListener) {
this.on('request', requestListener);
}
// 服務(wù)器 socket 讀端關(guān)閉時(shí)是否允許繼續(xù)處理隊(duì)列里的響應(yīng)(TCP 上有多個(gè)請(qǐng)求,管道化)
this.httpAllowHalfOpen = false;
// 有連接時(shí)的回調(diào),由 net 模塊觸發(fā)
this.on('connection', connectionListener);
// 服務(wù)器下所有請(qǐng)求和響應(yīng)的超時(shí)時(shí)間
this.timeout = 0;
// 同一個(gè) TCP 連接上,兩個(gè)請(qǐng)求之前最多間隔的時(shí)間
this.keepAliveTimeout = 5000;
// HTTP 頭的最大個(gè)數(shù)
this.maxHeadersCount = null;
// 解析頭部的最長(zhǎng)時(shí)間,防止 ddos
this.headersTimeout = 60 * 1000;
}
Server 中主要做了一些字段的初始化,并且監(jiān)聽了 connection 和 request 兩個(gè)事件,當(dāng)有連接到來時(shí)會(huì)觸發(fā) connection 事件,connection 事件的處理函數(shù)會(huì)調(diào)用 HTTP 解析器進(jìn)行數(shù)據(jù)的解析,當(dāng)解析出一個(gè) HTTP 請(qǐng)求時(shí)就會(huì)觸發(fā) request 事件通知用戶。
創(chuàng)建了 Server 對(duì)象后,接著我們調(diào)用它的 listen 函數(shù)。因?yàn)?HTTP Server 繼承于 net.Server,所以執(zhí)行 HTTP Server 的 listen 函數(shù)時(shí),其實(shí)是執(zhí)行了 net.Serve 的 listen 函數(shù),net.Server 的 listen 函數(shù)前面已經(jīng)分析過,就不再分析。當(dāng)有請(qǐng)求到來時(shí),會(huì)觸發(fā) connection 事件,從而執(zhí)行 connectionListener。
function connectionListener(socket) {
defaultTriggerAsyncIdScope(
getOrSetAsyncId(socket), connectionListenerInternal, this, socket
);
}
// socket 表示新連接
function connectionListenerInternal(server, socket) {
// socket 所屬 server
socket.server = server;
// 分配一個(gè) HTTP 解析器
const parser = parsers.alloc();
// 初始化解析器
parser.initialize(HTTPParser.REQUEST, ...);
// 關(guān)聯(lián)起來
parser.socket = socket;
socket.parser = parser;
const state = {
onData: null,
// 同一 TCP 連接上,請(qǐng)求和響應(yīng)的的隊(duì)列,線頭阻塞的原理
outgoing: [],
incoming: [],
};
// 監(jiān)聽 TCP 上的數(shù)據(jù),開始解析 HTTP 報(bào)文
state.onData = socketOnData.bind(undefined,
server,
socket,
parser,
state);
socket.on('data', state.onData);
// 解析 HTTP 頭部完成后執(zhí)行的回調(diào)
parser.onIncoming = parserOnIncoming.bind(undefined,
server,
socket,
state);
/*
如果 handle 是繼承 StreamBase 的流,則在 C++ 層解析 HTTP 請(qǐng)求報(bào)文,
否則使用上面的 socketOnData 函數(shù)處理 HTTP 請(qǐng)求報(bào)文,
TCP 模塊的 isStreamBase 為 true
*/
if (socket._handle && socket._handle.isStreamBase &&
!socket._handle._consumed) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(socket._handle);
}
// 執(zhí)行 llhttp_execute 時(shí)的回調(diào)
parser[kOnExecute] = onParserExecute.bind(undefined,
server,
socket,
parser,
state);
}
上面的 connectionListenerInternal 函數(shù)中首先分配了一個(gè) HTTP 解析器,HTTP 解析器由以下代碼管理。
const parsers = new FreeList('parsers', 1000, function parsersCb() {
const parser = new HTTPParser();
cleanParser(parser);
parser.onIncoming = null;
// 各種鉤子毀掉
parser[kOnHeaders] = parserOnHeaders;
parser[kOnHeadersComplete] = parserOnHeadersComplete;
parser[kOnBody] = parserOnBody;
parser[kOnMessageComplete] = parserOnMessageComplete;
return parser;
});
parsers 用于管理 HTTP 解析器,它負(fù)責(zé)分配 HTTP 解析器,并且在 HTTP 解析器不再使用時(shí)緩存起來給下次使用,而不是每次都創(chuàng)建一個(gè)新的解析器。分配完 HTTP 解析器后就開始等待 TCP 上數(shù)據(jù)的到來,即 HTTP 請(qǐng)求報(bào)文。但是這里有一個(gè)邏輯需要注意,上面代碼中 Node.js 監(jiān)聽了 socket 的 data 事件,處理函數(shù)為 socketOnData,下面是 socketOnData 的邏輯。
function socketOnData(server, socket, parser, state, d) {
// 交給 HTTP 解析器處理,返回已經(jīng)解析的字節(jié)數(shù)
const ret = parser.execute(d);
}
socketOnData 調(diào)用 HTTP 解析器處理數(shù)據(jù),這看起來沒什么問題,但是有一個(gè)邏輯我們可能會(huì)忽略掉,看一下下面的代碼。
if (socket._handle && socket._handle.isStreamBase) {
parser.consume(socket._handle);
}
上面代碼中,如果 socket._handle.isStreamBase 為 true(TCP handle 的 isStreamBase 為 true),則會(huì)執(zhí)行 parser.consume(socket._handle),這個(gè)是做什么的呢?
static void Consume(const FunctionCallbackInfo<Value>& args) {
Parser* parser;
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// 解析出 C++ TCPWrap 對(duì)象
StreamBase* stream = StreamBase::FromObject(args[0].As<Object>());
// 注冊(cè) parser 成為流的消費(fèi)者,即 TCP 數(shù)據(jù)的消費(fèi)者
stream->PushStreamListener(parser);
}
Consume 會(huì)注冊(cè) parser 會(huì)成為流的消費(fèi)者,這個(gè)邏輯會(huì)覆蓋掉剛才的 onData 函數(shù),使得所有的數(shù)據(jù)直接由 parser 處理,看一下當(dāng)數(shù)據(jù)到來時(shí),parser 是如何處理的。
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
// 解析 HTTP 協(xié)議
Local<Value> ret = Execute(buf.base, nread);
// 執(zhí)行 kOnExecute 回調(diào)
Local<Value> cb = object()->Get(env()->context(), kOnExecute).ToLocalChecked();
MakeCallback(cb.As<Function>(), 1, &ret);
}
在 OnStreamRead 中會(huì)源源不斷地把數(shù)據(jù)交給 HTTP 解析器處理并執(zhí)行 kOnExecute 回調(diào),并且在解析的過程中,會(huì)不斷觸發(fā)對(duì)應(yīng)的鉤子函數(shù)。比如解析到 HTTP 頭部時(shí)執(zhí)行 parserOnHeaders。
function parserOnHeaders(headers, url) {
// 記錄解析到的 HTTP 頭
if (this.maxHeaderPairs <= 0 ||
this._headers.length < this.maxHeaderPairs) {
this._headers = this._headers.concat(headers);
}
this._url += url;
}
parserOnHeaders 會(huì)記錄解析到的 HTTP 頭,當(dāng)解析完 HTTP 頭 時(shí)會(huì)調(diào)用 parserOnHeadersComplete。
function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,
url, statusCode, statusMessage, upgrade,
shouldKeepAlive) {
const parser = this;
const { socket } = parser;
// 創(chuàng)建一個(gè)對(duì)象表示收到的 HTTP 請(qǐng)求
const ParserIncomingMessage = (socket && socket.server &&
socket.server[kIncomingMessage]) ||
IncomingMessage;
// 新建一個(gè)IncomingMessage對(duì)象
const incoming = parser.incoming = new ParserIncomingMessage(socket);
// 執(zhí)行回調(diào)
return parser.onIncoming(incoming, shouldKeepAlive);
}
parserOnHeadersComplete 中創(chuàng)建了一個(gè)對(duì)象來表示收到的 HTTP 請(qǐng)求,接著執(zhí)行 onIncoming 函數(shù),對(duì)應(yīng)的是 parserOnIncoming。
function parserOnIncoming(server, socket, state, req, keepAlive) {
// 請(qǐng)求入隊(duì)(待處理的請(qǐng)求隊(duì)列)
state.incoming.push(req);
// 新建一個(gè)表示響應(yīng)的對(duì)象
const res = new server[kServerResponse](req);
/*
socket 當(dāng)前已經(jīng)在處理其它請(qǐng)求的響應(yīng),則先排隊(duì),
否則掛載響應(yīng)對(duì)象到 socket,作為當(dāng)前處理的響應(yīng)
*/
if (socket._httpMessage) {
state.outgoing.push(res);
} else {
res.assignSocket(socket);
}
// 響應(yīng)處理完畢后,需要做一些處理
res.on('finish', resOnFinish.bind(undefined,
req,
res,
socket,
state,
server));
// 觸發(fā) request 事件說明有請(qǐng)求到來
server.emit('request', req, res);
}
我們看到這里會(huì)觸發(fā) request 事件通知用戶有新請(qǐng)求到來,并傳入request和response作為參數(shù),這樣用戶就可以處理請(qǐng)求了。另外 Node.js 本身是不會(huì)處理 HTTP 請(qǐng)求體的數(shù)據(jù),當(dāng) Node.js 解析到請(qǐng)求體時(shí)會(huì)執(zhí)行 kOnBody 鉤子函數(shù),對(duì)應(yīng)的是 parserOnBody 函數(shù)。
function parserOnBody(b, start, len) {
// IncomingMessage 對(duì)象,即 request 對(duì)象
const stream = this.incoming;
// Pretend this was the result of a stream._read call.
if (len > 0 && !stream._dumped) {
const slice = b.slice(start, start + len);
const ret = stream.push(slice);
if (!ret)
readStop(this.socket);
}
}
parserOnBody 會(huì)把數(shù)據(jù) push 到請(qǐng)求對(duì)象 request 中,接著 Node.js 會(huì)觸發(fā) data 事件,所以我們可以通過以下方式獲取 body 的數(shù)據(jù)。
const server= http.createServer((request, response) => {
request.on('data', (chunk) => {
// 處理body
});
request.on('end', () => {
// body結(jié)束
});
})
Node.js 的多進(jìn)程服務(wù)器架構(gòu)
雖然 Node.js 是單進(jìn)程單線程的應(yīng)用,但是我們可以創(chuàng)建多個(gè)進(jìn)程來共同請(qǐng)求。在創(chuàng)建 HTTP 服務(wù)器時(shí)會(huì)調(diào)用 net 模塊的 listen,然后調(diào)用 listenIncluster。我們從該函數(shù)開始分析。
function listenIncluster(server, address, port, addressType,
backlog, fd, exclusive, flags) {
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
cluster._getServer(server, serverQuery, listenOnMasterHandle);
function listenOnMasterHandle(err, handle) {
server._handle = handle;
server._listen2(address,
port,
addressType,
backlog,
fd,
flags);
}
}
listenIncluster 函數(shù)會(huì)調(diào)用子進(jìn)程 cluster 模塊的 _getServer 函數(shù)。
cluster._getServer = function(obj, options, cb) {
let address = options.address;
const message = {
act: 'queryServer',
index,
data: null,
...options
};
message.address = address;
// 給主進(jìn)程發(fā)送消息
send(message, (reply, handle) => {
// 根據(jù)不同模式做處理
if (handle)
shared(reply, handle, indexesKey, cb);
else
rr(reply, indexesKey, cb);
});
};
從上面代碼中可以看到,_getServer 函數(shù)會(huì)給主進(jìn)程發(fā)送一個(gè) queryServer 的請(qǐng)求并設(shè)置了一個(gè)回調(diào)函數(shù)??匆幌轮鬟M(jìn)程是如何處理 queryServer 請(qǐng)求的。
function queryServer(worker, message) {
const key = `${message.address}:${message.port}:${message.addressType}:${message.fd}:${message.index}`;
let handle = handles.get(key);
if (handle === undefined) {
let address = message.address;
let constructor = RoundRobinHandle;
// 根據(jù)策略選取不同的構(gòu)造函數(shù),UDP 只能使用共享模式,因?yàn)?UDP 不是基于連接的,沒有連接可以分發(fā)
if (schedulingPolicy !== SCHED_RR ||
message.addressType === 'udp4' ||
message.addressType === 'udp6') {
constructor = SharedHandle;
}
handle = new constructor(key,
address,
message.port,
message.addressType,
message.fd,
message.flags);
handles.set(key, handle);
}
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
// 返回結(jié)果給子進(jìn)程
send(worker, {
errno,
key,
ack: message.seq,
data,
...reply
}, handle);
});
}
queryServer 首先根據(jù)調(diào)度策略選擇構(gòu)造函數(shù)并創(chuàng)建一個(gè)對(duì)象,然后執(zhí)行該對(duì)象的 add 方法并且傳入一個(gè)回調(diào)。下面看看不同策略下的處理。
共享模式
首先看看共享模式的實(shí)現(xiàn),共享模式對(duì)應(yīng)前面分析的主進(jìn)程管理子進(jìn)程,多個(gè)子進(jìn)程共同 accept 處理連接這種方式。
function SharedHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.workers = [];
this.handle = null;
this.errno = 0;
let rval;
if (addressType === 'udp4' || addressType === 'udp6')
rval = dgram._createSocketHandle(address,
port,
addressType,
fd,
flags);
else
rval = net._createServerHandle(address,
port,
addressType,
fd,
flags);
if (typeof rval === 'number')
this.errno = rval;
else
this.handle = rval;
}
SharedHandle 是共享模式,即主進(jìn)程創(chuàng)建好 handle,交給子進(jìn)程處理,接著看它的 add 函數(shù)。
SharedHandle.prototype.add = function(worker, send) {
this.workers.push(worker);
send(this.errno, null, this.handle);
};
SharedHandle 的 add 把 SharedHandle 中創(chuàng)建的 handle 返回給子進(jìn)程。接著看子進(jìn)程拿到 handle 后的處理。
function shared(message, handle, indexesKey, cb) {
const key = message.key;
const close = handle.close;
handle.close = function() {
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
// 因?yàn)槭枪蚕淼?,可以直?close 掉而不會(huì)影響其它子進(jìn)程等
return close.apply(handle, arguments);
};
handles.set(key, handle);
// 執(zhí)行 net 模塊的回調(diào)
cb(message.errno, handle);
}
shared 函數(shù)把接收到的 handle 再回傳到調(diào)用方,即 net 模塊的 listenOnMasterHandle 函數(shù),listenOnMasterHandle 會(huì)執(zhí)行 listen 開始監(jiān)聽地址。
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
// this._handle 即主進(jìn)程返回的 handle
// 連接到來時(shí)的回調(diào)
this._handle.onconnection = onconnection;
this._handle[owner_symbol] = this;
const err = this._handle.listen(backlog || 511);
}
這樣多個(gè)子進(jìn)程就成功啟動(dòng)了服務(wù)器。共享模式的核心邏輯是主進(jìn)程在 _createServerHandle 創(chuàng)建 handle 時(shí)執(zhí)行 bind 綁定了地址(但沒有 listen),然后通過文件描述符傳遞的方式傳給子進(jìn)程,子進(jìn)程執(zhí)行 listen 的時(shí)候就不會(huì)報(bào)端口已經(jīng)被監(jiān)聽的錯(cuò)誤了,因?yàn)槎丝诒槐O(jiān)聽的錯(cuò)誤是執(zhí)行 bind 的時(shí)候返回的。邏輯如下圖所示。
看一個(gè)共享模式的使用例子。
const cluster = require('cluster');
const os = require('os');
// 設(shè)置為共享模式
cluster.schedulingPolicy = cluster.SCHED_NONE;
// 主進(jìn)程 fork 多個(gè)子進(jìn)程
if (cluster.isMaster) {
// 通常根據(jù) CPU 核數(shù)創(chuàng)建多個(gè)進(jìn)程 os.cpus().length
for (let i = 0; i < 3; i++) {
cluster.fork();
}
} else { // 子進(jìn)程創(chuàng)建服務(wù)器
const net = require('net');
const server = net.createServer((socket) => {
socket.destroy();
console.log(`handled by process: ${process.pid}`);
});
server.listen(8080);
}
輪詢模式
接著看輪詢模式,輪詢模式對(duì)應(yīng)前面的主進(jìn)程 accept,分發(fā)給多個(gè)子進(jìn)程處理這種方式。
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.all = new Map();
this.free = [];
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) {
// 啟動(dòng)一個(gè)服務(wù)器
this.server.listen({
port,
host: address,
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address); // UNIX socket path.
// 監(jiān)聽成功后,注冊(cè) onconnection 回調(diào),有連接到來時(shí)執(zhí)行
this.server.once('listening', () => {
this.handle = this.server._handle;
// 分發(fā)請(qǐng)求給子進(jìn)程
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
}
因?yàn)?RoundRobinHandle的 工作模式是主進(jìn)程負(fù)責(zé)監(jiān)聽,收到連接后分發(fā)給子進(jìn)程,所以 RoundRobinHandle 中直接啟動(dòng)了一個(gè)服務(wù)器,當(dāng)收到連接時(shí)執(zhí)行 this.distribute 進(jìn)行分發(fā)。接著看一下RoundRobinHandle 的 add 函數(shù)。
RoundRobinHandle.prototype.add = function(worker, send) {
this.all.set(worker.id, worker);
const done = () => {
// send 的第三個(gè)參數(shù)是 null,說明沒有 handle
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker);
};
// 否則等待 listen 成功后執(zhí)行回調(diào)
this.server.once('listening', done);
};
RoundRobinHandle 會(huì)在 listen 成功后執(zhí)行回調(diào)。我們回顧一下執(zhí)行 add 函數(shù)時(shí)的回調(diào)。
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
send(worker, {
errno,
key,
ack: message.seq,
data,
...reply
}, handle);
});
回調(diào)函數(shù)會(huì)把 handle 等信息返回給子進(jìn)程。但是在 RoundRobinHandle 和 SharedHandle 中返回的 handle 是不一樣的,分別是 null 和 net.createServer 實(shí)例,因?yàn)榍罢卟恍枰獑?dòng)一個(gè)服務(wù)器,它只需要接收來自父進(jìn)程傳遞的連接就行。
接著我們回到子進(jìn)程的上下文,看子進(jìn)程是如何處理的,剛才我們講過,不同的調(diào)度策略,返回的 handle 是不一樣的,我們看輪詢模式下的處理。
function rr(message, indexesKey, cb) {
let key = message.key;
// 不需要 listen,空操作
function listen(backlog) {
return 0;
}
function close() {
// 因?yàn)?handle 是共享的,所以無法直接關(guān)閉,需要告訴父進(jìn)程,引用數(shù)減一
if (key === undefined)
return;
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
key = undefined;
}
// 構(gòu)造假的 handle 給調(diào)用方
const handle = { close, listen, ref: noop, unref: noop };
handles.set(key, handle);
// 執(zhí)行 net 模塊的回調(diào)
cb(0, handle);
}
round-robin 模式下,Node.js 會(huì)構(gòu)造一個(gè)假的 handle 返回給 net 模塊,因?yàn)檎{(diào)用方會(huì)調(diào)用 handle 的這些函數(shù)。當(dāng)有請(qǐng)求到來時(shí),round-bobin 模塊會(huì)執(zhí)行 distribute 分發(fā)連接給子進(jìn)程。
RoundRobinHandle.prototype.distribute = function(err, handle) {
// 首先保存 handle 到隊(duì)列
this.handles.push(handle);
// 從空閑隊(duì)列獲取一個(gè)子進(jìn)程
const worker = this.free.shift();
// 分發(fā)
if (worker)
this.handoff(worker);
};
RoundRobinHandle.prototype.handoff = function(worker) {
// 拿到一個(gè) handle
const handle = this.handles.shift();
// 沒有 handle,則子進(jìn)程重新入隊(duì)
if (handle === undefined) {
this.free.push(worker);
return;
}
// 通知子進(jìn)程有新連接
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
// 接收成功
if (reply.accepted)
handle.close();
else
// 結(jié)束失敗,則重新分發(fā)
this.distribute(0, handle);
// 繼續(xù)分發(fā)
this.handoff(worker);
});
};
可以看到 Node.js 沒用按照嚴(yán)格的輪詢,而是哪個(gè)進(jìn)程接收連接快,就繼續(xù)給它分發(fā)。接著看一下子進(jìn)程是怎么處理該請(qǐng)求的。
function onmessage(message, handle) {
if (message.act === 'newconn')
onconnection(message, handle);
}
function onconnection(message, handle) {
const key = message.key;
const server = handles.get(key);
const accepted = server !== undefined;
// 回復(fù)接收成功
send({ ack: message.seq, accepted });
if (accepted)
// 在 net 模塊設(shè)置
server.onconnection(0, handle);
}
最終執(zhí)行 server.onconnection 進(jìn)行連接的處理。邏輯如下圖所示。
看一下輪詢模式的使用例子。
const cluster = require('cluster');
const os = require('os');
// 設(shè)置為輪詢模式
cluster.schedulingPolicy = cluster.SCHED_RR;
// 主進(jìn)程 fork 多個(gè)子進(jìn)程
if (cluster.isMaster) {
// 通常根據(jù) CPU 核數(shù)創(chuàng)建多個(gè)進(jìn)程 os.cpus().length
for (let i = 0; i < 3; i++) {
cluster.fork();
}
} else { // 子進(jìn)程創(chuàng)建服務(wù)器
const net = require('net');
const server = net.createServer((socket) => {
socket.destroy();
console.log(`handled by process: ${process.pid}`);
});
server.listen(8080);
}
實(shí)現(xiàn)一個(gè)高性能的服務(wù)器是非常復(fù)雜的,涉及到很多復(fù)雜的知識(shí),但是即使不是服務(wù)器開發(fā)者,了解服務(wù)器相關(guān)的一些知識(shí)也是非常有用的。