自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

來聊聊去中心化 Redis 集群節(jié)點如何完成通信

數(shù)據(jù)庫 Redis
本文將從源碼的角度分析redis集群節(jié)點如何利用Gossip協(xié)議完成節(jié)點間的通信與傳播,希望對你有幫助。

一、寫在文章開頭

今天我們來聊點有意思的,關(guān)于redis中集群間通信的設(shè)計與實現(xiàn),本文將從源碼的角度分析redis集群節(jié)點如何利用Gossip協(xié)議完成節(jié)點間的通信與傳播,希望對你有幫助。

二、詳解Redis集群節(jié)點通信的設(shè)計與實現(xiàn)

1. 詳解Gossip協(xié)議

在此之前我們先簡單介紹一下Gossip協(xié)議,該協(xié)議是分布式集群的一種通信協(xié)議,我們都知道管理集群的方式有中心化和去中心化兩種方式,中心化的方式是通過第一個第三方的管理中心,例如zookeeper等來維護一份集群節(jié)點的信息、狀態(tài):

而redis采用的是去中心化的方式實現(xiàn)集群節(jié)點通信,即通過Gossip協(xié)議進行節(jié)點通信,讓各個節(jié)點之間兩兩通信,廣播與自己保持交流的節(jié)點,由此將節(jié)點串聯(lián)起來構(gòu)成一張關(guān)系網(wǎng):

我們以一個簡單的場景為例介紹一下Gossip協(xié)議,默認情況下我們的當(dāng)前有3個節(jié)點的集群,各個節(jié)點彼此按照通信要求發(fā)送自己的信息和與自己保持交流的節(jié)點,由此將有限的資源共享出去構(gòu)成一個集群。

此時,我們需要橫向擴展一個節(jié)點4,我們只需配置/redis-cli --cluster add-node 新節(jié)點IP:新節(jié)點端口 任意存活節(jié)點IP:任意存活節(jié)點端口,這個存活節(jié)點后續(xù)和其他節(jié)點通信時,就會將當(dāng)前新添加的節(jié)點4發(fā)送出去,由此其他節(jié)點收到這個消息并存儲下來,經(jīng)過各個節(jié)點的不斷反復(fù)通信,這個集群中的各個節(jié)點就會擁有集群中所有節(jié)點的信息。

2. 集群消息協(xié)定

任何通信都是需要按照協(xié)議規(guī)范進行,redis集群也一樣,為了保證節(jié)點間通信的規(guī)范,redis要求集群節(jié)點通信的消息的類型可以是以下幾種:

  • ping消息,用來向其他節(jié)點發(fā)送節(jié)點信息。
  • 回復(fù)ping的pong消息。
  • 如果當(dāng)前節(jié)點中存在新添加的節(jié)點,則通過meet格式的消息發(fā)送給其他節(jié)點。
  • 如果節(jié)點出現(xiàn)故障,則發(fā)送fail消息告知集群其他節(jié)點。

對此我們給出消息的宏定義代碼,位于cluster.h中:

//集群中的ping
#define CLUSTERMSG_TYPE_PING 0          /* Ping */
//集群中的pong
#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
//想加入集群的節(jié)點
#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
//某個節(jié)點有故障
#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */

3. 集群節(jié)點消息體

后續(xù)集群都會通過clusterMsg來表示一條消息,它記錄消息長度以及發(fā)送節(jié)點名稱、負責(zé)的slots以及節(jié)點端口號等信息:

typedef struct {
    char sig[4];       
    //消息總長度
    uint32_t totlen;  
   //......
    //消息類型
    uint16_t type;     
    //......
    //發(fā)送節(jié)點的名稱
    char sender[REDIS_CLUSTER_NAMELEN]; 
    //發(fā)送節(jié)點負責(zé)的slots
    unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
    //......
    char notused1[32];  
    //節(jié)點端口
    uint16_t port;     
    //......
    //記錄消息的消息體
    union clusterMsgData data;
} clusterMsg;

