自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

聊聊 Redis 的發(fā)布訂閱設(shè)計與實現(xiàn)

數(shù)據(jù)庫 Redis 開發(fā)
關(guān)于 Redis 發(fā)布訂閱的設(shè)計與實現(xiàn),本質(zhì)上就是通過一個個鏈表管理訂閱者,通過pub指令定位到channel后將消息遍歷發(fā)送到對應(yīng)客戶端socket上。

借一個午休的時光整理一下關(guān)于redis發(fā)布訂閱源碼的設(shè)計與實現(xiàn),通過本文的閱讀,你將會對發(fā)布訂閱模型的設(shè)計思想以及對哨兵間選舉通信的流程有著更底層的視角。

一、詳解redis中發(fā)布訂閱的設(shè)計

1. channel的設(shè)計

redis服務(wù)端啟動時,會初始化一個記錄channel以及channel訂閱者的鍵值對結(jié)構(gòu),它用channel的名稱作為key,用一個鏈表記錄這個訂閱這個channel的客戶端:

對此我們給出redis初始化函數(shù)initServer的代碼片段,可以看到其內(nèi)部調(diào)用dictCreate方法為pubsub_channels 這個記錄channel和channel訂閱者的指針初始化了一個頻道名稱為key,鏈表為value的字典:

void initServer(void) {
    //......
    // 初始化pubsub_channels存儲頻道信息,keylistDictType用頻道名稱作為key,訂閱者list作為value
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
 //......
}

2. pub/sub的實現(xiàn)

當(dāng)客戶端1通過SUBSCRIBE mychannel訂閱mychannel這個頻道,本質(zhì)上就是redis服務(wù)端解析SUBSCRIBE指令并調(diào)用subscribeCommand函數(shù),該方法會檢查這個channel是否存在,如果不存在則則以channel名稱為key,初始化一個鏈表作為value,將訂閱這個channel的客戶端追加到鏈表中。反之,如果channel存在則直接將客戶端信息存入鏈表即可:

圖片==圖片==

對此我們給出對應(yīng)的源碼實現(xiàn),該函數(shù)subscribeCommand位于pubsub.c可以看到其入口邏輯就是遍歷參數(shù)得到當(dāng)前客戶端想訂閱的頻道,然后調(diào)用pubsubSubscribeChannel將該客戶端追加到這個頻道的鏈表上:

void subscribeCommand(redisClient *c) {
    int j;
    //遍歷頻道將該客戶端存入
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
     //將當(dāng)前客戶端標(biāo)識為做了發(fā)布訂閱   
    c->flags |= REDIS_PUBSUB;
}

我們步入pubsubSubscribeChannel方法即可看到上圖所說明的邏輯,如果對應(yīng)的頻道不存在,則初始化然后將客戶端追加到鏈表中,反之直接追加到鏈表中:

int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    //頻道添加到pubsub_channels中
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        //查看這個頻道的訂閱者鏈表是否存在
        de = dictFind(server.pubsub_channels,channel);
        //如果頻道不存在,則直接初始化鏈表
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        
        //將客戶端追加的鏈表尾巴
        listAddNodeTail(clients,c);
    }
   //......
}

同理,當(dāng)我們通過redis客戶端鍵入publish mychannel "hello"向mychannel 發(fā)送一個hello消息時,redis服務(wù)端會解析這條publish指令并調(diào)用publishCommand完成消息發(fā)布,通知到各個訂閱者:

圖片==圖片==

我們給出publishCommand的源碼,位于pubsub.c這個源代碼文件中,可以看到這段代碼會將channel和對應(yīng)的消息傳入pubsubPublishMessage方法中,并返回接收者數(shù):

void publishCommand(redisClient *c) {
    //發(fā)布消息返回接收者 PUBLISH <channel> <message>,返回接收者的數(shù)量
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
   //......
}

步入pubsubPublishMessage即可看到發(fā)布消息的核心邏輯,可以看到這個方法用receivers來記錄接收的通知者,它會先進(jìn)行精準(zhǔn)匹配,到pubsub_channels找到和channel名字一致的channel并向該channel的訂閱者發(fā)布消息,然后在進(jìn)行模糊匹配,遍歷所有的channel找到模糊匹配上的channel并向訂閱者發(fā)布消息:

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    
    //查找名字相同的channel
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
       //......
  //移動至訂閱者鏈表首部
        listRewind(list,&li);
        //遍歷并向這些訂閱者發(fā)布消息
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

          //......
          //發(fā)布消息
            addReplyBulk(c,message);
            //接收數(shù)++
            receivers++;
        }
    }
    
    if (listLength(server.pubsub_patterns)) {
     //移動至channel鏈表首部
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        //遍歷channel
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
   //找到匹配的channel并發(fā)布消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
               //......
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
    }
    return receivers;
}

3. 哨兵如何利用發(fā)布訂閱完成消息通信的

