順序消息的實(shí)現(xiàn)-RocketMQ知識體系(五)
我們知道,kafka 如果要保證順序消費(fèi),必須保證消息保存到同一個(gè)patition上,而且為了有序性,只能有一個(gè)消費(fèi)者進(jìn)行消費(fèi)。這種情況下,Kafka 就退化成了單一隊(duì)列,毫無并發(fā)性可言,極大降低系統(tǒng)性能。那么對于對業(yè)務(wù)比較友好的RocketMQ 是如何實(shí)現(xiàn)的呢?首先,我們循序漸進(jìn)的來了解下順序消息的實(shí)現(xiàn)。
順序消息業(yè)務(wù)使用場景
1、電商場景中傳遞訂單狀態(tài)。
2、同步mysql 的binlong 日志,數(shù)據(jù)庫的操作是有順序的。
3、其他消息之間有先后的依賴關(guān)系,后一條消息需要依賴于前一條消息的處理結(jié)果的情況。
等等。。。
消息中間件中的順序消息
順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類型。順序消息由兩個(gè)部分組成:順序發(fā)布和順序消費(fèi)。
順序消息包含兩種類型:
分區(qū)順序:一個(gè)Partition(queue)內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)
全局順序:一個(gè)Topic內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi).但是全局順序極大的降低了系統(tǒng)的吞吐量,不符合mq的設(shè)計(jì)初衷。
那么折中的辦法就是選擇分區(qū)順序。
【局部順序消費(fèi)】
如何保證順序
在MQ的模型中,順序需要由3個(gè)階段去保障:
- 消息被發(fā)送時(shí)保持順序
- 消息被存儲時(shí)保持和發(fā)送的順序一致
- 消息被消費(fèi)時(shí)保持和存儲的順序一致
發(fā)送時(shí)保持順序意味著對于有順序要求的消息,用戶應(yīng)該在同一個(gè)線程中采用同步的方式發(fā)送。存儲保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來的消息A和B,存儲時(shí)在空間上A一定在B之前。而消費(fèi)保持和存儲一致則要求消息A、B到達(dá)Consumer之后必須按照先A后B的順序被處理。
第一點(diǎn),消息順序發(fā)送,多線程發(fā)送的消息無法保證有序性,因此,需要業(yè)務(wù)方在發(fā)送時(shí),針對同一個(gè)業(yè)務(wù)編號(如同一筆訂單)的消息需要保證在一個(gè)線程內(nèi)順序發(fā)送,在上一個(gè)消息發(fā)送成功后,在進(jìn)行下一個(gè)消息的發(fā)送。對應(yīng)到mq中,消息發(fā)送方法就得使用同步發(fā)送,異步發(fā)送無法保證順序性。
第二點(diǎn),消息順序存儲,mq的topic下會存在多個(gè)queue,要保證消息的順序存儲,同一個(gè)業(yè)務(wù)編號的消息需要被發(fā)送到一個(gè)queue中。對應(yīng)到mq中,需要使用MessageQueueSelector來選擇要發(fā)送的queue,即對業(yè)務(wù)編號進(jìn)行hash,然后根據(jù)隊(duì)列數(shù)量對hash值取余,將消息發(fā)送到一個(gè)queue中。
第三點(diǎn),消息順序消費(fèi),要保證消息順序消費(fèi),同一個(gè)queue就只能被一個(gè)消費(fèi)者所消費(fèi),因此對broker中消費(fèi)隊(duì)列加鎖是無法避免的。同一時(shí)刻,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者消費(fèi),消費(fèi)者內(nèi)部,也只能有一個(gè)消費(fèi)線程來消費(fèi)該隊(duì)列。即,同一時(shí)刻,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者中的一個(gè)線程消費(fèi)。
RocketMQ中順序的實(shí)現(xiàn)
【Producer端】
Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區(qū),在RocketMQ中,通過MessageQueueSelector來實(shí)現(xiàn)分區(qū)的選擇。
- /**
- * 消息隊(duì)列選擇器
- */
- public interface MessageQueueSelector {
- /**
- * 選擇消息隊(duì)列
- *
- * @param mqs 消息隊(duì)列
- * @param msg 消息
- * @param arg 參數(shù)
- * @return 消息隊(duì)列
- */
- MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
- }
- List
mqs:消息要發(fā)送的Topic下所有的分區(qū) - Message msg:消息對象
- 額外的參數(shù):用戶可以傳遞自己的參數(shù)
比如如下實(shí)現(xiàn)就可以保證相同的訂單的消息被路由到相同的分區(qū):
- long orderId = ((Order) object).getOrderId;
- return mqs.get(orderId % mqs.size());
【Consumer端】
嘗試鎖定鎖定MessageQueue。
首先我們?nèi)绾伪WC一個(gè)隊(duì)列只被一個(gè)消費(fèi)者消費(fèi)?
消費(fèi)隊(duì)列存在于broker端,如果想保證一個(gè)隊(duì)列被一個(gè)消費(fèi)者消費(fèi),那么消費(fèi)者在進(jìn)行消息拉取消費(fèi)時(shí)就必須向mq服務(wù)器申請隊(duì)列鎖,消費(fèi)者申請隊(duì)列鎖的代碼存在于RebalanceService消息隊(duì)列負(fù)載的實(shí)現(xiàn)代碼中。
消費(fèi)者重新負(fù)載,并且分配完消費(fèi)隊(duì)列后,需要向mq服務(wù)器發(fā)起消息拉取請求,代碼實(shí)現(xiàn)在RebalanceImpl#updateProcessQueueTableInRebalance中,針對順序消息的消息拉取,mq做了如下判斷:
- // 增加 不在processQueueTable && 存在于mqSet 里的消息隊(duì)列。
- List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數(shù)組
- for (MessageQueue mq : mqSet) {
- if (!this.processQueueTable.containsKey(mq)) {
- if (isOrder && !this.lock(mq)) { // 順序消息鎖定消息隊(duì)列
- log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
- continue;
- }
- this.removeDirtyOffset(mq);
- ProcessQueue pq = new ProcessQueue();
- long nextOffset = this.computePullFromWhere(mq);
- if (nextOffset >= 0) {
- ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
- if (pre != null) {
- log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
- } else {
- log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
- PullRequest pullRequest = new PullRequest();
- pullRequest.setConsumerGroup(consumerGroup);
- pullRequest.setNextOffset(nextOffset);
- pullRequest.setMessageQueue(mq);
- pullRequest.setProcessQueue(pq);
- pullRequestList.add(pullRequest);
- changed = true;
- }
- } else {
- log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
- }
- }
- }
- // 發(fā)起消息拉取請求
- this.dispatchPullRequest(pullRequestList);
核心思想就是,消費(fèi)客戶端先向broker端發(fā)起對messageQueue的加鎖請求,只有加鎖成功時(shí)才創(chuàng)建pullRequest進(jìn)行消息拉取,下面看下lock加鎖請求方法:
- /**
- * 請求Broker獲得指定消息隊(duì)列的分布式鎖
- *
- * @param mq 隊(duì)列
- * @return 是否成功
- */
- public boolean lock(final MessageQueue mq) {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
- if (findBrokerResult != null) {
- LockBatchRequestBody requestBody = new LockBatchRequestBody();
- requestBody.setConsumerGroup(this.consumerGroup);
- requestBody.setClientId(this.mQClientFactory.getClientId());
- requestBody.getMqSet().add(mq);
- try {
- // 請求Broker獲得指定消息隊(duì)列的分布式鎖
- Set<MessageQueue> lockedMq =
- this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
- // 設(shè)置消息處理隊(duì)列鎖定成功。鎖定消息隊(duì)列成功,可能本地沒有消息處理隊(duì)列,設(shè)置鎖定成功會在lockAll()方法。
- for (MessageQueue mmqq : lockedMq) {
- ProcessQueue processQueue = this.processQueueTable.get(mmqq);
- if (processQueue != null) {
- processQueue.setLocked(true);
- processQueue.setLastLockTimestamp(System.currentTimeMillis());
- }
- }
- boolean lockOK = lockedMq.contains(mq);
- log.info("the message queue lock {}, {} {}",
- lockOK ? "OK" : "Failed",
- this.consumerGroup,
- mq);
- return lockOK;
- } catch (Exception e) {
- log.error("lockBatchMQ exception, " + mq, e);
- }
- }
- return false;
- }
代碼實(shí)現(xiàn)邏輯比較清晰,就是調(diào)用lockBatchMQ方法發(fā)送了一個(gè)加鎖請求,那么broker端收到加鎖請求后的處理邏輯又是怎么樣?
【broker端實(shí)現(xiàn)】
broker端收到加鎖請求的處理邏輯在RebalanceLockManager#tryLockBatch方法中,RebalanceLockManager中關(guān)鍵屬性如下:
- /**
- * 消息隊(duì)列鎖過期時(shí)間,默認(rèn)60s
- */
- private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
- "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
- /**
- * 鎖
- */
- private final Lock lock = new ReentrantLock();
- /**
- * 消費(fèi)分組的消息隊(duì)列鎖映射
- */
- private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
- new ConcurrentHashMap<>(1024);
LockEntry對象中關(guān)鍵屬性如下:
- /**
- * 鎖定記錄
- */
- static class LockEntry {
- /**
- * 客戶端編號
- */
- private String clientId;
- /**
- * 最后鎖定時(shí)間
- */
- private volatile long lastUpdateTimestamp = System.currentTimeMillis();
- public String getClientId() {
- return clientId;
- }
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
- /**
- * 是否鎖定
- *
- * @param clientId 客戶端編號
- * @return 是否
- */
- public boolean isLocked(final String clientId) {
- boolean eq = this.clientId.equals(clientId);
- return eq && !this.isExpired();
- }
- /**
- * 鎖定是否過期
- *
- * @return 是否
- */
- public boolean isExpired() {
- boolean expired =
- (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
- return expired;
- }
- }
broker端通過對ConcurrentMap
【再次回到Consumer端,拿到鎖后】
消費(fèi)者對messageQueue的加鎖已經(jīng)成功,那么就進(jìn)入到了第二個(gè)步驟,創(chuàng)建pullRequest進(jìn)行消息拉取,消息拉取部分的代碼實(shí)現(xiàn)在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中進(jìn)行消費(fèi),順序消費(fèi)的實(shí)現(xiàn)為ConsumeMessageOrderlyService,提交消息進(jìn)行消費(fèi)的方法為ConsumeMessageOrderlyService#submitConsumeRequest,具體實(shí)現(xiàn)如下:
- @Override
- public void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final boolean dispathToConsume) {
- if (dispathToConsume) {
- ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
- this.consumeExecutor.submit(consumeRequest);
- }
- }
構(gòu)建了一個(gè)ConsumeRequest對象,并提交給了ThreadPoolExecutor來并行消費(fèi),看下順序消費(fèi)的ConsumeRequest的run方法實(shí)現(xiàn):
- public void run() {
- if (this.processQueue.isDropped()) {
- log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- return;
- }
- // 獲得 Consumer 消息隊(duì)列鎖
- final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
- synchronized (objLock) {
- // (廣播模式) 或者 (集群模式 && Broker消息隊(duì)列鎖有效)
- if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
- final long beginTime = System.currentTimeMillis();
- // 循環(huán)
- for (boolean continueConsume = true; continueConsume; ) {
- if (this.processQueue.isDropped()) {
- log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- break;
- }
- // 消息隊(duì)列分布式鎖未鎖定,提交延遲獲得鎖并消費(fèi)請求
- if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && !this.processQueue.isLocked()) {
- log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
- break;
- }
- // 消息隊(duì)列分布式鎖已經(jīng)過期,提交延遲獲得鎖并消費(fèi)請求
- if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && this.processQueue.isLockExpired()) {
- log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
- break;
- }
- // 當(dāng)前周期消費(fèi)時(shí)間超過連續(xù)時(shí)長,默認(rèn):60s,提交延遲消費(fèi)請求。默認(rèn)情況下,每消費(fèi)1分鐘休息10ms。
- long interval = System.currentTimeMillis() - beginTime;
- if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
- ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
- break;
- }
- // 獲取消費(fèi)消息。此處和并發(fā)消息請求不同,并發(fā)消息請求已經(jīng)帶了消費(fèi)哪些消息。
- final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
- List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
- if (!msgs.isEmpty()) {
- final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
- ConsumeOrderlyStatus status = null;
- // Hook:before
- ConsumeMessageContext consumeMessageContext = null;
- if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext = new ConsumeMessageContext();
- consumeMessageContext
- .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
- consumeMessageContext.setMq(messageQueue);
- consumeMessageContext.setMsgList(msgs);
- consumeMessageContext.setSuccess(false);
- // init the consume context type
- consumeMessageContext.setProps(new HashMap<String, String>());
- ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
- }
- // 執(zhí)行消費(fèi)
- long beginTimestamp = System.currentTimeMillis();
- ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
- boolean hasException = false;
- try {
- this.processQueue.getLockConsume().lock(); // 鎖定隊(duì)列消費(fèi)鎖
- if (this.processQueue.isDropped()) {
- log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
- this.messageQueue);
- break;
- }
- status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
- } catch (Throwable e) {
- log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- messageQueue);
- hasException = true;
- } finally {
- this.processQueue.getLockConsume().unlock(); // 鎖定隊(duì)列消費(fèi)鎖
- }
- if (null == status //
- || ConsumeOrderlyStatus.ROLLBACK == status//
- || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
- log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- messageQueue);
- }
- // 解析消費(fèi)結(jié)果狀態(tài)
- long consumeRT = System.currentTimeMillis() - beginTimestamp;
- if (null == status) {
- if (hasException) {
- returnType = ConsumeReturnType.EXCEPTION;
- } else {
- returnType = ConsumeReturnType.RETURNNULL;
- }
- } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
- returnType = ConsumeReturnType.TIME_OUT;
- } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
- returnType = ConsumeReturnType.FAILED;
- } else if (ConsumeOrderlyStatus.SUCCESS == status) {
- returnType = ConsumeReturnType.SUCCESS;
- }
- if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
- }
- if (null == status) {
- status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- // Hook:after
- if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext.setStatus(status.toString());
- consumeMessageContext
- .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
- ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
- }
- ConsumeMessageOrderlyService.this.getConsumerStatsManager()
- .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
- // 處理消費(fèi)結(jié)果
- continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
- } else {
- continueConsume = false;
- }
- }
- } else {
- if (this.processQueue.isDropped()) {
- log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- return;
- }
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
- }
- }
- }
獲取到鎖對象后,使用synchronized嘗試申請線程級獨(dú)占鎖。
如果加鎖成功,同一時(shí)刻只有一個(gè)線程進(jìn)行消息消費(fèi)。
如果加鎖失敗,會延遲100ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功后重新提交消費(fèi)請求
至此,第三個(gè)關(guān)鍵點(diǎn)的解決思路也清晰了,基本上就兩個(gè)步驟。
創(chuàng)建消息拉取任務(wù)時(shí),消息客戶端向broker端申請鎖定MessageQueue,使得一個(gè)MessageQueue同一個(gè)時(shí)刻只能被一個(gè)消費(fèi)客戶端消費(fèi)。
消息消費(fèi)時(shí),多線程針對同一個(gè)消息隊(duì)列的消費(fèi)先嘗試使用synchronized申請獨(dú)占鎖,加鎖成功才能進(jìn)行消費(fèi),使得一個(gè)MessageQueue同一個(gè)時(shí)刻只能被一個(gè)消費(fèi)客戶端中一個(gè)線程消費(fèi)。
【順序消費(fèi)問題拆解】
- broke 上要保證一個(gè)隊(duì)列只有一個(gè)進(jìn)程消費(fèi),即一個(gè)隊(duì)列同一時(shí)間只有一個(gè)consumer 消費(fèi)
- broker 給consumer 的消息順序應(yīng)該保持一致,這個(gè)通過 rpc傳輸,序列化后消息順序不變,所以很容易實(shí)現(xiàn)
- consumer 上的隊(duì)列消息要保證同一個(gè)時(shí)間只有一個(gè)線程消費(fèi)
通過問題的拆分,問題變成同一個(gè)共享資源串行處理了,要解決這個(gè)問題,通常的做法都是訪問資源的時(shí)候加鎖,即broker 上一個(gè)隊(duì)列消息在被consumer 訪問的必須加鎖,單個(gè)consumer 端多線程并發(fā)處理消息的時(shí)候需要加鎖;這里還需要考慮broker 鎖的異常情況,假如一個(gè)broke 隊(duì)列上的消息被consumer 鎖住了,萬一consumer 崩潰了,這個(gè)鎖就釋放不了,所以broker 上的鎖需要加上鎖的過期時(shí)間。
實(shí)際上 RocketMQ 消費(fèi)端也就是照著上面的思路做:
RocketMQ中順序消息注意事項(xiàng)
實(shí)際項(xiàng)目中并不是所有情況都需要用到順序消息,但這也是設(shè)計(jì)方案的時(shí)候容易忽略的一點(diǎn)
順序消息是生產(chǎn)者和消費(fèi)者配合協(xié)調(diào)作用的結(jié)果,但是消費(fèi)端保證順序消費(fèi),是保證不了順序消息的
消費(fèi)端并行方式消費(fèi),只設(shè)置一次拉取消息的數(shù)量為 1(即配置參數(shù) consumeBatchSize ),是否可以實(shí)現(xiàn)順序消費(fèi) ?這里實(shí)際是不能的,并發(fā)消費(fèi)在消費(fèi)端有多個(gè)線程同時(shí)消費(fèi),consumeBatchSize 只是一個(gè)線程一次拉取消息的數(shù)量,對順序消費(fèi)沒有意義,這里大家有興趣可以看 ConsumeMessageConcurrentlyService 的代碼,并發(fā)消費(fèi)的邏輯都在哪里。
在使用順序消息時(shí),一定要注意其異常情況的出現(xiàn),對于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列 RocketMQ 版會自動不斷地進(jìn)行消息重試(每次間隔時(shí)間為 1 秒),重試最大值是Integer.MAX_VALUE.這時(shí),應(yīng)用會出現(xiàn)消息消費(fèi)被阻塞的情況。因此,建議您使用順序消息時(shí),務(wù)必保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況,避免阻塞現(xiàn)象的發(fā)生。
重要的事再強(qiáng)調(diào)一次:在使用順序消息時(shí),一定要注意其異常情況的出現(xiàn)!防止資源不釋放!
小結(jié)
通過以上的了解,我們知道了實(shí)現(xiàn)順序消息所必要的條件:順序發(fā)送、順序存儲、順序消費(fèi)。RocketMQ的設(shè)計(jì)中考慮到了這些,我們只需要簡單的使用API,不需要額外使用代碼來約束業(yè)務(wù),使得實(shí)現(xiàn)順序消息更加簡單。