來聊聊去中心化 Redis 集群節(jié)點如何完成通信
一、寫在文章開頭
今天我們來聊點有意思的,關(guān)于redis中集群間通信的設(shè)計與實現(xiàn),本文將從源碼的角度分析redis集群節(jié)點如何利用Gossip協(xié)議完成節(jié)點間的通信與傳播,希望對你有幫助。
二、詳解Redis集群節(jié)點通信的設(shè)計與實現(xiàn)
1. 詳解Gossip協(xié)議
在此之前我們先簡單介紹一下Gossip協(xié)議,該協(xié)議是分布式集群的一種通信協(xié)議,我們都知道管理集群的方式有中心化和去中心化兩種方式,中心化的方式是通過第一個第三方的管理中心,例如zookeeper等來維護一份集群節(jié)點的信息、狀態(tài):
而redis采用的是去中心化的方式實現(xiàn)集群節(jié)點通信,即通過Gossip協(xié)議進行節(jié)點通信,讓各個節(jié)點之間兩兩通信,廣播與自己保持交流的節(jié)點,由此將節(jié)點串聯(lián)起來構(gòu)成一張關(guān)系網(wǎng):
我們以一個簡單的場景為例介紹一下Gossip協(xié)議,默認情況下我們的當(dāng)前有3個節(jié)點的集群,各個節(jié)點彼此按照通信要求發(fā)送自己的信息和與自己保持交流的節(jié)點,由此將有限的資源共享出去構(gòu)成一個集群。
此時,我們需要橫向擴展一個節(jié)點4,我們只需配置/redis-cli --cluster add-node 新節(jié)點IP:新節(jié)點端口 任意存活節(jié)點IP:任意存活節(jié)點端口,這個存活節(jié)點后續(xù)和其他節(jié)點通信時,就會將當(dāng)前新添加的節(jié)點4發(fā)送出去,由此其他節(jié)點收到這個消息并存儲下來,經(jīng)過各個節(jié)點的不斷反復(fù)通信,這個集群中的各個節(jié)點就會擁有集群中所有節(jié)點的信息。
2. 集群消息協(xié)定
任何通信都是需要按照協(xié)議規(guī)范進行,redis集群也一樣,為了保證節(jié)點間通信的規(guī)范,redis要求集群節(jié)點通信的消息的類型可以是以下幾種:
- ping消息,用來向其他節(jié)點發(fā)送節(jié)點信息。
- 回復(fù)ping的pong消息。
- 如果當(dāng)前節(jié)點中存在新添加的節(jié)點,則通過meet格式的消息發(fā)送給其他節(jié)點。
- 如果節(jié)點出現(xiàn)故障,則發(fā)送fail消息告知集群其他節(jié)點。
對此我們給出消息的宏定義代碼,位于cluster.h中:
//集群中的ping
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
//集群中的pong
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
//想加入集群的節(jié)點
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
//某個節(jié)點有故障
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
3. 集群節(jié)點消息體
后續(xù)集群都會通過clusterMsg來表示一條消息,它記錄消息長度以及發(fā)送節(jié)點名稱、負責(zé)的slots以及節(jié)點端口號等信息:
typedef struct {
char sig[4];
//消息總長度
uint32_t totlen;
//......
//消息類型
uint16_t type;
//......
//發(fā)送節(jié)點的名稱
char sender[REDIS_CLUSTER_NAMELEN];
//發(fā)送節(jié)點負責(zé)的slots
unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
//......
char notused1[32];
//節(jié)點端口
uint16_t port;
//......
//記錄消息的消息體
union clusterMsgData data;
} clusterMsg;
這里我們對這個消息體clusterMsgData進行展開說明一下,可以看到他用一段共用體維護各種類型消息的結(jié)構(gòu),這其中我們只需要了解的是ping消息,從注釋可以看到ping消息這個結(jié)構(gòu)體可以發(fā)送ping、meet、pong等類型消息,ping消息類型其內(nèi)部用clusterMsgDataGossip數(shù)組維護,這一點這個消息可以包含多個節(jié)點信息存于數(shù)組中:
union clusterMsgData {
//可以發(fā)送ping meet pong的消息,該結(jié)構(gòu)體內(nèi)部有clusterMsgDataGossip數(shù)組,這意味這個結(jié)構(gòu)體可以存放多個節(jié)點的消息
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;
//......
};
步入clusterMsgDataGossip即可看到這個結(jié)構(gòu)體存儲的是需要發(fā)送給它人的節(jié)點名稱、ping和收到ping的時間以及端口號等信息:
typedef struct {
char nodename[REDIS_CLUSTER_NAMELEN];//節(jié)點名稱
uint32_t ping_sent; //發(fā)送ping的事件
uint32_t pong_received;//收到pong的事件
char ip[REDIS_IP_STR_LEN]; //廣播的節(jié)點ip
uint16_t port; //節(jié)點與客戶端進行通信的端口
//......
} clusterMsgDataGossip;
我們來簡單小結(jié)一下,假設(shè)我們的某個節(jié)點向其他節(jié)點發(fā)送ping消息告知自己維護的節(jié)點信息和狀態(tài),那么對應(yīng)的消息格式大體如下圖所示:
4. 詳解集群節(jié)點ping流程
集群節(jié)點的指向流程也是交由redis的時間事件serverCron執(zhí)行,它會每個100ms執(zhí)行一次集群的定任務(wù)clusterCron方法,其內(nèi)部會檢查這個定時任務(wù)是否執(zhí)行了10次,一旦執(zhí)行10次(也就是100ms*10即每1秒)后就會隨機從當(dāng)前節(jié)點維護的其他節(jié)點信息字典表中抽取5個節(jié)點,找到最早回復(fù)pong給當(dāng)前節(jié)點發(fā)送一條ping消息:
對此我們給出定時執(zhí)行的serverCron函數(shù),可以看到其內(nèi)部每100ms執(zhí)行一次集群定時任務(wù)clusterCron:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//......
//100ms執(zhí)行一次集群的函數(shù)
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
//......
}
我們步入clusterCron即可看到,該定時任務(wù)會隨機抽取5個節(jié)點然后找到最早給該節(jié)點發(fā)送pong的節(jié)點發(fā)送ping消息包:
void clusterCron(void) {
//......
// 每10次即每過去1s執(zhí)行一次這段邏輯
if (!(iteration % 10)) {
int j;
//隨機選出5個節(jié)點
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
//斷連、或者自己、或者正在握手的節(jié)點不處理
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
continue;
//選擇最早收到pong的節(jié)點
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
//向最早收到pong的調(diào)用clusterSendPing發(fā)送消息
if (min_pong_node) {
redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
//......
}
步入clusterSendPing即可看到我們所說的核心邏輯,即按照公式計算出要發(fā)送給最早回復(fù)pong的節(jié)點對應(yīng)節(jié)點數(shù),然后封裝成消息發(fā)送出去:
void clusterSendPing(clusterLink *link, int type) {
//......
//我們希望添加的最大節(jié)點數(shù),集群總是減去自己和正在握手的
int freshnodes = dictSize(server.cluster->nodes)-2;
//......
//計算wanted
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
//......
/* Populate the header. */
//設(shè)置ping消息頭,構(gòu)建端口號、slot等信息
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type);
/* Populate the gossip fields */
int maxiterations = wanted*3;
//基于maxiterations進行循環(huán)隨機抽取自己維護的節(jié)點信息并組裝
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
//如果是自己則跳過
if (this == myself) continue;
//故障節(jié)點不發(fā)送
if (maxiterations > wanted*2 &&
!(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))
continue;
//....
freshnodes--;
//組裝當(dāng)前節(jié)點的名稱、ip、端口等信息存到hdr所指向的消息結(jié)構(gòu)體
//指向gossip某個索引位置設(shè)置名稱、ip、端口等
gossip = &(hdr->data.ping.gossip[gossipcount]);
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
gossip->ping_sent = htonl(this->ping_sent);
gossip->pong_received = htonl(this->pong_received);
memcpy(gossip->ip,this->ip,sizeof(this->ip));
gossip->port = htons(this->port);
gossip->flags = htons(this->flags);
gossip->notused1 = 0;
gossip->notused2 = 0;
gossipcount++;
}
//......
//創(chuàng)建一個發(fā)送事件提交給redis發(fā)送出去
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
5. 等待pong消息回復(fù)并解析
每個集群的節(jié)點都會定時檢查和對端鏈接的連接是否斷開,如果斷開的嘗試異步非阻塞向其發(fā)送建立連接請求,并注冊一個處理器clusterReadHandler處理對端的ping等消息,所以我們上文的ping消息實際上就是通過這個函數(shù)進行解析讀?。?/p>
對此我們給出這段源碼的入口即可集群的定時任務(wù)clusterCron方法,可以看到其內(nèi)部會便利當(dāng)前節(jié)點通信的節(jié)點,查看連接是否為空,若為空則發(fā)起連接并注冊clusterReadHandler處理消息:
void clusterCron(void) {
//......
di = dictGetSafeIterator(server.cluster->nodes);
//遍歷與當(dāng)前節(jié)點保持通信的節(jié)點
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
//如果連接為空則非阻塞發(fā)起連接,然后注冊clusterReadHandler處理對端節(jié)點的消息
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
//......
//創(chuàng)建鏈接對應(yīng)存儲數(shù)據(jù)的空間
link = createClusterLink(node);
link->fd = fd;
node->link = link;
//為這個鏈接注冊clusterReadHandler處理發(fā)送的消息
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
//......
}
}
}
步入clusterReadHandler即可看到redis服務(wù)端解析消息存儲到buf并通過clusterProcessPacket解析的邏輯:
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(1) { /* Read as long as there is data to read. */
//......
//hdr指向link->rcvbuf
hdr = (clusterMsg*) link->rcvbuf;
//讀取消息到buf即link->rcvbuf中
nread = read(fd,buf,readlen);
//......
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
//調(diào)用clusterProcessPacket解析這個連接的消息,即 link->rcvbuf
if (clusterProcessPacket(link)) {
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
return; /* Link no longer valid. */
}
}
}
}
而clusterProcessPacket即是該方法的核心所在,它會將對端節(jié)點發(fā)送的消息進行解析與處理,這里我們就以收到pong消息為例說明一下流程,假設(shè)回復(fù)pong的是master節(jié)點,它會更新收到這條網(wǎng)絡(luò)連接pong響應(yīng)時間,然后解析報文內(nèi)容,如果發(fā)現(xiàn)有個節(jié)點不在我們的節(jié)點列表中,將其存入node字典表中:
int clusterProcessPacket(clusterLink *link) {
//......
/* Perform sanity checks */
//消息完整性校驗
//......
/* Check if the sender is a known node. */
//檢查發(fā)送節(jié)點是否是已知節(jié)點
sender = clusterLookupNode(hdr->sender);
//......
//......
/* PING, PONG, MEET:消息處理邏輯 */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
//......
//如果收到pong則更新pong_received為當(dāng)前時間
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = mstime();
link->node->ping_sent = 0;
//......
}
//......
//如果當(dāng)前節(jié)點是已知節(jié)點,則調(diào)用clusterProcessGossipSection查看當(dāng)前pong消息中的內(nèi)容是否包含未知的、新加入的節(jié)點
if (sender) clusterProcessGossipSection(hdr,link);
} else if (type == CLUSTERMSG_TYPE_FAIL) {
//......
}
//......
return 1;
}
步入clusterProcessGossipSection即可看到該函數(shù)會遍歷消息中的節(jié)點,一旦發(fā)現(xiàn)該節(jié)點是新添加節(jié)點則調(diào)用clusterStartHandshake其存入nodes字典表中:
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
//解析當(dāng)前節(jié)點gossip消息內(nèi)容
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
//遍歷node
while(count--) {
//......
//打印當(dāng)前節(jié)點信息
redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ci);
node = clusterLookupNode(g->nodename);
if (node) {//已知節(jié)點處理,如果不可通信才握手重連
//......
} else {//未知節(jié)點則發(fā)起握手,若握手建立通信成功則將其存入nodes字典中
//......
if (sender &&
!(flags & REDIS_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
}
//走到下一個節(jié)點
g++;
}
}
我們給出clusterStartHandshake中將其存入server的cluster的nodes字典表的邏輯:
int clusterStartHandshake(char *ip, int port) {
//......
//如果處于握手中,則說明之前已經(jīng)發(fā)現(xiàn)并進行通信了,直接返回
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
//基于消息創(chuàng)建node結(jié)構(gòu)其,并調(diào)用clusterAddNode將其存入server.cluster->nodes字典表中
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);
return 1;
}
三、小結(jié)
來簡單小結(jié)一下Redis集群節(jié)點如何通過Gossip協(xié)議構(gòu)建集群網(wǎng)絡(luò)的:
- 新節(jié)點通過meet和集群中某個節(jié)點a建立連接。
- 當(dāng)前節(jié)點執(zhí)行clusterCron定時任務(wù)時,隨機抽取5個節(jié)點并找到最早回復(fù)pong的實例,假設(shè)是節(jié)點a,發(fā)送ping消息。
- 注冊clusterReadHandler處理器其他節(jié)點發(fā)送的消息。
- 收到節(jié)點a的pong消息回復(fù),判斷查看該節(jié)點是否是已知節(jié)點,如果是則調(diào)用clusterProcessGossipSection解析報文內(nèi)容,如果存在新節(jié)點則進行握手通信,如果連接建立成功則將該節(jié)點存入當(dāng)前實例的nodes節(jié)點中。