自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一萬字詳解 Redis Cluster Gossip 協(xié)議

存儲 存儲軟件 Redis
對于數(shù)據(jù)存儲領(lǐng)域,當(dāng)數(shù)據(jù)量或者請求流量大到一定程度后,就必然會引入分布式。比如 Redis,雖然其單機性能十分優(yōu)秀,但是因為下列原因時,也不得不引入集群。

[[355619]]

本文轉(zhuǎn)載自微信公眾號「程序員歷小冰」,作者歷小冰 。轉(zhuǎn)載本文請聯(lián)系程序員歷小冰公眾號。  

大家好,我是歷小冰,今天來講一下 Reids Cluster 的 Gossip 協(xié)議和集群操作,文章的思維導(dǎo)圖如下所示。

 

集群模式和 Gossip 簡介

對于數(shù)據(jù)存儲領(lǐng)域,當(dāng)數(shù)據(jù)量或者請求流量大到一定程度后,就必然會引入分布式。比如 Redis,雖然其單機性能十分優(yōu)秀,但是因為下列原因時,也不得不引入集群。

  • 單機無法保證高可用,需要引入多實例來提供高可用性
  • 單機能夠提供高達 8W 左右的QPS,再高的QPS則需要引入多實例
  • 單機能夠支持的數(shù)據(jù)量有限,處理更多的數(shù)據(jù)需要引入多實例;
  • 單機所處理的網(wǎng)絡(luò)流量已經(jīng)超過服務(wù)器的網(wǎng)卡的上限值,需要引入多實例來分流。

有集群,集群往往需要維護一定的元數(shù)據(jù),比如實例的ip地址,緩存分片的 slots 信息等,所以需要一套分布式機制來維護元數(shù)據(jù)的一致性。這類機制一般有兩個模式:分散式和集中式

分散式機制將元數(shù)據(jù)存儲在部分或者所有節(jié)點上,不同節(jié)點之間進行不斷的通信來維護元數(shù)據(jù)的變更和一致性。Redis Cluster,Consul 等都是該模式。

而集中式是將集群元數(shù)據(jù)集中存儲在外部節(jié)點或者中間件上,比如 zookeeper。舊版本的 kafka 和 storm 等都是使用該模式。

兩種模式各有優(yōu)劣,具體如下表所示:

 

模式 優(yōu)點 缺點
集中式 數(shù)據(jù)更新及時,時效好,元數(shù)據(jù)的更新和讀取,時效性非常好,一旦元數(shù)據(jù)出現(xiàn)了變更,立即就更新到集中式的外部節(jié)點中,其他節(jié)點讀取的時候立即就可以感知到; 較大數(shù)據(jù)更新壓力,更新壓力全部集中在外部節(jié)點,作為單點影響整個系統(tǒng)
分散式 數(shù)據(jù)更新壓力分散,元數(shù)據(jù)的更新比較分散,不是集中某一個節(jié)點,更新請求比較分散,而且有不同節(jié)點處理,有一定的延時,降低了并發(fā)壓力 數(shù)據(jù)更新延遲,可能導(dǎo)致集群的感知有一定的滯后

分散式數(shù)據(jù)更新壓力分散,元數(shù)據(jù)的更新比較分散,不是集中某一個節(jié)點,更新請求比較分散,而且有不同節(jié)點處理,有一定的延時,降低了并發(fā)壓力數(shù)據(jù)更新延遲,可能導(dǎo)致集群的感知有一定的滯后

分散式的元數(shù)據(jù)模式有多種可選的算法進行元數(shù)據(jù)的同步,比如說 Paxos、Raft 和 Gossip。Paxos 和 Raft 等都需要全部節(jié)點或者大多數(shù)節(jié)點(超過一半)正常運行,整個集群才能穩(wěn)定運行,而 Gossip 則不需要半數(shù)以上的節(jié)點運行。

Gossip 協(xié)議,顧名思義,就像流言蜚語一樣,利用一種隨機、帶有傳染性的方式,將信息傳播到整個網(wǎng)絡(luò)中,并在一定時間內(nèi),使得系統(tǒng)內(nèi)的所有節(jié)點數(shù)據(jù)一致。對你來說,掌握這個協(xié)議不僅能很好地理解這種最常用的,實現(xiàn)最終一致性的算法,也能在后續(xù)工作中得心應(yīng)手地實現(xiàn)數(shù)據(jù)的最終一致性。

Gossip 協(xié)議又稱 epidemic 協(xié)議(epidemic protocol),是基于流行病傳播方式的節(jié)點或者進程之間信息交換的協(xié)議,在P2P網(wǎng)絡(luò)和分布式系統(tǒng)中應(yīng)用廣泛,它的方法論也特別簡單:

在一個處于有界網(wǎng)絡(luò)的集群里,如果每個節(jié)點都隨機與其他節(jié)點交換特定信息,經(jīng)過足夠長的時間后,集群各個節(jié)點對該份信息的認(rèn)知終將收斂到一致。

這里的“特定信息”一般就是指集群狀態(tài)、各節(jié)點的狀態(tài)以及其他元數(shù)據(jù)等。Gossip協(xié)議是完全符合 BASE 原則,可以用在任何要求最終一致性的領(lǐng)域,比如分布式存儲和注冊中心。另外,它可以很方便地實現(xiàn)彈性集群,允許節(jié)點隨時上下線,提供快捷的失敗檢測和動態(tài)負(fù)載均衡等。

此外,Gossip 協(xié)議的最大的好處是,即使集群節(jié)點的數(shù)量增加,每個節(jié)點的負(fù)載也不會增加很多,幾乎是恒定的。這就允許 Redis Cluster 或者 Consul 集群管理的節(jié)點規(guī)模能橫向擴展到數(shù)千個。

Redis Cluster 的 Gossip 通信機制

