Node.js Stream 背壓 — 消費(fèi)端數(shù)據(jù)積壓來(lái)不及處理會(huì)怎么樣?
Stream 在 Node.js 中是一個(gè)被廣泛應(yīng)用的模塊,流的兩端可讀流、可寫(xiě)流之間通過(guò)管道鏈接,通常寫(xiě)入磁盤(pán)速度是低于讀取磁盤(pán)速度的,這樣管道的兩端就會(huì)產(chǎn)生壓力差,就需要一種平衡的機(jī)制,使得平滑順暢的從一個(gè)端流向另一個(gè)端。
背壓是一個(gè)術(shù)語(yǔ),表示向流中寫(xiě)入數(shù)據(jù)的速度超過(guò)了它所能處理的最大能力限制。例如,基于 Stream 寫(xiě)一個(gè)文件時(shí),當(dāng)寫(xiě)入端處理不過(guò)來(lái)時(shí),會(huì)通知到讀取端,你可以先等等,我這里忙不過(guò)來(lái)了...,等到一定時(shí)機(jī)后再次讀取寫(xiě)入。
問(wèn)題來(lái)源
“數(shù)據(jù)是以流的形式從可讀流流向可寫(xiě)流的,不會(huì)全部讀入內(nèi)存,我想說(shuō)的是上游流速過(guò)快下游來(lái)不及消費(fèi)造成數(shù)據(jù)積壓 即“背壓” 問(wèn)題會(huì)怎樣” 這個(gè)問(wèn)題來(lái)自于「Nodejs技術(shù)棧-交流群」一位朋友的疑問(wèn),當(dāng)時(shí)沒(méi)有給出答案,沒(méi)有做過(guò)類(lèi)似的實(shí)際數(shù)據(jù)測(cè)試,出現(xiàn)這種情況一般都會(huì)導(dǎo)致數(shù)據(jù)流兩端不平衡,另一端數(shù)據(jù)不斷積壓,持續(xù)消耗系統(tǒng)內(nèi)存,其它服務(wù)也必然受到影響。
本文,通過(guò)修改編譯 Node.js 源碼,在禁用掉 “背壓” 之后,做了一些測(cè)試,可以明顯看到兩者之間的一個(gè)效果對(duì)比。
流數(shù)據(jù)讀取->寫(xiě)入示例
先構(gòu)造一個(gè)大文件,我在本地創(chuàng)建了一個(gè) 2.2GB 大小的文件,通過(guò)大文件能夠顯著看到處理積壓與不處理積壓之間的差別。
下面例子實(shí)現(xiàn)的功能是讀取文件、經(jīng)過(guò) gzip 壓縮處理之后寫(xiě)入到一個(gè)新的目標(biāo)文件,也可寫(xiě)成 readable.pipe(gzip).pipe(writable) 不過(guò)這樣沒(méi)有任何的錯(cuò)誤處理機(jī)制,可借助一些工具 https://github.com/mafintosh/pump 處理。
對(duì)于處理這樣的任務(wù),Stream 模塊還提供了一個(gè)實(shí)用的方法 pipeline,管道中可以處理不同的數(shù)據(jù)流,當(dāng)其中某個(gè)數(shù)據(jù)流發(fā)生錯(cuò)誤,它會(huì)自動(dòng)處理并釋放掉相應(yīng)的資源。
- // stream-back-pressure-test.js
- const gzip = require('zlib').createGzip();
- const fs = require('fs');
- const { pipeline } = require('stream/promises');
- const readable = fs.createReadStream('2.2GB-file.zip');
- const writable = fs.createWriteStream('2.2GB-file.zip.gz');
- (async () => {
- try {
- await pipeline(
- readable,
- gzip,
- writable
- );
- console.log('Pipeline succeeded.');
- } catch (err) {
- console.error('Pipeline failed.', err);
- }
- })();
write() 源碼修改與編譯
write(chunk) 方法介紹
可寫(xiě)流對(duì)象的 write(chunk) 方法接收一些數(shù)據(jù)寫(xiě)入流,當(dāng)內(nèi)部緩沖區(qū)小于創(chuàng)建可寫(xiě)流對(duì)象時(shí)配置的 highWaterMark 則返回 true,否則返回 false 表示內(nèi)部緩沖區(qū)已滿或溢出,此時(shí)就是背壓的一種表現(xiàn)。
向流寫(xiě)入數(shù)據(jù)的速度已超出了其能處理的能力,若此時(shí)還是不斷調(diào)用 write() 方法,可以想象內(nèi)部的緩沖區(qū)也會(huì)不斷增加,當(dāng)前進(jìn)程占用的系統(tǒng)內(nèi)存就會(huì)不斷增加。
當(dāng)使用 pipe() 或 pipeline 在內(nèi)部處理時(shí),還是調(diào)用的 stream.write(chunk) 方法。
- stream.write(chunk)
如果要測(cè)試數(shù)據(jù)積壓帶來(lái)的一些消耗問(wèn)題,我們需要修改 Node.js 源碼,將 stream.write(chunk) 方法的返回值改為 true 禁止積壓處理。
源碼修改
我直接拉取的 Master 代碼,剛開(kāi)始忘記切換 Node.js 版本...,各版本大同小異,大致差不多,主要是找到 Writable.prototype.write() 方法,該方法最終的返回值是一個(gè)布爾值,找到 return ret && !state.errored && !state.destroyed 直接改為 return true; 禁用掉背壓處理。
- // https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L334
- Writable.prototype.write = function(chunk, encoding, cb) {
- return _write(this, chunk, encoding, cb) === true;
- };
- // https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L396
- // If we're already writing something, then just put this
- // in the queue, and wait our turn. Otherwise, call _write
- // If we return false, then we need a drain event, so set that flag.
- function writeOrBuffer(stream, state, chunk, encoding, callback) {
- ...
- // stream._write resets state.length
- const ret = state.length < state.highWaterMark;
- ...
- // Return false if errored or destroyed in order to break
- // any synchronous while(stream.write(data)) loops.
- // return ret && !state.errored && !state.destroyed;
- return true;
- }
編譯
源碼編譯對(duì)電腦的環(huán)境有一些要求,參考 Node.js 給出的這份文檔 Building Node.js。
先執(zhí)行 ./configure 生成當(dāng)前環(huán)境編譯需要的默認(rèn)配置,然后執(zhí)行 make 命令編譯,第一次編譯時(shí)間有點(diǎn)略長(zhǎng),差不多夠吃個(gè)飯了...
- $ ./configure
- $ make -j4
之后每次修改后也還需要重新編譯,為了方便起見(jiàn),在當(dāng)前目錄下創(chuàng)建一個(gè) shell 腳本文件。
- 創(chuàng)建腳本文件 vim compile.sh 輸入以下內(nèi)容。
- 使腳本具有可執(zhí)行權(quán)限 chmod +x ./test.sh。
- 運(yùn)行腳本編譯 sh compile.sh。
- #!/bin/bash
- ./configure --debug
- make -j4
- echo "Compiled successfully"
編譯成功后,最后幾行日志輸出如下所示,當(dāng)前目錄下會(huì)生成一個(gè) node 的可執(zhí)行命令,或者 out/Release/node 也可執(zhí)行。
- if [ ! -r node ] || [ ! -L node ]; then \
- ln -fs out/Release/node node; fi
現(xiàn)在可以在當(dāng)前目錄下創(chuàng)建一個(gè)測(cè)試文件,用剛剛編譯好的 node 運(yùn)行。
- ./node ./test.js
內(nèi)存消耗測(cè)試
再推薦一個(gè) Linux 命令 /usr/bin/time,能夠測(cè)量命令的使用時(shí)間并給出系統(tǒng)資源的消耗情況??梢詤⒖歼@篇文章介紹 http://c.biancheng.net/linux/time.html。
沒(méi)有處理積壓的測(cè)試結(jié)果
運(yùn)行命令 sudo /usr/bin/time -lp ./node ./stream-back-pressure-test.js 測(cè)試沒(méi)有積壓處理的情況。
980713472 是執(zhí)行程序所占用內(nèi)存的最大值,大約消耗 0.9GB。
- real 188.25
- user 179.72
- sys 28.77
- 980713472 maximum resident set size
- 0 average shared memory size
- 0 average unshared data size
- 0 average unshared stack size
- 3348430 page reclaims
- 3864 page faults
- 0 swaps
- 0 block input operations
- 3 block output operations
- 0 messages sent
- 0 messages received
- 0 signals received
- 21341 voluntary context switches
- 2934500 involuntary context switches
如果是 Mac 電腦,同時(shí)打開(kāi)活動(dòng)監(jiān)視器也能看到程序處理過(guò)程中的一些內(nèi)存消耗信息,可以看到內(nèi)存的占用還是很高的,另外我的電腦上的其它服務(wù)也受到了影響,一些應(yīng)用變得異??D。
image.png
正常程序積壓處理的測(cè)試結(jié)果
59215872 是執(zhí)行程序所占用內(nèi)存的最大值,大約消耗 56 MB。
- real 184.67
- user 176.22
- sys 20.68
- 59215872 maximum resident set size
- 0 average shared memory size
- 0 average unshared data size
- 0 average unshared stack size
- 1486628 page reclaims
- 3971 page faults
- 0 swaps
- 0 block input operations
- 0 block output operations
- 0 messages sent
- 0 messages received
- 1 signals received
- 4843 voluntary context switches
- 2551476 involuntary context switches
通過(guò) Mac 活動(dòng)監(jiān)視器看到內(nèi)存的占用,是沒(méi)什么壓力的,電腦上其它服務(wù)此時(shí)也沒(méi)受到影響。
為什么背壓我沒(méi)聽(tīng)說(shuō)過(guò)?
經(jīng)過(guò)上面的測(cè)試,可以看到?jīng)]有正確處理積壓的結(jié)果和正常的經(jīng)過(guò)處理的存在極大的差別,但是你可能又有疑問(wèn):“為什么我沒(méi)有聽(tīng)說(shuō)過(guò)背壓?也沒(méi)遇到過(guò)類(lèi)似問(wèn)題?”。
這是因?yàn)?Node.js 的 Stream 模塊提供的一些方法 pipe()、pipeline() 已經(jīng)為我們做了這些處理,使用了這些 API 方法我們是不需要自己考慮去處理 “背壓” 這一問(wèn)題的**。因?yàn)橐坏┚彌_區(qū)中的數(shù)據(jù)超過(guò)了 highWaterMark 限制,可寫(xiě)流的 write() 方法就會(huì)返回 false,處理數(shù)據(jù)積壓的這一機(jī)制也會(huì)被觸發(fā)。
如果你直接使用的 write() 方法寫(xiě)入數(shù)據(jù),而沒(méi)有正確的處理背壓,就要小心了,如果有攻擊者多次發(fā)起請(qǐng)求,也會(huì)導(dǎo)致你的進(jìn)程不斷的消耗服務(wù)器系統(tǒng)內(nèi)存,從而會(huì)拖垮服務(wù)器上的其它應(yīng)用。
總結(jié)
可寫(xiě)流在消費(fèi)數(shù)據(jù)時(shí),內(nèi)部有一個(gè)緩沖區(qū),一旦緩沖區(qū)的數(shù)據(jù)滿了之后,也沒(méi)做任何 “背壓” 處理,會(huì)導(dǎo)致緩沖區(qū)數(shù)據(jù)溢出,后面來(lái)不及消費(fèi)的數(shù)據(jù)不得不駐留在內(nèi)存中,直到程序處理完畢,才會(huì)被清除。整個(gè)數(shù)據(jù)積壓的過(guò)程中當(dāng)前進(jìn)程會(huì)不斷的消耗系統(tǒng)內(nèi)存,對(duì)其它進(jìn)程任務(wù)也會(huì)產(chǎn)生很大的影響。
最后,留一個(gè)問(wèn)題:“如何用 Node.js 實(shí)現(xiàn)從可讀流到可寫(xiě)流的數(shù)據(jù)復(fù)制?類(lèi)似于 pipe()”,實(shí)現(xiàn)過(guò)程要考慮 “背壓” 處理,最好是基于 Promise 方便之后使用 Async/Await 來(lái)使用,做一點(diǎn)提示可以考慮結(jié)合異步迭代器實(shí)現(xiàn),歡迎在留言討論,下一節(jié)揭曉這個(gè)問(wèn)題。
本文轉(zhuǎn)載自微信公眾號(hào)「Nodejs技術(shù)?!?,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Nodejs技術(shù)棧公眾號(hào)。