自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Pulsar客戶端消費(fèi)模式揭秘:Go 語(yǔ)言實(shí)現(xiàn) ZeroQueueConsumer

開(kāi)發(fā) 前端
pulsar-client-go 的運(yùn)行原理與 Java 客戶端的類似,也是將消息存放在了一個(gè)內(nèi)部隊(duì)列里,所以每次消費(fèi)消息只需要從這個(gè)隊(duì)列 messageCh 里獲取即可。值得注意的是, pulsar-client-go 版本的 zeroQueueConsumer 就不支持直接讀取內(nèi)部的隊(duì)列了。

前段時(shí)間在 pulsar-client-go 社區(qū)里看到這么一個(gè) issue:

圖片圖片

import "github.com/apache/pulsar-client-go/pulsar"

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:             "persistent://public/default/mq-topic-1",
    SubscriptionName:  "sub-1",
    Type:              pulsar.Shared,
    ReceiverQueueSize: 0,
})
if err != nil {
    log.Fatal(err)
}


// 小于等于 0 時(shí)會(huì)設(shè)置為 1000
const (  
    defaultReceiverQueueSize = 1000  
)
if options.ReceiverQueueSize <= 0 {  
    options.ReceiverQueueSize = defaultReceiverQueueSize  
}

他發(fā)現(xiàn)手動(dòng)將 pulsar-client-go 客戶端的 ReceiverQueueSize 設(shè)置為 0 的時(shí)候,客戶端在初始化時(shí)會(huì)再將其調(diào)整為 1000.

if options.ReceiverQueueSize < 0 {  
    options.ReceiverQueueSize = defaultReceiverQueueSize  
}

而如果手動(dòng)將源碼修改為可以設(shè)置為 0 時(shí),卻不能正常消費(fèi),消費(fèi)者會(huì)一直處于 waiting 狀態(tài),獲取不到任何數(shù)據(jù)。

經(jīng)過(guò)我的排查發(fā)現(xiàn)是 Pulsar 的  Go  客戶端缺少了一個(gè) ZeroQueueConsumerImpl的實(shí)現(xiàn)類,這個(gè)類主要用于可以精細(xì)控制消費(fèi)邏輯。

If you'd like to have tight control over message dispatching across consumers, set the consumers' receiver queue size very low (potentially even to 0 if necessary). Each consumer has a receiver queue that determines how many messages the consumer attempts to fetch at a time. For example, a receiver queue of 1000 (the default) means that the consumer attempts to process 1000 messages from the topic's backlog upon connection. Setting the receiver queue to 0 essentially means ensuring that each consumer is only doing one thing at a time.

https://pulsar.apache.org/docs/next/cookbooks-message-queue/#client-configuration-changes

正如官方文檔里提到的那樣,可以將 ReceiverQueueSize 設(shè)置為 0;這樣消費(fèi)者就可以一條條的消費(fèi)數(shù)據(jù),而不會(huì)將消息堆積在客戶端隊(duì)列里。

客戶端消費(fèi)邏輯

借此機(jī)會(huì)需要再回顧下 pulsar 客戶端的消費(fèi)邏輯,這樣才能理解 ReceiverQueueSize 的作用以及如何在 pulsar-client-go 如何實(shí)現(xiàn)這個(gè) ZeroQueueConsumerImpl。

Pulsar 客戶端的消費(fèi)模式是基于推拉結(jié)合的:

圖片圖片

如這張圖所描述的流程,消費(fèi)者在啟動(dòng)的時(shí)候會(huì)主動(dòng)向服務(wù)端發(fā)送一個(gè) Flow 的命令,告訴服務(wù)端需要下發(fā)多少條消息給客戶端。

同時(shí)會(huì)使用剛才的那個(gè) ReceiverQueueSize參數(shù)作為內(nèi)部隊(duì)列的大小,將客戶端下發(fā)的消息存儲(chǔ)在內(nèi)部隊(duì)列里。

然后在調(diào)用 receive 函數(shù)的時(shí)候會(huì)直接從這個(gè)隊(duì)列里獲取數(shù)據(jù)。

圖片圖片

圖片圖片

每次消費(fèi)成功后都會(huì)將內(nèi)部的一個(gè) AvailablePermit+1,直到大于 MaxReceiveQueueSize / 2 就會(huì)再次向 broker 發(fā)送 flow 命令,告訴 broker 再次下發(fā)消息。

