自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

K8s Informer 是如何保證事件不丟失的?

云計(jì)算 云原生
k8s 是如何 "監(jiān)視" 資源對(duì)象,以確保其始終保持我們聲明的狀態(tài)的呢?答案就是 -- Controller。除了組件中的 kube-controller-manager,我們可以編寫自己的 Controller,也叫自定義控制器(為了方便下文統(tǒng)稱為自定義 Controller)。

1、資源 Controller 主要作用

我們知道 k8s 里重要概念之一就是 聲明式 API,比如 kubectl apply 就是聲明式 API的實(shí)現(xiàn)。

效果就是資源對(duì)象的運(yùn)行狀態(tài)要與我們聲明的一致。比如kubectl apply 一個(gè) deployment 的 yml,他要求的狀態(tài)就是: 該 deployment 成功運(yùn)行。

那么問題來(lái)了,k8s 是如何 "監(jiān)視" 資源對(duì)象,以確保其始終保持我們聲明的狀態(tài)的呢?答案就是 -- Controller。除了組件中的 kube-controller-manager,我們可以編寫自己的 Controller,也叫自定義控制器(為了方便下文統(tǒng)稱為自定義 Controller)。

接下來(lái),我們就來(lái)剖析一下 Controller 背后的"秘密"

2、流程大覽

我們先看看社區(qū)給出的 Controller 的架構(gòu)圖:

其中有幾個(gè)主要對(duì)象(結(jié)構(gòu)體) -- Reflector、Informer、Indexer。Reflector 和 Indexer 我們會(huì)在之后的文章中會(huì)一一講解 。

本文主要是講解一下 Informer。

從圖中可以看到主要有9個(gè)步驟,這里我將9個(gè)步驟合并成3個(gè)大步驟:

(畫的有點(diǎn)丑-__- !!!)

大步驟1: Reflector 將資源對(duì)象的事件添加進(jìn) Delta FIFO queue 中。

這里先提前介紹一下 Delta FIFO queue。所謂 Delta 就是變化的意思,什么的變化呢?就是資源對(duì)象的變化。

即 資源對(duì)象的變化都會(huì)被添加到 Delta FIFO queue 中!這樣是不是就很好理解了。

大步驟2: Informer 將 Delta FIFO queue 中的對(duì)象數(shù)據(jù) 添加到本地 cache 中。

補(bǔ)充一下這個(gè)本地 cache 緩存的就是監(jiān)聽資源對(duì)象的最新版。就是緩存的當(dāng)前集群里面的資源信息。

大步驟3: 使用 workqueue 處理業(yè)務(wù)邏輯。

3、步驟分析

咱們結(jié)合社區(qū)給的編寫的 自定義Controller用例 來(lái)做源碼分析。這里使用的版本是 client-go v0.20.5。

用例中用到的是普通 informer,介紹的也是普通 informer。但很更多用的是sharedInformer,比如 manager、SharedInformerFactory 都是對(duì)普通 informer 的一個(gè)再封裝,本質(zhì)的東西是一樣的。感興趣的話,后面再出介紹 sharedInformer、manager 的文章。

大步驟1

我們看到架構(gòu)圖中間有一個(gè)分界線,將流程分割為上下兩半, 而上半部主要包括大步驟 1、2。

這兩個(gè)步驟其實(shí)是連在一起的,其入口代碼就是這一行 : informer.Run(),可以先不管這。

我們先看用例中Informer的初始化入口代碼。

NewIndexerInformer  的代碼如下:

再真正的Informer初始化,就是 newInformer :

注意第381行 就是 Delta FIFO 的初始化,架構(gòu)圖中的 Delta FIFO queue 就是在這實(shí)例化的。

我們發(fā)現(xiàn) newInformer 返回的 是一個(gè) low-level Controller 接口。這個(gè)接口抽象的很簡(jiǎn)單,就三個(gè)方法:

Run(stopCh <-chan struct{}):

運(yùn)行邏輯。

HasSynced() bool :

數(shù)據(jù)同步完成與否

