自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kubernetes CSI 實現(xiàn)舉例,你學(xué)會了嗎?

云計算 云原生
通過 s3fs,用戶可以像訪問本地文件系統(tǒng)一樣訪問 Amazon S3 存儲桶中的對象。這使得開發(fā)人員可以直接在應(yīng)用程序中使用 S3 存儲桶,而無需使用 Amazon S3 的 API 進(jìn)行手動操作。

前言

接下去我們簡單來看下一個 CSI Plugin 開源項目的實現(xiàn),github 地址是:https://github.com/alibaba/open-object。

這個 CSI Plugin 實現(xiàn)了將 Minio 作為后端存儲。以這個項目為例子,主要的考慮是:(1)它的實現(xiàn)不復(fù)雜,整體上的代碼量就 2k 行左右,便于閱讀和了解;(2)項目實現(xiàn)了將 Minio 作為后端存儲,在這個項目基礎(chǔ)之上,我們可以進(jìn)一步擴(kuò)展將阿里云 OSS 作為后端存儲的實現(xiàn),便于進(jìn)一步的實操。(3)Minio 是對象存儲,了解它的實現(xiàn)可以解決我心中的一個疑惑,就是我們?nèi)绾螌σ粋€文件的讀寫訪問,轉(zhuǎn)換為對 bucket 的讀寫請求。通過后續(xù)的了解,我們可以知道這主要是依賴了像 s3f3 這樣的程序,s3f3 通過 FUSE 將一個 bucket 表現(xiàn)為一個文件。

從部署開始

CSIDriver 資源對象

CSIDriver 包含了部署的 CSI Plugin 有關(guān)的信息。Kubernetes AD Controller 通過該對象來決定是否需要 attach。Kubelet 通過該對象決定 mount 時是否需要傳遞 Pod 信息。CSIDriver 資源對象未劃分命名空間。

可以了解到以下幾個點:

  • OpenObject 這個 CSI 插件不需要 attach 步驟,并且在 mount 的時候需要傳遞 Pod 信息。
apiVersion: storage.k8s.io/v1beta1
kind: CSIDriver
metadata:
  name: object.csi.aliyun.com
spec:
  attachRequired: false
  podInfoOnMount: true
  volumeLifecycleModes:
  - Persistent

CSI Node Plugin

可以看到 Daemonset 中包含了 3 個容器:

  • init 容器,啟動一個 connector,這個 connector 運行在宿主機(jī)上,監(jiān)聽 /etc/open-object/connector.sock。通過這個 socket 文件,connector 會收到相應(yīng)的執(zhí)行命令,并執(zhí)行這些命令??傮w上的作用相當(dāng)于宿主機(jī)和容器之間的連接器。
  • driver-registrar 容器,調(diào)用 csi-plugin 的 NodeGetInfo 的接口,將 CSI Node Plugin 的信息通過 kubelet 的插件注冊機(jī)制在對應(yīng)節(jié)點的 kubelet 上進(jìn)行注冊,同時把 CSI Node Plugin 的通信地址同步給 kubelet。
  • CSI-Plugin 容器,這個容器里面運行的就是 CSI Node Plugin,也是我們實現(xiàn)的內(nèi)容。
kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: open-object
  namespace: {{ .Values.namespace }}