關(guān)于pub/sub模式,redis中的哨兵就很好的利用這種模式進(jìn)行溝通和選舉等各個工作,當(dāng)我們的redis以哨兵的方式啟動時,redis會定期執(zhí)行哨兵的定時任務(wù),該任務(wù)會在檢查連接時檢查發(fā)布訂閱master的連接是否為空,若為空則調(diào)用異步連接綁定的方式訂閱master的"__sentinel__:hello"頻道,而該頻道主要負(fù)責(zé)下面這些工作:

  • Sentinel 實例的發(fā)現(xiàn)與信息交換:每個 Sentinel 實例會定期通過 __sentinel__:hello 頻道發(fā)布自己的信息,包括 Sentinel 的 IP 地址、端口、運(yùn)行 ID、當(dāng)前配置的紀(jì)元(epoch)等。 其他 Sentinel 實例會訂閱這個頻道,從而感知到其他 Sentinel 的存在,并獲取它們的信息。 監(jiān)控主從節(jié)點的狀態(tài):
  • Sentinel 實例通過 __sentinel__:hello 頻道共享它們對 Redis 主節(jié)點和從節(jié)點的監(jiān)控信息:例如,某個 Sentinel 實例檢測到主節(jié)點不可用時,會通過這個頻道通知其他 Sentinel 實例,以便它們確認(rèn)并共同決定是否進(jìn)行故障轉(zhuǎn)移。 故障轉(zhuǎn)移的協(xié)調(diào):

在故障轉(zhuǎn)移過程中,Sentinel 實例會通過 __sentinel__:hello 頻道交換信息,協(xié)調(diào)誰來執(zhí)行故障轉(zhuǎn)移操作,并確保只有一個 Sentinel 實例負(fù)責(zé)執(zhí)行。

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    //......
   
    //檢查發(fā)布訂閱是否為空
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
     //若為空則pc指針指向異步連接
        ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
        if (ri->pc->err) {
         //......
        } else {//如果沒有報錯,則訂閱__sentinel__:hello頻道
            int retval;

            //......
            //哨兵訂閱 __sentinel__:hello 頻道(也就是下面的常量SENTINEL_HELLO_CHANNEL),通過sentinelReceiveHelloMessages處理回調(diào)
            retval = redisAsyncCommand(ri->pc,
                sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
                    SENTINEL_HELLO_CHANNEL);
          //......
            }
        }
    }
   //......
}

通過master的hello頻道,哨兵會定期publish自己的信息到hello頻道,其他哨兵就可以基于這個頻道發(fā)現(xiàn)其他的哨兵由此完成通信:

對此我們給出哨兵定期發(fā)送hello的函數(shù)入口sentinelSendPeriodicCommands,這個方法會被定期執(zhí)行,其內(nèi)部邏輯一旦檢查到pub/sub時間間隔過長時就會發(fā)送調(diào)用sentinelSendHello向hello頻道發(fā)送當(dāng)前哨兵的信息讓其他哨兵感知:

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    //......
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
       //......
    } else if ((now - ri->last_pong_time) > ping_period) {//超過ping間隔發(fā)ping
     //......
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
    
        //超過pub最大間隔SENTINEL_PUBLISH_PERIOD則發(fā)送發(fā)送哨兵自身ip端口等信息到hello頻道
        sentinelSendHello(ri);
    }
}

步入sentinelSendHello即可看到我們上文所說的邏輯,可以看到當(dāng)前哨兵會組裝個人信息通過異步連接cc指針維護(hù)的連接信息PUBLISH 個人信息到hello頻道:

int sentinelSendHello(sentinelRedisInstance *ri) {
   //......
   //獲取當(dāng)前哨兵ip
    if (sentinel.announce_ip) {
        announce_ip = sentinel.announce_ip;
    } else {
        if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
            return REDIS_ERR;
        announce_ip = ip;
    }
    //獲取當(dāng)前哨兵端口
    announce_port = sentinel.announce_port ?
                    sentinel.announce_port : server.port;

    //將數(shù)據(jù)拼接到payload中
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, server.runid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    //將組裝的哨兵信息publish到hello頻道(SENTINEL_HELLO_CHANNEL就是hello頻道的常量變量值)   
    retval = redisAsyncCommand(ri->cc,
        sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
            SENTINEL_HELLO_CHANNEL,payload);
    //......
}

二、小結(jié)

自此我們將發(fā)現(xiàn)redis發(fā)布訂閱的設(shè)計與實現(xiàn),本質(zhì)上就是通過一個個鏈表管理訂閱者,通過pub指令定位到channel后將消息遍歷發(fā)送到對應(yīng)客戶端socket上,這里筆者也簡單的補(bǔ)充一句,從源碼中我們可以看到redis的發(fā)布訂閱模型沒有持久化機(jī)制,所以對于可靠性要求高的場景筆者還是不太建議使用pub/sub。

責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2025-03-20 09:54:47

2020-09-15 10:25:13

Redis命令Java

2023-05-26 08:24:17

短信渠道模型

2022-10-18 08:28:38

運(yùn)營活動實現(xiàn)邏輯整體協(xié)作

2024-01-10 08:16:08

Redis集成JMS

2025-02-19 10:27:48

哨兵Redis故障轉(zhuǎn)移

2025-01-23 08:53:15

2024-07-02 11:42:53

SpringRedis自定義

2021-08-05 06:54:05

觀察者訂閱設(shè)計

2020-01-02 09:57:09

Redis訂閱發(fā)布

2009-11-05 10:07:37

WCF設(shè)計模式

2022-12-02 07:28:58

Event訂閱模式Spring

2024-05-14 08:03:51

C#EventArgs?屬性

2023-12-14 10:10:09

pythonRedis開發(fā)

2024-11-04 08:00:00

Netty客戶端

2024-10-11 11:50:05

Redis適用場景

2022-08-15 09:02:22

Redis模式訂閱消息

2023-02-10 08:59:42

業(yè)務(wù)技術(shù)核心

2024-12-13 16:28:43

2025-01-06 08:10:00

Redis跳表索引
點贊
收藏

51CTO技術(shù)棧公眾號