分鐘搞定分布式選舉 Bully 算法
分布式系統(tǒng)通常需要在一組節(jié)點(diǎn)中選出一個(gè)領(lǐng)導(dǎo)者,以確保有效協(xié)調(diào)并做出決策,Bully 算法就是在分布式系統(tǒng)中選舉領(lǐng)導(dǎo)者的一種算法。本文將用 Go 實(shí)現(xiàn) Bully 算法,以了解集群節(jié)點(diǎn)如何選舉領(lǐng)導(dǎo)者。
一、Bully 算法簡(jiǎn)介
Bully 算法是一種簡(jiǎn)單有效的分布式系統(tǒng)選舉算法,其工作原理如下:
- 節(jié)點(diǎn)層次結(jié)構(gòu):系統(tǒng)中的每個(gè)節(jié)點(diǎn)都有一個(gè)獨(dú)一無(wú)二的 ID,節(jié)點(diǎn)之間可以互相知道對(duì)方的 ID。
- 領(lǐng)導(dǎo)者探測(cè):如果節(jié)點(diǎn)探測(cè)到當(dāng)前領(lǐng)導(dǎo)者沒(méi)有響應(yīng)(失?。蜁?huì)啟動(dòng)選舉流程。
- 選舉:發(fā)起選舉的節(jié)點(diǎn)("bully")向所有 ID 更高的節(jié)點(diǎn)發(fā)送選舉信息。如果沒(méi)有 ID 更高的節(jié)點(diǎn)響應(yīng),則"bully"獲勝,成為新的領(lǐng)導(dǎo)者。
- 協(xié)調(diào)者:領(lǐng)導(dǎo)者是系統(tǒng)的協(xié)調(diào)者,負(fù)責(zé)決策和管理分布式任務(wù)。
二、過(guò)程概述
Bully 算法[2]的基本思想是排序(rank),假定每個(gè)節(jié)點(diǎn)在集群中都有序號(hào),而領(lǐng)導(dǎo)者必須是序號(hào)最高的。因此,在選舉時(shí)需要使用節(jié)點(diǎn)的排序值。
選舉有兩種情況:
- 系統(tǒng)剛初始化,還沒(méi)有領(lǐng)導(dǎo)者。
- 某個(gè)節(jié)點(diǎn)發(fā)現(xiàn)領(lǐng)導(dǎo)者宕機(jī)了。
選舉方式如下:
- 節(jié)點(diǎn)向其他比自己排序高的節(jié)點(diǎn)發(fā)送 ELECTION 消息。
- 節(jié)點(diǎn)等待 ALIVE 響應(yīng):如果沒(méi)有更高排序的節(jié)點(diǎn)回應(yīng),自己就成為領(lǐng)導(dǎo)者;否則,排序最高節(jié)點(diǎn)成為新領(lǐng)導(dǎo)者。
下面來(lái)詳細(xì)說(shuō)明一下:
假設(shè)節(jié)點(diǎn)排序?yàn)椋簄ode4 > node3 > node2 > node1
由于 node4 在該集群中排序最高,它沒(méi)有收到任何來(lái)自更高排序的節(jié)點(diǎn)的 ALIVE 消息。因此,node4 決定成為領(lǐng)導(dǎo)者,并發(fā)送了一條 ELECTED 消息,向其他節(jié)點(diǎn)通報(bào)選舉結(jié)果。
三、領(lǐng)導(dǎo)者失效
其他節(jié)點(diǎn)定期發(fā)送 PING 消息,探測(cè)領(lǐng)導(dǎo)者狀態(tài),并等待領(lǐng)導(dǎo)者的 PONG 響應(yīng)。
如果領(lǐng)導(dǎo)者宕機(jī),而第一個(gè)節(jié)點(diǎn)沒(méi)有收到 PONG 消息,那么該節(jié)點(diǎn)就會(huì)重新開(kāi)始選舉過(guò)程。
四、在 Go 中實(shí)現(xiàn) Bully 算法
1. Node.go
var nodeAddressByID = map[string]string{
"node-01": "node-01:6001",
"node-02": "node-02:6002",
"node-03": "node-03:6003",
"node-04": "node-04:6004",
}
type Node struct {
ID string
Addr string
Peers *Peers
eventBus event.Bus
}
為簡(jiǎn)單起見(jiàn),所有節(jié)點(diǎn)都是硬編碼。
該文件定義了 Node 結(jié)構(gòu),代表分布式系統(tǒng)中的一個(gè)節(jié)點(diǎn)。節(jié)點(diǎn)有 ID、網(wǎng)絡(luò)地址(Addr)、已知對(duì)端(Peers)列表和用于通信的事件總線(eventBus)。
- nodeAddressByID:該映射保存了群集中所有節(jié)點(diǎn)的地址。每個(gè)節(jié)點(diǎn)都有一個(gè)映射到其網(wǎng)絡(luò)地址的唯一 ID。
func NewNode(nodeID string) *Node {
node := &Node{
ID: nodeID,
Addr: nodeAddressByID[nodeID],
Peers: NewPeers(),
eventBus: event.NewBus(),
}
node.eventBus.Subscribe(event.LeaderElected, node.PingLeaderContinuously)
return node
}
- NewNode(nodeID string):基于給定 ID 創(chuàng)建新節(jié)點(diǎn),并初始化其地址、對(duì)端集合以及事件總線。
- eventBus.Subscribe:節(jié)點(diǎn)訂閱 LeaderElected 事件,當(dāng)該事件發(fā)生時(shí)觸發(fā) PingLeaderContinuously 函數(shù)
func (node *Node) NewListener() (net.Listener, error) {
addr, err := net.Listen("tcp", node.Addr)
return addr, err
}
- NewListener():該方法在節(jié)點(diǎn)的網(wǎng)絡(luò)地址(node.Addr)上創(chuàng)建新的 TCP 監(jiān)聽(tīng)器,用于處理來(lái)自其他節(jié)點(diǎn)的連接請(qǐng)求。
func (node *Node) ConnectToPeers() {
for peerID, peerAddr := range nodeAddressByID {
if node.IsItself(peerID) {
continue
}
rpcClient := node.connect(peerAddr)
pingMessage := Message{FromPeerID: node.ID, Type: PING}
reply, _ := node.CommunicateWithPeer(rpcClient, pingMessage)
if reply.IsPongMessage() {
log.Debug().Msgf("%s got pong message from %s", node.ID, peerID)
node.Peers.Add(peerID, rpcClient)
}
}
}
- ConnectToPeers():與集群中所有對(duì)端節(jié)點(diǎn)建立 RPC 連接,遍歷 nodeAddressByID 中的每個(gè)對(duì)端節(jié)點(diǎn),連接并發(fā)送 PING 消息。
如果對(duì)端節(jié)點(diǎn)回應(yīng)了 PONG 消息,就將該對(duì)端節(jié)點(diǎn)添加到已知對(duì)端節(jié)點(diǎn)列表中。
func (node *Node) connect(peerAddr string) *rpc.Client {
retry:
client, err := rpc.Dial("tcp", peerAddr)
if err != nil {
log.Debug().Msgf("Error dialing rpc dial %s", err.Error())
time.Sleep(50 * time.Millisecond)
goto retry
}
return client
}
- connect(peerAddr string) *rpc.Client:與給定的 peerAddr(對(duì)端網(wǎng)絡(luò)地址)建立 RPC 客戶端連接。
如果連接報(bào)錯(cuò),利用 goto 語(yǔ)句延遲 50 毫秒后重試。
func (node *Node) CommunicateWithPeer(RPCClient *rpc.Client, args Message) (Message, error) {
var reply Message
err := RPCClient.Call("Node.HandleMessage", args, &reply)
if err != nil {
log.Debug().Msgf("Error calling HandleMessage %s", err.Error())
}
return reply, err
}
- CommunicateWithPeer:該方法通過(guò) RPC 客戶端 RPCClient 向?qū)Χ税l(fā)送信息 args,并等待回復(fù)。
2. Peer.go
type Peer struct {
ID string
RPCClient *rpc.Client
}
type Peers struct {
*sync.RWMutex
peerByID map[string]*Peer
}
func NewPeers() *Peers {
return &Peers{
RWMutex: &sync.RWMutex{},
peerByID: make(map[string]*Peer),
}
}
func (p *Peers) Add(ID string, client *rpc.Client) {
...
}
func (p *Peers) Delete(ID string) {
...
}
func (p *Peers) Get(ID string) *Peer {
...
}
這是 Peer 和 Peers 結(jié)構(gòu)及其方法。Peer 代表系統(tǒng)中的單個(gè)節(jié)點(diǎn),而 Peers 則是對(duì)端節(jié)點(diǎn)的集合,包含添加、刪除、獲取和轉(zhuǎn)換為列表或 ID 的方法。
五、實(shí)現(xiàn)
- 通過(guò) Docker Compose 模擬集群中的節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都基于相同的 dockerfile。
- 為了讓算法發(fā)揮作用,每個(gè)節(jié)點(diǎn)都需要了解其他節(jié)點(diǎn)的情況,這就需要一種服務(wù)發(fā)現(xiàn)機(jī)制。
- 每個(gè)節(jié)點(diǎn)都被硬編碼了其他節(jié)點(diǎn)的網(wǎng)絡(luò)信息,而不是實(shí)現(xiàn)完整的服務(wù)發(fā)現(xiàn)功能。
- 這種簡(jiǎn)化是為了演示目的。更穩(wěn)健的實(shí)現(xiàn)方式應(yīng)包括適當(dāng)?shù)姆?wù)發(fā)現(xiàn)機(jī)制,以動(dòng)態(tài)處理節(jié)點(diǎn)的添加和刪除。
在通信過(guò)程中,如果領(lǐng)導(dǎo)者出現(xiàn)故障,其連接將被中斷,并返回錯(cuò)誤信息,以便開(kāi)始新的選舉過(guò)程。
當(dāng)節(jié)點(diǎn)啟動(dòng)時(shí),node4 成為領(lǐng)導(dǎo)者,因?yàn)楦鶕?jù)其 ID,它的排序最高。在沒(méi)有領(lǐng)導(dǎo)者的情況下,node4 發(fā)起選舉,宣布自己為領(lǐng)導(dǎo)者,并廣播 ELECTED 消息通知其他節(jié)點(diǎn)。
接下來(lái),我們模擬 node4 被終止的情況,觀察新的領(lǐng)導(dǎo)者是如何被選出來(lái)的。
六、算法面臨的挑戰(zhàn)
當(dāng)出現(xiàn)網(wǎng)絡(luò)分區(qū)時(shí),該算法就會(huì)違反安全保證,導(dǎo)致不同節(jié)點(diǎn)子集可能出現(xiàn)多個(gè)領(lǐng)導(dǎo)者,這種情況被稱為 "腦裂"。
排序靠前的節(jié)點(diǎn)有很強(qiáng)的偏向性,如果它們不穩(wěn)定,就會(huì)出現(xiàn)問(wèn)題。當(dāng)不穩(wěn)定的高排序節(jié)點(diǎn)屢次失敗并試圖再次成為領(lǐng)導(dǎo)者時(shí),這種偏向會(huì)導(dǎo)致不斷循環(huán)重復(fù)選舉。
盡管存在這些挑戰(zhàn),Bully 算法還是為領(lǐng)導(dǎo)者選舉提供了一種清晰實(shí)用的方法,使其在可容錯(cuò)分布式系統(tǒng)中發(fā)揮重要作用。
參考資料:
- [1] Leader Election: Using Bully Algorithm in Golang: https://medium.com/@jitenderkmr/leader-election-using-bully-algorithm-in-go-60ec03bd277c
- [2] Bully 算法: https://en.wikipedia.org/wiki/Bully_algorithm