Hello,大家好,之前說不打算更新公眾號(hào)了,后面有時(shí)間的話還是會(huì)偶爾更新下,記錄和分享下一些技術(shù)相關(guān)的內(nèi)容,今天分享下如何實(shí)現(xiàn)一個(gè) APM watchdog。
在 APM 中,保證及時(shí)并準(zhǔn)確地獲取應(yīng)用的信息是非常重要的,這樣才能保證應(yīng)用出現(xiàn)問題時(shí),我們可以高效地找到并解決問題。本文以之前提交給 Node.js 的 PR 為例,介紹如何實(shí)現(xiàn)一個(gè) APM watchdog 來對(duì)應(yīng)用進(jìn)行監(jiān)控。這個(gè) PR 的實(shí)現(xiàn)思想來自我們?cè)趦?nèi)部實(shí)現(xiàn)的 APM watchdog,但是因?yàn)檫壿嫃?fù)雜,目前暫時(shí)還沒有時(shí)間去推進(jìn)。
首先來看一下如何使用,然后看看一下如何實(shí)現(xiàn)。
new MemoryProfileWatchdog({
// 內(nèi)存閾值,達(dá)到該閾值則采集堆快照
maxRss: 1024 * 1024,
maxUsedHeapSize: 1024 * 1024,
// 輪詢間隔
interval: 1000,
// 快照寫到哪個(gè)文件
filename: filepath,
});
可以看到,啟動(dòng)一個(gè) watchdog 非常簡單,我們只需要配置一些監(jiān)控的閾值和輪訓(xùn)時(shí)間。監(jiān)控的數(shù)據(jù)是基于定時(shí)輪詢的,因?yàn)闆]有相關(guān)的訂閱發(fā)布機(jī)制,當(dāng) watchdog 監(jiān)控到數(shù)據(jù)達(dá)到閾值時(shí)就會(huì)采集堆快照,因?yàn)檫@里是一個(gè)內(nèi)存 watchdog,我們也可以實(shí)現(xiàn) CPU watchdog,原理是一樣的。接著看看實(shí)現(xiàn),首先看 JS 層的實(shí)現(xiàn)。
class MemoryProfileWatchdog {
#handle;
constructor(options) {
this.#handle = new profiler.MemoryProfileWatchdog({
...options,
filename,
});
this.#handle.start();
}
stop() {
if (this.#handle) {
this.#handle.stop();
this.#handle = null;
}
}
}
JS 層的實(shí)現(xiàn)非常簡單,只是對(duì) C++ 層的簡單封裝,所以直接來看 C++ 層的實(shí)現(xiàn),我們忽略一些細(xì)節(jié),只關(guān)注核心邏輯。
class ProfileWatchdog : public BaseObject {
public:
enum class ProfileWatchdogState { kInitialized, kRunning, kClosing, kClosed };
ProfileWatchdog(Environment* env, v8::Local<v8::Object> object);
~ProfileWatchdog() override;
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(Environment* env);
// 啟動(dòng) / 停止 watchdog
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
void Start(Environment* env);
void Stop();
// 提交一個(gè)任務(wù)
template <typename Fn>
void AddTask(Fn&& cb, CallbackFlags::Flags flags = CallbackFlags::Flags::kRefed);
// 處理一個(gè)任務(wù)
void HandleTasks();
// 啟動(dòng)一個(gè)定時(shí)器
void SetTimeout();
// 定時(shí)器回調(diào),具體的邏輯由子類實(shí)現(xiàn)
virtual bool TimeoutHandler() = 0;
protected:
// 輪詢間隔
uint64_t interval_;
private:
static void Run(void* arg);
static void Timer(uv_timer_t* timer);
// 子線程
uv_thread_t thread_;
uv_loop_t loop_;
// 主線程和子線程的通信結(jié)構(gòu)體
uv_async_t async_;
// 定時(shí)器
uv_timer_t timer_;
// 任務(wù)隊(duì)列
CallbackQueue<void> tasks_;
Mutex task_mutex_;
};
ProfileWatchdog 實(shí)現(xiàn)了 watchdog 機(jī)制,具體需要監(jiān)控什么數(shù)據(jù)由子類實(shí)現(xiàn),比如內(nèi)存 watchdog。
class MemoryProfileWatchdog : public ProfileWatchdog {
public:
MemoryProfileWatchdog(Environment* env,
v8::Local<v8::Object> object,
v8::Local<v8::Object> options);
static void Init(Environment* env, v8::Local<v8::Object> target);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
bool TimeoutHandler() override;
private:
// 需要監(jiān)控的數(shù)據(jù)指標(biāo)
size_t max_rss_ = 0;
size_t max_used_heap_size_ = 0;
std::string filename_;
};
有了基本的了解后,接下來看具體實(shí)現(xiàn)。
void ProfileWatchdog::Start(Environment* env) {
int rc;
// 初始化一個(gè)事件循環(huán)結(jié)構(gòu)體
rc = uv_loop_init(&loop_);
// 初始化線程間通信結(jié)構(gòu)體
rc = uv_async_init(&loop_, &async_, [](uv_async_t* task_async) {
ProfileWatchdog* w = ContainerOf(&ProfileWatchdog::async_, task_async);
w->HandleTasks();
});
// 初始化并啟動(dòng)一個(gè)定時(shí)器
rc = uv_timer_init(&loop_, &timer_);
rc = uv_timer_start(&timer_, &ProfileWatchdog::Timer, interval_, 0);
// 創(chuàng)建 watchdog 線程
rc = uv_thread_create(&thread_, &ProfileWatchdog::Run, this);
}
當(dāng)啟動(dòng)一個(gè) watchdog 時(shí)就會(huì)執(zhí)行 Start,Start 函數(shù)中主要初始化了線程間通信的結(jié)構(gòu)體,然后啟動(dòng)一個(gè)定時(shí)器,最后創(chuàng)建一個(gè) watchdog 線程。因?yàn)?Node.js 是單線程的,為了保證 watchdog 在 JS 繁忙時(shí)仍可正常工作,我們需要借助子線程。創(chuàng)建子線程后,子線程就會(huì)開始執(zhí)行 ProfileWatchdog::Run。
void ProfileWatchdog::Run(void* arg) {
ProfileWatchdog* wd = static_cast<ProfileWatchdog*>(arg);
uv_run(&wd->loop_, UV_RUN_DEFAULT);
CheckedUvLoopClose(&wd->loop_);
}
Run 的邏輯很簡單,就是啟動(dòng)一個(gè)事件循環(huán),因?yàn)槲覀兦懊鎲?dòng)了一個(gè)定時(shí)器,所以這個(gè)事件循環(huán)里就會(huì)定時(shí)執(zhí)行定時(shí)器回調(diào) ProfileWatchdog::Timer。
void ProfileWatchdog::Timer(uv_timer_t* timer) {
ProfileWatchdog* w = ContainerOf(&ProfileWatchdog::timer_, timer);
// 往主線程插入一個(gè)任務(wù)
env->RequestInterrupt([watchdog = std::move(w)](Environment* env) {
// 執(zhí)行定時(shí)器的邏輯,由具體的 watchdog 實(shí)現(xiàn),返回 true 表示重啟定時(shí)器,否則監(jiān)控到此為止
if (watchdog->TimeoutHandler()) {
// 往子線程里插入一個(gè)任務(wù),該任務(wù)是重啟定時(shí)器
watchdog->AddTask(
[watchdog = std::move(watchdog)]() { watchdog->SetTimeout(); });
}
});
}
Timer 中通過 env->RequestInterrupt 往主線程插入一個(gè)任務(wù),因?yàn)橛行┐a是不能在子線程里執(zhí)行的,另外 RequestInterrupt 可以保證在 JS 繁忙或阻塞在事件驅(qū)動(dòng)模塊時(shí)仍然可以執(zhí)行我們的任務(wù),那么這個(gè)任務(wù)具體做什么呢?看看內(nèi)存 watchdog 的 TimeoutHandler 實(shí)現(xiàn)。
bool MemoryProfileWatchdog::TimeoutHandler() {
bool reached = false;
if (max_rss_) {
size_t rss = 0;
uv_resident_set_memory(&rss);
if (rss >= max_rss_) {
reached = true;
}
}
if (!reached && max_used_heap_size_) {
Isolate* isolate = env()->isolate();
HeapStatistics heap_statistics;
isolate->GetHeapStatistics(&heap_statistics);
if (heap_statistics.used_heap_size() >= max_used_heap_size_) {
reached = true;
}
}
// 內(nèi)存達(dá)到閾值,采集快照
if (reached) {
HeapProfiler::HeapSnapshotOptions options;
options.numerics_mode = HeapProfiler::NumericsMode::kExposeNumericValues;
options.snapshot_mode = HeapProfiler::HeapSnapshotMode::kExposeInternals;
heap::WriteSnapshot(env(), filename_.c_str(), options);
// 采集完快照,停止 watchdog
return false;
}
return true;
}
TimeoutHandler 就是獲取主線程的內(nèi)存信息,并判斷是否超過了我們配置的閾值,是的話則采集堆快照并停止 watchdog,防止采集過多的重復(fù)信息,我們也可以改成隔久一點(diǎn)再開始重新監(jiān)控,而內(nèi)存如果沒有超過閾值,則重啟定時(shí)器,等待下一輪判斷。從前面的代碼可以看到,如果沒有達(dá)到閾值,我們會(huì)調(diào)用 AddTask 往子線程插入一個(gè)任務(wù)。
watchdog->AddTask([watchdog = std::move(watchdog)]() {
watchdog->SetTimeout();
});
看一下 AddTask 的實(shí)現(xiàn)。
template <typename Fn>
void ProfileWatchdog::AddTask(Fn&& cb, CallbackFlags::Flags flags) {
auto callback = tasks_.CreateCallback(std::move(cb), flags);
{
Mutex::ScopedLock lock(task_mutex_);
// 追加一個(gè)任務(wù)
tasks_.Push(std::move(callback));
}
// 通知子線程有任務(wù)處理
uv_async_send(&async_);
}
AddTask 往子線程的任務(wù)隊(duì)列中插入一個(gè)任務(wù),并通知子線程處理,接著看看子線程如何處理任務(wù)。
void ProfileWatchdog::HandleTasks() {
while (tasks_.size() > 0) {
CallbackQueue<void> queue;
{
Mutex::ScopedLock lock(task_mutex_);
queue.ConcatMove(std::move(tasks_));
}
while (auto head = queue.Shift()) head->Call();
}
}
HandleTasks 會(huì)逐個(gè)任務(wù)處理,也就是執(zhí)行一個(gè)個(gè)函數(shù),我們剛才插入的函數(shù)如下。
void ProfileWatchdog::SetTimeout() {
uv_timer_start(&timer_, &ProfileWatchdog::Timer, interval_, 0);
}
也就是重啟定時(shí)器,這樣就開始等待下次超時(shí),直到觸發(fā)了閾值。
這就是 APM watchdog 的實(shí)現(xiàn)原理,核心思想是利用子線程和 env->RequestInterrupt 機(jī)制,保證我們對(duì)目的線程進(jìn)行相對(duì)實(shí)時(shí)的監(jiān)控(取決于設(shè)置的輪詢時(shí)間),并在發(fā)現(xiàn)問題采集相關(guān)信息來協(xié)助我們排查問題,利用這個(gè)思路,我們可以實(shí)現(xiàn)不同類型的 watchdog 來解決不同的問題,比如 CPU watchdog 可以在 JS 死循環(huán)時(shí)采集 CPU Profile 信息幫助我們找到有問題的代碼,本文就分享到這里,最后貼上目前的實(shí)現(xiàn) PR(見文章末尾)。因?yàn)樯婕暗蕉嗑€程和 Node.js 內(nèi)部的一些知識(shí),實(shí)現(xiàn)起來有很多地方需要考慮的,希望后面有時(shí)間繼續(xù)推進(jìn)。
PR:https://github.com/nodejs/node/pull/45714