Linux高性能網(wǎng)絡(luò)編程十談 | 信號和定時器
在Linux網(wǎng)絡(luò)編程中,信號處理和定時器是經(jīng)常遇到的功能,在聊這塊內(nèi)容之前如果您看過上一篇文章《Linux高性能網(wǎng)絡(luò)編程十談|IO復(fù)用和模式》,應(yīng)該比較完整的了解epoll了,但是這里還遺漏了一個知識點,那開始先補上這個坑。
關(guān)于epoll驚群問題,什么是驚群呢?
比如我們在寫代碼過程中,使用兩個線程的epoll監(jiān)聽socket,當socket上有事件發(fā)生時,兩個epoll都會被喚醒,導(dǎo)致會操作同一個socket,這就是驚群,那如何解決呢?
(1)使用EPOLLEXCLUSIVE:EPOLLEXCLUSIVE是epoll的擴展選項,它允許一個線程獨占一個epoll實例,從而避免了epoll的驚群問題;
(2)使用EPOLLONESHOT:對于注冊了EPOLLONESHOT事件的文件描述符,操作系統(tǒng)最多觸發(fā)一個可讀,可寫或者異常事件,且觸發(fā)一次,這樣就能確保一個線程獲取事件并處理,但是需要注意的是對于監(jiān)聽類型(如accept)不能使用EPOLLONESHOT,否則就不能持續(xù)監(jiān)聽連接,對于處理完了的非監(jiān)聽事件,需要重置EPOLLONESHOT;
第一部分:信號
1、發(fā)送信號給進程
#include <sys/types.h>
#include <signal.h>
int kill(pid_t pid, int sig);
(1)pid的取值和含義如下:
(2)sig的取值和含義如下(在linux命令行使用kill -l查看取值,這里列幾個經(jīng)常使用的):
2、信號回調(diào)函數(shù)
#include <signal.h>
typedef void (&__sighandler_t) (int);
__sighandler_t signal(int sig, __sighandler_t _handler);
int sigaction(int sig, const struct sigaction *act, struct sigaction *oact);
(1)__sighandler_t信號處理的函數(shù)指針,其中處理參數(shù)為觸發(fā)信號當前值,其中有兩個默認宏(SIG_DFL:使用信號默認處理,SIG_IGN:忽略目標信號);
(2)signal注冊信號回調(diào)處理函數(shù),返回值為一個函數(shù)指針,含義是這個信號上一次處理的回調(diào)函數(shù)或者是系統(tǒng)默認的處理函數(shù),這里目的是讓用戶可以自己恢復(fù)信號處理方式,比如系統(tǒng)對于一些信號是殺掉進程的,這里就應(yīng)該處理完自己的回調(diào)邏輯后再調(diào)用系統(tǒng)默認行為;
(3)sigaction函數(shù)的功能是檢查或修改與指定信號相關(guān)聯(lián)的處理動作,使用樣例如下:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
int main()
{
struct sigaction newact, oldact;
newact.sa_handler = SIG_IGN; // 設(shè)置信號忽略,也可以設(shè)置為處理函數(shù)
sigemptyset(&newact.sa_mask);
newact.sa_flags = 0;
int count = 0;
pid_t pid = 0;
sigaction(SIGINT, &newact, &oldact); // 原始的備份到oldact,為后續(xù)的處理恢復(fù)
pid = fork();
if (pid == 0)
{
while(1)
{
printf("child exec ...\n");
sleep(1);
}
return 0;
}
while (1)
{
if (count++ > 3)
{
sigaction(SIGINT, &oldact, NULL); // 恢復(fù)父進程信號處理方式
kill(pid, SIGKILL); // 父進程發(fā)信號給子進程
}
printf("father exec ...\n");
sleep(1);
}
return 0;
}
第二部分:定時器
在Linux網(wǎng)絡(luò)編程中,定時器的作用主要是管理定時任務(wù),處理過期連接,檢測超時隊列等,那我們可以通過哪些方式實現(xiàn)定時器呢?
1、利用系統(tǒng)API
...
setsockopt(socketfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
setsockopt(socketfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, len);
int number = epoll_wait(fd, events, MAX_EVENT_NUMBER, timeout);
...
通過使用socket的參數(shù),設(shè)置連接句柄的發(fā)送和接收數(shù)據(jù)超時時間,可以實現(xiàn)定時處理:
(1)SO_SNDTIMEO發(fā)送數(shù)據(jù)超時時間,根據(jù)timeout設(shè)置;
(2)SO_RCVTIMEO接收數(shù)據(jù)超時時間,根據(jù)timeout設(shè)置;
IO復(fù)用的參數(shù)中都帶了一個timeout參數(shù),可以設(shè)置來達到定時觸發(fā)分支邏輯,比如epoll_wait;
2、簡單的定時器
(1)啟動一個線程實現(xiàn)定時器,具體實現(xiàn)如下圖:
- 主線程啟動,開始執(zhí)行任務(wù),這里可以是網(wǎng)絡(luò)收發(fā)或者其他;
- 啟動一個線程,做定時任務(wù)處理使用;
- 主線程需要增加定時任務(wù),可以將任務(wù)封裝為task,添加到任務(wù)隊列中;
- 同時通知定時線程,隊列中有任務(wù)了,這里通知機制可以是信號量或者廣播方式;
- 定時線程取出隊列中任務(wù),判斷當前任務(wù)是否過期,如果過期就執(zhí)行,沒有過期就繼續(xù)放入任務(wù)隊列中,同時這里需要讓線程等待隊列中距離下一個周期最短的時間,繼續(xù)取隊列任務(wù);
(2)使用epoll_wait設(shè)置timeout,是在網(wǎng)絡(luò)事件觸發(fā)的定時器中最方便的方式,具體邏輯如下:
...
start_timer = ... // 開始執(zhí)行時間
while (true) {
int number = epoll_wait(epfd, events, MAX_EVENT_NUMBER, timeout);
for (...) {
...
// 處理連接任務(wù)
...
}
end_timer = ... // epoll_wait返回并處理任務(wù)時間
// 處理定時任務(wù),判斷當前時間是否在一個timeout
if (end_timer - start_timer > timeout) { // 這里是偽代碼,具體時間判斷可以參考linux結(jié)構(gòu)體
...
// 啟動線程執(zhí)行定時任務(wù)邏輯
...
}
}
3、時間輪
時間輪是一種高效定時器,通過類似圓盤的形式定義每個tick,定時轉(zhuǎn)動圓盤,假設(shè)每次tick時間為si,一個時間輪有N個tick,那么執(zhí)行轉(zhuǎn)動一圈時間為N*si;
現(xiàn)在插入一個任務(wù),需要to1時間周期后執(zhí)行,這里就分情況處理:
(1)如果to1 < N*si,則需要分配到(當前時間輪的位置 + to1 / si)的位置上,等待自然tick到達執(zhí)行當前to1的定時任務(wù);
(2)如果to1 > N*si,則需要分配到(當前時間輪的位置 + (to1 % N) / si + N)的位置上,由于to1執(zhí)行時間超過一輪的周期,所以需要等待多輪轉(zhuǎn)動后才能執(zhí)行,那如何處理呢?因此我們將每個輪的tick上掛一個鏈表,這個鏈表的節(jié)點表示到達這個tick需要執(zhí)行的任務(wù)to1,這里的節(jié)點有可能是大于一個輪轉(zhuǎn)動的事件周期,也可能就是當前輪時間周期內(nèi)執(zhí)行,我們只需要當事件到達tick時,取出鏈表遍歷鏈表節(jié)點to1,判斷是否是當前事件周期內(nèi)執(zhí)行,如果是摘除鏈表節(jié)點然后執(zhí)行任務(wù),如果不是則重新計算to1需要多久后執(zhí)行,計算方法就和上面的一樣(當前時間輪位置 + ((to1 - 鏈表最小的周期時間) % N) / si + N),然后將當前鏈表節(jié)點重新放回;
事件輪
4、時間堆
堆的數(shù)據(jù)結(jié)構(gòu)應(yīng)該大家都比較熟悉了,堆是一種滿足以下條件的樹:
- 堆中某個節(jié)點的值總是不大于或不小于其父節(jié)點的值;
- 堆總是一棵完全二叉樹;
- 添加堆節(jié)點的時間復(fù)雜度O(lgn),刪除節(jié)點是O(lgn),獲取節(jié)點是O(1);
時間堆
(1)循環(huán)線程讀取最小時間堆的堆頂元素;
(2)取出最小節(jié)點,判斷當前事件是否過期,如果過期則繼續(xù)執(zhí)行,否則不處理;
(3)將最小節(jié)點對應(yīng)的事件丟給執(zhí)行線程執(zhí)行;
這里最小時間堆節(jié)點在代碼實現(xiàn)中可以用一個數(shù)組表示,使用完全二叉樹的排列。
#include<iostream>
void heapify(int arr[], int n, int i) {
if (i >= n) return;
int min_node = i;
int lson = i * 2 + 1;
int rson = i * 2 + 2;
if (lson < n && arr[min_node] > arr[lson]) { // 和左孩子比較,找到最小節(jié)點
min_node = lson;
}
if (rson < n && arr[min_node] > arr[rson]) { // 和右孩子比較,找到最小節(jié)點
min_node = rson;
}
if (min_node != i) {
swap(arr[min_node], arr[i]);
heapify(arr, n, min_node); // 遞歸處理
}
}
void heapSort(int arr[], int n) {
// 反向取出最后一個節(jié)點
int lastNode = n - 1;
int parent = (lastNode - 1) / 2;
for (int i = parent; i >= 0; i--) {
heapify(arr, n, i);
}
for (int i = n - 1; i >= 0; i--) {
swap(arr[i], arr[0]);
heapify(arr, i, 0); // 調(diào)整堆節(jié)點
}
}
int main() {
int arr[5] = { 70, 41, 10, 90, 18, 26 };
heap_sort(arr, sizeof(arr) / sizeof(arr[0]));
for (int i = 0; i < sizeof(arr) / sizeof(arr[0]); i++) {
cout << arr[i] << endl;
}
return 0;
}