面試官:Redis如何實(shí)現(xiàn)延遲任務(wù)?
延遲任務(wù)(Delayed Task)是指在未來(lái)的某個(gè)時(shí)間點(diǎn),執(zhí)行相應(yīng)的任務(wù)。也就是說(shuō),延遲任務(wù)是一種計(jì)劃任務(wù),它被安排在特定的時(shí)間后執(zhí)行,而不是立即執(zhí)行。
延遲任務(wù)的常見(jiàn)使用場(chǎng)景有以下幾個(gè):
- 定時(shí)發(fā)送通知或消息:
- 發(fā)送定時(shí)短信、郵件或應(yīng)用內(nèi)消息,如注冊(cè)確認(rèn)、訂單狀態(tài)更新、促銷(xiāo)活動(dòng)通知等。
- 定時(shí)推送新聞、天氣預(yù)報(bào)、股票價(jià)格等實(shí)時(shí)信息。
- 異步處理和后臺(tái)任務(wù):
- 將耗時(shí)的操作安排為延遲任務(wù),避免阻塞主線程或用戶界面,提高系統(tǒng)的響應(yīng)性能。
- 執(zhí)行批量數(shù)據(jù)處理,如日志分析、數(shù)據(jù)報(bào)表生成等。
- 緩存管理和過(guò)期處理:
- 定時(shí)清理過(guò)期的緩存數(shù)據(jù),釋放存儲(chǔ)空間。
- 更新緩存中的數(shù)據(jù),保持?jǐn)?shù)據(jù)的時(shí)效性和準(zhǔn)確性。
- 計(jì)劃任務(wù)和定時(shí)調(diào)度:
- 在特定時(shí)間執(zhí)行系統(tǒng)維護(hù)任務(wù),如數(shù)據(jù)庫(kù)備份、系統(tǒng)更新等。
- 定時(shí)啟動(dòng)或關(guān)閉服務(wù),以節(jié)約資源或滿足業(yè)務(wù)需求。
- 訂單和支付處理:
- 在用戶下單后的一段時(shí)間內(nèi),如果用戶未支付,則自動(dòng)取消訂單。
- 定時(shí)檢查訂單的支付狀態(tài),并更新相應(yīng)的訂單信息。
- 重試和失敗恢復(fù)機(jī)制:
- 當(dāng)某個(gè)操作失敗時(shí),可以在延遲一段時(shí)間后自動(dòng)重試,以提高成功率。
- 實(shí)現(xiàn)分布式鎖的超時(shí)釋放,避免死鎖情況。
- 提醒和日程管理:
- 設(shè)置日程提醒,如會(huì)議、生日、紀(jì)念日等。
- 定時(shí)提醒用戶完成任務(wù)或進(jìn)行某項(xiàng)活動(dòng)。
- 定時(shí)數(shù)據(jù)采集和上報(bào):
- 定期從傳感器、設(shè)備或外部系統(tǒng)中采集數(shù)據(jù)。
- 定時(shí)上報(bào)應(yīng)用的使用情況、統(tǒng)計(jì)數(shù)據(jù)或用戶行為分析。
Redis如何實(shí)現(xiàn)延遲任務(wù)?
Redis 本身并沒(méi)有直接提供延遲任務(wù)的功能,但可以通過(guò)一些策略和手段,在 Redis 中手動(dòng)實(shí)現(xiàn)延遲任務(wù)。
使用 Redis 實(shí)現(xiàn)延遲任務(wù)的主要手段有以下幾個(gè):
- 使用過(guò)期鍵的事件通知執(zhí)行延時(shí)任務(wù):開(kāi)啟過(guò)期鍵通知,當(dāng) Redis 中鍵值過(guò)期時(shí)觸發(fā)時(shí)間,在事件中實(shí)現(xiàn)延遲代碼,但因?yàn)?Redis 的 Key 過(guò)期時(shí)不會(huì)被及時(shí)刪除,所以這個(gè)過(guò)期事件也不保證可以立即觸發(fā),所以此方式很少用來(lái)實(shí)現(xiàn)延遲任務(wù)(因?yàn)闃O其不穩(wěn)定)。
- 使用 ZSet 執(zhí)行延時(shí)任務(wù):在 ZSet 中插入延遲執(zhí)行時(shí)間和任務(wù) ID,如下命令所示:
ZADD delay_tasks <timestamp> <task_id>
然后,啟動(dòng)一個(gè)后臺(tái)線程或者定時(shí)任務(wù),定期通過(guò) ZRANGEBYSCORE 命令從有序集合中獲取已到達(dá)執(zhí)行時(shí)間的任務(wù),即分?jǐn)?shù)小于或等于當(dāng)前時(shí)間的任務(wù),進(jìn)行執(zhí)行即可實(shí)現(xiàn)延時(shí)任務(wù)。
- 使用 Redisson 執(zhí)行延遲任務(wù):在 Redisson 框架中,提供了一個(gè) RDelayedQueue 用于實(shí)現(xiàn)延遲隊(duì)列,使用簡(jiǎn)單方便,推薦使用。
具體實(shí)現(xiàn)如下。
過(guò)期鍵通知事件實(shí)現(xiàn)
Redis 提供了鍵空間通知功能,當(dāng)某個(gè)鍵發(fā)生變化(過(guò)期)時(shí),可以發(fā)送通知。你可以結(jié)合 EXPIRE 過(guò)期命令和鍵空間通知來(lái)實(shí)現(xiàn)延遲任務(wù)。
當(dāng)為某個(gè)鍵設(shè)置過(guò)期時(shí)間時(shí),一旦該鍵過(guò)期,Redis 會(huì)發(fā)送一個(gè)通知。你可以訂閱這個(gè)通知,并在接收到通知時(shí)執(zhí)行任務(wù)。但這種方法可能不夠精確,且依賴于 Redis 的內(nèi)部機(jī)制。
它的實(shí)現(xiàn)步驟是:
- 設(shè)置開(kāi)啟 Redis 過(guò)期鍵通知事件,可以通過(guò)執(zhí)行“CONFIG SET notify-keyspace-events KEA”命令來(lái)動(dòng)態(tài)開(kāi)啟鍵空間通知功能,而無(wú)需重啟 Redis 服務(wù)器。
- 設(shè)置過(guò)期鍵,可以通過(guò)命令“SET mykey "myvalue" EX 3”設(shè)置某個(gè) key 3 秒后過(guò)期(3s 后執(zhí)行)。
- 編寫(xiě)一個(gè)監(jiān)聽(tīng)程序來(lái)訂閱 Redis 的鍵空間通知。這可以通過(guò)使用 Redis 的發(fā)布/訂閱功能來(lái)實(shí)現(xiàn),具體實(shí)現(xiàn)代碼如下(以 Jedis 框架使用為例):
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class RedisKeyspaceNotifier {
public static void main(String[] args) {
// 創(chuàng)建Jedis實(shí)例
Jedis jedis = new Jedis("localhost", 6379);
// 配置鍵空間通知(通常這一步在Redis配置文件中完成,但也可以在運(yùn)行時(shí)配置)
jedis.configSet("notify-keyspace-events", "KEA");
// 訂閱鍵空間通知
jedis.subscribe(new KeyspaceNotificationSubscriber(), "__keyevent@0__:expired");
}
static class KeyspaceNotificationSubscriber extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println("Received message from channel: " + channel + ", message: " + message);
// 在這里處理接收到的鍵空間通知
// 例如,如果message是一個(gè)需要處理的任務(wù)ID,你可以在這里觸發(fā)相應(yīng)的任務(wù)處理邏輯
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("Subscribed to channel: " + channel);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("Unsubscribed from channel: " + channel);
}
}
}
但因?yàn)?Redis 的 Key 過(guò)期時(shí)不會(huì)被及時(shí)刪除,Redis 采用的是惰性刪除和定期刪除,所以這個(gè)過(guò)期事件也不保證可以立即觸發(fā),所以此方式很少用來(lái)實(shí)現(xiàn)延遲任務(wù)(因?yàn)闃O其不穩(wěn)定)。
使用ZSet實(shí)現(xiàn)延遲任務(wù)
可以將任務(wù)及其執(zhí)行時(shí)間作為成員和分?jǐn)?shù)存儲(chǔ)在 ZSET 中,然后,使用一個(gè)后臺(tái)任務(wù)(如定時(shí)任務(wù)或守護(hù)進(jìn)程)定期檢查 ZSET,查找分?jǐn)?shù)(即執(zhí)行時(shí)間)小于或等于當(dāng)前時(shí)間的成員,并執(zhí)行相應(yīng)的任務(wù)。執(zhí)行后,從 ZSET 中刪除該成員,具體實(shí)現(xiàn)代碼如下:
import redis.clients.jedis.Jedis;
import java.util.Set;
public class RedisDelayedTaskDemo {
private static final String ZSET_KEY = "delayed_tasks";
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
// 添加延遲任務(wù)
addDelayedTask(jedis, "task1", System.currentTimeMillis() / 1000 + 5); // 5秒后執(zhí)行
addDelayedTask(jedis, "task2", System.currentTimeMillis() / 1000 + 10); // 10秒后執(zhí)行
// 模擬定時(shí)任務(wù)檢查器
new Thread(() -> {
while (true) {
try {
// 檢查并執(zhí)行到期的任務(wù)
checkAndExecuteTasks(jedis);
Thread.sleep(1000); // 每秒檢查一次
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
private static void addDelayedTask(Jedis jedis, String task, long executeTime) {
jedis.zadd(ZSET_KEY, executeTime, task);
System.out.println("Added task: " + task + " with execution time: " + executeTime);
}
private static void checkAndExecuteTasks(Jedis jedis) {
long currentTime = System.currentTimeMillis() / 1000;
Set<String> tasks = jedis.zrangeByScore(ZSET_KEY, 0, currentTime);
for (String task : tasks) {
jedis.zrem(ZSET_KEY, task); // 從有序集合中移除任務(wù)
executeTask(task); // 執(zhí)行任務(wù)
}
}
private static void executeTask(String task) {
System.out.println("Executing task: " + task);
// 在這里添加實(shí)際的任務(wù)執(zhí)行邏輯
}
}
在這個(gè)示例中,我們首先使用 addDelayedTask 方法向 Redis 的有序集合中添加任務(wù),并設(shè)置它們的執(zhí)行時(shí)間。然后,我們啟動(dòng)一個(gè)線程來(lái)模擬定時(shí)任務(wù)檢查器,它會(huì)每秒檢查一次是否有任務(wù)到期,并執(zhí)行到期的任務(wù)。
使用Redisson執(zhí)行定時(shí)任務(wù)
在 Redisson 框架中,提供了一個(gè) RDelayedQueue 用于實(shí)現(xiàn)延遲隊(duì)列,使用簡(jiǎn)單方便,推薦使用,它的具體實(shí)現(xiàn)如下:
import org.redisson.Redisson;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
public class RDelayedQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建 Redisson 客戶端
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// 獲取延遲隊(duì)列
RDelayedQueue<String> delayedQueue =
redisson.getDelayedQueue("delayedQueue");
// 添加延遲任務(wù)
delayedQueue.offer("task1", 5, TimeUnit.SECONDS);
// 監(jiān)聽(tīng)并處理延遲任務(wù)
Thread listenerThread = new Thread(() -> {
while (true) {
try {
// 通過(guò) take 方法等待并獲取到期的任務(wù)
String task = delayedQueue.take();
System.out.println("Handle task: " + task);
} catch (InterruptedException e) {
break;
}
}
});
listenerThread.start();
}
}
在上述示例中,我們首先創(chuàng)建了一個(gè) Redisson 客戶端,通過(guò)配置文件指定了使用單節(jié)點(diǎn) Redis 服務(wù)器。然后,我們獲取一個(gè)延遲隊(duì)列 RDelayedQueue,并添加一個(gè)延遲任務(wù),延遲時(shí)間為 5 秒,接著,我們通過(guò)線程監(jiān)聽(tīng)并處理延遲隊(duì)列中的任務(wù)。
Redis實(shí)現(xiàn)延遲任務(wù)優(yōu)缺點(diǎn)分析
優(yōu)點(diǎn):
- 輕量級(jí)與高性能:Redis 是一個(gè)內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)系統(tǒng),因此讀寫(xiě)速度非??臁⑷蝿?wù)信息存儲(chǔ)在 Redis 中可以迅速地進(jìn)行存取操作。
- 簡(jiǎn)單易用:Redis 的 API 簡(jiǎn)潔明了,易于集成到現(xiàn)有的應(yīng)用系統(tǒng)中。
缺點(diǎn):
- 精度有限:Redis 的延遲任務(wù)依賴于系統(tǒng)的定時(shí)檢查機(jī)制,而不是精確的定時(shí)器。這意味著任務(wù)的執(zhí)行可能會(huì)有一定的延遲,特別是在系統(tǒng)負(fù)載較高或檢查間隔較長(zhǎng)的情況下。
- 功能有限:與專業(yè)的任務(wù)調(diào)度系統(tǒng)相比,Redis 提供的延遲任務(wù)功能可能相對(duì)簡(jiǎn)單。對(duì)于復(fù)雜的任務(wù)調(diào)度需求,如任務(wù)依賴、任務(wù)優(yōu)先級(jí)等,可能需要額外的邏輯來(lái)實(shí)現(xiàn)。
- 穩(wěn)定性較差:使用 Redis 實(shí)現(xiàn)延遲任務(wù)沒(méi)有重試機(jī)制和 ACK 確認(rèn)機(jī)制,所以穩(wěn)定性比較差。
- 單點(diǎn)故障風(fēng)險(xiǎn):如果沒(méi)有正確配置 Redis 集群或主從復(fù)制,那么單個(gè) Redis 實(shí)例的故障可能導(dǎo)致整個(gè)延遲任務(wù)系統(tǒng)的癱瘓。