Redis實現(xiàn)延遲隊列的方案總結(jié)
redis是我們項目開發(fā)中常見的技術(shù)中間件,它除了可以實現(xiàn)常見的分布式鎖和分布式緩存功能之外,還可以幫助我們實現(xiàn)很多的功能,如延遲隊列。下面介紹幾種redis常見的實現(xiàn)延遲隊列的方案。
1、通過過期key通知實現(xiàn)
圖片
實現(xiàn)思路:首先開啟redis的key過期通知,然后在業(yè)務中給key設置過期時間,到了過期時間后redis會自動的將過期的key消息推送給監(jiān)聽者,從而實現(xiàn)延遲任務。
核心的代碼實現(xiàn):
#1、開始redis的過期通知
notify-keyspace-events Ex
#2、監(jiān)聽redis的過期key
@Component
@Slf4j
public class RedisExpireKeyService extends
KeyExpirationEventMessageListener {
/**
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public RedisExpireKeyService(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 監(jiān)聽過期的key
*
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String expireKey = message.toString();
//執(zhí)行具體的業(yè)務
System.out.println("監(jiān)聽到key=" + expireKey + ",已經(jīng)過期");
}
}
生產(chǎn)環(huán)境是不推薦使用此方案,原因Redis 的過期策略采用的是惰性刪除和定期刪除相結(jié)合的方式,redis并不保證 key 在過期時會被立即刪除操作,此方案適用于平時自己項目練習的時候使用。
2、通過Zset數(shù)據(jù)類型+定時任務實現(xiàn)
圖片
實現(xiàn)思路:ZSet 是一種有序集合類型,它可以存儲不重復的元素,并且給每個元素賦予一個 double 類型的排序權(quán)重值(score),所以可以將元素的過期時間作為分值,通過定時任務掃描的方式判斷是否達到過期時間,從而實現(xiàn)延遲隊列。
核心的代碼實現(xiàn):
#使用xxl-job
@JobName("consumerTaskJob")
public void consumerTaskJob() {
String expireKey = "ExPIRE_KEY";
try {
//獲取當前時間
double currentTime = System.currentTimeMillis();
//獲取超時的數(shù)據(jù)
Set<String> expiredMemberSet = redisTemplate.opsForZSet().rangeByScore(expireKey, Double.MIN_VALUE, currentTime);
//過期key
for (String expiredMember : expiredMemberSet) {
//todo 做實際的延遲任務
//從ZSet中移除數(shù)據(jù)
redisTemplate.opsForZSet().remove(expireKey, expiredMember);
}
} catch (Exception e) {
log.error("數(shù)據(jù)處理失敗",e);
}
}
Zset+定時任務的實現(xiàn)延遲任務的方式雖然比監(jiān)聽過期key方案合理一些,但是它還是存在一定的缺陷,如無重試機制、延遲時間固定化(依賴定時任務的執(zhí)行時間)、不適用于大規(guī)模的延遲任務。
3、Redisson實現(xiàn)延遲隊列
Redisson是一個操作Redis的 Java 客戶框架,它提供了RDelayedQueue 接口和 RQueue 接口可以實現(xiàn)延遲隊列(Redisson 提供的延遲隊列底層也是基于 Zset 數(shù)據(jù)結(jié)構(gòu)實現(xiàn)的)。
核心的代碼實現(xiàn):
#1、添加依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
#2、添加數(shù)據(jù)到隊列中
//創(chuàng)建RedissonClient實例
RedissonClient redissonClient = Redisson.create();
//創(chuàng)建阻塞隊列
RBlockingDeque<String> queue = redissonClient.getBlockingDeque("delayQueue");
//創(chuàng)建延遲隊列并關聯(lián)到阻塞隊列
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
//添加延遲任務
delayedQueue.offer("Task1", 5000, TimeUnit.MILLISECONDS);
#3、消費數(shù)據(jù)
while (true) {
try {
//獲取并移除隊首元素,如果隊列為空,則阻塞等待
String task = queue.take();
System.out.println("Task: " + task);
} catch (Exception e) {
log.error("消費失敗",e);
}
}
總結(jié):Redis實現(xiàn)的延遲隊列適用于處理一些比較簡單的業(yè)務,如發(fā)送郵件、發(fā)送通知等,對于復雜的業(yè)務不適用于Redis的延遲任務方案。