自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

順序消息的實(shí)現(xiàn)-RocketMQ知識體系(五)

開發(fā) 前端
順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類型。順序消息由兩個(gè)部分組成:順序發(fā)布和順序消費(fèi)。

[[410981]]

我們知道,kafka 如果要保證順序消費(fèi),必須保證消息保存到同一個(gè)patition上,而且為了有序性,只能有一個(gè)消費(fèi)者進(jìn)行消費(fèi)。這種情況下,Kafka 就退化成了單一隊(duì)列,毫無并發(fā)性可言,極大降低系統(tǒng)性能。那么對于對業(yè)務(wù)比較友好的RocketMQ 是如何實(shí)現(xiàn)的呢?首先,我們循序漸進(jìn)的來了解下順序消息的實(shí)現(xiàn)。

順序消息業(yè)務(wù)使用場景

1、電商場景中傳遞訂單狀態(tài)。

2、同步mysql 的binlong 日志,數(shù)據(jù)庫的操作是有順序的。

3、其他消息之間有先后的依賴關(guān)系,后一條消息需要依賴于前一條消息的處理結(jié)果的情況。

等等。。。

消息中間件中的順序消息

順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類型。順序消息由兩個(gè)部分組成:順序發(fā)布和順序消費(fèi)。

順序消息包含兩種類型:

分區(qū)順序:一個(gè)Partition(queue)內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)

全局順序:一個(gè)Topic內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi).但是全局順序極大的降低了系統(tǒng)的吞吐量,不符合mq的設(shè)計(jì)初衷。

那么折中的辦法就是選擇分區(qū)順序。

【局部順序消費(fèi)】

如何保證順序

在MQ的模型中,順序需要由3個(gè)階段去保障:

  1. 消息被發(fā)送時(shí)保持順序
  2. 消息被存儲時(shí)保持和發(fā)送的順序一致
  3. 消息被消費(fèi)時(shí)保持和存儲的順序一致

發(fā)送時(shí)保持順序意味著對于有順序要求的消息,用戶應(yīng)該在同一個(gè)線程中采用同步的方式發(fā)送。存儲保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來的消息A和B,存儲時(shí)在空間上A一定在B之前。而消費(fèi)保持和存儲一致則要求消息A、B到達(dá)Consumer之后必須按照先A后B的順序被處理。

第一點(diǎn),消息順序發(fā)送,多線程發(fā)送的消息無法保證有序性,因此,需要業(yè)務(wù)方在發(fā)送時(shí),針對同一個(gè)業(yè)務(wù)編號(如同一筆訂單)的消息需要保證在一個(gè)線程內(nèi)順序發(fā)送,在上一個(gè)消息發(fā)送成功后,在進(jìn)行下一個(gè)消息的發(fā)送。對應(yīng)到mq中,消息發(fā)送方法就得使用同步發(fā)送,異步發(fā)送無法保證順序性。

第二點(diǎn),消息順序存儲,mq的topic下會存在多個(gè)queue,要保證消息的順序存儲,同一個(gè)業(yè)務(wù)編號的消息需要被發(fā)送到一個(gè)queue中。對應(yīng)到mq中,需要使用MessageQueueSelector來選擇要發(fā)送的queue,即對業(yè)務(wù)編號進(jìn)行hash,然后根據(jù)隊(duì)列數(shù)量對hash值取余,將消息發(fā)送到一個(gè)queue中。

第三點(diǎn),消息順序消費(fèi),要保證消息順序消費(fèi),同一個(gè)queue就只能被一個(gè)消費(fèi)者所消費(fèi),因此對broker中消費(fèi)隊(duì)列加鎖是無法避免的。同一時(shí)刻,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者消費(fèi),消費(fèi)者內(nèi)部,也只能有一個(gè)消費(fèi)線程來消費(fèi)該隊(duì)列。即,同一時(shí)刻,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者中的一個(gè)線程消費(fèi)。

RocketMQ中順序的實(shí)現(xiàn)

