Understanding K8s controller-runtime in One Article


Anyone developing on K8s hears about controllers all the time. This article explains in detail how the concept is implemented underneath.

Controller

In K8s, a controller is implemented with the controller-runtime framework (https://github.com/kubernetes-sigs/controller-runtime). Tools such as Kubebuilder and operator-sdk are only wrappers around controller-runtime that let developers generate project scaffolding quickly.

Controller is defined in pkg/internal/controller/controller. A controller mainly consists of the Watch and Start methods plus the reconciliation method Reconcile. The struct definition appears to hold no Informer or Indexer for resource objects, even though all interaction with kube-apiserver resources in K8s goes through Informers. In fact, that access is wrapped by the startWatches field shown below.

type Controller struct {
   // Name is used to uniquely identify a Controller in tracing, logging and monitoring.  Name is required.
   Name string

   // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
   MaxConcurrentReconciles int

   // Reconciler is a function that can be called at any time with the Name / Namespace of an object and
   // ensures that the state of the system matches the state specified in the object.
   // Defaults to the DefaultReconcileFunc.
   Do reconcile.Reconciler

   // MakeQueue constructs the queue for this controller once the controller is ready to start.
   // This exists because the standard Kubernetes workqueues start themselves immediately, which
   // leads to goroutine leaks if something calls controller.New repeatedly.
   MakeQueue func() workqueue.RateLimitingInterface

   // Queue is an listeningQueue that listens for events from Informers and adds object keys to
   // the Queue for processing
   Queue workqueue.RateLimitingInterface

   // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
   // Deprecated: the caller should handle injected fields itself.
   SetFields func(i interface{}) error

   // mu is used to synchronize Controller setup
   mu sync.Mutex

   // Started is true if the Controller has been Started
   Started bool

   // ctx is the context that was passed to Start() and used when starting watches.
   //
   // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
   // while we usually always strive to follow best practices, we consider this a legacy case and it should
   // undergo a major refactoring and redesign to allow for context to not be stored in a struct.
   ctx context.Context

   // CacheSyncTimeout refers to the time limit set on waiting for cache to sync
   // Defaults to 2 minutes if not set.
   CacheSyncTimeout time.Duration

   // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
   startWatches []watchDescription

   // LogConstructor is used to construct a logger to then log messages to users during reconciliation,
   // or for example when a watch is started.
   // Note: LogConstructor has to be able to handle nil requests as we are also using it
   // outside the context of a reconciliation.
   LogConstructor func(request *reconcile.Request) logr.Logger

   // RecoverPanic indicates whether the panic caused by reconcile should be recovered.
   RecoverPanic *bool
}

Watch()

Watch first checks whether the controller has already been started. If not, the watch is stored in startWatches until the controller starts. If it has, Watch calls src.Start(c.ctx, evthdler, c.Queue, prct...) directly. The Source can be an informer, a kind, a channel, and so on.

// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
    ...
    // Controller hasn't started yet, store the watches locally and return.
    //
    // These watches are going to be held on the controller struct until the manager or user calls Start(...).
    if !c.Started {
        c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
        return nil
    }

    c.LogConstructor(nil).Info("Starting EventSource", "source", src)
    return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
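The buffering behaviour described above can be pictured with a small stdlib-only model. This is a sketch, not controller-runtime code: `toyController`, `watch`, and the `active` slice are hypothetical names introduced for illustration, and a "started" watch is reduced to recording its name.

```go
package main

import "fmt"

// watch is a hypothetical stand-in for watchDescription; it only carries a name.
type watch struct{ name string }

// toyController is an illustrative model, not the controller-runtime type.
type toyController struct {
	started      bool
	startWatches []watch
	active       []string // names of watches that have been started
}

// Watch buffers the watch before Start and starts it immediately afterwards.
func (c *toyController) Watch(w watch) {
	if !c.started {
		c.startWatches = append(c.startWatches, w)
		return
	}
	c.active = append(c.active, w.name)
}

// Start starts every buffered watch, then marks the controller as started.
func (c *toyController) Start() {
	for _, w := range c.startWatches {
		c.active = append(c.active, w.name)
	}
	c.startWatches = nil
	c.started = true
}

func main() {
	c := &toyController{}
	c.Watch(watch{name: "pods"}) // buffered: controller not started yet
	fmt.Println("active before Start:", len(c.active))
	c.Start()
	c.Watch(watch{name: "services"}) // started right away
	fmt.Println("active after Start:", c.active)
}
```

The point of the two code paths is that callers can register watches in any order relative to Start without caring about the controller's lifecycle.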

With an informer source, the corresponding EventHandler is added as follows:

_, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})

With a kind source, the corresponding EventHandler is added as follows:

i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
_, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})

internal.EventHandler implements the OnAdd, OnUpdate, and OnDelete methods. In other words, src.Start obtains the relevant informer and registers the EventHandler on it.
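As a rough illustration of "obtain the informer, then register a handler on it", the following stdlib-only sketch models an informer that fans events out to registered handlers. All types here (`toyInformer`, `addHandler`, `enqueueHandler`) are assumptions made for the example; only the OnAdd callback is modelled, and objects are plain strings.

```go
package main

import "fmt"

// addHandler is a hypothetical, trimmed-down event handler with only OnAdd.
type addHandler interface {
	OnAdd(obj string)
}

// toyInformer fans each observed object out to every registered handler.
type toyInformer struct {
	handlers []addHandler
}

// AddEventHandler registers a handler, loosely mirroring the informer call in the text.
func (i *toyInformer) AddEventHandler(h addHandler) {
	i.handlers = append(i.handlers, h)
}

// observe simulates the informer seeing a newly created object.
func (i *toyInformer) observe(obj string) {
	for _, h := range i.handlers {
		h.OnAdd(obj)
	}
}

// enqueueHandler plays the role of internal.EventHandler: it turns events into queue items.
type enqueueHandler struct {
	queue *[]string
}

func (e enqueueHandler) OnAdd(obj string) {
	*e.queue = append(*e.queue, obj)
}

func main() {
	var queue []string
	inf := &toyInformer{}
	inf.AddEventHandler(enqueueHandler{queue: &queue})
	inf.observe("default/web")
	fmt.Println(queue)
}
```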

Start()

Start has two main jobs. The first is to call Start on every Source in startWatches, which registers the EventHandlers.

for _, watch := range c.startWatches {
   c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

   if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
      return err
   }
}

The second is to start the workers that process resource objects.

for i := 0; i < c.MaxConcurrentReconciles; i++ {
   go func() {
      defer wg.Done()
      // Run a worker thread that just dequeues items, processes them, and marks them done.
      // It enforces that the reconcileHandler is never invoked concurrently with the same object.
      for c.processNextWorkItem(ctx) {
      }
   }()
}

processNextWorkItem takes an item from the Queue and passes it to reconcileHandler, which is where the item is actually processed and which covers both event handling and error handling. The real event handling is exposed to developers through c.Do.Reconcile(ctx, req), so developers only need to put their business logic in the Reconcile function.

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    obj, shutdown := c.Queue.Get()
    if shutdown {
        // Stop working
        return false
    }

    // We call Done here so the workqueue knows we have finished
    // processing this item. We also must remember to call Forget if we
    // do not want this work item being re-queued. For example, we do
    // not call Forget if a transient error occurs, instead the item is
    // put back on the workqueue and attempted again after a back-off
    // period.
    defer c.Queue.Done(obj)

    ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
    defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

    c.reconcileHandler(ctx, obj)
    return true
}

// Reconcile implements reconcile.Reconciler.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
    defer func() {
        if r := recover(); r != nil {
            if c.RecoverPanic != nil && *c.RecoverPanic {
                for _, fn := range utilruntime.PanicHandlers {
                    fn(r)
                }
                err = fmt.Errorf("panic: %v [recovered]", r)
                return
            }

            log := logf.FromContext(ctx)
            log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
            panic(r)
        }
    }()
    return c.Do.Reconcile(ctx, req)
}

Reconcile

The controller's reconciliation logic runs in Reconcile.

type Reconciler interface {
   // Reconcile performs a full reconciliation for the object referred to by the Request.
   // The Controller will requeue the Request to be processed again if an error is non-nil or
   // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
   Reconcile(context.Context, Request) (Result, error)
}
type Request struct {
   // NamespacedName is the name and namespace of the object to reconcile.
   types.NamespacedName
}

The Request argument of Reconcile comes from the controller's Queue. reconcileHandler first checks that the queue item is a reconcile.Request; if the type does not match, the Reconcile logic is not executed.

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
   // Make sure that the object is a valid request.
   req, ok := obj.(reconcile.Request)
   if !ok {
      // As the item in the workqueue is actually invalid, we call
      // Forget here else we'd go into a loop of attempting to
      // process a work item that is invalid.
      c.Queue.Forget(obj)
      c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
      // Return true, don't take a break
      return
   }
}
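Putting the type check together with the requeue rule stated in the Reconciler comment (requeue when the error is non-nil or Result.Requeue is true, otherwise drop the item from the queue), the decision can be sketched as below. `request`, `result`, and `process` are hypothetical stand-ins for illustration; the real handler also deals with back-off and delayed requeues, which this model leaves out.

```go
package main

import (
	"errors"
	"fmt"
)

// request and result are simplified stand-ins for reconcile.Request and reconcile.Result.
type request struct{ Namespace, Name string }
type result struct{ Requeue bool }

// process mirrors the documented contract in simplified form and reports
// what would happen to the queue item.
func process(item interface{}, reconcile func(request) (result, error)) string {
	req, ok := item.(request)
	if !ok {
		return "dropped" // invalid item: forget it without reconciling
	}
	res, err := reconcile(req)
	if err != nil || res.Requeue {
		return "requeued"
	}
	return "done"
}

func main() {
	succeed := func(request) (result, error) { return result{}, nil }
	fail := func(request) (result, error) { return result{}, errors.New("transient") }

	fmt.Println(process("not a request", succeed))
	fmt.Println(process(request{"default", "web"}, succeed))
	fmt.Println(process(request{"default", "web"}, fail))
}
```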

So how does data get into the queue? It is enqueued by the EventHandler registered on the Informer. Recall src.Start(c.ctx, evthdler, c.Queue, prct...): this call registers an internal.EventHandler on the informer. internal.EventHandler implements OnAdd, OnUpdate, OnDelete, and so on. Taking OnAdd as an example, the method ends by calling EventHandler.Create.

type EventHandler struct {
    EventHandler handler.EventHandler
    Queue        workqueue.RateLimitingInterface
    Predicates   []predicate.Predicate
}

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e EventHandler) OnAdd(obj interface{}) {
    c := event.CreateEvent{}

    // Pull Object out of the object
    if o, ok := obj.(client.Object); ok {
        c.Object = o
    } else {
        log.Error(nil, "OnAdd missing Object",
            "object", obj, "type", fmt.Sprintf("%T", obj))
        return
    }

    for _, p := range e.Predicates {
        if !p.Create(c) {
            return
        }
    }

    // Invoke create handler
    e.EventHandler.Create(c, e.Queue)
}

EventHandler is an interface with four implementations: EnqueueRequestForObject, Funcs, EnqueueRequestForOwner, and enqueueRequestsFromMapFunc. Taking EnqueueRequestForObject as an example, its Create method is:

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
    if evt.Object == nil {
        enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
        return
    }
    q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
        Name:      evt.Object.GetName(),
        Namespace: evt.Object.GetNamespace(),
    }})
}

So the objects that Reconcile works on are in fact enqueued by the EventHandler registered on the Informer.
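The path from an add event to a queue entry (predicates first, then the handler enqueuing a namespace/name key) can be condensed into a few lines. This is an illustrative sketch only: `object`, `predicate`, and `onAdd` are hypothetical names, and the queue is a plain slice of "namespace/name" strings rather than a workqueue of reconcile.Request values.

```go
package main

import "fmt"

// object is a hypothetical stand-in for client.Object with just the identifying fields.
type object struct{ Namespace, Name string }

// predicate decides whether a create event should reach the handler.
type predicate func(obj object) bool

// onAdd applies every predicate and enqueues the object's key only if all of them pass.
func onAdd(obj object, preds []predicate, queue *[]string) {
	for _, p := range preds {
		if !p(obj) {
			return
		}
	}
	*queue = append(*queue, obj.Namespace+"/"+obj.Name)
}

func main() {
	var queue []string
	notSystem := func(o object) bool { return o.Namespace != "kube-system" }

	onAdd(object{"default", "web"}, []predicate{notSystem}, &queue)
	onAdd(object{"kube-system", "dns"}, []predicate{notSystem}, &queue) // filtered out
	fmt.Println(queue)
}
```

Note that only the key is enqueued, not the object itself, which is why Reconcile receives a name and namespace and has to look up the current state on its own.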

Dissecting scaffolding frameworks such as kubebuilder

Frameworks such as kubebuilder and operator-sdk can quickly generate controller code for a given resource object. The following walks through the controller code logic, using kubebuilder as the example.

A complete controller startup sequence consists of the following steps:

1) The main function in main.go creates a controllerManager object.

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme:                 scheme,
    MetricsBindAddress:     metricsAddr,
    Port:                   9443,
    HealthProbeBindAddress: probeAddr,
    LeaderElection:         enableLeaderElection,
    LeaderElectionID:       "9a82ee0d.my.domain",
    CertDir:                "dir",
    ...
})

2) The SetupWithManager() method registers the controller for each resource type with the controllerManager object.

if err = (&controllers.AppServiceReconciler{
    Client: mgr.GetClient(),
    Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "AppService")
    os.Exit(1)
}

3) The controllerManager is started, which in turn starts the controller for each resource type.

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    setupLog.Error(err, "problem running manager")
    os.Exit(1)
}

Most of the logic lives in two methods: SetupWithManager() and mgr.Start().

// SetupWithManager sets up the controller with the Manager.
func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appexamplecomv1.AppService{}).
        Complete(r)
}

Builder

ctrl.NewControllerManagedBy(mgr) returns a builder object.

NewControllerManagedBy = builder.ControllerManagedBy

func ControllerManagedBy(m manager.Manager) *Builder {
    return &Builder{mgr: m}
}

The builder constructs the controller. Its struct is defined as:

type Builder struct {
    forInput         ForInput
    ownsInput        []OwnsInput
    watchesInput     []WatchesInput
    mgr              manager.Manager
    globalPredicates []predicate.Predicate
    ctrl             controller.Controller
    ctrlOptions      controller.Options
    name             string
}

ctrlOptions holds the settings used to build the controller, chiefly the Reconciler. forInput specifies the reconciled object itself and is set through the builder's For(). ownsInput specifies the child resources that are watched on behalf of the reconciled object and is set through Owns(). watchesInput allows custom EventHandler logic and is set through Watches(). As a result, a controller generated by kubebuilder reconciles only the primary object by default.

type WatchesInput struct {
    src              source.Source
    eventhandler     handler.EventHandler
    predicates       []predicate.Predicate
    objectProjection objectProjection
}

builder.Complete() calls Builder.Build() to do the construction. Build() relies on two important methods, doController() and doWatch().

doController()

doController derives the controller name from the GVK of the resource object and then instantiates the Controller through the newController function.

controllerName, err := blder.getControllerName(gvk, hasGVK)
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)

newController is an alias for controller.New, which is defined as:

func New(name string, mgr manager.Manager, options Options) (Controller, error) {
    c, err := NewUnmanaged(name, mgr, options)
    if err != nil {
        return nil, err
    }

    // Add the controller as a Manager components
    return c, mgr.Add(c)
}

c, err := NewUnmanaged(name, mgr, options) initializes the Controller instance. Once it has been created, mgr.Add(c) hands the controller to the Manager for management. The controllerManager's Add function takes a Runnable argument. Runnable is an interface that represents a component that can be started, and Controller happens to implement its Start function, which is why a Controller instance can be passed to Add.

doWatch()

doWatch is fairly simple: it calls controller.Watch() to register the EventHandlers. For the forInput resource the default EventHandler is EnqueueRequestForObject, and for ownsInput resources it is EnqueueRequestForOwner. Both handlers were mentioned above; they implement Create(), Update(), Delete(), and so on, and enqueue the resource object to be reconciled.

func (blder *Builder) doWatch() error {
    // Reconcile type
    typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
    if err != nil {
        return err
    }
    src := &source.Kind{Type: typeForSrc}
    hdler := &handler.EnqueueRequestForObject{}
    allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
        return err
    }

    // Watches the managed types
    for _, own := range blder.ownsInput {
        typeForSrc, err := blder.project(own.object, own.objectProjection)
        if err != nil {
            return err
        }
        src := &source.Kind{Type: typeForSrc}
        hdler := &handler.EnqueueRequestForOwner{
            OwnerType:    blder.forInput.object,
            IsController: true,
        }
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, own.predicates...)
        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
            return err
        }
    }

    // Do the watch requests
    for _, w := range blder.watchesInput {
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, w.predicates...)

        // If the source of this watch is of type *source.Kind, project it.
        if srckind, ok := w.src.(*source.Kind); ok {
            typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
            if err != nil {
                return err
            }
            srckind.Type = typeForSrc
        }

        if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
            return err
        }
    }
    return nil
}
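The three stages of doWatch (primary type, owned types, custom watches) can be summarised with a miniature fluent builder. This is a sketch, not the real Builder: `toyBuilder`, `customWatch`, and `plan` are hypothetical, kinds and handlers are represented by their names only, and the output simply lists which handler each kind would be paired with.

```go
package main

import "fmt"

// customWatch pairs a kind with the name of a user-supplied handler.
type customWatch struct{ kind, handler string }

// toyBuilder is a hypothetical miniature of the builder, tracking kinds by name only.
type toyBuilder struct {
	forKind string
	owns    []string
	watches []customWatch
}

func controllerManagedBy() *toyBuilder { return &toyBuilder{} }

func (b *toyBuilder) For(kind string) *toyBuilder {
	b.forKind = kind
	return b
}

func (b *toyBuilder) Owns(kind string) *toyBuilder {
	b.owns = append(b.owns, kind)
	return b
}

func (b *toyBuilder) Watches(kind, handler string) *toyBuilder {
	b.watches = append(b.watches, customWatch{kind, handler})
	return b
}

// plan lists "kind -> handler" pairs in the order doWatch registers them:
// the primary type, then owned types, then custom watches.
func (b *toyBuilder) plan() []string {
	var out []string
	if b.forKind != "" {
		out = append(out, b.forKind+" -> EnqueueRequestForObject")
	}
	for _, k := range b.owns {
		out = append(out, k+" -> EnqueueRequestForOwner")
	}
	for _, w := range b.watches {
		out = append(out, w.kind+" -> "+w.handler)
	}
	return out
}

func main() {
	plan := controllerManagedBy().
		For("AppService").
		Owns("Deployment").
		Watches("ConfigMap", "customHandler").
		plan()
	for _, line := range plan {
		fmt.Println(line)
	}
}
```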

For watchesInput resources you supply the EventHandler yourself, in a way similar to the code below. As established earlier, the objects a controller reconciles come from the queue, and the queue is filled by the EventHandler's Create, Update, and Delete logic. The processing order is therefore: logic defined in the EventHandler -> enqueue -> Reconcile.

func (r *AppServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

    klog.Infof("Starting Reconcile logic")
    ...
    return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        Named("appServiceController").
        Watches(
            &source.Kind{
                Type: &appexamplecomv1.AppService{},
            },
            handler.Funcs{
                CreateFunc: func(createEvent event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
                    klog.Infof("createFunc")
                    limitingInterface.Add(reconcile.Request{NamespacedName: types.NamespacedName{
                       Name:      createEvent.Object.GetName(),
                       Namespace: createEvent.Object.GetNamespace(),
                    }})
                },
                UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
                    klog.Infof("updateFunc")
                },
                DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
                    klog.Infof("deleteFunc")
                },
            }).
        Complete(r)
}

The code above enqueues a request only in CreateFunc, so the Reconcile logic runs only when a resource is created.
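The effect of enqueuing in only one callback can be reproduced with a small model of a Funcs-style handler. This is an illustrative sketch: `funcs` and `dispatch` are hypothetical, events are identified by a string kind, and an unset callback is simply skipped.

```go
package main

import "fmt"

// funcs is a hypothetical miniature of a Funcs-style handler: each callback is optional.
type funcs struct {
	CreateFunc func(key string, enqueue func(string))
	UpdateFunc func(key string, enqueue func(string))
}

// dispatch routes an event to the matching callback and ignores unset callbacks.
func (f funcs) dispatch(kind, key string, enqueue func(string)) {
	switch kind {
	case "create":
		if f.CreateFunc != nil {
			f.CreateFunc(key, enqueue)
		}
	case "update":
		if f.UpdateFunc != nil {
			f.UpdateFunc(key, enqueue)
		}
	}
}

func main() {
	var queue []string
	enqueue := func(k string) { queue = append(queue, k) }

	h := funcs{
		// Only the create callback enqueues, as in the example from the text.
		CreateFunc: func(key string, enq func(string)) { enq(key) },
		// The update callback runs but does not enqueue anything.
		UpdateFunc: func(key string, enq func(string)) {},
	}
	h.dispatch("create", "default/web", enqueue)
	h.dispatch("update", "default/web", enqueue)
	fmt.Println("queued requests:", len(queue))
}
```

To have updates or deletions trigger Reconcile as well, the corresponding callback would need to enqueue a request in the same way the create callback does.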

Manager.Start()

After the controllers have been registered with the manager, the manager is started with mgr.Start(ctrl.SetupSignalHandler()). As noted earlier, registration calls mgr.Add() inside doController, which adds the controller to the Manager as a Runnable. Manager.Start() calls the Start method of cm.runnables, that is, Controller.Start(), to start the controller.

func (cm *controllerManager) Start(ctx context.Context) (err error) {
    ...
    if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
        if !errors.Is(err, wait.ErrWaitTimeout) {
            return err
        }
    }

    // Start and wait for caches.
    if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
        if !errors.Is(err, wait.ErrWaitTimeout) {
            return err
        }
    }

    // Start the non-leaderelection Runnables after the cache has synced.
    if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
        if !errors.Is(err, wait.ErrWaitTimeout) {
            return err
        }
    }
    ...
}

Editor: 武曉燕. Source: CNCF