自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

源碼分析 Kubernetes 對(duì) Pod IP 的管理

云計(jì)算 云原生
在源碼中,我看到了 InPlacePodVerticalScaling 這個(gè)參數(shù),發(fā)現(xiàn)是 Kubernetes 1.27 的一個(gè) alpha 特性,可以在不重啟 Pod 的情況下,調(diào)整 Pod 的資源配置;在寫(xiě) Operator 更新 CR 狀態(tài)時(shí),在合適的場(chǎng)景下,可以學(xué)習(xí) nodeCIDRUpdateChannel 的實(shí)現(xiàn),將更新的狀態(tài)放入 channel 中,然后通過(guò) goroutine 處理

1. kube-controller-manager 對(duì)網(wǎng)段的管理

在 kube-controller-manager 有眾多控制器,與 Pod IP 相關(guān)的是 NodeIpamController。

NodeIpamController 控制器主要是管理節(jié)點(diǎn)的 podcidr,當(dāng)有新節(jié)點(diǎn)加入集群時(shí),分配一個(gè)子網(wǎng)段給節(jié)點(diǎn);當(dāng)節(jié)點(diǎn)刪除時(shí),回收子網(wǎng)段。

每個(gè)節(jié)點(diǎn)的子網(wǎng)段不會(huì)重疊,每個(gè)節(jié)點(diǎn)都能夠獨(dú)立地完成 Pod IP 的分配。

下面看一個(gè) kube-controller-manager 的運(yùn)行示例:

kubectl -n kube-system get pod kube-controller-manager -o yaml

其中關(guān)于網(wǎng)段配置的部分為:

spec:
  containers:
    - command:
        - kube-controller-manager
        - --allocate-node-cidrs=true
        - --cluster-cidr=10.234.0.0/16
        - --node-cidr-mask-size=24
        - --service-cluster-ip-range=10.96.0.0/16

cluster-cidr 指定了 Pod IP 的范圍,掩碼位數(shù) 16,如果不考慮保留 IP,意味著集群最多可以容納 2^16 = 65536 個(gè) pod。

這些 Pod 分布在若干個(gè)節(jié)點(diǎn)上,接著看 node-cidr-mask-size 為 24,每個(gè)節(jié)點(diǎn)只剩下 32-24=8 位留給 pod,每個(gè)節(jié)點(diǎn)最多能創(chuàng)建 2^8=256 個(gè) pod。

相應(yīng)的,這個(gè)集群能夠容納的節(jié)點(diǎn)數(shù)量為 2^(32-16-8)=256 個(gè)節(jié)點(diǎn)。

在規(guī)劃集群時(shí),需要根據(jù)集群的規(guī)模來(lái)調(diào)整這兩個(gè)參數(shù)。

開(kāi)啟 allocate-node-cidrs、設(shè)置 cluster-cidr 之后,kube-controller-manager 會(huì)給每個(gè)節(jié)點(diǎn)分配子網(wǎng)段,將結(jié)果寫(xiě)入 spec.podCIDR 字段。

spec:
  podCIDR: 10.234.58.0/24
  podCIDRs:
    - 10.234.58.0/24

下面我們從源碼分析一下這一過(guò)程。

1. 啟動(dòng) NodeIpamController

func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
 // 如果 allocate-node-cidrs 沒(méi)有開(kāi)啟會(huì)立即返回
 if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
  return nil, false, nil
 }

 // 獲取 clusterCIDR, serviceCIDR 啟動(dòng) NodeIpamController
 nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
  ctx,
  controllerContext.InformerFactory.Core().V1().Nodes(),
  clusterCIDRInformer,
  controllerContext.Cloud,
  controllerContext.ClientBuilder.ClientOrDie("node-controller"),
  clusterCIDRs,
  serviceCIDR,
  secondaryServiceCIDR,
  nodeCIDRMaskSizes,
  ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
 )
 go nodeIpamController.RunWithMetrics(ctx, controllerContext.ControllerManagerMetrics)
 return nil, true, nil
}

RunWithMetrics 只是提供了一些監(jiān)控指標(biāo),真正的啟動(dòng)邏輯在 Run 方法中。

