自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ消息回溯實踐與解析

開發(fā) 前端
消息回溯功能是 RocketMQ 提供給業(yè)務(wù)方的定心丸,業(yè)務(wù)在出現(xiàn)任何無法恢復(fù)的問題后,都可以及時通過消息回溯來恢復(fù)業(yè)務(wù)或者訂正數(shù)據(jù)。特別是在流或者批計算的場景,重跑數(shù)據(jù)往往是常態(tài)。

1 問題背景

前段時間,小A公司的短信服務(wù)出現(xiàn)了問題,導(dǎo)致一段時間內(nèi)的短信沒有發(fā)出去,等服務(wù)修復(fù)后,需要重新補發(fā)這批數(shù)據(jù)。

由于短信服務(wù)是直接通過RocketMQ觸發(fā),因此在修復(fù)這些數(shù)據(jù)的時候,小A犯了難,于是就有了以下對話

領(lǐng)導(dǎo):小A呀,這數(shù)據(jù)這么多,你準備怎么修呀?

小A:頭大呀領(lǐng)導(dǎo),一般業(yè)務(wù)我們都有一個本地消息表來做冪等,我只需要把數(shù)據(jù)庫表的狀態(tài)重置,然后把數(shù)據(jù)撈出來重新循環(huán)執(zhí)行就可以啦,但是短信服務(wù)我們沒有本地表呀!

領(lǐng)導(dǎo):那你有什么想法嗎?

小A:簡單的話,那就讓上游重發(fā)吧,我們再消費一遍就好了。

領(lǐng)導(dǎo):這樣問題就更嚴重了呀,你想,上游重發(fā)一遍,那是不是所有的消費者組都要重新消費一遍,到時候其他業(yè)務(wù)同學(xué)就要來找你了。

小A:那就不好辦了。。。

領(lǐng)導(dǎo):其實RocketMQ有專門的消息回溯的能力,你可以試試

小A:這么神奇?我研究研究。。。

2 驗證

2.1 生產(chǎn)者啟動

準備一個新的topic,并發(fā)送1W條消息

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

2.2 消費者啟動

準備一個新的消費者組,消費topic下數(shù)據(jù)并記錄總條數(shù)

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    
    final AtomicInteger count = new AtomicInteger();
    
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            count.incrementAndGet();
            System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
            System.out.println(count.get());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
}

消費者消息記錄消費者消息記錄

2.3 執(zhí)行回溯

命令行執(zhí)行

mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000

以下為mqadmin.cmd的內(nèi)容,因此也可以直接通過調(diào)用MQAdminStartup的main方法執(zhí)行

MQAdminStartup手動執(zhí)行MQAdminStartup手動執(zhí)行

代碼執(zhí)行:

public static void main(String[] args) {
    String[] params = new String[]{"resetOffsetByTime","-n","127.0.0.1:9876","-t", "TopicTest", "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};
    MQAdminStartup.main(params);
}

2.4 結(jié)果驗證

客戶端重置成功記錄客戶端重置成功記錄

消費者重新消費記錄消費者重新消費記錄

2.5 驗證小結(jié)

從結(jié)果上來看,消費者offset被重置到了指定的時間戳位置,由于指定時間戳早于最早消息的創(chuàng)建時間,因此重新消費了所有未被刪除的消息。

那rocketmq究竟做了什么呢?

2.5.1 分析參數(shù)

動作標(biāo)識:resetOffsetByTime

額外參數(shù):

-n nameserver的地址

-t 指定topic名稱

-g 指定消費者組名稱

-s 指定回溯時間

2.5.2 思考

消息回溯思考消息回溯思考

3 分析

以下源碼部分均出自4.2.0版本,展示代碼有所精簡。

3.1 策略模式,解析命令行

org.apache.rocketmq.tools.command.MQAdminStartup#main

/*根據(jù)動作標(biāo)識解析除對應(yīng)的處理類,我們本次請求實際處理策略類:ResetOffsetByTimeCommand*/
SubCommand cmd = findSubCommand(args[0]);
/*解析命令行*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());
                            
/*提交請求執(zhí)行*/
cmd.execute(commandLine, options, rpcHook);

3.2 創(chuàng)建客戶端,與服務(wù)端交互

org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute

public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    
    String group = commandLine.getOptionValue("g").trim();//消費者組
    String topic = commandLine.getOptionValue("t").trim();//主題
    String timeStampStr = commandLine.getOptionValue("s").trim();//重置時間戳
    long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);//重置時間戳
    boolean isC = false;//是否C客戶端
    boolean force = true;//是否強制重置,這里提前解釋一下,有可能時間戳對應(yīng)的offset比當(dāng)前消費進度要大,強制的話會出現(xiàn)部分消息消費不到
    if (commandLine.hasOption('f')) {
        force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
    }

    /*與nameserver以及broker交互的客戶端啟動*/
    defaultMQAdminExt.start();
    /*正式執(zhí)行命令*/
    Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
}