這里我們對這個消息體clusterMsgData進行展開說明一下,可以看到他用一段共用體維護各種類型消息的結(jié)構(gòu),這其中我們只需要了解的是ping消息,從注釋可以看到ping消息這個結(jié)構(gòu)體可以發(fā)送ping、meet、pong等類型消息,ping消息類型其內(nèi)部用clusterMsgDataGossip數(shù)組維護,這一點這個消息可以包含多個節(jié)點信息存于數(shù)組中:

union clusterMsgData {
   //可以發(fā)送ping meet pong的消息,該結(jié)構(gòu)體內(nèi)部有clusterMsgDataGossip數(shù)組,這意味這個結(jié)構(gòu)體可以存放多個節(jié)點的消息
    struct {
        /* Array of N clusterMsgDataGossip structures */
        clusterMsgDataGossip gossip[1];
    } ping;

    //......
};

步入clusterMsgDataGossip即可看到這個結(jié)構(gòu)體存儲的是需要發(fā)送給它人的節(jié)點名稱、ping和收到ping的時間以及端口號等信息:

typedef struct {
    char nodename[REDIS_CLUSTER_NAMELEN];//節(jié)點名稱
    uint32_t ping_sent; //發(fā)送ping的事件
    uint32_t pong_received;//收到pong的事件
    char ip[REDIS_IP_STR_LEN];  //廣播的節(jié)點ip
    uint16_t port;          //節(jié)點與客戶端進行通信的端口
    //......
} clusterMsgDataGossip;

我們來簡單小結(jié)一下,假設(shè)我們的某個節(jié)點向其他節(jié)點發(fā)送ping消息告知自己維護的節(jié)點信息和狀態(tài),那么對應(yīng)的消息格式大體如下圖所示:

4. 詳解集群節(jié)點ping流程

集群節(jié)點的指向流程也是交由redis的時間事件serverCron執(zhí)行,它會每個100ms執(zhí)行一次集群的定任務(wù)clusterCron方法,其內(nèi)部會檢查這個定時任務(wù)是否執(zhí)行了10次,一旦執(zhí)行10次(也就是100ms*10即每1秒)后就會隨機從當(dāng)前節(jié)點維護的其他節(jié)點信息字典表中抽取5個節(jié)點,找到最早回復(fù)pong給當(dāng)前節(jié)點發(fā)送一條ping消息:

對此我們給出定時執(zhí)行的serverCron函數(shù),可以看到其內(nèi)部每100ms執(zhí)行一次集群定時任務(wù)clusterCron:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //......
    //100ms執(zhí)行一次集群的函數(shù) 
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }
 //......
}

我們步入clusterCron即可看到,該定時任務(wù)會隨機抽取5個節(jié)點然后找到最早給該節(jié)點發(fā)送pong的節(jié)點發(fā)送ping消息包:

void clusterCron(void) {
   //......
    // 每10次即每過去1s執(zhí)行一次這段邏輯
    if (!(iteration % 10)) {
        int j;

       
        //隨機選出5個節(jié)點
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            //斷連、或者自己、或者正在握手的節(jié)點不處理
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
                continue;
            //選擇最早收到pong的節(jié)點    
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        //向最早收到pong的調(diào)用clusterSendPing發(fā)送消息
        if (min_pong_node) {
            redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

   //......
}

步入clusterSendPing即可看到我們所說的核心邏輯,即按照公式計算出要發(fā)送給最早回復(fù)pong的節(jié)點對應(yīng)節(jié)點數(shù),然后封裝成消息發(fā)送出去:

void clusterSendPing(clusterLink *link, int type) {
    //......
    //我們希望添加的最大節(jié)點數(shù),集群總是減去自己和正在握手的
    int freshnodes = dictSize(server.cluster->nodes)-2;

      //......
    //計算wanted
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;

      //......

    /* Populate the header. */
    //設(shè)置ping消息頭,構(gòu)建端口號、slot等信息
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    //基于maxiterations進行循環(huán)隨機抽取自己維護的節(jié)點信息并組裝
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);
        clusterMsgDataGossip *gossip;
        int j;

       //如果是自己則跳過
        if (this == myself) continue;

       //故障節(jié)點不發(fā)送
        if (maxiterations > wanted*2 &&
            !(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))
            continue;

        //....

       
        freshnodes--;
        
        //組裝當(dāng)前節(jié)點的名稱、ip、端口等信息存到hdr所指向的消息結(jié)構(gòu)體
        
        //指向gossip某個索引位置設(shè)置名稱、ip、端口等
        gossip = &(hdr->data.ping.gossip[gossipcount]);
        memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
        gossip->ping_sent = htonl(this->ping_sent);
        gossip->pong_received = htonl(this->pong_received);
        memcpy(gossip->ip,this->ip,sizeof(this->ip));
        gossip->port = htons(this->port);
        gossip->flags = htons(this->flags);
        gossip->notused1 = 0;
        gossip->notused2 = 0;
        gossipcount++;
    }

     //......
     //創(chuàng)建一個發(fā)送事件提交給redis發(fā)送出去
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}