func (nc *Controller) RunWithMetrics(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
 controllerManagerMetrics.ControllerStarted("nodeipam")
 defer controllerManagerMetrics.ControllerStopped("nodeipam")
 nc.Run(ctx)
}
func (nc *Controller) Run(ctx context.Context) {
 if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
  go nc.legacyIPAM.Run(ctx)
 } else {
  go nc.cidrAllocator.Run(ctx)
 }

 <-ctx.Done()
}

1.2 監(jiān)聽(tīng)節(jié)點(diǎn)變化

在查找 cidrAllocator 接口實(shí)現(xiàn)的時(shí)候,我發(fā)現(xiàn)了三種 CIDR 分配器,分別是 RangeAllocator 適用單網(wǎng)段分配、MultiCIDRRangeAllocator 適用于多 CIDR、CloudCIDRAllocator 適用于對(duì)接云廠。

func New(ctx context.Context, kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, clusterCIDRInformer networkinginformers.ClusterCIDRInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
 switch allocatorType {
 case RangeAllocatorType:
  return NewCIDRRangeAllocator(logger, kubeClient, nodeInformer, allocatorParams, nodeList)
 case MultiCIDRRangeAllocatorType:
  if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRRangeAllocator) {
   return nil, fmt.Errorf("invalid CIDR allocator type: %v, feature gate %v must be enabled", allocatorType, features.MultiCIDRRangeAllocator)
  }
  return NewMultiCIDRRangeAllocator(ctx, kubeClient, nodeInformer, clusterCIDRInformer, allocatorParams, nodeList, nil)

 case CloudAllocatorType:
  return NewCloudCIDRAllocator(logger, kubeClient, cloud, nodeInformer)
 default:
  return nil, fmt.Errorf("invalid CIDR allocator type: %v", allocatorType)
 }
}

這里看看 RangeAllocator 的實(shí)現(xiàn)。

func NewCIDRRangeAllocator(logger klog.Logger, client clientset.Interface, nodeInformer informers.NodeInformer, allocatorParams CIDRAllocatorParams, nodeList *v1.NodeList) (CIDRAllocator, error) {
 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
   return ra.AllocateOrOccupyCIDR(logger, node)
  }),
  UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
   if len(newNode.Spec.PodCIDRs) == 0 {
    return ra.AllocateOrOccupyCIDR(logger, newNode)
   }
   return nil
  }),
  DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
   return ra.ReleaseCIDR(logger, node)
  }),
 })

 return ra, nil
}

其實(shí) RangeAllocator 分配器的實(shí)現(xiàn)與寫(xiě) Operator 時(shí)的控制器類(lèi)似,都是通過(guò) informer 來(lái)監(jiān)聽(tīng)資源的變化,然后調(diào)用相應(yīng)的方法。

1.3 更新節(jié)點(diǎn)的 podCIDR

這里比較特殊的是,控制器并不是直接操作資源,而是將變更放到了一個(gè) channel 中,然后通過(guò) goroutine 處理狀態(tài)更新。

func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
 allocated := nodeReservedCIDRs{
  nodeName:       node.Name,
  allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
 }

 for idx := range r.cidrSets {
  podCIDR, err := r.cidrSets[idx].AllocateNext()
  allocated.allocatedCIDRs[idx] = podCIDR
 }
 // 將更新的內(nèi)容放入 channel 中
 r.nodeCIDRUpdateChannel <- allocated
 return nil
}

nodeCIDRUpdateChannel 的長(zhǎng)度是 5000。

cidrUpdateQueueSize = 5000
 nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),

而更新 Node Spec 的邏輯是通過(guò) 30 個(gè) goroutine 來(lái)處理。

const cidrUpdateWorkers untyped int = 30
 for i := 0; i < cidrUpdateWorkers; i++ {
  go r.worker(ctx)
 }
func (r *rangeAllocator) worker(ctx context.Context) {
 logger := klog.FromContext(ctx)
 for {
  select {
  case workItem, ok := <-r.nodeCIDRUpdateChannel:
   if !ok {
    logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
    return
   }
   if err := r.updateCIDRsAllocation(logger, workItem); err != nil {
    // Requeue the failed node for update again.
    r.nodeCIDRUpdateChannel <- workItem
   }
  case <-ctx.Done():
   return
  }
 }
}