LastSyncResourceVersion() string:

資源最近一次的ResourceVersion

接下來(lái)我們看看三個(gè)方法是如何在 controller 中看到這實(shí)現(xiàn)的。

咱們直接跳轉(zhuǎn)到 419 行里面的代碼,low-level Controller 的初始化, 可以很方便就看到了 Run 方法的實(shí)現(xiàn):

大部分代碼是 Reflector 的初始化。

第152行 啟動(dòng)了一個(gè)協(xié)程,*r.Run* 就是 Reflector 的執(zhí)行邏輯:List & Watch 資源對(duì)象,然后 Add object to Delta FIFO queue 。

咱們點(diǎn)擊跳轉(zhuǎn),直接跳到 ListAndWatch 方法中, 雖然這兒的代碼又多又亂(忍不住吐槽),但它要的做的事很簡(jiǎn)單,就四件事。這里我們就把重點(diǎn)代碼拷貝出來(lái)說(shuō)。

第一件事

用你初始化好的 cache.ListWatch 對(duì)象 的ListFunc拉取資源對(duì)象,然后將對(duì)象同步到 Delta FIFO queue:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
......
......
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
  r.setIsLastSyncResourceVersionUnavailable(true) 
  // 拉取資源列表
  list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
.....
resourceVersion = listMetaInterface.GetResourceVersion()
.....
items, err := meta.ExtractList(list) // 轉(zhuǎn)換成對(duì)象
......
if err := r.syncWith(items, resourceVersion); err != nil { // 將拉取到資源對(duì)象都添加到 Delta FIFO queue
  return fmt.Errorf("unable to sync list result: %v", err)
}
......
r.setLastSyncResourceVersion(resourceVersion) // 設(shè)置最近一次的版本
......
}

這里再簡(jiǎn)單說(shuō)明一下,r.syncWith(items, resourceVersion) 主要是通過 Delta FIFO queue 中的 Replace() 來(lái)同步資源。 其中有一個(gè)關(guān)鍵的邏輯如下:

if !f.populated {
  f.populated = true
  f.initialPopulationCount = len(list) + queuedDeletions
}

f.populated =true 就是確定資源對(duì)象進(jìn)入隊(duì)列的動(dòng)作已經(jīng)發(fā)生;f.initialPopulationCount 就是確定已經(jīng)有多少對(duì)象在隊(duì)列中了。

然后我們看 informer HasSynced() 的底層邏輯:

func (f *DeltaFIFO) HasSynced() bool {
 f.lock.Lock()
 defer f.lock.Unlock()
 return f.populated && f.initialPopulationCount == 0
}

而 f.initialPopulationCount-- 發(fā)生在下文的 pop 中。

LastSyncResourceVersion() string 返回的版本,就是r.setLastSyncResourceVersion(resourceVersion) 設(shè)置的。

第二件事

再次同步資源。

go func() {
  resyncCh, cleanup := r.resyncChan()
  defer func() {
   cleanup() // Call the last one written into cleanup
  }()
  for {
   select {
   case <-resyncCh:
   case <-stopCh:
    return
   case <-cancelCh:
    return
   }
   if r.ShouldResync == nil || r.ShouldResync() {
    klog.V(4).Infof("%s: forcing resync", r.name)
    if err := r.store.Resync(); err != nil {
     resyncerrc <- err
     return
    }
   }
   cleanup()
   resyncCh, cleanup = r.resyncChan()
  }
 }()

用例代碼 中 cache.NewIndexerInformer() 會(huì)設(shè)置一個(gè) resyncPeriod 參數(shù)就是在這起作用。

設(shè)置的是 0,所以這個(gè)協(xié)程會(huì)永遠(yuǎn)阻塞在 case<-resyncCh。

這的詳細(xì)邏輯會(huì)放在之后講 Delta FIFO queue 的時(shí)候再講,簡(jiǎn)單理解就是將 indexer 緩存的數(shù)據(jù)用同步到 Delta FIFO queue 中。

第三件事

