格物致知-記一次Nodejs源碼分析的經(jīng)歷
本文轉(zhuǎn)載自微信公眾號(hào)「編程雜技」,作者theanarkh。轉(zhuǎn)載本文請(qǐng)聯(lián)系編程雜技公眾號(hào)。
昨天分析http模塊相關(guān)的代碼時(shí),遇到了一個(gè)晦澀的邏輯,看了想,想了看還是沒(méi)看懂。百度、谷歌了很多帖子也沒(méi)看到合適的答案。突然看到一個(gè)題目有點(diǎn)相識(shí)的搜索結(jié)果,點(diǎn)進(jìn)去是Stack Overflow上的帖子,但是已經(jīng)404,最后還是通過(guò)快照功能成功看到內(nèi)容。這個(gè)帖子[1]和我的疑惑不相關(guān),但是突然給了我一些靈感。沿著這個(gè)靈感去看了代碼,最后下載nodejs源碼,加了一些log,編譯了一夜(太久了,等不及編譯完成,得睡覺(jué)了)。上午起來(lái)驗(yàn)證,終于揭開(kāi)了疑惑。這個(gè)問(wèn)題源于下面這段代碼。
- function connectionListenerInternal(server, socket) {
- socket.server = server;
- // 分配一個(gè)http解析器
- const parser = parsers.alloc();
- // 解析請(qǐng)求報(bào)文
- parser.initialize(
- HTTPParser.REQUEST,
- new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket),
- server.maxHeaderSize || 0,
- server.insecureHTTPParser === undefined ?
- isLenient() : server.insecureHTTPParser,
- );
- parser.socket = socket;
- // 開(kāi)始解析頭部的開(kāi)始時(shí)間
- parser.parsingHeadersStart = nowDate();
- socket.parser = parser;
- const state = {
- onData: null,
- onEnd: null,
- onClose: null,
- onDrain: null,
- // 同一tcp連接上,請(qǐng)求和響應(yīng)的的隊(duì)列
- outgoing: [],
- incoming: [],
- outgoingData: 0,
- keepAliveTimeoutSet: false
- };
- state.onData = socketOnData.bind(undefined, server, socket, parser, state);
- socket.on('data', state.onData);
- if (socket._handle && socket._handle.isStreamBase &&
- !socket._handle._consumed) {
- parser._consumed = true;
- socket._handle._consumed = true;
- parser.consume(socket._handle);
- }
- parser[kOnExecute] =
- onParserExecute.bind(undefined, server, socket, parser, state);
- socket._paused = false;
- }
這段代碼看起來(lái)很多,這是啟動(dòng)http服務(wù)器后,有新的tcp連接建立時(shí)執(zhí)行的回調(diào)。問(wèn)題在于tcp上有數(shù)據(jù)到來(lái)時(shí),是怎么處理的,上面代碼中nodejs監(jiān)聽(tīng)了socket的data事件,同時(shí)注冊(cè)了鉤子kOnExecute。data事件我們都知道是流上有數(shù)據(jù)到來(lái)時(shí)觸發(fā)的事件。我們看一下socketOnData做了什么事情。
- function socketOnData(server, socket, parser, state, d) {
- // 交給http解析器處理,返回已經(jīng)解析的字節(jié)數(shù)
- const ret = parser.execute(d);
- onParserExecuteCommon(server, socket, parser, state, ret, d);
- }
這看起來(lái)沒(méi)有問(wèn)題,socket上有數(shù)據(jù),然后交給http解析器處理。幾乎所有http模塊源碼解析的文章也是這樣分析的,我第一反應(yīng)也覺(jué)得這個(gè)沒(méi)問(wèn)題,那kOnExecute是做什么的呢?kOnExecute鉤子函數(shù)的值是onParserExecute,這個(gè)看起來(lái)也是解析tcp上的數(shù)據(jù)的,看起來(lái)和onSocketData是一樣的作用,難道tcp上的數(shù)據(jù)有兩個(gè)消費(fèi)者?我們看一下kOnExecute什么時(shí)候被回調(diào)的。
- void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
- Local<Value> ret = Execute(buf.base, nread);
- Local<Value> cb =
- object()->Get(env()->context(), kOnExecute).ToLocalChecked();
- MakeCallback(cb.As<Function>(), 1, &ret);
- }
在node_http_parser.cc中的OnStreamRead中被回調(diào),那么OnStreamRead又是什么時(shí)候被回調(diào)的呢?OnStreamRead是nodejs中c++層流操作的通用函數(shù),當(dāng)流有數(shù)據(jù)的時(shí)候就會(huì)執(zhí)行該回調(diào)。而且OnStreamRead中也會(huì)把數(shù)據(jù)交給http解析器解析。這看起來(lái)真的有兩個(gè)消費(fèi)者?這就很奇怪,為什么一份數(shù)據(jù)會(huì)交給http解析器處理兩次?這時(shí)候我的想法就是這兩個(gè)地方肯定是互斥的。但是我一直沒(méi)有找到是哪里做了處理。最后在connectionListenerInternal的一段代碼中找到了答案。
- if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) {
- parser._consumed = true;
- socket._handle._consumed = true;
- parser.consume(socket._handle);
- }
因?yàn)閠cp流是繼承StreamBase類(lèi)的,所以if成立(后面會(huì)具體分析)。我們看一下consume的實(shí)現(xiàn)。
- static void Consume(const FunctionCallbackInfo<Value>& args) {
- Parser* parser;
- ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
- CHECK(args[0]->IsObject());
- StreamBase* stream = StreamBase::FromObjject(args[0].As<Object>());
- CHECK_NOT_NULL(stream);
- stream->PushStreamListener(parser);
- }
http解析器把自己注冊(cè)為tcp stream的一個(gè)listener。這里涉及到了c++層對(duì)流的設(shè)計(jì)。我們從頭開(kāi)始??匆幌翽ushStreamListener做了什么事情。c++層中,流的操作由類(lèi)StreamResource進(jìn)行了封裝。
- class StreamResource {
- public:
- virtual ~StreamResource();
- virtual int ReadStart() = 0;
- virtual int ReadStop() = 0;
- virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
- virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
- virtual int DoWrite(WriteWrap* w,
- uv_buf_t* bufs,
- size_t count,
- uv_stream_t* send_handle) = 0;
- void PushStreamListener(StreamListener* listener);
- void RemoveStreamListener(StreamListener* listener);
- protected:
- uv_buf_t EmitAlloc(size_t suggested_size);
- void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
- StreamListener* listener_ = nullptr;
- uint64_t bytes_read_ = 0;
- uint64_t bytes_written_ = 0;
- friend class StreamListener;
- };
我們看到StreamResource是一個(gè)基類(lèi),定義了操作流的公共方法。其中有一個(gè)成員是StreamListener類(lèi)的實(shí)例。我們看看StreamListener的實(shí)現(xiàn)。
- class StreamListener {
- public:
- virtual ~StreamListener();
- virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
- virtual void OnStreamRead(ssize_t nread,
- const uv_buf_t& buf) = 0;
- virtual void OnStreamDestroy() {}
- inline StreamResource* stream() { return stream_; }
- protected:
- void PassReadErrorToPreviousListener(ssize_t nread);
- StreamResource* stream_ = nullptr;
- StreamListener* previous_listener_ = nullptr;
- friend class StreamResource;
- };
StreamListener是一個(gè)負(fù)責(zé)消費(fèi)流數(shù)據(jù)的類(lèi)。StreamListener 和StreamResource類(lèi)的關(guān)系如下。
null我們看到一個(gè)流可以注冊(cè)多個(gè)listener,多個(gè)listener形成一個(gè)鏈表。接著我們看一下創(chuàng)建一個(gè)c++層的tcp對(duì)象是怎樣的。下面是TCPWrap的繼承關(guān)系。
- class TCPWrap : public ConnectionWrap<TCPWrap, uv_tcp_t>{}
- class ConnectionWrap : public LibuvStreamWrap{}
- class LibuvStreamWrap : public HandleWrap, public StreamBase{}
- class StreamBase : public StreamResource {}
我們看到tcp流是繼承于StreamResource的。新建一個(gè)tcp的c++的對(duì)象時(shí)(tcp_wrap.cc),會(huì)不斷往上調(diào)用父類(lèi)的構(gòu)造函數(shù),其中在StreamBase中有一個(gè)關(guān)鍵的操作。
- inline StreamBase::StreamBase(Environment* env) : env_(env) {
- PushStreamListener(&default_listener_);
- }
- EmitToJSStreamListener default_listener_;
StreamBase會(huì)默認(rèn)給流注冊(cè)一個(gè)listener。我們看下EmitToJSStreamListener 具體的定義。
- class ReportWritesToJSStreamListener : public StreamListener {
- public:
- void OnStreamAfterWrite(WriteWrap* w, int status) override;
- void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
- private:
- void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
- };
- class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
- public:
- uv_buf_t OnStreamAlloc(size_t suggested_size) override;
- void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
- };
EmitToJSStreamListener繼承StreamListener ,定義了分配內(nèi)存和讀取接收數(shù)據(jù)的函數(shù)。接著我們看一下PushStreamListener做了什么事情。
- inline void StreamResource::PushStreamListener(StreamListener* listener) {
- // 頭插法
- listener->previous_listener_ = listener_;
- listener->stream_ = this;
- listener_ = listener;
- }
PushStreamListener就是構(gòu)造出上圖的結(jié)構(gòu)。對(duì)應(yīng)到創(chuàng)建一個(gè)c++層的tcp對(duì)象中,如下圖。
然后我們看一下對(duì)于流來(lái)說(shuō),讀取數(shù)據(jù)的整個(gè)鏈路。首先是js層調(diào)用readStart
- function tryReadStart(socket) {
- socket._handle.reading = true;
- const err = socket._handle.readStart();
- if (err)
- socket.destroy(errnoException(err, 'read'));
- }
- // 注冊(cè)等待讀事件
- Socket.prototype._read = function(n) {
- tryReadStart(this);
- };
我們看看readStart
- int LibuvStreamWrap::ReadStart() {
- return uv_read_start(stream(), [](uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
- static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
- }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
- static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
- });
- }
ReadStart調(diào)用libuv的uv_read_start注冊(cè)等待可讀事件,并且注冊(cè)了兩個(gè)回調(diào)函數(shù)OnUvAlloc和OnUvRead。
- void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
- EmitRead(nread, *buf);
- }
- inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
- // bytes_read_表示已讀的字節(jié)數(shù)
- if (nread > 0)
- bytes_read_ += static_cast<uint64_t>(nread);
- listener_->OnStreamRead(nread, buf);
- }
通過(guò)層層調(diào)用最后會(huì)調(diào)用listener_的OnStreamRead。我們看看tcp的OnStreamRead
- void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
- StreamBase* stream = static_cast<StreamBase*>(stream_);
- Environment* env = stream->stream_env();
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
- AllocatedBuffer buf(env, buf_);
- stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
- }
繼續(xù)回調(diào)CallJSOnreadMethod
- MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
- Local<ArrayBuffer> ab,
- size_t offset,
- StreamBaseJSChecks checks) {
- Environment* env = env_;
- // ...
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NOT_NULL(wrap);
- Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
- CHECK(onread->IsFunction());
- return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
- }
CallJSOnreadMethod會(huì)回調(diào)js層的onread回調(diào)函數(shù)。onread會(huì)把數(shù)據(jù)push到流中,然后觸發(fā)data事件。這是tcp里默認(rèn)的數(shù)據(jù)讀取過(guò)程。而文章開(kāi)頭講到的parser.consume打破了這個(gè)默認(rèn)行為。stream->PushStreamListener(parser);修改了tcp流的listener鏈,http parser把自己作為數(shù)據(jù)的接收者。所以這時(shí)候tcp流上的數(shù)據(jù)是直接由node_http_parser.cc的OnStreamRead消費(fèi)的。而不是觸發(fā)socket的data事件,最后通過(guò)在nodejs源碼中加log,重新編譯驗(yàn)證的確如文中所述。最后提一個(gè)這個(gè)過(guò)程中還有一個(gè)關(guān)鍵的地方是調(diào)用consume函數(shù)的前提是socket._handle.isStreamBase為true。isStreamBase是在StreamBase::AddMethods中定義為true的,而tcp對(duì)象創(chuàng)建的過(guò)程中,調(diào)用了這個(gè)方法,所以tcp的isStreamBase是true,才會(huì)執(zhí)行consume,才會(huì)執(zhí)行kOnExecute回調(diào)。
References
[1] 帖子: http://cache.baiducontent.com/c?m=rZy2XovtTdJJuXWLM-s8wgpaz8NFubewtolyiC19iAKFJrbGdx2EFnArzlAIDisNP70zWWsCPv-4jwMHTGNcLaUsMVr-lvLqYmmHD-w_fUYz6a5K6OQRC9kZmLYN5RXsb34OdINb8xHIJsdyClaEWOtCGKMQ2saYK7ed7OG8v0E1pRKR4K46phl0rCBrw6amXE3QpPo62dMhvu_VASYYqq&p=cb77c64ad49111a05bee9e264d5693&newp=882a9646dc9712a05ab7cc374f0ccc231615d70e3ad3d501298ffe0cc4241a1a1a3aecbf2d29170ed6c27f630bae4856ecf630723d0834f1f689df08d2ecce7e7b&s=cfcd208495d565ef&user=baidu&fm=sc&query=onParserExecute&qid=869f73bc002e44f5&p1=11