cidrUpdateRetries = 3 這里會(huì)重試 3 次更新,如果一直更新失敗,會(huì)將節(jié)點(diǎn)重新放入 channel 中,等待下次更新。

// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeReservedCIDRs) error {
 // If we reached here, it means that the node has no CIDR currently assigned. So we set it.
 for i := 0; i < cidrUpdateRetries; i++ {
  if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
   logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
   return nil
  }
 }
 // 放回 pool 中
 controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
}

使用 Patch 方法更新節(jié)點(diǎn)對(duì)象的 Spec 字段。

func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) error {
 // set the Pod cidrs list and set the old Pod cidr field
 patch := nodeForCIDRMergePatch{
  Spec: nodeSpecForMergePatch{
   PodCIDR:  cidrs[0],
   PodCIDRs: cidrs,
  },
 }

 patchBytes, err := json.Marshal(&patch)
 if err != nil {
  return fmt.Errorf("failed to json.Marshal CIDR: %v", err)
 }
 if _, err := c.CoreV1().Nodes().Patch(context.TODO(), string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
  return fmt.Errorf("failed to patch node CIDR: %v", err)
 }
 return nil
}

2. kubelet 對(duì)網(wǎng)絡(luò)的配置

圖片圖片

上圖是 Kubelet 創(chuàng)建 Pod 的過(guò)程,這里截取其中對(duì)網(wǎng)絡(luò)配置的部分進(jìn)行分析:

  1. Pod 調(diào)度到某個(gè)節(jié)點(diǎn)上
  2. kubelet 通過(guò) cri 調(diào)用 container runtime 創(chuàng)建 sandbox
  3. container runtime 創(chuàng)建 sandbox
  4. container runtime 調(diào)用 cni 創(chuàng)建 Pod 網(wǎng)絡(luò)
  5. IPAM 對(duì) Pod IP 的管理

下面從源碼實(shí)現(xiàn)的角度來(lái)看看這個(gè)過(guò)程。

2.1 Pod 調(diào)度到某個(gè)節(jié)點(diǎn)上

apiVersion: v1
kind: Pod
metadata:
  labels:
    app: demo
    pod-template-hash: 7b9b5cf76b
  name: demo-7b9b5cf76b-5lpmj
  namespace: default
spec:
  containers:
    - image: hubimage/demo-ubuntu
  nodeName: node1

Kubernetes 中調(diào)度的過(guò)程是 kube-scheduler 根據(jù) Pod 的資源需求和節(jié)點(diǎn)的資源情況,將 Pod 調(diào)度到某個(gè)節(jié)點(diǎn)上,并將調(diào)度結(jié)果寫(xiě)入 pod.spec.nodeName 字段。

這部分不是網(wǎng)絡(luò)的重點(diǎn),之前我也在生產(chǎn)環(huán)境下定制過(guò)調(diào)度器,感興趣的話可以看看Tekton 優(yōu)化之定制集群調(diào)度器 。

2.2 kubelet 調(diào)用 cri 創(chuàng)建 sandbox

SyncPod 是 kubelet 中的核心方法,它會(huì)根據(jù) Pod 的狀態(tài),調(diào)用 cri 創(chuàng)建或刪除 pod。

// SyncPod syncs the running Pod into the desired Pod by executing following steps:
//
// 1.計(jì)算沙箱和容器變化。
// 2. 必要時(shí)關(guān)閉 Pod 沙箱。
// 3. 關(guān)閉任何不應(yīng)運(yùn)行的容器。
// 4.必要時(shí)創(chuàng)建沙箱。
// 5.創(chuàng)建 ephemeral 容器。
// 6. 創(chuàng)建 init 容器。
// 7. 調(diào)整運(yùn)行容器的大?。ㄈ绻?InPlacePodVerticalScaling==true)
// 8. 創(chuàng)建正常容器
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, Pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  // Step 4: Create a sandbox for the Pod if necessary.
  podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
}

調(diào)用 RuntimeService 接口的 RunPodSandbox 方法創(chuàng)建 sandbox。

