分布式延時消息的另外一種選擇 Redisson
前言
因為工作中需要用到分布式的延時隊列,調(diào)研了一段時間,選擇使用 Redisson DelayedQueue,為了搞清楚內(nèi)部運行流程,特記錄下來。
總體流程大概是圖中的這個樣子,初看一眼有點不知從何下手,接下來我會通過以下幾點來分析流程,相信看完本文你能了解整個運行流程。
- 基本使用
- 內(nèi)部數(shù)據(jù)結(jié)構(gòu)介紹
- 基本流程
- 發(fā)送延時消息
- 獲取延時消息
- 初始化延時隊列
圖片
基本使用
發(fā)送延遲消息代碼如下,發(fā)送了一條延遲時間為 5s 的消息。
public void produce() {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer("測試延遲消息", 5, TimeUnit.SECONDS);
}
接收消息代碼如下,可以看到 delayedQueue 是沒有用到的,那么為什么要加這一行呢,這個后面總結(jié)部分回答。
public void consume() throws InterruptedException {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
String msg = blockingQueue.take();
//收到消息進行處理...
}
這兩段代碼可以寫在兩個不同的 Java 工程里,只要連接的是同一個 Redis 就行。
調(diào)用 comsume() 之后,如果隊列里沒有消息,會阻塞等待隊列里有消息并且取到了才會返回。之所以這么說是因為可能有別的 Java 進程也在跟你一樣取同一個隊列里的消息,如果消息被另一個搶完了,那這時就還得阻塞等待。
這時看上去的原理是這樣的:
生產(chǎn)者調(diào)用 offer() 后,自己內(nèi)部開啟一個定時器,等到了時間再發(fā)送到 redis 的 list 里。
圖片
如果是這樣設(shè)計的話,相信大家都能看出來一個很簡單的問題,要是延時時間還沒到,生產(chǎn)者自己掛了,那樣消息就丟了。所以還是讓我們接著往下看。
內(nèi)部數(shù)據(jù)結(jié)構(gòu)介紹
redisson 源碼里一共創(chuàng)建了三個隊列:【消息延時隊列】、【消息順序隊列】、【消息目標(biāo)隊列】。
圖片
假設(shè)在同一時間按照 msg1、msg2、msg3 的順序發(fā)消息到延時隊列,這三條消息就會被保存在【消息延時隊列】和【消息順序隊列】。
可以看到【消息延時隊列】的順序是按照到期時間升序排列的,而不是像【消息順序隊列】按照插入順序排。
消息到期后會將消息從前兩個隊列移除(怎么移?誰來移?),插入【消息目標(biāo)隊列】,也就是圖中第三個隊列。
消費者也是阻塞在【消息目標(biāo)隊列】上取消息。
這時可以簡單說明下每個隊列的作用:
- 【消息延時隊列】利用按照到期時間排序的特性,可以很快找到下一個要到期的消息,客戶端內(nèi)部自己定時到【消息目標(biāo)隊列】取
- 【消息順序隊列】這個隊列對分析的流程關(guān)聯(lián)不大,可以忽略
- 【消息目標(biāo)隊列】存放到期的消息,供消費端取
其實【消息延時隊列】隊列里存的時間(也就是 zet 的 score)是到期的時間戳,為了畫圖方便,圖里就畫的是延遲的時間,不過不影響理解。
理解好這幾個隊列的名字和作用,后面還會一直用到,如果忘了可以翻回來回顧下。
因為書寫理解方便和【消息順序隊列】在本文沒涉及到,后面部分好幾次提到的內(nèi)容:把到期的消息從【消息延時隊列】移到【消息目標(biāo)隊列】里,這句話實際的代碼邏輯是這樣:把【消息延時隊列】和【消息順序隊列】里的到期消息移除,把它們插入到【消息目標(biāo)隊列】。
基本流程
知道了內(nèi)部所使用到的數(shù)據(jù)結(jié)構(gòu)后,這里可以簡單說下整體的基本流程。
先說發(fā)送延遲消息,發(fā)送的延遲消息會先存在【消息延時隊列】和【消息順序隊列】,如果【消息延時隊列】原本是空的,會發(fā)布訂閱信息提醒有新的消息。
獲取延遲消息只需要從【消息目標(biāo)隊列】阻塞的取就行了,因為里面都是到期數(shù)據(jù)。
那么問題就只剩下怎么樣判斷時間到了,把【消息延時隊列】里的消息移動到【消息目標(biāo)隊列】里呢?
這部分工作交給了初始化延時隊列來處理。
這里面會定時從【消息延時隊列】查詢最新到期時間,定時去把【消息延時隊列】里的消息移動到【消息目標(biāo)隊列】里。
如果【消息延時隊列】是空的,就不會再定時查,而是等待發(fā)布訂閱信息提醒,再定時把【消息延時隊列】里的消息移動到【消息目標(biāo)隊列】里。
剛開始看可能有點抽象,可以看完底下一節(jié)內(nèi)容之后,再回頭來看這里對應(yīng)的流程總結(jié),可能會比較清晰。
發(fā)送延時消息
發(fā)送延時消息的邏輯比較簡單,先看下發(fā)送的代碼。
public void produce() {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer("測試延遲消息", 5, TimeUnit.SECONDS);
}
從 delayedQueue.offer 方法開始,最終會執(zhí)行到 RedissonDelayedQueue 的 offerAsync 方法里。
offerAsync 方法的作用就是發(fā)送一段腳本給 redis 執(zhí)行,腳本內(nèi)容是:
- 將消息和到期時間插入【消息延時隊列】和【消息順序隊列】
- 如果最近到期的消息是剛剛插入的消息,則對指定主題發(fā)布到期時間,目的是為了讓客戶端定時去把【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
long delayInMs = timeUnit.toMillis(delay);
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = ThreadLocalRandom.current().nextLong();
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
獲取延時消息
獲取延時消息是本文最簡單的一部分。
public void consume() throws InterruptedException {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
String msg = blockingQueue.take();
//收到消息進行處理...
}
blockingQueue.take() 方法其實只是對【消息目標(biāo)隊列】執(zhí)行 blpop 阻塞的獲取到期消息
初始化延時隊列
看一下初始化的代碼。
public void init() {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
}
入口就是在 redissonClient.getDelayedQueue(blockingQueue) 中,創(chuàng)建了 RedissonDelayedQueue 對象,并執(zhí)行了構(gòu)造方法里的邏輯。
那么這里面主要做了什么事呢?
主要是調(diào)用了 QueueTransferTask 的 start() 方法。
public void start() {
RTopic schedulerTopic = getTopic();
statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
pushTask();
}
});
messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
@Override
public void onMessage(CharSequence channel, Long startTime) {
scheduleTask(startTime);
}
});
}
這段代碼主要是設(shè)置了指定主題(主題名:redisson_delay_queue_channel:{queuename})兩個發(fā)布訂閱的監(jiān)聽器。
- 當(dāng)指定主題有新訂閱時調(diào)用 pushTask() 方法,里面又會調(diào)用 pushTaskAsync() 方法
- 當(dāng)指定主題有新消息時調(diào)用 scheduleTask(startTime) 方法
需要注意的是,這里會先訂閱指定主題,然后觸發(fā)執(zhí)行 onSubscribe() 方法。
所以我們主要搞懂這三個方法都是做什么的,那么整個初始化流程就明白了。
因為這三個方法是相互調(diào)用的,只看文字的話容易云里霧里,這里有個流程圖,看方法解釋文字的時候可以對照著流程圖看比較有印象。
圖片
- scheduleTask()這個方法看起來多,但核心內(nèi)容就是根據(jù)方法參數(shù)指定的時間調(diào)用 pushTask()。
private void scheduleTask(final Long startTime) {
TimeoutTask oldTimeout = lastTimeout.get();
if (startTime == null) {
return;
}
if (oldTimeout != null) {
oldTimeout.getTask().cancel();
}
long delay = startTime - System.currentTimeMillis();
if (delay > 10) {
Timeout timeout = connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
pushTask();
TimeoutTask currentTimeout = lastTimeout.get();
if (currentTimeout.getTask() == timeout) {
lastTimeout.compareAndSet(currentTimeout, null);
}
}
}, delay, TimeUnit.MILLISECONDS);
if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
timeout.cancel();
}
} else {
pushTask();
}
}
- pushTaskAsync()這個方法是抽象方法,在創(chuàng)建 RedissonDelayedQueue 對象的時候傳進來的,代碼如下:
@Override
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "redis.call('rpush', KEYS[1], value);"
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
看不懂也不要緊,聽我解釋下就明白了。
這里發(fā)送了一段腳本給 redis 執(zhí)行:
我的理解就是初始化的時候
1是為了處理舊的消息,比如生產(chǎn)者1發(fā)送了消息,然后時間沒到自己下線了,這時如果沒有其他客戶端在線,就沒有人能把數(shù)據(jù)從【消息目標(biāo)隊列】移到【消息目標(biāo)隊列】了。
2是返回的這個時間戳,會拿這個定時,等時間到了去【消息目標(biāo)隊列】拉取到期的消息。
簡單總結(jié)就是這個方法是把到期消息從【消息延時隊列】放到【消息目標(biāo)隊列】里,并且返回了最近要到期消息的時間戳。
- 從【消息延時隊列】取出前一百條到期的消息,如果有的話,添加到【消息目標(biāo)隊列】里,并將這些消息從【消息延時隊列】和【消息順序隊列】中移除
- 從【消息延時隊列】取出下一條要到期的消息,返回它的到期時間戳(如果隊列里沒消息返回空)。
- pushTask()
private void pushTask() {
RFuture<Long> startTimeFuture = pushTaskAsync();
startTimeFuture.whenComplete((res, e) -> {
if (e != null) {
if (e instanceof RedissonShutdownException) {
return;
}
log.error(e.getMessage(), e);
scheduleTask(System.currentTimeMillis() + 5 * 1000L);
return;
}
if (res != null) {
scheduleTask(res);
}
});
}
這個代碼看起來就比較簡單,調(diào)用了 pushTaskAsync() 獲取最近要到期消息的時間戳(異步封裝了一下)。
有異常的話就調(diào)用 scheduleTask() 五秒后再執(zhí)行一次 pushTask()。
沒有異常的話如果有最近要到期消息的時間戳(說明【消息延時隊列】里還有未到期消息),用這個最新到期時間調(diào)用 scheduleTask(),在這個指定的時間調(diào)用 pushTask()。
這個方法簡單總結(jié)就是決定了要不要調(diào)用、什么時候再調(diào)用 pushTask(),主要操作邏輯都在 pushTaskAsync() 里(把到期的消息從【消息延時隊列】移到【消息目標(biāo)隊列】供消費端消費)。
了解了上面幾個方法的流程和含義,還記得一開頭提到的添加了兩個發(fā)布訂閱的監(jiān)聽器嗎?
1.當(dāng)指定主題有新訂閱時調(diào)用 pushTask() 方法,里面又會調(diào)用 pushTaskAsync() 方法
2.當(dāng)指定主題有新消息時調(diào)用 scheduleTask(startTime) 方法
需要注意的是,這里會先訂閱指定主題,然后觸發(fā)執(zhí)行 onSubscribe() 方法
- 在初始化延時隊列剛啟動的時候,處理到期舊數(shù)據(jù):把到期的消息從【消息延時隊列】移到【消息目標(biāo)隊列】供消費端消費;處理新數(shù)據(jù):獲取下次到期時間決定下次調(diào)用 pushTask() 的時間。上面講的這種情況是站在當(dāng)前客戶端的視角,但畢竟這是監(jiān)聽訂閱信息,如果啟動不止一個客戶端的話(就算是1個生產(chǎn)者1個消費者,也算兩個客戶端),總有一個客戶端的訂閱信息回調(diào)函數(shù),會不會有問題?仔細想想是沒有的,處理到期舊數(shù)據(jù):之前啟動的客戶端已經(jīng)處理完了;處理新數(shù)據(jù):獲取最近到期時間,在 scheduleTask() 里,如果之前有正在定時的任務(wù),會把原來正在定時的任務(wù)取消掉。這個被取消的任務(wù),時間要么就是當(dāng)前這個時間,要么是之后的時間,取消掉不會影響邏輯。
- 為了應(yīng)對原本【消息延時隊列】里沒消息了這種情況,流程結(jié)束了,重啟定時去調(diào)用 pushTask() ,把到期的消息從【消息延時隊列】移到【消息目標(biāo)隊列】供消費端消費。
總結(jié)
再放一下開頭的圖總體流程圖:
圖片
- 初始化延時隊列時會把【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】,沒有也有可能;然后是找最近要到期的消息時間,定時去拉,這個剛啟動也是可能沒有的,不過不要緊,這兩步是為了處理滯留在【消息延時隊列】的舊數(shù)據(jù)(在發(fā)送了延時消息后,還沒到期時所有客戶端都下線了,這樣就沒人能把【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】里,就會出現(xiàn)這種情況);最主要的還是設(shè)置了發(fā)布訂閱監(jiān)聽器,當(dāng)有人發(fā)送延時消息的時候能收到通知,定時去將【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】。
- 發(fā)送延時消息會先發(fā)送到【消息延時隊列】和【消息順序隊列】,如果【消息延時隊列】里沒有數(shù)據(jù),則將剛發(fā)送的到期時間發(fā)布到指定主題,提醒其他客戶端有新消息。
- 初始化延時隊列時設(shè)置的發(fā)布訂閱監(jiān)聽器把【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】里。
- 獲取延遲消息只需要執(zhí)行 blpop 阻塞的獲取【消息目標(biāo)隊列】的消息就可以了。
這里回答開頭部分說的問題,到這看完了本文,你可以試著自己想一想這個問題的答案。
接收消息代碼如下,可以看到 delayedQueue 是沒有用到的,那么為什么要加這一行呢,這個后面總結(jié)部分回答。
public void consume() throws InterruptedException {
String queuename = "delay-queue";
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
String msg = blockingQueue.take();
//收到消息進行處理...
}
其實這個問題也是我開發(fā)過程中遇到的一個奇怪的地方,接收方代碼沒有初始化延時隊列。
首先再啰嗦一句,初始化延時隊列的作用是會定時去把【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】。
如果只有發(fā)送方初始化延時隊列:
- 發(fā)送方發(fā)送了延遲消息,在到期之前下線了(它就不能把【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】),而且沒有其他發(fā)送方。
- 接收方不管有多少個,都沒人能把【消息延時隊列】里的到期數(shù)據(jù)移動到【消息目標(biāo)隊列】。
所以接收方代碼里也初始化延時隊列能夠避免一部分數(shù)據(jù)丟失問題。