Nodejs v14源碼分析之Event模塊
本文轉(zhuǎn)載自微信公眾號「編程雜技」,作者theanarkh。轉(zhuǎn)載本文請聯(lián)系編程雜技公眾號。
events模塊是Node.js中比較簡單但是卻非常核心的模塊,Node.js中,很多模塊都繼承于events模塊,events模塊是發(fā)布、訂閱模式的實(shí)現(xiàn)。我們首先看一下如何使用events模塊。
- const { EventEmitter } = require('events');
- class Events extends EventEmitter {}
- const events = new Events();
- events.on('demo', () => {
- console.log('emit demo event');
- });
- events.emit('demo');
接下來我們看一下events模塊的具體實(shí)現(xiàn)。
1 初始化 當(dāng)new一個(gè)EventEmitter或者它的子類時(shí),就會進(jìn)入EventEmitter的邏輯。
- function EventEmitter(opts) {
- EventEmitter.init.call(this, opts);
- }
- EventEmitter.init = function(opts) {
- // 如果是未初始化或者沒有自定義_events,則初始化
- if (this._events === undefined ||
- this._events === ObjectGetPrototypeOf(this)._events) {
- this._events = ObjectCreate(null);
- this._eventsCount = 0;
- }
- /*
- 初始化一類事件的處理函數(shù)個(gè)數(shù)的閾值
- 我們可以通過setMaxListeners接口設(shè)置,
- 如果沒有顯示設(shè)置,閾值則為defaultMaxListeners的值(10),
- 可通過getMaxListeners接口獲取
- */
- this._maxListeners = this._maxListeners || undefined;
- // 是否開啟捕獲promise reject,默認(rèn)false
- if (opts && opts.captureRejections) {
- this[kCapture] = Boolean(opts.captureRejections);
- } else {
- this[kCapture] = EventEmitter.prototype[kCapture];
- }
- };
EventEmitter的初始化主要是初始化了一些數(shù)據(jù)結(jié)構(gòu)和屬性。唯一支持的一個(gè)參數(shù)就是captureRejections,captureRejections表示當(dāng)觸發(fā)事件,執(zhí)行處理函數(shù)時(shí),EventEmitter是否捕獲處理函數(shù)中的異常。后面我們會詳細(xì)講解。
2 訂閱事件 初始化完EventEmitter之后,我們就可以開始使用訂閱、發(fā)布的功能。我們可以通過addListener、prependListener、on、once訂閱事件。addListener和on是等價(jià)的,prependListener的區(qū)別在于處理函數(shù)會被插入到隊(duì)首,而默認(rèn)是追加到隊(duì)尾。once注冊的處理函數(shù),最多被執(zhí)行一次。四個(gè)api都是通過_addListener函數(shù)實(shí)現(xiàn)的。下面我們看一下具體實(shí)現(xiàn)。
- function _addListener(target, type, listener, prepend) {
- let m;
- let events;
- let existing;
- events = target._events;
- // 還沒有初始化_events則初始化,_eventsCount為事件類型個(gè)數(shù)
- if (events === undefined) {
- events = target._events = ObjectCreate(null);
- target._eventsCount = 0;
- } else {
- /*
- 已經(jīng)注冊過事件,則判斷是否定義了newListener事件,
- 是的話先觸發(fā),如果監(jiān)聽了newListener事件,每次注冊
- 其它事件時(shí)都會觸發(fā)newListener,相當(dāng)于鉤子
- */
- if (events.newListener !== undefined) {
- target.emit('newListener',
- type,
- listener.listener ?
- listener.listener :
- listener);
- // newListener處理函數(shù)可能會修改_events,這里重新賦值
- events = target._events;
- }
- // 判斷是否已經(jīng)存在處理函數(shù)
- existing = events[type];
- }
- // 不存在則以函數(shù)的形式存儲,否則以數(shù)組形式存儲
- if (existing === undefined) {
- events[type] = listener;
- // 新增一個(gè)事件類型,事件類型個(gè)數(shù)加一
- ++target._eventsCount;
- } else {
- /*
- existing是函數(shù)說明之前注冊過該事件一次,
- 否則說明existing為數(shù)組,則直接插入相應(yīng)位置
- */
- if (typeof existing === 'function') {
- existing = events[type] =
- prepend ? [listener, existing] : [existing, listener];
- } else if (prepend) {
- existing.unshift(listener);
- } else {
- existing.push(listener);
- }
- // 處理告警,處理函數(shù)過多可能是因?yàn)橹暗臎]有刪除,造成內(nèi)存泄漏
- m = _getMaxListeners(target);
- // 該事件處理函數(shù)達(dá)到閾值并且還沒有提示過警告信息則提示
- if (m > 0 && existing.length > m && !existing.warned) {
- existing.warned = true;
- const w = new Error('錯(cuò)誤信息…');
- w.name = 'MaxListenersExceededWarning';
- w.emitter = target;
- w.type = type;
- w.count = existing.length;
- process.emitWarning(w);
- }
- }
- return target;
- }
接下來我們看一下once的實(shí)現(xiàn),對比其它幾種api,once的實(shí)現(xiàn)相對比較復(fù)雜,因?yàn)槲覀円刂铺幚砗瘮?shù)最多執(zhí)行一次,所以我們需要保證在事件觸發(fā)的時(shí)候,執(zhí)行用戶定義函數(shù)的同時(shí),還需要?jiǎng)h除注冊的事件。
- ventEmitter.prototype.once = function once(type, listener) {
- this.on(type, _onceWrap(this, type, listener));
- return this;
- ;
- unction onceWrapper() {
- // 還沒有觸發(fā)過
- if (!this.fired) {
- // 刪除它
- this.target.removeListener(this.type, this.wrapFn);
- // 觸發(fā)了
- this.fired = true;
- // 執(zhí)行
- if (arguments.length === 0)
- return this.listener.call(this.target);
- return this.listener.apply(this.target, arguments);
- }
- }
- // 支持once api
- function _onceWrap(target, type, listener) {
- // fired是否已執(zhí)行處理函數(shù),wrapFn包裹listener的函數(shù)
- const state = { fired: false, wrapFn: undefined, target, type, listener };
- // 生成一個(gè)包裹listener的函數(shù)
- const wrapped = onceWrapper.bind(state);
- /*
- 把原函數(shù)listener也掛到包裹函數(shù)中,用于事件沒有觸發(fā)前,
- 用戶主動(dòng)刪除,見removeListener
- */
- wrapped.listener = listener;
- // 保存包裹函數(shù),用于執(zhí)行完后刪除,見onceWrapper
- state.wrapFn = wrapped;
- return wrapped;
- }
Once函數(shù)構(gòu)造一個(gè)上下文(state)保存用戶處理函數(shù)和執(zhí)行狀態(tài)等信息,然后通過bind返回一個(gè)帶有該上下文(state)的函數(shù)wrapped注冊到事件系統(tǒng)。當(dāng)事件觸發(fā)時(shí),在wrapped函數(shù)中首先移除wrapped,然后執(zhí)行用戶的函數(shù)。Wrapped起到了劫持的作用。另外還需要在wrapped上保存用戶傳進(jìn)來的函數(shù),當(dāng)用戶在事件觸發(fā)前刪除該事件時(shí)或解除該函數(shù)時(shí),在遍歷該類事件的處理函數(shù)過程中,可以通過wrapped.listener找到對應(yīng)的項(xiàng)進(jìn)行刪除。
3 觸發(fā)事件 分析完事件的訂閱,接著我們看一下事件的觸發(fā)。
- EventEmitter.prototype.emit = function emit(type, ...args) {
- // 觸發(fā)的事件是否是error,error事件需要特殊處理
- let doError = (type === 'error');
- const events = this._events;
- // 定義了處理函數(shù)(不一定是type事件的處理函數(shù))
- if (events !== undefined) {
- /*
- 如果觸發(fā)的事件是error,并且監(jiān)聽了kErrorMonitor
- 事件則觸發(fā)kErrorMonitor事件
- */
- if (doError && events[kErrorMonitor] !== undefined)
- this.emit(kErrorMonitor, ...args);
- // 觸發(fā)的是error事件但是沒有定義處理函數(shù)
- doError = (doError && events.error === undefined);
- } else if (!doError)
- // 沒有定義處理函數(shù)并且觸發(fā)的不是error事件則不需要處理,
- return false;
- // If there is no 'error' event listener then throw.
- // 觸發(fā)的是error事件,但是沒有定義處理error事件的函數(shù),則報(bào)錯(cuò)
- if (doError) {
- let er;
- if (args.length > 0)
- er = args[0];
- // 第一個(gè)入?yún)⑹荅rror的實(shí)例
- if (er instanceof Error) {
- try {
- const capture = {};
- /*
- 給capture對象注入stack屬性,stack的值是執(zhí)行
- Error.captureStackTrace語句的當(dāng)前棧信息,但是
- 不包括emit的部分
- */
- Error.captureStackTrace(capture, EventEmitter.prototype.emit);
- ObjectDefineProperty(er, kEnhanceStackBeforeInspector, {
- value: enhanceStackTrace.bind(this, er, capture),
- configurable: true
- });
- } catch {}
- throw er; // Unhandled 'error' event
- }
- let stringifiedEr;
- const { inspect } = require('internal/util/inspect');
- try {
- stringifiedEr = inspect(er);
- } catch {
- stringifiedEr = er;
- }
- const err = new ERR_UNHANDLED_ERROR(stringifiedEr);
- err.context = er;
- throw err; // Unhandled 'error' event
- }
- // 獲取type事件對應(yīng)的處理函數(shù)
- const handler = events[type];
- // 沒有則不處理
- if (handler === undefined)
- return false;
- // 等于函數(shù)說明只有一個(gè)
- if (typeof handler === 'function') {
- // 直接執(zhí)行
- const result = ReflectApply(handler, this, args);
- // 非空判斷是不是promise并且是否需要處理,見addCatch
- if (result !== undefined && result !== null) {
- addCatch(this, result, type, args);
- }
- } else {
- // 多個(gè)處理函數(shù),同上
- const len = handler.length;
- const listeners = arrayClone(handler, len);
- for (let i = 0; i < len; ++i) {
- const result = ReflectApply(listeners[i], this, args);
- if (result !== undefined && result !== null) {
- addCatch(this, result, type, args);
- }
- }
- }
- return true;
- }
我們看到在Node.js中,對于error事件是特殊處理的,如果用戶沒有注冊error事件的處理函數(shù),可能會導(dǎo)致程序掛掉,另外我們看到有一個(gè)addCatch的邏輯,addCatch是為了支持事件處理函數(shù)為異步模式的情況,比如async函數(shù)或者返回Promise的函數(shù)。
- function addCatch(that, promise, type, args) {
- // 沒有開啟捕獲則不需要處理
- if (!that[kCapture]) {
- return;
- }
- // that throws on second use.
- try {
- const then = promise.then;
- if (typeof then === 'function') {
- // 注冊reject的處理函數(shù)
- then.call(promise, undefined, function(err) {
- process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);
- });
- }
- } catch (err) {
- that.emit('error', err);
- }
- }
- function emitUnhandledRejectionOrErr(ee, err, type, args) {
- // 用戶實(shí)現(xiàn)了kRejection則執(zhí)行
- if (typeof ee[kRejection] === 'function') {
- ee[kRejection](err, type, ...args);
- } else {
- // 保存當(dāng)前值
- const prev = ee[kCapture];
- try {
- /*
- 關(guān)閉然后觸發(fā)error事件,意義
- 1 防止error事件處理函數(shù)也拋出error,導(dǎo)致死循環(huán)
- 2 如果用戶處理了error,則進(jìn)程不會退出,所以需要恢復(fù)
- kCapture的值如果用戶沒有處理error,則Node.js會觸發(fā)
- uncaughtException,如果用戶處理了uncaughtException
- 則需要恢復(fù)kCapture的值
- */
- ee[kCapture] = false;
- ee.emit('error', err);
- } finally {
- ee[kCapture] = prev;
- }
- }
- }
4 取消訂閱 我們接著看一下刪除事件處理函數(shù)的邏輯。
- function removeAllListeners(type) {
- const events = this._events;
- if (events === undefined)
- return this;
- /*
- 沒有注冊removeListener事件,則只需要?jiǎng)h除數(shù)據(jù),
- 否則還需要觸發(fā)removeListener事件
- */
- if (events.removeListener === undefined) {
- // 等于0說明是刪除全部
- if (arguments.length === 0) {
- this._events = ObjectCreate(null);
- this._eventsCount = 0;
- } else if (events[type] !== undefined) {
- /*
- 否則是刪除某個(gè)類型的事件,是唯一一個(gè)處理函數(shù),
- 則重置_events,否則刪除對應(yīng)的事件類型
- */
- if (--this._eventsCount === 0)
- this._events = ObjectCreate(null);
- else
- delete events[type];
- }
- return this;
- }
- /*
- 說明注冊了removeListener事件,arguments.length === 0
- 說明刪除所有類型的事件
- */
- if (arguments.length === 0) {
- /*
- 逐個(gè)刪除,除了removeListener事件,
- 這里刪除了非removeListener事件
- */
- for (const key of ObjectKeys(events)) {
- if (key === 'removeListener') continue;
- this.removeAllListeners(key);
- }
- // 這里刪除removeListener事件,見下面的邏輯
- this.removeAllListeners('removeListener');
- // 重置數(shù)據(jù)結(jié)構(gòu)
- this._events = ObjectCreate(null);
- this._eventsCount = 0;
- return this;
- }
- // 刪除某類型事件
- const listeners = events[type];
- if (typeof listeners === 'function') {
- this.removeListener(type, listeners);
- } else if (listeners !== undefined) {
- // LIFO order
- for (let i = listeners.length - 1; i >= 0; i--) {
- this.removeListener(type, listeners[i]);
- }
- }
- return this;
- }
removeAllListeners函數(shù)主要的邏輯有兩點(diǎn),第一個(gè)是removeListener事件需要特殊處理,這類似一個(gè)鉤子,每次用戶刪除事件處理函數(shù)的時(shí)候都會觸發(fā)該事件。第二是removeListener函數(shù)。removeListener是真正刪除事件處理函數(shù)的實(shí)現(xiàn)。removeAllListeners是封裝了removeListener的邏輯。
- function removeListener(type, listener) {
- let originalListener;
- const events = this._events;
- // 沒有東西可刪除
- if (events === undefined)
- return this;
- const list = events[type];
- // 同上
- if (list === undefined)
- return this;
- // list是函數(shù)說明只有一個(gè)處理函數(shù),否則是數(shù)組,如果list.listener === listener說明是once注冊的
- if (list === listener || list.listener === listener) {
- // type類型的處理函數(shù)就一個(gè),并且也沒有注冊其它類型的事件,則初始化_events
- if (--this._eventsCount === 0)
- this._events = ObjectCreate(null);
- else {
- // 就一個(gè)執(zhí)行完刪除type對應(yīng)的屬性
- delete events[type];
- // 注冊了removeListener事件,則先注冊removeListener事件
- if (events.removeListener)
- this.emit('removeListener',
- type,
- list.listener || listener);
- }
- } else if (typeof list !== 'function') {
- // 多個(gè)處理函數(shù)
- let position = -1;
- // 找出需要?jiǎng)h除的函數(shù)
- for (let i = list.length - 1; i >= 0; i--) {
- if (list[i] === listener ||
- list[i].listener === listener) {
- // 保存原處理函數(shù),如果有的話
- originalListener = list[i].listener;
- position = i;
- break;
- }
- }
- if (position < 0)
- return this;
- // 第一個(gè)則出隊(duì),否則刪除一個(gè)
- if (position === 0)
- list.shift();
- else {
- if (spliceOne === undefined)
- spliceOne = require('internal/util').spliceOne;
- spliceOne(list, position);
- }
- // 如果只剩下一個(gè),則值改成函數(shù)類型
- if (list.length === 1)
- events[type] = list[0];
- // 觸發(fā)removeListener
- if (events.removeListener !== undefined)
- this.emit('removeListener',
- type,
- originalListener || listener);
- }
- return this;
- };
以上就是events模塊的核心邏輯,另外還有一些工具函數(shù)就不一一分析。