自研 Pulsar Starter:Winfun-Pulsar-Spring-Boot-Starter
里程碑
版本 | 功能點(diǎn) | 作者 | 完成 |
---|---|---|---|
1.0.0 | 支持PulsarTemplate發(fā)送消息&支持自定義注解實(shí)例化Consumer監(jiān)聽消息 | howinfun | ✅ |
1.1.0 | 支持動(dòng)態(tài)開啟/關(guān)閉Consumer消費(fèi)線程池、支持自定義配置Consuemr消費(fèi)線程池參數(shù) | howinfun | ✅ |
1.2.0 | 支持Spring容器停止時(shí),釋放Pulsar所有相關(guān)資源 | howinfun | TODO |
1.3.0 | 支持多Pulsar數(shù)據(jù)源 | howinfun | TODO |
一、背景
Pulsar 作為新生代云原生消息隊(duì)列,越來越受到開發(fā)者的熱愛;而我們現(xiàn)在基本上的項(xiàng)目都是基于 SpringBoot 上開發(fā)的,但是我們可以發(fā)現(xiàn),至今都沒有比較大眾和成熟的關(guān)于 Pulsar 的 Starter,所以我們需要自己整一個(gè),從而避免常規(guī)使用 Pulsar API 時(shí)產(chǎn)生大量的重復(fù)代碼。
二、設(shè)計(jì)思路
由于是第一版的設(shè)計(jì),所以我們是從簡單開始,不會一開始就設(shè)計(jì)得很復(fù)雜,盡量保留 Pulsar API 原生的功能。
2.1、PulsarClient
我們都知道,不管是 Producer 還是 Consumer,都是由 PulsarClient 創(chuàng)建的。
當(dāng)然了,PulsarClient 可以根據(jù)業(yè)務(wù)需要自定義很多參數(shù),但是第一版的設(shè)計(jì)只會支持比較常用的參數(shù)。
我們這個(gè)組件支持下面功能點(diǎn):
- 支持 PulsarClient 參數(shù)配置外部化,參數(shù)可配置在 applicatin.properties 中。
- 支持 applicatin.properties 提供配置提示信息。
- 讀取外部配置文件,根據(jù)參數(shù)實(shí)例化 PulsarClient,并注入到 IOC 容器中。
2.2、Producer
Producer是發(fā)送消息的組件。
- 這里我們提供一個(gè)模版類,可以根據(jù)需求創(chuàng)建對應(yīng)的 Producer 實(shí)例。
- 支持將 Topic<->Producer 關(guān)系緩存起來,避免重復(fù)創(chuàng)建 Producer 實(shí)例。
- 支持同步/異步發(fā)送消息。
2.3、Consumer
Consumer是消費(fèi)消息的組件。
- 這里我們提供一個(gè)抽象類,開發(fā)者只需要集成此實(shí)現(xiàn)類并實(shí)現(xiàn) doReceive 方法即可,即消費(fèi)消息的邏輯方法。
- 接著還提供一個(gè)自定義注解,自定義注解支持自定義 Consmuer 配置,例如Topic、Tenant、Namespace等。
- 實(shí)現(xiàn)類加入上述自定義注解后,組件將會自動(dòng)識別并且生成對應(yīng)的 Consumer 實(shí)例。
- 支持同步/線程池異步消費(fèi)。
三、使用例子
3.1、引入依賴
- <dependency>
- <groupId>io.github.howinfun</groupId>
- <artifactId>winfun-pulsar-spring-boot-starter</artifactId>
- <version>1.1.0</version>
- </dependency>
3.2、加入配置
- pulsar.service-url=pulsar://127.0.0.1:6650
- pulsar.tenant=winfun
- pulsar.namespace=study
- pulsar.operation-timeout=30
- pulsar.io-threads=10
- pulsar.listener-threads=10
3.3、發(fā)送消息
- /**
- * 發(fā)送消息
- * @author: winfun
- **/
- @RestController
- @RequestMapping("msg")
- public class MessageController {
- @Autowired
- private PulsarTemplate pulsarTemplate;
- @Autowired
- private PulsarProperties pulsarProperties;
- /***
- * 往指定topic發(fā)送消息
- * @author winfun
- * @param topic topic
- * @param msg msg
- * @return {@link String }
- **/
- @GetMapping("/{topic}/{msg}")
- public String send(@PathVariable("topic") String topic,@PathVariable("msg") String msg) throws Exception {
- this.pulsarTemplate.createBuilder().persistent(Boolean.TRUE)
- .tenant(this.pulsarProperties.getTenant())
- .namespace(this.pulsarProperties.getNamespace())
- .topic(topic)
- .send(msg);
- return "success";
- }
- }
3.4、消費(fèi)消息
- /**
- * @author: winfun
- * @date: 2021/8/20 8:13 下午
- **/
- @Slf4j
- @PulsarListener(topics = {"test-topic2"},
- threadPool = @ThreadPool(
- coreThreads = 2,
- maxCoreThreads = 3,
- threadPoolName = "test-thread-pool"))
- public class ConsumerListener extends BaseMessageListener {
- /**
- * 消費(fèi)消息
- * @param consumer 消費(fèi)者
- * @param msg 消息
- */
- @Override
- protected void doReceived(Consumer<String> consumer, Message<String> msg) {
- log.info("成功消費(fèi)消息:{}",msg.getValue());
- try {
- consumer.acknowledge(msg);
- } catch (PulsarClientException e) {
- e.printStackTrace();
- }
- }
- /***
- * 是否開啟異步消費(fèi)
- * @return {@link Boolean }
- **/
- @Override
- public Boolean enableAsync() {
- return Boolean.TRUE;
- }
- }
四、源碼
源碼就不放在這里分析了,大家可到Github上看看,如果有什么代碼上面的建議或意見,歡迎大家提MR。