Redis Cluster 是在 3.0 版本引入集群功能。為了讓讓集群中的每個實例都知道其他所有實例的狀態(tài)信息,Redis 集群規(guī)定各個實例之間按照 Gossip 協(xié)議來通信傳遞信息。

上圖展示了主從架構(gòu)的 Redis Cluster 示意圖,其中實線表示節(jié)點間的主從復(fù)制關(guān)系,而虛線表示各個節(jié)點之間的 Gossip 通信。

 

Redis Cluster 中的每個節(jié)點都維護一份自己視角下的當(dāng)前整個集群的狀態(tài),主要包括:

  • 當(dāng)前集群狀態(tài)
  • 集群中各節(jié)點所負(fù)責(zé)的 slots信息,及其migrate狀態(tài)
  • 集群中各節(jié)點的master-slave狀態(tài)
  • 集群中各節(jié)點的存活狀態(tài)及懷疑Fail狀態(tài)

也就是說上面的信息,就是集群中Node相互八卦傳播流言蜚語的內(nèi)容主題,而且比較全面,既有自己的更有別人的,這么一來大家都相互傳,最終信息就全面而且一致了。

Redis Cluster 的節(jié)點之間會相互發(fā)送多種消息,較為重要的如下所示:

  • MEET:通過「cluster meet ip port」命令,已有集群的節(jié)點會向新的節(jié)點發(fā)送邀請,加入現(xiàn)有集群,然后新節(jié)點就會開始與其他節(jié)點進行通信;
  • PING:節(jié)點按照配置的時間間隔向集群中其他節(jié)點發(fā)送 ping 消息,消息中帶有自己的狀態(tài),還有自己維護的集群元數(shù)據(jù),和部分其他節(jié)點的元數(shù)據(jù);
  • PONG: 節(jié)點用于回應(yīng) PING 和 MEET 的消息,結(jié)構(gòu)和 PING 消息類似,也包含自己的狀態(tài)和其他信息,也可以用于信息廣播和更新;
  • FAIL: 節(jié)點 PING 不通某節(jié)點后,會向集群所有節(jié)點廣播該節(jié)點掛掉的消息。其他節(jié)點收到消息后標(biāo)記已下線。

Redis 的源碼中 cluster.h 文件定義了全部的消息類型,代碼為 redis 4.0版本。

  1. // 注意,PING 、 PONG 和 MEET 實際上是同一種消息。 
  2. // PONG 是對 PING 的回復(fù),它的實際格式也為 PING 消息, 
  3. // 而 MEET 則是一種特殊的 PING 消息,用于強制消息的接收者將消息的發(fā)送者添加到集群中(如果節(jié)點尚未在節(jié)點列表中的話) 
  4. #define CLUSTERMSG_TYPE_PING 0          /* Ping 消息 */ 
  5. #define CLUSTERMSG_TYPE_PONG 1          /* Pong 用于回復(fù)Ping */ 
  6. #define CLUSTERMSG_TYPE_MEET 2          /* Meet 請求將某個節(jié)點添加到集群中 */ 
  7. #define CLUSTERMSG_TYPE_FAIL 3          /* Fail 將某個節(jié)點標(biāo)記為 FAIL */ 
  8. #define CLUSTERMSG_TYPE_PUBLISH 4       /* 通過發(fā)布與訂閱功能廣播消息 */ 
  9. #define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* 請求進行故障轉(zhuǎn)移操作,要求消息的接收者通過投票來支持消息的發(fā)送者 */ 
  10. #define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* 消息的接收者同意向消息的發(fā)送者投票 */ 
  11. #define CLUSTERMSG_TYPE_UPDATE 7        /* slots 已經(jīng)發(fā)生變化,消息發(fā)送者要求消息接收者進行相應(yīng)的更新 */ 
  12. #define CLUSTERMSG_TYPE_MFSTART 8       /* 為了進行手動故障轉(zhuǎn)移,暫停各個客戶端 */ 
  13. #define CLUSTERMSG_TYPE_COUNT 9         /* 消息總數(shù) */ 

通過上述這些消息,集群中的每一個實例都能獲得其它所有實例的狀態(tài)信息。這樣一來,即使有新節(jié)點加入、節(jié)點故障、Slot 變更等事件發(fā)生,實例間也可以通過 PING、PONG 消息的傳遞,完成集群狀態(tài)在每個實例上的同步。下面,我們依次來看看幾種常見的場景。

定時 PING/PONG 消息

Redis Cluster 中的節(jié)點都會定時地向其他節(jié)點發(fā)送 PING 消息,來交換各個節(jié)點狀態(tài)信息,檢查各個節(jié)點狀態(tài),包括在線狀態(tài)、疑似下線狀態(tài) PFAIL 和已下線狀態(tài) FAIL。

Redis 集群的定時 PING/PONG 的工作原理可以概括成兩點:

  • 一是,每個實例之間會按照一定的頻率,從集群中隨機挑選一些實例,把 PING 消息發(fā)送給挑選出來的實例,用來檢測這些實例是否在線,并交換彼此的狀態(tài)信息。PING 消息中封裝了發(fā)送消息的實例自身的狀態(tài)信息、部分其它實例的狀態(tài)信息,以及 Slot 映射表。
  • 二是,一個實例在接收到 PING 消息后,會給發(fā)送 PING 消息的實例,發(fā)送一個 PONG 消息。PONG 消息包含的內(nèi)容和 PING 消息一樣。

下圖顯示了兩個實例間進行 PING、PONG 消息傳遞的情況,其中實例一為發(fā)送節(jié)點,實例二是接收節(jié)點

 

新節(jié)點上線

Redis Cluster 加入新節(jié)點時,客戶端需要執(zhí)行 CLUSTER MEET 命令,如下圖所示。

 

節(jié)點一在執(zhí)行 CLUSTER MEET 命令時會首先為新節(jié)點創(chuàng)建一個 clusterNode 數(shù)據(jù),并將其添加到自己維護的 clusterState 的 nodes 字典中。有關(guān) clusterState 和 clusterNode 關(guān)系,我們在最后一節(jié)會有詳盡的示意圖和源碼來講解。