用你初始化好的 cache.ListWatch 對(duì)象的 WatchFunc watch 對(duì)象。

這里的 watch 功能是底層就是 etcd 的 watch 特性功能,感興趣的同學(xué)可以自己了解一下,這里就不展開說(shuō)明了。

w, err := r.listerWatcher.Watch(options)
if err != nil {   
  if utilnet.IsConnectionRefused(err) {
    <-r.initConnBackoffManager.Backoff().C()
    continue
  }
  return err
}

if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
......
......
}

第四件事

將watch到的對(duì)象,加入到Delta FIFO queue中。

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
....
....
loop:
 for {
  select {
  case <-stopCh:
   return errorStopRequested
  case err := <-errc:
   return err
  case event, ok := <-w.ResultChan():
....
....
   switch event.Type {
   case watch.Added:
    err := r.store.Add(event.Object)
    if err != nil {
     utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
    }
   case watch.Modified:
    err := r.store.Update(event.Object)
    if err != nil {
     utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
    }
   case watch.Deleted:
    // TODO: Will any consumers need access to the "last known
    // state", which is passed in event.Object? If so, may need
    // to change this.
    err := r.store.Delete(event.Object)
    if err != nil {
     utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
    }
                       }
                       ......
                       r.setLastSyncResourceVersion(newResourceVersion) // 設(shè)置最近一次的版本
    
......
......
}

再簡(jiǎn)單歸納一下,就兩件事:

  • 一開始,拉取資源列表然后加入到Delta FIFO queue
  • watch 資源對(duì)象的變化,加入到Delta FIFO queue

大步驟2

Indexerer 其實(shí)算是一個(gè)內(nèi)存數(shù)據(jù)庫(kù)的抽象接口。其中Store當(dāng)然就代表的存儲(chǔ),其他的就是索引相關(guān)的。

// client-go/tools/cache/store.go
type cache struct {
 // cacheStorage bears the burden of thread safety for the cache
 cacheStorage ThreadSafeStore
 // keyFunc is used to make the key for objects stored in and retrieved from items, and
 // should be deterministic.
 keyFunc KeyFunc
}

cache 就是接口的實(shí)現(xiàn),就是一個(gè)緩存。索引肯定是用作搜索的,其使用咱們下文在 作死的優(yōu)化 那一節(jié)可以看到。

然后我們退回看 Run 方法截圖的第154 行代碼,看看第二大步驟的邏輯。

wait.Until 就是一個(gè)定時(shí)器,簡(jiǎn)化成下面的代碼:

func Util(stopCh <-chan struct{}) {
    dur := 1 * time.Second
 timer := time.NewTimer(dur)
 defer timer.Stop()

 for {
          select {
         case <-stopCh:
         return
          case <-t.C():
         f()
         timer.Reset(dur)
          }
 }
}

執(zhí)行的邏輯就是 c.processLoop:

其實(shí)代碼很容易理解,就是將隊(duì)列 (Delta FIFO queue)的item 彈出,然后調(diào)用處理函數(shù)執(zhí)行ResourceEventHandler中的方法。

先看跳轉(zhuǎn)到 Pop 代碼:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 ......
 ......
 id := f.queue[0]
 f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
   f.initialPopulationCount-- // 同步數(shù)據(jù)減1
 }
 ......
 item, ok := f.items[id]
 ......
 err := process(item)
 ......
}

它的內(nèi)容其實(shí)比較簡(jiǎn)單,這里只羅列出了最主要的邏輯,相信大伙兒能看明白。而且也看到了上文提到確定同步的關(guān)鍵邏輯 f.initialPopulationCount--

也就是說(shuō)只有 Delta FIFO queue 中的所有數(shù)據(jù)都同步到了 Indexer 中,informer 的數(shù)據(jù)同步才算完成。

然后咱們?cè)賮?lái)看 process ,就是 newInformer截圖 圖中我們第393行的 Process ,展開的方法:

Process: func(obj interface{}) error {
   for _, d := range obj.(Deltas) {
    switch d.Type {
    case Sync, Replaced, Added, Updated:
     if old, exists, err := clientState.Get(d.Object); err == nil && exists {
      if err := clientState.Update(d.Object); err != nil {
       return err
      }
      h.OnUpdate(old, d.Object)
     } else {
      if err := clientState.Add(d.Object); err != nil {
       return err
      }
      h.OnAdd(d.Object)
     }
    case Deleted:
     if err := clientState.Delete(d.Object); err != nil {
      return err
     }
     h.OnDelete(d.Object)
    }
   }
   return nil
  }

我們看 Process 匿名函數(shù)的行參,obj 就是 Pop 出的對(duì)象。根據(jù) Delta類型d.Type 來(lái)判斷對(duì)對(duì)象的處理方式。clientState 就是 Indexer,h 就是 ResourceEventHandler。

所以 Pop出來(lái)的對(duì)象,馬上就進(jìn)入了 Indexer中,然后再調(diào)用 ResourceEventHandler 對(duì)應(yīng)的方法,這里我們就是將 object 的 key 加入到 workqueue 中。

它各種方法的對(duì)應(yīng)的操作就是 這段代碼。

大步驟3

最后就是我們自己的應(yīng)用程序,來(lái)處理各種資源事件(Add、Update、Delete)。由于Workqueue的存在,就簡(jiǎn)化成處理隊(duì)列里面的元素。

我們直接可以看這個(gè)processNextItem 函數(shù)。

第55行,獲取隊(duì)列里面的數(shù)據(jù)。

第65行,就是我們處理對(duì)象的業(yè)務(wù)邏輯。syncToStdout 只是打印一些日志, 但其中 obj, exists, err := c.indexer.GetByKey(key) 這行代碼很關(guān)鍵,就是從 indexer 中獲取資源對(duì)象。有了它我們就能處理各種業(yè)務(wù)邏輯,比如我自己工作一般就是將與ResourceEventHandler定義的變化(AddFunc、UpdateFunc、DeleteFunc 你可以只有AddFunc)的對(duì)象寫回我們自己的云平臺(tái)。

類似代碼如下(syncToStdout 換成了 action):

func (d *Deployment) action(key string) error {
 obj, exists, err := d.indexer.GetByKey(key)
 if err != nil {
  return fmt.Errorf("fetching object with key %s from store failed with %w", key, err)
 }
 ns, deploymentName, err := cache.SplitMetaNamespaceKey(key)
 if err != nil {
  return err
 }
 
 if exists {
  deployment, ok := obj.(*apps_v1.Deployment) // 一定要斷言資源類型,這里類型要同 list & watch 方法中的一致。github的例子是pod,這里用的是deployment
  
  if !ok {
   return fmt.Errorf("type asset fault")
  }
  
  post(deployment) // 將資源傳回的偽代碼
 }
 
 return nil
}

到這,3大步驟就結(jié)束了。

4、補(bǔ)充一個(gè)知識(shí):

第三大步驟主要就是對(duì) workqueue 的調(diào)用。而 workqueue 有三大類:

  • 普通隊(duì)列
  • 延遲隊(duì)列
  • 限速隊(duì)列

延遲隊(duì)列是對(duì)普通隊(duì)列的封裝。而限速隊(duì)列是對(duì)延遲對(duì)列的封裝,外加一個(gè)限速器。

我們一般使用限速隊(duì)列,方便我們?cè)谔幚礤e(cuò)誤的時(shí)候重試。

處理完后,還要記得從隊(duì)列中移除正在處理的 key

defer c.queue.Done(key)

重試與移除在 用例代碼 中寫的非常清楚,一定不要漏掉這兩塊重要的邏輯。

作死的“優(yōu)化”

我們可能會(huì)發(fā)現(xiàn) workqueue 有點(diǎn)多余。我們完全可以直接在ResourceEventHandler中處理業(yè)務(wù)邏輯嘛!代碼如下:

func NewPodWithOutWorkQueue(ctx context.Context, clientset *kubernetes.Clientset) {
 //workQueue := workqueue.NewDelayingQueue()
 namespace := meta_v1.NamespaceAll
 listWatcher := &cache.ListWatch{
  ListFunc: func(options meta_v1.ListOptions) (runtime2.Object, error) {
   //options.LabelSelector = requireLabel.String()
   return clientset.CoreV1().Pods(namespace).List(ctx, options)
  },
  WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
   //options.LabelSelector = requireLabel.String()
   return clientset.CoreV1().Pods(namespace).Watch(ctx, options)
  },
 }

 indexer, informer := cache.NewIndexerInformer(listWatcher, &core_v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
  AddFunc: func(obj interface{}) {
   key, err := cache.MetaNamespaceKeyFunc(obj)
   if err == nil {
    fmt.Println("add: ", key)
   }
  },
  DeleteFunc: func(obj interface{}) {
   key, err := cache.MetaNamespaceKeyFunc(obj)
   if err == nil {
    fmt.Println("delete: ", key)
   }
  },
 }, cache.Indexers{
  cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
 })

 go informer.Run(ctx.Done())

 go GetIndexer(indexer)
}

func GetIndexer(idx cache.Indexer)  {
 for {
  time.Sleep( 3 * time.Second)
  fmt.Println("GetIndexers:", idx.ListIndexFuncValues(cache.NamespaceIndex))
 }
}

這里我們還借機(jī)查看了Indexer里面的信息。

其中 GetIndexer 就會(huì)打印以 namespace 聚合數(shù)據(jù)??梢院?jiǎn)單理解成下面的 sql 語(yǔ)句select namespace from xx_table。

為什么說(shuō)是作死呢?我們有些小伙伴就是這樣寫的,以為不依賴一個(gè)組件就是“優(yōu)化”,但卻沒有思考過為什么官方用例、 manager 中都會(huì)用到 workqueue。

所以這就要引申一個(gè)問題 為什么要用 workqueue ?原因如下:

  • 在不依賴 Delta FIFO queue 的情況下,將資源事件變得有序。
  • workqueue 也可以當(dāng)作緩存看。將要處理的事件以 key 的方式先緩存在 workqueue 中。
緩存的作用相信很多人都清楚:解決兩個(gè)組件處理速度不匹配的問題,如 cpu 和 硬盤之間經(jīng)常是用 內(nèi)存做緩存。
我們的業(yè)務(wù)處理邏輯大概率肯定是慢于事件的生成的,而且還延遲隊(duì)列類型做選擇

方便失敗后重試。

加個(gè)煎蛋

這可以算個(gè)番外系列,不感興趣的朋友可以直接跳過。

有些同學(xué)其實(shí)已經(jīng)發(fā)現(xiàn),我們完全不可以不用那么多隊(duì)列的(Delta FIFO queue,Workqueue),甚至還用了個(gè)小數(shù)據(jù)庫(kù)(Indexer)!

我們可不可以直接Watch對(duì)象?即相當(dāng)于直接調(diào)用 etcd 的 watch API。答案是可以的。

我們借鑒一下這里的代碼。

實(shí)現(xiàn)一個(gè)pod的watch, 代碼如下:

func NewPodOnlyWithWatch(ctx context.Context, clientset *kubernetes.Clientset) {
 onlyWatch := &cache.ListWatch{
  WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
   //options.LabelSelector = requireLabel.String()
   //options.ResourceVersion = ""
   return clientset.CoreV1().Pods("devops").Watch(ctx, meta_v1.ListOptions{})
  },
 }

 watcher, err := watch2.NewRetryWatcher("1", onlyWatch)
 if err != nil {
  panic(err)
 }

 // Give the watcher a chance to get to sending events (blocking)
 time.Sleep(10 * time.Millisecond)

 for {
  select {
  case event, ok := <-watcher.ResultChan():
   if !ok {
    fmt.Println("ResultChan closed")
    return
   }

   //fmt.Println("get event")
   if pod, ok := event.Object.(*core_v1.Pod); ok {
    switch event.Type {
    case watch.Added:
     fmt.Printf("新增事件:%s/%s\n", pod.Namespace, pod.Name)
    case watch.Deleted:
     fmt.Printf("刪除事件:%s/%s\n", pod.Namespace, pod.Name)
    case watch.Modified:
     fmt.Printf("更新事件:%s/%s\n", pod.Namespace, pod.Name)
    default:
     fmt.Printf("%s事件:%s\n", event.Type, pod.Name)

    }
   }

  case <-watcher.Done():
   fmt.Println("watcher down")
   return
  }
 }
}

