Redis 如何高效實(shí)現(xiàn)定時(shí)任務(wù)
Redis通過(guò)單線程結(jié)合非阻塞事件輪詢機(jī)制實(shí)現(xiàn)高效的網(wǎng)絡(luò)IO和時(shí)間事件處理,這篇文章我們將從源碼的角度深入分析一下redis時(shí)間事件的設(shè)計(jì)與實(shí)現(xiàn)。
詳解redis中的時(shí)間事件
時(shí)間事件的定義
時(shí)間事件可以是單次到期執(zhí)行銷毀,也可以是定時(shí)任務(wù),對(duì)此redis對(duì)于時(shí)間事件統(tǒng)一封裝為aeTimeEvent對(duì)象,通過(guò)id來(lái)唯一標(biāo)識(shí)一個(gè)事件,結(jié)合when_sec和when_ms記錄任務(wù)到期執(zhí)行的秒和分,而執(zhí)行時(shí)間事件的函數(shù)也是交由timeProc指針?biāo)赶虻暮瘮?shù)執(zhí)行。 我們以一個(gè)redis定時(shí)執(zhí)行的任務(wù)為例,如下所示,該結(jié)果通過(guò)when_sec和when_ms記錄秒之前的時(shí)間和毫秒的時(shí)間,一旦這個(gè)時(shí)間到了就會(huì)執(zhí)行timeProc這個(gè)函數(shù)指針?biāo)赶虻姆椒╯erverCron,該函數(shù)會(huì)定期執(zhí)行各種任務(wù),這一點(diǎn)筆者會(huì)在后文展開(kāi):
對(duì)應(yīng)的我們給出時(shí)間事件的代碼描述,即位于ae.h這個(gè)頭文件中的aeTimeEvent 結(jié)構(gòu)體,這就是對(duì)時(shí)間事件的封裝結(jié)構(gòu)體,可以看到它除了筆者上述提到的核心字段以外,還有一個(gè)next指針用于連接下一個(gè)注冊(cè)的時(shí)間事件:
//時(shí)間事件
typedef struct aeTimeEvent {
//時(shí)間事件的id全局遞增
long long id; /* time event identifier. */
long when_sec; /* seconds */
//時(shí)間到達(dá)的時(shí)間
long when_ms; /* milliseconds */
//對(duì)應(yīng)時(shí)間時(shí)間的處理器
aeTimeProc *timeProc;
//......
//連接下一個(gè)時(shí)間時(shí)間
struct aeTimeEvent *next;
} aeTimeEvent;
上文提到redis的時(shí)間事件是以鏈表的形式關(guān)聯(lián)起來(lái),這里我們也給出時(shí)間時(shí)間統(tǒng)一管理對(duì)象,即時(shí)間輪詢器aeEventLoop ,它通過(guò)timeEventHead記錄第一個(gè)時(shí)間時(shí)間而后續(xù)的時(shí)間時(shí)間統(tǒng)一用時(shí)間時(shí)間的next指針進(jìn)行管理:
對(duì)應(yīng)我們也給出這段時(shí)間代碼的定義,即位于ae.h中aeEventLoop 的定義:
typedef struct aeEventLoop {
//......
//管理時(shí)間事件的列表
aeTimeEvent *timeEventHead;
//......
} aeEventLoop;
注冊(cè)時(shí)間事件
redis在服務(wù)器初始化階段,會(huì)注冊(cè)一個(gè)定時(shí)的時(shí)間事件,大約每1毫秒觸發(fā)一次,該事件主要做的是:
- 更新redis全局時(shí)鐘,該時(shí)鐘用于全局變量獲取時(shí)間用的。
- 隨機(jī)抽取redis內(nèi)存數(shù)據(jù)庫(kù)中的樣本刪除過(guò)期的鍵值對(duì)。
- 如果檢查到aof重寫(xiě)完成,則進(jìn)行刷盤(pán)操作。
- 如果發(fā)現(xiàn)當(dāng)前aof大小過(guò)大,則fork子進(jìn)程進(jìn)行aof重寫(xiě)操作。
- ......。
對(duì)應(yīng)我們給出時(shí)間事件注冊(cè)的源碼段,即redis初始化時(shí)調(diào)用的方法initServer中的aeCreateTimeEvent,可以看到它將定時(shí)任務(wù)封裝為時(shí)間事件timeEvent,并設(shè)置時(shí)間間隔為1毫秒一次:
void initServer(void) {
//......
/* Create the serverCron() time event, that's our main way to process
* background operations. */
//創(chuàng)建時(shí)間事件注冊(cè)到eventLoop->timeEventHead中
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
//......
}
輪詢處理時(shí)間事件
redis每次處理完所有用戶的請(qǐng)求之后,都會(huì)調(diào)用一次時(shí)間時(shí)間處理函數(shù)processTimeEvents,輪詢并處理就緒的時(shí)間事件,由此保證盡可能準(zhǔn)時(shí)執(zhí)行時(shí)間事件,如果事件時(shí)間非定時(shí)任務(wù)則執(zhí)行完成直接刪除,反之設(shè)置下一次執(zhí)行時(shí)間。這些步驟全部完成之后,返回本次處理的時(shí)間事件數(shù):
我們給出處理時(shí)間循環(huán)的入口aeMain,可以看到該函數(shù)就是redis核心函數(shù)所在,它會(huì)循環(huán)調(diào)用aeProcessEvents處理各種事件:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//處理各種事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
不如aeProcessEvents可以看到該函數(shù)執(zhí)行完所有用戶請(qǐng)求之后調(diào)用processTimeEvents方法獲取并執(zhí)行就緒的時(shí)間事件:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//處理就緒的客戶端事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//上述核心網(wǎng)絡(luò)IO事件完成后處理時(shí)間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
最后我們就可以看到處理時(shí)間事件的核心代碼段,其內(nèi)部會(huì)從timeEventHead開(kāi)始輪詢就緒的時(shí)間事件,比對(duì)當(dāng)前時(shí)間是否大于或者等于到期時(shí)間,如果是則執(zhí)行當(dāng)前時(shí)間事件,再判斷這個(gè)事件是否是定時(shí)事件,如果是則更新下次執(zhí)行時(shí)間,反之刪除,最后累加本次處理的時(shí)間時(shí)間數(shù):
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
//......
if (now < eventLoop->lastTime) {
//從時(shí)間事件頭開(kāi)始
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
//循環(huán)處理到期的時(shí)間事件
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
//如果現(xiàn)在的事件大于到達(dá)時(shí)間
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
//調(diào)用時(shí)間時(shí)間函數(shù)處理該事件
retval = te->timeProc(eventLoop, id, te->clientData);
//更新處理數(shù)
processed++;
//.....
if (retval != AE_NOMORE) {//如果事件類型不是AE_NOMORE則說(shuō)明是定時(shí)事件更新周期,反之刪除
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
redis對(duì)于時(shí)間事件實(shí)現(xiàn)上的優(yōu)化
因?yàn)闀r(shí)間事件有些要求定期執(zhí)行,所以redis為了保證時(shí)間執(zhí)行的實(shí)時(shí)性,做了如下兩個(gè)優(yōu)化:
- 對(duì)于比較耗時(shí)的時(shí)間事件,例如AOF重寫(xiě),通過(guò)fork子進(jìn)程異步完成:
- 對(duì)于返回給客戶端套接字的內(nèi)容,如果長(zhǎng)度超過(guò)預(yù)設(shè)的值,會(huì)主動(dòng)讓出線程執(zhí)行權(quán),避免時(shí)間時(shí)間饑餓。
對(duì)應(yīng)的我們給出第一點(diǎn)時(shí)間時(shí)間對(duì)于aof重寫(xiě)的核心代碼段,可以看到serverCron內(nèi)部判斷如果當(dāng)前沒(méi)有rdb和aof子進(jìn)程,且需要進(jìn)行aof重寫(xiě)則調(diào)用rewriteAppendOnlyFileBackground函數(shù)fork子進(jìn)程進(jìn)行aof重寫(xiě):
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//......
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
//aof_rewrite_scheduled設(shè)置為1,且沒(méi)有其他持久化子進(jìn)程則進(jìn)行aof重寫(xiě),通過(guò)異步避免耗時(shí)
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
//......
}
//fork子進(jìn)程進(jìn)行aof重寫(xiě)
int rewriteAppendOnlyFileBackground(void) {
//......
if ((childpid = fork()) == 0) {//fork子進(jìn)程進(jìn)行aof重寫(xiě)
char tmpfile[256];
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
//生成一個(gè)tmp文件
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {//重寫(xiě)aof
size_t private_dirty = zmalloc_get_private_dirty();
//......
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
//......
}
return REDIS_OK; /* unreached */
}
而回復(fù)給客戶端結(jié)果的處理器sendReplyToClient內(nèi)部也有一段,判斷如果寫(xiě)入數(shù)totwritten 大于REDIS_MAX_WRITE_PER_EVENT (宏定義為64M),則直接中止寫(xiě)入,break退出等到下一次循環(huán)處理,避免因?yàn)檫@個(gè)處理導(dǎo)致其他時(shí)間事件饑餓而導(dǎo)致事件執(zhí)行延期:
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(c->bufpos > 0 || listLength(c->reply)) {
//......
//對(duì)于文件事件數(shù)據(jù)寫(xiě)入超長(zhǎng)會(huì)讓出執(zhí)行權(quán)讓時(shí)間事件能夠盡可能的執(zhí)行
server.stat_net_output_bytes += totwritten;
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
}
//......
}