【Producer端】

Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區(qū),在RocketMQ中,通過MessageQueueSelector來實(shí)現(xiàn)分區(qū)的選擇。

  1. /** 
  2.  * 消息隊(duì)列選擇器 
  3.  */ 
  4. public interface MessageQueueSelector { 
  5.  
  6.     /** 
  7.      * 選擇消息隊(duì)列 
  8.      * 
  9.      * @param mqs 消息隊(duì)列 
  10.      * @param msg 消息 
  11.      * @param arg 參數(shù) 
  12.      * @return 消息隊(duì)列 
  13.      */ 
  14.     MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); 
  • List mqs:消息要發(fā)送的Topic下所有的分區(qū)
  • Message msg:消息對象
  • 額外的參數(shù):用戶可以傳遞自己的參數(shù)

比如如下實(shí)現(xiàn)就可以保證相同的訂單的消息被路由到相同的分區(qū):

  1. long orderId = ((Order) object).getOrderId; 
  2. return mqs.get(orderId % mqs.size()); 

【Consumer端】

嘗試鎖定鎖定MessageQueue。

首先我們?nèi)绾伪WC一個(gè)隊(duì)列只被一個(gè)消費(fèi)者消費(fèi)?

消費(fèi)隊(duì)列存在于broker端,如果想保證一個(gè)隊(duì)列被一個(gè)消費(fèi)者消費(fèi),那么消費(fèi)者在進(jìn)行消息拉取消費(fèi)時(shí)就必須向mq服務(wù)器申請隊(duì)列鎖,消費(fèi)者申請隊(duì)列鎖的代碼存在于RebalanceService消息隊(duì)列負(fù)載的實(shí)現(xiàn)代碼中。

消費(fèi)者重新負(fù)載,并且分配完消費(fèi)隊(duì)列后,需要向mq服務(wù)器發(fā)起消息拉取請求,代碼實(shí)現(xiàn)在RebalanceImpl#updateProcessQueueTableInRebalance中,針對順序消息的消息拉取,mq做了如下判斷:

  1. // 增加 不在processQueueTable && 存在于mqSet 里的消息隊(duì)列。 
  2.        List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數(shù)組 
  3.        for (MessageQueue mq : mqSet) { 
  4.            if (!this.processQueueTable.containsKey(mq)) { 
  5.                if (isOrder && !this.lock(mq)) { // 順序消息鎖定消息隊(duì)列 
  6.                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); 
  7.                    continue
  8.                } 
  9.  
  10.                this.removeDirtyOffset(mq); 
  11.                ProcessQueue pq = new ProcessQueue(); 
  12.                long nextOffset = this.computePullFromWhere(mq); 
  13.                if (nextOffset >= 0) { 
  14.                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); 
  15.                    if (pre != null) { 
  16.                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); 
  17.                    } else { 
  18.                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); 
  19.                        PullRequest pullRequest = new PullRequest(); 
  20.                        pullRequest.setConsumerGroup(consumerGroup); 
  21.                        pullRequest.setNextOffset(nextOffset); 
  22.                        pullRequest.setMessageQueue(mq); 
  23.                        pullRequest.setProcessQueue(pq); 
  24.                        pullRequestList.add(pullRequest); 
  25.                        changed = true
  26.                    } 
  27.                } else { 
  28.                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); 
  29.                } 
  30.            } 
  31.        } 
  32.  
  33.        // 發(fā)起消息拉取請求 
  34.        this.dispatchPullRequest(pullRequestList); 