// createPodSandbox creates a Pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, Pod *v1.Pod, attempt uint32) (string, string, error) {
 podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)

經(jīng)過(guò) runtimeService、instrumentedRuntimeService 接口的封裝,最終會(huì)調(diào)用 remoteRuntimeService 的 RunPodSandbox 方法。

// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
 resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
  Config:         config,
  RuntimeHandler: runtimeHandler,
 })

這里的 runtimeClient 是一個(gè) rpc client,通過(guò) rpc 調(diào)用 container runtime 創(chuàng)建 sandbox。

2.3 container runtime 創(chuàng)建 sandbox

以 containerd 為例,創(chuàng)建 sandbox:

func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) {
 if err := in.checkInitialized(); err != nil {
  return nil, err
 }
 res, err = in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r)
 return res, errdefs.ToGRPC(err)
}

調(diào)用 CNI 創(chuàng)建網(wǎng)絡(luò),創(chuàng)建 sandbox。

// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
 // 生成 sandbox id
 id := util.GenerateID()
 metadata := config.GetMetadata()
 name := makeSandboxName(metadata)

 // 獲取 sandbox 的 oci 運(yùn)行時(shí)
 ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler())
 sandboxInfo.Runtime.Name = ociRuntime.Type
 sandboxInfo.Sandboxer = ociRuntime.Sandboxer

 // 創(chuàng)建 sandbox 對(duì)象
 sandbox := sandboxstore.NewSandbox(
  sandboxstore.Metadata{
   ID:             id,
   Name:           name,
   Config:         config,
   RuntimeHandler: r.GetRuntimeHandler(),
  },
  sandboxstore.Status{
   State: sandboxstore.StateUnknown,
  },
 )

 // 調(diào)用 CNI 插件,創(chuàng)建 sandbox 的網(wǎng)絡(luò)
 if !hostNetwork(config) && !userNsEnabled {
  var netnsMountDir = "/var/run/netns"
  sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
  // Save sandbox metadata to store
  if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
   return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
  }
 }

 // 創(chuàng)建 sandbox
 err = c.nri.RunPodSandbox(ctx, &sandbox)
}

2.4 container runtime 調(diào)用 cni 創(chuàng)建 Pod 網(wǎng)絡(luò)

在上一步驟中,調(diào)用 RunPodSandbox 創(chuàng)建 sandbox 之前,會(huì)先調(diào)用 setupPodNetwork 配置網(wǎng)絡(luò)。這里展開(kāi)看一下 setupPodNetwork 的實(shí)現(xiàn)。

func (c *criService) setupPodNetwork(ctx context.Context, sandbox *sandboxstore.Sandbox) error {
 var (
  id        = sandbox.ID
  config    = sandbox.Config
  path      = sandbox.NetNSPath
  netPlugin = c.getNetworkPlugin(sandbox.RuntimeHandler)
  err       error
  result    *cni.Result
 )
 if c.config.CniConfig.NetworkPluginSetupSerially {
  result, err = netPlugin.SetupSerially(ctx, id, path, opts...)
 } else {
  result, err = netPlugin.Setup(ctx, id, path, opts...)
 }
}

libcni 實(shí)現(xiàn)了 netPlugin 接口

// containerd/go-cni/cni.go
func (c *libcni) Setup(ctx context.Context, id string, path string, opts ...NamespaceOpts) (*Result, error) {
 if err := c.Status(); err != nil {
  return nil, err
 }
 // 建一個(gè)新的網(wǎng)絡(luò)命名空間
 ns, err := newNamespace(id, path, opts...)
 if err != nil {
  return nil, err
 }
 // 調(diào)用 CNI 插件
 result, err := c.attachNetworks(ctx, ns)
 if err != nil {
  return nil, err
 }
 return c.createResult(result)
}

attachNetworks 起了很多協(xié)程,每個(gè)協(xié)程調(diào)用 asynchAttach 方法,asynchAttach 方法調(diào)用 Attach 方法。

