聊聊 Redis 的發(fā)布訂閱設(shè)計與實現(xiàn)
借一個午休的時光整理一下關(guān)于redis發(fā)布訂閱源碼的設(shè)計與實現(xiàn),通過本文的閱讀,你將會對發(fā)布訂閱模型的設(shè)計思想以及對哨兵間選舉通信的流程有著更底層的視角。
一、詳解redis中發(fā)布訂閱的設(shè)計
1. channel的設(shè)計
redis服務(wù)端啟動時,會初始化一個記錄channel以及channel訂閱者的鍵值對結(jié)構(gòu),它用channel的名稱作為key,用一個鏈表記錄這個訂閱這個channel的客戶端:
對此我們給出redis初始化函數(shù)initServer的代碼片段,可以看到其內(nèi)部調(diào)用dictCreate方法為pubsub_channels 這個記錄channel和channel訂閱者的指針初始化了一個頻道名稱為key,鏈表為value的字典:
void initServer(void) {
//......
// 初始化pubsub_channels存儲頻道信息,keylistDictType用頻道名稱作為key,訂閱者list作為value
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
//......
}
2. pub/sub的實現(xiàn)
當(dāng)客戶端1通過SUBSCRIBE mychannel訂閱mychannel這個頻道,本質(zhì)上就是redis服務(wù)端解析SUBSCRIBE指令并調(diào)用subscribeCommand函數(shù),該方法會檢查這個channel是否存在,如果不存在則則以channel名稱為key,初始化一個鏈表作為value,將訂閱這個channel的客戶端追加到鏈表中。反之,如果channel存在則直接將客戶端信息存入鏈表即可:
圖片==
對此我們給出對應(yīng)的源碼實現(xiàn),該函數(shù)subscribeCommand位于pubsub.c可以看到其入口邏輯就是遍歷參數(shù)得到當(dāng)前客戶端想訂閱的頻道,然后調(diào)用pubsubSubscribeChannel將該客戶端追加到這個頻道的鏈表上:
void subscribeCommand(redisClient *c) {
int j;
//遍歷頻道將該客戶端存入
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
//將當(dāng)前客戶端標(biāo)識為做了發(fā)布訂閱
c->flags |= REDIS_PUBSUB;
}
我們步入pubsubSubscribeChannel方法即可看到上圖所說明的邏輯,如果對應(yīng)的頻道不存在,則初始化然后將客戶端追加到鏈表中,反之直接追加到鏈表中:
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
//頻道添加到pubsub_channels中
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
//查看這個頻道的訂閱者鏈表是否存在
de = dictFind(server.pubsub_channels,channel);
//如果頻道不存在,則直接初始化鏈表
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
//將客戶端追加的鏈表尾巴
listAddNodeTail(clients,c);
}
//......
}
同理,當(dāng)我們通過redis客戶端鍵入publish mychannel "hello"向mychannel 發(fā)送一個hello消息時,redis服務(wù)端會解析這條publish指令并調(diào)用publishCommand完成消息發(fā)布,通知到各個訂閱者:
圖片==
我們給出publishCommand的源碼,位于pubsub.c這個源代碼文件中,可以看到這段代碼會將channel和對應(yīng)的消息傳入pubsubPublishMessage方法中,并返回接收者數(shù):
void publishCommand(redisClient *c) {
//發(fā)布消息返回接收者 PUBLISH <channel> <message>,返回接收者的數(shù)量
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
//......
}
步入pubsubPublishMessage即可看到發(fā)布消息的核心邏輯,可以看到這個方法用receivers來記錄接收的通知者,它會先進(jìn)行精準(zhǔn)匹配,到pubsub_channels找到和channel名字一致的channel并向該channel的訂閱者發(fā)布消息,然后在進(jìn)行模糊匹配,遍歷所有的channel找到模糊匹配上的channel并向訂閱者發(fā)布消息:
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
//查找名字相同的channel
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
//......
//移動至訂閱者鏈表首部
listRewind(list,&li);
//遍歷并向這些訂閱者發(fā)布消息
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
//......
//發(fā)布消息
addReplyBulk(c,message);
//接收數(shù)++
receivers++;
}
}
if (listLength(server.pubsub_patterns)) {
//移動至channel鏈表首部
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
//遍歷channel
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
//找到匹配的channel并發(fā)布消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
//......
addReplyBulk(pat->client,message);
receivers++;
}
}
}
return receivers;
}
3. 哨兵如何利用發(fā)布訂閱完成消息通信的
關(guān)于pub/sub模式,redis中的哨兵就很好的利用這種模式進(jìn)行溝通和選舉等各個工作,當(dāng)我們的redis以哨兵的方式啟動時,redis會定期執(zhí)行哨兵的定時任務(wù),該任務(wù)會在檢查連接時檢查發(fā)布訂閱master的連接是否為空,若為空則調(diào)用異步連接綁定的方式訂閱master的"__sentinel__:hello"頻道,而該頻道主要負(fù)責(zé)下面這些工作:
- Sentinel 實例的發(fā)現(xiàn)與信息交換:每個 Sentinel 實例會定期通過 __sentinel__:hello 頻道發(fā)布自己的信息,包括 Sentinel 的 IP 地址、端口、運(yùn)行 ID、當(dāng)前配置的紀(jì)元(epoch)等。 其他 Sentinel 實例會訂閱這個頻道,從而感知到其他 Sentinel 的存在,并獲取它們的信息。 監(jiān)控主從節(jié)點的狀態(tài):
- Sentinel 實例通過 __sentinel__:hello 頻道共享它們對 Redis 主節(jié)點和從節(jié)點的監(jiān)控信息:例如,某個 Sentinel 實例檢測到主節(jié)點不可用時,會通過這個頻道通知其他 Sentinel 實例,以便它們確認(rèn)并共同決定是否進(jìn)行故障轉(zhuǎn)移。 故障轉(zhuǎn)移的協(xié)調(diào):
在故障轉(zhuǎn)移過程中,Sentinel 實例會通過 __sentinel__:hello 頻道交換信息,協(xié)調(diào)誰來執(zhí)行故障轉(zhuǎn)移操作,并確保只有一個 Sentinel 實例負(fù)責(zé)執(zhí)行。
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
//......
//檢查發(fā)布訂閱是否為空
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
//若為空則pc指針指向異步連接
ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
if (ri->pc->err) {
//......
} else {//如果沒有報錯,則訂閱__sentinel__:hello頻道
int retval;
//......
//哨兵訂閱 __sentinel__:hello 頻道(也就是下面的常量SENTINEL_HELLO_CHANNEL),通過sentinelReceiveHelloMessages處理回調(diào)
retval = redisAsyncCommand(ri->pc,
sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
//......
}
}
}
//......
}
通過master的hello頻道,哨兵會定期publish自己的信息到hello頻道,其他哨兵就可以基于這個頻道發(fā)現(xiàn)其他的哨兵由此完成通信:
對此我們給出哨兵定期發(fā)送hello的函數(shù)入口sentinelSendPeriodicCommands,這個方法會被定期執(zhí)行,其內(nèi)部邏輯一旦檢查到pub/sub時間間隔過長時就會發(fā)送調(diào)用sentinelSendHello向hello頻道發(fā)送當(dāng)前哨兵的信息讓其他哨兵感知:
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
//......
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
//......
} else if ((now - ri->last_pong_time) > ping_period) {//超過ping間隔發(fā)ping
//......
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
//超過pub最大間隔SENTINEL_PUBLISH_PERIOD則發(fā)送發(fā)送哨兵自身ip端口等信息到hello頻道
sentinelSendHello(ri);
}
}
步入sentinelSendHello即可看到我們上文所說的邏輯,可以看到當(dāng)前哨兵會組裝個人信息通過異步連接cc指針維護(hù)的連接信息PUBLISH 個人信息到hello頻道:
int sentinelSendHello(sentinelRedisInstance *ri) {
//......
//獲取當(dāng)前哨兵ip
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return REDIS_ERR;
announce_ip = ip;
}
//獲取當(dāng)前哨兵端口
announce_port = sentinel.announce_port ?
sentinel.announce_port : server.port;
//將數(shù)據(jù)拼接到payload中
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, server.runid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
//將組裝的哨兵信息publish到hello頻道(SENTINEL_HELLO_CHANNEL就是hello頻道的常量變量值)
retval = redisAsyncCommand(ri->cc,
sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
SENTINEL_HELLO_CHANNEL,payload);
//......
}
二、小結(jié)
自此我們將發(fā)現(xiàn)redis發(fā)布訂閱的設(shè)計與實現(xiàn),本質(zhì)上就是通過一個個鏈表管理訂閱者,通過pub指令定位到channel后將消息遍歷發(fā)送到對應(yīng)客戶端socket上,這里筆者也簡單的補(bǔ)充一句,從源碼中我們可以看到redis的發(fā)布訂閱模型沒有持久化機(jī)制,所以對于可靠性要求高的場景筆者還是不太建議使用pub/sub。