核心思想就是,消費(fèi)客戶端先向broker端發(fā)起對messageQueue的加鎖請求,只有加鎖成功時(shí)才創(chuàng)建pullRequest進(jìn)行消息拉取,下面看下lock加鎖請求方法:

  1. /** 
  2.     * 請求Broker獲得指定消息隊(duì)列的分布式鎖 
  3.     * 
  4.     * @param mq 隊(duì)列 
  5.     * @return 是否成功 
  6.     */ 
  7.    public boolean lock(final MessageQueue mq) { 
  8.        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); 
  9.        if (findBrokerResult != null) { 
  10.            LockBatchRequestBody requestBody = new LockBatchRequestBody(); 
  11.            requestBody.setConsumerGroup(this.consumerGroup); 
  12.            requestBody.setClientId(this.mQClientFactory.getClientId()); 
  13.            requestBody.getMqSet().add(mq); 
  14.  
  15.            try { 
  16.                // 請求Broker獲得指定消息隊(duì)列的分布式鎖 
  17.                Set<MessageQueue> lockedMq = 
  18.                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); 
  19.  
  20.                // 設(shè)置消息處理隊(duì)列鎖定成功。鎖定消息隊(duì)列成功,可能本地沒有消息處理隊(duì)列,設(shè)置鎖定成功會在lockAll()方法。 
  21.                for (MessageQueue mmqq : lockedMq) { 
  22.                    ProcessQueue processQueue = this.processQueueTable.get(mmqq); 
  23.                    if (processQueue != null) { 
  24.                        processQueue.setLocked(true); 
  25.                        processQueue.setLastLockTimestamp(System.currentTimeMillis()); 
  26.                    } 
  27.                } 
  28.  
  29.                boolean lockOK = lockedMq.contains(mq); 
  30.                log.info("the message queue lock {}, {} {}"
  31.                    lockOK ? "OK" : "Failed"
  32.                    this.consumerGroup, 
  33.                    mq); 
  34.                return lockOK; 
  35.            } catch (Exception e) { 
  36.                log.error("lockBatchMQ exception, " + mq, e); 
  37.            } 
  38.        } 
  39.  
  40.        return false
  41.    } 

代碼實(shí)現(xiàn)邏輯比較清晰,就是調(diào)用lockBatchMQ方法發(fā)送了一個(gè)加鎖請求,那么broker端收到加鎖請求后的處理邏輯又是怎么樣?

【broker端實(shí)現(xiàn)】

broker端收到加鎖請求的處理邏輯在RebalanceLockManager#tryLockBatch方法中,RebalanceLockManager中關(guān)鍵屬性如下:

  1. /** 
  2.      * 消息隊(duì)列鎖過期時(shí)間,默認(rèn)60s 
  3.      */ 
  4.     private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( 
  5.         "rocketmq.broker.rebalance.lockMaxLiveTime""60000")); 
  6.     /** 
  7.      * 鎖 
  8.      */ 
  9.     private final Lock lock = new ReentrantLock(); 
  10.     /** 
  11.      * 消費(fèi)分組的消息隊(duì)列鎖映射 
  12.      */ 
  13.     private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = 
  14.             new ConcurrentHashMap<>(1024); 

LockEntry對象中關(guān)鍵屬性如下:

  1. /** 
  2.     * 鎖定記錄 
  3.     */ 
  4.    static class LockEntry { 
  5.        /** 
  6.         * 客戶端編號 
  7.         */ 
  8.        private String clientId; 
  9.        /** 
  10.         * 最后鎖定時(shí)間 
  11.         */ 
  12.        private volatile long lastUpdateTimestamp = System.currentTimeMillis(); 
  13.  
  14.        public String getClientId() { 
  15.            return clientId; 
  16.        } 
  17.  
  18.        public void setClientId(String clientId) { 
  19.            this.clientId = clientId; 
  20.        } 
  21.  
  22.        public long getLastUpdateTimestamp() { 
  23.            return lastUpdateTimestamp; 
  24.        } 
  25.  
  26.        public void setLastUpdateTimestamp(long lastUpdateTimestamp) { 
  27.            this.lastUpdateTimestamp = lastUpdateTimestamp; 
  28.        } 
  29.  
  30.        /** 
  31.         * 是否鎖定 
  32.         * 
  33.         * @param clientId 客戶端編號 
  34.         * @return 是否 
  35.         */ 
  36.        public boolean isLocked(final String clientId) { 
  37.            boolean eq = this.clientId.equals(clientId); 
  38.            return eq && !this.isExpired(); 
  39.        } 
  40.  
  41.        /** 
  42.         * 鎖定是否過期 
  43.         * 
  44.         * @return 是否 
  45.         */ 
  46.        public boolean isExpired() { 
  47.            boolean expired = 
  48.                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; 
  49.  
  50.            return expired; 
  51.        } 
  52.    } 

broker端通過對ConcurrentMap> mqLockTable的維護(hù)來達(dá)到messageQueue加鎖的目的,使得同一時(shí)刻,一個(gè)messageQueue只能被一個(gè)消費(fèi)者消費(fèi)。

【再次回到Consumer端,拿到鎖后】

