自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

面試官:RocketMQ 的推模式和拉模式有什么區(qū)別?

開發(fā) 架構(gòu)
PULL 模式是從 Broker 拉取消息后放入緩存,然后消費端不停地從緩存取出消息來執(zhí)行客戶端定義的處理邏輯,而 PUSH 模式是在死循環(huán)中不停的從 Broker 拉取消息,拉取到后調(diào)用回調(diào)函數(shù)進行處理,回調(diào)函數(shù)中調(diào)用客戶端定義的處理邏輯。

大家好,我是君哥。

RocketMQ 消息消費有兩種模式,PULL 和 PUSH,今天我們來看一下這兩種模式有什么區(qū)別。

PUSH 模式

首先看一段 RocketMQ 推模式的一個官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
Tracer tracer = initTracer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}

消費者會定義一個消息監(jiān)聽器,并且把這個監(jiān)聽器注冊到 DefaultMQPushConsumer,同時也會注冊到 DefaultMQPushConsumerIm-pl,當拉取到消息時,就會使用這個監(jiān)聽器來處理消息。那這個監(jiān)聽器是什么時候調(diào)用呢?看下面的 UML 類圖:

消費者真正拉取請求的類是 DefaultMQPush-ConsumerImpl,這個類的 pullMessage 方法調(diào)用了 PullAPIWrapper 的 pullKernelImpl 方法,這個方法有一個參數(shù)是回調(diào)函數(shù) Pull-Callback,當 PULL 狀態(tài)是 PullStatus.FOU-ND 時,代表拉取消息成功,處理邏輯如下:

PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
//省略部分邏輯
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//省略部分邏輯
break;
//省略其他case
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
//省略
}
};

這個處理邏輯調(diào)用了 ConsumeMessage-Service 類的 submitConsumeRequest 方法,我們看一下并發(fā)消費消息的處理邏輯,代碼如下:

public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//分批處理,跟上面邏輯一致
}

ConsumeRequest 類是一個線程類,run 方法里面調(diào)用了消費者定義的消息處理方法,代碼如下:

public void run() {
//省略邏輯
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
//省略邏輯
try {
//調(diào)用消費方法
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//省略邏輯
}
//省略邏輯
}

下面以并發(fā)消費方式下的同步拉取消息為例總結(jié)一下消費者消息處理過程:

  1. 在 MessageListenerConcurrently 中定義消費者處理邏輯,消費者啟動時注冊到 DefaultMQPushConsumer 和 DefaultMQ-PushConsumerImpl。
  2. 消費者啟動時,啟動消費拉取線程 PullMessageService,里面死循環(huán)不停地從 Broker 拉取消息。這里調(diào)用了 DefaultMQPushConsumerImpl 類的 pullMessage 方法。
  3. DefaultMQPushConsumerImpl 類的 pullMessage 方法調(diào)用 PullAPIWrapper 的 pullKernelImpl 方法真正去發(fā)送 PULL 請求,并傳入 PullCallback 的 回調(diào)函數(shù)。
  4. 拉取到消息后,調(diào)用 PullCallback 的 onSuccess 方法處理結(jié)果,這里調(diào)用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 線程來處理拉取到的消息。
  5. ConsumeRequest 處理消息時調(diào)用了消費端定義的消費邏輯,也就是 Message-ListenerConcurrently 的 consumeMessage 方法。

PULL 模式

下面是來自官方的一段 PULL 模式拉取消息的代碼:

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}

這里我們看到,PULL 模式需要在處理邏輯里不停的去拉取消息,比如上面代碼中寫了一個死循環(huán)。那 PULL 模式中 poll 函數(shù)是怎么實現(xiàn)的呢?我們看下面的 UML 類圖:

跟蹤源碼可以看到,消息拉取最終是從 DefaultLitePullConsumerImpl 類中的一個 LinkedBlockingQueue 上面拉取。那消息是什么時候 put 到 LinkedBlockingQueue 呢?

官方拉取消息的代碼中有一個 subscribe 方法訂閱了 Topic,這里相關(guān)的 UML 類圖如下:

這個 subscribe 方法最終調(diào)用了 DefaultLite-PullConsumerImpl 類的 subscribe,代碼如下:

public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
//省略邏輯
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
//省略邏輯
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}

這里給 DefaultLitePullConsumer 類的 messageQueueListener 這個監(jiān)聽器進行了賦值。當監(jiān)聽器監(jiān)聽到 MessageQueue 發(fā)送變化時,就會啟動消息拉取消息的線程 Pull-TaskImpl,代碼如下:

public void run() {
//省略部分邏輯
if (!this.isCancelled()) {
long pullDelayTimeMills = 0;
try {
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
}
break;
//省略其他 case
}
}
//省略 catch
if (!this.isCancelled()) {
//啟動下一次拉取
scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}
}
}

拉取消息成功后,調(diào)用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后啟動下一次拉取。

這樣就清除了示例代碼中 poll 消息的邏輯,那還有一個問題,監(jiān)聽器是什么時候觸發(fā)監(jiān)聽事件呢?

在消費者啟動時,會啟動 RebalanceService 這個線程,這個線程的 run 方法如下:

public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}

下面的 UML 類圖顯示了 doRebalance 方法的調(diào)用關(guān)系:

可以看到最終調(diào)用了 最終調(diào)用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代碼如下:


public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}

這里最終觸發(fā)了監(jiān)聽器。

下面以并發(fā)消費方式下的同步拉取消息為例總結(jié)一下消費者消息處理過程:

  1. 消費者啟動,向 DefaultLitePullConsumer 訂閱了 Topic,這個訂閱過程會向 DefaultLitePullConsumer 注冊一個監(jiān)聽器。
  2. 消費者啟動過程中,會啟動 Message-Queue 重平衡線程 Rebalance-Service,當重平衡過程發(fā)現(xiàn) ProcessQueueTable 發(fā)生變化時,啟動消息拉取線程。
  3. 消息拉取線程拉取到消息后,把消息放到 consumeRequestCache,然后進行下一次拉取。
  4. 消費者啟動后,不停地從 consumeReq-uestCache 拉取消息進行處理。

總結(jié)

通過本文的講解,可以看到 PUSH 模式和 PULL 模式本質(zhì)上都是客戶端主動拉取,RocketMQ并沒有真正實現(xiàn) Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的區(qū)別如下:

PULL 模式是從 Broker 拉取消息后放入緩存,然后消費端不停地從緩存取出消息來執(zhí)行客戶端定義的處理邏輯,而 PUSH 模式是在死循環(huán)中不停的從 Broker 拉取消息,拉取到后調(diào)用回調(diào)函數(shù)進行處理,回調(diào)函數(shù)中調(diào)用客戶端定義的處理邏輯。

PUSH 模式拉取消息依賴死循環(huán)來不停喚起業(yè)務(wù),而 PULL 模式拉取消息是通過 MessageQueue 監(jiān)聽器來觸發(fā)消息拉取線程,消息拉取線程會在拉取完一次后接著下一次拉取。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
點贊
收藏

51CTO技術(shù)棧公眾號