然后節(jié)點一會根據(jù)據(jù) CLUSTER MEET 命令中的 IP 地址和端口號,向新節(jié)點發(fā)送一條 MEET 消息。新節(jié)點接收到節(jié)點一發(fā)送的MEET消息后,新節(jié)點也會為節(jié)點一創(chuàng)建一個 clusterNode 結(jié)構(gòu),并將該結(jié)構(gòu)添加到自己維護的 clusterState 的 nodes 字典中。

接著,新節(jié)點向節(jié)點一返回一條PONG消息。節(jié)點一接收到節(jié)點B返回的PONG消息后,得知新節(jié)點已經(jīng)成功的接收了自己發(fā)送的MEET消息。

最后,節(jié)點一還會向新節(jié)點發(fā)送一條 PING 消息。新節(jié)點接收到該條 PING 消息后,可以知道節(jié)點A已經(jīng)成功的接收到了自己返回的P ONG消息,從而完成了新節(jié)點接入的握手操作。

MEET 操作成功之后,節(jié)點一會通過稍早時講的定時 PING 機制將新節(jié)點的信息發(fā)送給集群中的其他節(jié)點,讓其他節(jié)點也與新節(jié)點進行握手,最終,經(jīng)過一段時間后,新節(jié)點會被集群中的所有節(jié)點認(rèn)識。

節(jié)點疑似下線和真正下線

Redis Cluster 中的節(jié)點會定期檢查已經(jīng)發(fā)送 PING 消息的接收方節(jié)點是否在規(guī)定時間 ( cluster-node-timeout ) 內(nèi)返回了 PONG 消息,如果沒有則會將其標(biāo)記為疑似下線狀態(tài),也就是 PFAIL 狀態(tài),如下圖所示。

 

然后,節(jié)點一會通過 PING 消息,將節(jié)點二處于疑似下線狀態(tài)的信息傳遞給其他節(jié)點,例如節(jié)點三。節(jié)點三接收到節(jié)點一的 PING 消息得知節(jié)點二進入 PFAIL 狀態(tài)后,會在自己維護的 clusterState 的 nodes 字典中找到節(jié)點二所對應(yīng)的 clusterNode 結(jié)構(gòu),并將主節(jié)點一的下線報告添加到 clusterNode 結(jié)構(gòu)的 fail_reports 鏈表中。

 

隨著時間的推移,如果節(jié)點十 (舉個例子) 也因為 PONG 超時而認(rèn)為節(jié)點二疑似下線了,并且發(fā)現(xiàn)自己維護的節(jié)點二的 clusterNode 的 fail_reports 中有半數(shù)以上的主節(jié)點數(shù)量的未過時的將節(jié)點二標(biāo)記為 PFAIL 狀態(tài)報告日志,那么節(jié)點十將會把節(jié)點二將被標(biāo)記為已下線 FAIL 狀態(tài),并且節(jié)點十會立刻向集群其他節(jié)點廣播主節(jié)點二已經(jīng)下線的 FAIL 消息,所有收到 FAIL 消息的節(jié)點都會立即將節(jié)點二狀態(tài)標(biāo)記為已下線。如下圖所示。

 

需要注意的是,報告疑似下線記錄是由時效性的,如果超過 cluster-node-timeout *2 的時間,這個報告就會被忽略掉,讓節(jié)點二又恢復(fù)成正常狀態(tài)。

Redis Cluster 通信源碼實現(xiàn)

綜上,我們了解了 Redis Cluster 在定時 PING/PONG、新節(jié)點上線、節(jié)點疑似下線和真正下線等環(huán)節(jié)的原理和操作流程,下面我們來真正看一下 Redis 在這些環(huán)節(jié)的源碼實現(xiàn)和具體操作。

涉及的數(shù)據(jù)結(jié)構(gòu)體

首先,我們先來講解一下其中涉及的數(shù)據(jù)結(jié)構(gòu),也就是上文提到的 ClusterNode 等結(jié)構(gòu)。

每個節(jié)點都會維護一個 clusterState 結(jié)構(gòu),表示當(dāng)前集群的整體狀態(tài),它的定義如下所示。

  1. typedef struct clusterState { 
  2.    clusterNode *myself;  /* 當(dāng)前節(jié)點的clusterNode信息 */ 
  3.    .... 
  4.    dict *nodes;          /* name到clusterNode的字典 */ 
  5.    .... 
  6.    clusterNode *slots[CLUSTER_SLOTS]; /* slot 和節(jié)點的對應(yīng)關(guān)系*/ 
  7.    .... 
  8. } clusterState; 

它有三個比較關(guān)鍵的字段,具體示意圖如下所示:

  • myself 字段,是一個 clusterNode 結(jié)構(gòu),用來記錄自己的狀態(tài);
  • nodes 字典,記錄一個 name 到 clusterNode 結(jié)構(gòu)的映射,以此來記錄其他節(jié)點的狀態(tài);
  • slot 數(shù)組,記錄slot 對應(yīng)的節(jié)點 clusterNode結(jié)構(gòu)。

 