消費(fèi)者對messageQueue的加鎖已經(jīng)成功,那么就進(jìn)入到了第二個(gè)步驟,創(chuàng)建pullRequest進(jìn)行消息拉取,消息拉取部分的代碼實(shí)現(xiàn)在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中進(jìn)行消費(fèi),順序消費(fèi)的實(shí)現(xiàn)為ConsumeMessageOrderlyService,提交消息進(jìn)行消費(fèi)的方法為ConsumeMessageOrderlyService#submitConsumeRequest,具體實(shí)現(xiàn)如下:

  1. @Override 
  2.  public void submitConsumeRequest(// 
  3.      final List<MessageExt> msgs, // 
  4.      final ProcessQueue processQueue, // 
  5.      final MessageQueue messageQueue, // 
  6.      final boolean dispathToConsume) { 
  7.      if (dispathToConsume) { 
  8.          ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); 
  9.          this.consumeExecutor.submit(consumeRequest); 
  10.      } 
  11.  } 

構(gòu)建了一個(gè)ConsumeRequest對象,并提交給了ThreadPoolExecutor來并行消費(fèi),看下順序消費(fèi)的ConsumeRequest的run方法實(shí)現(xiàn):

  1. public void run() { 
  2.            if (this.processQueue.isDropped()) { 
  3.                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  4.                return
  5.            } 
  6.  
  7.            // 獲得 Consumer 消息隊(duì)列鎖 
  8.            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); 
  9.            synchronized (objLock) { 
  10.                // (廣播模式) 或者 (集群模式 && Broker消息隊(duì)列鎖有效) 
  11.                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  12.                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { 
  13.                    final long beginTime = System.currentTimeMillis(); 
  14.                    // 循環(huán) 
  15.                    for (boolean continueConsume = true; continueConsume; ) { 
  16.                        if (this.processQueue.isDropped()) { 
  17.                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  18.                            break; 
  19.                        } 
  20.  
  21.                        // 消息隊(duì)列分布式鎖未鎖定,提交延遲獲得鎖并消費(fèi)請求 
  22.                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  23.                            && !this.processQueue.isLocked()) { 
  24.                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue); 
  25.                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); 
  26.                            break; 
  27.                        } 
  28.                        // 消息隊(duì)列分布式鎖已經(jīng)過期,提交延遲獲得鎖并消費(fèi)請求 
  29.                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  30.                            && this.processQueue.isLockExpired()) { 
  31.                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); 
  32.                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); 
  33.                            break; 
  34.                        } 
  35.  
  36.                        // 當(dāng)前周期消費(fèi)時(shí)間超過連續(xù)時(shí)長,默認(rèn):60s,提交延遲消費(fèi)請求。默認(rèn)情況下,每消費(fèi)1分鐘休息10ms。 
  37.                        long interval = System.currentTimeMillis() - beginTime; 
  38.                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { 
  39.                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); 
  40.                            break; 
  41.                        } 
  42.  
  43.                        // 獲取消費(fèi)消息。此處和并發(fā)消息請求不同,并發(fā)消息請求已經(jīng)帶了消費(fèi)哪些消息。 
  44.                        final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); 
  45.                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); 
  46.                        if (!msgs.isEmpty()) { 
  47.                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); 
  48.  
  49.                            ConsumeOrderlyStatus status = null
  50.  
  51.                            // Hook:before 
  52.                            ConsumeMessageContext consumeMessageContext = null
  53.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  54.                                consumeMessageContext = new ConsumeMessageContext(); 
  55.                                consumeMessageContext 
  56.                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); 
  57.                                consumeMessageContext.setMq(messageQueue); 
  58.                                consumeMessageContext.setMsgList(msgs); 
  59.                                consumeMessageContext.setSuccess(false); 
  60.                                // init the consume context type 
  61.                                consumeMessageContext.setProps(new HashMap<String, String>()); 
  62.                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); 
  63.                            } 
  64.  
  65.                            // 執(zhí)行消費(fèi) 
  66.                            long beginTimestamp = System.currentTimeMillis(); 
  67.                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; 
  68.                            boolean hasException = false
  69.                            try { 
  70.                                this.processQueue.getLockConsume().lock(); // 鎖定隊(duì)列消費(fèi)鎖 
  71.  
  72.                                if (this.processQueue.isDropped()) { 
  73.                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}"
  74.                                        this.messageQueue); 
  75.                                    break; 
  76.                                } 
  77.  
  78.                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); 
  79.                            } catch (Throwable e) { 
  80.                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // 
  81.                                    RemotingHelper.exceptionSimpleDesc(e), // 
  82.                                    ConsumeMessageOrderlyService.this.consumerGroup, // 
  83.                                    msgs, // 
  84.                                    messageQueue); 
  85.                                hasException = true
  86.                            } finally { 
  87.                                this.processQueue.getLockConsume().unlock(); // 鎖定隊(duì)列消費(fèi)鎖 
  88.                            } 
  89.  
  90.                            if (null == status // 
  91.                                || ConsumeOrderlyStatus.ROLLBACK == status// 
  92.                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { 
  93.                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // 
  94.                                    ConsumeMessageOrderlyService.this.consumerGroup, // 
  95.                                    msgs, // 
  96.                                    messageQueue); 
  97.                            } 
  98.  
  99.                            // 解析消費(fèi)結(jié)果狀態(tài) 
  100.                            long consumeRT = System.currentTimeMillis() - beginTimestamp; 
  101.                            if (null == status) { 
  102.                                if (hasException) { 
  103.                                    returnType = ConsumeReturnType.EXCEPTION; 
  104.                                } else { 
  105.                                    returnType = ConsumeReturnType.RETURNNULL; 
  106.                                } 
  107.                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { 
  108.                                returnType = ConsumeReturnType.TIME_OUT; 
  109.                            } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { 
  110.                                returnType = ConsumeReturnType.FAILED; 
  111.                            } else if (ConsumeOrderlyStatus.SUCCESS == status) { 
  112.                                returnType = ConsumeReturnType.SUCCESS; 
  113.                            } 
  114.  
  115.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  116.                                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); 
  117.                            } 
  118.  
  119.                            if (null == status) { 
  120.                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; 
  121.                            } 
  122.  
  123.                            // Hook:after 
  124.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  125.                                consumeMessageContext.setStatus(status.toString()); 
  126.                                consumeMessageContext 
  127.                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); 
  128.                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); 
  129.                            } 
  130.  
  131.                            ConsumeMessageOrderlyService.this.getConsumerStatsManager() 
  132.                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); 
  133.  
  134.                            // 處理消費(fèi)結(jié)果 
  135.                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); 
  136.                        } else { 
  137.                            continueConsume = false
  138.                        } 
  139.                    } 
  140.                } else { 
  141.                    if (this.processQueue.isDropped()) { 
  142.                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  143.                        return
  144.                    } 
  145.  
  146.                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); 
  147.                } 
  148.            } 
  149.        } 

