代碼很少,卻很優(yōu)秀!RocketMQ的NameServer是如何做到的?
今天我們來一起深入分析 RocketMQ的注冊(cè)中心 NameServer。
本文基于 RocketMQ release-5.2.0。
首先,我們回顧下 RocketMQ的內(nèi)核原理鳥瞰圖:
從上面的鳥瞰圖,我們可以看出:Nameserver既和 Broker交互,也和 Producer和 Consumer交互,因此,在 RocketMQ中,Nameserver起到了一個(gè)紐帶性的作用。
接著,我們?cè)倏纯?NameServer的工程結(jié)構(gòu),如下圖:
整個(gè)工程只有 11個(gè)類(老版本好像只有不到 10個(gè)類),為什么 RocketMQ可以用如此少的代碼,設(shè)計(jì)出如此高性能且輕量的注冊(cè)中心?
我覺得最核心的 3個(gè)點(diǎn)是:
- AP設(shè)計(jì)思想
- 簡(jiǎn)單的數(shù)據(jù)結(jié)構(gòu)
- 心跳機(jī)制
一、AP設(shè)計(jì)思想
像 ZooKeeper,采用了 Zab (Zookeeper Atomic Broadcast) 這種比較重的協(xié)議,必須大多數(shù)節(jié)點(diǎn)(過半數(shù))可用,才能確保了數(shù)據(jù)的一致性和高可用,大大增加了網(wǎng)絡(luò)開銷和復(fù)雜度。
而 NameServer遵守了 CAP理論中 AP,在一個(gè) NameServer集群中,NameServer節(jié)點(diǎn)之間是P2P(Peer to Peer)的對(duì)等關(guān)系,并且 NameServer之間并沒有通信,減少很多不必要的網(wǎng)絡(luò)開銷,即便只剩一個(gè) NameServer節(jié)點(diǎn)也能繼續(xù)工作,足以保證高可用。
二、數(shù)據(jù)結(jié)構(gòu)
NameServer維護(hù)了一套比較簡(jiǎn)單的數(shù)據(jù)結(jié)構(gòu),內(nèi)部維護(hù)了一個(gè)路由表,該路由表包含以下幾個(gè)核心元數(shù)據(jù),對(duì)應(yīng)的源碼類RouteInfoManager如下:
public class RouteInfoManager {
private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // broker失效時(shí)間 120s
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
- topicQueueTable:Topic消息隊(duì)列路由信息,消息發(fā)送時(shí)根據(jù)路由表進(jìn)行負(fù)載均衡
- brokerAddrTable:Broker基礎(chǔ)信息,包括brokerName、所屬集群名稱、主備Broker地址
- clusterAddrTable:Broker集群信息,存儲(chǔ)集群中所有Broker名稱
- brokerLiveTable:Broker狀態(tài)信息,NameServer每次收到心跳包會(huì)替換該信息
- filterServerTable:Broker上的FilterServer列表,用于過濾標(biāo)簽(Tag)或 SQL表達(dá)式,以減輕 Consumer的負(fù)擔(dān),提高消息消費(fèi)的效率。
1.TopicRouteData
TopicRouteData是 NameServer中最重要的數(shù)據(jù)結(jié)構(gòu)之一,它包括了 Topic對(duì)應(yīng)的所有 Broker信息以及每個(gè) Broker上的隊(duì)列信息,filter服務(wù)器列表,其源碼如下:
public class TopicRouteData {
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String, List<String>> filterServerTable;
//It could be null or empty
private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}
2.BrokerData
BrokerData包含了 Broker的基本屬性,狀態(tài),所在集群以及 Broker服務(wù)器的 IP地址,其源碼如下:
public class BrokerData {
private String cluster;//所在的集群
private String brokerName;//所在的brokerName
private HashMap<Long, String> brokerAddrs;//該broker對(duì)應(yīng)的機(jī)器IP列表
private String zoneName; // 區(qū)域名稱
}
3.QueueData
QueueData包含了 BrokerName,readQueue的數(shù)量,writeQueue的數(shù)量等信息,對(duì)應(yīng)的源碼類是QueueData,其源碼如下:
public class QueueData {
private String brokerName;//所在的brokerName
private int readQueueNums;// 讀隊(duì)列數(shù)量
private int writeQueueNums;// 寫隊(duì)列數(shù)量
private int perm; // 讀寫權(quán)限,參考PermName 類
private int topicSysFlag; // topic同步標(biāo)記,參考TopicSysFlag 類
}
4.元數(shù)據(jù)舉例
為了更好地理解元數(shù)據(jù),這里對(duì)每一種元數(shù)據(jù)都給出一個(gè)數(shù)據(jù)實(shí)例:
topicQueueTable:{
"topicA":[
{
"brokeName":"broker-a",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSyncFlag":0
},
{
"brokeName":"broker-b",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSyncFlag":0
}
],
"topicB":[]
}
brokeAddrTable:{
"broker-a":{
"cluster":"cluster-1",
"brokerName":"broker-a",
"brokerAddrs":{
0:"192.168.0.1:8000",
1:"192.168.0.2:8000"
}
},
"broker-b":{
"cluster":"cluster-1",
"brokerName":"broker-b",
"brokerAddrs":{
0:"192.168.0.3:8000",
1:"192.168.0.4:8000"
}
}
}
三、心跳機(jī)制
心跳機(jī)制是 NameServer維護(hù) Broker的路由信息最重要的一個(gè)抓手,主要分為接收心跳、處理心跳、心跳超時(shí) 3部分:
1.接收心跳
Broker每 30s會(huì)向所有的 NameServer發(fā)送心跳包,告訴它們自己還存活著,從而更新自己在 NameServer的狀態(tài),整體交互如下圖:
2.處理心跳
NameServer收到心跳包時(shí)會(huì)更新 brokerLiveTable緩存中 BrokerLiveInfo的 lastUpdateTimeStamp信息,整體交互如下圖:
處理邏輯可以參考源碼:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest#brokerHeartbeat:
public RemotingCommand brokerHeartbeat(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final BrokerHeartbeatRequestHeader requestHeader =
(BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getClusterName(), requestHeader.getBrokerAddr());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
3.心跳超時(shí)
NameServer每隔 10s(每隔5s + 5s延遲)掃描 brokerLiveTable檢查 Broker的狀態(tài),如果在 120s內(nèi)未收到 Broker心跳,則認(rèn)為 Broker異常,會(huì)從路由表將該 Broker摘除并關(guān)閉 Socket連接,同時(shí)還會(huì)更新路由表的其他信息,整體交互如下圖:
private void startScheduleService() {
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
}
源碼參考:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker(),核心流程:
- 遍歷brokerAddrTable
- 遍歷broker地址
- 根據(jù) broker地址移除 brokerAddr
- 如果當(dāng)前 Topic只包含待移除的 Broker,則移除該 Topic
四、其他核心源碼解讀
NameServer啟動(dòng)
NameServer的啟動(dòng)類為:org.apache.rocketmq.namesrv.NamesrvStartup,整個(gè)流程如下圖:
NameServer啟動(dòng)最核心的 3個(gè)事情是:
- 加載配置:NameServerConfig、NettyServerConfig主要是映射配置文件,并創(chuàng)建 NamesrvController。
- 啟動(dòng) Netty通信服務(wù):NettyRemotingServer是 NameServer和Broker,Producer,Consumer通信的底層通道 Netty服務(wù)器。
- 啟動(dòng)定時(shí)器和鉤子程序:NameServerController實(shí)例一方面處理 Netty接收到消息后,一方面內(nèi)部有多個(gè)定時(shí)器和鉤子程序,它是 NameServer的核心控制器。
五、總結(jié)
NameServer并沒有采用復(fù)雜的分布式協(xié)議來保持?jǐn)?shù)據(jù)的一致性,而是采用 CAP理論中的 AP,各個(gè)節(jié)點(diǎn)之間是Peer to Peer的對(duì)等關(guān)系,數(shù)據(jù)的一致性通過心跳機(jī)制,定時(shí)器,延時(shí)感知來完成。