Kubernetes CSI 實現(xiàn)舉例,你學(xué)會了嗎?
前言
接下去我們簡單來看下一個 CSI Plugin 開源項目的實現(xiàn),github 地址是:https://github.com/alibaba/open-object。
這個 CSI Plugin 實現(xiàn)了將 Minio 作為后端存儲。以這個項目為例子,主要的考慮是:(1)它的實現(xiàn)不復(fù)雜,整體上的代碼量就 2k 行左右,便于閱讀和了解;(2)項目實現(xiàn)了將 Minio 作為后端存儲,在這個項目基礎(chǔ)之上,我們可以進(jìn)一步擴(kuò)展將阿里云 OSS 作為后端存儲的實現(xiàn),便于進(jìn)一步的實操。(3)Minio 是對象存儲,了解它的實現(xiàn)可以解決我心中的一個疑惑,就是我們?nèi)绾螌σ粋€文件的讀寫訪問,轉(zhuǎn)換為對 bucket 的讀寫請求。通過后續(xù)的了解,我們可以知道這主要是依賴了像 s3f3 這樣的程序,s3f3 通過 FUSE 將一個 bucket 表現(xiàn)為一個文件。
從部署開始
CSIDriver 資源對象
CSIDriver 包含了部署的 CSI Plugin 有關(guān)的信息。Kubernetes AD Controller 通過該對象來決定是否需要 attach。Kubelet 通過該對象決定 mount 時是否需要傳遞 Pod 信息。CSIDriver 資源對象未劃分命名空間。
可以了解到以下幾個點:
- OpenObject 這個 CSI 插件不需要 attach 步驟,并且在 mount 的時候需要傳遞 Pod 信息。
apiVersion: storage.k8s.io/v1beta1
kind: CSIDriver
metadata:
name: object.csi.aliyun.com
spec:
attachRequired: false
podInfoOnMount: true
volumeLifecycleModes:
- Persistent
CSI Node Plugin
可以看到 Daemonset 中包含了 3 個容器:
- init 容器,啟動一個 connector,這個 connector 運行在宿主機(jī)上,監(jiān)聽 /etc/open-object/connector.sock。通過這個 socket 文件,connector 會收到相應(yīng)的執(zhí)行命令,并執(zhí)行這些命令??傮w上的作用相當(dāng)于宿主機(jī)和容器之間的連接器。
- driver-registrar 容器,調(diào)用 csi-plugin 的 NodeGetInfo 的接口,將 CSI Node Plugin 的信息通過 kubelet 的插件注冊機(jī)制在對應(yīng)節(jié)點的 kubelet 上進(jìn)行注冊,同時把 CSI Node Plugin 的通信地址同步給 kubelet。
- CSI-Plugin 容器,這個容器里面運行的就是 CSI Node Plugin,也是我們實現(xiàn)的內(nèi)容。
kind: DaemonSet
apiVersion: apps/v1
metadata:
name: open-object
namespace: {{ .Values.namespace }}
spec:
selector:
matchLabels:
app: open-object
template:
metadata:
labels:
app: open-object
spec:
tolerations:
- operator: Exists
serviceAccount: open-object
hostNetwork: true
hostPID: true
dnsPolicy: ClusterFirstWithHostNet
initContainers:
- name: run-connector
image: {{ .Values.images.object.image }}:{{ .Values.images.object.tag }}
securityContext:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
command:
- /run-connector.sh
volumeMounts:
- name: host-systemd-config
mountPath: /host/usr/lib/systemd/system
- name: host-etc
mountPath: /host/etc/open-object
containers:
- name: driver-registrar
image: {{ .Values.images.registrar.image }}:{{ .Values.images.registrar.tag }}
args:
- "--kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)"
- "--v=4"
- "--csi-address=$(ADDRESS)"
env:
- name: ADDRESS
value: /csi/csi.sock
- name: DRIVER_REG_SOCK_PATH
value: /var/lib/kubelet/plugins/object.csi.aliyun.com/csi.sock
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
...
volumeMounts:
- name: plugin-dir
mountPath: /csi
- name: registration-dir
mountPath: /registration/
- name: csi-plugin
securityContext:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: {{ .Values.images.object.image }}:{{ .Values.images.object.tag }}
args:
- "csi"
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeID=$(NODE_ID)"
- "--driver=object.csi.aliyun.com"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: NODE_ID
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: TZ
value: Asia/Shanghai
...
volumeMounts:
- name: plugin-dir
mountPath: /csi
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
- name: fuse-device
mountPath: /dev/fuse
- name: host-etc
mountPath: /host/etc/open-object
- name: host-etc-os
mountPath: /host/etc/os-release
volumes:
- name: registration-dir
hostPath:
path: /var/lib/kubelet/plugins_registry/
type: DirectoryOrCreate
- name: plugin-dir
hostPath:
path: /var/lib/kubelet/plugins/object.csi.aliyun.com
type: DirectoryOrCreate
- name: pods-mount-dir
hostPath:
path: /var/lib/kubelet/pods
type: Directory
- name: fuse-device
hostPath:
path: /dev/fuse
- name: host-etc
hostPath:
path: /etc/open-object
type: DirectoryOrCreate
- name: host-etc-os
hostPath:
path: /etc/os-release
type: File
- name: host-systemd-config
hostPath:
path: /usr/lib/systemd/system
type: DirectoryOrCreate
CSI Controller Plugin
下面這個 Deployment 部署的是 external-provisioner 組件,指定了通過哪個 socket 文件跟 CSI Controller Plugin 進(jìn)行通信。但是,我們在這個 Deployment 中并沒有看到 CSI Controller Plugin 組件相關(guān)的容器。這是因為對于 OpenObject 這個項目來說,它將 CSI Node Plugin 和 CSI Controller Plugin 的代碼都實現(xiàn)在了上述提到的 csi-plugin 容器里。由于,csi-plugin 是通過 Daemonset 方式部署的,每個節(jié)點上都有,所以 external-provisioner 組件可以通過指定的 socket 文件跟 csi-plugin 容器通信。
kind: Deployment
apiVersion: apps/v1
metadata:
name: open-object-csi-provisioner
namespace: {{ .Values.namespace }}
labels:
app: open-object
component: open-object-csi-provisioner
spec:
selector:
matchLabels:
app: open-object
component: open-object-csi-provisioner
replicas: 1
template:
metadata:
labels:
app: open-object
component: open-object-csi-provisioner
spec:
tolerations:
- operator: Exists
effect: NoSchedule
key: node-role.kubernetes.io/master
priorityClassName: system-cluster-critical
serviceAccount: open-object
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: csi-provisioner
image: {{ .Values.images.provisioner.image }}:{{ .Values.images.provisioner.tag }}
args:
- --csi-address=$(ADDRESS)
- --volume-name-prefix=fuse
- --extra-create-metadata=true
- --timeout=10m
env:
- name: ADDRESS
value: /var/lib/kubelet/plugins/object.csi.aliyun.com/csi.sock
- name: TZ
value: Asia/Shanghai
...
volumeMounts:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/object.csi.aliyun.com
volumes:
- name: socket-dir
hostPath:
path: /var/lib/kubelet/plugins/object.csi.aliyun.com
type: DirectoryOrCreate
根據(jù)上述的情況,我們總結(jié)下:
- CSI Node Plugin 和 CSI Controller Plugin 都實現(xiàn)在同一個程序里。
- 由于這個程序通過 Daemonset 部署到了每個節(jié)點上,因此相當(dāng)于每個節(jié)點都運行著 CSI Node Plugin 和 CSI Controller Plugin 這兩個插件,并且都可以通過 csi.sock 文件實現(xiàn)跟 CSI Node Plugin 和 CSI Controller Plugin 的通信。所以,雖然 external-provisioner 是通過 Deployment 部署的,但是它仍舊可以通過 csi.sock 文件實現(xiàn)跟 CSI Controller Plugin 的通信。
進(jìn)入源碼
代碼實現(xiàn)上,我們重點關(guān)注 CSI 接口的實現(xiàn)。
初始化
首先是整個 CSI Plugin 的初始化:
- NewFuseDriver 函數(shù)初始化了一個 FuseDriver 結(jié)構(gòu)體,這個結(jié)構(gòu)體包含了 endpoint 也就是 CSI Plugin 監(jiān)聽的 socket 文件地址。
- ids、cs、ns 分別是 CSI Identity、CSI Controller 和 CSI Node 三類接口的具體實現(xiàn)。
func NewFuseDriver(nodeID, endpoint, driverName string, kubeClient *kubernetes.Clientset) (*FuseDriver, error) {
driver := csicommon.NewCSIDriver(driverName, version.Version, nodeID)
if driver == nil {
klog.Fatalln("Failed to initialize CSI Driver.")
}
s3Driver := &FuseDriver{
endpoint: endpoint,
driver: driver,
ids: newIdentityServer(driver),
cs: newControllerServer(driver),
ns: newNodeServer(driver),
}
return s3Driver, nil
}
func (s3 *FuseDriver) Run() {
// Initialize default library driver
s3.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
})
s3.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
s := csicommon.NewNonBlockingGRPCServer()
s.Start(s3.endpoint, s3.ids, s3.cs, s3.ns)
s.Wait()
}
CSI Controller 類接口的實現(xiàn)
先看下 CSI Controller 類接口的具體實現(xiàn),主要實現(xiàn)的是 CreateVolume、DeleteVolume 和 ControllerExpandVolume 這三個接口。
- CreateVolume 接口的核心實現(xiàn)就是在 minio 上創(chuàng)建一個 bucket。
- DeleteVolume 接口的核心實現(xiàn)就是將 bucket 刪除。
- ControllerExpandVolume 接口的核心實現(xiàn)就是將 bucket 的 quota 上限提高,調(diào)整 bucket 的 metadata。
type controllerServer struct {
kubeClinet *kubernetes.Clientset
*csicommon.DefaultControllerServer
}
func newControllerServer(d *csicommon.CSIDriver) *controllerServer {
cfg, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
kubeClinet: kubeClient,
}
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
...
// create volume
return driver.CreateVolume(ctx, req)
}
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
...
return driver.DeleteVolume(ctx, req)
}
func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
...
// expand volume
return driver.ControllerExpandVolume(ctx, req)
}
func (driver *MinIODriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
...
if err := driver.minioClient.CreateBucket(bucketName, capacity); err != nil {
return &csi.CreateVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
...
}
func (driver *MinIODriver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
...
if err := driver.minioClient.DeleteBucket(bucketName); err != nil {
return &csi.DeleteVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
...
}
func (driver *MinIODriver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
...
capacity := req.GetCapacityRange().RequiredBytes
if DefaultFeatureGate.Enabled(Quota) {
if err := driver.minioClient.SetBucketQuota(bucketName, capacity, madmin.HardQuota); err != nil {
return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
}
bucketMap, err := driver.minioClient.GetBucketMetadata(bucketName)
if err != nil {
return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
bucketMap[MetaDataCapacity] = strconv.FormatInt(capacity, 10)
if err = driver.minioClient.SetBucketMetadata(bucketName, bucketMap); err != nil {
return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
return &csi.ControllerExpandVolumeResponse{CapacityBytes: capacity, NodeExpansionRequired: false}, nil
}
CSI Node 類接口的實現(xiàn)
接下去看一下 CSI Node 類接口的具體實現(xiàn),主要實現(xiàn)的是 NodePublishVolume、NodeUnpublishVolume 這兩個接口。NodeStageVolume 和 NodeUnstageVolume 這兩個接口并沒有相應(yīng)的邏輯,也就是說不存在多個 Pod 共享一個 volume 的情況。
- NodePublishVolume 接口會調(diào)用 MinIODriver 的 NodePublishVolume 接口。在這個接口中會封裝好的 s3fs 執(zhí)行命令,并將執(zhí)行命令發(fā)送給 connector,connector 在宿主機(jī)上執(zhí)行相應(yīng)的命令。這個命令的主要作用是將 minio 的 bucket 掛載到 target path 上,也就是 /var/lib/kubelet/pods/${pod uid}/volumes/kubernetes.io****~${CSI Plugin Name}/${PV name} 上。
- NodeUnpublishVolume 接口會調(diào)用 FuseUmount 方法 unmount 掉 target path 上的掛載。代碼繼續(xù)跟蹤下去,它是在容器內(nèi)部就執(zhí)行了 unmount 方法。為什么在容器內(nèi)部執(zhí)行 unmount 方法就可以了呢?這是因為在部署這個容器的時候,把宿主機(jī)的 /var/lib/kubelet/pods/ 目錄就掛載到了容器的 /var/lib/kubelet/pods/ 上。
type nodeServer struct {
*csicommon.DefaultNodeServer
}
func newNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
}
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
...
return driver.NodePublishVolume(ctx, req)
}
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
...
if err := common.FuseUmount(targetPath); err != nil {
return &csi.NodeUnpublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
klog.Infof("s3: mountpoint %s has been unmounted.", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return &csi.NodeUnstageVolumeResponse{}, nil
}
// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
...
}
func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}
// NodeGetVolumeStats used for csi metrics
func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, nil
}
// NodePublishVolume 中的 driver.NodePublishVolume
func (driver *MinIODriver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
pv, err := driver.kubeClinet.CoreV1().PersistentVolumes().Get(ctx, req.GetVolumeId(), metav1.GetOptions{})
if err != nil {
return &csi.NodePublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
bucketName := pv.Spec.CSI.VolumeAttributes[ParamBucketNameTag]
targetPath := req.GetTargetPath()
notMnt, err := checkMount(targetPath)
if err != nil {
return &csi.NodePublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
}
if !notMnt {
return &csi.NodePublishVolumeResponse{}, nil
}
if err := S3FSMount(driver.Endpoint, bucketName, targetPath, driver.AK, driver.SK); err != nil {
return &csi.NodePublishVolumeResponse{}, err
}
klog.Infof("s3: bucket %s successfully mounted to %s", bucketName, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume 中的 common.FuseUmount
func FuseUmount(path string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
// as fuse quits immediately, we will try to wait until the process is done
process, err := findFuseMountProcess(path)
if err != nil {
klog.Errorf("Error getting PID of fuse mount: %s", err)
return nil
}
if process == nil {
klog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
return nil
}
klog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
return waitForProcess(process, 1)
}
擴(kuò)展認(rèn)識
上文提到了 s3f3,s3fs 是一個開源的用戶態(tài)文件系統(tǒng),它允許將 Amazon S3 bucket 掛載到 Linux 系統(tǒng)中,使其表現(xiàn)為本地文件系統(tǒng)。
通過 s3fs,用戶可以像訪問本地文件系統(tǒng)一樣訪問 Amazon S3 存儲桶中的對象。這使得開發(fā)人員可以直接在應(yīng)用程序中使用 S3 存儲桶,而無需使用 Amazon S3 的 API 進(jìn)行手動操作。s3fs 提供了對 Amazon S3 存儲桶的標(biāo)準(zhǔn)文件系統(tǒng)操作,例如讀取、寫入、復(fù)制、移動和刪除文件。它還支持文件權(quán)限、目錄結(jié)構(gòu)和符號鏈接等常見的文件系統(tǒng)功能。由于 Amazon S3 和 minio 的 API 是兼容的,因此也可以用于 minio。
s3f3 github 地址:https://github.com/s3fs-fuse/s3fs-fuse