clusterNode 結(jié)構(gòu)保存了一個節(jié)點的當(dāng)前狀態(tài),比如節(jié)點的創(chuàng)建時間、節(jié)點的名字、節(jié)點 當(dāng)前的配置紀(jì)元、節(jié)點的IP地址和端口號等等。除此之外,clusterNode結(jié)構(gòu)的 link 屬性是一個clusterLink結(jié)構(gòu),該結(jié)構(gòu)保存了連接節(jié)點所需的有關(guān)信息**,比如**套接字描述符,輸入緩沖區(qū)和輸出緩沖區(qū)。clusterNode 還有一個 fail_report 的列表,用來記錄疑似下線報告。具體定義如下所示。

  1. typedef struct clusterNode { 
  2.     mstime_t ctime; /* 創(chuàng)建節(jié)點的時間 */ 
  3.     char name[CLUSTER_NAMELEN]; /* 節(jié)點的名字 */ 
  4.     int flags;      /* 節(jié)點標(biāo)識,標(biāo)記節(jié)點角色或者狀態(tài),比如主節(jié)點從節(jié)點或者在線和下線 */ 
  5.     uint64_t configEpoch; /* 當(dāng)前節(jié)點已知的集群統(tǒng)一epoch */ 
  6.     unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ 
  7.     int numslots;   /* Number of slots handled by this node */ 
  8.     int numslaves;  /* Number of slave nodes, if this is a master */ 
  9.     struct clusterNode **slaves; /* pointers to slave nodes */ 
  10.     struct clusterNode *slaveof; /* pointer to the master node. Note that it 
  11.                                     may be NULL even if the node is a slave 
  12.                                     if we don't have the master node in our 
  13.                                     tables. */ 
  14.     mstime_t ping_sent;      /* 當(dāng)前節(jié)點最后一次向該節(jié)點發(fā)送 PING 消息的時間 */ 
  15.     mstime_t pong_received;  /* 當(dāng)前節(jié)點最后一次收到該節(jié)點 PONG 消息的時間 */ 
  16.     mstime_t fail_time;      /* FAIL 標(biāo)志位被設(shè)置的時間 */ 
  17.     mstime_t voted_time;     /* Last time we voted for a slave of this master */ 
  18.     mstime_t repl_offset_time;  /* Unix time we received offset for this node */ 
  19.     mstime_t orphaned_time;     /* Starting time of orphaned master condition */ 
  20.     long long repl_offset;      /* 當(dāng)前節(jié)點的repl便宜 */ 
  21.     char ip[NET_IP_STR_LEN];  /* 節(jié)點的IP 地址 */ 
  22.     int port;                   /* 端口 */ 
  23.     int cport;                  /* 通信端口,一般是端口+1000 */ 
  24.     clusterLink *link;          /* 和該節(jié)點的 tcp 連接 */ 
  25.     list *fail_reports;         /* 下線記錄列表 */ 
  26. } clusterNode; 

clusterNodeFailReport 是記錄節(jié)點下線報告的結(jié)構(gòu)體, node 是報告節(jié)點的信息,而 time 則代表著報告時間。

  1. typedef struct clusterNodeFailReport { 
  2.     struct clusterNode *node;  /* 報告當(dāng)前節(jié)點已經(jīng)下線的節(jié)點 */ 
  3.     mstime_t time;             /* 報告時間 */ 
  4. } clusterNodeFailReport; 

消息結(jié)構(gòu)體

了解了 Reids 節(jié)點維護的數(shù)據(jù)結(jié)構(gòu)體后,我們再來看節(jié)點進行通信的消息結(jié)構(gòu)體。通信消息最外側(cè)的結(jié)構(gòu)體為 clusterMsg,它包括了很多消息記錄信息,包括 RCmb 標(biāo)志位,消息總長度,消息協(xié)議版本,消息類型;它還包括了發(fā)送該消息節(jié)點的記錄信息,比如節(jié)點名稱,節(jié)點負(fù)責(zé)的slot信息,節(jié)點ip和端口等;最后它包含了一個 clusterMsgData 來攜帶具體類型的消息。

  1. typedef struct { 
  2.     char sig[4];        /* 標(biāo)志位,"RCmb" (Redis Cluster message bus). */ 
  3.     uint32_t totlen;    /* 消息總長度 */ 
  4.     uint16_t ver;       /* 消息協(xié)議版本 */ 
  5.     uint16_t port;      /* 端口 */ 
  6.     uint16_t type;      /* 消息類型 */ 
  7.     uint16_t count;     /*  */ 
  8.     uint64_t currentEpoch;  /* 表示本節(jié)點當(dāng)前記錄的整個集群的統(tǒng)一的epoch,用來決策選舉投票等,與configEpoch不同的是:configEpoch表示的是master節(jié)點的唯一標(biāo)志,currentEpoch是集群的唯一標(biāo)志。 */ 
  9.     uint64_t configEpoch;   /* 每個master節(jié)點都有一個唯一的configEpoch做標(biāo)志,如果和其他master節(jié)點沖突,會強制自增使本節(jié)點在集群中唯一 */ 
  10.     uint64_t offset;    /* 主從復(fù)制偏移相關(guān)信息,主節(jié)點和從節(jié)點含義不同 */ 
  11.     char sender[CLUSTER_NAMELEN]; /* 發(fā)送節(jié)點的名稱 */ 
  12.     unsigned char myslots[CLUSTER_SLOTS/8]; /* 本節(jié)點負(fù)責(zé)的slots信息,16384/8個char數(shù)組,一共為16384bit */ 
  13.     char slaveof[CLUSTER_NAMELEN]; /* master信息,假如本節(jié)點是slave節(jié)點的話,協(xié)議帶有master信息 */ 
  14.     char myip[NET_IP_STR_LEN];    /* IP */ 
  15.     char notused1[34];  /* 保留字段 */ 
  16.     uint16_t cport;      /* 集群的通信端口 */ 
  17.     uint16_t flags;      /* 本節(jié)點當(dāng)前的狀態(tài),比如 CLUSTER_NODE_HANDSHAKE、CLUSTER_NODE_MEET */ 
  18.     unsigned char state; /* Cluster state from the POV of the sender */ 
  19.     unsigned char mflags[3]; /* 本條消息的類型,目前只有兩類:CLUSTERMSG_FLAG0_PAUSED、CLUSTERMSG_FLAG0_FORCEACK */ 
  20.     union clusterMsgData data; 
  21. } clusterMsg; 

