自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Node.js Stream 模塊 Pipe 方法使用與實(shí)現(xiàn)原理分析

開(kāi)發(fā) 項(xiàng)目管理
通過(guò)流我們可以將一大塊數(shù)據(jù)拆分為一小部分一點(diǎn)一點(diǎn)的流動(dòng)起來(lái),而無(wú)需一次性全部讀入,在 Linux 下我們可以通過(guò) | 符號(hào)實(shí)現(xiàn),類似的在 Nodejs 的 Stream 模塊中同樣也為我們提供了 pipe() 方法來(lái)實(shí)現(xiàn)。

[[384117]]

通過(guò)流我們可以將一大塊數(shù)據(jù)拆分為一小部分一點(diǎn)一點(diǎn)的流動(dòng)起來(lái),而無(wú)需一次性全部讀入,在 Linux 下我們可以通過(guò) | 符號(hào)實(shí)現(xiàn),類似的在 Nodejs 的 Stream 模塊中同樣也為我們提供了 pipe() 方法來(lái)實(shí)現(xiàn)。

1. Stream pipe 基本示例

選擇 Koa 來(lái)實(shí)現(xiàn)這個(gè)簡(jiǎn)單的 Demo,因?yàn)橹坝腥嗽?“Nodejs技術(shù)棧” 交流群?jiǎn)栠^(guò)一個(gè)問(wèn)題,怎么在 Koa 中返回一個(gè) Stream,順便在下文借此機(jī)會(huì)提下。

1.1 未使用 Stream pipe 情況

在 Nodejs 中 I/O 操作都是異步的,先用 util 模塊的 promisify 方法將 fs.readFile 的 callback 形式轉(zhuǎn)為 Promise 形式,這塊代碼看似沒(méi)問(wèn)題,但是它的體驗(yàn)不是很好,因?yàn)樗菍?shù)據(jù)一次性讀入內(nèi)存再進(jìn)行的返回,當(dāng)數(shù)據(jù)文件很大的時(shí)候也是對(duì)內(nèi)存的一種消耗,類似內(nèi)存泄漏這種問(wèn)題也很容易出現(xiàn),因此不推薦它。

  1. const Koa = require('koa'); 
  2. const fs = require('fs'); 
  3. const app = new Koa(); 
  4. const { promisify } = require('util'); 
  5. const { resolve } = require('path'); 
  6. const readFile = promisify(fs.readFile); 
  7.  
  8. app.use(async ctx => { 
  9.   try { 
  10.     ctx.body = await readFile(resolve(__dirname, 'test.json')); 
  11.   } catch(err) { ctx.body = err }; 
  12. }); 
  13.  
  14. app.listen(3000); 

1.2 使用 Stream pipe 情況

下面,再看看怎么通過(guò) Stream 的方式在 Koa 框架中響應(yīng)數(shù)據(jù)

  1. ... 
  2. app.use(async ctx => { 
  3.   try { 
  4.     const readable = fs.createReadStream(resolve(__dirname, 'test.json')); 
  5.     ctx.body = readable; 
  6.   } catch(err) { ctx.body = err }; 
  7. }); 

以上在 Koa 中直接創(chuàng)建一個(gè)可讀流賦值給 ctx.body 就可以了,你可能疑惑了為什么沒(méi)有 pipe 方法,因?yàn)榭蚣芙o你封裝好了,不要被表象所迷惑了,看下相關(guān)源碼:

  1. // https://github.com/koajs/koa/blob/master/lib/application.js#L256 
  2. function respond(ctx) { 
  3.   ... 
  4.   let body = ctx.body; 
  5.   if (body instanceof Stream) return body.pipe(res); 
  6.   ... 

沒(méi)有神奇之處,框架在返回的時(shí)候做了層判斷,因?yàn)?res 是一個(gè)可寫流對(duì)象,如果 body 也是一個(gè) Stream 對(duì)象(此時(shí)的 Body 是一個(gè)可讀流),則使用 body.pipe(res) 以流的方式進(jìn)行響應(yīng)。