獲取到鎖對象后,使用synchronized嘗試申請線程級獨(dú)占鎖。

如果加鎖成功,同一時(shí)刻只有一個(gè)線程進(jìn)行消息消費(fèi)。

如果加鎖失敗,會延遲100ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功后重新提交消費(fèi)請求

至此,第三個(gè)關(guān)鍵點(diǎn)的解決思路也清晰了,基本上就兩個(gè)步驟。

創(chuàng)建消息拉取任務(wù)時(shí),消息客戶端向broker端申請鎖定MessageQueue,使得一個(gè)MessageQueue同一個(gè)時(shí)刻只能被一個(gè)消費(fèi)客戶端消費(fèi)。

消息消費(fèi)時(shí),多線程針對同一個(gè)消息隊(duì)列的消費(fèi)先嘗試使用synchronized申請獨(dú)占鎖,加鎖成功才能進(jìn)行消費(fèi),使得一個(gè)MessageQueue同一個(gè)時(shí)刻只能被一個(gè)消費(fèi)客戶端中一個(gè)線程消費(fèi)。

【順序消費(fèi)問題拆解】

  1. broke 上要保證一個(gè)隊(duì)列只有一個(gè)進(jìn)程消費(fèi),即一個(gè)隊(duì)列同一時(shí)間只有一個(gè)consumer 消費(fèi)
  2. broker 給consumer 的消息順序應(yīng)該保持一致,這個(gè)通過 rpc傳輸,序列化后消息順序不變,所以很容易實(shí)現(xiàn)
  3. consumer 上的隊(duì)列消息要保證同一個(gè)時(shí)間只有一個(gè)線程消費(fèi)