clusterMsgData 是一個 union 結(jié)構(gòu)體,它可以為 PING,MEET,PONG 或者 FAIL 等消息體。其中當(dāng)消息為 PING、MEET 和 PONG 類型時,ping 字段是被賦值的,而是 FAIL 類型時,fail 字段是被賦值的。

  1. // 注意這是 union 關(guān)鍵字 
  2. union clusterMsgData { 
  3.     /* PING, MEET 或者 PONG 消息時,ping 字段被賦值 */ 
  4.     struct { 
  5.         /* Array of N clusterMsgDataGossip structures */ 
  6.         clusterMsgDataGossip gossip[1]; 
  7.     } ping; 
  8.     /*  FAIL 消息時,fail 被賦值 */ 
  9.     struct { 
  10.         clusterMsgDataFail about; 
  11.     } fail; 
  12.     // .... 省略 publish 和 update 消息的字段 
  13. }; 

clusterMsgDataGossip 是 PING、PONG 和 MEET 消息的結(jié)構(gòu)體,它會包括發(fā)送消息節(jié)點維護的其他節(jié)點信息,也就是上文中 clusterState 中 nodes 字段包含的信息,具體代碼如下所示,你也會發(fā)現(xiàn)二者的字段是類似的。

  1. typedef struct { 
  2.  /* 節(jié)點的名字,默認(rèn)是隨機的,MEET消息發(fā)送并得到回復(fù)后,集群會為該節(jié)點設(shè)置正式的名稱*/ 
  3.     char nodename[CLUSTER_NAMELEN];  
  4.     uint32_t ping_sent; /* 發(fā)送節(jié)點最后一次給接收節(jié)點發(fā)送 PING 消息的時間戳,收到對應(yīng) PONG 回復(fù)后會被賦值為0 */ 
  5.     uint32_t pong_received; /* 發(fā)送節(jié)點最后一次收到接收節(jié)點發(fā)送 PONG 消息的時間戳 */ 
  6.     char ip[NET_IP_STR_LEN];  /* IP address last time it was seen */ 
  7.     uint16_t port;       /* IP*/        
  8.     uint16_t cport;      /* 端口*/   
  9.     uint16_t flags;      /* 標(biāo)識*/  
  10.     uint32_t notused1;   /* 對齊字符*/ 
  11. } clusterMsgDataGossip; 
  12.  
  13. typedef struct { 
  14.     char nodename[CLUSTER_NAMELEN]; /* 下線節(jié)點的名字 */ 
  15. } clusterMsgDataFail; 

看完了節(jié)點維護的數(shù)據(jù)結(jié)構(gòu)體和發(fā)送的消息結(jié)構(gòu)體后,我們就來看看 Redis 的具體行為源碼了。

隨機周期性發(fā)送PING消息

Redis 的 clusterCron 函數(shù)會被定時調(diào)用,每被執(zhí)行10次,就會準(zhǔn)備向隨機的一個節(jié)點發(fā)送 PING 消息。

它會先隨機的選出 5 個節(jié)點,然后從中選擇最久沒有與之通信的節(jié)點,調(diào)用 clusterSendPing 函數(shù)發(fā)送類型為 CLUSTERMSG_TYPE_PING 的消息

  1. // cluster.c 文件  
  2. // clusterCron() 每執(zhí)行 10 次(至少間隔一秒鐘),就向一個隨機節(jié)點發(fā)送 gossip 信息 
  3. if (!(iteration % 10)) { 
  4.     int j; 
  5.  
  6.     /* 隨機 5 個節(jié)點,選出其中一個 */ 
  7.     for (j = 0; j < 5; j++) { 
  8.         de = dictGetRandomKey(server.cluster->nodes); 
  9.         clusterNode *this = dictGetVal(de); 
  10.  
  11.         /* 不要 PING 連接斷開的節(jié)點,也不要 PING 最近已經(jīng) PING 過的節(jié)點 */ 
  12.         if (this->link == NULL || this->ping_sent != 0) continue
  13.         if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) 
  14.             continue
  15.         /* 對比 pong_received 字段,選出更長時間未收到其 PONG 消息的節(jié)點(表示好久沒有接受到該節(jié)點的PONG消息了) */ 
  16.         if (min_pong_node == NULL || min_pong > this->pong_received) { 
  17.             min_pong_node = this; 
  18.             min_pong = this->pong_received; 
  19.         } 
  20.     } 
  21.     /* 向最久沒有收到 PONG 回復(fù)的節(jié)點發(fā)送 PING 命令 */ 
  22.     if (min_pong_node) { 
  23.         serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name); 
  24.         clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); 
  25.     } 

clusterSendPing 函數(shù)的具體行為我們后續(xù)再了解,因為該函數(shù)在其他環(huán)節(jié)也會經(jīng)常用到

節(jié)點加入集群

當(dāng)節(jié)點執(zhí)行 CLUSTER MEET 命令后,會在自身給新節(jié)點維護一個 clusterNode 結(jié)構(gòu)體,該結(jié)構(gòu)體的 link 也就是TCP連接字段是 null,表示是新節(jié)點尚未建立連接。

