A walkthrough of the pod creation source code
Hi everyone, I'm Huazai (華仔).
I've been working with Kubernetes for more than four years now, but mostly just as a user: I haven't been very familiar with its internals or source code, nor with the flow and logic behind the commands I run day to day. So I recently decided to read through the source of the various k8s modules. First, to deepen my understanding of each module; second, so that when problems come up I can trace them to the root cause and back my analysis with evidence; and lastly, so that I won't be stumped by interviewers' technical questions the next time I change jobs. Today, let's take a simple look at the pod creation source code. If the article has mistakes, please point them out, and go easy on me. The k8s source is available on GitHub; the version I'm reading here is 1.21.3.
Main text
1. In the k8s source, the add/update/delete/query handling for pods happens in syncLoop() in /pkg/kubelet/kubelet.go, shown below:
```go
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.InfoS("Starting kubelet main sync loop")
```
Inside syncLoop(), the actual per-pod operations are carried out through kl.syncLoopIteration():
```go
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
    break
}
```
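To make the shape of this loop concrete, here is a minimal, self-contained sketch of the same pattern: a for loop that blocks inside an iteration function and exits only when the update channel closes. All names here are made up for illustration; this is not kubelet code.

```go
package main

import (
    "fmt"
    "time"
)

// podUpdate is a toy stand-in for kubetypes.PodUpdate.
type podUpdate struct{ name string }

// syncLoop mimics the kubelet pattern: block in a select until either an
// update arrives or the periodic sync ticker fires, handling one event per
// iteration. Returning false from the iteration breaks the loop.
func syncLoop(updates <-chan podUpdate) {
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    for {
        if !syncLoopIteration(updates, syncTicker.C) {
            break
        }
    }
}

func syncLoopIteration(updates <-chan podUpdate, syncCh <-chan time.Time) bool {
    select {
    case u, open := <-updates:
        if !open {
            return false // config channel closed: exit the loop
        }
        fmt.Println("handle update for", u.name)
    case <-syncCh:
        fmt.Println("periodic sync of all pods")
    }
    return true
}

func main() {
    updates := make(chan podUpdate, 1)
    updates <- podUpdate{name: "nginx"}
    close(updates)
    syncLoop(updates)
}
```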
2. syncLoopIteration() takes several important arguments, documented in the source:
```go
// Arguments:
// 1.  configCh: a channel to read config events from
// 2.  handler: the SyncHandler to dispatch pods to
// 3.  syncCh: a channel to read periodic sync events from
// 4.  housekeepingCh: a channel to read housekeeping events from
// 5.  plegCh: a channel to read PLEG updates from
//
// * configCh: dispatch the pods for the config change to the appropriate
//   handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * health manager: sync pods that have failed or in which one or more
//   containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }
```
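The excerpt above shows only the configCh case. To see the whole fan-in, the toy program below models every event source the doc comment lists and maps each channel to the handler it describes (hypothetical names, not kubelet source):

```go
package main

import (
    "fmt"
    "time"
)

// Toy stand-ins for the event sources syncLoopIteration selects over.
type update struct{ op, pod string }
type plegEvent struct{ podID, eventType string }

// iteration mirrors the shape of syncLoopIteration: one select, one event,
// one handler call per pass.
func iteration(configCh <-chan update, plegCh <-chan plegEvent,
    syncCh, housekeepingCh <-chan time.Time) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false // config channel closed: exit the loop
        }
        fmt.Println("dispatch", u.op, "for", u.pod) // -> HandlePodAdditions etc.
    case e := <-plegCh:
        fmt.Println("runtime event", e.eventType, "for", e.podID) // -> HandlePodSyncs
    case <-syncCh:
        fmt.Println("sync all pods waiting for sync") // -> HandlePodSyncs
    case <-housekeepingCh:
        fmt.Println("housekeeping: clean up pods") // -> HandlePodCleanups
    }
    return true
}

func main() {
    configCh := make(chan update, 1)
    configCh <- update{op: "ADD", pod: "nginx"}
    iteration(configCh, nil, nil, nil)
}
```

In the real function there is also a case for the liveness manager's updates, which re-syncs pods whose containers failed health checks, matching the "health manager" line in the doc comment.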
SyncHandler is an interface containing methods for the common pod operations; the Kubelet type itself implements it:
```go
// SyncHandler is an interface implemented by Kubelet, for testability
// (pod creation, update, deletion, ...)
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error
}
```
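Since SyncHandler exists partly for testability, any type with these six methods can stand in for the Kubelet. A hypothetical no-op fake (my own sketch, not something in the k8s tree) might look like:

```go
package kubelet_test

import (
    v1 "k8s.io/api/core/v1"
)

// fakeHandler is a hypothetical SyncHandler for tests: it records which
// pods the ADD callback saw and ignores everything else.
type fakeHandler struct {
    added []*v1.Pod
}

func (f *fakeHandler) HandlePodAdditions(pods []*v1.Pod) { f.added = append(f.added, pods...) }
func (f *fakeHandler) HandlePodUpdates(pods []*v1.Pod)   {}
func (f *fakeHandler) HandlePodRemoves(pods []*v1.Pod)   {}
func (f *fakeHandler) HandlePodReconcile(pods []*v1.Pod) {}
func (f *fakeHandler) HandlePodSyncs(pods []*v1.Pod)     {}
func (f *fakeHandler) HandlePodCleanups() error          { return nil }
```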
3. The operations that can be performed on a pod are defined below; each maps to a corresponding handler method. ADD, for example, is handled by HandlePodAdditions.
```go
// These constants identify the PodOperations that can be made on a pod configuration.
const (
    // SET is the current pod configuration.
    SET PodOperation = iota
    // ADD signifies pods that are new to this source.
    ADD
    // DELETE signifies pods that are gracefully deleted from this source.
    DELETE
    // REMOVE signifies pods that have been removed from this source.
    REMOVE
    // UPDATE signifies pods have been updated in this source.
    UPDATE
    // RECONCILE signifies pods that have unexpected status in this source,
    // kubelet should reconcile status with this source.
    RECONCILE
)
```
Back in syncLoopIteration(), a switch on u.Op routes each operation to its handler:

```go
switch u.Op {
case kubetypes.ADD:
    klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
    // After restarting, kubelet will get all existing pods through
    // ADD as if they are new pods. These pods will then go through the
    // admission process and *may* be rejected. This can be resolved
    // once we have checkpointing.
    handler.HandlePodAdditions(u.Pods)
```
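Since the constants are declared with iota, SET is 0, ADD is 1, and so on up to RECONCILE at 5. Here is a self-contained toy version of the same enum-plus-switch dispatch pattern (all names hypothetical, not k8s code):

```go
package main

import "fmt"

type PodOperation int

const (
    SET PodOperation = iota // 0: current pod configuration
    ADD                     // 1: pods new to this source
    DELETE                  // 2: graceful deletion
    REMOVE                  // 3: removed from this source
    UPDATE                  // 4: updated in this source
    RECONCILE               // 5: status reconcile needed
)

type PodUpdate struct {
    Op     PodOperation
    Source string
    Pods   []string // stand-in for []*v1.Pod
}

// dispatch routes an update to the matching handler, just like the
// switch on u.Op in syncLoopIteration.
func dispatch(u PodUpdate) {
    switch u.Op {
    case ADD:
        fmt.Printf("SyncLoop ADD source=%s pods=%v\n", u.Source, u.Pods)
    case UPDATE:
        fmt.Printf("SyncLoop UPDATE source=%s pods=%v\n", u.Source, u.Pods)
    default:
        fmt.Println("other operation:", u.Op)
    }
}

func main() {
    dispatch(PodUpdate{Op: ADD, Source: "api", Pods: []string{"nginx"}})
}
```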
4. How does HandlePodAdditions go about creating the pod? It performs the following steps:
1. Sort the pods by creation time (a toy reconstruction of this sort follows the list):

```go
sort.Sort(sliceutils.PodsByCreationTime(pods))
```

2. Add the pod to the pod manager. The kubelet relies on the pod manager as the source of truth for the desired state; if a pod cannot be found in the pod manager, it has already been deleted in the apiserver and no action other than cleanup is required:

```go
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
```

3. Determine whether the pod is a static pod by looking up its mirror pod:

```go
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
```

4. Dispatch the work through dispatchWork:

```go
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
```

5. Add the pod to the probe manager for health checking, covering the startup, liveness, and readiness probes:

```go
kl.probeManager.AddPod(pod)
```
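Step 1 relies on sliceutils.PodsByCreationTime, a slice type implementing sort.Interface ordered by creation timestamp, presumably so older pods are handled first. A toy reconstruction of that idea, using a simplified pod type instead of *v1.Pod (an assumption for illustration, not the real implementation):

```go
package main

import (
    "fmt"
    "sort"
    "time"
)

// pod is a stand-in for *v1.Pod carrying just what the sort needs.
type pod struct {
    Name              string
    CreationTimestamp time.Time
}

// podsByCreationTime mirrors the idea of sliceutils.PodsByCreationTime:
// oldest pods sort first.
type podsByCreationTime []pod

func (s podsByCreationTime) Len() int      { return len(s) }
func (s podsByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s podsByCreationTime) Less(i, j int) bool {
    return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}

func main() {
    now := time.Now()
    pods := podsByCreationTime{
        {"new", now},
        {"old", now.Add(-time.Hour)},
    }
    sort.Sort(pods)
    fmt.Println(pods[0].Name) // old
}
```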
What does dispatchWork do? It hands the sync off to an async worker:
```go
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
    Pod:        pod,
    MirrorPod:  mirrorPod,
    UpdateType: syncType,
    OnCompleteFunc: func(err error) {
        if err != nil {
            metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
        }
    },
})
```
What does UpdatePod() do in turn? When the pod has no existing worker, it spins up a new goroutine running managePodLoop():
```go
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
    defer runtime.HandleCrash()
    p.managePodLoop(podUpdates)
}()
```
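The core design behind UpdatePod is one goroutine plus one channel per pod UID: updates for a pod are funneled into its channel, and managePodLoop consumes them one at a time, so syncs for the same pod never run concurrently while different pods sync in parallel. A minimal self-contained sketch of that pattern (toy types, not the real podWorkers):

```go
package main

import (
    "fmt"
    "sync"
)

type update struct{ podUID, reason string }

type podWorkers struct {
    mu         sync.Mutex
    podUpdates map[string]chan update // one channel per pod UID
    wg         sync.WaitGroup
}

// UpdatePod starts a worker goroutine the first time a pod UID is seen,
// then queues the update onto that pod's channel.
func (p *podWorkers) UpdatePod(u update) {
    p.mu.Lock()
    ch, ok := p.podUpdates[u.podUID]
    if !ok {
        ch = make(chan update, 1)
        p.podUpdates[u.podUID] = ch
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            p.managePodLoop(ch)
        }()
    }
    p.mu.Unlock()
    ch <- u
}

// managePodLoop serializes all syncs for a single pod.
func (p *podWorkers) managePodLoop(updates <-chan update) {
    for u := range updates {
        fmt.Printf("sync pod %s (%s)\n", u.podUID, u.reason)
    }
}

func main() {
    p := &podWorkers{podUpdates: map[string]chan update{}}
    p.UpdatePod(update{"uid-1", "create"})
    p.UpdatePod(update{"uid-1", "update"})
    p.mu.Lock()
    for _, ch := range p.podUpdates {
        close(ch)
    }
    p.mu.Unlock()
    p.wg.Wait()
}
```

The real podWorkers also tracks whether a worker is busy and buffers only the newest pending update instead of blocking, but the per-pod serialization idea is the same.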
managePodLoop() then carries out the sync:
```go
for update := range podUpdates {
    err := func() error {
        podUID := update.Pod.UID
        // This is a blocking call that would return only if the cache
        // has an entry for the pod that is newer than minRuntimeCache
        // Time. This ensures the worker doesn't start syncing until
        // after the cache is at least newer than the finished time of
        // the previous sync.
        status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
        if err != nil {
            // This is the legacy event thrown by manage pod loop
            // all other events are now dispatched from syncPodFn
            p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
            return err
        }
        // The actual sync happens here.
        err = p.syncPodFn(syncPodOptions{
            mirrorPod:      update.MirrorPod,
            pod:            update.Pod,
            podStatus:      status,
            killPodOptions: update.KillPodOptions,
            updateType:     update.UpdateType,
        })
        lastSyncTime = time.Now()
        return err
    }()
```
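GetNewerThan deserves a note: as the comment says, it blocks until the status cache holds an entry newer than lastSyncTime, so a new sync never starts from state older than the end of the previous sync. A condition-variable sketch of those semantics (toy types; the real kubecontainer.Cache is more involved and keys by types.UID):

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

type status struct {
    modified time.Time
    phase    string
}

// cache blocks readers until the entry for a pod is newer than a given
// time, mimicking the semantics of podCache.GetNewerThan.
type cache struct {
    mu   sync.Mutex
    cond *sync.Cond
    pods map[string]status
}

func newCache() *cache {
    c := &cache{pods: map[string]status{}}
    c.cond = sync.NewCond(&c.mu)
    return c
}

func (c *cache) Set(uid string, s status) {
    c.mu.Lock()
    c.pods[uid] = s
    c.mu.Unlock()
    c.cond.Broadcast() // wake any GetNewerThan waiters
}

// GetNewerThan blocks until the cached entry is newer than minTime.
func (c *cache) GetNewerThan(uid string, minTime time.Time) status {
    c.mu.Lock()
    defer c.mu.Unlock()
    for {
        s, ok := c.pods[uid]
        if ok && s.modified.After(minTime) {
            return s
        }
        c.cond.Wait()
    }
}

func main() {
    c := newCache()
    lastSync := time.Now()
    go func() {
        time.Sleep(50 * time.Millisecond)
        c.Set("uid-1", status{modified: time.Now(), phase: "Running"})
    }()
    s := c.GetNewerThan("uid-1", lastSync) // blocks until Set happens
    fmt.Println(s.phase)
}
```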
5. The call chain finally lands in SyncPod() in pkg/kubelet/kuberuntime/kuberuntime_manager.go, which does the actual pod creation:
```go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod( /* ... */ ) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        // ... kill the whole pod; the sandbox is recreated below ...
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
                return
            }
        }
    }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error
        klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        // ...
    }

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", containerStartSpec(container)); err != nil {
            return
        }
        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", containerStartSpec(&pod.Spec.Containers[idx]))
    }
}
```
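Steps 5, 6, and 7 all call a start closure defined earlier in SyncPod, which the excerpt above omits. Roughly paraphrased from the same function (condensed, not verbatim), it wraps startContainer with a sync result and a backoff check:

```go
// Paraphrased: the start helper used by steps 5-7.
start := func(typeName string, spec *startSpec) error {
    startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
    result.AddSyncResult(startContainerResult)

    // Skip the container while it is still in a crash-loop backoff window.
    isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
    if isInBackOff {
        startContainerResult.Fail(err, msg)
        return err
    }

    if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
        startContainerResult.Fail(err, msg)
        return err
    }
    return nil
}
```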
6. In addition, the pod worker's sync function still has work to do (a paraphrased outline follows the list):
- newPodWorkers is constructed with klet.syncPod (pkg/kubelet/kubelet.go) as its sync function, so every update ultimately runs through syncPod, which creates the pod data directories and volumes, fetches the image pull secrets, and so on.
- It distinguishes the update type: kubetypes.SyncPodKill vs. kubetypes.SyncPodCreate.
- It records the pod's IPs into the status: podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
- It runs the admission checks: runnable.Admit
- It checks whether the pod is a static pod: kubetypes.IsStaticPod(pod)
- It creates the pod's data directories: kl.makePodDataDirs(pod)
- It waits for the pod's volumes to attach and mount: kl.volumeManager.WaitForAttachAndMount(pod)
- It fetches the image pull secrets: kl.getPullSecretsForPod(pod)
- Finally it hands off to the container runtime: kl.containerRuntime.SyncPod (interface in pkg/kubelet/container/runtime.go)
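Pulling those calls together, the flow inside kubelet's syncPod looks roughly like this (a paraphrased outline based on the calls above, not the verbatim source):

```go
// Paraphrased outline of kubelet's syncPod (pkg/kubelet/kubelet.go),
// reconstructed from the calls listed above.
func (kl *Kubelet) syncPod( /* options */ ) error {
    // 1. If the update is kubetypes.SyncPodKill, kill the pod and return.
    // 2. For kubetypes.SyncPodCreate, copy the pod IPs into the status
    //    (podStatus.IPs = append(podStatus.IPs, ipInfo.IP)).
    // 3. Run admission checks for the pod (runnable.Admit).
    // 4. Handle static pods (kubetypes.IsStaticPod(pod)).
    // 5. Create the pod's data directories (kl.makePodDataDirs).
    // 6. Wait for volumes to attach and mount
    //    (kl.volumeManager.WaitForAttachAndMount).
    // 7. Fetch the image pull secrets (kl.getPullSecretsForPod).
    // 8. Hand off to the runtime (kl.containerRuntime.SyncPod), which is the
    //    kuberuntime SyncPod walked through in section 5.
    return nil
}
```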
This article is reproduced from the WeChat public account 「運維開發(fā)故事」.