所以這里有一個(gè)很關(guān)鍵的事件:就是向 broker 發(fā)送 flow 命令,這樣才會(huì)有新的消息下發(fā)給客戶端。

之前經(jīng)常都會(huì)有研發(fā)同學(xué)讓我排查無(wú)法消費(fèi)的問(wèn)題,最終定位到的原因幾乎都是消費(fèi)緩慢,導(dǎo)致這里的 AvailablePermit 沒(méi)有增長(zhǎng),從而也就不會(huì)觸發(fā) broker 給客戶端推送新的消息。

看到的現(xiàn)象就是消費(fèi)非常緩慢。

ZeroQueueConsumerImpl 原理

下面來(lái)看看 ZeroQueueConsumerImpl 是如何實(shí)現(xiàn)隊(duì)列大小為 0 依然是可以消費(fèi)的。

圖片圖片

在構(gòu)建 consumer 的時(shí)候,就會(huì)根據(jù)隊(duì)列大小從而來(lái)創(chuàng)建普通消費(fèi)者還是 ZeroQueueConsumerImpl 消費(fèi)者。

@Override  
protected CompletableFuture<Message<T>> internalReceiveAsync() {  
    CompletableFuture<Message<T>> future = super.internalReceiveAsync();  
    if (!future.isDone()) {  
        // We expect the message to be not in the queue yet  
        increaseAvailablePermits(cnx());  
    }  
    return future;  
}

這是 ZeroQueueConsumerImpl 重寫的一個(gè)消費(fèi)函數(shù),其中關(guān)鍵的就是 increaseAvailablePermits(cnx());.

void increaseAvailablePermits(ClientCnx currentCnx) {
        increaseAvailablePermits(currentCnx, 1);
    }

    protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
        int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
        while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
            if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
                sendFlowPermitsToBroker(currentCnx, available);
                break;
            } else {
                available = AVAILABLE_PERMITS_UPDATER.get(this);
            }
        }
    }

從源碼里可以得知這里的邏輯就是將 AvailablePermit 自增,達(dá)到閾值后請(qǐng)求 broker 下發(fā)消息。

因?yàn)樵?nbsp;ZeroQueueConsumerImpl 中隊(duì)列大小為 0,所以 available >= getCurrentReceiverQueueSize() / 2永遠(yuǎn)都會(huì)為 true。

也就是說(shuō)每消費(fèi)一條消息都會(huì)請(qǐng)求 broker 讓它再下發(fā)一條消息,這樣就達(dá)到了每一條消息都精確控制的效果。

pulsar-client-go 中的實(shí)現(xiàn)

為了在 pulsar-client-go 實(shí)現(xiàn)這個(gè)需求,我提交了一個(gè) PR 來(lái)解決這個(gè)問(wèn)題。

其實(shí)從上面的分析已經(jīng)得知為啥手動(dòng)將 ReceiverQueueSize 設(shè)置為 0 無(wú)法消費(fèi)消息了。

根本原因還是在初始化的時(shí)候優(yōu)于隊(duì)列為 0,導(dǎo)致不會(huì)給 broker 發(fā)送 flow 命令,這樣就不會(huì)有消息推送到客戶端,也就無(wú)法消費(fèi)到數(shù)據(jù)了。

所以我們依然得參考 Java 的 ZeroQueueConsumerImpl 在每次消費(fèi)的時(shí)候都手動(dòng)增加  availablePermits。

為此我也新增了一個(gè)消費(fèi)者 zeroQueueConsumer。

// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.  
// Notice: only non-partitioned topic is supported.  
// Default is false.  
EnableZeroQueueConsumer bool

consumer, err := client.Subscribe(ConsumerOptions{  
    Topic:                   topicName,  
    SubscriptionName:        "sub-1",  
    Type:                    Shared,  
    NackRedeliveryDelay:     1 * time.Second,  
    EnableZeroQueueConsumer: true,  
})

if options.EnableZeroQueueConsumer {  
    options.ReceiverQueueSize = 0  
}

在創(chuàng)建消費(fèi)者的時(shí)候需要指定是否開(kāi)啟 ZeroQueueConsumer,當(dāng)開(kāi)啟后會(huì)手動(dòng)將 ReceiverQueueSize 設(shè)置為 0.

// 可以設(shè)置默認(rèn)值。
private int receiverQueueSize = 1000;