3.3 獲取topic對應(yīng)的broker地址,提交重置請求

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp

public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    /*從nameserver處獲取broker地址*/
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    /*由于消息數(shù)據(jù)分區(qū)分片,topic下的messagequeue可能存在多個broker上,因此這是個列表*/
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                /*循環(huán)與各個broker交互,執(zhí)行重置操作*/
                Map<MessageQueue, Long> offsetTable =
                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                        timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

3.4 與 nameserver交互獲取broker地址

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#examineTopicRouteInfo

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
 /*同樣的組裝參數(shù),請求碼:105*/
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    /*創(chuàng)建請求與nameserver交互*/
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    byte[] body = response.getBody();
    if (body != null) {
        return TopicRouteData.decode(body, TopicRouteData.class);
    }
}

3.4.1 nameserver收到請求,獲取路由信息并返回

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    /*nameserver內(nèi)部存儲topic的路由信息*/
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
 byte[] content = topicRouteData.encode();
    response.setBody(content);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

3.4.2 RouteInfoManager的核心屬性

//topic路由信息,根據(jù)這個做負載均衡,QueueData里面記錄brokerName
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broke基本信息 名稱  所在集群信息   主備broke地址  brokerId=0表示master   >0表示slave
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//集群信息,包含集群所有的broke信息
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//存活的broke信息,以及對應(yīng)的channel
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//broke的過濾類信息
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

3.5 與broker交互,執(zhí)行重置操作

org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset

public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
    throws RemotingException, MQClientException, InterruptedException {
    
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);

    /*同樣的組裝參數(shù),請求碼:222*/
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
 /*創(chuàng)建請求與broker交互,注意這里是同步invokeSync*/
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    if (response.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
        return body.getOffsetTable();
    }
}

broker收到請求,開始處理;

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
    boolean isC) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);

    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);

    /*記錄下該消費者組消費topic下的隊列要重置到哪條offset*/
    Map<MessageQueue/*隊列*/, Long/*offser*/> offsetTable = new HashMap<MessageQueue, Long>();

    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);

        /*broker可以獲取該topic下的consumergroup下的某個隊列的offset*/
        long consumerOffset =
            this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);//消費者組當(dāng)前已經(jīng)消費的offset
        if (-1 == consumerOffset) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("THe consumer group <%s> not exist", group));
            return response;
        }

        long timeStampOffset;
        if (timeStamp == -1) {
   //沒有指定表示當(dāng)前隊列最大的offset
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            //根據(jù)時間戳查到隊列下對應(yīng)的offset
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }

        if (timeStampOffset < 0) {
            //<0表示消息已經(jīng)被刪掉了
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }

        /*如果isForce=false,則要重置的offset<當(dāng)前正在消費的offset才會重置。也過來,也就是說重置不僅會回溯,消費進度過慢也可以往后撥,加快消費進度*/
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }

    /*確定了要先重置的offset之后開始與客戶端交互,準備客戶端重置,請求碼220*/
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        // c++ language
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        // other language
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }

    /*拿到與當(dāng)前broker建立連接的消費者組客戶端信息*/
    ConsumerGroupInfo consumerGroupInfo =
        this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        //獲取長連接channel
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            /*這里版本可以判斷,只有客戶端版本>3.0.7才支持重置*/
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    /*注意這里是只管發(fā)不管收,可以簡單理解為異步了invokeOneway*/
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                        topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={}",
                        new Object[] {topic, group}, e);
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. versinotallow="
                    + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. versinotallow={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo =
            String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                requestHeader.getGroup(),
                requestHeader.getTopic(),
                requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}

3.6 消費客戶端收到請求,開始處理

org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset

