聊一聊 Libuv 的信號機制
本文介紹 Libuv 是如何基于操作系統(tǒng)底層的能力實現(xiàn)信號模塊的,看一下如何在 Libuv 中使用信號模塊。
#include "uv.h"
#include "stdio.h"
#include <unistd.h>
void signal_cb(uv_signal_t* handleint sig) {
printf("receive signal\n");
uv_signal_stop(handle);
}
int main() {
printf("%d\n", getpid());
fflush(stdout);
uv_loop_t loop;
uv_signal_t signal;
uv_loop_init(&loop);
uv_signal_init(&loop, &signal);
uv_signal_start(&signal, signal_cb, SIGUSR1);
uv_run(&loop, UV_RUN_DEFAULT);
return 0;
}
通過 gcc main.c -luv && ./a.out 編譯執(zhí)行上面代碼,然后執(zhí)行 kill -SIGUSR1 pid 給該進程發(fā)送信號,可以看到會輸出 receive signal。接著來分析具體的實現(xiàn)。
初始化
Libuv 在初始化第一個事件循環(huán)結(jié)構(gòu)體時會初始化信號處理的相關(guān)結(jié)構(gòu)體。
void uv__signal_global_once_init(void) {
uv_once(&uv__signal_global_init_guard, uv__signal_global_init);
}
因為信號處理是支持多線程的,所以這里用了 uv_once 保證只執(zhí)行一次 uv__signal_global_init。
static void uv__signal_global_init(void) {
if (uv__signal_lock_pipefd[0] == -1)
// 如果在子線程里調(diào)用了 fork,則需要在 fork 后的子進程調(diào)用 uv__signal_global_reinit 重新初始化相關(guān)數(shù)據(jù)結(jié)構(gòu)
if (pthread_atfork(NULL, NULL, &uv__signal_global_reinit))
abort();
uv__signal_global_reinit();
}
繼續(xù)看 uv__signal_global_reinit。
static void uv__signal_global_reinit(void) {
// 清除之前的狀態(tài)
uv__signal_cleanup();
// 創(chuàng)建兩個 fd 用于加鎖 / 解鎖,工作方式是阻塞模式
if (uv__make_pipe(uv__signal_lock_pipefd, 0))
abort();
// 修改鎖為解鎖狀態(tài)
if (uv__signal_unlock())
abort();
}
初始化部分沒有太多的邏輯,只是初始化一些數(shù)據(jù)結(jié)構(gòu)。
加鎖 / 解鎖
因為 Libuv 用一棵全局的紅黑樹維護了信號和訂閱者的關(guān)系,而多個線程可以訪問這個全局的數(shù)據(jù)結(jié)構(gòu),所以需要加鎖訪問,接著看看 Libuv 的鎖是怎么實現(xiàn)的,下面是加鎖的實現(xiàn)。
static int uv__signal_lock(void) {
int r;
char data;
do {
r = read(uv__signal_lock_pipefd[0], &data, sizeof data);
} while (r < 0 && errno == EINTR);
return (r < 0) ? -1 : 0;
}
下面是解鎖的實現(xiàn)。
static int uv__signal_unlock(void) {
int r;
char data = 42;
do {
r = write(uv__signal_lock_pipefd[1], &data, sizeof data);
} while (r < 0 && errno == EINTR);
return (r < 0) ? -1 : 0;
}
剛才介紹初始化過程時說到了 uv__signal_lock_pipefd 是一個通信管道,Libuv 的加鎖解鎖正是通過 uv__signal_lock_pipefd 實現(xiàn)的,管道初始化時會先寫入一個字節(jié)表示處于解鎖狀態(tài),加鎖時會讀出這一個字節(jié),表示加鎖成功,然后解鎖時再次寫入一個字節(jié)。因為讀寫一個字節(jié)是原子的,所以這就實現(xiàn)了加鎖/解鎖的能力,保證多線程訪問時的安全問題。
那么為什么 Libuv 不使用傳統(tǒng)的 mutex 來實現(xiàn)多線程安全訪問呢?這里涉及到一個概念叫做異步信號安全,它表示一個函數(shù)可以安全地在信號處理函數(shù)中使用,因為信號是異步發(fā)生的并且信號處理函數(shù)具有非常高的優(yōu)先級,假設(shè)進程正在執(zhí)行 a 函數(shù)修改一些數(shù)據(jù),突然收到信號然后執(zhí)行信號處理函數(shù),處理函數(shù)中又執(zhí)行了 a 函數(shù)修改數(shù)據(jù),這時候可能會導(dǎo)致問題。解決這個問題的方式通常有兩種:
- 在信號處理函數(shù)里只調(diào)用異步信號安全的函數(shù)。
- 在執(zhí)行非異步信號安全的函數(shù)時屏蔽信號,避免在信號處理函數(shù)里再次執(zhí)行該函數(shù)。
因為 Libuv 在信號處理函數(shù)里需要訪問全局?jǐn)?shù)據(jù)結(jié)構(gòu),而 mutex 相關(guān)的函數(shù)不是異步信號安全的,所以不能使用 mutex 實現(xiàn),而是通過 read / write 實現(xiàn)(它們是異步信號安全的函數(shù))。
信號屏蔽
加鎖解鎖解決了多個線程訪問全局?jǐn)?shù)據(jù)結(jié)構(gòu)的問題,但是還有一個問題是同線程的數(shù)據(jù)競爭訪問問題?這里大家可能會好奇,單線程內(nèi)的代碼是順序執(zhí)行的,為什么會存在數(shù)據(jù)競爭訪問?原因是信號機制的存在,比如我們正在執(zhí)行 a 函數(shù)修改數(shù)據(jù)結(jié)構(gòu),突然收到了一個信號,然后在信號處理函數(shù)里又執(zhí)行 a 函數(shù)修改數(shù)據(jù)結(jié)構(gòu),這樣可能就會導(dǎo)致問題,所以在執(zhí)行某些函數(shù)時需要先屏蔽信號,執(zhí)行完后再允許接收信號。我們看看相關(guān)的處理邏輯。
static void uv__signal_block_and_lock(sigset_t* saved_sigmask) {
sigset_t new_mask;
// 把 new_mask 所有比特位設(shè)置為 1
if (sigfillset(&new_mask))
abort();
// 屏蔽(當(dāng)前線程的)所有信號
if (pthread_sigmask(SIG_SETMASK, &new_mask, saved_sigmask))
abort();
// 加鎖
if (uv__signal_lock())
abort();
}
為什么需要屏蔽所有信號呢?因為執(zhí)行 uv__signal_block_and_lock 后,需要往操作系統(tǒng)注冊信號處理函數(shù),如果剛注冊完信號處理函數(shù),還沒有執(zhí)行 uv__signal_unlock 釋放鎖,這時候突然收到一個信號,然后在信號處理函數(shù)中又嘗試加鎖則會導(dǎo)致死鎖。過程大致如下:
- 加鎖成功,注冊信號處理函數(shù)到操作系統(tǒng)。
- 時鐘中斷觸發(fā),觸發(fā)進程調(diào)度,當(dāng)前進程事件片還沒到,繼續(xù)執(zhí)行。
- 進程調(diào)度完后,發(fā)現(xiàn)有信號需要處理,然后執(zhí)行信號處理函數(shù)。
- 信號處理函數(shù)嘗試加鎖,但是鎖已經(jīng)被持有,然后進入等待狀態(tài)。
- 因為信號處理函數(shù)沒有返回,導(dǎo)致后續(xù)的代碼無法執(zhí)行,進程因為無法進行解鎖操作,最終陷入死鎖。
初始化信號結(jié)構(gòu)體
int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
int err;
err = uv__signal_loop_once_init(loop);
if (err)
return err;
uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);
handle->signum = 0;
handle->caught_signals = 0;
handle->dispatched_signals = 0;
return 0;
}
初始化的邏輯很簡單,只是做一些字段的初始化,但是有一個比較重要的邏輯是 uv__signal_loop_once_init。
static int uv__signal_loop_once_init(uv_loop_t* loop) {
int err;
// 已經(jīng)初始化過了,直接返回
if (loop->signal_pipefd[0] != -1)
return 0;
// 創(chuàng)建一個非阻塞模式的通信管道
err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
if (err)
return err;
// 初始化 IO 觀察者
uv__io_init(&loop->signal_io_watcher,
uv__signal_event,
loop->signal_pipefd[0]);
// 注冊 IO 觀察者
uv__io_start(loop, &loop->signal_io_watcher, POLLIN);
return 0;
}
uv__signal_loop_once_init 的作用是創(chuàng)建一個通信管道,然后注冊一個 IO 觀察者到事件循環(huán)中,當(dāng)收到信號時,信號處理函數(shù)就會通過這個管道通知事件循環(huán),事件循環(huán)會在某個階段通知信號的訂閱者。
訂閱信號
訂閱信號可以通過下面兩個函數(shù)。
int uv_signal_start(uv_signal_t* handle, uv_signal_cb signal_cb, int signum) {
return uv__signal_start(handle, signal_cb, signum, 0);
}
int uv_signal_start_oneshot(uv_signal_t* handle,
uv_signal_cb signal_cb,
int signum) {
return uv__signal_start(handle, signal_cb, signum, 1);
}
最終是由 uv__signal_start 實現(xiàn)的,其實 oneshot 表示最多只執(zhí)行一次信號處理函數(shù)。
static int uv__signal_start(uv_signal_t* handle,
uv_signal_cb signal_cb,
int signum,
int oneshot) {
sigset_t saved_sigmask;
int err;
uv_signal_t* first_handle;
// 之前已經(jīng)監(jiān)聽過這個信號,這里只需要更新下回調(diào)就行
if (signum == handle->signum) {
handle->signal_cb = signal_cb;
return 0;
}
// 如果之前監(jiān)聽過了,先刪除,比如同一個 handle 監(jiān)聽了另一個信號
if (handle->signum != 0) {
uv__signal_stop(handle);
}
uv__signal_block_and_lock(&saved_sigmask);
// 注冊信號,待會分析
uv__signal_unlock_and_unblock(&saved_sigmask);
return 0;
}
uv__signal_start 首先做了一些前置判斷,然后調(diào) uv__signal_block_and_lock 加鎖和屏蔽所有的信號,加鎖主要是準(zhǔn)備要修改共享的數(shù)據(jù)結(jié)構(gòu),避免多線程引起的問題,屏蔽所有的信號則是因為信號處理函數(shù)也會訪問這個數(shù)據(jù)結(jié)構(gòu),所以需要避免它的執(zhí)行,否則會引起死鎖問題。接著分析信號注冊的邏輯。
// 查找這個信號對應(yīng)的 handle
first_handle = uv__signal_first_handle(signum);
// 還沒注冊過則直接注冊
// 之前注冊過且設(shè)置了 UV_SIGNAL_ONE_SHOT 標(biāo)記,但是當(dāng)前注冊的還沒有設(shè)置 UV_SIGNAL_ONE_SHOT 則注冊
if (first_handle == NULL ||
(!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT))) {
uv__signal_register_handler(signum, oneshot);
}
// oneshot 表示訂閱者最多只被通知一次
if (oneshot)
handle->flags |= UV_SIGNAL_ONE_SHOT;
// 插入紅黑樹
RB_INSERT(uv__signal_tree_s, &uv__signal_tree, handle);
handle->signum = signum;
handle->signal_cb = signal_cb;
uv__handle_start(handle);
注冊信號包括兩步。
第一步是注冊到操作系統(tǒng)。
static int uv__signal_register_handler(int signum, int oneshot) {
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
if (sigfillset(&sa.sa_mask))
abort();
// 設(shè)置信號處理函數(shù)
sa.sa_handler = uv__signal_handler;
sa.sa_flags = SA_RESTART;
// oneshot 則設(shè)置 SA_RESETHAND,操作系統(tǒng)執(zhí)行完信號處理函數(shù)后會重置為默認處理行為
if (oneshot)
sa.sa_flags |= SA_RESETHAND;
// 注冊到操作系統(tǒng)
if (sigaction(signum, &sa, NULL))
return UV__ERR(errno);
return 0;
}
uv__signal_register_handler 實現(xiàn)了信號的注冊,Libuv 并不是每次注冊信號時都會執(zhí)行 uv__signal_register_handler,而是做了一個優(yōu)化,只有滿足兩個條件才會注冊信號到操作系統(tǒng)。
1.還沒注冊過信號:這個是很自然的邏輯,不需要分析。
2.!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT)):
a.oneshot 為 true:則不論之前的 handle 是否設(shè)置了 UV_SIGNAL_ONE_SHOT 都不需要調(diào)操作系統(tǒng)進行注冊了,因為之前已經(jīng)注冊過了,并且保證設(shè)置了 UV_SIGNAL_ONE_SHOT 的 handle 可以被執(zhí)行。
b.oneshot 為 false,first_handle->flags & UV_SIGNAL_ONE_SHOT 為 false:之前的 handle 沒有設(shè)置 UV_SIGNAL_ONE_SHOT,則也不需要調(diào)操作系統(tǒng)注冊信號了,因為之前已經(jīng)注冊過了,并且保證所有的 handle 可以觸發(fā)多次。
c.oneshot 為 false,first_handle->flags & UV_SIGNAL_ONE_SHOT 為 true:如果之前注冊的信號設(shè)置了 UV_SIGNAL_ONE_SHOT 但是本次需要注冊的沒有設(shè)置該 flag,則需要調(diào)用 uv__signal_register_handler 重新進行注冊,因為設(shè)置了 UV_SIGNAL_ONE_SHOT 的話操作系統(tǒng)執(zhí)行完一次自定義的信號處理函數(shù)后就不會再執(zhí)行了,這樣會導(dǎo)致沒有設(shè)置 UV_SIGNAL_ONE_SHOT 的訂閱者得不到通知。
大家可能會疑惑,這里為什么只需要判斷第一個 handle,因為紅黑樹的查找時會先找沒有設(shè)置 UV_SIGNAL_ONE_SHOT 的 handle,然后再找設(shè)置了 UV_SIGNAL_ONE_SHOT 的 handle,所以如果找到的第一個 handle 設(shè)置了 UV_SIGNAL_ONE_SHOT,那說明所有 handle 都設(shè)置了 UV_SIGNAL_ONE_SHOT。
第一步注冊完信號后,第二步是注冊到 Libuv 維護的紅黑樹,因為一個信號最多只能注冊一個處理函數(shù),為了支持一個信號可以有多個訂閱者,Libuv 自己維護了訂閱者,然后把信號處理函數(shù)統(tǒng)一注冊為 uv__signal_handler,然后在收到信號時再由 uv__signal_handler 進行處理和分發(fā)信號。
停止訂閱信號
停止訂閱信號的最終實現(xiàn)函數(shù)是 uv__signal_stop。
static void uv__signal_stop(uv_signal_t* handle) {
sigset_t saved_sigmask;
uv_signal_t* first_handle;
int rem_oneshot;
int first_oneshot;
uv__signal_block_and_lock(&saved_sigmask);
// 從紅黑樹中刪除該 handle
RB_REMOVE(uv__signal_tree_s, &uv__signal_tree, handle);
// 找到第一個訂閱了該信號的 handle
first_handle = uv__signal_first_handle(handle->signum);
// 沒有訂閱者了,則告訴操作系統(tǒng)收到該信號時不需要通知 Libuv 了
if (first_handle == NULL) {
uv__signal_unregister_handler(handle->signum);
} else {
// 判斷是否設(shè)置了 UV_SIGNAL_ONE_SHOT
rem_oneshot = handle->flags & UV_SIGNAL_ONE_SHOT;
first_oneshot = first_handle->flags & UV_SIGNAL_ONE_SHOT;
// 如果剩下的 handle 設(shè)置了 UV_SIGNAL_ONE_SHOT,但是當(dāng)前被刪除的 handle 沒有
// 設(shè)置 UV_SIGNAL_ONE_SHOT 則需要重新注冊信號處理函數(shù)為 oneshot
if (first_oneshot && !rem_oneshot) {
uv__signal_register_handler(handle->signum, 1);
}
}
uv__signal_unlock_and_unblock(&saved_sigmask);
handle->signum = 0;
uv__handle_stop(handle);
}
如果 first_oneshot 為 true 說明剩下的 handle 都是設(shè)置了 UV_SIGNAL_ONE_SHOT,如果 first_oneshot 為 true 并且 rem_oneshot 為 false 說明目前注冊到操作系統(tǒng)的信號函數(shù)沒有設(shè)置 oneshot,因為只要有一個 handle 沒有設(shè)置UV_SIGNAL_ONE_SHOT,那么注冊到操作系統(tǒng)的信號處理函數(shù)都不會設(shè)置 oneshot 標(biāo)記,這時候需要修改重新更新信號處理函數(shù)為 oneshot。
信號的處理
從前面的分析可以看到,信號的處理函數(shù)統(tǒng)一設(shè)置為 uv__signal_handler,所以收到信號時,操作系統(tǒng)就會執(zhí)行 uv__signal_handler。
// signum 為收到的信息
static void uv__signal_handler(int signum) {
uv__signal_msg_t msg;
uv_signal_t* handle;
int saved_errno;
saved_errno = errno;
memset(&msg, 0, sizeof msg);
// 需要加鎖,避免另一個線程在修改紅黑樹
uv__signal_lock();
// 找出 signum 對應(yīng)的訂閱者
for (handle = uv__signal_first_handle(signum);
handle != NULL && handle->signum == signum;
handle = RB_NEXT(uv__signal_tree_s, &uv__signal_tree, handle)) {
int r;
msg.signum = signum;
msg.handle = handle;
// 寫入消息通知事件循環(huán)
do {
r = write(handle->loop->signal_pipefd[1], &msg, sizeof msg);
} while (r == -1 && errno == EINTR);
}
uv__signal_unlock();
errno = saved_errno;
}
收到信號時并不是直接通知訂閱者,而是通知事件循環(huán),在事件循環(huán)的某個階段才會真正通知訂閱者。通知事件循環(huán)的方式是通過寫入多個消息到管道中,事件循環(huán)在 Poll IO 階段就會判斷這個管道可讀,從而讀出所有的消息進行處理。前面介紹初始化信號結(jié)構(gòu)體時說過,第一次初始化時會 uv__signal_loop_once_init 往事件循環(huán)中注冊一個 IO 觀察者,對應(yīng)的處理函數(shù)是 uv__signal_event。
static void uv__signal_event(uv_loop_t* loop,
uv__io_t* w,
unsigned int events) {
uv__signal_msg_t* msg;
uv_signal_t* handle;
char buf[sizeof(uv__signal_msg_t) * 32];
size_t bytes, end, i;
int r;
bytes = 0;
end = 0;
do {
// 讀出消息
r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);
bytes += r;
end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);
// 逐個處理消息
for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
msg = (uv__signal_msg_t*) (buf + i);
handle = msg->handle;
// 執(zhí)行回調(diào)
if (msg->signum == handle->signum) {
handle->signal_cb(handle, handle->signum);
}
// 設(shè)置了 UV_SIGNAL_ONE_SHOT,則解除訂閱關(guān)系
if (handle->flags & UV_SIGNAL_ONE_SHOT)
uv__signal_stop(handle);
}
bytes -= end;
if (bytes) {
memmove(buf, buf + end, bytes);
continue;
}
} while (end == sizeof buf);
}
信號的使用
信號的處理具有非常高的優(yōu)先級,這個能力在很多場景下非常有用。下面看一個簡單的場景。
#include <signal.h>
#include <unistd.h>
#include <stdio.h>
void handler(int s) {
printf("receive signal\n");
}
int main () {
printf("%d\n", getpid());
fflush(stdout);
signal(SIGUSR1, handler);
while(1) {}
return 0;
}
通過 gcc main.c -luv && ./a.out 編譯執(zhí)行上面代碼,然后再執(zhí)行 kill -SIGUSR1 pid(執(zhí)行 ./a.out 輸出的 pid),可以看到會輸出 receive signal,也就是說,盡管進程處于死循環(huán),信號機制依然可以正常工作。下面繼續(xù)來看一下兩個具體的應(yīng)用場景。
第一個是在 Node.js 中。假設(shè)業(yè)務(wù)中有以下一段代碼。
console.log(process.pid);
function a() {
while(1) {
b();
}
}
function b() {
while(1) {}
}
a();
有一天我們發(fā)現(xiàn)服務(wù)的某個進程處于 100% 了,那么我們應(yīng)該如何排查呢?我們知道 Node.js 是單線程的,JS 線程處于死循環(huán)時,是無法處理外部進來的請求了,也就意味著我們不能手動采集 CPU Profile 了。這時候信號機制的作用就來了,我們找到這個進程的 pid,然后執(zhí)行 kill -SIGUSR1 pid 會發(fā)現(xiàn) Node.js 的調(diào)試器(本質(zhì)上是創(chuàng)建了一個線程監(jiān)聽了一個端口)被打開了,通過 Chrome Dev Tools 連接上調(diào)試器我們就可以采集 CPU Profile 了(重點是打開調(diào)試器,采集方式很多種)。結(jié)果如下。
可以看到通過 Profile 我們就可以輕松分析出是哪里的代碼導(dǎo)致了死循環(huán),從而快速解決業(yè)務(wù)中的問題。
接著再看一個 GO 的例子。
package main
import (
"fmt"
"runtime"
)
func main() {
// 設(shè)置只有單個線程
runtime.GOMAXPROCS(1)
go func() {
for {
fmt.Println("worker goroutine")
}
}()
for {
}
}
在 go1.13 下執(zhí)行上面的代碼,沒有任務(wù)輸出,然后切換到 go1.23 再試試(可以通過 gvm 管理 Go 版本),可以看到不斷輸出 worker goroutine。為什么會這樣呢?Go 雖然通過協(xié)程原生支持了并發(fā),但是在單個線程中,如果一個 goroutine 正在執(zhí)行時,其他 goroutine 是無法執(zhí)行的,Go 為了避免 goroutine 饑餓問題,實現(xiàn)了搶占機制,但是早期實現(xiàn)的是基于協(xié)作式的搶占機制(比如 go1.13 版本),協(xié)作式搶占的原理是在函數(shù)中插入搶占代碼,goroutine 執(zhí)行到函數(shù)時 Go runtime 會判斷 goroutine 的事件片是不是用完了,用完了則進行調(diào)度,這種搶占機制受限于函數(shù),如果我們不執(zhí)行函數(shù)的話就繞過了這個檢測,比如上面的例子,鑒于這個限制,Go 現(xiàn)在已經(jīng)實現(xiàn)了基于信號的搶占式機制(比如 go1.23 版本),基于信號的搶占式機制正式使用了信號高優(yōu)先級的能力,盡管 goroutine 處于死循環(huán),Go runtime 依然有能力介入,從而實現(xiàn) goroutine 的調(diào)度。