1.3 使用 Stream VS 不使用 Stream

看到一個(gè)圖片,不得不說(shuō)畫的實(shí)在太萌了,來(lái)源 https://www.cnblogs.com/vajoy/p/6349817.html

2. pipe 的調(diào)用過(guò)程與實(shí)現(xiàn)原理

以上最后以流的方式響應(yīng)數(shù)據(jù)最核心的實(shí)現(xiàn)就是使用 pipe 方法來(lái)實(shí)現(xiàn)的輸入、輸出,本節(jié)的重點(diǎn)也是研究 pipe 的實(shí)現(xiàn),最好的打開(kāi)方式通過(guò)閱讀源碼一起來(lái)看看吧。

2.1 順藤摸瓜

在應(yīng)用層我們調(diào)用了 fs.createReadStream() 這個(gè)方法,順藤摸瓜找到這個(gè)方法創(chuàng)建的可讀流對(duì)象的 pipe 方法實(shí)現(xiàn),以下僅列舉核心代碼實(shí)現(xiàn),基于 Nodejs v12.x 源碼。

2.1.1 /lib/fs.js

導(dǎo)出一個(gè) createReadStream 方法,在這個(gè)方法里面創(chuàng)建了一個(gè) ReadStream 可讀流對(duì)象,且 ReadStream 來(lái)自 internal/fs/streams 文件,繼續(xù)向下找。

  1. // https://github.com/nodejs/node/blob/v12.x/lib/fs.js 
  2. // 懶加載,主要在用到的時(shí)候用來(lái)實(shí)例化 ReadStream、WriteStream ... 等對(duì)象 
  3. function lazyLoadStreams() { 
  4.   if (!ReadStream) { 
  5.     ({ ReadStream, WriteStream } = require('internal/fs/streams')); 
  6.     [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ]; 
  7.   } 
  8.  
  9. function createReadStream(path, options) { 
  10.   lazyLoadStreams(); 
  11.   return new ReadStream(path, options); // 創(chuàng)建一個(gè)可讀流 
  12.  
  13. module.exports = fs = { 
  14.   createReadStream, // 導(dǎo)出 createReadStream 方法 
  15.   ... 

2.1.2 /lib/internal/fs/streams.js

這個(gè)方法里定義了構(gòu)造函數(shù) ReadStream,且在原型上定義了 open、_read、_destroy 等方法,并沒(méi)有我們要找的 pipe 方法。

但是呢通過(guò) ObjectSetPrototypeOf 方法實(shí)現(xiàn)了繼承,ReadStream 繼承了 Readable 在原型中定義的函數(shù),接下來(lái)繼續(xù)查找 Readable 的實(shí)現(xiàn)。

  1. // https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js 
  2. const { Readable, Writable } = require('stream'); 
  3.  
  4. function ReadStream(path, options) { 
  5.   if (!(this instanceof ReadStream)) 
  6.     return new ReadStream(path, options); 
  7.  
  8.   ... 
  9.   Readable.call(this, options); 
  10.   ... 
  11. ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype); 
  12. ObjectSetPrototypeOf(ReadStream, Readable); 
  13.  
  14. ReadStream.prototype.open = function() { ... }; 
  15.  
  16. ReadStream.prototype._read = function(n) { ... };; 
  17.  
  18. ReadStream.prototype._destroy = function(err, cb) { ... }; 
  19. ... 
  20.  
  21. module.exports = { 
  22.   ReadStream, 
  23.   WriteStream 
  24. }; 

2.1.3 /lib/stream.js

在 stream.js 的實(shí)現(xiàn)中,有條注釋:在 Readable/Writable/Duplex/... 之前導(dǎo)入 Stream,原因是為了避免 cross-reference(require),為什么會(huì)這樣?

第一步 stream.js 這里將 require('internal/streams/legacy') 導(dǎo)出復(fù)制給了 Stream。

在之后的 _stream_readable、Writable、Duplex ... 模塊也會(huì)反過(guò)來(lái)引用 stream.js 文件,具體實(shí)現(xiàn)下面會(huì)看到。

Stream 導(dǎo)入了 internal/streams/legacy

上面 /lib/internal/fs/streams.js 文件從 stream 模塊獲取了一個(gè) Readable 對(duì)象,就是下面的 Stream.Readable 的定義。

  1. // https://github.com/nodejs/node/blob/v12.x/lib/stream.js 
  2. // Note: export Stream before Readable/Writable/Duplex/... 
  3. // to avoid a cross-reference(require) issues 
  4. const Stream = module.exports = require('internal/streams/legacy'); 
  5.  
  6. Stream.Readable = require('_stream_readable'); 
  7. Stream.Writable = require('_stream_writable'); 
  8. Stream.Duplex = require('_stream_duplex'); 
  9. Stream.Transform = require('_stream_transform'); 
  10. Stream.PassThrough = require('_stream_passthrough'); 
  11. ... 

2.1.4 /lib/internal/streams/legacy.js

上面的 Stream 等于 internal/streams/legacy,首先繼承了 Events 模塊,之后呢在原型上定義了 pipe 方法,剛開(kāi)始看到這里的時(shí)候以為實(shí)現(xiàn)是在這里了,但后來(lái)看 _stream_readable 的實(shí)現(xiàn)之后,發(fā)現(xiàn) _stream_readable 繼承了 Stream 之后自己又重新實(shí)現(xiàn)了 pipe 方法,那么疑問(wèn)來(lái)了這個(gè)模塊的 pipe 方法是干嘛的?什么時(shí)候會(huì)被用?翻譯文件名 “legacy=遺留”?有點(diǎn)沒(méi)太理解,難道是遺留了?有清楚的大佬可以指點(diǎn)下,也歡迎在公眾號(hào) “Nodejs技術(shù)棧” 后臺(tái)加我微信一塊討論下!

  1. // https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js 
  2. const { 
  3.   ObjectSetPrototypeOf, 
  4. } = primordials; 
  5. const EE = require('events'); 
  6. function Stream(opts) { 
  7.   EE.call(this, opts); 
  8. ObjectSetPrototypeOf(Stream.prototype, EE.prototype); 
  9. ObjectSetPrototypeOf(Stream, EE); 
  10.  
  11. Stream.prototype.pipe = function(dest, options) { 
  12.   ... 
  13. }; 
  14.  
  15. module.exports = Stream; 

2.1.5 /lib/_stream_readable.js

在 _stream_readable.js 的實(shí)現(xiàn)里面定義了 Readable 構(gòu)造函數(shù),且繼承于 Stream,這個(gè) Stream 正是我們上面提到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加載了 internal/streams/legacy 文件且重寫了里面定義的 pipe 方法。

經(jīng)過(guò)上面一系列的分析,終于找到可讀流的 pipe 在哪里,同時(shí)也更進(jìn)一步的認(rèn)識(shí)到了在創(chuàng)建一個(gè)可讀流時(shí)的執(zhí)行調(diào)用過(guò)程,下面將重點(diǎn)來(lái)看這個(gè)方法的實(shí)現(xiàn)。

 

  1. module.exports = Readable; 
  2. Readable.ReadableState = ReadableState; 
  3.  
  4. const EE = require('events'); 
  5. const Stream = require('stream'); 
  6.  
  7. ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); 
  8. ObjectSetPrototypeOf(Readable, Stream); 
  9.  
  10. function Readable(options) { 
  11.   if (!(this instanceof Readable)) 
  12.     return new Readable(options); 
  13.  
  14.   ... 
  15.   Stream.call(this, options); // 繼承自 Stream 構(gòu)造函數(shù)的定義 
  16. ... 

 

2.2 _stream_readable 實(shí)現(xiàn)分析

2.2.1 聲明構(gòu)造函數(shù) Readable

聲明構(gòu)造函數(shù) Readable 繼承 Stream 的構(gòu)造函數(shù)和原型。

Stream 是 /lib/stream.js 文件,上面分析了,這個(gè)文件繼承了 events 事件,此時(shí)也就擁有了 events 在原型中定義的屬性,例如 on、emit 等方法。

  1. const Stream = require('stream'); 
  2. ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); 
  3. ObjectSetPrototypeOf(Readable, Stream); 
  4.  
  5. function Readable(options) { 
  6.   if (!(this instanceof Readable)) 
  7.     return new Readable(options); 
  8.  
  9.   ... 
  10.  
  11.   Stream.call(this, options); 

2.2.2 聲明 pipe 方法,訂閱 data 事件

在 Stream 的原型上聲明 pipe 方法,訂閱 data 事件,src 為可讀流對(duì)象,dest 為可寫流對(duì)象。

我們?cè)谑褂?pipe 方法的時(shí)候也是監(jiān)聽(tīng)的 data 事件,一邊讀取數(shù)據(jù)一邊寫入數(shù)據(jù)。

看下 ondata() 方法里的幾個(gè)核心實(shí)現(xiàn):

  • dest.write(chunk):接收 chunk 寫入數(shù)據(jù),如果內(nèi)部的緩沖小于創(chuàng)建流時(shí)配置的 highWaterMark,則返回 true,否則返回 false 時(shí)應(yīng)該停止向流寫入數(shù)據(jù),直到 'drain' 事件被觸發(fā)。
  • src.pause():可讀流會(huì)停止 data 事件,意味著此時(shí)暫停數(shù)據(jù)寫入了。

之所以調(diào)用 src.pause() 是為了防止讀入數(shù)據(jù)過(guò)快來(lái)不及寫入,什么時(shí)候知道來(lái)不及寫入呢,要看 dest.write(chunk) 什么時(shí)候返回 false,是根據(jù)創(chuàng)建流時(shí)傳的 highWaterMark 屬性,默認(rèn)為 16384 (16KB),對(duì)象模式的流默認(rèn)為 16。

注意:是 16KB 不是 16Kb,也是之前犯的一個(gè)錯(cuò)誤,大寫的 B 和小寫的 b 在這里是有區(qū)別的。計(jì)算機(jī)中所有數(shù)據(jù)都以 0 和 1 表示,其中 0 或 1 稱作一個(gè)位(bit),用小寫的 b 表示。大寫的 B 表示字節(jié)(byte),1byte = 8bit,大寫 K 表示千,所以是千個(gè)位(Kb)和千個(gè)字節(jié)(KB),一般都是使用 KB 表示一個(gè)文件的大小。

  1. Readable.prototype.pipe = function(dest, options) { 
  2.   const src = this; 
  3.   src.on('data', ondata); 
  4.   function ondata(chunk) { 
  5.     const ret = dest.write(chunk); 
  6.     if (ret === false) { 
  7.       ... 
  8.       src.pause(); 
  9.     } 
  10.   } 
  11.   ... 
  12. }; 

2.2.3 訂閱 drain 事件,繼續(xù)流動(dòng)數(shù)據(jù)

上面提到在 data 事件里,如果調(diào)用 dest.write(chunk) 返回 false,就會(huì)調(diào)用 src.pause() 停止數(shù)據(jù)流動(dòng),什么時(shí)候再次開(kāi)啟呢?

如果說(shuō)可以繼續(xù)寫入事件到流時(shí)會(huì)觸發(fā) drain 事件,也是在 dest.write(chunk) 等于 false 時(shí),如果 ondrain 不存在則注冊(cè) drain 事件。

  1. Readable.prototype.pipe = function(dest, options) { 
  2.   const src = this; 
  3.   src.on('data', ondata); 
  4.   function ondata(chunk) { 
  5.     const ret = dest.write(chunk); 
  6.     if (ret === false) { 
  7.       ... 
  8.       if (!ondrain) { 
  9.         // When the dest drains, it reduces the awaitDrain counter 
  10.         // on the source.  This would be more elegant with a .once() 
  11.         // handler in flow(), but adding and removing repeatedly is 
  12.         // too slow. 
  13.         ondrain = pipeOnDrain(src); 
  14.         dest.on('drain', ondrain); 
  15.       } 
  16.       src.pause(); 
  17.     } 
  18.   } 
  19.   ... 
  20. }; 
  21.  
  22. // 當(dāng)可寫入流 dest 耗盡時(shí),它將會(huì)在可讀流對(duì)象 source 上減少 awaitDrain 計(jì)數(shù)器 
  23. // 為了確保所有需要緩沖的寫入都完成,即 state.awaitDrain === 0 和 src 可讀流上的 data 事件存在,切換流到流動(dòng)模式 
  24. function pipeOnDrain(src) { 
  25.   return function pipeOnDrainFunctionResult() { 
  26.     const state = src._readableState; 
  27.     debug('pipeOnDrain', state.awaitDrain); 
  28.     if (state.awaitDrain) 
  29.       state.awaitDrain--; 
  30.     if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { 
  31.       state.flowing = true
  32.       flow(src); 
  33.     } 
  34.   }; 
  35.  
  36. // stream.read() 從內(nèi)部緩沖拉取并返回?cái)?shù)據(jù)。如果沒(méi)有可讀的數(shù)據(jù),則返回 null。在可讀流上 src 還有一個(gè) readable 屬性,如果可以安全地調(diào)用 readable.read(),則為 true 
  37. function flow(stream) { 
  38.   const state = stream._readableState; 
  39.   debug('flow', state.flowing); 
  40.   while (state.flowing && stream.read() !== null); 

2.2.4 觸發(fā) data 事件

調(diào)用 readable 的 resume() 方法,觸發(fā)可讀流的 'data' 事件,進(jìn)入流動(dòng)模式。

  1. Readable.prototype.pipe = function(dest, options) { 
  2.   const src = this; 
  3.   // Start the flow if it hasn't been started already. 
  4.   if (!state.flowing) { 
  5.     debug('pipe resume'); 
  6.     src.resume(); 
  7.   } 
  8.   ... 

然后實(shí)例上的 resume(Readable 原型上定義的)會(huì)在調(diào)用 resume() 方法,在該方法內(nèi)部又調(diào)用了 resume_(),最終執(zhí)行了 stream.read(0) 讀取了一次空數(shù)據(jù)(size 設(shè)置的為 0),將會(huì)觸發(fā)實(shí)例上的 _read() 方法,之后會(huì)在觸發(fā) data 事件。

  1. function resume(stream, state) { 
  2.   ... 
  3.   process.nextTick(resume_, stream, state); 
  4.  
  5. function resume_(stream, state) { 
  6.   debug('resume', state.reading); 
  7.   if (!state.reading) { 
  8.     stream.read(0); 
  9.   } 
  10.  
  11.   ... 

2.2.5 訂閱 end 事件

end 事件:當(dāng)可讀流中沒(méi)有數(shù)據(jù)可供消費(fèi)時(shí)觸發(fā),調(diào)用 onend 函數(shù),執(zhí)行 dest.end() 方法,表明已沒(méi)有數(shù)據(jù)要被寫入可寫流,進(jìn)行關(guān)閉(關(guān)閉可寫流的 fd),之后再調(diào)用 stream.write() 會(huì)導(dǎo)致錯(cuò)誤。

  1. Readable.prototype.pipe = function(dest, options) { 
  2.   ... 
  3.   const doEnd = (!pipeOpts || pipeOpts.end !== false) && 
  4.               dest !== process.stdout && 
  5.               dest !== process.stderr; 
  6.  
  7.   const endFn = doEnd ? onend : unpipe; 
  8.   if (state.endEmitted) 
  9.     process.nextTick(endFn); 
  10.   else 
  11.     src.once('end', endFn); 
  12.  
  13.   dest.on('unpipe', onunpipe); 
  14.   ... 
  15.  
  16.   function onend() { 
  17.     debug('onend'); 
  18.     dest.end(); 
  19.   } 

2.2.6 觸發(fā) pipe 事件

在 pipe 方法里面最后還會(huì)觸發(fā)一個(gè) pipe 事件,傳入可讀流對(duì)象

  1. Readable.prototype.pipe = function(dest, options) { 
  2.   ... 
  3.   const source = this; 
  4.   dest.emit('pipe', src); 
  5.   ... 
  6. }; 

在應(yīng)用層使用的時(shí)候可以在可寫流上訂閱 pipe 事件,做一些判斷,具體可參考官網(wǎng)給的這個(gè)示例 stream_event_pipe

2.2.7 支持鏈?zhǔn)秸{(diào)用

最后返回 dest,支持類似 unix 的用法:A.pipe(B).pipe(C)

  1. Stream.prototype.pipe = function(dest, options) { 
  2.   return dest; 
  3. }; 

3. 總結(jié)

本文總體分為兩部分:

  • 第一部分相對(duì)較基礎(chǔ),講解了 Nodejs Stream 的 pipe 方法在 Koa2 中是怎么去應(yīng)用的。
  • 第二部分仍找它的實(shí)現(xiàn),以及對(duì)源碼的一個(gè)簡(jiǎn)單分析,其實(shí) pipe 方法核心還是要去監(jiān)聽(tīng) data 事件,向可寫流寫入數(shù)據(jù),如果內(nèi)部緩沖大于創(chuàng)建流時(shí)配置的 highWaterMark,則要停止數(shù)據(jù)流動(dòng),直到 drain 事件觸發(fā)或者結(jié)束,當(dāng)然還要監(jiān)聽(tīng) end、error 等事件做一些處理。

4. Reference

  • nodejs.cn/api/stream.html
  • cnodejs.org/topic/56ba030271204e03637a3870
  • github.com/nodejs/node/blob/master/lib/_stream_readable.js

本文轉(zhuǎn)載自微信公眾號(hào)「Nodejs技術(shù)?!?,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Nodejs技術(shù)棧公眾號(hào)。

 

責(zé)任編輯:武曉燕 來(lái)源: Nodejs技術(shù)棧
相關(guān)推薦

2020-04-15 15:48:03

Node.jsstream前端

2021-07-09 00:24:10

No.jsNode.js原理

2021-07-16 04:56:03

NodejsAddon

2019-12-17 11:40:44

Node.js模塊前端

2021-09-26 05:06:04

Node.js模塊機(jī)制

2021-11-06 18:40:27

js底層模塊

2017-03-19 16:40:28

漏洞Node.js內(nèi)存泄漏

2017-03-20 13:43:51

Node.js內(nèi)存泄漏

2014-04-10 09:43:00

Node.jsTwilio

2021-01-26 08:07:44

Node.js模塊 Async

2023-12-07 08:07:47

Node流程代碼

2019-12-10 10:23:57

Node.jsCluster前端

2022-08-28 16:30:34

Node.jsDocker指令

2022-10-28 15:51:24

JavaScript開(kāi)發(fā)Node.js

2020-08-07 10:40:56

Node.jsexpress前端

2011-09-08 14:07:28

Node.js

2023-06-30 23:25:46

HTTP模塊內(nèi)存

2013-11-01 09:34:56

Node.js技術(shù)

2023-01-10 14:11:26

2015-03-10 10:59:18

Node.js開(kāi)發(fā)指南基礎(chǔ)介紹
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)