clusterCron 函數(shù)中也會處理這些未建立連接的新節(jié)點,調(diào)用 createClusterLink 創(chuàng)立連接,然后調(diào)用 clusterSendPing 函數(shù)來發(fā)送 MEET 消息

  1. /* cluster.c clusterCron 函數(shù)部分,為未創(chuàng)建連接的節(jié)點創(chuàng)建連接 */ 
  2. if (node->link == NULL) { 
  3.     int fd; 
  4.     mstime_t old_ping_sent; 
  5.     clusterLink *link; 
  6.     /* 和該節(jié)點建立連接 */ 
  7.     fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, 
  8.         node->cport, NET_FIRST_BIND_ADDR); 
  9.     /* .... fd 為-1時的異常處理 */ 
  10.     /* 建立 link */ 
  11.     link = createClusterLink(node); 
  12.     link->fd = fd; 
  13.     node->link = link; 
  14.     aeCreateFileEvent(server.el,link->fd,AE_READABLE, 
  15.             clusterReadHandler,link); 
  16.     /* 向新連接的節(jié)點發(fā)送 PING 命令,防止節(jié)點被識進入下線 */ 
  17.     /* 如果節(jié)點被標(biāo)記為 MEET ,那么發(fā)送 MEET 命令,否則發(fā)送 PING 命令 */ 
  18.     old_ping_sent = node->ping_sent; 
  19.     clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? 
  20.             CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); 
  21.     /* .... */ 
  22.     /* 如果當(dāng)前節(jié)點(發(fā)送者)沒能收到 MEET 信息的回復(fù),那么它將不再向目標(biāo)節(jié)點發(fā)送命令。*/ 
  23.     /* 如果接收到回復(fù)的話,那么節(jié)點將不再處于 HANDSHAKE 狀態(tài),并繼續(xù)向目標(biāo)節(jié)點發(fā)送普通 PING 命令*/ 
  24.     node->flags &= ~CLUSTER_NODE_MEET; 

防止節(jié)點假超時及狀態(tài)過期

防止節(jié)點假超時和標(biāo)記疑似下線標(biāo)記也是在 clusterCron 函數(shù)中,具體如下所示。它會檢查當(dāng)前所有的 nodes 節(jié)點列表,如果發(fā)現(xiàn)某個節(jié)點與自己的最后一個 PONG 通信時間超過了預(yù)定的閾值的一半時,為了防止節(jié)點是假超時,會主動釋放掉與之的 link 連接,然后會主動向它發(fā)送一個 PING 消息。

  1. /* cluster.c clusterCron 函數(shù)部分,遍歷節(jié)點來檢查 fail 的節(jié)點*/ 
  2. while((de = dictNext(di)) != NULL) { 
  3.     clusterNode *node = dictGetVal(de); 
  4.     now = mstime(); /* Use an updated time at every iteration. */ 
  5.     mstime_t delay; 
  6.  
  7.     /* 如果等到 PONG 到達的時間超過了 node timeout 一半的連接 */ 
  8.     /* 因為盡管節(jié)點依然正常,但連接可能已經(jīng)出問題了 */ 
  9.     if (node->link && /* is connected */ 
  10.         now - node->link->ctime > 
  11.         server.cluster_node_timeout && /* 還未重連 */ 
  12.         node->ping_sent && /* 已經(jīng)發(fā)過ping消息 */ 
  13.         node->pong_received < node->ping_sent && /* 還在等待pong消息 */ 
  14.         /* 等待pong消息超過了 timeout/2 */ 
  15.         now - node->ping_sent > server.cluster_node_timeout/2) 
  16.     { 
  17.         /* 釋放連接,下次 clusterCron() 會自動重連 */ 
  18.         freeClusterLink(node->link); 
  19.     } 
  20.  
  21.     /* 如果目前沒有在 PING 節(jié)點*/ 
  22.     /* 并且已經(jīng)有 node timeout 一半的時間沒有從節(jié)點那里收到 PONG 回復(fù) */ 
  23.     /* 那么向節(jié)點發(fā)送一個 PING ,確保節(jié)點的信息不會太舊,有可能一直沒有隨機中 */ 
  24.     if (node->link && 
  25.         node->ping_sent == 0 && 
  26.         (now - node->pong_received) > server.cluster_node_timeout/2) 
  27.     { 
  28.         clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); 
  29.         continue
  30.     } 
  31.     /* .... 處理failover和標(biāo)記遺失下線 */ 

處理failover和標(biāo)記疑似下線

如果防止節(jié)點假超時處理后,節(jié)點依舊未收到目標(biāo)節(jié)點的 PONG 消息,并且時間已經(jīng)超過了 cluster_node_timeout,那么就將該節(jié)點標(biāo)記為疑似下線狀態(tài)。

  1. /* 如果這是一個主節(jié)點,并且有一個從服務(wù)器請求進行手動故障轉(zhuǎn)移,那么向從服務(wù)器發(fā)送 PING*/ 
  2. if (server.cluster->mf_end && 
  3.     nodeIsMaster(myself) && 
  4.     server.cluster->mf_slave == node && 
  5.     node->link) 
  6.     clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); 
  7.     continue
  8.  
  9. /* 后續(xù)代碼只在節(jié)點發(fā)送了 PING 命令的情況下執(zhí)行*/ 
  10. if (node->ping_sent == 0) continue
  11.  
  12. /* 計算等待 PONG 回復(fù)的時長 */  
  13. delay = now - node->ping_sent; 
  14. /* 等待 PONG 回復(fù)的時長超過了限制值,將目標(biāo)節(jié)點標(biāo)記為 PFAIL (疑似下線)*/ 
  15. if (delay > server.cluster_node_timeout) { 
  16.     /* 超時了,標(biāo)記為疑似下線 */ 
  17.     if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { 
  18.         redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing"
  19.             node->name); 
  20.         // 打開疑似下線標(biāo)記 
  21.         node->flags |= REDIS_NODE_PFAIL; 
  22.         update_state = 1; 
  23.     } 

實際發(fā)送Gossip消息

