如何實現(xiàn)一個簡單易用的 RocketMQ SDK
2018 年,做為架構(gòu)負責(zé)人,接到一個架構(gòu)需求:實現(xiàn)一個簡單易用的 RocketMQ SDK 。
因為各個團隊 RocketMQ 原生客戶端配置起來千奇百怪,有的配置存在風(fēng)險,各團隊負責(zé)人都需要一個簡潔易用的 RocketMQ SDK 。
我立馬調(diào)研相關(guān)開源的方案,當時 RocketMQ-Spring 項目并沒有開源,而阿里云的 ONS SDK 是開源的,我只能講目標轉(zhuǎn)向 阿里云 ONS 。
通過學(xué)習(xí) ONS 的設(shè)計方式,我對于 RocketMQ 的客戶端原理有了進一步了解,也實現(xiàn)了公司內(nèi)部使用的 RocketMQ SDK 。
圖片
項目地址:https://github.com/makemyownlife/platform-rocketmq
之所以說簡單,就是讓用戶(開發(fā)者)使用 SDK 時,減少心智負擔。
舉三個例子:
1 發(fā)送順序消息
使用原生代碼發(fā)送消息時,會使用如下的代碼:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
我們可以將 SDK API 簡化為:
SendResult send(final ProducerMessage message, final String shardingKey);
開發(fā)者不需要定義隊列選擇器,只需要傳遞分片鍵 orderId 即可。
2 單條消息消費
使用原來代碼定義消費監(jiān)聽器時,使用如下的代碼:
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消費狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS為消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
監(jiān)聽器內(nèi)部,對于開發(fā)者操作的對象是消息列表 msgs ,很多開發(fā)同學(xué)想只操作一條消息。
于是,我們可以將 SDK API 簡化為:
consumer.subscribe("mytest", new ConsumerListener() {
@Override
public ConsumerAction consumer(ConsumerMessage msg) {
byte[] body = msg.getBody();
System.out.println("msg:" + new String(body));
return ConsumerAction.CommitMessage;
}
});
開發(fā)者在消費時,可以一條一條操作,代碼簡潔了不少。
同時,很多開發(fā)者在使用普通消費、順序消費時,需要返回延時消費的狀態(tài)碼時,兩種消費模式定義的枚舉也不相同。我們將枚舉做了統(tǒng)一:
/**
* 消費消息的返回結(jié)果
*/
public enum ConsumerAction {
/**
* 消費成功,繼續(xù)消費下一條消息
*/
CommitMessage,
/**
* 消費失敗,告知服務(wù)器稍后再投遞這條消息,繼續(xù)消費其他消息
*/
ReconsumeLater;
}
3 訂閱關(guān)系一致
實際場景里,訂閱關(guān)系不一致是極容易發(fā)生的事情,就算是高級別的架構(gòu)師也會翻車,每次翻車現(xiàn)場都是慘不忍睹。
正確的訂閱關(guān)系見下圖:
圖片
正確的訂閱關(guān)系
代碼邏輯角度來看,每個消費者實例內(nèi)訂閱方法的主題、 TAG、監(jiān)聽邏輯都需要保持一致。
圖片
當訂閱關(guān)系不一致時,在 Broker 端同一個消費組內(nèi)的各個消費者客戶端的訂閱信息相互被覆蓋,從而導(dǎo)致某個消費者客戶端無法拉取到新的消息。
怎么解決呢 ?
我當時想起了阿里技術(shù)專家沈詢的一句話:
世界上解決一個計算機問題最簡單的方法:“恰好”不需要解決它 !
公司內(nèi)部出現(xiàn)訂閱關(guān)系一致99%的問題是:消費者組一致的前提下,主題相同,但 TAG 不相同。
基于此,我的設(shè)計思路就明確了:不開放訂閱 TAG 的權(quán)限!
沒想到吧,我就是這么粗暴。
按照這種設(shè)計思路,雖然開始有的程序員會有質(zhì)疑,但你和他梳理好消費者組的定義,以及做好領(lǐng)域劃分,對業(yè)務(wù)來講,反而清晰了。