[譯]理解Node.js事件驅(qū)動機制
學習 Node.js 一定要理解的內(nèi)容之一,文中主要涉及到了 EventEmitter 的使用和一些異步情況的處理,比較偏基礎,值得一讀。
大多數(shù) Node.js 對象都依賴了 EventEmitter 模塊來監(jiān)聽和響應事件,比如我們常用的 HTTP requests, responses, 以及 streams。
- const EventEmitter = require('events');
事件驅(qū)動機制的最簡單形式,是在 Node.js 中十分流行的回調(diào)函數(shù),例如 fs.readFile。 在回調(diào)函數(shù)這種形式中,事件每被觸發(fā)一次,回調(diào)就會被觸發(fā)一次。
我們先來探索下這個最基本的方式。
你準備好了就叫我哈,Node!
很久很久以前,在 js 里還沒有原生支持 Promise,async/await 還只是一個遙遠的夢想,回調(diào)函數(shù)是處理異步問題的最原始的方式。
回調(diào)從本質(zhì)上講是傳遞給其他函數(shù)的函數(shù),在 JavaScript 中函數(shù)是***類對象,這也讓回調(diào)的存在成為可能。
一定要搞清楚的是,回調(diào)在代碼中的并不表示異步調(diào)用。 回調(diào)既可以是同步調(diào)用的,也可以是異步調(diào)用的。
舉個例子,這里有一個宿主函數(shù) fileSize,它接受一個回調(diào)函數(shù) cb,并且可以通過條件判斷來同步或者異步地調(diào)用該回調(diào)函數(shù):
- function fileSize (fileName, cb) {
- if (typeof fileName !== 'string') {
- // Sync
- return cb(new TypeError('argument should be string'));
- }
- fs.stat(fileName, (err, stats) => {
- if (err) {
- // Async
- return cb(err);
- }
- // Async
- cb(null, stats.size);
- });
- }
這其實也是個反例,這樣寫經(jīng)常會引起一些意外的錯誤,在設計宿主函數(shù)的時候,應當盡可能的使用同一種風格,要么始終都是同步的使用回調(diào),要么始終都是異步的。
我們來研究下一個典型的異步 Node 函數(shù)的簡單示例,它用回調(diào)樣式編寫:
- const readFileAsArray = function(file, cb) {
- fs.readFile(file, function(err, data) {
- if (err) {
- return cb(err);
- }
- const lines = data.toString().trim().split('\n');
- cb(null, lines);
- });
- };
readFileAsArray 函數(shù)接受兩個參數(shù):一個文件路徑和一個回調(diào)函數(shù)。它讀取文件內(nèi)容,將其拆分成行數(shù)組,并將該數(shù)組作為回調(diào)函數(shù)的參數(shù)傳入,調(diào)用回調(diào)函數(shù)。
現(xiàn)在設計一個用例,假設我們在同一目錄中的文件 numbers.txt 包含如下內(nèi)容:
- 10
- 11
- 12
- 13
- 14
- 15
如果我們有一個需求,要求統(tǒng)計該文件中的奇數(shù)數(shù)量,我們可以使用 readFileAsArray 來簡化代碼:
- readFileAsArray('./numbers.txt', (err, lines) => {
- if (err) throw err;
- const numbers = lines.map(Number);
- const oddNumbers = numbers.filter(n => n%2 === 1);
- console.log('Odd numbers count:', oddNumbers.length);
- });
這段代碼將文件內(nèi)容讀入字符串數(shù)組中,回調(diào)函數(shù)將其解析為數(shù)字,并計算奇數(shù)的個數(shù)。
這才是最純粹的 Node 回調(diào)風格。回調(diào)的***個參數(shù)要遵循錯誤優(yōu)先的原則,err 可以為空,我們要將回調(diào)作為宿主函數(shù)的***一個參數(shù)傳遞。你應該一直用這種方式這樣設計你的函數(shù),因為用戶可能會假設。讓宿主函數(shù)把回調(diào)當做其***一個參數(shù),并讓回調(diào)函數(shù)以一個可能為空的錯誤對象作為其***個參數(shù)。
回調(diào)在現(xiàn)代 JavaScript 中的替代品
在現(xiàn)代 JavaScript 中,我們有 Promise,Promise 可以用來替代異步 API 的回調(diào)。回調(diào)函數(shù)需要作為宿主函數(shù)的一個參數(shù)進行傳遞(多個宿主回調(diào)進行嵌套就形成了回調(diào)地獄),而且錯誤和成功都只能在其中進行處理。而 Promise 對象可以讓我們分開處理成功和錯誤,還允許我們鏈式調(diào)用多個異步事件。
如果 readFileAsArray 函數(shù)支持 Promise,我們可以這樣使用它,如下所示:
- readFileAsArray('./numbers.txt')
- .then(lines => {
- const numbers = lines.map(Number);
- const oddNumbers = numbers.filter(n => n%2 === 1);
- console.log('Odd numbers count:', oddNumbers.length);
- })
- .catch(console.error);
我們在宿主函數(shù)的返回值上調(diào)用了一個函數(shù)來處理我們的需求,這個 .then 函數(shù)會把剛剛在回調(diào)版本中的那個行數(shù)組傳遞給這里的匿名函數(shù)。為了處理錯誤,我們在結(jié)果上添加一個 .catch 調(diào)用,當發(fā)生錯誤時,它會捕捉到錯誤并讓我們訪問到這個錯誤。
在現(xiàn)代 JavaScript 中已經(jīng)支持了 Promise 對象,因此我們可以很容易的將其使用在宿主函數(shù)之中。下面是支持 Promise 版本的 readFileAsArray 函數(shù)(同時支持舊有的回調(diào)函數(shù)方式):
- const readFileAsArray = function(file, cb = () => {}) {
- return new Promise((resolve, reject) => {
- fs.readFile(file, function(err, data) {
- if (err) {
- reject(err);
- return cb(err);
- }
- const lines = data.toString().trim().split('\n');
- resolve(lines);
- cb(null, lines);
- });
- });
- };
我們使該函數(shù)返回一個 Promise 對象,該對象包裹了 fs.readFile 的異步調(diào)用。Promise 對象暴露了兩個參數(shù),一個 resolve 函數(shù)和一個 reject 函數(shù)。
當有異常拋出時,我們可以通過向回調(diào)函數(shù)傳遞 error 來處理錯誤,也同樣可以使用 Promise 的 reject 函數(shù)。每當我們將數(shù)據(jù)交給回調(diào)函數(shù)處理時,我們同樣也可以用 Promise 的 resolve 函數(shù)。
在這種同時可以使用回調(diào)和 Promise 的情況下,我們需要做的唯一一件事情就是為這個回調(diào)參數(shù)設置默認值,防止在沒有傳遞回調(diào)函數(shù)參數(shù)時,其被執(zhí)行然后報錯的情況。 在這個例子中使用了一個簡單的默認空函數(shù):()=> {}。
通過 async/await 使用 Promise
當需要連續(xù)調(diào)用異步函數(shù)時,使用 Promise 會讓你的代碼更容易編寫。不斷的使用回調(diào)會讓事情變得越來越復雜,最終陷入回調(diào)地獄。
Promise 的出現(xiàn)改善了一點,Generator 的出現(xiàn)又改善了一點。 處理異步問題的***解決方式是使用 async 函數(shù),它允許我們將異步代碼視為同步代碼,使其整體上更加可讀。
以下是使用 async/await 版本的調(diào)用 readFileAsArray 的例子:
- async function countOdd () {
- try {
- const lines = await readFileAsArray('./numbers');
- const numbers = lines.map(Number);
- const oddCount = numbers.filter(n => n%2 === 1).length;
- console.log('Odd numbers count:', oddCount);
- } catch(err) {
- console.error(err);
- }
- }
- countOdd();
首先,我們創(chuàng)建了一個 async 函數(shù) —— 就是一個普通的函數(shù)聲明之前,加了個 async 關鍵字。在 async 函數(shù)內(nèi)部,我們調(diào)用了 readFileAsArray 函數(shù),就像把它的返回值賦值給變量 lines 一樣,為了真的拿到 readFileAsArray 處理生成的行數(shù)組,我們使用關鍵字 await。之后,我們繼續(xù)執(zhí)行代碼,就好像 readFileAsArray 的調(diào)用是同步的一樣。
要讓代碼運行,我們可以直接調(diào)用 async 函數(shù)。這讓我們的代碼變得更加簡單和易讀。為了處理異常,我們需要將異步調(diào)用包裝在一個 try/catch 語句中。
有了 async/await 這個特性,我們不必使用任何特殊的API(如 .then 和 .catch )。我們只是把這種函數(shù)標記出來,然后使用純粹的 JavaScript 寫代碼。
我們可以把 async/await 這個特性用在支持使用 Promise 處理后續(xù)邏輯的函數(shù)上。但是,它無法用在只支持回調(diào)的異步函數(shù)上(例如setTimeout)。
EventEmitter 模塊
EventEmitter 是一個處理 Node 中各個對象之間通信的模塊。 EventEmitter 是 Node 異步事件驅(qū)動架構(gòu)的核心。 Node 的許多內(nèi)置模塊都繼承自 EventEmitter。
它的概念其實很簡單:emitter 對象會發(fā)出被定義過的事件,導致之前注冊的所有監(jiān)聽該事件的函數(shù)被調(diào)用。所以,emitter 對象基本上有兩個主要特征:
- 觸發(fā)定義過的事件
- 注冊或者取消注冊監(jiān)聽函數(shù)
為了使用 EventEmitter,我們需要創(chuàng)建一個繼承自 EventEmitter 的類。
- class MyEmitter extends EventEmitter {
- }
我們從 EventEmitter 的子類實例化的對象,就是 emitter 對象:
- const myEmitter = new MyEmitter();
在這些 emitter 對象的生命周期里,我們可以調(diào)用 emit 函數(shù)來觸發(fā)我們想要的觸發(fā)的任何被命名過的事件。
- myEmitter.emit('something-happened');
emit 函數(shù)的使用表示發(fā)生某種情況發(fā)生了,讓大家去做該做的事情。 這種情況通常是某些狀態(tài)變化引起的。
我們可以使用 on 方法添加監(jiān)聽器函數(shù),并且每次 emitter 對象觸發(fā)其關聯(lián)的事件時,將執(zhí)行這些監(jiān)聽器函數(shù)。
事件 !== 異步
先看看這個例子:
- const EventEmitter = require('events');
- class WithLog extends EventEmitter {
- execute(taskFunc) {
- console.log('Before executing');
- this.emit('begin');
- taskFunc();
- this.emit('end');
- console.log('After executing');
- }
- }
- const withLog = new WithLog();
- withLog.on('begin', () => console.log('About to execute'));
- withLog.on('end', () => console.log('Done with execute'));
- withLog.execute(() => console.log('*** Executing task ***'));
WithLog 是一個事件觸發(fā)器,它有一個方法 —— execute,該方法接受一個參數(shù),即具體要處理的任務函數(shù),并在其前后包裹 log 以輸出其執(zhí)行日志。
為了看到這里會以什么順序執(zhí)行,我們在兩個命名的事件上都注冊了監(jiān)聽器,***執(zhí)行一個簡單的任務來觸發(fā)事件。
下面是上面程序的輸出結(jié)果:
- Before executing
- About to execute
- *** Executing task ***
- Done with execute
- After executing
這里我想證實的是以上的輸出都是同步發(fā)生的,這段代碼里沒有什么異步的成分。
- ***行輸出了 "Before executing"
- begin 事件被觸發(fā),輸出 "About to execute"
- 真正應該被執(zhí)行的任務函數(shù)被調(diào)用,輸出 " Executing task "
- end 事件被觸發(fā),輸出 "Done with execute"
- ***輸出 "After executing"
就像普通的回調(diào)一樣,不要以為事件意味著同步或異步代碼。
跟之前的回調(diào)一樣,不要一提到事件就認為它是異步的或者同步的,還要具體分析。
如果我們傳遞 taskFunc 是一個異步函數(shù),會發(fā)生什么呢?
- // ...
- withLog.execute(() => {
- setImmediate(() => {
- console.log('*** Executing task ***')
- });
- });
輸出結(jié)果變成了這樣:
- Before executing
- About to execute
- Done with execute
- After executing
- *** Executing task ***
這樣就有問題了,異步函數(shù)的調(diào)用導致 "Done with execute" 和 "After executing" 的輸出并不準確。
要在異步函數(shù)完成后發(fā)出事件,我們需要將回調(diào)(或 Promise)與基于事件的通信相結(jié)合。 下面的例子說明了這一點。
使用事件而不是常規(guī)回調(diào)的一個好處是,我們可以通過定義多個監(jiān)聽器對相同的信號做出多個不同的反應。如果使用回調(diào)來完成這件事,我們要在單個回調(diào)中寫更多的處理邏輯。事件是應用程序允許多個外部插件在應用程序核心之上構(gòu)建功能的好辦法。你可以把它們當成鉤子來掛一些由于狀態(tài)變化而引發(fā)執(zhí)行的程序。
異步事件
我們把剛剛那些同步代碼的示例改成異步的:
- const fs = require('fs');
- const EventEmitter = require('events');
- class WithTime extends EventEmitter {
- execute(asyncFunc, ...args) {
- this.emit('begin');
- console.time('execute');
- asyncFunc(...args, (err, data) => {
- if (err) {
- return this.emit('error', err);
- }
- this.emit('data', data);
- console.timeEnd('execute');
- this.emit('end');
- });
- }
- }
- const withTime = new WithTime();
- withTime.on('begin', () => console.log('About to execute'));
- withTime.on('end', () => console.log('Done with execute'));
- withTime.execute(fs.readFile, __filename);
用 WithTime 類執(zhí)行 asyncFunc 函數(shù),并通過調(diào)用 console.time 和 console.timeEnd 報告該asyncFunc 所花費的時間。它在執(zhí)行之前和之后都將以正確的順序觸發(fā)相應的事件,并且還會發(fā)出 error/data 事件作為處理異步調(diào)用的信號。
我們傳遞一個異步的 fs.readFile 函數(shù)來測試一下 withTime emitter。 我們現(xiàn)在可以直接通過監(jiān)聽 data 事件來處理讀取到的文件數(shù)據(jù),而不用把這套處理邏輯寫到 fs.readFile 的回調(diào)函數(shù)中。
執(zhí)行這段代碼,我們以預期的順序執(zhí)行了一系列事件,并且得到異步函數(shù)的執(zhí)行時間,這些是十分重要的。
- About to execute
- execute: 4.507ms
- Done with execute
請注意,我們是將回調(diào)與事件觸發(fā)器 emitter 相結(jié)合實現(xiàn)的這部分功能。 如果 asynFunc 支持Promise,我們可以使用 async/await 函數(shù)來做同樣的事情:
- class WithTime extends EventEmitter {
- async execute(asyncFunc, ...args) {
- this.emit('begin');
- try {
- console.time('execute');
- const data = await asyncFunc(...args);
- this.emit('data', data);
- console.timeEnd('execute');
- this.emit('end');
- } catch(err) {
- this.emit('error', err);
- }
- }
- }
我認為這段代碼比之前的回調(diào)風格的代碼以及使用 .then/.catch 風格的代碼更具可讀性。async/await 讓我們更加接近 JavaScript 語言本身(不必再使用 .then/.catch 這些 api)。
事件參數(shù)和錯誤
在之前的例子中,有兩個事件被發(fā)出時還攜帶了別的參數(shù)。
error 事件被觸發(fā)時會攜帶一個 error 對象。
- this.emit('error', err);
data 事件被觸發(fā)時會攜帶一個 data 對象。
- this.emit('data', data);
我們可以在 emit 函數(shù)中不斷的添加參數(shù),當然***個參數(shù)一定是事件的名稱,除去***個參數(shù)之外的所有參數(shù)都可以在該事件注冊的監(jiān)聽器中使用。
例如,要處理 data 事件,我們注冊的監(jiān)聽器函數(shù)將訪問傳遞給 emit 函數(shù)的 data 參數(shù),而這個 data 也正是由 asyncFunc 返回的數(shù)據(jù)。
- withTime.on('data', (data) => {
- // do something with data
- });
error 事件比較特殊。在我們基于回調(diào)的那個示例中,如果不使用監(jiān)聽器處理 error 事件,node 進程將會退出。
舉個由于錯誤使用參數(shù)而造成程序崩潰的例子:
- class WithTime extends EventEmitter {
- execute(asyncFunc, ...args) {
- console.time('execute');
- asyncFunc(...args, (err, data) => {
- if (err) {
- return this.emit('error', err); // Not Handled
- }
- console.timeEnd('execute');
- });
- }
- }
- const withTime = new WithTime();
- withTime.execute(fs.readFile, ''); // BAD CALL
- withTime.execute(fs.readFile, __filename);
***次調(diào)用 execute 將會觸發(fā) error 事件,由于沒有處理 error ,Node 程序隨之崩潰:
- events.js:163
- throw er; // Unhandled 'error' event
- ^
- Error: ENOENT: no such file or directory, open ''
第二次執(zhí)行調(diào)用將受到此崩潰的影響,并且可能根本不會被執(zhí)行。
如果我們?yōu)檫@個 error 事件注冊一個監(jiān)聽器函數(shù)來處理 error,結(jié)果將大不相同:
- withTime.on('error', (err) => {
- // do something with err, for example log it somewhere
- console.log(err)
- });
如果我們執(zhí)行上述操作,將會報告***次執(zhí)行 execute 時發(fā)送的錯誤,但是這次 node 進程不會崩潰退出,其他程序的調(diào)用也都能正常完成:
- { Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
- execute: 4.276ms
需要注意的是,基于 Promise 的函數(shù)有些不同,它們暫時只是輸出一個警告:
- UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
- DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
另一種處理異常的方式是在監(jiān)聽全局的 uncaughtException 進程事件。 然而,使用該事件全局捕獲錯誤并不是一個好辦法。
關于 uncaughtException,一般都會建議你避免使用它,但是如果必須用它,你應該讓進程退出:
- process.on('uncaughtException', (err) => {
- // something went unhandled.
- // Do any cleanup and exit anyway!
- console.error(err); // don't do just that.
- // FORCE exit the process too.
- process.exit(1);
- });
但是,假設在同一時間發(fā)生多個錯誤事件,這意味著上面的 uncaughtException 監(jiān)聽器將被多次觸發(fā),這可能會引起一些問題。
EventEmitter 模塊暴露了 once 方法,這個方法發(fā)出的信號只會調(diào)用一次監(jiān)聽器。所以,這個方法常與 uncaughtException 一起使用。
監(jiān)聽器的順序
如果針對一個事件注冊多個監(jiān)聽器函數(shù),當事件被觸發(fā)時,這些監(jiān)聽器函數(shù)將按其注冊的順序被觸發(fā)。
- // first
- withTime.on('data', (data) => {
- console.log(`Length: ${data.length}`);
- });
- // second
- withTime.on('data', (data) => {
- console.log(`Characters: ${data.toString().length}`);
- });
- withTime.execute(fs.readFile, __filename);
上述代碼會先輸出 Length 信息,再輸出 Characters 信息,執(zhí)行的順序與注冊的順序保持一致。
如果你想定義一個新的監(jiān)聽函數(shù),但是希望它能夠***個被執(zhí)行,你還可以使用 prependListener 方法:
- withTime.on('data', (data) => {
- console.log(`Length: ${data.length}`);
- });
- withTime.prependListener('data', (data) => {
- console.log(`Characters: ${data.toString().length}`);
- });
- withTime.execute(fs.readFile, __filename);
上述代碼中,Charaters 信息將首先被輸出。
***,你可以用 removeListener 函數(shù)來刪除某個監(jiān)聽器函數(shù)。