5. 等待pong消息回復(fù)并解析

每個集群的節(jié)點都會定時檢查和對端鏈接的連接是否斷開,如果斷開的嘗試異步非阻塞向其發(fā)送建立連接請求,并注冊一個處理器clusterReadHandler處理對端的ping等消息,所以我們上文的ping消息實際上就是通過這個函數(shù)進行解析讀?。?/p>

對此我們給出這段源碼的入口即可集群的定時任務(wù)clusterCron方法,可以看到其內(nèi)部會便利當(dāng)前節(jié)點通信的節(jié)點,查看連接是否為空,若為空則發(fā)起連接并注冊clusterReadHandler處理消息:

void clusterCron(void) {
    //......

    
    di = dictGetSafeIterator(server.cluster->nodes);
    //遍歷與當(dāng)前節(jié)點保持通信的節(jié)點
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        //如果連接為空則非阻塞發(fā)起連接,然后注冊clusterReadHandler處理對端節(jié)點的消息
        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
            //......
            //創(chuàng)建鏈接對應(yīng)存儲數(shù)據(jù)的空間
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            //為這個鏈接注冊clusterReadHandler處理發(fā)送的消息
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            //......
        }
    }
    
}

步入clusterReadHandler即可看到redis服務(wù)端解析消息存儲到buf并通過clusterProcessPacket解析的邏輯:

void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    //......

    while(1) { /* Read as long as there is data to read. */
       //......
       //hdr指向link->rcvbuf
       hdr = (clusterMsg*) link->rcvbuf;
        //讀取消息到buf即link->rcvbuf中
        nread = read(fd,buf,readlen);
        //......

        
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            //調(diào)用clusterProcessPacket解析這個連接的消息,即 link->rcvbuf
            if (clusterProcessPacket(link)) {
                sdsfree(link->rcvbuf);
                link->rcvbuf = sdsempty();
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}

而clusterProcessPacket即是該方法的核心所在,它會將對端節(jié)點發(fā)送的消息進行解析與處理,這里我們就以收到pong消息為例說明一下流程,假設(shè)回復(fù)pong的是master節(jié)點,它會更新收到這條網(wǎng)絡(luò)連接pong響應(yīng)時間,然后解析報文內(nèi)容,如果發(fā)現(xiàn)有個節(jié)點不在我們的節(jié)點列表中,將其存入node字典表中:

int clusterProcessPacket(clusterLink *link) {
    //......

    /* Perform sanity checks */
    //消息完整性校驗
   //......

    /* Check if the sender is a known node. */
    //檢查發(fā)送節(jié)點是否是已知節(jié)點
    sender = clusterLookupNode(hdr->sender);
    //......

   //......

    /* PING, PONG, MEET:消息處理邏輯 */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
      //......
  
  //如果收到pong則更新pong_received為當(dāng)前時間
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = mstime();
            link->node->ping_sent = 0;

           //......
        }

      
  //......

       
        //如果當(dāng)前節(jié)點是已知節(jié)點,則調(diào)用clusterProcessGossipSection查看當(dāng)前pong消息中的內(nèi)容是否包含未知的、新加入的節(jié)點
        if (sender) clusterProcessGossipSection(hdr,link);
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        //......
    }
 //......    
    return 1;
}

