如何理解 Redis 是單線程的
一、在文章開頭
你剛剛說redis是單線程的,那你能不能告訴我它是如何基于單個線程完成指令處理與客戶端連接接?
基于這個問題,筆者會直接通過3.0.0源碼分析的角度來剖析一下redis單線程的設(shè)計與實現(xiàn)。
二、詳解redis的單線程模型
1. 單線程處理核心任務(wù)
當(dāng)我們通過./redis-server啟動redis時,如果我們配置了后臺啟動,那么shell進程線程就會調(diào)用系統(tǒng)函數(shù)即fork方法創(chuàng)建一個子進程,再通過execve方法將子進程主體替換成redis可執(zhí)行文件也就是我們的redis-server,而子進程執(zhí)行時會保持從父進程集成過來的標(biāo)準(zhǔn)輸入和輸出,最后redis就會調(diào)用main方法開始執(zhí)行自己的啟動邏輯了。
到這為止,我們不難看出,在啟動階段redis的啟動并不是多線程的,它會根據(jù)我們的配置來決定啟動邏輯,以我們上文所說的后臺啟動,它本質(zhì)是通過父進程fork的方式完成創(chuàng)建與初始化的,這一點我們也可以直接從redis的main方法印證:
int main(int argc, char **argv) {
//命令參數(shù)解析與初始化
//......
//如果配置后臺啟動,則調(diào)用daemonize從父進程中fork出來執(zhí)行
if (server.daemonize) daemonize();
//......
}
我們步入daemonize方法,可以看到其內(nèi)部如果子進程fork成功,后續(xù)的標(biāo)準(zhǔn)輸入、輸出、錯誤都會重定向到/dev/null,此后的各項工作也都是交由我們的redis server的主線程進行負責(zé)處理:
void daemonize(void) {
int fd;
//fork返回0說明fork成功,創(chuàng)建新會話,然后父進程exit(0)直接退出
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
//將標(biāo)準(zhǔn)輸入、輸出、錯誤重定向?qū)懙?dev/null中,由此和終端分離
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
}
此時,主線程的socket就會注冊到epoll中,通過非阻塞調(diào)用epoll函數(shù)獲取就緒的連接和指令完成與多個客戶端的交互:
而上述所說這種工作模式,也就是我們的aeMain函數(shù),這里筆者也給出的對應(yīng)的的代碼實現(xiàn),如下所示,aeMain的本質(zhì)邏輯就是調(diào)用無限循環(huán),在循環(huán)中調(diào)用aeApiPoll即epoll非阻塞輪詢獲取就緒的事件并交給對應(yīng)的讀寫事件處理器(rfileProc/wfileProc)進行處理:
//無限循環(huán)調(diào)用aeProcessEvents處理讀寫事件
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
//輪詢標(biāo)識沒有停止則無限循環(huán)
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//輪詢并處理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//通過epoll完成非阻塞調(diào)用
numevents = aeApiPoll(eventLoop, tvp);
//遍歷拿到的事件將其交給讀寫處理器處理
for (j = 0; j < numevents; j++) {
//解析出該文件對應(yīng)的類型
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
//如果事件fe是讀事件則交給rfileProc
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
//如果事件包含寫標(biāo)志,則交給wfileProc處理器處理
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//......
//返回處理事件數(shù)
return processed; /* return the number of processed file/time events */
}
2. 多線程執(zhí)行IO事件
截至到上述的片段,redis大體上我們可以認為是單線程執(zhí)行,但是在3.0.0之后源碼中,為了避免某些IO任務(wù)對主線程的執(zhí)行效率的影響,redis還是創(chuàng)建了一些異步線程處理這些任務(wù)。
如下圖所示,我們以aof為例,redis主線程會通過定時任務(wù)的方法serverCron會按照用戶的配置檢查當(dāng)前是否需要進行aof寫入,如果需要則通過bioCreateBackgroundJob提交一個任務(wù)到AOF異步刷盤的任務(wù)列表中,此時redis創(chuàng)建的io線程就會無限循環(huán)調(diào)用bioProcessBackgroundJobs從該列表中取出自己綁定的任務(wù)進行異步消費,通過這種簡單的多線程模式,保證了耗時的IO操作不會阻塞主線程:
這里我們先給出對應(yīng)的事件宏定義,可以看到事件總數(shù)為REDIS_BIO_NUM_OPS 即2,然后0是文件關(guān)閉事件,1的AOF異步刷盤事件,通過這樣的順序完成了事件的類型碼和總量的定義:
/* Background job opcodes */
#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define REDIS_BIO_NUM_OPS 2
對應(yīng)的這些線程的初始化工作我們可以在main方法調(diào)用的initServer中可以看到這樣一段調(diào)用,其內(nèi)部的調(diào)用bioInit本質(zhì)就是完成上述IO任務(wù)的線程的創(chuàng)建:
void initServer(void) {
int j;
//......
//創(chuàng)建bio任務(wù)線程
bioInit();
}
bioInit它會初始化2個線程以及棧大小(最大不會超過4M),為每個線程各自分配一個隊列,分配隊列這一步就會按照循環(huán)遍歷得到的值進行分配,遍歷時用REDIS_BIO_NUM_OPS作為范圍控制,遍歷到0的處理文件關(guān)閉事件,1則是AOF刷盤事件。 完成事件類型隊列分配之后,redis會為每個線程分配消費任務(wù)的方法指針bioProcessBackgroundJobs,后續(xù)的線程的任務(wù)消費和處理都是調(diào)用這個方法執(zhí)行的:
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
//循環(huán)2次,剛剛好對應(yīng)2個事件即0是文件關(guān)閉事件、1是aof刷盤事件
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
//互斥數(shù)組初始化
pthread_mutex_init(&bio_mutex[j],NULL);
//條件數(shù)組初始化
pthread_cond_init(&bio_condvar[j],NULL);
//bio任務(wù)數(shù)組初始化,每個數(shù)組元素都是一個任務(wù)列表
bio_jobs[j] = listCreate();
//表示每種任務(wù)列表待處理的任務(wù)數(shù)為0
bio_pending[j] = 0;
}
//設(shè)置線程最大的棧屬性大小,默認為1,若小于REDIS_THREAD_STACK_SIZE即4M則乘2
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1;
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
//創(chuàng)建線程并,為每一個線程分配一個任務(wù)列表
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
//循環(huán)兩次 j為0即代表文件關(guān)閉事件、1是aof刷盤事件,這個arg會作為事件類型綁定到線程pthread上
void *arg = (void*)(unsigned long) j;
//調(diào)用pthread_create完成線程屬性初始化和事件類型的綁定
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
這里我們也給出bioProcessBackgroundJobs邏輯可以看到,每個線程調(diào)用該方法時,會在無限循環(huán)中根據(jù)任務(wù)的type按需消費處理:
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
//每個線程都會根據(jù)自己傳入的arg決定任務(wù)的type,0為文件關(guān)閉事件、1為aof刷盤事件
unsigned long type = (unsigned long) arg;
sigset_t sigset;
//......
//按照類型到bio_jobs取任務(wù)執(zhí)行
while(1) {
listNode *ln;
//......
//取出自己需要處理的類型的隊列任務(wù)
ln = listFirst(bio_jobs[type]);
job = ln->value;
//上互斥鎖
pthread_mutex_unlock(&bio_mutex[type]);
//線程按照自己的類型進行消費
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
//完成后釋放任務(wù)對象
zfree(job);
//線程解鎖 任務(wù)移除
pthread_mutex_lock(&bio_mutex[type]);
//任務(wù)處理完成后的收尾工作
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
了解的任務(wù)消費的源碼之后,我們再來看看任務(wù)的投遞的邏輯,我們以aof文件刷盤的任務(wù)為例,從定時任務(wù)函數(shù)serverCron,其內(nèi)部會判斷aof_child_pid的pid不為-1,若不為-1說明當(dāng)前存在aof子進程,對此redis-server就會獲取當(dāng)前aof子進程的pid,調(diào)用backgroundRewriteDoneHandler提交一個aof重寫完成的回調(diào)任務(wù),等待aof重寫完成后該任務(wù)就會被消費,從而完成aof緩沖區(qū)刷盤:
這里我們直接從serverCron為入口查看上述邏輯,可以看到其內(nèi)部會查看rdb_child_pid 或者aof_child_pid 的值,這兩個變量分別記錄rdb或者aof異步持久化進程的id值,若達到以下兩個條件則說明存在aof重寫任務(wù),需要提交一個aof重寫后的刷盤任務(wù):
- aof_child_pid 不是-1
- wait3獲取到的pid也為aof重寫的子進程id
符合上述條件則調(diào)用backgroundRewriteDoneHandler提交一個aof重寫完成后的異步刷盤任務(wù):
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
//......
//檢查后臺的aof重寫進程是否結(jié)束,若結(jié)束的步入循環(huán)
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;
//獲取當(dāng)前子進程pid
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
//......
if (pid == server.rdb_child_pid) {
//......
} else if (pid == server.aof_child_pid) {//如果pid為aof的子進程值則調(diào)用backgroundRewriteDoneHandler
backgroundRewriteDoneHandler(exitcode,bysignal);
} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
} else {
//......
}
//......
}
步入backgroundRewriteDoneHandler可以看到,如果AOF刷盤策略是AOF_FSYNC_EVERYSEC即異步刷盤則會調(diào)用aof_background_fsync進行文件刷盤,而該方法內(nèi)部的邏輯就是調(diào)用我們上文的所說的提交后臺任務(wù)方法bioCreateBackgroundJob:
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
//......
if (server.aof_fd == -1) {
//......
} else {
/* AOF enabled, replace the old fd with the new one. */
oldfd = server.aof_fd;
server.aof_fd = newfd;
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
aof_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)//如果是異步刷盤則將任務(wù)提交到對應(yīng)的隊列中
//提交異步刷盤任務(wù)到REDIS_BIO_AOF_FSYNC隊列中
aof_background_fsync(newfd);
//......
}
server.aof_lastbgrewrite_status = REDIS_OK;
//......
} else if (!bysignal && exitcode != 0) {
//......
} else {
//......
}
//......
}
//調(diào)用bioCreateBackgroundJob提交任務(wù)到AOF刷盤隊列中
void aof_background_fsync(int fd) {
bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
最終,我們就可以在bioCreateBackgroundJob看到aof異步刷盤的任務(wù)提交核心步驟:
- 獲取任務(wù)參數(shù),以我們aof異步刷盤的邏輯第一個參數(shù)就是aof子進程的文件句柄。
- 線程上鎖。
- 任務(wù)入隊。
- 喚醒相應(yīng)線程。
- 釋放互斥鎖。
對應(yīng)源碼如下,讀者可參考上述說明和注釋了解邏輯:
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
//獲取aof子進程的fd
job->arg1 = arg1;
//以本文為例都說null
job->arg2 = arg2;
job->arg3 = arg3;
//上鎖
pthread_mutex_lock(&bio_mutex[type]);
//追加任務(wù)到對應(yīng)job的數(shù)組中
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
//通知相關(guān)線程消費
pthread_cond_signal(&bio_condvar[type]);
//釋放互斥鎖
pthread_mutex_unlock(&bio_mutex[type]);
}
三、小結(jié)
自此我們把redis中主線程和IO任務(wù)的線程都以圖解和源碼印證的方式分析完成了,以筆者的理解,設(shè)計者所說的redis是單線程的本質(zhì)上的是強調(diào)對于核心的連接建立和指令處理是通過極致壓榨單個線程高效完成,而其余的一些非核心的IO耗時邏輯還是需要多個線程進行異步處理。