通過問題的拆分,問題變成同一個(gè)共享資源串行處理了,要解決這個(gè)問題,通常的做法都是訪問資源的時(shí)候加鎖,即broker 上一個(gè)隊(duì)列消息在被consumer 訪問的必須加鎖,單個(gè)consumer 端多線程并發(fā)處理消息的時(shí)候需要加鎖;這里還需要考慮broker 鎖的異常情況,假如一個(gè)broke 隊(duì)列上的消息被consumer 鎖住了,萬一consumer 崩潰了,這個(gè)鎖就釋放不了,所以broker 上的鎖需要加上鎖的過期時(shí)間。

實(shí)際上 RocketMQ 消費(fèi)端也就是照著上面的思路做:

RocketMQ中順序消息注意事項(xiàng)

實(shí)際項(xiàng)目中并不是所有情況都需要用到順序消息,但這也是設(shè)計(jì)方案的時(shí)候容易忽略的一點(diǎn)

順序消息是生產(chǎn)者和消費(fèi)者配合協(xié)調(diào)作用的結(jié)果,但是消費(fèi)端保證順序消費(fèi),是保證不了順序消息的

消費(fèi)端并行方式消費(fèi),只設(shè)置一次拉取消息的數(shù)量為 1(即配置參數(shù) consumeBatchSize ),是否可以實(shí)現(xiàn)順序消費(fèi) ?這里實(shí)際是不能的,并發(fā)消費(fèi)在消費(fèi)端有多個(gè)線程同時(shí)消費(fèi),consumeBatchSize 只是一個(gè)線程一次拉取消息的數(shù)量,對順序消費(fèi)沒有意義,這里大家有興趣可以看 ConsumeMessageConcurrentlyService 的代碼,并發(fā)消費(fèi)的邏輯都在哪里。

在使用順序消息時(shí),一定要注意其異常情況的出現(xiàn),對于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列 RocketMQ 版會自動不斷地進(jìn)行消息重試(每次間隔時(shí)間為 1 秒),重試最大值是Integer.MAX_VALUE.這時(shí),應(yīng)用會出現(xiàn)消息消費(fèi)被阻塞的情況。因此,建議您使用順序消息時(shí),務(wù)必保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況,避免阻塞現(xiàn)象的發(fā)生。

重要的事再強(qiáng)調(diào)一次:在使用順序消息時(shí),一定要注意其異常情況的出現(xiàn)!防止資源不釋放!

小結(jié)

通過以上的了解,我們知道了實(shí)現(xiàn)順序消息所必要的條件:順序發(fā)送、順序存儲、順序消費(fèi)。RocketMQ的設(shè)計(jì)中考慮到了這些,我們只需要簡單的使用API,不需要額外使用代碼來約束業(yè)務(wù),使得實(shí)現(xiàn)順序消息更加簡單。

 

責(zé)任編輯:姜華 來源: 小汪哥寫代碼
相關(guān)推薦

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構(gòu)Message

2021-07-07 15:29:52

存儲RocketMQ體系

2021-07-16 18:44:42

RocketMQ知識

2021-07-09 07:15:48

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2022-06-27 11:04:24

RocketMQ順序消息

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2021-07-07 07:06:31

Brokerkafka架構(gòu)

2015-07-28 17:52:36

IOS知識體系

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2012-03-08 11:13:23

企業(yè)架構(gòu)

2017-06-22 13:07:21

2017-02-27 16:42:23

Spark識體系

2017-04-03 15:35:13

知識體系架構(gòu)

2021-07-05 06:26:08

生產(chǎn)者kafka架構(gòu)

2021-07-08 05:52:34

Kafka架構(gòu)主從架構(gòu)

2023-09-04 08:00:53

提交事務(wù)消息

2015-07-16 10:15:44

web前端知識體系

2021-04-15 09:17:01

SpringBootRocketMQ

2020-09-09 09:15:58

Nginx體系進(jìn)程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號