Redis 源碼解析:一條 Redis 命令是如何執(zhí)行的?
作者 | robinhzhang
Redis(Remote Dictionary Server)是一個開源的內存數據庫,遵守 BSD 協議,它提供了一個高性能的鍵值(key-value)存儲系統(tǒng),常用于緩存、消息隊列、會話存儲等應用場景。本文主要向大家分享redis基本概念和流程,希望能和大家一起從源碼角度分析一條命令執(zhí)行過程,希望能幫助開發(fā)同學掌握redis的實現細節(jié),提升編程水平、設計思想。
一、源碼結構
學習 Redis 源代碼之前,我們需要對 Redis 代碼的整體架構有一個了解,基于redis1.0源碼,我們列出了主流程相關的如下源碼文件。
二、核心數據結構
1. redisServer
redisServer是存儲redis服務端運行的結構體,在啟動的時候就會初始化完成,結構如下,它主要包含跟監(jiān)聽的socket有關的參數port和fd;對應存儲數據的redisDb列表;鏈接的客戶端列表clients;事件循環(huán)*el
struct redisServer {
int port; // 服務端監(jiān)聽的端口
int fd; // 服務端起的socket對應的文件句柄
redisDb *db; // redis db的列表,一般實際生產環(huán)境只用一個
// 3-lines
list *clients; // 服務端的列表
// 2 lines
aeEventLoop *el; // 事件循環(huán)
// 36 lines
};
2. redisClient
redisClient是客戶端在服務端存儲的狀態(tài)信息,每當一個客戶端與服務端鏈接時,都會新創(chuàng)建redisClient結構體到redisServer->clients列表中。
typedef struct redisClient {
int fd; // 客戶端發(fā)送命令和接收結果的socket文件句柄
redisDb *db; // 對應的db
// 1-line
sds querybuf; // 查詢命令存儲的緩沖區(qū)
robj **argv; // 查詢命令轉成的命令參數
int argc; // 參數個數
// 1-lines
list *reply; // 命令執(zhí)行完的回復的結果,是個列表
int sentlen; // 結果已經發(fā)送的長度
// 8-lines
} redisClient;
我們也用簡單的示意圖展示了redisClient的結構,它包含命令傳輸所使用的querybuf,命令在經過處理后會存放到argv中;然后比較重要的是*reply表示服務端給到客戶端的回復的數據,這是個列表會在客戶端寫就緒的時候一個一個寫回客戶端,sentlen則是標識了傳輸的長度;然后就是對應的db與socket句柄fd。
3. redisDb
redisDb是redis的鍵值對存儲的位置,主要包含兩大塊,一塊存儲數據,另一塊存儲過期信信息,dict結構實際上是兩個哈希表,至于為什么有兩個,這里是為了做漸進式rehash使用(后面會詳細介紹),rehashidx用于表示rehash進度,iterators迭代器是表示遍歷集合操作個數,表里面的元素就是entry,這里面包含key和value以及指向下一個元素的指針。
typedef struct redisDb {
dict *dict; // 字典1 存儲數據
dict *expires; // 字典2存儲過期數據
int id; // db的id
} redisDb;
typedef struct dict {
dictType *type; // 類型,主要定義相關的函數
void *privdata;
dictht ht[2]; // 兩個hash table,用于做漸進式rehash使用
int rehashidx; /* rehash進度 rehashidx == -1表示不是正在rehash*/
int iterators; /* number of iterators currently running */
} dict;
typedef struct dictht {
dictEntry **table; // 存儲數據的表
unsigned long size; // 大小
unsigned long sizemask; // size-1,計算index的使用[1]
unsigned long used; // 已經使用的長度
} dictht;
typedef struct dictEntry {
void *key; // 鍵,在redis中一般是指向一個SDS類型的數據
union {
void *val; // 值,在redis中一般指向redisObject
uint64_t u64; // 特定情況下優(yōu)化整數存儲,比如過期
int64_t s64; // 特定情況下優(yōu)化整數存儲
} v;
struct dictEntry *next; // 下一個entry
} dictEntry;
4. redisObject
redisObject是redis存儲對象基本的表現形式,它可以存儲類似SDS list set等數據結構,并且存儲了一些信息用于內存管理,比如refcount這是一個整數字段,用于存儲對象的引用計數。每當有一個新的指針指向這個對象時,引用計數會增加;當指針不再指向這個對象時,引用計數會減少。當引用計數降到 0 時,表示沒有任何地方再使用這個對象,對象的內存可以被回收。lru在儲對象的 LRU(最近最少使用)時間,這個時間戳是相對于服務器的 lruclock 的,用于實現緩存淘汰策略。當 Redis 需要釋放內存時,它會根據這個時間戳來判斷哪些對象是最近最少被使用的,從而決定淘汰哪些對象。
typedef struct redisObject {
void *ptr; // 指向具體數據的指針
int refcount; // 引用計數
unsigned type:4; // 類型
unsigned notused:2; // 未使用,可能是為了擴展/占位
unsigned encoding:4; // 編碼方式
unsigned lru:22; // 最近最少使用
} robj;
5. aeEventLoop
aeEventloop是redis事件模型基礎數據,它主要包含文件事件和時間事件的兩個鏈表。對于文件事件來說,包含文件句柄fd,事件類型mask,對應處理函數fileProc;對于時間事件來說包含id、執(zhí)行時間(when_sec、when_ms)和對應執(zhí)行函數timeProc 對應的源代碼如下:
/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); // 文件事件回調函數
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); // 時間事件回調函數
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); // 事件結束時執(zhí)行的函數
/* File event structure */
typedef struct aeFileEvent {
int fd; // 事件對應的文件句柄ID
int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */
aeFileProc *fileProc; // 文件事件回調函數
aeEventFinalizerProc *finalizerProc; // 事件結束時執(zhí)行的函數
void *clientData; // 對應客戶端的擴展數據
struct aeFileEvent *next; // 下一個文件事件(鏈表存儲)
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc; // 事件事件回調函數
aeEventFinalizerProc *finalizerProc; // 事件結束時執(zhí)行的函數
void *clientData; // 事件結束時執(zhí)行的函數
struct aeTimeEvent *next;// 下一個事件(鏈表存儲)
} aeTimeEvent;
/* State of an event based program */
typedef struct aeEventLoop {
long long timeEventNextId;
aeFileEvent *fileEventHead;
aeTimeEvent *timeEventHead;
int stop;
} aeEventLoop;
6. 小結
現在我們將redis核心概念的結構總結如下:
三、核心流程
了解完基本概念后,我們就看看基本流程了,首先我們從redis的main函數看起,看看啟動的流程和命令執(zhí)行的基本流程。
1. redis啟動流程
(1) 主入口main()
在看一個軟件的源碼,一般從main函數看起,redis啟動的main函數位于redis.c中,可以看起啟動時,首先初始化了配置initServerConfig(),然后初始化了server端服務initServer(),接下來注冊處理函數為acceptHandler的文件事件,然后啟動了redis的主循環(huán)開始處理事件了。
int main(int argc, char **argv) {
initServerConfig(); // 初始化配置
// 8-lines 從文件中讀取配置
initServer(); // 初始化服務
if (server.daemonize) daemonize(); // TODO
redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
// 5-lines,內存檢查,加載rdb
if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); // 創(chuàng)建一個讀的文件事件,處理者是acceptHandler
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
aeMain(server.el); // 啟動redis主循環(huán)
aeDeleteEventLoop(server.el); // 主循環(huán)退出清理資源
return 0;
}
然后我們看看initServer做了什么初始化,鑒于本文是闡述基本原理,因此注釋掉了非主鏈路上的代碼,可以看到它初始化了客戶端列表、事件循環(huán)、db、創(chuàng)建了時間事件,將這幾個核心的組件初始化了
static void initServer() {
// 5-lines ...
server.clients = listCreate(); // 初始化客戶端列表
// 4-lines
server.el = aeCreateEventLoop(); // 初始化事件循環(huán)
server.db = zmalloc(sizeof(redisDb)*server.dbnum); // 為db分配內存
// 3-lines ...
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); // 初始化服務端的sever socket
// 4-lines ...
for (j = 0; j < server.dbnum; j++) { // 初始化數據存儲
server.db[j].dict = dictCreate(&hashDictType,NULL);
server.db[j].expires = dictCreate(&setDictType,NULL);
server.db[j].id = j;
}
// 8-lines ...
aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);// 初始化時間事件
}
(2) 主循環(huán)aeEventProcess執(zhí)行過程
從上一節(jié)我們知道redis在main函數中調用aeMain函數 aeMain函數則不停的循環(huán)調用aeEventProcess處理事件,redis是事件驅動的程序,他主要包含文件事件和時間事件,在aeProcessEvents中處理處理這些事件。
void aeMain(aeEventLoop *eventLoop)
{
eventLoop->stop = 0;
while (!eventLoop->stop)
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
①將這些文件事件裝到不同的集合(可讀、可寫、異常)中
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 9-line 變量初始化和前置判斷
FD_ZERO(&rfds); // 清空fd集合
FD_ZERO(&wfds); // 清空fd集合
FD_ZERO(&efds); // 清空fd集合
// 檢查文件事件,將它們分別放到對應的集合中
if (flags & AE_FILE_EVENTS) {
while (fe != NULL) {
if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds);
if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds);
if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds);
if (maxfd < fe->fd) maxfd = fe->fd;
numfd++;
fe = fe->next;
}
}
// ...
}
②計算超時時間 在調用select()函數的時候,在監(jiān)聽的fd沒有就緒時,會阻塞??;這里還需要處理時間事件,因此我們需要給select()設置一個超時時間,以防阻塞的時候錯過了執(zhí)行時間事件。超時時間計算通過找到最近的一條時間事件的執(zhí)行時間計算的到
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 42-lines ...接上文
if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int retval;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop); // 遍歷timeEvent的鏈表,拿到最近的執(zhí)行時間事件
if (shortest) {
long now_sec, now_ms;
// 計算時間差
aeGetTime(&now_sec, &now_ms); // 拿到當前時間
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;// 計算秒差
if (shortest->when_ms < now_ms) { // 比較毫秒位
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; // 計算微秒差
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
} else { //在某些情況下,事件循環(huán)需要立即返回,而不是等待事件的發(fā)生。這通常發(fā)生在非阻塞模式下,即使沒有事件發(fā)生,事件循環(huán)也不應該阻塞等待。AE_DONT_WAIT 是一個標志,用于指示事件循環(huán)在這種非阻塞模式下運行。所以此處就進盡快返回ASAP
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
// 設置為NULL,永久阻塞等待就緒
tvp = NULL; /* wait forever */
}
}
}
③執(zhí)行文件事件 拿到超時時間后就開始執(zhí)行事件了,首先調用select(),傳入事件集合(&rfds, &wfds, &efds),拿到就緒文件事件的個數,然后開始挨個檢查就緒的文件事件執(zhí)行,值的注意的是在redis1.0中調用的是select()系統(tǒng)調用,在后續(xù)的redis版本中調用的是epoll()相關函數。
retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);
if (retval > 0) {
fe = eventLoop->fileEventHead;
while(fe != NULL) {
int fd = (int) fe->fd;
// 檢查fd是不是在集合里面
if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||
(fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||
(fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)))
{
// 求mask
int mask = 0;
if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))
mask |= AE_WRITABLE;
if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))
mask |= AE_EXCEPTION; fe->fileProc(eventLoop, fe->fd, fe->clientData, mask); // 執(zhí)行文件事件中的回調函數
processed++; // 處理完成+1
fe = eventLoop->fileEventHead; // 在事件循環(huán)中處理完一個事件后,文件事件列表可能會發(fā)生變化。這種變化可能是因為在處理事件的過程中,某些操作(如關閉文件描述符、修改事件訂閱等)導致了文件事件列表的更新。
FD_CLR(fd, &rfds); // 清除執(zhí)行完畢的fd
FD_CLR(fd, &wfds); // 清除執(zhí)行完畢的fd
FD_CLR(fd, &efds); // 清除執(zhí)行完畢的fd
} else {
fe = fe->next; // 處理下一個
}
}
}
④執(zhí)行時間事件 時間事件的執(zhí)行就相對簡單一些,主要邏輯就是比較事件執(zhí)行時間是否比當前時間大了,到達執(zhí)行時間便執(zhí)行;另外一個點是看這個事件是一次性的還是周期性,一次性的事件要刪掉;另外下一次執(zhí)行的時間點是回調函數返回的,然后寫到事件的結構體中
if (flags & AE_TIME_EVENTS) { // 需要處理時間事件
te = eventLoop->timeEventHead; // 取到第一個事件
maxId = eventLoop->timeEventNextId-1; // 記錄最大的ID
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {// 如果ID>maxID則認為這個事件是新加的不在此次循環(huán)處理,否則可能出現無限循環(huán)的情況
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms); //拿到當前時間
if (now_sec > te->when_sec || //比較秒
(now_sec == te->when_sec && now_ms >= te->when_ms)) // 比較毫秒
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData); //執(zhí)行時間事件的回調函數
if (retval != AE_NOMORE) {// 如果是不是一次性的
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); //添加下一次時間事件
} else {
aeDeleteTimeEvent(eventLoop, id); //如果是一次性的刪除這個時間
}
te = eventLoop->timeEventHead; // 執(zhí)行完一次時間事件后,列表可能發(fā)生變化,下一次需要從頭開始處理
} else {
te = te->next;
}
}
(3) 小結
整個過程如圖,簡單來說:在redis啟動時,在初始化配置和server的數據后,便啟動了主循環(huán)aeMain,主循環(huán)的任務就是等待事件就緒和處理事件。對于文件事件,redis使用了 IO多路復用技術,通過系統(tǒng)調用select(),檢查就緒的文件事件,就緒后則會遍歷aeEventLoop進行事件處理;對于時間事件,則是與系統(tǒng)當前時間比較,就緒的執(zhí)行。
2. 命令執(zhí)行的完整流程
了解完redis整體事件驅動的運行架構后,我們看下redis的一條命令執(zhí)行的過程中經過了哪些過程 簡單來說有四個過程:redis啟動、客戶端前來連接、客戶端發(fā)送命令到服務端、服務端回復結果給客戶端。 下面讓我們詳細看看:
(1) 過程1(redis啟動)
上一章中,redis在啟動的時候會通過anetTcpSever創(chuàng)建一個socket server,再調用aeCreateFileEvent注冊一個readable事件,回調函數為acceptHander,對應文件的句柄就是server的fd。
int main(int argc, char **argv) {
initServerConfig(); // 初始化配置
// 8-lines 從文件中讀取配置
initServer(); // 初始化服務
if (server.daemonize) daemonize(); // TODO
redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
// 5-lines,內存檢查,加載rdb
if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); // 創(chuàng)建一個讀的文件事件,處理者是acceptHandler
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
aeMain(server.el); // 啟動redis主循環(huán)
aeDeleteEventLoop(server.el); // 循環(huán)推出清理資源
return 0;
}
(2) 過程2(客戶端與服務端建立鏈接)
第一個事件中處理函數是acceptHander,顧名思義就是接收客戶端了鏈接并進行進一步處理,首先執(zhí)行了anetAccept()函數,拿到了客戶端和服務端交互的文件句柄fd,接下來執(zhí)行createClient()函數,創(chuàng)建客戶端實例
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// 6-lines ...
cfd = anetAccept(server.neterr, fd, cip, &cport); // 核心是執(zhí)行了accept,建立了連接拿到了與client交互的cfd
if (cfd == AE_ERR) {
redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
if ((c = createClient(cfd)) == NULL) { // 初始化客戶端
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
close(cfd); /* May be already closed, just ingore errors */
return;
}
// 如果當前客戶端數量超過了 `maxclients` 的設置,服務器會接受新的連接,發(fā)送錯誤消息,然后關閉連接。
if (server.maxclients && listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
(void) write(c->fd,err,strlen(err));
freeClient(c);
return;
}
server.stat_numconnections++;
}
在createClient()中,初始化了redisClient的一些參數,最重要的是注冊了一個文件事件,對應的執(zhí)行函數是readQueryFromClient
static redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(*c));
// 15-lines...
listSetFreeMethod(c->reply,decrRefCount); //設置了鏈表 c->reply 的釋放方法為 decrRefCount 函數
listSetDupMethod(c->reply,dupClientReplyValue);// 設置了鏈表 c->reply 的復制方法為 dupClientReplyValue 函數
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c, NULL) == AE_ERR) { // 創(chuàng)建了一個文件事件,執(zhí)行函數為readQueryFromClient
freeClient(c);
return NULL;
}
if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
return c;
}
(3) 過程3 (客戶端發(fā)送命令給服務端)
接上文 當客戶端發(fā)送命令到服務端時,數據到達服務端經過網卡、協議棧等一系列操作后,達到可讀狀態(tài)后,就會執(zhí)行readQueryFromClient(),處理客戶端傳過來的命令,首先會執(zhí)行read()方法從緩沖區(qū)中讀取一塊數據,將其追加到c->querybuf后面,根據redis協議進行querybuf的解析,并將其轉換成sds的redisObject,存儲到argv中,然后執(zhí)行processCommand()處理命令,注意這里只是展示主流程的代碼和說明,這里為了保證客戶端輸入能在各種情況下都work做了比較多的校驗和錯誤處理;另外redis客戶端和服務端交互的協議有兩種一種是inline的、另外一種是bulk的,在querybuf轉換成argv時,根據協議不同(bulklen==-1),走的也是不同的解析邏輯。
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
char buf[REDIS_IOBUF_LEN];
int nread;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
nread = read(fd, buf, REDIS_IOBUF_LEN);
// 14-lines ... 讀取buf檢驗處理
if (nread) {
c->querybuf = sdscatlen(c->querybuf, buf, nread); // 將buf追加到querybuf后面
c->lastinteraction = time(NULL); // 更新最后一次交互時間
} else {
return;
}
again:
if (c->bulklen == -1) { // inline協議
/* Read the first line of the query */
char *p = strchr(c->querybuf,'\n');
size_t querylen;
if (p) {
// 28-lines... 處理讀取的buf
// 這里將輸入的參數存到argv中,argc++
for (j = 0; j < argc; j++) {
if (sdslen(argv[j])) {
c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
c->argc++;
} else {
sdsfree(argv[j]);
}
}
zfree(argv);
// 執(zhí)行processCommand處理命令
if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
return;
} else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
redisLog(REDIS_DEBUG, "Client protocol error");
freeClient(c);
return;
}
}
// 15 lines ... bulk命令處理
}
接下來我們繼續(xù)看看重頭戲processCommand的處理過程,首先執(zhí)行l(wèi)ookupCommand,從cmdTable中遍歷找到符合要求的命令,然后進行一些認證和數據合法性校驗后,執(zhí)行cmd的proc函數執(zhí)行命令,執(zhí)行完畢后,清理命令執(zhí)行的過程數據。
static int processCommand(redisClient *c) {
// 11 lines...
cmd = lookupCommand(c->argv[0]->ptr); // 從表里面查找命令
// 46 lines ... cmd、argv、argc、內存、認證等校驗
dirty = server.dirty;
cmd->proc(c); // 執(zhí)行命令
// 4 lines.. 變更通知給連接的從服務器(slaves)和監(jiān)控客戶端(monitors)
if (c->flags & REDIS_CLOSE) {
freeClient(c);
return 0;
}
resetClient(c); // 清理client命令相關字段
return 1;
}
static struct redisCommand cmdTable[] = {
{"get",getCommand,2,REDIS_CMD_INLINE},
{"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"del",delCommand,-2,REDIS_CMD_INLINE},
{"exists",existsCommand,2,REDIS_CMD_INLINE},
{"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}
}
讓我們以get命令為例看看 getCommand()做了什么事,首先從DB里面去查找這個key,然后調用addReply,將結果回復加到回復隊列中去,可以看到它回復了協議頭、數據、協議尾三段數據。
static void getCommand(redisClient *c) {
robj *o = lookupKeyRead(c->db,c->argv[1]);
if (o == NULL) {
addReply(c,shared.nullbulk);
} else {
if (o->type != REDIS_STRING) {
addReply(c,shared.wrongtypeerr);
} else {
addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
addReply(c,o);
addReply(c,shared.crlf);
}
}
}
讓我們看看lookupKeyRead 做了什么,最終執(zhí)行的是dict的方法dictFind,這個函數首先根據key算出在table中的位置,然后開始遍歷entry鏈表,通過dictCompareHashKeys方法比較key是不是相等最終找到這個key取出返回。
static robj *lookupKeyRead(redisDb *db, robj *key) {
expireIfNeeded(db,key); // 檢查過期
return lookupKey(db,key);
} static robj *lookupKey(redisDb *db, robj *key) {
dictEntry *de = dictFind(db->dict,key); //找到key
return de ? dictGetEntryVal(de) : NULL; //找到key對應的value
}
dictEntry *dictFind(dict *ht, const void *key)
{
dictEntry *he;
unsigned int h;
if (ht->size == 0) return NULL;
h = dictHashKey(ht, key) & ht->sizemask; // 計算在哈希表的位置
he = ht->table[h];
while(he) {
if (dictCompareHashKeys(ht, key, he->key)) // 比較entry和key是否相等
return he;
he = he->next;
}
return NULL;
}
具體是怎么回復結果的呢,addReply函數通過調用aeCreateFileEvent 創(chuàng)建了寫入類型的文件事件,然后就是尾插法將要回復的obj添加到c->reply的尾部,等待fd寫就緒時執(zhí)行事件
static void addReply(redisClient *c, robj *obj) {
if (listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c, NULL) == AE_ERR) return; //創(chuàng)建了文件事件
if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
incrRefCount(obj
(4) 過程4 (寫就緒將結果寫回客戶端)
當 socket 的發(fā)送緩沖區(qū)有足夠空間,并且網絡狀態(tài)允許數據發(fā)送時,socket 變?yōu)閷懢途w狀態(tài)時,這時候就會aeEventLoop->fileEvents中取出就緒的reply事件,執(zhí)行sendReplyToClient()函數,這個函數會遍歷c->reply列表,按照順序一個一個通過調用write()方法寫回給客戶端,值的注意的是,Redis 限制了單次事件循環(huán)中可以寫入的最大字節(jié)數(REDIS_MAX_WRITE_PER_EVENT),防止一個客戶端占用所有的服務器資源,特別是當該客戶端連接速度非??欤ɡ缤ㄟ^本地回環(huán)接口 loopback interface)并且發(fā)送了一個大請求(如 KEYS * 命令),如果c->reply全寫完了,就干掉這個寫入事件
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
robj *o;
// 4 lines ...
while(listLength(c->reply)) {
o = listNodeValue(listFirst(c->reply)); // 取出第一個
objlen = sdslen(o->ptr); // 拿到長度
if (objlen == 0) { // 長度為零刪掉
listDelNode(c->reply,listFirst(c->reply));
continue;
}
if (c->flags & REDIS_MASTER) {
/* Don't reply to a master */
nwritten = objlen - c->sentlen;
} else {
// 從上一次發(fā)送的最后的位置(c—>sentlen),發(fā)送剩余長度的數據(objlen - c->sentlen)
nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
}
c->sentlen += nwritten; // 更新已經發(fā)送的長度
totwritten += nwritten; // 更新本次事件已經發(fā)送的長度
// 如果已經發(fā)送的長度==要發(fā)送的對象長度,這個對象就發(fā)送完了,刪掉
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
}
// 對單個客戶端單個事件發(fā)送的長度進行限制,因為redis時單線程,防止一個客戶端有
// 大量返回數據時,會阻塞主循環(huán)處理,導致無法服務其他客戶端
if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
}
// 9-lines...
if (totwritten > 0) c->lastinteraction = time(NULL); // 更新最后交互時間
/*
* 當Redis在事件循環(huán)中處理客戶端連接的數據發(fā)送時,它會逐個取出回復列表中的數據進行發(fā)送。
* 每發(fā)送完一個數據,就從列表中刪除該數據。
* 因此,如果回復列表的長度為0,說明所有的回復數據都已經發(fā)送完畢。
*/
if (listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
}
}
四、其他關鍵功能實現
1. 過期的實現
如前文所說,redis單獨有個dict記錄key的過期信息:
typedef struct redisDb {
dict *dict; // 字典1 存儲數據
dict *expires; // 字典2存儲過期數據
int id; // db的id
} redisDb;
redis的key過期了不會立即刪除,截止redis2.6的源碼,有兩種刪除方式,一種是在執(zhí)行key相關的命令執(zhí)行之前調用expireIfNeeded(),檢查key是否過期了。
static robj *lookupKeyRead(redisDb *db, robj *key) {
expireIfNeeded(db,key); // 檢查過期
return lookupKey(db,key); // 查找key
}
static int expireIfNeeded(redisDb *db, robj *key) {
time_t when;
dictEntry *de;
/* No expire? return ASAP(as soon as possbile) */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,key)) == NULL) return 0;
/* Lookup the expire */
when = (time_t) dictGetEntryVal(de);
if (time(NULL) <= when) return 0;
/* Delete the key */
dictDelete(db->expires,key);
return dictDelete(db->dict,key) == DICT_OK;
}
另外一種是在啟動的時候注冊了serverCron時間事件,severcorn會定期調用activeExpireCycle()方法,這個方法核心邏輯調用dictGetRandomKey獲取一些隨機的key,然后檢查下key是否過期了,如果過期了執(zhí)行key刪除和資源釋放操作,值的一提的是activeExpireCycle使用了一種自適應算法來嘗試過期(expire)一些超時的鍵。這個算法的目的是平衡 CPU 使用和內存占用,感興趣的同學自己翻閱代碼了解下。
// main()
// ->initServer();
// ->aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
// ->serverCron()
// ->activeExpireCycle()
void activeExpireCycle(void) {
// 21 lines......
while (num--) {
dictEntry *de;
long long t;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
t = dictGetSignedIntegerVal(de);
if (now > t) {
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj);
dbDelete(db,keyobj);
decrRefCount(keyobj);
expired++;
server.stat_expiredkeys++;
}
}
// 7 lines .......
}
2. 漸進式rehash
(1) 大哈希表rehash問題
經過上文我們已經知道dict實際上是個兩個拉鏈的哈希表,在不斷的添加key的過程中,hash表的沖突會增多,導致拉鏈會越來越長,極端情況下,哈希表的查找速度會退化到O(n),這時候就需要進行擴容處理了,擴容時會涉及大量的key計算新的hash值轉移到新表,如果key的數量很多,這將是一個成本很高的操作。在早期的redis版本中(redis 1.0時)還是直接進行rehash操作。
/* Expand or create the hashtable */
int dictExpand(dict *ht, unsigned long size)
{
// 14 lines ...
// 復制所有舊表的元素到新表
n.used = ht->used;
for (i = 0; i < ht->size && ht->used > 0; i++) {
dictEntry *he, *nextHe;
if (ht->table[i] == NULL) continue;
/* For each hash entry on this slot... */
he = ht->table[i];
while(he) {
unsigned int h;
nextHe = he->next;
/* Get the new element index */
h = dictHashKey(ht, he->key) & n.sizemask;
he->next = n.table[h];
n.table[h] = he;
ht->used--;
/* Pass to the next element */
he = nextHe;
}
}
assert(ht->used == 0);
_dictFree(ht->table);
/* Remap the new hashtable in the old */
*ht = n;
return DICT_OK;
}
(2) 漸進式rehash出現
顯然,當哈希表巨大無比的時候,這樣重的操作對于單線程的redis是不可以接受的,于是redis在2.x引入了漸進式rehash的方式,漸進式rehash將大而重的rehash操作分解為一個一個小的操作,將消耗均攤到每一個add請求中,我們從set key value 看起,看看redis rehash具體是怎么做的。
(3) 漸進式rehash-單步rehash
首先 setGenericCommand(),最終會調用dictAddRawd()函數,dictAddRawd()會在dict中新增一個key,然后調用dictSetVal()函數將值寫進去。
// setGenericCommand()
// setKey(c->db,key,val);
// dbAdd(db,key,val);
// dictAdd(db->dict, copy, val);
int dictAdd(dict *d, void *key, void *val)
{
dictEntry *entry = dictAddRaw(d,key); // 調用dictAddRaw 增加一個可以
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val); // 為這個可以設置value
return DICT_OK;
}
接下看看dictAddRaw(),這個函數比較清晰,首先先看看是不是正在處于rehash狀態(tài)(通過判斷rehashIdx == -1),如果是則進行一步rehash,然后調用 _dictKeyIndex() 拿到這個新元素在表里面的index,接下來就是為新的entry分配內存,將節(jié)點插在頭部,最后設置entry的key字段。
#define dictIsRehashing(ht) ((ht)->rehashidx != -1) // 判斷是否在rehash狀態(tài)
dictEntry *dictAddRaw(dict *d, void *key)
{
int index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d); // 是否正在rehash,如果是進行一步
if ((index = _dictKeyIndex(d, key)) == -1) // 找到新元素的index, -1表示元素已經存在
return NULL;
/* 分配內存 存儲新的entry */
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* 設置entry的key */
dictSetKey(d, entry, key);
return entry;
}
我們重點看看_dictRehashStep做了什么,首先判斷了迭代器是不是為0,判斷是不是迭代器正在遍歷字典,如果有則不rehash,如果沒有則開始執(zhí)行一步rehash,首先校驗下rehash是不是正在進行,如果不是則退出;接下來通過判斷d->ht[0].used == 0(ht[0]表的元素完全被轉移到ht[1]中了)看看是不是rehash完了,如果是的話,將ht[1]作為主表,清理原本的ht[0],并且將rehash標志位置為-1 (d->rehashidx = -1) 當然,如果沒有rehash完,則開始rehash操作,首先找到第一個不為空的槽位,然后把這個槽位以及后面的entry list一個一個轉移到ht[1]中。
static void _dictRehashStep(dict *d) { // 執(zhí)行單步rehash
if (d->iterators == 0) dictRehash(d,1); // 如果迭代器值為0,可以進行進行一步rehash
}
int dictRehash(dict *d, int n) {
if (!dictIsRehashing(d)) return 0; //如果沒有正在rehash,撤出
while(n--) {
dictEntry *de, *nextde;
/* 檢查是否全表都rehash完畢了 通過ht[0]的used判斷 */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++; // 找到第一個不為0的index
de = d->ht[0].table[d->rehashidx]; // 取出這個槽位的元素
/* 將這個槽位下面的entry都移動到新表ht[1] */
while(de) {
unsigned int h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask; //求出在新表的哈希值
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de; // 頭插法插到新表的頭部
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
return 1;
}
(4) rehash擴容時機
什么時候開始rehash呢,這就看_dictKeyIndex()了,它調用了_dictExpandIfNeeded()函數。
static int _dictKeyIndex(dict *d, const void *key)
{
unsigned int h, idx, table;
dictEntry *he;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
// 12-lines... Compute the key hash value
}
_dictExpandIfNeeded()函數首先看看是不是初始化擴容,如果表ht[0]的size是0,則進行初始化擴容;如果不是則計算負載因子是不是達到了擴容的閾值,然后調用dictExpand()
static int _dictExpandIfNeeded(dict *d)
{
/* 正在進行rehash返回ok */
if (dictIsRehashing(d)) return DICT_OK;
/* ht[0].size 為0 表示這是初始化擴容 */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
// used >size 達到擴容條件;如果配置可以擴容或者達到一個安全值(影響性能的閾值)觸發(fā)擴容
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) // 達到負載因子
{
return dictExpand(d, ((d->ht[0].size > d->ht[0].used) ?
d->ht[0].size : d->ht[0].used)*2); // 擴容為原來的兩倍
}
return DICT_OK;
}
(5) 漸進式rehash前置準備
dictExpand函數中,分配一個新的ht表,初始化參數,如果ht[0]是空的,說明是初始化擴容,直接將新創(chuàng)建的ht表給到ht[0],如果不是則將新創(chuàng)建的ht表給到ht[1],并且將rehashidx置為0,這時候就開始rehash了。
/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
(6) 小結
redis漸進式rehash的過程如下圖:
- 在有添加key的操作中,會調用dictAddRaw()函數,這里會根據rehashidx== -1看看是不是在rehash中,如果是則進行單步rehash;
- 另外再嘗試獲取一個entry的index時,redis會看看當前ht[0]表是不是該擴容了,如果是則分配ht[1]的資源并將rehashidx置為0,開始reshsh;
- 當ht[0].used變?yōu)?時,則認為rehash完成了,這時候將h[1]作為主表,釋放之前h[0]的資源
3. redis對象生命周期
(1) redisObject與refcount
上文中我們了解到,為了高效管理內存,避免在命令處理時產生的拷貝,redis提出了share everything的思想,采用了引用計數法管理內存,計數在redisObject->refcount字段,在新創(chuàng)建redisObject時設置refcount為1,調用incrRefCount(),引用計數會+1
static robj *
createObject(int type, void *ptr) {
robj *o;
// 8-lines...
o->type = type;
o->ptr = ptr;
o->refcount = 1; //設置refcount為1
return o;
}
static void incrRefCount(robj *o) {
o->refcount++;
//3 -lines
}
調用decrRefCount()時,會講refcount-1,當引用計數為0時,會按照類型進行內存釋放。
static void decrRefCount(void *obj) {
robj *o = obj;
// 4-lines ...
if (--(o->refcount) == 0) { // 先-- 引用計數減一;如果是0則進行內存釋放
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
case REDIS_SET: freeSetObject(o); break;
case REDIS_HASH: freeHashObject(o); break;
default: assert(0 != 0); break;
}
if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
!listAddNodeHead(server.objfreelist,o))
zfree(o);
}
}
(2) set命令key/value生命周期
讓我們以set命令為例,看看key和value聲明周期是怎么管理的 首先從上文知道 當調用set命令發(fā)送到服務端時會調用readQueryFromClient讀取并進行預處理時會調用createObject,將key和value轉成redisObject放到argv中,這時候key:1 value:1
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// ....
for (j = 0; j < argc; j++) {
if (sdslen(argv[j])) {
c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
c->argc++;
} else {
sdsfree(argv[j]);
}
}
// ....
}
然后調用processCommand()處理命令,processCommand()通過lookupCommand()拿到:
static int processCommand(redisClient *c) {
// 11 lines...
cmd = lookupCommand(c->argv[0]->ptr); // 從表里面查找命令
// 46 lines ... cmd、argv、argc、內存、認證等校驗
dirty = server.dirty;
cmd->proc(c); // 執(zhí)行命令
// 4 lines.. 變更通知給連接的從服務器(slaves)和監(jiān)控客戶端(monitors)
if (c->flags & REDIS_CLOSE) { // 清理client命令相關字段
freeClient(c);
return 0;
}
resetClient(c);
return 1;
}
最終執(zhí)行了setGenericCommand(),這里如果是正常處理的話,可以看到調用了incrRefCount將argv[1]和argv[2]的引用計數都增加了1,這時候key:2 value:2
static void setGenericCommand(redisClient *c, int nx) {
int retval;
retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
if (retval == DICT_ERR) {
if (!nx) {
dictReplace(c->db->dict,c->argv[1],c->argv[2]);
incrRefCount(c->argv[2]);
} else {
addReply(c,shared.czero);
return;
}
} else {
incrRefCount(c->argv[1]);
incrRefCount(c->argv[2]);
}
server.dirty++;
removeExpire(c->db,c->argv[1]);
addReply(c, nx ? shared.cone : shared.ok);
}
在processCommand()執(zhí)行結束的時候,會調用resetClient重制客戶端資源為下次命令做好準備,這時候會將argv的引用計數全部-1,這時候key:1 value:1,就僅僅是dict存儲的那一份了再引用了。
static int processCommand(redisClient *c) {
//
resetClient(c);
return 1;
}
static void resetClient(redisClient *c) {
freeClientArgv(c);
c->bulklen = -1;
}
static void freeClientArgv(redisClient *c) {
int j;
for (j = 0; j < c->argc; j++)
decrRefCount(c->argv[j]);
c->argc = 0;
}
小結:整個過程可以通過下圖看到:
- 命令處理時,createObject()得到keyObj valueObj,refcount都為1
- 執(zhí)行setCommand的時候,調用了incrRefCount()方法,兩者refCount都變?yōu)?
- 在processCommand調用結束的時候,執(zhí)行resetClient清理資源為下條做準備時,執(zhí)行了decrRefCount,兩者又都變?yōu)?,此時,key val的引用計數為1,即在dict中存在的一個引用
get命令我們不過多闡述,這里闡述下具體過程:
- 命令處理時,createObject()得到keyObj valueObj,key refcount為1
- 然后調用getCommand后,再調用dictFind,在找到后addreply的時候調用了incrRefCount(),value的refcount此時從1變?yōu)?
- 在命令執(zhí)行完畢的時候,會重置客戶端,執(zhí)行了decrRefCount,此時key的refcount變?yōu)?,被清除掉
- 在reply元素傳輸完畢刪除的時候調用listDelNode刪除元素,然后會調用list->free函數,free函數實際上是decrRefCount,這是value的refcount由2變?yōu)?。
五、總結與展望
本文闡述了三方面的基本內容:
- redis基本的概念,包含redisServer redisDb redisClient aeEventLoop等核心概念;
- 介紹了redis啟動和命令處理的基本流程;
- 介紹了重要的過程,過期、漸進式rehash和redisObject生命周期,當然redis 除了本文介紹了單機方面的一些知識外,還有分布式、集群、數據結構等很多值得學習的地方。