以下是前方多次調(diào)用過的clusterSendPing()方法的源碼,代碼中有詳細的注釋,大家可以自行閱讀。主要的操作就是將節(jié)點自身維護的 clusterState 轉(zhuǎn)換為對應(yīng)的消息結(jié)構(gòu)體,。

  1. /* 向指定節(jié)點發(fā)送一條 MEET 、 PING 或者 PONG 消息 */ 
  2. void clusterSendPing(clusterLink *link, int type) { 
  3.     unsigned char *buf; 
  4.     clusterMsg *hdr; 
  5.     int gossipcount = 0; /* Number of gossip sections added so far. */ 
  6.     int wanted; /* Number of gossip sections we want to append if possible. */ 
  7.     int totlen; /* Total packet length. */ 
  8.     // freshnodes 是用于發(fā)送 gossip 信息的計數(shù)器 
  9.     // 每次發(fā)送一條信息時,程序?qū)?nbsp;freshnodes 的值減一 
  10.     // 當(dāng) freshnodes 的數(shù)值小于等于 0 時,程序停止發(fā)送 gossip 信息 
  11.     // freshnodes 的數(shù)量是節(jié)點目前的 nodes 表中的節(jié)點數(shù)量減去 2  
  12.     // 這里的 2 指兩個節(jié)點,一個是 myself 節(jié)點(也即是發(fā)送信息的這個節(jié)點) 
  13.     // 另一個是接受 gossip 信息的節(jié)點 
  14.     int freshnodes = dictSize(server.cluster->nodes)-2; 
  15.  
  16.      
  17.     /* 計算要攜帶多少節(jié)點的信息,最少3個,最多 1/10 集群總節(jié)點數(shù)量*/ 
  18.     wanted = floor(dictSize(server.cluster->nodes)/10); 
  19.     if (wanted < 3) wanted = 3; 
  20.     if (wanted > freshnodes) wanted = freshnodes; 
  21.  
  22.     /* .... 省略 totlen 的計算等*/ 
  23.  
  24.     /* 如果發(fā)送的信息是 PING ,那么更新最后一次發(fā)送 PING 命令的時間戳 */ 
  25.     if (link->node && type == CLUSTERMSG_TYPE_PING) 
  26.         link->node->ping_sent = mstime(); 
  27.     /* 將當(dāng)前節(jié)點的信息(比如名字、地址、端口號、負(fù)責(zé)處理的槽)記錄到消息里面 */ 
  28.     clusterBuildMessageHdr(hdr,type); 
  29.  
  30.     /* Populate the gossip fields */ 
  31.     int maxiterations = wanted*3; 
  32.     /* 每個節(jié)點有 freshnodes 次發(fā)送 gossip 信息的機會 
  33.        每次向目標(biāo)節(jié)點發(fā)送 2 個被選中節(jié)點的 gossip 信息(gossipcount 計數(shù)) */ 
  34.     while(freshnodes > 0 && gossipcount < wanted && maxiterations--) { 
  35.         /* 從 nodes 字典中隨機選出一個節(jié)點(被選中節(jié)點) */ 
  36.         dictEntry *de = dictGetRandomKey(server.cluster->nodes); 
  37.         clusterNode *this = dictGetVal(de); 
  38.  
  39.         /* 以下節(jié)點不能作為被選中節(jié)點: 
  40.          * Myself:節(jié)點本身。 
  41.          * PFAIL狀態(tài)的節(jié)點 
  42.          * 處于 HANDSHAKE 狀態(tài)的節(jié)點。 
  43.          * 帶有 NOADDR 標(biāo)識的節(jié)點 
  44.          * 因為不處理任何 Slot 而被斷開連接的節(jié)點  
  45.          */ 
  46.         if (this == myself) continue
  47.         if (this->flags & CLUSTER_NODE_PFAIL) continue
  48.         if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) || 
  49.             (this->link == NULL && this->numslots == 0)) 
  50.         { 
  51.             freshnodes--; /* Tecnically not correct, but saves CPU. */ 
  52.             continue
  53.         } 
  54.  
  55.         // 檢查被選中節(jié)點是否已經(jīng)在 hdr->data.ping.gossip 數(shù)組里面 
  56.         // 如果是的話說明這個節(jié)點之前已經(jīng)被選中了 
  57.         // 不要再選中它(否則就會出現(xiàn)重復(fù)) 
  58.         if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue
  59.  
  60.         /* 這個被選中節(jié)點有效,計數(shù)器減一 */ 
  61.         clusterSetGossipEntry(hdr,gossipcount,this); 
  62.         freshnodes--; 
  63.         gossipcount++; 
  64.     } 
  65.  
  66.     /* .... 如果有 PFAIL 節(jié)點,最后添加 */ 
  67.  
  68.  
  69.     /* 計算信息長度 */ 
  70.     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); 
  71.     totlen += (sizeof(clusterMsgDataGossip)*gossipcount); 
  72.     /* 將被選中節(jié)點的數(shù)量(gossip 信息中包含了多少個節(jié)點的信息)記錄在 count 屬性里面*/ 
  73.     hdr->count = htons(gossipcount); 
  74.     /* 將信息的長度記錄到信息里面 */ 
  75.     hdr->totlen = htonl(totlen); 
  76.     /* 發(fā)送網(wǎng)絡(luò)請求 */ 
  77.     clusterSendMessage(link,buf,totlen); 
  78.     zfree(buf); 
  79.  
  80.  
  81. void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { 
  82.     clusterMsgDataGossip *gossip; 
  83.     /* 指向 gossip 信息結(jié)構(gòu) */ 
  84.     gossip = &(hdr->data.ping.gossip[i]); 
  85.     /* 將被選中節(jié)點的名字記錄到 gossip 信息 */    
  86.     memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN); 
  87.     /* 將被選中節(jié)點的 PING 命令發(fā)送時間戳記錄到 gossip 信息 */ 
  88.     gossip->ping_sent = htonl(n->ping_sent/1000); 
  89.     /* 將被選中節(jié)點的 PONG 命令回復(fù)的時間戳記錄到 gossip 信息 */ 
  90.     gossip->pong_received = htonl(n->pong_received/1000); 
  91.     /* 將被選中節(jié)點的 IP 記錄到 gossip 信息 */ 
  92.     memcpy(gossip->ip,n->ip,sizeof(n->ip)); 
  93.     /* 將被選中節(jié)點的端口號記錄到 gossip 信息 */ 
  94.     gossip->port = htons(n->port); 
  95.     gossip->cport = htons(n->cport); 
  96.     /* 將被選中節(jié)點的標(biāo)識值記錄到 gossip 信息 */ 
  97.     gossip->flags = htons(n->flags); 
  98.     gossip->notused1 = 0; 

