SpringBoot整合分布式消息平臺Pulsar
大家好,我是君哥。
作為優(yōu)秀的消息流平臺,Pulsar 的使用越來越多,這篇文章講解 Pulsar 的 Java 客戶端。
部署 Pulsar
Pulsar 的部署方式主要有 3 種,本地安裝二進制文件、docker 部署、在 Kubernetes 上部署。
本文采用 docker 部署一個單節(jié)點的 Pulsar 集群。實驗環(huán)境是 2 核 CPU 和 4G 內(nèi)存。
部署命令如下:
- docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone
安裝過程可能會出現(xiàn)下面的錯誤:
- unknown flag: --mount
- See 'docker run --help'.
這是因為 docker 版本低,不支持 mount 參數(shù),把 docker 版本升級到 17.06 以上就可以了。
部署過程中可能會因為網(wǎng)絡(luò)的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說明啟動成功了。
- 2022-01-08T22:27:58,726+0000 [main] INFO org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone
本地單節(jié)點集群啟動后,會創(chuàng)建一個 namespace,名字叫 public/default
Pulsar 客戶端
目前 Pulsar 支持多種語言的客戶端,包括:
Java 客戶端Go 客戶端Python 客戶端C++ 客戶端Node.js 客戶端WebSocket 客戶端C# 客戶端
SpringBoot 配置
使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- <version>2.9.1</version>
- </dependency>
然后在 properties 文件中添加配置:
- # Pulsar 地址
- pulsar.url=pulsar://192.168.59.155:6650
- # topic
- pulsar.topic=testTopic
- # consumer group
- pulsar.subscription=topicGroup
創(chuàng)建 Client
創(chuàng)建客戶端非常簡單,代碼如下:
- client = PulsarClient.builder()
- .serviceUrl(url)
- .build();
上面的 url 就是 properties 文件中定義的 pulsar.url 。
創(chuàng)建 Client 時,即使集群沒有啟成功,程序也不會報錯,因為這時還沒有真正地去連接集群。
創(chuàng)建 Producer
- producer = client.newProducer()
- .topic(topic)
- .compressionType(CompressionType.LZ4)
- .sendTimeout(0, TimeUnit.SECONDS)
- .enableBatching(true)
- .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
- .batchingMaxMessages(1000)
- .maxPendingMessages(1000)
- .blockIfQueueFull(true)
- .roundRobinRouterBatchingPartitionSwitchFrequency(10)
- .batcherBuilder(BatcherBuilder.DEFAULT)
- .create();
創(chuàng)建 Producer,會真正的連接集群,這時如果集群有問題,就會報連接錯誤。
下面解釋一下創(chuàng)建 Producer 的參數(shù):
topic:Producer 要寫入的 topic。
compressionType:壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開始,只有 Consumer 的版本在 2.3 以上,這個策略才會生效。
sendTimeout:超時時間,如果 Producer 在超時時間為收到 ACK,會進行重新發(fā)送。
enableBatching:是否開啟消息批量處理,這里默認 true,這個參數(shù)只有在異步發(fā)送 (sendAsync) 時才能生效,選擇同步發(fā)送會失效。
batchingMaxPublishDelay:批量發(fā)送消息的時間段,這里定義的是 10ms,需要注意的是,設(shè)置了批量時間,就不會受消息數(shù)量的影響。批量發(fā)送會把要發(fā)送的批量消息放在一個網(wǎng)絡(luò)包里發(fā)送出去,減少網(wǎng)絡(luò) IO 次數(shù),大大提高網(wǎng)卡的發(fā)送效率。
batchingMaxMessages:批量發(fā)送消息的最大數(shù)量。
maxPendingMessages:等待從 broker 接收 ACK 的消息隊列最大長度。如果這個隊列滿了,producer 所有的 sendAsync 和 send 都會失敗,除非設(shè)置了 blockIfQueueFull 值是 true。
blockIfQueueFull:Producer 發(fā)送消息時會把消息先放入本地 Queue 緩存,如果緩存滿了,就會阻塞消息發(fā)送。
roundRobinRouterBatchingPartition-SwitchFrequency:如果發(fā)送消息時沒有指定 key,那默認采用 round robin 的方式發(fā)送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。
創(chuàng)建 Consumer
Pulsar 的消費模型如下圖:
從圖中可以看到,Consumer 要綁定一個 subscription 才能進行消費。
- consumer = client.newConsumer()
- .topic(topic)
- .subscriptionName(subscription)
- .subscriptionType(SubscriptionType.Shared)
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
- .receiverQueueSize(1000)
- .subscribe();
下面解釋一下創(chuàng)建 Consumer 的參數(shù):
topic:Consumer 要訂閱的 topic。
subscriptionName:consumer 要關(guān)聯(lián)的 subscription 名字。
subscriptionType:訂閱類型,Pulsar 支持四種類型訂閱:
Exclusive:獨占模式,同一個 Topic 只能有一個消費者,如果多個消費者,就會出錯。Failover:災(zāi)備模式,同一個 Topic 可以有多個消費者,但是只能有一個消費者消費,其他消費者作為故障轉(zhuǎn)移備用,如果當(dāng)前消費者出了故障,就從備用消費者中選擇一個進行消費。如下圖:
Shared:共享模式,同一個 Topic 可以由多個消費者訂閱和消費。消息通過 round robin 輪詢機制分發(fā)給不同的消費者,并且每個消息僅會被分發(fā)給一個消費者。當(dāng)消費者斷開,如果發(fā)送給它消息沒有被消費,這些消息會被重新分發(fā)給其它存活的消費者。如下圖:
Key_Shared:消息和消費者都會綁定一個key,消息只會發(fā)送給綁定同一個key的消費者。如果有新消費者建立連接或者有消費者斷開連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費者并發(fā)地消費消息,又能保證同一Key下的消息順序。如下圖:
subscriptionInitialPosition:創(chuàng)建新的 subscription 時從哪里開始消費,有兩個選項:
Latest:從最新的消息開始消費Earliest:從最早的消息開始消費
negativeAckRedeliveryDelay:消費失敗后間隔多久 broker 重新發(fā)送。
receiverQueueSize:在調(diào)用 receive 方法之前,最多能累積多少條消息??梢栽O(shè)置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設(shè)置為 0,可以防止批量消息多發(fā)給一個 Consumer 而導(dǎo)致其他 Consumer 空閑。
Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:
- Message message = consumer.receive()
- CompletableFuture<Message> message = consumer.receiveAsync();
- Messages message = consumer.batchReceive();
- CompletableFuture<Messages> message = consumer.batchReceiveAsync();
對于批量接收,也可以設(shè)置批量接收的策略,代碼如下:
- consumer = client.newConsumer()
- .topic(topic)
- .subscriptionName(subscription)
- .batchReceivePolicy(BatchReceivePolicy.builder()
- .maxNumMessages(100)
- .maxNumBytes(1024 * 1024)
- .timeout(200, TimeUnit.MILLISECONDS)
- .build())
- .subscribe();
代碼中的參數(shù)說明如下:
maxNumMessages:批量接收的最大消息數(shù)量。maxNumBytes:批量接收消息的大小,這里是 1MB。
測試
首先編寫 Producer 發(fā)送消息的代碼,如下:
- public void sendMsg(String key, String data) {
- CompletableFuture<MessageId> future = producer.newMessage()
- .key(key)
- .value(data.getBytes()).sendAsync();
- future.handle((v, ex) -> {
- if (ex == null) {
- logger.info("發(fā)送消息成功, key:{}, msg: {}", key, data);
- } else {
- logger.error("發(fā)送消息失敗, key:{}, msg: {}", key, data);
- }
- return null;
- });
- future.join();
- logger.info("發(fā)送消息完成, key:{}, msg: {}", key, data);
- }
然后編寫一個 Consumer 消費消息的代碼,如下:
- public void start() throws Exception{
- while (true) {
- Message message = consumer.receive();
- String key = message.getKey();
- String data = new String(message.getData());
- String topic = message.getTopicName();
- if (StringUtils.isNotEmpty(data)) {
- try{
- logger.info("收到消息, topic:{}, key:{}, data:{}", topic, key, data);
- }catch(Exception e){
- logger.error("接收消息異常,topic:{}, key:{}, data:{}", topic, key, data, e);
- }
- }
- consumer.acknowledge(message);
- }
- }
最后編寫一個 Controller 類,調(diào)用 Producer 發(fā)送消息,代碼如下:
- @RequestMapping("/send")
- @ResponseBody
- public String send(@RequestParam String key, @RequestParam String data) {
- logger.info("收到消息發(fā)送請求, key:{}, value:{}", key, data);
- pulsarProducer.sendMsg(key, data);
- return "success";
- }
調(diào)用 Producer 發(fā)送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車:
- http://192.168.157.1:8083/pulsar/send?key=key1&data=data1
可以看到控制臺輸出下面日志:
- 2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - 發(fā)送消息成功, key:key1, msg: data1
- 2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - 發(fā)送消息完成, key:key1, msg: data1
- 2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - 收到消息, topic:persistent://public/default/testTopic, key:key1, data:data1
- 2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
- 2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0
從日志中看到,這里使用的 namespace 就是創(chuàng)建集群時生成的public/default。
總結(jié)
從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。