spec:
  selector:
    matchLabels:
      app: open-object
  template:
    metadata:
      labels:
        app: open-object
    spec:
      tolerations:
      - operator: Exists
      serviceAccount: open-object
      hostNetwork: true
      hostPID: true
      dnsPolicy: ClusterFirstWithHostNet
      initContainers:
      - name: run-connector
        image: {{ .Values.images.object.image }}:{{ .Values.images.object.tag }}
        securityContext:
          privileged: true
          capabilities:
            add: ["SYS_ADMIN"]
          allowPrivilegeEscalation: true
        command:
        - /run-connector.sh
        volumeMounts:
        - name: host-systemd-config
          mountPath: /host/usr/lib/systemd/system
        - name: host-etc
          mountPath: /host/etc/open-object
      containers:
      - name: driver-registrar
        image: {{ .Values.images.registrar.image }}:{{ .Values.images.registrar.tag }}
        args:
        - "--kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)"
        - "--v=4"
        - "--csi-address=$(ADDRESS)"
        env:
        - name: ADDRESS
          value: /csi/csi.sock
        - name: DRIVER_REG_SOCK_PATH
          value: /var/lib/kubelet/plugins/object.csi.aliyun.com/csi.sock
        - name: KUBE_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        ...
        volumeMounts:
        - name: plugin-dir
          mountPath: /csi
        - name: registration-dir
          mountPath: /registration/
      - name: csi-plugin
        securityContext:
          privileged: true
          capabilities:
            add: ["SYS_ADMIN"]
          allowPrivilegeEscalation: true
        image: {{ .Values.images.object.image }}:{{ .Values.images.object.tag }}
        args:
        - "csi"
        - "--endpoint=$(CSI_ENDPOINT)"
        - "--nodeID=$(NODE_ID)"
        - "--driver=object.csi.aliyun.com"
        env:
        - name: CSI_ENDPOINT
          value: unix:///csi/csi.sock
        - name: NODE_ID
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: TZ
          value: Asia/Shanghai
        ...
        volumeMounts:
        - name: plugin-dir
          mountPath: /csi
        - name: pods-mount-dir
          mountPath: /var/lib/kubelet/pods
          mountPropagation: "Bidirectional"
        - name: fuse-device
          mountPath: /dev/fuse
        - name: host-etc
          mountPath: /host/etc/open-object
        - name: host-etc-os
          mountPath: /host/etc/os-release
      volumes:
      - name: registration-dir
        hostPath:
          path: /var/lib/kubelet/plugins_registry/
          type: DirectoryOrCreate
      - name: plugin-dir
        hostPath:
          path: /var/lib/kubelet/plugins/object.csi.aliyun.com
          type: DirectoryOrCreate
      - name: pods-mount-dir
        hostPath:
          path: /var/lib/kubelet/pods
          type: Directory
      - name: fuse-device
        hostPath:
          path: /dev/fuse
      - name: host-etc
        hostPath:
          path: /etc/open-object
          type: DirectoryOrCreate
      - name: host-etc-os
        hostPath:
          path: /etc/os-release
          type: File
      - name: host-systemd-config
        hostPath:
          path: /usr/lib/systemd/system
          type: DirectoryOrCreate

CSI Controller Plugin

下面這個 Deployment 部署的是 external-provisioner 組件,指定了通過哪個 socket 文件跟 CSI Controller Plugin 進(jìn)行通信。但是,我們在這個 Deployment 中并沒有看到 CSI Controller Plugin 組件相關(guān)的容器。這是因為對于 OpenObject 這個項目來說,它將 CSI Node Plugin 和 CSI Controller Plugin 的代碼都實現(xiàn)在了上述提到的 csi-plugin 容器里。由于,csi-plugin 是通過 Daemonset 方式部署的,每個節(jié)點上都有,所以 external-provisioner 組件可以通過指定的 socket 文件跟 csi-plugin 容器通信。

kind: Deployment
apiVersion: apps/v1
metadata:
  name: open-object-csi-provisioner
  namespace: {{ .Values.namespace }}
  labels:
    app: open-object
    component: open-object-csi-provisioner
