通過 Inspector 收集 Node.js 的Trace Event 數據
前言
Node.js 提供了 trace event 的機制,在 Node.js 內核代碼里,靜態(tài)地埋了一些點,比如同步文件 IO 耗時,DNS 解析耗時等。每次執(zhí)行這些代碼時,Node.js 就會執(zhí)行這些點的鉤子,從而收集相應的數據。不過這個能力默認是關閉的,畢竟對性能會產生影響。我們可以通過 trace_events 模塊打開這個功能。trace_events 模塊會源源不斷地把數據寫到一個到多個文件中。除了通過 trace_events 模塊之外,Node.js 也實現了通過 Inspector 協議收集 trace event 數據,本文介紹基于 inspector 協議收集 trace event 數據的實現。
首先看一下如何使用這種方式。
const { Session } = require('inspector');
const session = new Session();
session.connect();
function post(message, data) {
return new Promise((resolve, reject) => {
session.post(message, data, (err, result) => {
if (err)
reject(new Error(JSON.stringify(err)));
else
resolve(result);
});
});
}
async function test() {
session.on('NodeTracing.dataCollected', (data) => {
console.log(data.params.value);
});
session.on('NodeTracing.tracingComplete', () => {
console.log('done');
});
const { categories } = await post('NodeTracing.getCategories');
const traceConfig = { includedCategories: categories };
await post('NodeTracing.start', { traceConfig });
setTimeout(() => {
post('NodeTracing.stop');
}, 1000);
}
test();
使用方式比較固定,也比較簡單,trace event 是基于類型的,比如同步文件 IO,DNS 解析。所以第一步首先設置需要收集的模塊類型,也可以通過 NodeTracing.getCategories 命令獲取當前支持的模塊類型。接著通過 NodeTracing.start 開啟數據收集,收集一段時間后,通過 NodeTracing.stop 停止數據的收集,在這個過程中,收集的數據會通過 NodeTracing.dataCollected 事件源源不斷地流向用戶側,我們可以保存這些數據后續(xù)進行分析,收集完畢后會觸發(fā) NodeTracing.tracingComplete 事件,從而完成整個過程。下面我們來看一下這些命令的實現。首先看一下整體的架構。
之前介紹過 Node.js Inspector 的架構,本文就不再具體展開介紹。簡單來說,當我們通過 js 層的 session 發(fā)送命令時,代碼流程從圖的左邊到右邊,收集到數據時,代碼流程從右往左回調 js 層。首先來看一下 NodeTracing.start。Node.js 的 Inspector 框架采用兩級路由的機制,首先通過 NodeTracing 找到一級路由,在 inspetor 里叫 Domain,然后再通過 start 找到二級路由。 來看一下每個路由對應的函數。
m_dispatchMap["NodeTracing.getCategories"] = &DispatcherImpl::getCategories;
m_dispatchMap["NodeTracing.start"] = &DispatcherImpl::start;
m_dispatchMap["NodeTracing.stop"] = &DispatcherImpl::stop;
我們只關注 start 和 stop 的邏輯。
void DispatcherImpl::start(int callId, const String& method, const ProtocolMessage& message, std::unique_ptr<DictionaryValue> requestMessageObject, ErrorSupport* errors){
protocol::DictionaryValue* object = DictionaryValue::cast(requestMessageObject->get("params"));
protocol::Value* traceConfigValue = object ? object->get("traceConfig") : nullptr;
std::unique_ptr<protocol::NodeTracing::TraceConfig> in_traceConfig = ValueConversions<protocol::NodeTracing::TraceConfig>::fromValue(traceConfigValue, errors);
std::unique_ptr<DispatcherBase::WeakPtr> weak = weakPtr();
DispatchResponse response = m_backend->start(std::move(in_traceConfig));
if (weak->get())
weak->get()->sendResponse(callId, response);
return;
}
start 里調用了 m_backend->start,根據架構圖可知道 m_backend 的值是 TracingAgent 對象。
DispatchResponse TracingAgent::start(std::unique_ptr<protocol::NodeTracing::TraceConfig> traceConfig) {
std::set<std::string> categories_set;
protocol::Array<std::string>* categories = traceConfig->getIncludedCategories();
for (size_t i = 0; i < categories->length(); i++)
categories_set.insert(categories->get(i));
tracing::AgentWriterHandle* writer = GetTracingAgentWriter();
if (writer != nullptr) {
trace_writer_ =
writer->agent()->AddClient(categories_set,
std::make_unique<InspectorTraceWriter>(
frontend_object_id_, main_thread_),
tracing::Agent::kIgnoreDefaultCategories);
}
return DispatchResponse::OK();
}
最終通過 AddClient 往 tracing 系統注冊了一個消費者。當tracing 系統產生數據時,就會通過 InspectorTraceWriter 進行消費,看一下這個 InspectorTraceWriter 對象的核心邏輯。
void AppendTraceEvent(
v8::platform::tracing::TraceObject* trace_event) override {
if (!json_writer_)
json_writer_.reset(TraceWriter::CreateJSONTraceWriter(stream_, "value"));
json_writer_->AppendTraceEvent(trace_event);
}
void Flush(bool) override {
if (!json_writer_)
return;
json_writer_.reset();
std::ostringstream result(
"{\"method\":\"NodeTracing.dataCollected\",\"params\":",
std::ostringstream::ate);
result << stream_.str();
result << "}";
main_thread_->Post(std::make_unique<SendMessageRequest>(frontend_object_id_, result.str()));
stream_.str("");
}
tracing 系統調用 AppendTraceEvent 進行數據的消費,不過這些數據會先緩存到內存,然后再調用 Flush 通知真正的消費者,在 Flush 函數里我們可以看到,通過發(fā)送一個 SendMessageRequest 觸發(fā)了 NodeTracing.dataCollected 事件。接著看一下 SendMessageRequest 的邏輯。
void Call(MainThreadInterface* thread) override {
DeletableFrontendWrapper* frontend_wrapper = static_cast<DeletableFrontendWrapper*>(thread->GetObjectIfExists(object_id_));
if (frontend_wrapper == nullptr) return;
auto frontend = frontend_wrapper->get();
if (frontend != nullptr) {
frontend->sendRawJSONNotification(message_);
}
}
void Frontend::sendRawJSONNotification(String notification){
m_frontendChannel->sendProtocolNotification(InternalRawNotification::fromJSON(std::move(notification)));
}
Call 又調用了 m_frontendChannel->sendRawJSONNotification,根據架構圖,m_frontendChannel 的值是 ChannelImpl。最后通過 ChannelImpl 通知用戶側。接著看 stop 的邏輯。
DispatchResponse TracingAgent::stop() {
trace_writer_.reset();
frontend_->tracingComplete();
return DispatchResponse::OK();
}
首先看一下 trace_writer_.reset()。
void AgentWriterHandle::reset() {
if (agent_ != nullptr)
agent_->Disconnect(id_);
agent_ = nullptr;}void Agent::Disconnect(int client) {
if (client == kDefaultHandleId) return;
{
Mutex::ScopedLock lock(initialize_writer_mutex_);
to_be_initialized_.erase(writers_[client].get());
}
ScopedSuspendTracing suspend(tracing_controller_.get(), this);
writers_.erase(client);
categories_.erase(client);
}
接著看 ScopedSuspendTracing。
ScopedSuspendTracing(TracingController* controller, Agent* agent,
bool do_suspend = true)
: controller_(controller), agent_(do_suspend ? agent : nullptr) {
if (do_suspend) {
CHECK(agent_->started_);
controller->StopTracing();
}
}
void TracingController::StopTracing() {
base::MutexGuard lock(mutex_.get());
trace_buffer_->Flush();
}
把所有數據 Flush 到用戶側后觸發(fā) tracingComplete 事件。
void Frontend::tracingComplete(){
if (!m_frontendChannel)
return;
m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeTracing.tracingComplete"));
}
時間關系,大概介紹到這。