SpringBoot整合RocketMQ實現(xiàn)事務(wù)/廣播/順序消息詳解
環(huán)境:springboot2.4.12 + RocketMQ4.8.0
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
配置文件
server:
port: 8080
---
rocketmq:
nameServer: localhost:9876
producer:
group: demo-mq
普通消息
發(fā)送
@Resource
private RocketMQTemplate rocketMQTemplate ;
public void send(String message) {
rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
}
接收
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("接收到消息:" + message) ;
}
}
順序消息
發(fā)送
@Resource
private RocketMQTemplate rocketMQTemplate ;
public void sendOrder(String topic, String message, String tags, int id) {
rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),
"order-" + id, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;
}
@Override
public void onException(Throwable e) {
e.printStackTrace() ;
}
});
}
這里是根據(jù)hashkey將消息發(fā)送到不同的隊列中
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",
selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;
}
}
consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。
結(jié)果
圖片
當(dāng)consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結(jié)果如下:
圖片
集群/廣播消息模式
發(fā)送端
@Resource
private RocketMQTemplate rocketMQTemplate ;
public void send(String topic, String message, String tags) {
rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ;
}
集群消息模式
消費端
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",
selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("ConsumerBroadListener1接收到消息:" + message) ;
}
}
messageModel = MessageModel.CLUSTERING
測試
啟動兩個服務(wù)分別端口是8080,8081
8080服務(wù)
圖片
8081服務(wù)
圖片
集群消息模式下,每個服務(wù)分別接收一部分消息,實現(xiàn)了負載均衡
廣播消息模式
消費端
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",
selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("ConsumerBroadListener1接收到消息:" + message) ;
}
}
messageModel = MessageModel.BROADCASTING
測試
啟動兩個服務(wù)分別端口是8080,8081
8080服務(wù)
圖片
8081服務(wù)
圖片
集群消息模式下,每個服務(wù)分別都接受了同樣的消息。
事務(wù)消息
RocketMQ事務(wù)的3個狀態(tài)
TransactionStatus.CommitTransaction:提交事務(wù)消息,消費者可以消費此消息TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費。TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。
RocketMQ實現(xiàn)事務(wù)消息主要分為兩個階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補償流程 整體流程為:
正常事務(wù)發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功3、開始執(zhí)行本地事務(wù)4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
事務(wù)信息的補償流程1、如果MQServer長時間沒收到本地事務(wù)的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認回查的操作請求2、生產(chǎn)者收到確認回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài)3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作補償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。
發(fā)送端
@Resource
private RocketMQTemplate rocketMQTemplate ;
public void sendTx(String topic, Long id, String tags) {
rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(
new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).
setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),
UUID.randomUUID().toString().replaceAll("-", "")) ;
}
生產(chǎn)者對應(yīng)的監(jiān)聽器
@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
@Resource
private BusinessService bs ;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 這里執(zhí)行本地的事務(wù)操作,比如保存數(shù)據(jù)。
try {
// 創(chuàng)建一個日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個id查詢是否有數(shù)據(jù)
String id = (String) msg.getHeaders().get("BID") ;
Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;
System.out.println("消息內(nèi)容:" + users + "\t參與數(shù)據(jù):" + arg + "\t本次事務(wù)的唯一編號:" + id) ;
bs.save(users, new UsersLog(users.getId(), id)) ;
} catch (Exception e) {
e.printStackTrace() ;
return RocketMQLocalTransactionState.ROLLBACK ;
}
return RocketMQLocalTransactionState.COMMIT ;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 這里檢查本地事務(wù)是否執(zhí)行成功
String id = (String) msg.getHeaders().get("BID") ;
System.out.println("執(zhí)行查詢ID為:" + id + " 的數(shù)據(jù)是否存在") ;
UsersLog usersLog = bs.queryUsersLog(id) ;
if (usersLog == null) {
return RocketMQLocalTransactionState.ROLLBACK ;
}
return RocketMQLocalTransactionState.COMMIT ;
}
}
消費端
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
@Override
public void onMessage(Users users) {
System.out.println("TX接收到消息:" + users) ;
}
}
Service
@Transactional
public boolean save(Users users, UsersLog usersLog) {
usersRepository.save(users) ;
usersLogRepository.save(usersLog) ;
if (users.getId() == 1) {
throw new RuntimeException("數(shù)據(jù)錯誤") ;
}
return true ;
}
public UsersLog queryUsersLog(String bid) {
return usersLogRepository.findByBid(bid) ;
}
Controller
@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id")Long id) {
ps.sendTx("tx-topic", id, "tag10") ;
return "send transaction success" ;
}
測試
調(diào)用接口后,控制臺輸出:
圖片
從打印日志看出來都保存完畢了后 消費端才接受到消息。
圖片
圖片
刪除數(shù)據(jù),再測試ID為1會報錯的。
圖片
數(shù)據(jù)庫中沒有數(shù)據(jù)。。。
是不是也不是很復(fù)雜,2個階段來處理。
完畢?。?!