spec:
  selector:
    matchLabels:
      app: open-object
      component: open-object-csi-provisioner
  replicas: 1
  template:
    metadata:
      labels:
        app: open-object
        component: open-object-csi-provisioner
    spec:
      tolerations:
      - operator: Exists
        effect: NoSchedule
        key: node-role.kubernetes.io/master
      priorityClassName: system-cluster-critical
      serviceAccount: open-object
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
        - name: csi-provisioner
          image: {{ .Values.images.provisioner.image }}:{{ .Values.images.provisioner.tag }}
          args:
            - --csi-address=$(ADDRESS)
            - --volume-name-prefix=fuse
            - --extra-create-metadata=true
            - --timeout=10m
          env:
            - name: ADDRESS
              value: /var/lib/kubelet/plugins/object.csi.aliyun.com/csi.sock
            - name: TZ
              value: Asia/Shanghai
          ...
          volumeMounts:
            - name: socket-dir
              mountPath: /var/lib/kubelet/plugins/object.csi.aliyun.com
      volumes:
        - name: socket-dir
          hostPath:
            path: /var/lib/kubelet/plugins/object.csi.aliyun.com
            type: DirectoryOrCreate

根據(jù)上述的情況,我們總結(jié)下:

  • CSI Node Plugin 和 CSI Controller Plugin 都實現(xiàn)在同一個程序里。
  • 由于這個程序通過 Daemonset 部署到了每個節(jié)點上,因此相當(dāng)于每個節(jié)點都運行著 CSI Node Plugin 和 CSI Controller Plugin 這兩個插件,并且都可以通過 csi.sock 文件實現(xiàn)跟 CSI Node Plugin 和 CSI Controller Plugin 的通信。所以,雖然 external-provisioner 是通過 Deployment 部署的,但是它仍舊可以通過 csi.sock 文件實現(xiàn)跟 CSI Controller Plugin 的通信。

進(jìn)入源碼

代碼實現(xiàn)上,我們重點關(guān)注 CSI 接口的實現(xiàn)。

初始化

首先是整個 CSI Plugin 的初始化:

  • NewFuseDriver 函數(shù)初始化了一個 FuseDriver 結(jié)構(gòu)體,這個結(jié)構(gòu)體包含了 endpoint 也就是 CSI Plugin 監(jiān)聽的 socket 文件地址。
  • ids、cs、ns 分別是 CSI Identity、CSI Controller 和 CSI Node 三類接口的具體實現(xiàn)。
func NewFuseDriver(nodeID, endpoint, driverName string, kubeClient *kubernetes.Clientset) (*FuseDriver, error) {
 driver := csicommon.NewCSIDriver(driverName, version.Version, nodeID)
 if driver == nil {
  klog.Fatalln("Failed to initialize CSI Driver.")
 }

 s3Driver := &FuseDriver{
  endpoint: endpoint,
  driver:   driver,
  ids:      newIdentityServer(driver),
  cs:       newControllerServer(driver),
  ns:       newNodeServer(driver),
 }
 return s3Driver, nil
}

func (s3 *FuseDriver) Run() {
 // Initialize default library driver
 s3.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
  csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
  csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
 })
 s3.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
  csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})

 s := csicommon.NewNonBlockingGRPCServer()
 s.Start(s3.endpoint, s3.ids, s3.cs, s3.ns)
 s.Wait()
}

CSI Controller 類接口的實現(xiàn)

先看下 CSI Controller 類接口的具體實現(xiàn),主要實現(xiàn)的是 CreateVolume、DeleteVolume 和 ControllerExpandVolume 這三個接口。

  • CreateVolume 接口的核心實現(xiàn)就是在 minio 上創(chuàng)建一個 bucket。
  • DeleteVolume 接口的核心實現(xiàn)就是將 bucket 刪除。
  • ControllerExpandVolume 接口的核心實現(xiàn)就是將 bucket 的 quota 上限提高,調(diào)整 bucket 的 metadata。
type controllerServer struct {
 kubeClinet *kubernetes.Clientset
 *csicommon.DefaultControllerServer
}

func newControllerServer(d *csicommon.CSIDriver) *controllerServer {
 cfg, err := clientcmd.BuildConfigFromFlags("", "")
 if err != nil {
  klog.Fatalf("Error building kubeconfig: %s", err.Error())
 }
 kubeClient, err := kubernetes.NewForConfig(cfg)
 if err != nil {
  klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
 }
 return &controllerServer{
  DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
  kubeClinet:              kubeClient,
 }
}

