
Background
When developing against Kubernetes with the k8s.io/client-go client, for example when writing an operator for a CRD, we often need a queue. More generally, server-side backend work frequently calls for a task queue to process and manage tasks asynchronously. The workqueue package in k8s.io/client-go provides three commonly used queues. This article walks through how to use each of them and where each one fits; you can copy the code directly to speed up your own projects. The figure below shows how the three queues relate:

Figure: relationship between the k8s queues
Queues
Type (basic queue)
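The data structure is shown below. The dirty and processing sets hold the items that need processing and the items currently being processed, respectively, while queue []t holds the waiting items in the order they will be handed out. The relationship between these three fields is interesting. dirty gives a fast way to check whether an item is already waiting, which serves two purposes:
1. On Add, it prevents the same item from being added twice (code: https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120).
2. When a task finishes, Done must be called to remove the item from the processing set. If the item is added to dirty again before Done is called, Done puts it back on queue so that it is processed once more (code: https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L180).
The processing set holds the items that are currently being worked on. It serves these purposes:
1. On Add, if the item is currently being processed, Add records it in dirty and returns without appending it to queue. When Done is later called for that item, it finds the item in dirty and appends it to the tail of the queue (code: https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120).
2. It tells the queue whether any item is still in flight, so that a shutdown can take unfinished work into account (code: https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L221).
The overall working pattern is this: a worker calls Get to take an item from the queue (if the queue is empty, Get waits in q.cond.Wait()), processes the item with your own business logic, and then calls Done to mark it as finished. If Done puts the item back on the queue, it also calls q.cond.Signal() to wake a waiting worker.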
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t
	// dirty defines all of the items that need to be processed.
	dirty set
	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set
	cond *sync.Cond
	shuttingDown bool
	drain bool
	metrics queueMetrics
	unfinishedWorkUpdatePeriod time.Duration
	clock clock.WithTicker
}
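To make the interplay of queue, dirty and processing concrete, here is a minimal single-goroutine sketch. The miniQueue type and its methods are hypothetical names made up for illustration. The sketch deliberately leaves out the mutex, the sync.Cond signalling, metrics and shutdown handling, so read it as a model of the bookkeeping rather than as client-go's implementation.

```go
package main

import "fmt"

// miniQueue is a hypothetical, simplified model of the three fields
// discussed above. It is not safe for concurrent use.
type miniQueue struct {
	queue      []string        // items waiting, in hand-out order
	dirty      map[string]bool // items that need processing
	processing map[string]bool // items currently being processed
}

func newMiniQueue() *miniQueue {
	return &miniQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add marks item as needing work. It is appended to queue only if it is
// neither already waiting nor currently in flight.
func (q *miniQueue) Add(item string) {
	if q.dirty[item] {
		return // already marked: deduplicate
	}
	q.dirty[item] = true
	if q.processing[item] {
		return // in flight: Done will re-queue it
	}
	q.queue = append(q.queue, item)
}

// Get pops the oldest item and moves it from dirty to processing.
func (q *miniQueue) Get() (string, bool) {
	if len(q.queue) == 0 {
		return "", false
	}
	item := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[item] = true
	delete(q.dirty, item)
	return item, true
}

// Done removes item from processing and re-queues it if it was
// re-added while it was being processed.
func (q *miniQueue) Done(item string) {
	delete(q.processing, item)
	if q.dirty[item] {
		q.queue = append(q.queue, item)
	}
}

// Len reports how many items are waiting to be handed out.
func (q *miniQueue) Len() int { return len(q.queue) }

func main() {
	q := newMiniQueue()
	q.Add("a")
	q.Add("a")           // duplicate, ignored
	fmt.Println(q.Len()) // 1

	item, _ := q.Get()
	q.Add("a")           // re-added while in flight: only dirty changes
	fmt.Println(q.Len()) // 0

	q.Done(item)         // Done sees the dirty mark and re-queues
	fmt.Println(q.Len()) // 1
}
```

The point of keeping dirty separate from processing is that an item re-added mid-flight is never handed to two workers at once, yet the update is not lost either.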
delaying_queue (delaying queue)
The delaying queue embeds the basic queue above and adds the AddAfter function, which adds an item to the queue after a given delay. Its waitForPriorityQueue type implements a priority queue of waitFor entries, which is simply a min-heap ordered by ready time.
The entry point is func (q *delayingType) AddAfter(item interface{}, duration time.Duration) (code: https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L162).
When duration is zero or negative, the item is put straight onto the embedded basic queue with q.Add. Otherwise it is sent to the q.waitingForAddCh channel and waits there until its readyAt time arrives, at which point it is moved onto the queue. So how does an entry in that channel reach the basic queue once readyAt has passed? The answer is the ret.waitingLoop goroutine that is started when the delaying queue is created (code: https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L189). It combines the waitForPriorityQueue min-heap described above with the q.waitingForAddCh channel; reading the linked code is enough to get the general idea.

Figure: creating the delaying queue
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
	Interface
	// clock tracks time for delayed firing
	clock clock.Clock
	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once
	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker
	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	waitingForAddCh chan *waitFor
	// metrics counts the number of retries
	metrics retryMetrics
}

// waitFor holds the data to add and the time it should be added
type waitFor struct {
	data t
	readyAt time.Time
	// index in the priority queue (heap)
	index int
}

type waitForPriorityQueue []*waitFor

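Figure: element-add logic
The test code below shows how to create a delaying queue and add an item to it.
It adds the string "foo" with a delay of 50 milliseconds. Before the delay has elapsed the queue length is 0; once the fake clock is stepped forward by 60 milliseconds the item shows up in the queue. After the item is taken with Get and completed with Done, the test checks that it is not added again.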
fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "")
first := "foo"
q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
	t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
	t.Errorf("should not have added")
}
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
	t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)
// step past the next heartbeat
fakeClock.Step(10 * time.Second)
err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
	if q.Len() > 0 {
		return false, fmt.Errorf("added to queue")
	}
	return false, nil
})
if err != wait.ErrWaitTimeout {
	t.Errorf("expected timeout, got: %v", err)
}
if q.Len() != 0 {
	t.Errorf("should not have added")
}

func waitForAdded(q DelayingInterface, depth int) error {
	return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
		if q.Len() == depth {
			return true, nil
		}
		return false, nil
	})
}

func waitForWaitingQueueToFill(q DelayingInterface) error {
	return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
		if len(q.(*delayingType).waitingForAddCh) == 0 {
			return true, nil
		}
		return false, nil
	})
}
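rate_limiting_queue (rate-limiting queue)
The rate-limiting queue builds on the delaying queue: it limits the rate by delaying the moment at which an item is inserted into the FIFO queue.
The workqueue package ships several rate limiters. The code below shows ItemExponentialFailureRateLimiter, which applies per-item exponential backoff.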
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures map[interface{}]int
	baseDelay time.Duration
	maxDelay time.Duration
}
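It has a base delay. Each time an item is added, the delay before it reaches the queue is baseDelay * 2^n, where n is the number of times that item has already been added since it was last forgotten; the When method below computes it. The result is capped by the maxDelay parameter.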
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()
	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1
	// The backoff is capped such that 'calculated' value never overflows.
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}
	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}
	return calculated
}
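The test code below creates a rate-limiting queue with a base delay of 1 millisecond and a maximum delay of 1 second. It adds the string "one"; because this is the first add, the formula above gives a delay of 1 millisecond. Adding "one" again gives 2 * 1 millisecond, that is, 2 milliseconds. The same holds for the string "two", which is tracked independently. After Forget is called, the counter kept by ItemExponentialFailureRateLimiter for that item is reset, so adding "one" once more yields a delay of 1 millisecond again.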
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := testingclock.NewFakeClock(time.Now())
delayingQueue := &delayingType{
	Interface: New(),
	clock: fakeClock,
	heartbeat: fakeClock.NewTicker(maxWait),
	stopCh: make(chan struct{}),
	waitingForAddCh: make(chan *waitFor, 1000),
	metrics: newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue
queue.AddRateLimited("one")
waitEntry := <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
	t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
	t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, queue.NumRequeues("one"); e != a {
	t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
	t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
	t.Errorf("expected %v, got %v", e, a)
}
queue.Forget("one")
if e, a := 0, queue.NumRequeues("one"); e != a {
	t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
	t.Errorf("expected %v, got %v", e, a)
}
The package also provides ItemFastSlowRateLimiter, BucketRateLimiter and others. For details, see default_rate_limiters.go (code: https://github.com/kubernetes/client-go/blob/master/util/workqueue/default_rate_limiters.go).
Use cases
Delaying queue:
1. Closing orders whose payment is overdue
Ride-hailing apps commonly need to match riders with drivers, which can be built on a delaying queue. Expiring orders that are still unpaid 30 minutes after submission is another good fit, as is sending a text message to re-engage users 30 days after they registered.
2. Scheduled task dispatch
For example, use a DelayQueue to hold the tasks due to run that day together with their run times, or set a countdown and update the status of a row in a database table when it ends.
Rate-limiting queue:
For example, limiting the rate at which data is written to a queue.