源碼分析 Kubernetes 對(duì) Pod IP 的管理
1. kube-controller-manager 對(duì)網(wǎng)段的管理
在 kube-controller-manager 有眾多控制器,與 Pod IP 相關(guān)的是 NodeIpamController。
NodeIpamController 控制器主要是管理節(jié)點(diǎn)的 podcidr,當(dāng)有新節(jié)點(diǎn)加入集群時(shí),分配一個(gè)子網(wǎng)段給節(jié)點(diǎn);當(dāng)節(jié)點(diǎn)刪除時(shí),回收子網(wǎng)段。
每個(gè)節(jié)點(diǎn)的子網(wǎng)段不會(huì)重疊,每個(gè)節(jié)點(diǎn)都能夠獨(dú)立地完成 Pod IP 的分配。
下面看一個(gè) kube-controller-manager 的運(yùn)行示例:
kubectl -n kube-system get pod kube-controller-manager -o yaml
其中關(guān)于網(wǎng)段配置的部分為:
spec:
containers:
- command:
- kube-controller-manager
- --allocate-node-cidrs=true
- --cluster-cidr=10.234.0.0/16
- --node-cidr-mask-size=24
- --service-cluster-ip-range=10.96.0.0/16
cluster-cidr 指定了 Pod IP 的范圍,掩碼位數(shù) 16,如果不考慮保留 IP,意味著集群最多可以容納 2^16 = 65536 個(gè) pod。
這些 Pod 分布在若干個(gè)節(jié)點(diǎn)上,接著看 node-cidr-mask-size 為 24,每個(gè)節(jié)點(diǎn)只剩下 32-24=8 位留給 pod,每個(gè)節(jié)點(diǎn)最多能創(chuàng)建 2^8=256 個(gè) pod。
相應(yīng)的,這個(gè)集群能夠容納的節(jié)點(diǎn)數(shù)量為 2^(32-16-8)=256 個(gè)節(jié)點(diǎn)。
在規(guī)劃集群時(shí),需要根據(jù)集群的規(guī)模來(lái)調(diào)整這兩個(gè)參數(shù)。
開(kāi)啟 allocate-node-cidrs、設(shè)置 cluster-cidr 之后,kube-controller-manager 會(huì)給每個(gè)節(jié)點(diǎn)分配子網(wǎng)段,將結(jié)果寫(xiě)入 spec.podCIDR 字段。
spec:
podCIDR: 10.234.58.0/24
podCIDRs:
- 10.234.58.0/24
下面我們從源碼分析一下這一過(guò)程。
1. 啟動(dòng) NodeIpamController
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
// 如果 allocate-node-cidrs 沒(méi)有開(kāi)啟會(huì)立即返回
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
return nil, false, nil
}
// 獲取 clusterCIDR, serviceCIDR 啟動(dòng) NodeIpamController
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
ctx,
controllerContext.InformerFactory.Core().V1().Nodes(),
clusterCIDRInformer,
controllerContext.Cloud,
controllerContext.ClientBuilder.ClientOrDie("node-controller"),
clusterCIDRs,
serviceCIDR,
secondaryServiceCIDR,
nodeCIDRMaskSizes,
ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
)
go nodeIpamController.RunWithMetrics(ctx, controllerContext.ControllerManagerMetrics)
return nil, true, nil
}
RunWithMetrics 只是提供了一些監(jiān)控指標(biāo),真正的啟動(dòng)邏輯在 Run 方法中。
func (nc *Controller) RunWithMetrics(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
controllerManagerMetrics.ControllerStarted("nodeipam")
defer controllerManagerMetrics.ControllerStopped("nodeipam")
nc.Run(ctx)
}
func (nc *Controller) Run(ctx context.Context) {
if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
go nc.legacyIPAM.Run(ctx)
} else {
go nc.cidrAllocator.Run(ctx)
}
<-ctx.Done()
}
1.2 監(jiān)聽(tīng)節(jié)點(diǎn)變化
在查找 cidrAllocator 接口實(shí)現(xiàn)的時(shí)候,我發(fā)現(xiàn)了三種 CIDR 分配器,分別是 RangeAllocator 適用單網(wǎng)段分配、MultiCIDRRangeAllocator 適用于多 CIDR、CloudCIDRAllocator 適用于對(duì)接云廠。
func New(ctx context.Context, kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, clusterCIDRInformer networkinginformers.ClusterCIDRInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
switch allocatorType {
case RangeAllocatorType:
return NewCIDRRangeAllocator(logger, kubeClient, nodeInformer, allocatorParams, nodeList)
case MultiCIDRRangeAllocatorType:
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRRangeAllocator) {
return nil, fmt.Errorf("invalid CIDR allocator type: %v, feature gate %v must be enabled", allocatorType, features.MultiCIDRRangeAllocator)
}
return NewMultiCIDRRangeAllocator(ctx, kubeClient, nodeInformer, clusterCIDRInformer, allocatorParams, nodeList, nil)
case CloudAllocatorType:
return NewCloudCIDRAllocator(logger, kubeClient, cloud, nodeInformer)
default:
return nil, fmt.Errorf("invalid CIDR allocator type: %v", allocatorType)
}
}
這里看看 RangeAllocator 的實(shí)現(xiàn)。
func NewCIDRRangeAllocator(logger klog.Logger, client clientset.Interface, nodeInformer informers.NodeInformer, allocatorParams CIDRAllocatorParams, nodeList *v1.NodeList) (CIDRAllocator, error) {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
return ra.AllocateOrOccupyCIDR(logger, node)
}),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if len(newNode.Spec.PodCIDRs) == 0 {
return ra.AllocateOrOccupyCIDR(logger, newNode)
}
return nil
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
return ra.ReleaseCIDR(logger, node)
}),
})
return ra, nil
}
其實(shí) RangeAllocator 分配器的實(shí)現(xiàn)與寫(xiě) Operator 時(shí)的控制器類(lèi)似,都是通過(guò) informer 來(lái)監(jiān)聽(tīng)資源的變化,然后調(diào)用相應(yīng)的方法。
1.3 更新節(jié)點(diǎn)的 podCIDR
這里比較特殊的是,控制器并不是直接操作資源,而是將變更放到了一個(gè) channel 中,然后通過(guò) goroutine 處理狀態(tài)更新。
func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
allocated := nodeReservedCIDRs{
nodeName: node.Name,
allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
}
for idx := range r.cidrSets {
podCIDR, err := r.cidrSets[idx].AllocateNext()
allocated.allocatedCIDRs[idx] = podCIDR
}
// 將更新的內(nèi)容放入 channel 中
r.nodeCIDRUpdateChannel <- allocated
return nil
}
nodeCIDRUpdateChannel 的長(zhǎng)度是 5000。
cidrUpdateQueueSize = 5000
nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),
而更新 Node Spec 的邏輯是通過(guò) 30 個(gè) goroutine 來(lái)處理。
const cidrUpdateWorkers untyped int = 30
for i := 0; i < cidrUpdateWorkers; i++ {
go r.worker(ctx)
}
func (r *rangeAllocator) worker(ctx context.Context) {
logger := klog.FromContext(ctx)
for {
select {
case workItem, ok := <-r.nodeCIDRUpdateChannel:
if !ok {
logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
return
}
if err := r.updateCIDRsAllocation(logger, workItem); err != nil {
// Requeue the failed node for update again.
r.nodeCIDRUpdateChannel <- workItem
}
case <-ctx.Done():
return
}
}
}
cidrUpdateRetries = 3 這里會(huì)重試 3 次更新,如果一直更新失敗,會(huì)將節(jié)點(diǎn)重新放入 channel 中,等待下次更新。
// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeReservedCIDRs) error {
// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
for i := 0; i < cidrUpdateRetries; i++ {
if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
return nil
}
}
// 放回 pool 中
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
}
使用 Patch 方法更新節(jié)點(diǎn)對(duì)象的 Spec 字段。
func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) error {
// set the Pod cidrs list and set the old Pod cidr field
patch := nodeForCIDRMergePatch{
Spec: nodeSpecForMergePatch{
PodCIDR: cidrs[0],
PodCIDRs: cidrs,
},
}
patchBytes, err := json.Marshal(&patch)
if err != nil {
return fmt.Errorf("failed to json.Marshal CIDR: %v", err)
}
if _, err := c.CoreV1().Nodes().Patch(context.TODO(), string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("failed to patch node CIDR: %v", err)
}
return nil
}
2. kubelet 對(duì)網(wǎng)絡(luò)的配置
圖片
上圖是 Kubelet 創(chuàng)建 Pod 的過(guò)程,這里截取其中對(duì)網(wǎng)絡(luò)配置的部分進(jìn)行分析:
- Pod 調(diào)度到某個(gè)節(jié)點(diǎn)上
- kubelet 通過(guò) cri 調(diào)用 container runtime 創(chuàng)建 sandbox
- container runtime 創(chuàng)建 sandbox
- container runtime 調(diào)用 cni 創(chuàng)建 Pod 網(wǎng)絡(luò)
- IPAM 對(duì) Pod IP 的管理
下面從源碼實(shí)現(xiàn)的角度來(lái)看看這個(gè)過(guò)程。
2.1 Pod 調(diào)度到某個(gè)節(jié)點(diǎn)上
apiVersion: v1
kind: Pod
metadata:
labels:
app: demo
pod-template-hash: 7b9b5cf76b
name: demo-7b9b5cf76b-5lpmj
namespace: default
spec:
containers:
- image: hubimage/demo-ubuntu
nodeName: node1
Kubernetes 中調(diào)度的過(guò)程是 kube-scheduler 根據(jù) Pod 的資源需求和節(jié)點(diǎn)的資源情況,將 Pod 調(diào)度到某個(gè)節(jié)點(diǎn)上,并將調(diào)度結(jié)果寫(xiě)入 pod.spec.nodeName 字段。
這部分不是網(wǎng)絡(luò)的重點(diǎn),之前我也在生產(chǎn)環(huán)境下定制過(guò)調(diào)度器,感興趣的話可以看看Tekton 優(yōu)化之定制集群調(diào)度器 。
2.2 kubelet 調(diào)用 cri 創(chuàng)建 sandbox
SyncPod 是 kubelet 中的核心方法,它會(huì)根據(jù) Pod 的狀態(tài),調(diào)用 cri 創(chuàng)建或刪除 pod。
// SyncPod syncs the running Pod into the desired Pod by executing following steps:
//
// 1.計(jì)算沙箱和容器變化。
// 2. 必要時(shí)關(guān)閉 Pod 沙箱。
// 3. 關(guān)閉任何不應(yīng)運(yùn)行的容器。
// 4.必要時(shí)創(chuàng)建沙箱。
// 5.創(chuàng)建 ephemeral 容器。
// 6. 創(chuàng)建 init 容器。
// 7. 調(diào)整運(yùn)行容器的大?。ㄈ绻?InPlacePodVerticalScaling==true)
// 8. 創(chuàng)建正常容器
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, Pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 4: Create a sandbox for the Pod if necessary.
podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
}
調(diào)用 RuntimeService 接口的 RunPodSandbox 方法創(chuàng)建 sandbox。
// createPodSandbox creates a Pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, Pod *v1.Pod, attempt uint32) (string, string, error) {
podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
經(jīng)過(guò) runtimeService、instrumentedRuntimeService 接口的封裝,最終會(huì)調(diào)用 remoteRuntimeService 的 RunPodSandbox 方法。
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
RuntimeHandler: runtimeHandler,
})
這里的 runtimeClient 是一個(gè) rpc client,通過(guò) rpc 調(diào)用 container runtime 創(chuàng)建 sandbox。
2.3 container runtime 創(chuàng)建 sandbox
以 containerd 為例,創(chuàng)建 sandbox:
func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
res, err = in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
}
調(diào)用 CNI 創(chuàng)建網(wǎng)絡(luò),創(chuàng)建 sandbox。
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
// 生成 sandbox id
id := util.GenerateID()
metadata := config.GetMetadata()
name := makeSandboxName(metadata)
// 獲取 sandbox 的 oci 運(yùn)行時(shí)
ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler())
sandboxInfo.Runtime.Name = ociRuntime.Type
sandboxInfo.Sandboxer = ociRuntime.Sandboxer
// 創(chuàng)建 sandbox 對(duì)象
sandbox := sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: id,
Name: name,
Config: config,
RuntimeHandler: r.GetRuntimeHandler(),
},
sandboxstore.Status{
State: sandboxstore.StateUnknown,
},
)
// 調(diào)用 CNI 插件,創(chuàng)建 sandbox 的網(wǎng)絡(luò)
if !hostNetwork(config) && !userNsEnabled {
var netnsMountDir = "/var/run/netns"
sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
// Save sandbox metadata to store
if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
}
}
// 創(chuàng)建 sandbox
err = c.nri.RunPodSandbox(ctx, &sandbox)
}
2.4 container runtime 調(diào)用 cni 創(chuàng)建 Pod 網(wǎng)絡(luò)
在上一步驟中,調(diào)用 RunPodSandbox 創(chuàng)建 sandbox 之前,會(huì)先調(diào)用 setupPodNetwork 配置網(wǎng)絡(luò)。這里展開(kāi)看一下 setupPodNetwork 的實(shí)現(xiàn)。
func (c *criService) setupPodNetwork(ctx context.Context, sandbox *sandboxstore.Sandbox) error {
var (
id = sandbox.ID
config = sandbox.Config
path = sandbox.NetNSPath
netPlugin = c.getNetworkPlugin(sandbox.RuntimeHandler)
err error
result *cni.Result
)
if c.config.CniConfig.NetworkPluginSetupSerially {
result, err = netPlugin.SetupSerially(ctx, id, path, opts...)
} else {
result, err = netPlugin.Setup(ctx, id, path, opts...)
}
}
libcni 實(shí)現(xiàn)了 netPlugin 接口
// containerd/go-cni/cni.go
func (c *libcni) Setup(ctx context.Context, id string, path string, opts ...NamespaceOpts) (*Result, error) {
if err := c.Status(); err != nil {
return nil, err
}
// 建一個(gè)新的網(wǎng)絡(luò)命名空間
ns, err := newNamespace(id, path, opts...)
if err != nil {
return nil, err
}
// 調(diào)用 CNI 插件
result, err := c.attachNetworks(ctx, ns)
if err != nil {
return nil, err
}
return c.createResult(result)
}
attachNetworks 起了很多協(xié)程,每個(gè)協(xié)程調(diào)用 asynchAttach 方法,asynchAttach 方法調(diào)用 Attach 方法。
func (c *libcni) attachNetworks(ctx context.Context, ns *Namespace) ([]*types100.Result, error) {
var wg sync.WaitGroup
var firstError error
results := make([]*types100.Result, len(c.Networks()))
rc := make(chan asynchAttachResult)
for i, network := range c.Networks() {
wg.Add(1)
go asynchAttach(ctx, i, network, ns, &wg, rc)
}
for range c.Networks() {
rs := <-rc
if rs.err != nil && firstError == nil {
firstError = rs.err
}
results[rs.index] = rs.res
}
wg.Wait()
return results, firstError
}
運(yùn)行了很多協(xié)程調(diào)用 CNI,但 rc channel 的長(zhǎng)度為 1,處理結(jié)果時(shí)卻一個(gè)一個(gè)的。
func asynchAttach(ctx context.Context, index int, n *Network, ns *Namespace, wg *sync.WaitGroup, rc chan asynchAttachResult) {
defer wg.Done()
r, err := n.Attach(ctx, ns)
rc <- asynchAttachResult{index: index, res: r, err: err}
}
Attach 方法中才真正開(kāi)始調(diào)用 CNI 插件。
func (n *Network) Attach(ctx context.Context, ns *Namespace) (*types100.Result, error) {
r, err := n.cni.AddNetworkList(ctx, n.config, ns.config(n.ifName))
if err != nil {
return nil, err
}
return types100.NewResultFromResult(r)
}
在 https://github.com/containernetworking/cni/blob/main/libcni/api.go 中 CNI 接口定義了很多方法,其中最重要的是 AddNetwork 和 DelNetwork 方法,帶 List 的方法是批量操作。
type CNI interface {
AddNetworkList(ctx context.Context, net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
AddNetwork(ctx context.Context, net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
DelNetworkList(ctx context.Context, net *NetworkConfigList, rt *RuntimeConf) error
DelNetwork(ctx context.Context, net *NetworkConfig, rt *RuntimeConf) error
}
AddNetwork 用于為容器添加網(wǎng)絡(luò)接口,在主機(jī)上創(chuàng)建 veth 網(wǎng)卡綁定到容器的 ech0 網(wǎng)卡上。DelNetwork 用于在容器刪除時(shí),清理容器相關(guān)的網(wǎng)絡(luò)配置。
CNI 調(diào)用插件的核心是 Exec 接口,直接調(diào)用二進(jìn)制程序。
type Exec interface {
ExecPlugin(ctx context.Context, pluginPath string, stdinData []byte, environ []string) ([]byte, error)
FindInPath(plugin string, paths []string) (string, error)
Decode(jsonBytes []byte) (version.PluginInfo, error)
}
CRI 以標(biāo)準(zhǔn)輸入、環(huán)境變量的形式將網(wǎng)絡(luò)配置信息傳遞給 CNI 插件。CNI 插件處理完成之后,將網(wǎng)絡(luò)配置信息寫(xiě)入到標(biāo)準(zhǔn)輸出中,CRI 將標(biāo)準(zhǔn)輸出中的網(wǎng)絡(luò)配置信息解析出來(lái),寫(xiě)入到容器的網(wǎng)絡(luò)配置文件中。
再回到 container runtime 的實(shí)現(xiàn) containerd:
/usr/bin/containerd config dump |grep cni
[plugins."io.containerd.grpc.v1.cri".cni]
bin_dir = "/opt/cni/bin"
conf_dir = "/etc/cni/net.d"
這里的 /etc/cni/net.d 是 CNI 網(wǎng)絡(luò)配置文件的默認(rèn)存放路徑,/opt/cni/bin 是 CNI 網(wǎng)絡(luò)插件的默認(rèn)搜索路徑。
ls /opt/cni/bin
bandwidth calico cilium-cni firewall host-device install loopback portmap sbr tuning vrf
bridge calico-IPAM dhcp flannel host-local ipvlan macvlan ptp static vlan
cat /etc/cni/net.d/05-cilium.conf
{
"cniVersion": "0.3.1",
"name": "cilium",
"type": "cilium-cni",
"enable-debug": false
}
這些配置用來(lái)初始化 CRI 獲取 CNI 插件的 netPlugin map[string]cni.CNI 結(jié)構(gòu)。
2.5 IPAM 對(duì) Pod IP 的管理
IPAM 是 IP Address Management 的縮寫(xiě),負(fù)責(zé)為容器分配 ip 地址。IPAM 組件通常是一個(gè)獨(dú)立的二進(jìn)制文件,也可以直接由 CNI 插件實(shí)現(xiàn)。在 https://github.com/containernetworking/plugins/tree/main/plugins/ipam 中,目前有三種實(shí)現(xiàn) host-local、dhcp、static。 這里以 host-local 為例:
- 查看 CNI 的配置文件
cat /etc/cni/net.d/10-cni.conflist
{
"name": "networks",
"type": "cni",
"ipam": {
"type": "host-local",
"subnet": "10.234.58.0/24",
"routes": [{ "dst": "0.0.0.0/0" }]
}
}
指定了 CNI 插件的類(lèi)型為 host-local,指定了 Pod IP 的網(wǎng)段為 "10.234.58.0/24" 。
- 查看 CNI 插件的存儲(chǔ)目錄
ls /var/lib/cni/networks
10.234.58.76 10.234.58.87 last_reserved_ip.0 lock
cat 10.234.58.76
b3b668af977bbeca6853122514044865793c056e81cccebf115dacffd25a8bcc
這里有一組以 ip 命名的文件,而文件里面又是一串字符串。那么這些到底是什么呢?
- 以 ip 命名的文件是如何生成的
申請(qǐng)一個(gè) Pod IP 時(shí),先獲取一個(gè)可用 ip
func cmdAdd(args *skel.CmdArgs) error {
for idx, rangeset := range ipamConf.Ranges {
ipConf, err := allocator.Get(args.ContainerID, args.IfName, requestedIP)
}
}
獲取到可用 ip 之后,先嘗試著存儲(chǔ)到本地目錄文件中
func (a *IPAllocator) Get(id string, ifname string, requestedIP net.IP) (*current.IPConfig, error) {
for {
reservedIP, gw = iter.Next()
reserved, err := a.store.Reserve(id, ifname, reservedIP.IP, a.rangeID)
}
}
直接寫(xiě)本地文件目錄
func (s *Store) Reserve(id string, ifname string, ip net.IP, rangeID string) (bool, error) {
fname := GetEscapedPath(s.dataDir, ip.String())
f, err := os.OpenFile(fname, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0o600)
if os.IsExist(err) {
return false, nil
}
if _, err := f.WriteString(strings.TrimSpace(id) + LineBreak + ifname); err != nil {
f.Close()
os.Remove(f.Name())
return false, err
}
}
寫(xiě)入的內(nèi)容為 strings.TrimSpace(id) + LineBreak + ifname,這里的 id 其實(shí)是容器的 id,ifname 是網(wǎng)卡名稱,LineBreak 是換行符。
通過(guò) id 在主機(jī)上可以找到對(duì)應(yīng)的容器:
docker ps |grep b3b668
b3b668af977b k8s.gcr.io/pause:3.5 "/pause" 6 weeks ago Up 6 weeks k8s_POD_xxx-5b795fd7dd-82hrh_kube-system_b127b65c-f0ca-48a7-9020-ada60dfa535a_0
- last_reserved_ip.0 文件的用途
cat last_reserved_ip.0
10.234.58.87
在獲取可用 IP 時(shí),IPAM 會(huì)創(chuàng)建一個(gè)迭代器。
func (a *IPAllocator) Get(id string, ifname string, requestedIP net.IP) (*current.IPConfig, error) {
iter, err := a.GetIter()
if err != nil {
return nil, err
}
for {
reservedIP, gw = iter.Next()
if reservedIP == nil {
break
}
}
而迭代器需要依靠 last_reserved_ip.0 找到上一次分配的 IP,然后從這個(gè) IP 之后開(kāi)始分配。
func (a *IPAllocator) GetIter() (*RangeIter, error) {
lastReservedIP, err := a.store.LastReservedIP(a.rangeID)
if err != nil && !os.IsNotExist(err) {
log.Printf("Error retrieving last reserved ip: %v", err)
} else if lastReservedIP != nil {
startFromLastReservedIP = a.rangeset.Contains(lastReservedIP)
}
這里的 lastIPFilePrefix = "last_reserved_ip."
func (s *Store) LastReservedIP(rangeID string) (net.IP, error) {
ipfile := GetEscapedPath(s.dataDir, lastIPFilePrefix+rangeID)
data, err := os.ReadFile(ipfile)
if err != nil {
return nil, err
}
return net.ParseIP(string(data)), nil
}
host-local 分配 ip 時(shí)是按照輪詢的方式,遞增分配,如果分配到最后一個(gè) IP,就又從頭開(kāi)始分配。
- lock 文件
type Store struct {
*FileLock
dataDir string
}
每次存儲(chǔ)操作都會(huì)進(jìn)行加鎖,IP 分配不會(huì)并發(fā)進(jìn)行,確保唯一性。
a.store.Lock()
defer a.store.Unlock()
3. 總結(jié)
本篇主要是從 Pod IP 管理的角度,梳理了一下從 kube-controller-manager 到 kubelet 的 Pod IP 管理過(guò)程。主要內(nèi)容如下:
- kube-controller-manager 通過(guò) NodeIpamController 控制器為每個(gè)節(jié)點(diǎn)分配 Pod IP 網(wǎng)段,在集群規(guī)劃時(shí)需要根據(jù)集群規(guī)模調(diào)整 cluster-cidr、node-cidr-mask-size 參數(shù)
- kubelet 通過(guò) cri 調(diào)用 container runtime 創(chuàng)建 sandbox
- container runtime 調(diào)用 cni 創(chuàng)建 Pod 網(wǎng)絡(luò)
- IPAM 對(duì) Pod IP 的管理
在工作中很多熟悉的路徑,可能僅僅只是知道大概的流程,不知道具體的實(shí)現(xiàn)。通過(guò)源碼分析,可以更加深入地了解相關(guān)的細(xì)節(jié),也能學(xué)習(xí)到新的知識(shí)。
比如,在源碼中,我看到了 InPlacePodVerticalScaling 這個(gè)參數(shù),發(fā)現(xiàn)是 Kubernetes 1.27 的一個(gè) alpha 特性,可以在不重啟 Pod 的情況下,調(diào)整 Pod 的資源配置;在寫(xiě) Operator 更新 CR 狀態(tài)時(shí),在合適的場(chǎng)景下,可以學(xué)習(xí) nodeCIDRUpdateChannel 的實(shí)現(xiàn),將更新的狀態(tài)放入 channel 中,然后通過(guò) goroutine 處理狀態(tài)更新。