func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 ...

 // create volume
 return driver.CreateVolume(ctx, req)
}

func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 ...

 return driver.DeleteVolume(ctx, req)
}

func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
 ...

 // expand volume
 return driver.ControllerExpandVolume(ctx, req)
}

func (driver *MinIODriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 ...
 if err := driver.minioClient.CreateBucket(bucketName, capacity); err != nil {
  return &csi.CreateVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
  ...
}

func (driver *MinIODriver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 ...
 if err := driver.minioClient.DeleteBucket(bucketName); err != nil {
  return &csi.DeleteVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 ...
}

func (driver *MinIODriver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
 ...
 capacity := req.GetCapacityRange().RequiredBytes
 if DefaultFeatureGate.Enabled(Quota) {
  if err := driver.minioClient.SetBucketQuota(bucketName, capacity, madmin.HardQuota); err != nil {
   return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
  }
 }

 bucketMap, err := driver.minioClient.GetBucketMetadata(bucketName)
 if err != nil {
  return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 bucketMap[MetaDataCapacity] = strconv.FormatInt(capacity, 10)
 if err = driver.minioClient.SetBucketMetadata(bucketName, bucketMap); err != nil {
  return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }

 return &csi.ControllerExpandVolumeResponse{CapacityBytes: capacity, NodeExpansionRequired: false}, nil
}

CSI Node 類接口的實現(xiàn)

接下去看一下 CSI Node 類接口的具體實現(xiàn),主要實現(xiàn)的是 NodePublishVolume、NodeUnpublishVolume 這兩個接口。NodeStageVolume 和 NodeUnstageVolume 這兩個接口并沒有相應(yīng)的邏輯,也就是說不存在多個 Pod 共享一個 volume 的情況。

  • NodePublishVolume 接口會調(diào)用 MinIODriver 的 NodePublishVolume 接口。在這個接口中會封裝好的 s3fs 執(zhí)行命令,并將執(zhí)行命令發(fā)送給 connector,connector 在宿主機(jī)上執(zhí)行相應(yīng)的命令。這個命令的主要作用是將 minio 的 bucket 掛載到 target path 上,也就是 /var/lib/kubelet/pods/${pod uid}/volumes/kubernetes.io****~${CSI Plugin Name}/${PV name} 上。
  • NodeUnpublishVolume 接口會調(diào)用 FuseUmount 方法 unmount 掉 target path 上的掛載。代碼繼續(xù)跟蹤下去,它是在容器內(nèi)部就執(zhí)行了 unmount 方法。為什么在容器內(nèi)部執(zhí)行 unmount 方法就可以了呢?這是因為在部署這個容器的時候,把宿主機(jī)的 /var/lib/kubelet/pods/ 目錄就掛載到了容器的 /var/lib/kubelet/pods/ 上。
type nodeServer struct {
 *csicommon.DefaultNodeServer
}

func newNodeServer(d *csicommon.CSIDriver) *nodeServer {
 return &nodeServer{
  DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
 }
}

func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 volumeID := req.GetVolumeId()
 targetPath := req.GetTargetPath()
  ...
  
 return driver.NodePublishVolume(ctx, req)
}

func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
 volumeID := req.GetVolumeId()
 targetPath := req.GetTargetPath()
  ...
  
 if err := common.FuseUmount(targetPath); err != nil {
  return &csi.NodeUnpublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 klog.Infof("s3: mountpoint %s has been unmounted.", targetPath)

 return &csi.NodeUnpublishVolumeResponse{}, nil
}

func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
 return &csi.NodeStageVolumeResponse{}, nil
}

func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
 return &csi.NodeUnstageVolumeResponse{}, nil
}

// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
 ...
}

func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
 return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}

// NodeGetVolumeStats used for csi metrics
func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
 return nil, nil
}

