結合異步迭代器實現(xiàn) Node.js 流式數(shù)據(jù)復制
實現(xiàn)可讀流到可寫流數(shù)據(jù)復制,就是不斷的讀取->寫入這個過程,那么你首先想到的是不是下面這樣呢?代碼看似很簡單,結果卻是很糟糕的,沒有任何的數(shù)據(jù)積壓處理。如果讀取的文件很大了,造成的后果就是緩沖區(qū)數(shù)據(jù)溢出,程序會占用過多的系統(tǒng)內(nèi)存,拖垮服務器上的其它應用,如果不明白的回顧下這篇文章 Node.js Stream 背壓 — 消費端數(shù)據(jù)積壓來不及處理會怎么樣?。
- // 糟糕的示例,沒有數(shù)據(jù)積壓處理
- readable.on('data', data => {
- writable.write(data)
- });
類似以上的需求,推薦你用 pipe() 方法以流的形式完成數(shù)據(jù)的復制。
作為學習,結合異步迭代器以一種簡單的方式實現(xiàn)一個類似于 pipe 一樣的方法完成數(shù)據(jù)源到目標源的數(shù)據(jù)復制。
數(shù)據(jù)寫入方法實現(xiàn)
_write 方法目的是控制可寫流的數(shù)據(jù)寫入,它返回一個 Promise 對象,如果可寫流的 dest.write() 方法返回 true,表示內(nèi)部緩沖區(qū)未滿,繼續(xù)寫入。
當 dest.write() 方法返回 false 表示向流中寫入數(shù)據(jù)超過了它所能處理的最大能力限制,此時暫停向流中寫入數(shù)據(jù),直到 drain 事件觸發(fā),表示緩沖區(qū)中的數(shù)據(jù)已排空了可以繼續(xù)寫入,再將 Promise 對象變?yōu)榻鉀Q。
- function _write(dest, chunk) {
- return new Promise(resolve => {
- if (dest.write(chunk)) {
- return resolve(null);
- }
- dest.once('drain', resolve);
- })
- }
結合異步迭代器實現(xiàn)
異步迭代器使從可讀流對象讀取數(shù)據(jù)變得更簡單,異步的讀取數(shù)據(jù)并調(diào)用我們封裝的 _write(chunk) 方法寫入數(shù)據(jù),如果緩沖區(qū)空間已滿,這里 await _write(dest, chunk) 也會等待,當緩沖區(qū)有空間可以繼續(xù)寫入了,再次進行讀取 -> 寫入。
- function myCopy(src, dest) {
- return new Promise(async (resolve, reject) => {
- dest.on('error', reject);
- try {
- for await (const chunk of src) {
- await _write(dest, chunk);
- }
- resolve();
- } catch (err) {
- reject(err);
- }
- });
- }
使用如下所示:
- const readable = fs.createReadStream('text.txt');
- const writable = fs.createWriteStream('dest-text.txt');
- await myCopy(readable, writable);
本文轉(zhuǎn)載自微信公眾號「Nodejs技術棧」,可以通過以下二維碼關注。轉(zhuǎn)載本文請聯(lián)系Nodejs技術棧公眾號。