注釋掉 on('data') 請(qǐng)求為什么一直掛著?— 了解 Node.js Stream 的兩種模式
這是來自「Nodejs技術(shù)?!菇涣魅阂晃蛔x者朋友提的一個(gè)問題,“如果注釋掉 req.on('data') 事件監(jiān)聽,end 事件就收不到了,進(jìn)而永遠(yuǎn)也不會(huì)執(zhí)行 res.end(),請(qǐng)求會(huì)被一直掛著,為什么?”。
如果你讀到這里,也可以先思考下這個(gè)問題!
- const http = require('http');
- http.createServer((req, res) => {
- let data = '';
- // req.on('data', chunk => {
- // data += chunk.toString();
- // });
- req.on('end', () => {
- res.end(data);
- });
- }).listen(3000);
Node.js 的可讀流對(duì)象提供了兩種模式:流動(dòng)模式(flowing)、暫停模式(paused),如果你使用管道 pipe() 或異步迭代可能不會(huì)關(guān)注到這個(gè)問題,在它們的內(nèi)部實(shí)現(xiàn)中已經(jīng)處理好了,如果你是基于事件的 API 來處理流,可能會(huì)有這些疑問。
流動(dòng)模式(flowing)
流動(dòng)模式下數(shù)據(jù)自動(dòng)從底層系統(tǒng)獲取,并通過 EventEmitter 提供的事件接口,盡可能快的提供給應(yīng)用程序。需要注意的是所有的可讀流一開始都處于暫停模式,要切換為流動(dòng)模式,可通過以下幾種方式實(shí)現(xiàn):
一:注冊(cè) 'data' 事件
為可讀流對(duì)象注冊(cè)一個(gè) 'data' 事件,傳入事件處理函數(shù),會(huì)把流切換為流動(dòng)模式,在數(shù)據(jù)可用時(shí)會(huì)立即把數(shù)據(jù)塊傳送給注冊(cè)的事件處理函數(shù)。
這也是上面的疑問,為什么注釋掉 'data' 事件,請(qǐng)求就會(huì)一直被掛起。
- req.on('data', chunk => {
- data += chunk.toString();
- });
二:stream.pipe() 方法
調(diào)用 pipe() 方法將數(shù)據(jù)發(fā)送到可寫流。
- readable.pipe(writeable)
可讀流的 pipe() 方法實(shí)現(xiàn)中也是注冊(cè)了 'data' 事件,一邊讀取數(shù)據(jù)一邊寫入數(shù)據(jù)至可寫流??梢詤⒁姽P者之前的這篇文章 Node.js Stream 模塊 pipe 方法使用與實(shí)現(xiàn)原理分析。
- Readable.prototype.pipe = function(dest, options) {
- const src = this;
- src.on('data', ondata);
- function ondata(chunk) {
- const ret = dest.write(chunk);
- if (ret === false) {
- ...
- src.pause();
- }
- }
- ...
- };
三:stream.resume() 方法
stream.resume() 將處于暫停模式的可讀流,恢復(fù)觸發(fā) 'data' 事件,切換為流動(dòng)模式。
對(duì)一開始的示例做一個(gè)改造,先調(diào)用 stream.resume() 用來耗盡流中的數(shù)據(jù),但此時(shí)沒有做任何的數(shù)據(jù)處理,之后會(huì)收到 end 事件。
- const http = require('http');
- http.createServer((req, res) => {
- req.resume();
- req.on('end', () => {
- res.end('Ok!');
- });
- }).listen(3000);
四:異步迭代
無需注冊(cè)事件監(jiān)聽函數(shù),使用 for...await of 遍歷可讀流,寫法上也很簡(jiǎn)單。下例,因?yàn)橛玫?*頂級(jí) await 特性,**需要在 ES Modules 規(guī)范中使用。
- // app.mjs
- import { createServer as server } from 'http';
- import { on } from 'events';
- const ee = on(server().listen(3000), 'request');
- for await (const [{ url }, res] of ee) {
- res.end('OK!');
- }
暫停模式
暫停模式也是流一開始時(shí)所處的模式,該模式下會(huì)觸發(fā) 'readable' 事件,表示流中有可讀取的數(shù)據(jù),我們需要不斷調(diào)用 read() 方法拉取數(shù)據(jù),直到返回 null,表示緩沖區(qū)中的數(shù)據(jù)已被耗盡,在 read() 返回 null 后,會(huì)再次觸發(fā) 'readable' 事件,表示仍有可讀取的數(shù)據(jù),如果此時(shí)停止 read() 方法調(diào)用,同樣的請(qǐng)求也會(huì)被掛起。
stream.read(size) 方法從流緩沖區(qū)拉取數(shù)據(jù),每次返回指定 size 大小的數(shù)據(jù),如果不指定 size 則返回內(nèi)部所有緩沖的數(shù)據(jù)。
- const http = require('http');
- http.createServer((req, res) => {
- let data = '';
- let chunk;
- req.on('readable', () => {
- while (null !== (chunk = req.read())) {
- data += chunk.toString();
- }
- })
- req.on('end', () => {
- res.end(data);
- });
- }).listen(3000);
背壓?jiǎn)栴}思考??
以流的形式從可讀流拉取數(shù)據(jù)到可寫流,通常**從磁盤讀取數(shù)據(jù)的速度比磁盤寫入的速度是快的,如果可寫流來不及消費(fèi)數(shù)據(jù)造成數(shù)據(jù)積壓(專業(yè)術(shù)語會(huì)稱呼這個(gè)問題為 “背壓”)會(huì)怎么樣?**也是來自「Nodejs技術(shù)?!菇涣魅鹤x者朋友的疑問,可以思考下,答案可以寫在評(píng)論區(qū),感興趣的關(guān)注下「Nodejs技術(shù)棧」下一次講解。
總結(jié)
流剛開始處于暫停模式,所以注釋掉 req.on('data') 事件監(jiān)聽,請(qǐng)求才會(huì)一直掛起。在基于流的方式讀取文件時(shí),之前通常使用注冊(cè) 'data' 事件處理函數(shù)的方式從可讀流中拉取數(shù)據(jù),現(xiàn)在 Node.js 支持了異步迭代,更推薦你使用 for...await of 這種方式來讀取數(shù)據(jù),代碼看起來也會(huì)更簡(jiǎn)潔,同步編碼思維讓人也能更好的理解。
本文轉(zhuǎn)載自微信公眾號(hào)「Nodejs技術(shù)?!?,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Nodejs技術(shù)棧公眾號(hào)。