// NodePublishVolume 中的 driver.NodePublishVolume
func (driver *MinIODriver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 pv, err := driver.kubeClinet.CoreV1().PersistentVolumes().Get(ctx, req.GetVolumeId(), metav1.GetOptions{})
 if err != nil {
  return &csi.NodePublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 bucketName := pv.Spec.CSI.VolumeAttributes[ParamBucketNameTag]
 targetPath := req.GetTargetPath()

 notMnt, err := checkMount(targetPath)
 if err != nil {
  return &csi.NodePublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 if !notMnt {
  return &csi.NodePublishVolumeResponse{}, nil
 }

 if err := S3FSMount(driver.Endpoint, bucketName, targetPath, driver.AK, driver.SK); err != nil {
  return &csi.NodePublishVolumeResponse{}, err
 }

 klog.Infof("s3: bucket %s successfully mounted to %s", bucketName, targetPath)

 return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume 中的 common.FuseUmount
func FuseUmount(path string) error {
 if err := mount.New("").Unmount(path); err != nil {
  return err
 }
 // as fuse quits immediately, we will try to wait until the process is done
 process, err := findFuseMountProcess(path)
 if err != nil {
  klog.Errorf("Error getting PID of fuse mount: %s", err)
  return nil
 }
 if process == nil {
  klog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
  return nil
 }
 klog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
 return waitForProcess(process, 1)
}

擴(kuò)展認(rèn)識

上文提到了 s3f3,s3fs 是一個開源的用戶態(tài)文件系統(tǒng),它允許將 Amazon S3 bucket 掛載到 Linux 系統(tǒng)中,使其表現(xiàn)為本地文件系統(tǒng)。

通過 s3fs,用戶可以像訪問本地文件系統(tǒng)一樣訪問 Amazon S3 存儲桶中的對象。這使得開發(fā)人員可以直接在應(yīng)用程序中使用 S3 存儲桶,而無需使用 Amazon S3 的 API 進(jìn)行手動操作。s3fs 提供了對 Amazon S3 存儲桶的標(biāo)準(zhǔn)文件系統(tǒng)操作,例如讀取、寫入、復(fù)制、移動和刪除文件。它還支持文件權(quán)限、目錄結(jié)構(gòu)和符號鏈接等常見的文件系統(tǒng)功能。由于 Amazon S3 和 minio 的 API 是兼容的,因此也可以用于 minio。

s3f3 github 地址:https://github.com/s3fs-fuse/s3fs-fuse

責(zé)任編輯:武曉燕 來源: 多選參數(shù)
相關(guān)推薦

2022-06-16 07:50:35

數(shù)據(jù)結(jié)構(gòu)鏈表

2022-07-26 08:03:27

Kubernetes節(jié)點磁盤

2024-01-30 18:29:29

微服務(wù)架構(gòu)Ingress

2023-01-10 08:43:15

定義DDD架構(gòu)

2024-02-04 00:00:00

Effect數(shù)據(jù)組件

2023-07-26 13:11:21

ChatGPT平臺工具

2024-01-19 08:25:38

死鎖Java通信

2024-01-02 12:05:26

Java并發(fā)編程

2023-08-01 12:51:18

WebGPT機(jī)器學(xué)習(xí)模型

2023-12-07 12:29:49

Nginx負(fù)載均衡策略

2024-03-12 08:37:32

asyncawaitJavaScript

2024-08-12 08:12:38

2024-01-26 06:05:16

KuberneteseBPF網(wǎng)絡(luò)

2024-05-06 00:00:00

InnoDBView隔離

2024-08-06 09:47:57

2022-07-08 09:27:48

CSSIFC模型

2023-01-30 09:01:54

圖表指南圖形化

2024-07-31 08:39:45

Git命令暫存區(qū)

2023-12-12 08:02:10

2023-10-10 11:04:11

Rust難點內(nèi)存
點贊
收藏

51CTO技術(shù)棧公眾號