func (c *libcni) attachNetworks(ctx context.Context, ns *Namespace) ([]*types100.Result, error) {
 var wg sync.WaitGroup
 var firstError error
 results := make([]*types100.Result, len(c.Networks()))
 rc := make(chan asynchAttachResult)

 for i, network := range c.Networks() {
  wg.Add(1)
  go asynchAttach(ctx, i, network, ns, &wg, rc)
 }

 for range c.Networks() {
  rs := <-rc
  if rs.err != nil && firstError == nil {
   firstError = rs.err
  }
  results[rs.index] = rs.res
 }
 wg.Wait()

 return results, firstError
}

運(yùn)行了很多協(xié)程調(diào)用 CNI,但 rc channel 的長(zhǎng)度為 1,處理結(jié)果時(shí)卻一個(gè)一個(gè)的。

func asynchAttach(ctx context.Context, index int, n *Network, ns *Namespace, wg *sync.WaitGroup, rc chan asynchAttachResult) {
 defer wg.Done()
 r, err := n.Attach(ctx, ns)
 rc <- asynchAttachResult{index: index, res: r, err: err}
}

Attach 方法中才真正開(kāi)始調(diào)用 CNI 插件。

func (n *Network) Attach(ctx context.Context, ns *Namespace) (*types100.Result, error) {
 r, err := n.cni.AddNetworkList(ctx, n.config, ns.config(n.ifName))
 if err != nil {
  return nil, err
 }
 return types100.NewResultFromResult(r)
}

在 https://github.com/containernetworking/cni/blob/main/libcni/api.go 中 CNI 接口定義了很多方法,其中最重要的是 AddNetwork 和 DelNetwork 方法,帶 List 的方法是批量操作。