public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        /*根據(jù)消費者組找到對應(yīng)的消費實現(xiàn),即我們熟悉的DefaultMQPushConsumerImpl或者DefaultMQPullConsumerImpl*/
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            //由于PullConsumer消費進度自己控制,因此直接返回
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        
        consumer.suspend();//暫停消費

        /*暫停消息拉取,以及待處理的消息緩存都清掉*/
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
  
        /*這里的等待實現(xiàn)比較簡單,與broker交互是同步,broker與consumer交互是異步,因此這里阻塞10秒是為了保證所有的consumer都在這里存儲offset并觸發(fā)reblance*/
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            //獲取messagequeue應(yīng)該被重置的offset
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    /*更新更新本地offset,這里注意集群模式是先修改本地,然后定時任務(wù)每五秒上報broker,而廣播模式offset在本地存儲,因此只需要修改消費者本地的offset即可*/
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            /*重新觸發(fā)reblance,由于broker已經(jīng)重置的該消費者組的offset,因此重分配后以broker為準*/
            consumer.resume();
        }
    }
}

4 核心流程

消息回溯全流程消息回溯全流程

5 總結(jié)

消息回溯功能是 RocketMQ 提供給業(yè)務(wù)方的定心丸,業(yè)務(wù)在出現(xiàn)任何無法恢復(fù)的問題后,都可以及時通過消息回溯來恢復(fù)業(yè)務(wù)或者訂正數(shù)據(jù)。特別是在流或者批計算的場景,重跑數(shù)據(jù)往往是常態(tài)。

RocketMQ 能實現(xiàn)消息回溯功能得益于其簡單的位點管理機制,可以很容易通過 mqadmin 工具重置位點。但要注意,由于topic的消息實際都是存儲在broker上,且有一定的刪除機制,因此首先要確認需要消息回溯的集群broker不能下線節(jié)點或者回溯數(shù)據(jù)被刪除之前的時間點,確保消息不會丟失。

6 延申

通過消息回溯的功能,我們可以任意向前或者向后撥動offset,那當(dāng)我們想要指定一個區(qū)間進行消費,這個時候怎么辦呢。比如當(dāng)消費進度過慢,我們選擇向后撥動offset,那就會有一部分未消費的消息出現(xiàn),針對這部分消息,我們應(yīng)該在空余時間把他消費完成,就需要指定區(qū)間來消費了。

其實通過上面代碼org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset中我們可以看到,對于DefaultMQPullConsumerImpl類型的消費者,消息重置是不生效的,這是因為DefaultMQPullConsumerImpl的消費進度完全由消費者來控制,那我們就可以采用拉模式來進行消費。

示例代碼:

public class PullConsumerLocalTest {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
    private static final Map<MessageQueue, Pair<Long/*最小offset*/,Long/*最大offset*/>> QUEUE_OFFSE_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L;//最小時間戳
    private static final Long MAX_TIMESTAMP = 1722240160000L;//最大時間戳

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();

        /*初始化待處理的offset*/
        String topic = "TopicTest";
        init(consumer, topic);

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            //check max offset and dosomething...
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
            //記錄區(qū)間內(nèi)范圍內(nèi)最小以及最大的offset
            QUEUE_OFFSE_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));
            //將最小offset寫為下次消費的初始offset
            OFFSE_TABLE.put(mq, minOffset);
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

7 對比

方式

優(yōu)點

缺點

消費者本地消息表

業(yè)務(wù)完全可控

額外存儲開銷,重復(fù)消費需要單獨開發(fā)

消息重置

無需業(yè)務(wù)修改,支持廣播/集群,順序/無序消息(有冪等操作的需要重置狀態(tài))

低版本3.0.7之前不支持

pull手動控制

消費進度完全可控

需要考慮offset維護,復(fù)雜度較高

責(zé)任編輯:武曉燕 來源: 轉(zhuǎn)轉(zhuǎn)技術(shù)
相關(guān)推薦

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-09-25 08:32:05

2024-10-11 09:15:33

2025-02-27 08:50:00

RocketMQ開發(fā)代碼

2025-04-11 09:57:16

2022-09-07 21:43:34

云原生存儲技術(shù)消息隊列

2024-11-18 16:15:00

2022-12-22 10:03:18

消息集成

2023-07-17 08:34:03

RocketMQ消息初體驗

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-18 09:03:01

RocketMQ場景消息

2025-04-09 08:20:00

RocketMQ消息隊列開發(fā)

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2025-02-06 08:24:25

AQS開發(fā)Java

2023-05-15 08:24:46

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)

2023-09-21 22:02:22

Go語言高級特性

2025-03-27 04:10:00

點贊
收藏

51CTO技術(shù)棧公眾號