來(lái)聊聊 Redis 集群數(shù)據(jù)遷移
一、詳解redis cluster數(shù)據(jù)遷移過(guò)程
1. 節(jié)點(diǎn)基本結(jié)構(gòu)定義
redis集群提供16384個(gè)slot,我們可以按需分配給節(jié)點(diǎn)上,后續(xù)進(jìn)行鍵值對(duì)存儲(chǔ)時(shí),我們就可以按照算法將鍵值對(duì)存到對(duì)應(yīng)slot上的redis服務(wù)器上:
集群節(jié)點(diǎn)本質(zhì)就是通過(guò)slots這個(gè)數(shù)組記錄當(dāng)前節(jié)點(diǎn)的所管理的情況,這里我們可以看到slots是一個(gè)char 數(shù)組,長(zhǎng)度為REDIS_CLUSTER_SLOTS(16384)除8,這樣做的原因是因?yàn)?
- char占1個(gè)字節(jié),每個(gè)字節(jié)8位。
- 每個(gè)char可以記錄8個(gè)slot的情況,如果是自己的slot則對(duì)應(yīng)char的某一個(gè)位置記錄為1:
我們以node-1為例,因?yàn)樗?fù)責(zé)0-5460的節(jié)點(diǎn),所以它的slots0-5460都為1,對(duì)應(yīng)的圖解如下所示,可以看到筆者這里省略了后半部分,僅僅表示了0-15位置為1:
對(duì)此我們也給出這段redis中節(jié)點(diǎn)的定義,即位于cluster.h中的clusterNode這個(gè)結(jié)構(gòu)體中,可以看slots這段定義:
typedef struct clusterNode {
//......
//記錄集群負(fù)責(zé)的槽,總的為16384
unsigned char slots[REDIS_CLUSTER_SLOTS/8];
//......
}
2. 設(shè)置slot后續(xù)節(jié)點(diǎn)遷移
以本文示例為例,我們希望后續(xù)節(jié)點(diǎn)2的數(shù)據(jù)全部存到節(jié)點(diǎn)1中,那么我們首先需要鍵入如下兩條配置:
# 在節(jié)點(diǎn)1上執(zhí)行,將節(jié)點(diǎn)2數(shù)據(jù)導(dǎo)入到節(jié)點(diǎn)1上
CLUSTER SETSLOT 3 IMPORTING node2
# 在節(jié)點(diǎn)2上執(zhí)行,將自己的數(shù)據(jù)遷移到節(jié)點(diǎn)1
CLUSTER SETSLOT 3 MIGRATING node1
這兩條指最終都會(huì)被各自的服務(wù)端解析,并調(diào)用clusterCommand執(zhí)行,我們以節(jié)點(diǎn)1導(dǎo)入為例,假設(shè)我們執(zhí)行clusterCommand解析到setslot 關(guān)鍵字和importing關(guān)鍵字,即知曉要導(dǎo)入其他節(jié)點(diǎn)的數(shù)據(jù)。對(duì)應(yīng)的節(jié)點(diǎn)1就會(huì)通過(guò)importing_slots_from數(shù)組標(biāo)記自己將導(dǎo)入這個(gè)slot的數(shù)據(jù),而節(jié)點(diǎn)2也會(huì)通過(guò)migrating_slots_to數(shù)組標(biāo)記自己要將數(shù)據(jù)導(dǎo)出給其他節(jié)點(diǎn)的slot:
對(duì)此我們給出clusterCommand的執(zhí)行流程,可以看到該函數(shù)解析出migrating或者importing關(guān)鍵字時(shí)就會(huì)將對(duì)的migrating_slots_to或者importing_slots_from數(shù)組對(duì)應(yīng)slot位置的索引位置設(shè)置為當(dāng)前上述命令傳入的node id:
void clusterCommand(redisClient *c) {
//......
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {//處理遷出的邏輯
//看看自己是否有遷出的slot,沒(méi)有則報(bào)錯(cuò)
if (server.cluster->slots[slot] != myself) {
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
//查看自己是否知曉這個(gè)node id,如果沒(méi)有則報(bào)錯(cuò)
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
//標(biāo)記遷出到slot為傳入的node
server.cluster->migrating_slots_to[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {//處理遷入的邏輯
//查看遷入的slot是否已經(jīng)配置,如果有則報(bào)錯(cuò)
if (server.cluster->slots[slot] == myself) {
addReplyErrorFormat(c,
"I'm already the owner of hash slot %u",slot);
return;
}
//查看自己是否知曉要遷入數(shù)據(jù)的node的信息,如果不知道則報(bào)錯(cuò)
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[3]->ptr);
return;
}
//標(biāo)記遷入slot位置為傳入的nodeid
server.cluster->importing_slots_from[slot] = n;
} //......
}
3. 請(qǐng)求重定向問(wèn)題
后續(xù)的我們假設(shè)還是將set key value請(qǐng)求發(fā)送到節(jié)點(diǎn)2,因?yàn)樯鲜雒畹脑?,?jié)點(diǎn)會(huì)返回move/ask告知客戶端這個(gè)鍵值對(duì)現(xiàn)在要存到節(jié)點(diǎn)1上。對(duì)應(yīng)節(jié)點(diǎn)1收到這個(gè)key請(qǐng)求時(shí),通過(guò)key計(jì)算得slot正是自己,它就會(huì)將這個(gè)鍵值對(duì)存儲(chǔ)到自己的數(shù)據(jù)庫(kù)中:
這里我們以節(jié)點(diǎn)1的角度查看這個(gè)問(wèn)題,當(dāng)客戶端收到move指令后,繼續(xù)向節(jié)點(diǎn)1發(fā)送指令,節(jié)點(diǎn)1通過(guò)收到指令調(diào)用processCommand,其內(nèi)部調(diào)用getNodeByQuery獲取當(dāng)前key對(duì)應(yīng)的slot,發(fā)現(xiàn)是自己則直接存儲(chǔ)數(shù)據(jù)到當(dāng)前節(jié)點(diǎn)的內(nèi)存數(shù)據(jù)庫(kù)中:
int processCommand(redisClient *c) {
//......
//如果開(kāi)啟了集群模式,且發(fā)送者不是master且參數(shù)帶key則進(jìn)入邏輯
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_LUA_CLIENT &&
server.lua_caller->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
if (server.cluster->state != REDIS_CLUSTER_OK) {
//......
} else {
int error_code;
//查找鍵值對(duì)對(duì)應(yīng)的slot和這個(gè)slot負(fù)責(zé)的節(jié)點(diǎn)
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
//如果為空且或者非自己,則轉(zhuǎn)交出去給別人處理
if (n == NULL || n != server.cluster->myself) {
flagTransaction(c);
clusterRedirectClient(c,n,hashslot,error_code);
return REDIS_OK;
}
}
}
//......
//將鍵值對(duì)存儲(chǔ)到當(dāng)前數(shù)據(jù)庫(kù)中
}
我們以節(jié)點(diǎn)的視角再次直接步入getNodeByQuery查看這段邏輯,可以看到其內(nèi)部會(huì)基于key計(jì)算slot然后將得到對(duì)應(yīng)的node,然后進(jìn)行如下判斷:
- 如果本次客戶端請(qǐng)求是一個(gè)批量的請(qǐng)求,且第一個(gè)key定位不到響應(yīng)的slot,直接返回錯(cuò)誤。
- 如果key的slot屬于當(dāng)前節(jié)點(diǎn),且當(dāng)前節(jié)點(diǎn)正在遷出并且當(dāng)前節(jié)點(diǎn)查不到這個(gè)key,則響應(yīng)一個(gè)ask標(biāo)識(shí)告知客戶端到遷出的節(jié)點(diǎn)詢問(wèn)一下是否有數(shù)據(jù)。
- 如果是key屬于當(dāng)前節(jié)點(diǎn)且正在進(jìn)行導(dǎo)入,且key定位不到則響應(yīng)異常,反之說(shuō)明當(dāng)前節(jié)點(diǎn)導(dǎo)入成功,直接返回當(dāng)前節(jié)點(diǎn)信息。
- 如果定位到的slot屬于別的節(jié)點(diǎn),則響應(yīng)一個(gè)move告知客戶端到別的節(jié)點(diǎn)獲取鍵值對(duì)。
對(duì)應(yīng)的我們給出這段代碼函數(shù)getNodeByQuery,對(duì)應(yīng)的邏輯和筆者上述給出的核心分支一致:
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
//.......
//如果是exec命令則用mstate封裝這些命令
if (cmd->proc == execCommand) {
/* If REDIS_MULTI flag is not set EXEC is just going to return an
* error. */
if (!(c->flags & REDIS_MULTI)) return myself;
ms = &c->mstate;
} else {
//為了一個(gè)原子指向,創(chuàng)建一個(gè)假的multi記錄這些指令
ms = &_ms;
_ms.commands = &mc;
//命令個(gè)數(shù)1
_ms.count = 1;
//命令參數(shù)
mc.argv = argv;
//命令參數(shù)個(gè)數(shù)
mc.argc = argc;
//對(duì)應(yīng)的命令
mc.cmd = cmd;
}
//遍歷命令
for (i = 0; i < ms->count; i++) {
//.......
//解析出key以及個(gè)數(shù)
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
for (j = 0; j < numkeys; j++) {
//拿到key
robj *thiskey = margv[keyindex[j]];
//計(jì)算slot
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
firstkey = thiskey;
slot = thisslot;
//拿著slot找到對(duì)應(yīng)的集群節(jié)點(diǎn)
n = server.cluster->slots[slot];
//如果當(dāng)前查詢的key是第一個(gè)key且找不到,則將錯(cuò)誤碼設(shè)置為REDIS_CLUSTER_REDIR_DOWN_UNBOUND并返回空
if (n == NULL) {
getKeysFreeResult(keyindex);
if (error_code)
*error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}
//如果就是當(dāng)前節(jié)點(diǎn)正在做遷出或者遷入,則migrating_slot/importing_slot設(shè)置為1
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {
//.......
}
//如果正在做遷出或者嵌入找不到當(dāng)前db找不到key的位置,則missing_keys++
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
{
missing_keys++;
}
}
getKeysFreeResult(keyindex);
}
//所有key都沒(méi)有對(duì)應(yīng)slot節(jié)點(diǎn),直接返回當(dāng)前節(jié)點(diǎn)
if (n == NULL) return myself;
//.......
//正在遷出且key找不到位置,錯(cuò)誤碼設(shè)置為ask并返回遷出的目標(biāo)節(jié)點(diǎn),讓客戶端到別的節(jié)點(diǎn)嘗試看看
if (migrating_slot && missing_keys) {
if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}
//如果是節(jié)點(diǎn)正在導(dǎo)入且key找不到則返回,標(biāo)識(shí)當(dāng)前集群不穩(wěn)定
if (importing_slot &&
(c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
{
if (multiple_keys && missing_keys) {
if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
//反之說(shuō)明導(dǎo)入成功則告知自己可以找到這個(gè)鍵值對(duì)
return myself;
}
}
//......
//返回其他節(jié)點(diǎn),error_code設(shè)置為move
if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
return n;
}
4. 完成節(jié)點(diǎn)遷移
上述操作僅僅針對(duì)新節(jié)點(diǎn)的告知要處理的新的slot,對(duì)于舊的節(jié)點(diǎn)的舊有slot數(shù)據(jù)我們就需要通過(guò)節(jié)點(diǎn)2鍵入CLUSTER GETKEYSINSLOT slot count要遷移的舊的key的slot,然后通過(guò)MIGRATE host port key dbid timeout [COPY | REPLACE]將數(shù)據(jù)遷移到節(jié)點(diǎn)1上。 這里我們補(bǔ)充一下MIGRATE 中copy和replace的區(qū)別,前者是遇到重復(fù)直接報(bào)錯(cuò),后者是遷移時(shí)直接覆蓋。 最終這條指令回基于要遷移的key而生成一條RESTORE-ASKING key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]指令發(fā)送給導(dǎo)入的節(jié)點(diǎn),以本文例子來(lái)說(shuō)就是節(jié)點(diǎn)1:
這里我們給出MIGRATE 指令對(duì)應(yīng)的處理函數(shù)migrateCommand,邏輯和我上文說(shuō)的差不多,基于指令解析出replace或者copy等信息,然后用argv[3]即我們的key得出這個(gè)鍵值對(duì)的信息生成RESTORE指令將鍵值對(duì)轉(zhuǎn)存給節(jié)點(diǎn)1:
/* 命令 MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
//......
//解析拷貝和替代選項(xiàng),前者重復(fù)會(huì)報(bào)錯(cuò)
for (j = 6; j < c->argc; j++) {
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
//......
//查看要遷移的key是否存在嗎,如果不存則直接報(bào)錯(cuò)返回
if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
/* Connect */
//建立socket連接
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
//......
//cmd初始化一個(gè)buf緩沖區(qū)
rioInitWithBuffer(&cmd,sdsempty());
/* Send the SELECT command if the current DB is not already selected. */
//如果尚未選擇當(dāng)前DB,則發(fā)送SELECT命令。
int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
if (select) {
redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}
/* Create RESTORE payload and generate the protocol to call the command. */
//獲取key的過(guò)期時(shí)效
expireat = getExpire(c->db,c->argv[3]);
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
//集群用RESTORE-ASKING發(fā)送key給目標(biāo)
if (server.cluster_enabled)
redisAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
//填充key和value ttl等
redisAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3]));
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,
sdslen(c->argv[3]->ptr)));
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
//......
//遷移指令字符串寫(xiě)入緩沖區(qū)
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
//......
//如果是replace發(fā)出 REPLACE
if (replace)
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
//......
}
5. 最后調(diào)整
最后我們只需在節(jié)點(diǎn)1和2都執(zhí)行CLUSTER SETSLOT <SLOT> NODE <NODE ID> 完成slot指派,這指令最終就會(huì)走到clusterCommand中,節(jié)點(diǎn)1和節(jié)點(diǎn)2各自的處理邏輯為:
- 節(jié)點(diǎn)2看看遷移的key的數(shù)量未0且migrating_slots_to數(shù)據(jù)不為空,若符合要求,則說(shuō)明本次遷移完成但狀態(tài)未修改,直接將migrating_slots_to置空完成指派最后調(diào)整。
- 節(jié)點(diǎn)1查看節(jié)點(diǎn)id是否是自己,且importing_slots_from是否有數(shù)據(jù),若有則說(shuō)明節(jié)點(diǎn)導(dǎo)入完成,直接將importing_slots_from置空。
void clusterCommand(redisClient *c) {
//......
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {//處理setslot指令
//......
else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> 標(biāo)記最終遷移的節(jié)點(diǎn) */
clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
//......
//如果發(fā)現(xiàn)對(duì)應(yīng)的key為0,且migrating_slots_to不為空,則說(shuō)明遷出完成但狀態(tài)還未修改,節(jié)點(diǎn)2會(huì)將migrating_slots_to設(shè)置為空
if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;
//如果是節(jié)點(diǎn)1則會(huì)看指令的nodeid是否是自己且importing_slots_from是否有數(shù)據(jù),若有則說(shuō)明導(dǎo)入成功直接將importing_slots_from設(shè)置為空
if (n == myself &&
server.cluster->importing_slots_from[slot])
{
//......
server.cluster->importing_slots_from[slot] = NULL;
}
}
//......
}
二、小結(jié)
自此我們將redis集群中的所有核心設(shè)計(jì)都分析完成,我們來(lái)簡(jiǎn)單小結(jié)一下整體過(guò)程:
- 通過(guò)CLUSTER IMPORTING/MIGRATING 進(jìn)行slot遷入或遷出,redis服務(wù)端通過(guò)一個(gè)數(shù)組維護(hù)遷入和遷出的slot的信息。
- 后續(xù)客戶端發(fā)起請(qǐng)求獲取對(duì)應(yīng)的slot的信息時(shí),會(huì)通過(guò)上述兩個(gè)數(shù)組獲知節(jié)點(diǎn)遷移情況已做出結(jié)果響應(yīng)。
- 通過(guò)步驟1能夠告知對(duì)應(yīng)的slot的新數(shù)據(jù)的存儲(chǔ)指向,對(duì)于舊數(shù)據(jù)我們還是需要通過(guò)指令完成遷移,其本質(zhì)就是服務(wù)端定位到對(duì)應(yīng)slot上的key然后生成RESP規(guī)范的協(xié)議指令通知到遷移的節(jié)點(diǎn)上。
- 以遷出節(jié)點(diǎn)為例,看到自己對(duì)應(yīng)slot的key為0且遷出數(shù)組非空,則說(shuō)明遷出完成。
由此完成一次集群的遷移。