type CNI interface {
 AddNetworkList(ctx context.Context, net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
 AddNetwork(ctx context.Context, net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
 DelNetworkList(ctx context.Context, net *NetworkConfigList, rt *RuntimeConf) error
 DelNetwork(ctx context.Context, net *NetworkConfig, rt *RuntimeConf) error
}

AddNetwork 用于為容器添加網(wǎng)絡(luò)接口,在主機(jī)上創(chuàng)建 veth 網(wǎng)卡綁定到容器的 ech0 網(wǎng)卡上。DelNetwork 用于在容器刪除時(shí),清理容器相關(guān)的網(wǎng)絡(luò)配置。

CNI 調(diào)用插件的核心是 Exec 接口,直接調(diào)用二進(jìn)制程序。

type Exec interface {
 ExecPlugin(ctx context.Context, pluginPath string, stdinData []byte, environ []string) ([]byte, error)
 FindInPath(plugin string, paths []string) (string, error)
 Decode(jsonBytes []byte) (version.PluginInfo, error)
}

CRI 以標(biāo)準(zhǔn)輸入、環(huán)境變量的形式將網(wǎng)絡(luò)配置信息傳遞給 CNI 插件。CNI 插件處理完成之后,將網(wǎng)絡(luò)配置信息寫(xiě)入到標(biāo)準(zhǔn)輸出中,CRI 將標(biāo)準(zhǔn)輸出中的網(wǎng)絡(luò)配置信息解析出來(lái),寫(xiě)入到容器的網(wǎng)絡(luò)配置文件中。

再回到 container runtime 的實(shí)現(xiàn) containerd:

/usr/bin/containerd config  dump |grep cni

    [plugins."io.containerd.grpc.v1.cri".cni]
      bin_dir = "/opt/cni/bin"
      conf_dir = "/etc/cni/net.d"

這里的 /etc/cni/net.d 是 CNI 網(wǎng)絡(luò)配置文件的默認(rèn)存放路徑,/opt/cni/bin 是 CNI 網(wǎng)絡(luò)插件的默認(rèn)搜索路徑。

ls /opt/cni/bin

bandwidth  calico       cilium-cni  firewall  host-device  install  loopback  portmap  sbr     tuning  vrf
bridge     calico-IPAM  dhcp        flannel   host-local   ipvlan   macvlan   ptp      static  vlan
cat /etc/cni/net.d/05-cilium.conf
{
  "cniVersion": "0.3.1",
  "name": "cilium",
  "type": "cilium-cni",
  "enable-debug": false
}

這些配置用來(lái)初始化 CRI 獲取 CNI 插件的 netPlugin map[string]cni.CNI 結(jié)構(gòu)。

2.5 IPAM 對(duì) Pod IP 的管理

IPAM 是 IP Address Management 的縮寫(xiě),負(fù)責(zé)為容器分配 ip 地址。IPAM 組件通常是一個(gè)獨(dú)立的二進(jìn)制文件,也可以直接由 CNI 插件實(shí)現(xiàn)。在 https://github.com/containernetworking/plugins/tree/main/plugins/ipam 中,目前有三種實(shí)現(xiàn) host-local、dhcp、static。 這里以 host-local 為例:

  • 查看 CNI 的配置文件
cat /etc/cni/net.d/10-cni.conflist

{
  "name": "networks",
  "type": "cni",
  "ipam": {
    "type": "host-local",
    "subnet": "10.234.58.0/24",
    "routes": [{ "dst": "0.0.0.0/0" }]
  }
}

指定了 CNI 插件的類(lèi)型為 host-local,指定了 Pod IP 的網(wǎng)段為 "10.234.58.0/24" 。

  • 查看 CNI 插件的存儲(chǔ)目錄
ls /var/lib/cni/networks

10.234.58.76  10.234.58.87  last_reserved_ip.0  lock
cat 10.234.58.76

b3b668af977bbeca6853122514044865793c056e81cccebf115dacffd25a8bcc

這里有一組以 ip 命名的文件,而文件里面又是一串字符串。那么這些到底是什么呢?

  • 以 ip 命名的文件是如何生成的

申請(qǐng)一個(gè) Pod IP 時(shí),先獲取一個(gè)可用 ip

func cmdAdd(args *skel.CmdArgs) error {
 for idx, rangeset := range ipamConf.Ranges {
  ipConf, err := allocator.Get(args.ContainerID, args.IfName, requestedIP)
 }
}

獲取到可用 ip 之后,先嘗試著存儲(chǔ)到本地目錄文件中

func (a *IPAllocator) Get(id string, ifname string, requestedIP net.IP) (*current.IPConfig, error) {
 for {
  reservedIP, gw = iter.Next()
  reserved, err := a.store.Reserve(id, ifname, reservedIP.IP, a.rangeID)
 }
}

直接寫(xiě)本地文件目錄

func (s *Store) Reserve(id string, ifname string, ip net.IP, rangeID string) (bool, error) {
 fname := GetEscapedPath(s.dataDir, ip.String())

 f, err := os.OpenFile(fname, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0o600)
 if os.IsExist(err) {
  return false, nil
 }
 if _, err := f.WriteString(strings.TrimSpace(id) + LineBreak + ifname); err != nil {
  f.Close()
  os.Remove(f.Name())
  return false, err
 }
}

寫(xiě)入的內(nèi)容為 strings.TrimSpace(id) + LineBreak + ifname,這里的 id 其實(shí)是容器的 id,ifname 是網(wǎng)卡名稱,LineBreak 是換行符。

通過(guò) id 在主機(jī)上可以找到對(duì)應(yīng)的容器:

docker ps |grep b3b668

b3b668af977b   k8s.gcr.io/pause:3.5                      "/pause"                 6 weeks ago    Up 6 weeks              k8s_POD_xxx-5b795fd7dd-82hrh_kube-system_b127b65c-f0ca-48a7-9020-ada60dfa535a_0
  • last_reserved_ip.0 文件的用途
cat last_reserved_ip.0

10.234.58.87

在獲取可用 IP 時(shí),IPAM 會(huì)創(chuàng)建一個(gè)迭代器。

func (a *IPAllocator) Get(id string, ifname string, requestedIP net.IP) (*current.IPConfig, error) {
    iter, err := a.GetIter()
    if err != nil {
        return nil, err
    }
    for {
        reservedIP, gw = iter.Next()
        if reservedIP == nil {
            break
        }
    }

而迭代器需要依靠 last_reserved_ip.0 找到上一次分配的 IP,然后從這個(gè) IP 之后開(kāi)始分配。

func (a *IPAllocator) GetIter() (*RangeIter, error) {
 lastReservedIP, err := a.store.LastReservedIP(a.rangeID)
 if err != nil && !os.IsNotExist(err) {
  log.Printf("Error retrieving last reserved ip: %v", err)
 } else if lastReservedIP != nil {
  startFromLastReservedIP = a.rangeset.Contains(lastReservedIP)
 }

這里的 lastIPFilePrefix = "last_reserved_ip."

func (s *Store) LastReservedIP(rangeID string) (net.IP, error) {
 ipfile := GetEscapedPath(s.dataDir, lastIPFilePrefix+rangeID)
 data, err := os.ReadFile(ipfile)
 if err != nil {
  return nil, err
 }
 return net.ParseIP(string(data)), nil
}

host-local 分配 ip 時(shí)是按照輪詢的方式,遞增分配,如果分配到最后一個(gè) IP,就又從頭開(kāi)始分配。

  • lock 文件
type Store struct {
 *FileLock
 dataDir string
}

每次存儲(chǔ)操作都會(huì)進(jìn)行加鎖,IP 分配不會(huì)并發(fā)進(jìn)行,確保唯一性。

a.store.Lock()
defer a.store.Unlock()

3. 總結(jié)

本篇主要是從 Pod IP 管理的角度,梳理了一下從 kube-controller-manager 到 kubelet 的 Pod IP 管理過(guò)程。主要內(nèi)容如下:

  • kube-controller-manager 通過(guò) NodeIpamController 控制器為每個(gè)節(jié)點(diǎn)分配 Pod IP 網(wǎng)段,在集群規(guī)劃時(shí)需要根據(jù)集群規(guī)模調(diào)整 cluster-cidr、node-cidr-mask-size 參數(shù)
  • kubelet 通過(guò) cri 調(diào)用 container runtime 創(chuàng)建 sandbox
  • container runtime 調(diào)用 cni 創(chuàng)建 Pod 網(wǎng)絡(luò)
  • IPAM 對(duì) Pod IP 的管理

在工作中很多熟悉的路徑,可能僅僅只是知道大概的流程,不知道具體的實(shí)現(xiàn)。通過(guò)源碼分析,可以更加深入地了解相關(guān)的細(xì)節(jié),也能學(xué)習(xí)到新的知識(shí)。

比如,在源碼中,我看到了 InPlacePodVerticalScaling 這個(gè)參數(shù),發(fā)現(xiàn)是 Kubernetes 1.27 的一個(gè) alpha 特性,可以在不重啟 Pod 的情況下,調(diào)整 Pod 的資源配置;在寫(xiě) Operator 更新 CR 狀態(tài)時(shí),在合適的場(chǎng)景下,可以學(xué)習(xí) nodeCIDRUpdateChannel 的實(shí)現(xiàn),將更新的狀態(tài)放入 channel 中,然后通過(guò) goroutine 處理狀態(tài)更新。

責(zé)任編輯:武曉燕 來(lái)源: 陳少文
相關(guān)推薦

2020-11-30 12:15:26

KubernetesPodLinux

2022-04-09 15:26:46

Kubernetes刪除操作源碼解析

2024-04-15 05:00:00

kubernete網(wǎng)絡(luò)容器

2023-02-09 16:47:34

KubernetesPod優(yōu)先級(jí)

2010-02-06 13:28:31

Android源碼

2009-07-27 09:14:08

網(wǎng)絡(luò)管理IP電話管理

2021-02-22 08:29:03

KubernetesKubectl Fla應(yīng)用

2023-10-19 19:42:25

IstioPodkubernetes

2023-03-21 15:26:02

Kubernetes容器開(kāi)發(fā)

2024-05-23 08:40:46

Kubernetes預(yù)過(guò)濾調(diào)度

2022-01-06 07:06:52

KubernetesResourceAPI

2021-01-28 10:55:47

Kubernetes IPLinux

2021-09-22 08:37:02

pod源碼分析kubernetes

2024-06-19 09:33:05

2019-11-20 09:15:53

KubernetesPod

2021-11-22 08:00:00

Kubernetes容器集群

2021-08-10 07:00:00

Nacos Clien服務(wù)分析

2020-04-26 11:16:46

KubernetesPodLinux

2020-07-06 07:52:10

Kubernetes網(wǎng)絡(luò)通信

2020-04-10 08:00:08

Kubernetes補(bǔ)丁pod
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)