但不建議直接watcher。其中之一就是:從業(yè)務(wù)視角會(huì)看到的重復(fù)性事件。即資源對(duì)象的一個(gè)更新動(dòng)作,收到多個(gè)事件。

5、總結(jié)

我們常說(shuō)的Controller 他最核心的能力就是能監(jiān)控到資源的任何變化,也就是 聲明式 概念中保證狀態(tài)的關(guān)鍵技術(shù) --  Informer,流程是:

  • Reflector 將對(duì)象加入到Delta FIFO queue中。
  • 然后 informer 將其 pop 出,加入到 Indexer中,以及 resourceEventHandler。
  • 最后就是我們自己的業(yè)務(wù)邏輯, 即:我們自己先到workqueue中,拿到 key,然后用 key 去Indexer 中換取對(duì)象,最后處理對(duì)象。

然后我們又通過 一個(gè)錯(cuò)誤的*優(yōu)化* 的例子,講清楚了 workqueue 的重要性。

我們還可以再 geek 一點(diǎn),選擇直接watch對(duì)象變化的事件,但個(gè)人不建議這樣做。

這一篇文章主要是介紹了 資源事件通過 informer 扭轉(zhuǎn)到 ResourceEventHandler 中的大體流程,并沒有講很多細(xì)節(jié)的部分。

因?yàn)槲覀冞€需要掌握一些關(guān)鍵的組件:Delta FIFO queue、Indexer、workqueue

當(dāng)這些都清楚了后,再來(lái)了解流程的細(xì)節(jié),那就非常輕松了。

當(dāng)然了除了知道了上面的內(nèi)容,我們還應(yīng)該掌握 sharedInformer 以及寫 Controller 的“神器” -- controller-runtime 再封裝的 manager。

如果大家感興趣的話再后面的文章再作詳細(xì)介紹。當(dāng)了解完了這些后,相信 Controller 中的任何技術(shù)細(xì)節(jié)問題都難不倒你了。

責(zé)任編輯:姜華 來(lái)源: 運(yùn)維開發(fā)故事
相關(guān)推薦

2022-04-22 13:32:01

K8s容器引擎架構(gòu)

2024-11-11 07:05:00

Redis哨兵模式主從復(fù)制

2021-01-12 08:03:19

Redis數(shù)據(jù)系統(tǒng)

2024-02-26 08:10:00

Redis數(shù)據(jù)數(shù)據(jù)庫(kù)

2024-08-06 09:55:25

2023-11-06 07:16:22

WasmK8s模塊

2020-11-10 07:05:41

DockerK8S云計(jì)算

2022-04-05 09:24:57

K8s安全網(wǎng)絡(luò)安全時(shí)間響應(yīng)

2019-03-13 09:27:57

宕機(jī)Kafka數(shù)據(jù)

2020-07-30 09:10:21

DockerK8s容器

2024-06-18 08:26:22

2023-11-27 13:18:00

Redis數(shù)據(jù)不丟失

2021-10-22 08:37:13

消息不丟失rocketmq消息隊(duì)列

2022-04-29 10:40:38

技術(shù)服務(wù)端K8s

2023-11-02 08:01:22

2024-02-23 14:53:10

Redis持久化

2023-09-06 08:12:04

k8s云原生

2024-01-26 14:35:03

鑒權(quán)K8sNode

2024-08-30 08:23:06

2020-12-31 07:34:04

Redis數(shù)據(jù)宕機(jī)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)