SpringBoot整合RocketMQ事務/廣播/順序消息
環(huán)境:springboot2.3.9RELEASE + RocketMQ4.8.0
依賴
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.0</version>
- </dependency>
配置文件
- server:
- port: 8080
- ---
- rocketmq:
- nameServer: localhost:9876
- producer:
- group: demo-mq
普通消息
發(fā)送
- @Resource
- private RocketMQTemplate rocketMQTemplate ;
- public void send(String message) {
- rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
- }
接受
- @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
- @Component
- public class ConsumerListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("接收到消息:" + message) ;
- }
- }
順序消息
發(fā)送
- @Resource
- private RocketMQTemplate rocketMQTemplate ;
- public void sendOrder(String topic, String message, String tags, int id) {
- rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),
- "order-" + id, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;
- }
- @Override
- public void onException(Throwable e) {
- e.printStackTrace() ;
- }
- });
- }
這里是根據(jù)hashkey將消息發(fā)送到不同的隊列中
- @RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",
- selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
- @Component
- public class ConsumerOrderListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;
- }
- }
consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。
結(jié)果
當consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結(jié)果如下:
集群/廣播消息模式
發(fā)送端
- @Resource
- private RocketMQTemplate rocketMQTemplate ;
- public void send(String topic, String message, String tags) {
- rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ;
- }
集群消息模式
消費端
- @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",
- selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
- @Component
- public class ConsumerBroadListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("ConsumerBroadListener1接收到消息:" + message) ;
- }
- }
messageModel = MessageModel.CLUSTERING
測試
啟動兩個服務分別端口是8080,8081
8080服務
8081服務
集群消息模式下,每個服務分別接收一部分消息,實現(xiàn)了負載均衡
廣播消息模式
消費端
- @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",
- selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
- @Component
- public class ConsumerBroadListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("ConsumerBroadListener1接收到消息:" + message) ;
- }
- }
messageModel = MessageModel.BROADCASTING
測試
啟動兩個服務分別端口是8080,8081
8080服務
8081服務
集群消息模式下,每個服務分別都接受了同樣的消息。
事務消息
RocketMQ事務的3個狀態(tài)
TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息
TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。
TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。
RocketMQ實現(xiàn)事務消息主要分為兩個階段:正常事務的發(fā)送及提交、事務信息的補償流程 整體流程為:
正常事務發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
2、服務端響應消息寫入結(jié)果,半消息發(fā)送成功
3、開始執(zhí)行本地事務
4、根據(jù)本地事務的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
事務信息的補償流程
1、如果MQServer長時間沒收到本地事務的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認回查的操作請求
2、生產(chǎn)者收到確認回查請求后,檢查本地事務的執(zhí)行狀態(tài)
3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作
補償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。
發(fā)送端
- @Resource
- private RocketMQTemplate rocketMQTemplate ;
- public void sendTx(String topic, Long id, String tags) {
- rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(
- new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).
- setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),
- UUID.randomUUID().toString().replaceAll("-", "")) ;
- }
生產(chǎn)者對應的監(jiān)聽器
- @RocketMQTransactionListener
- public class ProducerTxListener implements RocketMQLocalTransactionListener {
- @Resource
- private BusinessService bs ;
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- // 這里執(zhí)行本地的事務操作,比如保存數(shù)據(jù)。
- try {
- // 創(chuàng)建一個日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個id查詢是否有數(shù)據(jù)
- String id = (String) msg.getHeaders().get("BID") ;
- Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;
- System.out.println("消息內(nèi)容:" + users + "\t參與數(shù)據(jù):" + arg + "\t本次事務的唯一編號:" + id) ;
- bs.save(users, new UsersLog(users.getId(), id)) ;
- } catch (Exception e) {
- e.printStackTrace() ;
- return RocketMQLocalTransactionState.ROLLBACK ;
- }
- return RocketMQLocalTransactionState.COMMIT ;
- }
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
- // 這里檢查本地事務是否執(zhí)行成功
- String id = (String) msg.getHeaders().get("BID") ;
- System.out.println("執(zhí)行查詢ID為:" + id + " 的數(shù)據(jù)是否存在") ;
- UsersLog usersLog = bs.queryUsersLog(id) ;
- if (usersLog == null) {
- return RocketMQLocalTransactionState.ROLLBACK ;
- }
- return RocketMQLocalTransactionState.COMMIT ;
- }
- }
消費端
- @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
- @Component
- public class ConsumerTxListener implements RocketMQListener<Users> {
- @Override
- public void onMessage(Users users) {
- System.out.println("TX接收到消息:" + users) ;
- }
- }
Service
- @Transactional
- public boolean save(Users users, UsersLog usersLog) {
- usersRepository.save(users) ;
- usersLogRepository.save(usersLog) ;
- if (users.getId() == 1) {
- throw new RuntimeException("數(shù)據(jù)錯誤") ;
- }
- return true ;
- }
- public UsersLog queryUsersLog(String bid) {
- return usersLogRepository.findByBid(bid) ;
- }
Controller
- @GetMapping("/tx/{id}")
- public Object sendTx(@PathVariable("id")Long id) {
- ps.sendTx("tx-topic", id, "tag10") ;
- return "send transaction success" ;
- }
測試
調(diào)用接口后,控制臺輸出:
從打印日志看出來都保存完畢了后 消費端才接受到消息。
刪除數(shù)據(jù),再測試ID為1會報錯的。
數(shù)據(jù)庫中沒有數(shù)據(jù)。。。
是不是也不是很復雜,2個階段來處理。
完畢!!!