Go 語言實戰(zhàn):構(gòu)建強大的延遲任務(wù)隊列
介紹
延遲隊列是一種數(shù)據(jù)結(jié)構(gòu),用于處理需要在未來某個特定時間執(zhí)行的任務(wù)。這些任務(wù)被添加到隊列中,并且指定了一個執(zhí)行時間,只有到達指定的時間點時才能從隊列中取出并執(zhí)行。
在實際應(yīng)用中,延遲隊列可以用于處理各種需要延遲處理的任務(wù),例如發(fā)送郵件提醒、訂單自動取消、對超時任務(wù)的處理等。由于任務(wù)的執(zhí)行是在未來的某個時間點,因此這些任務(wù)不會立即執(zhí)行,而是存儲在隊列中,直到它的預定執(zhí)行時間才會被執(zhí)行。
Simple
在 Go 語言中,我們可以使用 time 包提供的計時器功能,通過使用 Go 中的 slice 存儲延遲處理的任務(wù),實現(xiàn)一個簡單的延遲隊列的功能。
示例代碼:
type Task struct {
ExecuteTime time.Time
Job func()
}
首先,我們定義一個結(jié)構(gòu)體 Task,它包含一個可以執(zhí)行任務(wù)的函數(shù) Job,和一個執(zhí)行時間 ExecuteTime,這是期望執(zhí)行該函數(shù)的時間。
示例代碼:
type DelayQueue struct {
TaskQueue []Task
}
接下來,我們定義一個 DelayQueue 結(jié)構(gòu)體,它擁有一個 TaskQueue,這是一個 Task 類型的切片,用于保存待執(zhí)行任務(wù)的列表。
示例代碼:
// 添加任務(wù)
func (d *DelayQueue) AddTask(t Task) {
d.TaskQueue = append(d.TaskQueue, t)
}
// 移除任務(wù)
func (d *DelayQueue) RemoveTask() {
d.TaskQueue = d.TaskQueue[1:]
}
// 執(zhí)行任務(wù)
func (d *DelayQueue) ExecuteTasks() {
for len(d.TaskQueue) > 0 {
// 獲取隊列最頂部的任務(wù)
currentTask := d.TaskQueue[0]
// 如果執(zhí)行時間還沒到,等待
if time.Now().Before(currentTask.ExecuteTime) {
time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
}
// 執(zhí)行任務(wù)
currentTask.Job()
// 移除已執(zhí)行的任務(wù)
d.RemoveTask()
}
}
DelayQueue 包含三個方法:
- 第一個方法是 AddTask(t Task)。此方法將提供的任務(wù) t 添加到 TaskQueue 的末尾。
- 第二個方法是 RemoveTask()。此方法從 TaskQueue 中移除第一個任務(wù)。
- 第三個方法是 ExecuteTasks()。此方法將執(zhí)行 TaskQueue 中的所有任務(wù)。如果隊列頂部任務(wù)的執(zhí)行時間還未到,該方法將等待。一旦時間到了,它將會執(zhí)行 Job 并從 TaskQueue 中移除該任務(wù)。
示例代碼:
func main() {
fmt.Println("Start DelayQueue")
queue := DelayQueue{}
firstTask := Task{
ExecuteTime: time.Now().Add(4 * time.Second),
Job: func() {
fmt.Println("Executed task 1 after delay")
},
}
queue.AddTask(firstTask)
secondTask := Task{
ExecuteTime: time.Now().Add(10 * time.Second),
Job: func() {
fmt.Println("Executed task 2 after delay")
},
}
queue.AddTask(secondTask)
queue.ExecuteTasks()
fmt.Println("Done!")
}
輸出結(jié)果:
Start DelayQueue
Executed task 1 after delay
Executed task 2 after delay
Done!
在示例代碼中,我們創(chuàng)建了一個延時隊列,將任務(wù)添加到隊列中,并在指定的延時后執(zhí)行它們。
通過使用這些結(jié)構(gòu)體和方法,我們可以在 Go 中實現(xiàn)簡單的延遲執(zhí)行任務(wù)的功能。
但是,當 Go 程序重啟時,存儲在 slice 中的延遲處理的任務(wù)將全部丟失。
Complex
在 Go 程序中,如果想在重啟后保留數(shù)據(jù),我們可以將數(shù)據(jù)持久化到 Redis,可以使用 go-redis/redis 庫[1]與 Redis 交互。而對于延遲隊列的需求,則可以使用 Redis 的 ZSET(有序集合)特性來實現(xiàn)。
示例代碼:
// 定義一個全局的redisdb變量
var redisdb *redis.Client
// 初始化連接
func initClient() (err error) {
redisdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
_, err = redisdb.Ping().Result()
if err != nil {
return err
}
return nil
}
全局變量 redisdb 是 redis.Client 類型的指針,用來保存到 Redis 客戶端的引用。
initClient 函數(shù)初始化連接到 Redis 服務(wù)器,該服務(wù)器在本地主機的 6379 端口運行。它將一個新的 Redis 客戶端分配給 redisdb 變量。如果連接成功,它就會 ping Redis 服務(wù)器以測試連接。
示例代碼:
// 向隊列中添加任務(wù)
func addTaskToQueue(task string, executeTime int64) {
err := redisdb.ZAdd("delay-queue", redis.Z{
Score: float64(executeTime),
Member: task,
}).Err()
if err != nil {
panic(err)
}
}
addTaskToQueue 函數(shù)將具有執(zhí)行時間的任務(wù)添加到 Redis 等待排序的集合 "delay-queue"。執(zhí)行時間是一個 UNIX 時間戳,作為排序集合中的項目的 score,允許 Redis 按照他們應(yīng)該執(zhí)行的時間來排序項目。
示例代碼:
// 從隊列中獲取并處理任務(wù)
func getAndExecuteTasks() {
for {
// 使用 ZRANGEBYSCORE 命令獲取分數(shù)(時間戳)<= 當前時間的任務(wù)
tasks, err := redisdb.ZRangeByScore("delay-queue", redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%d", time.Now().Unix()),
}).Result()
if err != nil {
time.Sleep(1 * time.Second)
continue
}
// 處理任務(wù)
for _, task := range tasks {
fmt.Println("Executing task: ", task)
// 執(zhí)行完任務(wù)后,用 ZREM 移除該任務(wù)
redisdb.ZRem("delay-queue", task)
}
// 暫停一秒
time.Sleep(1 * time.Second)
}
}
getAndExecuteTasks 函數(shù)不斷檢查 "delay-queue"。它提取隊列中 score 小于或等于當前時間戳的任務(wù),意味著這些任務(wù)現(xiàn)在應(yīng)該執(zhí)行或者他們應(yīng)該在過去就已經(jīng)執(zhí)行。獲取任務(wù)后,它打印任務(wù)(模擬執(zhí)行)并從隊列中刪除任務(wù)。
示例代碼:
func main() {
err := initClient()
if err != nil {
fmt.Println("redis connect error:", err)
return
}
// 添加一些測試任務(wù)
addTaskToQueue("task1", time.Now().Add(10*time.Second).Unix())
addTaskToQueue("task2", time.Now().Add(20*time.Second).Unix())
// 執(zhí)行延遲隊列中的任務(wù)
getAndExecuteTasks()
}
輸出結(jié)果:
Executing task: task1
Executing task: task2
main 函數(shù)調(diào)用這些函數(shù)。首先,它初始化 Redis 客戶端。如果初始化和連接成功,它將一些測試任務(wù)添加到隊列中,并啟動任務(wù)執(zhí)行循環(huán)。
總結(jié)一下,這段 Go 代碼使用 Redis 的 Sorted Set 數(shù)據(jù)類型創(chuàng)建了一個延時隊列系統(tǒng),其中的任務(wù)按照他們的執(zhí)行時間進行排序,一個任務(wù)工作者循環(huán)獲取并執(zhí)行隊列中的任務(wù)。這是一個簡單而高效地實現(xiàn)作業(yè)調(diào)度系統(tǒng)的方法。
總結(jié)
本文我們分別實現(xiàn)簡單版和復雜版的延遲隊列,其中簡單版延遲隊列,只使用 Go 實現(xiàn),復雜版延遲隊列,使用 Go 和 Redis 實現(xiàn)。
(1) 只使用 Go 實現(xiàn)延遲隊列:
優(yōu)點:
- 不需要外部依賴:只使用 Go 實現(xiàn)延遲隊列,你不需要安裝和維護外部的 Redis 服務(wù)器。
缺點:
- 健壯性和持久性:如果程序崩潰或重新啟動,延遲隊列的數(shù)據(jù)可能會丟失。
- 并發(fā)控制:使用 Go 內(nèi)置的數(shù)據(jù)結(jié)構(gòu)(如 channels 或 slices)在多個 goroutines 之間共享狀態(tài)變量可能需要精細的并發(fā)控制,比如使用 mutexes 或者 channels。
(2) 使用 Go + Redis 實現(xiàn)延遲隊列:
優(yōu)點:
- 數(shù)據(jù)持久性:Redis 提供了數(shù)據(jù)持久性,即使在程序重啟或崩潰后,隊列中的數(shù)據(jù)依然可以恢復。
- 簡化并發(fā):Redis 提供的數(shù)據(jù)結(jié)構(gòu)(如 sorted set)是原子操作,可以簡化并發(fā)控制。
- 功能強大:使用 Redis,你可以利用其提供的一些內(nèi)建功能,如超時、TTL、持久化等。
缺點:
- 需要額外的組件:使用 Redis 意味著需要安裝和運行 Redis 服務(wù)器,這可能增加系統(tǒng)的復雜性和運維成本。
- 網(wǎng)絡(luò)延遲:如果 Go 程序和 Redis 服務(wù)器不在同一臺機器上,網(wǎng)絡(luò)延遲可能會影響延遲的準確性。
總的來說,如果我們對延遲隊列的持久性、準確性和并發(fā)性有高要求,那么 Go + Redis 的方案可能會更適合。如果我們想要一個更簡單的解決方案,并且可以容忍在程序崩潰時部分數(shù)據(jù)丟失,那么只使用 Go 實現(xiàn)可能會更合適。