1.6萬字+28張圖盤點(diǎn)11種延遲任務(wù)的實(shí)現(xiàn)方式
大家好,我是三友~~
延遲任務(wù)在我們?nèi)粘I钪斜容^常見,比如訂單支付超時(shí)取消訂單功能,又比如自動(dòng)確定收貨的功能等等。
所以本篇文章就來從實(shí)現(xiàn)到原理來盤點(diǎn)延遲任務(wù)的11種實(shí)現(xiàn)方式,這些方式并沒有絕對(duì)的好壞之分,只是適用場景的不大相同。
圖片
DelayQueue
DelayQueue是JDK提供的api,是一個(gè)延遲隊(duì)列。
圖片
DelayQueue泛型參數(shù)得實(shí)現(xiàn)Delayed接口,Delayed繼承了Comparable接口。
圖片
getDelay方法返回這個(gè)任務(wù)還剩多久時(shí)間可以執(zhí)行,小于0的時(shí)候說明可以這個(gè)延遲任務(wù)到了執(zhí)行的時(shí)間了。
compareTo這個(gè)是對(duì)任務(wù)排序的,保證最先到延遲時(shí)間的任務(wù)排到隊(duì)列的頭。
來個(gè)demo
@Getter
public class SanYouTask implements Delayed {
private final String taskContent;
private final Long triggerTime;
public SanYouTask(String taskContent, Long delayTime) {
this.taskContent = taskContent;
this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);
}
}
SanYouTask實(shí)現(xiàn)了Delayed接口,構(gòu)造參數(shù)
- taskContent:延遲任務(wù)的具體的內(nèi)容
- delayTime:延遲時(shí)間,秒為單位
@Slf4j
public class DelayQueueDemo {
public static void main(String[] args) {
DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>();
new Thread(() -> {
while (true) {
try {
SanYouTask sanYouTask = sanYouTaskDelayQueue.take();
log.info("獲取到延遲任務(wù):{}", sanYouTask.getTaskContent());
} catch (Exception e) {
}
}
}).start();
log.info("提交延遲任務(wù)");
sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記5s", 5L));
sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記3s", 3L));
sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記8s", 8L));
}
}
開啟一個(gè)線程從DelayQueue中獲取任務(wù),然后提交了三個(gè)任務(wù),延遲時(shí)間分為別5s,3s,8s。
測試結(jié)果:
圖片
成功實(shí)現(xiàn)了延遲任務(wù)。
實(shí)現(xiàn)原理
圖片
offer方法在提交任務(wù)的時(shí)候,會(huì)通過根據(jù)compareTo的實(shí)現(xiàn)對(duì)任務(wù)進(jìn)行排序,將最先需要被執(zhí)行的任務(wù)放到隊(duì)列頭。
take方法獲取任務(wù)的時(shí)候,會(huì)拿到隊(duì)列頭部的元素,也就是隊(duì)列中最早需要被執(zhí)行的任務(wù),通過getDelay返回值判斷任務(wù)是否需要被立刻執(zhí)行,如果需要的話,就返回任務(wù),如果不需要就會(huì)等待這個(gè)任務(wù)到延遲時(shí)間的剩余時(shí)間,當(dāng)時(shí)間到了就會(huì)將任務(wù)返回。
Timer
Timer也是JDK提供的api
先來demo
@Slf4j
public class TimerDemo {
public static void main(String[] args) {
Timer timer = new Timer();
log.info("提交延遲任務(wù)");
timer.schedule(new TimerTask() {
@Override
public void run() {
log.info("執(zhí)行延遲任務(wù)");
}
}, 5000);
}
}
通過schedule提交一個(gè)延遲時(shí)間為5s的延遲任務(wù)
圖片
實(shí)現(xiàn)原理
提交的任務(wù)是一個(gè)TimerTask
public abstract class TimerTask implements Runnable {
//忽略其它屬性
long nextExecutionTime;
}
TimerTask內(nèi)部有一個(gè)nextExecutionTime屬性,代表下一次任務(wù)執(zhí)行的時(shí)間,在提交任務(wù)的時(shí)候會(huì)計(jì)算出nextExecutionTime值。
Timer內(nèi)部有一個(gè)TaskQueue對(duì)象,用來保存TimerTask任務(wù)的,會(huì)根據(jù)nextExecutionTime來排序,保證能夠快速獲取到最早需要被執(zhí)行的延遲任務(wù)。
在Timer內(nèi)部還有一個(gè)執(zhí)行任務(wù)的線程TimerThread,這個(gè)線程就跟DelayQueue demo中開啟的線程作用是一樣的,用來執(zhí)行到了延遲時(shí)間的任務(wù)。
所以總的來看,Timer有點(diǎn)像整體封裝了DelayQueue demo中的所有東西,讓用起來簡單點(diǎn)。
雖然Timer用起來比較簡單,但是在阿里規(guī)范中是不推薦使用的,主要是有以下幾點(diǎn)原因:
- Timer使用單線程來處理任務(wù),長時(shí)間運(yùn)行的任務(wù)會(huì)導(dǎo)致其他任務(wù)的延時(shí)處理
- Timer沒有對(duì)運(yùn)行時(shí)異常進(jìn)行處理,一旦某個(gè)任務(wù)觸發(fā)運(yùn)行時(shí)異常,會(huì)導(dǎo)致整個(gè)Timer崩潰,不安全
ScheduledThreadPoolExecutor
由于Timer在使用上有一定的問題,所以在JDK1.5版本的時(shí)候提供了ScheduledThreadPoolExecutor,這個(gè)跟Timer的作用差不多,并且他們的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解決了單線程和異常崩潰等問題。
來個(gè)demo
@Slf4j
public class ScheduledThreadPoolExecutorDemo {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());
log.info("提交延遲任務(wù)");
executor.schedule(() -> log.info("執(zhí)行延遲任務(wù)"), 5, TimeUnit.SECONDS);
}
}
結(jié)果
圖片
實(shí)現(xiàn)原理
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,也就是繼承了線程池,所以可以有很多個(gè)線程來執(zhí)行任務(wù)。
ScheduledThreadPoolExecutor在構(gòu)造的時(shí)候會(huì)傳入一個(gè)DelayedWorkQueue阻塞隊(duì)列,所以線程池內(nèi)部的阻塞隊(duì)列是DelayedWorkQueue。
圖片
在提交延遲任務(wù)的時(shí)候,任務(wù)會(huì)被封裝一個(gè)任務(wù)會(huì)被封裝成ScheduledFutureTask對(duì)象,然后放到DelayedWorkQueue阻塞隊(duì)列中。
ScheduledFutureTask
ScheduledFutureTask實(shí)現(xiàn)了前面提到的Delayed接口,所以其實(shí)可以猜到DelayedWorkQueue會(huì)根據(jù)ScheduledFutureTask對(duì)于Delayed接口的實(shí)現(xiàn)來排序,所以線程能夠獲取到最早到延遲時(shí)間的任務(wù)。
當(dāng)線程從DelayedWorkQueue中獲取到需要執(zhí)行的任務(wù)之后就會(huì)執(zhí)行任務(wù)。
RocketMQ
RocketMQ是阿里開源的一款消息中間件,實(shí)現(xiàn)了延遲消息的功能,如果有對(duì)RocketMQ不熟悉的小伙伴可以看一下我之前寫的RocketMQ保姆級(jí)教程和RocketMQ消息短暫而又精彩的一生 這兩篇文章。
RocketMQ延遲消息的延遲時(shí)間默認(rèn)有18個(gè)等級(jí)。
圖片
當(dāng)發(fā)送消息的時(shí)候只需要指定延遲等級(jí)即可。如果這18個(gè)等級(jí)的延遲時(shí)間不符和你的要求,可以修改RocketMQ服務(wù)端的配置文件。
來個(gè)demo
依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
<!--web依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
配置文件
rocketmq:
name-server: 192.168.200.144:9876 #服務(wù)器ip:nameServer端口
producer:
group: sanyouProducer
controller類,通過DefaultMQProducer發(fā)送延遲消息到sanyouDelayTaskTopic這個(gè)topic,延遲等級(jí)為2,也就是延遲時(shí)間為5s的意思。
@RestController
@Slf4j
public class RocketMQDelayTaskController {
@Resource
private DefaultMQProducer producer;
@GetMapping("/rocketmq/add")
public void addTask(@RequestParam("task") String task) throws Exception {
Message msg = new Message("sanyouDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(2);
// 發(fā)送消息并得到消息的發(fā)送結(jié)果,然后打印
log.info("提交延遲任務(wù)");
producer.send(msg);
}
}
創(chuàng)建一個(gè)消費(fèi)者,監(jiān)聽sanyouDelayTaskTopic的消息。
@Component
@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouDelayTaskTopic")
@Slf4j
public class SanYouDelayTaskTopicListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
log.info("獲取到延遲任務(wù):{}", msg);
}
}
啟動(dòng)應(yīng)用,瀏覽器輸入以下鏈接添加任務(wù)
http://localhost:8080/rocketmq/add?task=sanyou
測試結(jié)果:
圖片
實(shí)現(xiàn)原理
圖片
生產(chǎn)者發(fā)送延遲消息之后,RocketMQ服務(wù)端在接收到消息之后,會(huì)去根據(jù)延遲級(jí)別是否大于0來判斷是否是延遲消息
- 如果不大于0,說明不是延遲消息,那就會(huì)將消息保存到指定的topic中
- 如果大于0,說明是延遲消息,此時(shí)RocketMQ會(huì)進(jìn)行一波偷梁換柱的操作,將消息的topic改成SCHEDULE_TOPIC_XXXX中,XXXX不是占位符,然后存儲(chǔ)。
在BocketMQ內(nèi)部有一個(gè)延遲任務(wù),相當(dāng)于是一個(gè)定時(shí)任務(wù),這個(gè)任務(wù)就會(huì)獲取SCHEDULE_TOPIC_XXXX中的消息,判斷消息是否到了延遲時(shí)間,如果到了,那么就會(huì)將消息的topic存儲(chǔ)到原來真正的topic(拿我們的例子來說就是sanyouDelayTaskTopic)中,之后消費(fèi)者就可以從真正的topic中獲取到消息了。
圖片
定時(shí)任務(wù)
RocketMQ這種實(shí)現(xiàn)方式相比于前面提到的三種更加可靠,因?yàn)榍懊嫣岬降娜N任務(wù)內(nèi)容都是存在內(nèi)存的,服務(wù)器重啟任務(wù)就丟了,如果要實(shí)現(xiàn)任務(wù)不丟還得自己實(shí)現(xiàn)邏輯,但是RocketMQ消息有持久化機(jī)制,能夠保證任務(wù)不丟失。
RabbitMQ
RabbitMQ也是一款消息中間件,通過RabbitMQ的死信隊(duì)列也可以是先延遲任務(wù)的功能。
demo
引入RabbitMQ的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
配置文件
spring:
rabbitmq:
host: 192.168.200.144 #服務(wù)器ip
port: 5672
virtual-host: /
RabbitMQ死信隊(duì)列的配置類,后面說原理的時(shí)候會(huì)介紹干啥的。
@Configuration
public class RabbitMQConfiguration {
@Bean
public DirectExchange sanyouDirectExchangee() {
return new DirectExchange("sanyouDirectExchangee");
}
@Bean
public Queue sanyouQueue() {
return QueueBuilder
//指定隊(duì)列名稱,并持久化
.durable("sanyouQueue")
//設(shè)置隊(duì)列的超時(shí)時(shí)間為5秒,也就是延遲任務(wù)的時(shí)間
.ttl(5000)
//指定死信交換機(jī)
.deadLetterExchange("sanyouDelayTaskExchangee")
.build();
}
@Bean
public Binding sanyouQueueBinding() {
return BindingBuilder.bind(sanyouQueue()).to(sanyouDirectExchangee()).with("");
}
@Bean
public DirectExchange sanyouDelayTaskExchange() {
return new DirectExchange("sanyouDelayTaskExchangee");
}
@Bean
public Queue sanyouDelayTaskQueue() {
return QueueBuilder
//指定隊(duì)列名稱,并持久化
.durable("sanyouDelayTaskQueue")
.build();
}
@Bean
public Binding sanyouDelayTaskQueueBinding() {
return BindingBuilder.bind(sanyouDelayTaskQueue()).to(sanyouDelayTaskExchange()).with("");
}
}
RabbitMQDelayTaskController用來發(fā)送消息,這里沒指定延遲時(shí)間,是因?yàn)樵诼暶麝?duì)列的時(shí)候指定了延遲時(shí)間為5s
@RestController
@Slf4j
public class RabbitMQDelayTaskController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/rabbitmq/add")
public void addTask(@RequestParam("task") String task) throws Exception {
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("提交延遲任務(wù)");
// 發(fā)送消息
rabbitTemplate.convertAndSend("sanyouDirectExchangee", "", task, correlationData);
}
}
啟動(dòng)應(yīng)用,瀏覽器輸入以下鏈接添加任務(wù)。
http://localhost:8080/rabbitmq/add?task=sanyou
測試結(jié)果,成功實(shí)現(xiàn)5s的延遲任務(wù)。
實(shí)現(xiàn)原理
圖片
整個(gè)工作流程如下:
- 消息發(fā)送的時(shí)候會(huì)將消息發(fā)送到sanyouDirectExchange這個(gè)交換機(jī)上
- 由于sanyouDirectExchange綁定了sanyouQueue,所以消息會(huì)被路由到sanyouQueue這個(gè)隊(duì)列上
- 由于sanyouQueue沒有消費(fèi)者消費(fèi)消息,并且又設(shè)置了5s的過期時(shí)間,所以當(dāng)消息過期之后,消息就被放到綁定的sanyouDelayTaskExchange死信交換機(jī)中
- 消息到達(dá)sanyouDelayTaskExchange交換機(jī)后,由于跟sanyouDelayTaskQueue進(jìn)行了綁定,所以消息就被路由到sanyouDelayTaskQueue中,消費(fèi)者就能從sanyouDelayTaskQueue中拿到消息了
上面說的隊(duì)列與交換機(jī)的綁定關(guān)系,就是上面的配置類所干的事。
其實(shí)從這個(gè)單從消息流轉(zhuǎn)的角度可以看出,RabbitMQ跟RocketMQ實(shí)現(xiàn)有相似之處。
消息最開始都并沒有放到最終消費(fèi)者消費(fèi)的隊(duì)列中,而都是放到一個(gè)中間隊(duì)列中,等消息到了過期時(shí)間或者說是延遲時(shí)間,消息就會(huì)被放到最終的隊(duì)列供消費(fèi)者消息。
只不過RabbitMQ需要你顯示的手動(dòng)指定消息所在的中間隊(duì)列,而RocketMQ是在內(nèi)部已經(jīng)做好了這塊邏輯。
除了基于RabbitMQ的死信隊(duì)列來做,RabbitMQ官方還提供了延時(shí)插件,也可以實(shí)現(xiàn)延遲消息的功能,這個(gè)插件的大致原理也跟上面說的一樣,延時(shí)消息會(huì)被先保存在一個(gè)中間的地方,叫做Mnesia,然后有一個(gè)定時(shí)任務(wù)去查詢最近需要被投遞的消息,將其投遞到目標(biāo)隊(duì)列中。
監(jiān)聽Redis過期key
在Redis中,有個(gè)發(fā)布訂閱的機(jī)制。
圖片
生產(chǎn)者在消息發(fā)送時(shí)需要到指定發(fā)送到哪個(gè)channel上,消費(fèi)者訂閱這個(gè)channel就能獲取到消息。圖中channel理解成MQ中的topic。
并且在Redis中,有很多默認(rèn)的channel,只不過向這些channel發(fā)送消息的生產(chǎn)者不是我們寫的代碼,而是Redis本身。這里面就有這么一個(gè)channel叫做__keyevent@<db>__:expired,db是指Redis數(shù)據(jù)庫的序號(hào)。
當(dāng)某個(gè)Redis的key過期之后,Redis內(nèi)部會(huì)發(fā)布一個(gè)事件到__keyevent@<db>__:expired這個(gè)channel上,只要監(jiān)聽這個(gè)事件,那么就可以獲取到過期的key。
所以基于監(jiān)聽Redis過期key實(shí)現(xiàn)延遲任務(wù)的原理如下:
- 將延遲任務(wù)作為key,過期時(shí)間設(shè)置為延遲時(shí)間
- 監(jiān)聽__keyevent@<db>__:expired這個(gè)channel,那么一旦延遲任務(wù)到了過期時(shí)間(延遲時(shí)間),那么就可以獲取到這個(gè)任務(wù)
來個(gè)demo
Spring已經(jīng)實(shí)現(xiàn)了監(jiān)聽__keyevent@*__:expired這個(gè)channel這個(gè)功能,__keyevent@*__:expired中的*代表通配符的意思,監(jiān)聽所有的數(shù)據(jù)庫。
所以demo寫起來就很簡單了,只需4步即可
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
配置文件
spring:
redis:
host: 192.168.200.144
port: 6379
配置類
@Configuration
public class RedisConfiguration {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
return redisMessageListenerContainer;
}
@Bean
public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
}
}
KeyExpirationEventMessageListener實(shí)現(xiàn)了對(duì)__keyevent@*__:expiredchannel的監(jiān)聽。
圖片
當(dāng)KeyExpirationEventMessageListener收到Redis發(fā)布的過期Key的消息的時(shí)候,會(huì)發(fā)布RedisKeyExpiredEvent事件。
圖片
所以我們只需要監(jiān)聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。
對(duì)RedisKeyExpiredEvent事件的監(jiān)聽實(shí)現(xiàn)MyRedisKeyExpiredEventListener
@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {
@Override
public void onApplicationEvent(RedisKeyExpiredEvent event) {
byte[] body = event.getSource();
System.out.println("獲取到延遲消息:" + new String(body));
}
}
代碼寫好,啟動(dòng)應(yīng)用。
之后我直接通過Redis命令設(shè)置消息,就沒通過代碼發(fā)送消息了,消息的key為sanyou,值為task,值不重要,過期時(shí)間為5s
set sanyou task
expire sanyou 5
成功獲取到延遲任務(wù)。
圖片
雖然這種方式可以實(shí)現(xiàn)延遲任務(wù),但是這種方式坑比較多。
任務(wù)存在延遲
Redis過期事件的發(fā)布不是指key到了過期時(shí)間就發(fā)布,而是key到了過期時(shí)間被清除之后才會(huì)發(fā)布事件。
而Redis過期key的兩種清除策略,就是面試八股文常背的兩種:
- 惰性清除。當(dāng)這個(gè)key過期之后,訪問時(shí),這個(gè)Key才會(huì)被清除
- 定時(shí)清除。后臺(tái)會(huì)定期檢查一部分key,如果有key過期了,就會(huì)被清除
所以即使key到了過期時(shí)間,Redis也不一定會(huì)發(fā)送key過期事件,這就到導(dǎo)致雖然延遲任務(wù)到了延遲時(shí)間也可能獲取不到延遲任務(wù)。
丟消息太頻繁
Redis實(shí)現(xiàn)的發(fā)布訂閱模式,消息是沒有持久化機(jī)制,當(dāng)消息發(fā)布到某個(gè)channel之后,如果沒有客戶端訂閱這個(gè)channel,那么這個(gè)消息就丟了,并不會(huì)像MQ一樣進(jìn)行持久化,等有消費(fèi)者訂閱的時(shí)候再給消費(fèi)者消費(fèi)。
所以說,假設(shè)服務(wù)重啟期間,某個(gè)生產(chǎn)者或者是Redis本身發(fā)布了一條消息到某個(gè)channel,由于服務(wù)重啟,沒有監(jiān)聽這個(gè)channel,那么這個(gè)消息自然就丟了。
消息消費(fèi)只有廣播模式
Redis的發(fā)布訂閱模式消息消費(fèi)只有廣播模式一種。
所謂的廣播模式就是多個(gè)消費(fèi)者訂閱同一個(gè)channel,那么每個(gè)消費(fèi)者都能消費(fèi)到發(fā)布到這個(gè)channel的所有消息。
圖片
如圖,生產(chǎn)者發(fā)布了一條消息,內(nèi)容為sanyou,那么兩個(gè)消費(fèi)者都可以同時(shí)收到sanyou這條消息。
所以,如果通過監(jiān)聽channel來獲取延遲任務(wù),那么一旦服務(wù)實(shí)例有多個(gè)的話,還得保證消息不能重復(fù)處理,額外地增加了代碼開發(fā)量。
接收到所有key的某個(gè)事件
這個(gè)不屬于Redis發(fā)布訂閱模式的問題,而是Redis本身事件通知的問題。
當(dāng)監(jiān)聽了__keyevent@<db>__:expired的channel,那么所有的Redis的key只要發(fā)生了過期事件都會(huì)被通知給消費(fèi)者,不管這個(gè)key是不是消費(fèi)者想接收到的。
所以如果你只想消費(fèi)某一類消息的key,那么還得自行加一些標(biāo)記,比如消息的key加個(gè)前綴,消費(fèi)的時(shí)候判斷一下帶前綴的key就是需要消費(fèi)的任務(wù)。
Redisson的RDelayedQueue
Redisson他是Redis的兒子(Redis son),基于Redis實(shí)現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實(shí)現(xiàn),但是除了實(shí)現(xiàn)Redis分布式鎖之外,它還實(shí)現(xiàn)了延遲隊(duì)列的功能。
先來個(gè)demo
引入pom。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.1</version>
</dependency>
封裝了一個(gè)RedissonDelayQueue類。
@Component
@Slf4j
public class RedissonDelayQueue {
private RedissonClient redissonClient;
private RDelayedQueue<String> delayQueue;
private RBlockingQueue<String> blockingQueue;
@PostConstruct
public void init() {
initDelayQueue();
startDelayQueueConsumer();
}
private void initDelayQueue() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress("redis://localhost:6379");
redissonClient = Redisson.create(config);
blockingQueue = redissonClient.getBlockingQueue("SANYOU");
delayQueue = redissonClient.getDelayedQueue(blockingQueue);
}
private void startDelayQueueConsumer() {
new Thread(() -> {
while (true) {
try {
String task = blockingQueue.take();
log.info("接收到延遲任務(wù):{}", task);
} catch (Exception e) {
e.printStackTrace();
}
}
}, "SANYOU-Consumer").start();
}
public void offerTask(String task, long seconds) {
log.info("添加延遲任務(wù):{} 延遲時(shí)間:{}s", task, seconds);
delayQueue.offer(task, seconds, TimeUnit.SECONDS);
}
}
這個(gè)類在創(chuàng)建的時(shí)候會(huì)去初始化延遲隊(duì)列,創(chuàng)建一個(gè)RedissonClient對(duì)象,之后通過RedissonClient對(duì)象獲取到RDelayedQueue和RBlockingQueue對(duì)象,傳入的隊(duì)列名字叫SANYOU,這個(gè)名字無所謂。
當(dāng)延遲隊(duì)列創(chuàng)建之后,會(huì)開啟一個(gè)延遲任務(wù)的消費(fèi)線程,這個(gè)線程會(huì)一直從RBlockingQueue中通過take方法阻塞獲取延遲任務(wù)。
添加任務(wù)的時(shí)候是通過RDelayedQueue的offer方法添加的。
controller類,通過接口添加任務(wù),延遲時(shí)間為5s。
@RestController
public class RedissonDelayQueueController {
@Resource
private RedissonDelayQueue redissonDelayQueue;
@GetMapping("/add")
public void addTask(@RequestParam("task") String task) {
redissonDelayQueue.offerTask(task, 5);
}
}
啟動(dòng)項(xiàng)目,在瀏覽器輸入如下連接,添加任務(wù)。
http://localhost:8080/add?task=sanyou
靜靜等待5s,成功獲取到任務(wù)。
圖片
實(shí)現(xiàn)原理
如下是Redisson延遲隊(duì)列的實(shí)現(xiàn)原理:
圖片
SANYOU前面的前綴都是固定的,Redisson創(chuàng)建的時(shí)候會(huì)拼上前綴。
- redisson_delay_queue_timeout:SANYOU,sorted set數(shù)據(jù)類型,存放所有延遲任務(wù),按照延遲任務(wù)的到期時(shí)間戳(提交任務(wù)時(shí)的時(shí)間戳 + 延遲時(shí)間)來排序的,所以列表的最前面的第一個(gè)元素就是整個(gè)延遲隊(duì)列中最早要被執(zhí)行的任務(wù),這個(gè)概念很重要
- redisson_delay_queue:SANYOU,list數(shù)據(jù)類型,也是存放所有的任務(wù),但是研究下來發(fā)現(xiàn)好像沒什么用。。
- SANYOU,list數(shù)據(jù)類型,被稱為目標(biāo)隊(duì)列,這個(gè)里面存放的任務(wù)都是已經(jīng)到了延遲時(shí)間的,可以被消費(fèi)者獲取的任務(wù),所以上面demo中的RBlockingQueue的take方法是從這個(gè)目標(biāo)隊(duì)列中獲取到任務(wù)的
- redisson_delay_queue_channel:SANYOU,是一個(gè)channel,用來通知客戶端開啟一個(gè)延遲任務(wù)
任務(wù)提交的時(shí)候,Redisson會(huì)將任務(wù)放到redisson_delay_queue_timeout:SANYOU中,分?jǐn)?shù)就是提交任務(wù)的時(shí)間戳+延遲時(shí)間,就是延遲任務(wù)的到期時(shí)間戳。
Redisson客戶端內(nèi)部通過監(jiān)聽redisson_delay_queue_channel:SANYOU這個(gè)channel來提交一個(gè)延遲任務(wù),這個(gè)延遲任務(wù)能夠保證將redisson_delay_queue_timeout:SANYOU中到了延遲時(shí)間的任務(wù)從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個(gè)目標(biāo)隊(duì)列中。
于是消費(fèi)者就可以從SANYOU這個(gè)目標(biāo)隊(duì)列獲取到延遲任務(wù)了。
所以從這可以看出,Redisson的延遲任務(wù)的實(shí)現(xiàn)跟前面說的MQ的實(shí)現(xiàn)都是殊途同歸,最開始任務(wù)放到中間的一個(gè)地方,叫做redisson_delay_queue_timeout:SANYOU,然后會(huì)開啟一個(gè)類似于定時(shí)任務(wù)的一個(gè)東西,去判斷這個(gè)中間地方的消息是否到了延遲時(shí)間,到了再放到最終的目標(biāo)的隊(duì)列供消費(fèi)者消費(fèi)。
Redisson的這種實(shí)現(xiàn)方式比監(jiān)聽Redis過期key的實(shí)現(xiàn)方式更加可靠,因?yàn)橄⒍即嬖趌ist和sorted set數(shù)據(jù)類型中,所以消息很少丟。
上述說的兩種Redis的方案更詳細(xì)的介紹,可以查看我之前寫的用Redis實(shí)現(xiàn)延遲隊(duì)列,我研究了兩種方案,發(fā)現(xiàn)并不簡單這篇文章。
Netty的HashedWheelTimer
先來個(gè)demo
@Slf4j
public class NettyHashedWheelTimerDemo {
public static void main(String[] args) {
HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
timer.start();
log.info("提交延遲任務(wù)");
timer.newTimeout(timeout -> log.info("執(zhí)行延遲任務(wù)"), 5, TimeUnit.SECONDS);
}
}
測試結(jié)果:
圖片
實(shí)現(xiàn)原理
圖片
如圖,時(shí)間輪會(huì)被分成很多格子(上述demo中的8就代表了8個(gè)格子),一個(gè)格子代表一段時(shí)間(上述demo中的100就代表一個(gè)格子是100ms),所以上述demo中,每800ms會(huì)走一圈。
當(dāng)任務(wù)提交的之后,會(huì)根據(jù)任務(wù)的到期時(shí)間進(jìn)行hash取模,計(jì)算出這個(gè)任務(wù)的執(zhí)行時(shí)間所在具體的格子,然后添加到這個(gè)格子中,通過如果這個(gè)格子有多個(gè)任務(wù),會(huì)用鏈表來保存。所以這個(gè)任務(wù)的添加有點(diǎn)像HashMap儲(chǔ)存元素的原理。
HashedWheelTimer內(nèi)部會(huì)開啟一個(gè)線程,輪詢每個(gè)格子,找到到了延遲時(shí)間的任務(wù),然后執(zhí)行。
由于HashedWheelTimer也是單線程來處理任務(wù),所以跟Timer一樣,長時(shí)間運(yùn)行的任務(wù)會(huì)導(dǎo)致其他任務(wù)的延時(shí)處理。
前面Redisson中提到的客戶端延遲任務(wù)就是基于Netty的HashedWheelTimer實(shí)現(xiàn)的。
Hutool的SystemTimer
Hutool工具類也提供了延遲任務(wù)的實(shí)現(xiàn)SystemTimer
demo
@Slf4j
public class SystemTimerDemo {
public static void main(String[] args) {
SystemTimer systemTimer = new SystemTimer();
systemTimer.start();
log.info("提交延遲任務(wù)");
systemTimer.addTask(new TimerTask(() -> log.info("執(zhí)行延遲任務(wù)"), 5000));
}
}
執(zhí)行結(jié)果
圖片
Hutool底層其實(shí)也用到了時(shí)間輪。
Qurtaz
Qurtaz是一款開源作業(yè)調(diào)度框架,基于Qurtaz提供的api也可以實(shí)現(xiàn)延遲任務(wù)的功能。
demo
依賴
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
SanYouJob實(shí)現(xiàn)Job接口,當(dāng)任務(wù)到達(dá)執(zhí)行時(shí)間的時(shí)候會(huì)調(diào)用execute的實(shí)現(xiàn),從context可以獲取到任務(wù)的內(nèi)容。
@Slf4j
public class SanYouJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDetail jobDetail = context.getJobDetail();
JobDataMap jobDataMap = jobDetail.getJobDataMap();
log.info("獲取到延遲任務(wù):{}", jobDataMap.get("delayTask"));
}
}
測試類
public class QuartzDemo {
public static void main(String[] args) throws SchedulerException, InterruptedException {
// 1.創(chuàng)建Scheduler的工廠
SchedulerFactory sf = new StdSchedulerFactory();
// 2.從工廠中獲取調(diào)度器實(shí)例
Scheduler scheduler = sf.getScheduler();
// 6.啟動(dòng) 調(diào)度器
scheduler.start();
// 3.創(chuàng)建JobDetail,Job類型就是上面說的SanYouJob
JobDetail jb = JobBuilder.newJob(SanYouJob.class)
.usingJobData("delayTask", "這是一個(gè)延遲任務(wù)")
.build();
// 4.創(chuàng)建Trigger
Trigger t = TriggerBuilder.newTrigger()
//任務(wù)的觸發(fā)時(shí)間就是延遲任務(wù)到的延遲時(shí)間
.startAt(DateUtil.offsetSecond(new Date(), 5))
.build();
// 5.注冊(cè)任務(wù)和定時(shí)器
log.info("提交延遲任務(wù)");
scheduler.scheduleJob(jb, t);
}
}
執(zhí)行結(jié)果:
圖片
實(shí)現(xiàn)原理
核心組件
- Job:表示一個(gè)任務(wù),execute方法的實(shí)現(xiàn)是對(duì)任務(wù)的執(zhí)行邏輯
- JobDetail:任務(wù)的詳情,可以設(shè)置任務(wù)需要的參數(shù)等信息
- Trigger:觸發(fā)器,是用來觸發(fā)業(yè)務(wù)的執(zhí)行,比如說指定5s后觸發(fā)任務(wù),那么任務(wù)就會(huì)在5s后觸發(fā)
- Scheduler:調(diào)度器,內(nèi)部可以注冊(cè)多個(gè)任務(wù)和對(duì)應(yīng)任務(wù)的觸發(fā)器,之后會(huì)調(diào)度任務(wù)的執(zhí)行
圖片
啟動(dòng)的時(shí)候會(huì)開啟一個(gè)QuartzSchedulerThread調(diào)度線程,這個(gè)線程會(huì)去判斷任務(wù)是否到了執(zhí)行時(shí)間,到的話就將任務(wù)交給任務(wù)線程池去執(zhí)行。
無限輪詢延遲任務(wù)
無限輪詢的意思就是開啟一個(gè)線程不停的去輪詢?nèi)蝿?wù),當(dāng)這些任務(wù)到達(dá)了延遲時(shí)間,那么就執(zhí)行任務(wù)。
demo
@Slf4j
public class PollingTaskDemo {
private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();
public static void main(String[] args) {
new Thread(() -> {
while (true) {
try {
for (DelayTask delayTask : DELAY_TASK_LIST) {
if (delayTask.triggerTime <= System.currentTimeMillis()) {
log.info("處理延遲任務(wù):{}", delayTask.taskContent);
DELAY_TASK_LIST.remove(delayTask);
}
}
TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e) {
}
}
}).start();
log.info("提交延遲任務(wù)");
DELAY_TASK_LIST.add(new DelayTask("三友的java日記", 5L));
}
@Getter
@Setter
public static class DelayTask {
private final String taskContent;
private final Long triggerTime;
public DelayTask(String taskContent, Long delayTime) {
this.taskContent = taskContent;
this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
}
}
}
任務(wù)可以存在數(shù)據(jù)庫又或者是內(nèi)存,看具體的需求,這里我為了簡單就放在內(nèi)存里了。
執(zhí)行結(jié)果:
圖片
這種操作簡單,但是就是效率低下,每次都得遍歷所有的任務(wù)。
最后
最后,本文所有示例代碼地址:
https://github.com/sanyou3/delay-task-demo.git