深度解析:基于 RocketMQ 實(shí)現(xiàn)分布式事務(wù)的技術(shù)實(shí)踐與原理探究
在上一篇文章Spring Boot自動(dòng)裝配原理以及實(shí)踐我們完成了服務(wù)通用日志監(jiān)控組件的開(kāi)發(fā),確保每個(gè)服務(wù)都可以基于一個(gè)注解實(shí)現(xiàn)業(yè)務(wù)功能的監(jiān)控。 而本文我們嘗試基于RocketMQ實(shí)現(xiàn)下單的分布式的事務(wù)??赡軙?huì)有讀者會(huì)有疑問(wèn),之前我們不是基于Seata完成了分布式事務(wù),為什么我們還要用到RocketMQ呢?
我們的再來(lái)回顧一下我們下單功能大抵是做以下三件事情:
- 創(chuàng)建訂單,將訂單記錄存到數(shù)據(jù)庫(kù)中。
- 扣款,記錄用戶(hù)扣款后錢(qián)包所剩下的額度。
- 扣除商品庫(kù)存,并發(fā)放商品。
我們將該場(chǎng)景放到高并發(fā)場(chǎng)景下,這個(gè)功能勢(shì)必要考慮性能和可靠性問(wèn)題,所以我們?cè)跇I(yè)務(wù)需求清楚明了的情況下,就希望能有一種方式確保下單功能在高并發(fā)場(chǎng)景保證性能、可靠性。 而Seata的AT模式確實(shí)可以保證最終一致性,但是seata的AT模式本質(zhì)上是依賴(lài)于global_table、branch_table等數(shù)據(jù)表維護(hù)應(yīng)用層分布式事務(wù),在操作期間會(huì)涉及大量的更新和刪除操作,隨著時(shí)間的推移還是會(huì)出現(xiàn)大量的索引碎片,導(dǎo)致索引性能下降。
所以我們就考慮采用RocketMQ實(shí)現(xiàn)分布式事務(wù),盡管RocketMQ對(duì)于分布式事務(wù)的實(shí)現(xiàn)業(yè)務(wù)侵入性相對(duì)強(qiáng)一些,但它可以保證業(yè)務(wù)層面的功能解耦從而提升并發(fā)性能,且RocketMQ還對(duì)消息消費(fèi)可靠性做了許多不錯(cuò)的優(yōu)化,例如:失敗重試、死信隊(duì)列等,所以我們還是嘗試使用RocketMQ來(lái)改良我們的下單分布式事務(wù)問(wèn)題。
一、詳解RocketMQ落地分布式事務(wù)案例
1. 需求說(shuō)明
用戶(hù)下單大抵需要在三個(gè)服務(wù)中完成:
- 訂單服務(wù)完成訂單創(chuàng)建,基于用戶(hù)傳入的產(chǎn)品編碼、用戶(hù)編碼、產(chǎn)品購(gòu)買(mǎi)數(shù)生成訂單信息,對(duì)應(yīng)的調(diào)用參數(shù)如下:
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}
- 基于入?yún)⒌挠脩?hù)代碼定位到用戶(hù)錢(qián)包金額,完成賬戶(hù)扣款。
- 基于產(chǎn)品和購(gòu)買(mǎi)數(shù)完成庫(kù)存扣減。
這其中會(huì)跨域三個(gè)服務(wù),分別是訂單服務(wù)創(chuàng)建訂單、賬戶(hù)服務(wù)扣款、商品服務(wù)扣減庫(kù)存。
2. 落地思路
以我們業(yè)務(wù)為最終目標(biāo),RocketMQ實(shí)現(xiàn)分布式事務(wù)的原理是基于2PC的,流程大抵如下:
- 訂單服務(wù)發(fā)送一個(gè)事務(wù)消息到消息隊(duì)列,消息內(nèi)容就是我們的訂單信息,這里面包含用戶(hù)賬號(hào)、購(gòu)買(mǎi)的產(chǎn)品代碼、購(gòu)買(mǎi)產(chǎn)品數(shù)量等數(shù)據(jù)。
- MQ收到half消息,并回復(fù)ack確認(rèn)。
- 生產(chǎn)者(訂單服務(wù)order-service)得知我們發(fā)送的消息已被收到,訂單服務(wù)則執(zhí)行本地事務(wù)并提交事務(wù),即將訂單信息寫(xiě)入數(shù)據(jù)庫(kù)中,同時(shí)在該事務(wù)內(nèi)將訂單插入結(jié)果寫(xiě)入transaction_log表中。
- 生產(chǎn)者(訂單服務(wù)order-service)完成本地事務(wù)的提交,告知MQ將事務(wù)消息commit,此時(shí)消費(fèi)者就可以消費(fèi)這條消息了,注意若生產(chǎn)者消費(fèi)失敗,則將消息rollback,一切就當(dāng)沒(méi)有發(fā)生過(guò)。
- 如果上述的消息是commit則將消息持久化到commitLog中,以便后續(xù)MQ宕機(jī)或者服務(wù)宕機(jī)后依然可以繼續(xù)消費(fèi)這條沒(méi)有被消費(fèi)的消息。
- (非必要步驟)若MQ長(zhǎng)時(shí)間沒(méi)有收到生產(chǎn)者的commit或者rollback的信號(hào),則攜帶事務(wù)id找生產(chǎn)者查詢(xún)transaction_log索要當(dāng)前消息狀態(tài),如果看到對(duì)應(yīng)的消息則判定生產(chǎn)者事務(wù)成功將消息commit給消費(fèi)者消費(fèi),若沒(méi)看到則說(shuō)明生產(chǎn)者本地事務(wù)執(zhí)行失敗,回滾該消息。
- 消費(fèi)者即我們的用戶(hù)服務(wù)或者庫(kù)存服務(wù)收到消息則執(zhí)行本地事務(wù)并提交,若失敗則會(huì)不斷重試,直到達(dá)到上限則將消息存到死信隊(duì)列并告警。
- 人工介入查看死信隊(duì)列查看失敗消息手工補(bǔ)償數(shù)據(jù)。
二、實(shí)踐-基于RocketMQ實(shí)現(xiàn)分布式事務(wù)
1. 部署RocketMQ(Linux環(huán)境)
在編寫(xiě)業(yè)務(wù)代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個(gè)版本:https://rocketmq.apache.org/download/。
完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環(huán)境變量,完成后鍵入source /etc/profile使之生效,對(duì)應(yīng)的配置內(nèi)容如下所示:
export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin
需要注意的是筆者本次采用WSL的Ubuntu子系統(tǒng)時(shí)啟動(dòng)時(shí)腳本會(huì)拋出runserver.sh: 70: [[: Exec format error錯(cuò)誤,嘗試格式化和指令配置后都沒(méi)有很好的解決,于是循著報(bào)錯(cuò)找到runserver.sh這行對(duì)應(yīng)的腳本內(nèi)容,該括弧本質(zhì)上就是基于JDK內(nèi)容配置對(duì)應(yīng)的GC算法:
以筆者為里系統(tǒng)是jdk8,所以直接去掉判斷用走JDK8的配置即可:
choose_gc_options()
{
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
}
完成后鍵入./mqnamesrv &將MQ啟動(dòng),如果彈窗輸出下面這條結(jié)果,則說(shuō)明mq的NameServer啟動(dòng)成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
然后我們?cè)冁I入./mqbroker -n 127.0.0.1:9876啟動(dòng)broker,需要注意的是默認(rèn)情況下broker占用堆內(nèi)存差不多是4g,所以讀者本地部署時(shí)建議修改一下runbroker.sh的堆內(nèi)存,如下圖所示:
若彈窗輸出下面所示的文字,則說(shuō)明broker啟動(dòng)成功,自此mq就在windows環(huán)境部署成功了。我們就可以開(kāi)始編碼工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2. 服務(wù)引入MQ完成下單功能開(kāi)發(fā)
(1) 服務(wù)引入RocketMQ依賴(lài)
完成RocketMQ部署之后,我們就可以著手編碼工作了,首先我們要在在三個(gè)服務(wù)中引入RocketMQ的依賴(lài),由于筆者的spring-boot版本比較老,所以這里筆者為了統(tǒng)一管理在父pom中指定了mq較新的版本號(hào):
<!--rocketmq-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
然后我們分別對(duì)order、account、product三個(gè)服務(wù)中引入依賴(lài):
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
(2) 注冊(cè)中心配置RocketMQ信息
由于我們的分布式事務(wù)涉及3個(gè)服務(wù),而且mq的消費(fèi)模式采用的是發(fā)布訂閱模式,所以我們的生產(chǎn)者(order-service)和消費(fèi)者(account-serivce)都配置為cloud-group
rocketmq.name-server=172.29.193.12:9876
# 指定消費(fèi)者組
rocketmq.producer.group=cloud-group
之所以沒(méi)有沒(méi)將消費(fèi)者2(product-service)也配置到cloud-group中的原因也很簡(jiǎn)單,同一個(gè)消息只能被同一個(gè)消費(fèi)者組中的一個(gè)成員消費(fèi),假如我們的將product-service配置到同一個(gè)消費(fèi)者組中就會(huì)出現(xiàn)因一條消息只能被一個(gè)服務(wù)消費(fèi)而導(dǎo)致product-service收不到消息。
對(duì)此我們實(shí)現(xiàn)思路有兩種:
- 將服務(wù)都放到同一個(gè)消費(fèi)者組,消費(fèi)模式改為廣播模式。
- 將product-service設(shè)置到別的消費(fèi)者組中。
考慮后續(xù)擴(kuò)展筆者選擇方案2,將產(chǎn)品服務(wù)的訂閱者放到消費(fèi)者組2中:
rocketmq.name-server=172.29.193.12:9876
rocketmq.producer.group=cloud-group2
(3) 創(chuàng)建消息日志表
我們?cè)谏衔倪M(jìn)行需求梳理時(shí)有提到一個(gè)MQServer沒(méi)收到生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài)進(jìn)行回查的操作,所以我們?cè)谏a(chǎn)者在執(zhí)行本地事務(wù)時(shí),需要?jiǎng)?chuàng)建一張表記錄生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài),建表SQL如下:
DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`transaction_id` varchar(50) DEFAULT NULL,
`log` varchar(500) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
(4) 完成order服務(wù)half消息發(fā)送、監(jiān)聽(tīng)、回查回調(diào)邏輯
我們的訂單服務(wù)需要做以下三件事:
- 發(fā)送half消息給MQ。
- half消息發(fā)送成功執(zhí)行本地事務(wù)并記錄日志。
- 告知MQ可以提交事務(wù)消息。
所以我們需要定義一下消息格式,對(duì)象類(lèi)中必須包含訂單號(hào)、產(chǎn)品編碼、用戶(hù)編碼、購(gòu)買(mǎi)產(chǎn)品數(shù)量等信息。
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {
private static final long serialVersionUID = 1L;
//設(shè)置主鍵自增,避免插入時(shí)沒(méi)必要的報(bào)錯(cuò)
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
/**
* 訂單號(hào)
*/
private String orderNo;
/**
* 用戶(hù)編碼
*/
private String accountCode;
/**
* 產(chǎn)品編碼
*/
private String productCode;
/**
* 產(chǎn)品扣減數(shù)量
*/
private Integer count;
/**
* 余額
*/
private BigDecimal amount;
/**
* 本次扣減金額
*/
private BigDecimal price;
}
然后我們就可以編寫(xiě)控制層的代碼了,通過(guò)獲取前端傳輸?shù)膮?shù)調(diào)用orderService完成half消息發(fā)送。
@PostMapping("/order/createOrderByMQ")
public ResultData<String> createOrderByMQ(@RequestBody OrderDto orderDTO) {
log.info("基于mq完成用戶(hù)下單流程,請(qǐng)求參數(shù): " + JSON.toJSONString(orderDTO));
orderService.createOrderByMQ(orderDTO);
return ResultData.success("基于mq完成用戶(hù)下單完成");
}
orderService的實(shí)現(xiàn)邏輯很簡(jiǎn)單,定義好消息設(shè)置消息頭內(nèi)容和消息載體的對(duì)象,通過(guò)sendMessageInTransaction方法完成半消息發(fā)送,需要了解一下消息的主題(topic)為ORDER_MSG_TOPIC,只有訂閱這個(gè)主題的消費(fèi)者才能消費(fèi)這條消息:
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void createOrderByMQ(OrderDto orderDto) {
//創(chuàng)建half消息對(duì)應(yīng)的事務(wù)日志的id
String transactionId = UUID.randomUUID().toString();
//調(diào)用產(chǎn)品服務(wù)獲取商品詳情
ResultData<ProductDTO> productInfo = productFeign.getByCode(orderDto.getProductCode());
//計(jì)算總售價(jià)
BigDecimal amount = productInfo.getData().getPrice().multiply(new BigDecimal(orderDto.getCount()));
orderDto.setAmount(amount);
//將訂單信息作為載體
Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
//下單用戶(hù)編碼
.setHeader("accountCode", orderDto.getAccountCode())
//產(chǎn)品編碼
.setHeader("productCode", orderDto.getProductCode())
//產(chǎn)品購(gòu)買(mǎi)數(shù)
.setHeader("count", orderDto.getCount())
//下單金額
.setHeader("amount", amount)
.build();
//發(fā)送half消息
rocketMQTemplate.sendMessageInTransaction("ORDER_MSG_TOPIC", message, orderDto);
}
完成half消息發(fā)送之后,我們就必須知曉消息發(fā)送結(jié)果才能確定是否執(zhí)行本地事務(wù)并提交,所以我們的訂單服務(wù)必須創(chuàng)建一個(gè)監(jiān)聽(tīng)器了解half消息的發(fā)送情況,executeLocalTransaction方法就是mq成功收到半消息后的回調(diào)函數(shù),一旦我們得知消息成功發(fā)送之后,MQ就會(huì)執(zhí)行這個(gè)方法,筆者通過(guò)這個(gè)方法獲取消息頭的參數(shù)創(chuàng)建訂單對(duì)象,調(diào)用createOrderWithRocketMqLog完成訂單的創(chuàng)建的本地事務(wù)成功的日志記錄。
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
private final IOrderService orderService;
private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;
/**
* 監(jiān)聽(tīng)到發(fā)送half消息,執(zhí)行本地事務(wù)
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("order執(zhí)行本地事務(wù)");
try {
//解析消息頭
MessageHeaders headers = message.getHeaders();
//獲取購(gòu)買(mǎi)金額
BigDecimal amount = new BigDecimal(String.valueOf(headers.get("amount")));
//獲取訂單信息
Order order = Order.builder()
.accountCode((String) headers.get("accountCode"))
.amount(amount)
.productCode((String) headers.get("productCode"))
.count(Integer.valueOf(String.valueOf(headers.get("count"))))
.build();
//獲取事務(wù)id
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
//執(zhí)行本地事務(wù)和記錄事務(wù)日志
orderService.createOrderWithRocketMqLog(order, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("創(chuàng)建訂單失敗,失敗原因: {}", e.getMessage(), e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事務(wù)的檢查,檢查本地事務(wù)是否成功
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
//獲取事務(wù)ID
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("檢查本地事務(wù),事務(wù)ID:{}", transactionId);
//根據(jù)事務(wù)id從日志表檢索
QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("transaction_id", transactionId);
RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
//如果消息表存在,則說(shuō)明生產(chǎn)者事務(wù)執(zhí)行完成,回復(fù)commit
if (null != rocketmqTransactionLog) {
return RocketMQLocalTransactionState.COMMIT;
}
//回復(fù)rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
createOrderWithRocketMqLog做了兩件事,分別是插入訂單信息和創(chuàng)建消息日志,這里筆者用到了事務(wù)注解確保了兩個(gè)操作的原子性。 這樣一來(lái),MQserver后續(xù)的回查邏輯完全可以基于RocketmqTransactionLog 進(jìn)行判斷,如果消息的事務(wù)id在表中存在,則說(shuō)明生產(chǎn)者本地事務(wù)成功,反之就是失敗。
@Transactional(rollbackFor = Exception.class)
@Override
public void createOrderWithRocketMqLog(Order order, String transactionId) {
//創(chuàng)建訂單編號(hào)
order.setOrderNo(UUID.randomUUID().toString());
//插入訂單信息
orderMapper.insert(order);
//事務(wù)日志
RocketmqTransactionLog log = RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("執(zhí)行創(chuàng)建訂單操作")
.build();
rocketmqTransactionLogMapper.insert(log);
}
補(bǔ)充一下基于MP生成的RocketmqTransactionLog 類(lèi)代碼:
@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog對(duì)象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
private String transactionId;
private String log;
}
(5) 完成account、product監(jiān)聽(tīng)事件
然后我們就可以實(shí)現(xiàn)用戶(hù)服務(wù)和商品服務(wù)的監(jiān)聽(tīng)事件了,一旦生產(chǎn)者提交事務(wù)消息之后,這幾個(gè)消費(fèi)者都會(huì)收到這個(gè)topic(主題)的消息,進(jìn)而完成當(dāng)前服務(wù)的業(yè)務(wù)邏輯。
先來(lái)看看實(shí)現(xiàn)扣款的用戶(hù)服務(wù),我們的監(jiān)聽(tīng)器繼承了RocketMQListener,基于@RocketMQMessageListener注解設(shè)置它訂閱的主題為createByRocketMQ,一旦收到這個(gè)主題的消息時(shí)這個(gè)監(jiān)聽(tīng)器就會(huì)執(zhí)行onMessage方法,我們的邏輯很簡(jiǎn)單,就是獲取消息的內(nèi)容完成扣款,唯一需要注意的就是線(xiàn)程安全問(wèn)題。我們的壓測(cè)的情況下,單用戶(hù)可能會(huì)頻繁創(chuàng)建訂單,在并發(fā)期間同一個(gè)用戶(hù)的扣款消息可能同時(shí)到達(dá)扣款服務(wù)中,這就導(dǎo)致單位時(shí)間內(nèi)扣款服務(wù)從數(shù)據(jù)庫(kù)中查詢(xún)到相同的余額,執(zhí)行相同的扣款邏輯,導(dǎo)致金額少扣了。
所以我們必須保證扣款操作互斥和原子化,考慮到筆者當(dāng)前項(xiàng)目環(huán)境是單體,所以就用簡(jiǎn)單的synchronized 關(guān)鍵字解決問(wèn)題。
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {
@Resource
private AccountMapper accountMapper;
//強(qiáng)制轉(zhuǎn)為runTimeException
@SneakyThrows
@Override
public void onMessage(OrderDto orderDto) {
log.info("賬戶(hù)服務(wù)收到消息,開(kāi)始消費(fèi)");
QueryWrapper<Account> query = new QueryWrapper<>();
query.eq("account_code", orderDto.getAccountCode());
//解決單體服務(wù)下線(xiàn)程安全問(wèn)題
synchronized (this){
Account account = accountMapper.selectOne(query);
BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
if (subtract.compareTo(BigDecimal.ZERO)<0){
throw new Exception("用戶(hù)余額不足");
}
account.setAmount(subtract);
log.info("更新賬戶(hù)服務(wù),請(qǐng)求參數(shù):{}", JSON.toJSONString(account));
accountMapper.updateById(account);
}
}
}
然后就說(shuō)商品服務(wù),邏輯也很簡(jiǎn)單,也同樣要注意一下線(xiàn)程安全問(wèn)題:
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
@Resource
private ProductMapper productMapper;
@Override
public void onMessage(OrderDto orderDto) {
log.info(" 產(chǎn)品服務(wù)收到消息,開(kāi)始消費(fèi)");
QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
queryWrapper.eq("product_code",orderDto.getProductCode());
synchronized (this){
Product product = productMapper.selectOne(queryWrapper);
if (product.getCount()<orderDto.getCount()){
throw new RuntimeException("庫(kù)存不足");
}
product.setCount(product.getCount()-orderDto.getCount());
log.info("更新產(chǎn)品庫(kù)存信息,請(qǐng)求參數(shù):{}", JSON.toJSONString(product));
productMapper.updateById(product);
}
}
}
三、基于幾個(gè)測(cè)試用例驗(yàn)證MQ半消息事務(wù)
1. 前置準(zhǔn)備與說(shuō)明
完整編碼工作后,自測(cè)是非常有必要的,我們?nèi)粘M瓿砷_(kāi)發(fā)任務(wù)后,都會(huì)結(jié)合需求場(chǎng)景以及功能編排一些自測(cè)用例查看最終結(jié)果是否與預(yù)期一致。 需要注意的是由于訂單業(yè)務(wù)邏輯較為復(fù)雜,很多業(yè)務(wù)場(chǎng)景一篇博客是不可能全部覆蓋,所以這里我們就測(cè)試一下基于RocketMQ實(shí)現(xiàn)分布式事務(wù)常見(jiàn)的幾個(gè)問(wèn)題場(chǎng)景是否和預(yù)期一致。
在測(cè)試前我們必須做好前置準(zhǔn)備工作,準(zhǔn)備功能測(cè)試時(shí)涉及到的SQL語(yǔ)句,以本次用戶(hù)購(gòu)買(mǎi)產(chǎn)品的業(yè)務(wù)為例,涉及到訂單表、用戶(hù)賬戶(hù)信息表、產(chǎn)品表、以及生產(chǎn)者本地事務(wù)日志表。
SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;
在每次測(cè)試完成之后,我們希望數(shù)據(jù)能夠還原,所以這里也需要準(zhǔn)備一下每次測(cè)試結(jié)束后的更新語(yǔ)句,由于訂單表和消息日志表都是主鍵自增,考慮到這兩張表只涉及插入,所以筆者為了重置主鍵的值采取的是truncate語(yǔ)句。
truncate table t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;
2. 測(cè)試正常消費(fèi)
第一個(gè)用例是查看所有服務(wù)都正常的情況下,訂單表是否有數(shù)據(jù),用戶(hù)表的用戶(hù)是否會(huì)正??劭睿约吧唐繁韼?kù)存是否會(huì)扣減。
測(cè)試前,我們先查看訂單表,確認(rèn)沒(méi)有數(shù)據(jù)
查看我們的測(cè)試用戶(hù),錢(qián)包額度為10000
再查看庫(kù)存表,可以看到數(shù)量為1000
確認(rèn)完數(shù)據(jù)之后,我們就可以測(cè)試服務(wù)是否按照預(yù)期的方式執(zhí)行,將所有服務(wù)啟動(dòng)。
我們通過(guò)網(wǎng)關(guān)發(fā)起調(diào)用,請(qǐng)求地址如下:
http://localhost:8090/order/order/createOrderByMQ
請(qǐng)求參數(shù)如下,從參數(shù)可以看出這個(gè)請(qǐng)求意為用戶(hù)代碼(accountCode)為demoData這個(gè)用戶(hù)希望購(gòu)買(mǎi)1個(gè)(count)產(chǎn)品代碼(productCode)為P001的產(chǎn)品,該產(chǎn)品當(dāng)前售價(jià)(price)為1元。
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}
調(diào)用完成后,查看訂單表,訂單數(shù)據(jù)生成無(wú)誤:
圖片
查看用戶(hù)服務(wù)是否完成用戶(hù)扣款,扣款無(wú)誤:
查看產(chǎn)品表,可以看到產(chǎn)品數(shù)量也準(zhǔn)確扣減:
3. 測(cè)試生產(chǎn)者commit提交失敗
我們希望測(cè)試一下發(fā)送完half消息之后,執(zhí)行本地事務(wù)完成,但是未提交commit請(qǐng)求時(shí),MQServer是否會(huì)調(diào)用回查邏輯。
為了完成這一點(diǎn)我們必須按照以下兩個(gè)步驟執(zhí)行:
- 在訂單服務(wù)提交事務(wù)消息處打個(gè)斷點(diǎn)。
- 發(fā)起請(qǐng)求,當(dāng)代碼執(zhí)行到這里的時(shí)候通過(guò)jps定位到進(jìn)程號(hào),將其強(qiáng)制殺死。如下所示,我們的代碼執(zhí)行到了提交事務(wù)消息這一步:
我們通過(guò)jps定位并將其殺死::
完成這些步驟后,我們?cè)俅螌⒎?wù)啟動(dòng),等待片刻之后可以發(fā)現(xiàn),MQServer會(huì)調(diào)用checkLocalTransaction回查生產(chǎn)者本地事務(wù)的情況。我們放行這塊代碼讓程序執(zhí)行下去,最后再查看數(shù)據(jù)庫(kù)中的數(shù)據(jù)結(jié)果是否符合預(yù)期。
4. 測(cè)試消費(fèi)者消費(fèi)失敗
測(cè)試消費(fèi)者執(zhí)行報(bào)錯(cuò)后是否會(huì)進(jìn)行重試,這一點(diǎn)就比較好測(cè)試了,我們?cè)谙M(fèi)者監(jiān)聽(tīng)器中插入隨便插入一個(gè)報(bào)錯(cuò)查看其是否會(huì)不斷重試。這里筆者就不多做演示,實(shí)驗(yàn)結(jié)果是會(huì)進(jìn)行不斷重試,當(dāng)重試次數(shù)達(dá)到閾值時(shí)會(huì)將結(jié)果存到死信隊(duì)列中。
四、壓測(cè)MQ和Seata的性能
由于MQ是采用異步消費(fèi)的形式解耦了服務(wù)間的業(yè)務(wù),而我們的Seata采用默認(rèn)的AT模式每次執(zhí)行分布式事務(wù)時(shí)都會(huì)需要借助undo-log、全局鎖等的方式保證最終一致性。所以理論上RocketMQ的性能肯定是高于Seata的,對(duì)此我們不妨使用Jmeter進(jìn)行壓測(cè)來(lái)驗(yàn)證一下。
本次壓測(cè)只用了1000個(gè)并發(fā),MQ和seata的壓測(cè)結(jié)果如下,可以看到MQ無(wú)論從執(zhí)行時(shí)間還是成功率都遠(yuǎn)遠(yuǎn)優(yōu)秀于Seata的。
MQ的壓測(cè)結(jié)果:
Seata的壓測(cè)結(jié)果,可以看到大量的數(shù)據(jù)因?yàn)閘ock_table鎖超時(shí)而導(dǎo)致失敗,所以整體性能表現(xiàn)非常差勁:
五、詳解RocketMQ落地分布式事務(wù)常見(jiàn)問(wèn)題
1. RocketMQ 如何保證事務(wù)的最終一致性
最終一致性是一種允許軟狀態(tài)存在的分布式事務(wù)解決方案,RocketMQ 保證事務(wù)最終一致性的方式主要是依賴(lài)生產(chǎn)者本地事務(wù)和消息可靠發(fā)送的原子性來(lái)最大努力保證最終一致性,注意這里筆者所強(qiáng)調(diào)的盡最大努力交付。
之所以說(shuō)是最大努力交付是說(shuō)RocketMQ是通過(guò)保證生產(chǎn)者事務(wù)和消息發(fā)送可靠性的原子性和一致性,由此保證消費(fèi)者一定能夠消費(fèi)到消息,理想情況下,只要消費(fèi)者能夠正確消費(fèi)消息,事務(wù)結(jié)果最終是可以保證一致性的,但是復(fù)雜的系統(tǒng)因素消費(fèi)者可能會(huì)存在消費(fèi)失敗的情況,此時(shí)事務(wù)最終一致性就無(wú)法保證,業(yè)界的做法是通過(guò)手動(dòng)操作或者腳本等方式完成數(shù)據(jù)補(bǔ)償。
2. 什么是half消息
half消息即半消息,和普通消息的區(qū)別是該消息不會(huì)立馬被消費(fèi)者消費(fèi),原因是half消息的存在是為了保證生產(chǎn)者本地事務(wù)和消費(fèi)者的原子性和一致性,其過(guò)程如上文所介紹,初始發(fā)送的half消息是存儲(chǔ)在MQ一個(gè)內(nèi)存隊(duì)列中(并未投遞到topic中),只有生產(chǎn)者本地事務(wù)成功并發(fā)送commit通知后,這個(gè)消息才會(huì)被持久化到commitLog同時(shí)提交到topic隊(duì)列中,此時(shí)消費(fèi)者才能夠消費(fèi)該消息并執(zhí)行本地事務(wù)。
3. 為什么要先發(fā)送half消息再執(zhí)行本地事務(wù)?先執(zhí)行本地事務(wù),成功后在發(fā)送不行嗎?
先發(fā)送half消息的原因是為了盡可能確保生產(chǎn)者和消息隊(duì)列通信正常,只有通信正常了才能確保生產(chǎn)者本地事務(wù)和消息發(fā)送的原子性和一致性,由此保證分布式事務(wù)的可靠性。
先執(zhí)行本地事務(wù),執(zhí)行成功后再發(fā)送存在一個(gè)問(wèn)題,試想一下,假設(shè)我們本地事務(wù)執(zhí)行成功,但是發(fā)送的消息因?yàn)榫W(wǎng)絡(luò)波動(dòng)等諸多原因?qū)е翸Q沒(méi)有收到消息,此時(shí)生產(chǎn)者和消費(fèi)者的分布式事務(wù)就會(huì)出現(xiàn)數(shù)據(jù)不一致問(wèn)題。
而half消息則不同,它會(huì)優(yōu)先發(fā)送一個(gè)消費(fèi)者感知不到的half消息確認(rèn)通信可達(dá),然后執(zhí)行本地事務(wù)后降消息設(shè)置未commit讓消費(fèi)者消費(fèi),即使說(shuō)commit消息未收到,因?yàn)閔alf消息的存在,MQ在指定超時(shí)先限制后也可以通過(guò)回查的方式到生產(chǎn)者事務(wù)表查詢(xún)執(zhí)行情況。
4. 如果mq收到half消息,準(zhǔn)備發(fā)送success信號(hào)的消息給生產(chǎn)者,但因?yàn)榫W(wǎng)絡(luò)波動(dòng)導(dǎo)致生產(chǎn)者沒(méi)有收到這個(gè)消息要怎么辦?
此時(shí)生產(chǎn)者就會(huì)認(rèn)為half消息發(fā)送失敗,本地事務(wù)不執(zhí)行,隨著時(shí)間推移MQ長(zhǎng)時(shí)間沒(méi)收到commit或者rollback消息就會(huì)回查生產(chǎn)者消息日志表,明確沒(méi)看到數(shù)據(jù)則知曉生產(chǎn)者本地事務(wù)執(zhí)行失敗,直接rollback掉half消息,而消費(fèi)者全程無(wú)感知,業(yè)務(wù)上的一致性也是可以保證。
5. MQ沒(méi)有收到生產(chǎn)者(訂單服務(wù))的commit或者rollback信號(hào)怎么保證事務(wù)最終一致性?
常規(guī)的做法就是建立一張表記錄消息狀態(tài),只要我們訂單信息插入成功就需要日志一下這條數(shù)據(jù),所以我們必須保證訂單數(shù)據(jù)插入和日志插入表中的原子性,確保生產(chǎn)者的事務(wù)和消息日志的ACID:
6. 如果生產(chǎn)者執(zhí)行本地事務(wù)失敗了怎么辦?
這一點(diǎn)前面的部分也已經(jīng)說(shuō)明,首先將本地會(huì)事務(wù)回滾,并向消息隊(duì)列提交一個(gè)rollback的請(qǐng)求不提交half消息,消息就不會(huì)被消費(fèi)者消費(fèi),保證最終一致性。
7. 前面說(shuō)的都是事務(wù)流程?這和事務(wù)消息如何保證數(shù)據(jù)最終一致性有什么關(guān)系?
生產(chǎn)者和消息隊(duì)列事務(wù)流程可以確保生產(chǎn)者和消息隊(duì)列發(fā)送的一致性,確保寫(xiě)操作都是同時(shí)成功或者失敗。只有保證兩者正常通信,才能確保消費(fèi)者可以消費(fèi)MQ中的消息從而完成數(shù)據(jù)最終一致性。
8. 消費(fèi)者提交本地事務(wù)失敗了怎么辦?
我們都知道消息隊(duì)列只能保證消息可靠性,而無(wú)法保證分布式事務(wù)的強(qiáng)一致性,出現(xiàn)這種情況,消費(fèi)者 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會(huì)在一定時(shí)間后,繼續(xù)將這條消息推送給消費(fèi)者,消費(fèi)者就可以繼續(xù)執(zhí)行本地事務(wù)并提交了,直到成功消息隊(duì)列會(huì)進(jìn)行N次重試,如果還是失敗,則可以到死信隊(duì)列中查看失敗消息,然后通過(guò)補(bǔ)償機(jī)制實(shí)現(xiàn)分布式事務(wù)最終一致性。