下面是 clusterBuildMessageHdr 函數(shù),它主要負(fù)責(zé)填充消息結(jié)構(gòu)體中的基礎(chǔ)信息和當(dāng)前節(jié)點的狀態(tài)信息。

  1. /* 構(gòu)建消息的 header */ 
  2. void clusterBuildMessageHdr(clusterMsg *hdr, int type) { 
  3.     int totlen = 0; 
  4.     uint64_t offset; 
  5.     clusterNode *master; 
  6.  
  7.     /* 如果當(dāng)前節(jié)點是salve,則master為其主節(jié)點,如果當(dāng)前節(jié)點是master節(jié)點,則master就是當(dāng)前節(jié)點 */ 
  8.     master = (nodeIsSlave(myself) && myself->slaveof) ? 
  9.               myself->slaveof : myself; 
  10.  
  11.     memset(hdr,0,sizeof(*hdr)); 
  12.     /* 初始化協(xié)議版本、標(biāo)識、及類型, */ 
  13.     hdr->ver = htons(CLUSTER_PROTO_VER); 
  14.     hdr->sig[0] = 'R'
  15.     hdr->sig[1] = 'C'
  16.     hdr->sig[2] = 'm'
  17.     hdr->sig[3] = 'b'
  18.     hdr->type = htons(type); 
  19.     /* 消息頭設(shè)置當(dāng)前節(jié)點id */ 
  20.     memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN); 
  21.  
  22.     /* 消息頭設(shè)置當(dāng)前節(jié)點ip */ 
  23.     memset(hdr->myip,0,NET_IP_STR_LEN); 
  24.     if (server.cluster_announce_ip) { 
  25.         strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN); 
  26.         hdr->myip[NET_IP_STR_LEN-1] = '\0'
  27.     } 
  28.  
  29.     /* 基礎(chǔ)端口及集群內(nèi)節(jié)點通信端口 */ 
  30.     int announced_port = server.cluster_announce_port ? 
  31.                          server.cluster_announce_port : server.port; 
  32.     int announced_cport = server.cluster_announce_bus_port ? 
  33.                           server.cluster_announce_bus_port : 
  34.                           (server.port + CLUSTER_PORT_INCR); 
  35.     /* 設(shè)置當(dāng)前節(jié)點的槽信息 */ 
  36.     memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots)); 
  37.     memset(hdr->slaveof,0,CLUSTER_NAMELEN); 
  38.     if (myself->slaveof != NULL
  39.         memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN); 
  40.     hdr->port = htons(announced_port); 
  41.     hdr->cport = htons(announced_cport); 
  42.     hdr->flags = htons(myself->flags); 
  43.     hdr->state = server.cluster->state; 
  44.  
  45.     /* 設(shè)置 currentEpoch and configEpochs. */ 
  46.     hdr->currentEpoch = htonu64(server.cluster->currentEpoch); 
  47.     hdr->configEpoch = htonu64(master->configEpoch); 
  48.  
  49.     /* 設(shè)置復(fù)制偏移量 */ 
  50.     if (nodeIsSlave(myself)) 
  51.         offset = replicationGetSlaveOffset(); 
  52.     else 
  53.         offset = server.master_repl_offset; 
  54.     hdr->offset = htonu64(offset); 
  55.  
  56.     /* Set the message flags. */ 
  57.     if (nodeIsMaster(myself) && server.cluster->mf_end) 
  58.         hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED; 
  59.  
  60.     /* 計算并設(shè)置消息的總長度 */ 
  61.     if (type == CLUSTERMSG_TYPE_FAIL) { 
  62.         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); 
  63.         totlen += sizeof(clusterMsgDataFail); 
  64.     } else if (type == CLUSTERMSG_TYPE_UPDATE) { 
  65.         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); 
  66.         totlen += sizeof(clusterMsgDataUpdate); 
  67.     } 
  68.     hdr->totlen = htonl(totlen); 

 

后記

本來只想寫一下 Redis Cluster 的 Gossip 協(xié)議,沒想到文章越寫,內(nèi)容越多,最后源碼分析也是有點虎頭蛇尾,大家就湊合看一下,也希望大家繼續(xù)關(guān)注我后續(xù)的問題。

 

責(zé)任編輯:武曉燕 來源: 程序員歷小冰
相關(guān)推薦

2019-12-05 10:54:34

集群RedisGossip

2020-03-30 20:14:53

ActiveMQ設(shè)計實踐

2023-04-06 08:52:54

Sentinel分布式系統(tǒng)

2023-10-31 12:58:00

TypeScriptJavaScript

2021-03-16 08:21:29

Spark系統(tǒng)并行

2024-08-13 15:07:20

2021-01-19 05:49:44

DNS協(xié)議

2022-09-06 08:02:40

死鎖順序鎖輪詢鎖

2021-11-11 09:27:02

技術(shù)RedisMySQL

2024-09-26 13:33:12

2024-12-31 00:00:01

驅(qū)動設(shè)計應(yīng)用場景業(yè)務(wù)邏輯

2024-08-30 10:29:21

2021-03-18 10:04:46

數(shù)據(jù)倉庫體系

2022-07-11 10:08:34

大數(shù)據(jù)平臺機房

2023-10-26 00:37:40

滴滴彈性云公有云

2020-11-05 08:14:17

鏈表

2023-02-16 18:22:44

ChatGPTWolfram語言

2020-08-31 14:30:47

Redis數(shù)據(jù)結(jié)構(gòu)數(shù)據(jù)庫

2020-04-16 14:40:02

MySQL數(shù)據(jù)庫架構(gòu)

2010-05-17 13:39:10

MySQL Clust
點贊
收藏

51CTO技術(shù)棧公眾號