在 Go 中無(wú)法像 Java 那樣在結(jié)構(gòu)體初始化化的時(shí)候就指定默認(rèn)值,再加上 Go 的 int 類型具備零值(也就是0),所以無(wú)法區(qū)分出 ReceiverQueueSize=0 是用戶主動(dòng)設(shè)置的,還是沒(méi)有傳入這個(gè)參數(shù)使用的零值。

所以才需要新增一個(gè)參數(shù)來(lái)手動(dòng)區(qū)分是否使用 ZeroQueueConsumer。

圖片圖片

之后在創(chuàng)建 consumer 的時(shí)候進(jìn)行判斷,只有使用的是單分區(qū)的 topic 并且開(kāi)啟了 EnableZeroQueueConsumer 才能創(chuàng)建  zeroQueueConsumer。

圖片圖片

使用 PARTITIONED_METADATA 命令可以讓 broker 返回分區(qū)數(shù)量。

func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
 if state := z.pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
  z.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
  return nil, errors.New("consumer state is closed")
 }
 z.Lock()
 defer z.Unlock()
 z.pc.availablePermits.inc()
 for {
  select {
  case <-z.closeCh:
   return nil, newError(ConsumerClosed, "consumer closed")
  case cm, ok := <-z.messageCh:
   if !ok {
    return nil, newError(ConsumerClosed, "consumer closed")
   }
   return cm.Message, nil
  case <-ctx.Done():
   return nil, ctx.Err()
  }
 }

}

其中的關(guān)鍵代碼:z.pc.availablePermits.inc()

消費(fèi)時(shí)的邏輯其實(shí)和 Java 的 ZeroQueueConsumerImpl 邏輯保持了一致,也是每消費(fèi)一條數(shù)據(jù)之前就增加一次 availablePermits。

pulsar-client-go 的運(yùn)行原理與 Java 客戶端的類似,也是將消息存放在了一個(gè)內(nèi)部隊(duì)列里,所以每次消費(fèi)消息只需要從這個(gè)隊(duì)列 messageCh 里獲取即可。

值得注意的是, pulsar-client-go 版本的 zeroQueueConsumer 就不支持直接讀取內(nèi)部的隊(duì)列了。

func (z *zeroQueueConsumer) Chan() <-chan ConsumerMessage {  
    panic("zeroQueueConsumer cannot support Chan method")  
}

會(huì)直接 panic,因?yàn)橹苯酉M(fèi) channel 在客戶端層面就沒(méi)法幫用戶主動(dòng)發(fā)送 flow 命令了,所以這個(gè)功能就只能屏蔽掉了,只可以主動(dòng)的 receive 消息。

圖片圖片

許久之前我也畫(huà)過(guò)一個(gè)關(guān)于 pulsar client 的消費(fèi)流程圖,后續(xù)考慮會(huì)再寫一篇關(guān)于 pulsar client 的原理分析文章。

參考鏈接:

  • https://github.com/apache/pulsar-client-go/issues/1223
  • https://cloud.tencent.com/developer/article/2307608
  • https://pulsar.apache.org/docs/next/cookbooks-message-queue/#client-configuration-changes
  • https://github.com/apache/pulsar-client-go/pull/1225
責(zé)任編輯:武曉燕 來(lái)源: crossoverJie
相關(guān)推薦

2023-03-27 00:20:48

2023-05-08 07:55:05

快速排序Go 語(yǔ)言

2020-08-12 08:56:30

代碼凱撒密碼函數(shù)

2022-11-01 18:29:25

Go語(yǔ)言排序算法

2024-08-29 13:23:04

WindowsGo語(yǔ)言

2010-06-01 09:54:23

SVN客戶端安裝

2011-05-13 15:59:28

LBS團(tuán)購(gòu)移動(dòng)支付

2023-12-09 14:29:30

編程語(yǔ)言Go

2012-03-13 10:40:58

Google Go

2022-05-19 14:14:26

go語(yǔ)言限流算法

2024-08-09 10:59:01

KubernetesSidecar模式

2021-07-12 15:50:55

Go 語(yǔ)言netstat命令

2012-08-06 08:50:05

Go語(yǔ)言

2024-06-06 09:47:56

2021-10-18 05:00:38

語(yǔ)言GoRequestHTTP

2021-05-07 15:28:03

Kafka客戶端Sarama

2009-02-04 17:39:14

ibmdwWebSphereDataPower

2011-08-17 10:10:59

2021-09-22 15:46:29

虛擬桌面瘦客戶端胖客戶端

2023-07-31 08:01:13

二叉搜索測(cè)試
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)