步入clusterProcessGossipSection即可看到該函數(shù)會遍歷消息中的節(jié)點,一旦發(fā)現(xiàn)該節(jié)點是新添加節(jié)點則調(diào)用clusterStartHandshake其存入nodes字典表中:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    //解析當(dāng)前節(jié)點gossip消息內(nèi)容
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
 //遍歷node
    while(count--) {
     //......
     //打印當(dāng)前節(jié)點信息
        redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ci);
      
        node = clusterLookupNode(g->nodename);
        if (node) {//已知節(jié)點處理,如果不可通信才握手重連
           //......
        } else {//未知節(jié)點則發(fā)起握手,若握手建立通信成功則將其存入nodes字典中
            //......
            if (sender &&
                !(flags & REDIS_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        }

      //走到下一個節(jié)點
        g++;
    }
}

我們給出clusterStartHandshake中將其存入server的cluster的nodes字典表的邏輯:

int clusterStartHandshake(char *ip, int port) {
    //......
 //如果處于握手中,則說明之前已經(jīng)發(fā)現(xiàn)并進行通信了,直接返回
    if (clusterHandshakeInProgress(norm_ip,port)) {
        errno = EAGAIN;
        return 0;
    }

 //基于消息創(chuàng)建node結(jié)構(gòu)其,并調(diào)用clusterAddNode將其存入server.cluster->nodes字典表中
    n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}

三、小結(jié)

來簡單小結(jié)一下Redis集群節(jié)點如何通過Gossip協(xié)議構(gòu)建集群網(wǎng)絡(luò)的:

  • 新節(jié)點通過meet和集群中某個節(jié)點a建立連接。
  • 當(dāng)前節(jié)點執(zhí)行clusterCron定時任務(wù)時,隨機抽取5個節(jié)點并找到最早回復(fù)pong的實例,假設(shè)是節(jié)點a,發(fā)送ping消息。
  • 注冊clusterReadHandler處理器其他節(jié)點發(fā)送的消息。
  • 收到節(jié)點a的pong消息回復(fù),判斷查看該節(jié)點是否是已知節(jié)點,如果是則調(diào)用clusterProcessGossipSection解析報文內(nèi)容,如果存在新節(jié)點則進行握手通信,如果連接建立成功則將該節(jié)點存入當(dāng)前實例的nodes節(jié)點中。
責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2022-08-28 19:36:15

數(shù)據(jù)分片KafkaRocketMQ

2025-02-24 10:07:09

Redis節(jié)點遷移集群

2022-12-08 10:49:43

2025-02-17 11:07:10

2024-11-04 15:49:43

Redis?數(shù)據(jù)遷移

2018-01-12 05:37:52

2022-02-09 15:36:49

Redis主從模式哨兵模式

2022-05-06 16:26:40

區(qū)塊鏈去中心化加密貨幣

2020-04-21 22:59:50

Redis搭建選舉

2019-01-10 13:24:46

去中心化區(qū)塊鏈監(jiān)管

2023-04-07 15:33:09

2025-02-13 11:11:53

Redis哨兵代碼

2023-10-30 08:00:00

區(qū)塊鏈去中心化

2021-02-24 10:02:19

存儲云存儲去中心化存儲

2021-02-05 10:03:31

區(qū)塊鏈技術(shù)智能

2025-02-21 15:43:29

slotredis集群

2023-04-06 08:00:36

VPC虛擬私有云Amazon

2024-02-04 09:00:00

向量查詢數(shù)據(jù)檢索MyScale

2022-05-10 16:03:30

數(shù)字貨幣去中心化VPN

2018-09-05 14:39:05

點贊
收藏

51CTO技術(shù)棧公眾號