在 Kubernetes 上自動縮放 Kinesis Data Streams 應用程序
想了解如何在 Kubernetes 上自動擴展您的 Kinesis Data Streams 消費者應用程序,以便您可以節(jié)省成本并提高資源效率?該博客提供了有關如何做到這一點的分步指南。
通過利用 Kubernetes 自動擴展 Kinesis 消費者應用程序,您可以受益于其內(nèi)置功能,例如 Horizontal Pod Autoscaler。
什么是 Amazon Kinesis 和 Kinesis Data Streams?
Amazon Kinesis是一個用于實時數(shù)據(jù)處理、攝取和分析的平臺。Kinesis Data Streams是一種無服務器流數(shù)據(jù)服務(Kinesis 流數(shù)據(jù)平臺的一部分,還有Kinesis Data Firehose、Kinesis Video Streams和Kinesis Data Analytics。
Kinesis Data Streams 可以彈性擴展并持續(xù)適應數(shù)據(jù)攝取率和流消費率的變化。它可用于構建實時數(shù)據(jù)分析應用程序、實時儀表板和實時數(shù)據(jù)管道。
讓我們首先概述 Kinesis Data Streams 的一些關鍵概念。
Kinesis Data Streams:高級架構
- Kinesis 數(shù)據(jù)流是一組分片。每個分片都有一系列數(shù)據(jù)記錄。
- 生產(chǎn)者不斷將數(shù)據(jù)推送到 Kinesis Data Streams,消費者實時處理數(shù)據(jù)。
- 分區(qū) 鍵用于按流中的分片對數(shù)據(jù)進行分組。
- Kinesis Data Streams 將屬于一個流的數(shù)據(jù)記錄分成多個分片。
- 它使用與每個數(shù)據(jù)記錄關聯(lián)的分區(qū)鍵來確定給定數(shù)據(jù)記錄屬于哪個分片。
- 消費者從 Amazon Kinesis Data Streams 獲取記錄,對其進行處理,并將結果存儲在 Amazon DynamoDB、Amazon Redshift、Amazon S3 等中。
- 這些消費者也稱為 Amazon Kinesis Data Streams 應用程序。
- 開發(fā)可以處理來自 KDS 數(shù)據(jù)流的數(shù)據(jù)的自定義消費者應用程序的方法之一是使用Kinesis Client Library ( KCL)。
Kinesis 消費者應用程序如何橫向擴展?
Kinesis Client Library 確保有一個記錄處理器為每個分片運行并處理來自該分片的數(shù)據(jù)。KCL通過處理與分布式計算和可擴展性相關的許多復雜任務,幫助您使用和處理來自 Kinesis 數(shù)據(jù)流的數(shù)據(jù)。它連接到數(shù)據(jù)流,枚舉數(shù)據(jù)流中的分片,并使用租約來協(xié)調分片與其消費者應用程序的關聯(lián)。
記錄處理器為其管理的每個分片實例化。KCL從數(shù)據(jù)流中拉取數(shù)據(jù)記錄,將記錄推送到相應的記錄處理器,檢查點處理記錄。更重要的是,當工作實例計數(shù)發(fā)生變化或數(shù)據(jù)流重新分片(分片被拆分或合并)時,它會平衡分片-工作關聯(lián)(租約)。這意味著您可以通過簡單地添加更多實例來擴展您的 Kinesis Data Streams 應用程序,因為KCL它將自動平衡實例之間的分片。
但是,當負載增加時,您仍然需要一種方法來擴展您的應用程序。當然,您可以手動執(zhí)行此操作或構建自定義解決方案來完成此操作。
這是Kubernetes 事件驅動的自動縮放(KEDA) 可以提供幫助的地方。是一個基于 Kubernetes 的事件驅動的自動伸縮組件,可以像 Kinesis 一樣監(jiān)控事件源,并根據(jù)需要處理的事件數(shù)量來KEDA伸縮底層Deployment(和s)。Pod
為見證自動縮放的運行,您將使用一個 Java 應用程序,該應用程序使用 Kinesis Client Library ( KCL) 2.x 使用來自 Kinesis Data Stream 的數(shù)據(jù)。它將部署到Amazon EKS上的 Kubernetes 集群,并使用KEDA. 該應用程序包括ShardRecordProcessor處理來自 Kinesis 流的數(shù)據(jù)并將其保存到 DynamoDB 表的實現(xiàn)。我們將使用 AWS CLI 為 Kinesis 流生成數(shù)據(jù)并觀察應用程序的擴展。
之前,我們深入了解,這里是KEDA.
什么是科達?
KEDA是一個開源 CNCF 項目,它建立在原生 Kubernetes 原語(例如 Horizontal Pod Autoscaler)之上,可以添加到任何 Kubernetes 集群。以下是其關鍵組件的高級概述(您可以參考KEDA 文檔進行深入研究):
- 該keda-operator-metrics-apiserver組件KEDA充當Kubernetes 指標服務器,為 Horizontal Pod Autoscaler 公開指標
- KEDA Scaler與外部系統(tǒng)(例如 Redis)集成以獲取這些指標(例如,列表的長度),以根據(jù)需要處理的事件數(shù)量驅動 Kubernetes 中任何容器的自動縮放。
- 組件的作用keda-operator是activate和deactivateDeployment;即,縮放到零和從零開始。
您將看到Kinesis Stream KEDA 縮放器正在運行,它根據(jù) AWS Kinesis Stream 的分片數(shù)進行縮放。
現(xiàn)在讓我們繼續(xù)本文的實際部分。
先決條件
除了 AWS 賬戶外,您還需要安裝AWS CLI、kubectl、Docker、 Java 11 和Maven。
設置 EKS 集群、創(chuàng)建 DynamoDB 表和 Kinesis 數(shù)據(jù)流
您可以通過多種方式創(chuàng)建Amazon EKS 集群。我更喜歡使用eksctl CLI,因為它提供了便利。使用以下方法創(chuàng)建 EKS 集群eksctl非常簡單:
eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>
有關詳細信息,請參閱Amazon EKS 入門 – eksctl文檔。
創(chuàng)建一個 DynamoDB 表來保存應用程序數(shù)據(jù)。您可以使用 AWS CLI 通過以下命令創(chuàng)建表:
aws dynamodb create-table \ --table-name users \ --attribute-definitions AttributeName=email,AttributeType=S \ --key-schema AttributeName=email,KeyType=HASH \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
使用 AWS CLI創(chuàng)建一個包含兩個分片的 Kinesis 流:
aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2
克隆此 GitHub 存儲庫并將其更改為正確的目錄:
git clone https://github.com/abhirockzz/kinesis-keda-autoscalingcd kinesis-keda-autoscaling
好的,讓我們開始吧!
在 EKS 上設置和配置 KEDA
出于本教程的目的,您將使用YAML 文件部署KEDA. 但您也可以使用Helm 圖表。
安裝KEDA:
# update version 2.8.2 if requiredkubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
驗證安裝:
# check Custom Resource Definitionskubectl get crd# check KEDA Deploymentskubectl get deployment -n keda# check KEDA operator logskubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsnotallow='{.items[0].metadata.name}' -n keda) -n keda
配置 IAM 角色
KEDA 操作員和 Kinesis 消費者應用程序需要調用 AWS API。由于兩者都將作為 EKS 中的 s 運行Deployment,我們將使用IAM 服務賬戶角色 (IRSA)來提供必要的權限。
在這種特殊情況下:
- KEDA運算符需要能夠獲取 Kinesis 流的分片計數(shù):它通過使用DescribeStreamSummaryAPI 來實現(xiàn)。
- 應用程序(具體來說是 KCL 庫)需要與 Kinesis 和 DynamoDB 交互:它需要一堆IAM 權限才能這樣做。
為 KEDA 操作員配置 IRSA
將您的 AWS 賬戶 ID 和 OIDC 身份提供商設置為環(huán)境變量:
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)#update the cluster name and region as requiredexport EKS_CLUSTER_NAME=demo-eks-clusterexport AWS_REGION=us-east-1OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
JSON為角色創(chuàng)建一個包含可信實體的文件:
read -r -d '' TRUST_RELATIONSHIP <<EOF{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}" }, "Action": "sts:AssumeRoleWithWebIdentity", "Condition": { "StringEquals": { "${OIDC_PROVIDER}:aud": "sts.amazonaws.com", "${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator" } } } ]}EOFecho "${TRUST_RELATIONSHIP}" > trust_keda.json
現(xiàn)在,創(chuàng)建 IAM 角色并附加策略(查看policy_kinesis_keda.json文件了解詳細信息):
export ROLE_NAME=keda-operator-kinesis-roleaws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.jsonaws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-kinesis-policy
關聯(lián) IAM 角色和服務帳戶:
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}# verify the annotation kubectl describe serviceaccount/keda-operator -n keda
您需要重新啟動KEDA操作員Deployment才能生效:
kubectl rollout restart deployment.apps/keda-operator -n keda# to verify, confirm that the KEDA operator has the right environment variableskubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsnotallow={.items..metadata.name}) | grep "^\s*AWS_"# expected outputAWS_STS_REGIONAL_ENDPOINTS: regionalAWS_DEFAULT_REGION: us-east-1AWS_REGION: us-east-1AWS_ROLE_ARN: arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-kinesis-roleAWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
為 KCL 消費者應用程序配置 IRSA
首先創(chuàng)建一個 Kubernetes 服務帳戶:
kubectl apply -f - <<EOFapiVersion: v1kind: ServiceAccountmetadata: name: kcl-consumer-app-saEOF
JSON為角色創(chuàng)建一個包含可信實體的文件:
read -r -d '' TRUST_RELATIONSHIP <<EOF{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}" }, "Action": "sts:AssumeRoleWithWebIdentity", "Condition": { "StringEquals": { "${OIDC_PROVIDER}:aud": "sts.amazonaws.com", "${OIDC_PROVIDER}:sub": "system:serviceaccount:default:kcl-consumer-app-sa" } } } ]}EOFecho "${TRUST_RELATIONSHIP}" > trust.json
現(xiàn)在,創(chuàng)建 IAM 角色并附加策略(查看policy.json文件了解詳細信息):
export ROLE_NAME=kcl-consumer-app-roleaws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for KCL consumer app on EKS"aws iam create-policy --policy-name kcl-consumer-app-policy --policy-document file://policy.jsonaws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/kcl-consumer-app-policy
關聯(lián) IAM 角色和服務帳戶:
kubectl annotate serviceaccount -n default kcl-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}# verify the annotationkubectl describe serviceaccount/kcl-consumer-app-sa
核心基礎設施現(xiàn)已準備就緒。讓我們準備并部署消費者應用程序。
將 KCL 消費者應用程序部署到 EKS
您首先需要構建 Docker 鏡像并將其推送到Amazon Elastic Container Registry (ECR)(有關Dockerfile詳細信息,請參閱 )。
構建 Docker 鏡像并將其推送到 ECR
# create runnable JAR filemvn clean compile assembly\:single# build docker imagedocker build -t kcl-consumer-app .AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)# create a private ECR repoaws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.comaws ecr create-repository --repository-name kcl-consumer-app --region us-east-1# tag and push the imagedocker tag kcl-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latestdocker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
部署消費者應用程序
更新consumer.yaml以包含您剛剛推送到 ECR 的 Docker 映像。清單的其余部分保持不變:
apiVersion: apps/v1kind: Deploymentmetadata: name: kcl-consumerspec: replicas: 1 selector: matchLabels: app: kcl-consumer template: metadata: labels: app: kcl-consumer spec: serviceAccountName: kcl-consumer-app-sa containers: - name: kcl-consumer image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest imagePullPolicy: Always env: - name: STREAM_NAME value: kinesis-keda-demo - name: TABLE_NAME value: users - name: APPLICATION_NAME value: kinesis-keda-demo - name: AWS_REGION value: us-east-1 - name: INSTANCE_NAME valueFrom: fieldRef: fieldPath: metadata.name
創(chuàng)建Deployment:
kubectl apply -f consumer.yaml# verify Pod transition to Running statekubectl get pods -w
KCL 應用程序自動縮放在 KEDA 中的應用
現(xiàn)在您已經(jīng)部署了消費者應用程序,KCL庫應該開始行動了。它要做的第一件事是在 DynamoDB 中創(chuàng)建一個“控制表”——這應該與 KCL 應用程序的名稱相同(在本例中為 )kinesis-keda-demo。
進行初始協(xié)調和創(chuàng)建表可能需要幾分鐘時間。您可以檢查消費者應用程序的日志以跟蹤進度。
kubectl logs -f $(kubectl get po -l=app=kcl-consumer --output=jsnotallow={.items..metadata.name})
租約分配完成后,檢查表并記下leaseOwner屬性:
aws dynamodb describe-table --table-name kinesis-keda-demoaws dynamodb scan --table-name kinesis-keda-demo
現(xiàn)在,讓我們使用 AWS CLI 將一些數(shù)據(jù)發(fā)送到 Kinesis 流。
export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo.com --data $(echo -n '{"name":"user3", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user4@foo.com --data $(echo -n '{"name":"user4", "city":"seattle"}' | base64)
KCL 應用程序將每條記錄保存到目標DynamoDB表(在本例中已命名users)。您可以檢查表格以驗證記錄。
aws dynamodb scan --table-name users
注意到processed_by屬性的值了嗎?它與 KCL 消費者相同Pod。這將使我們更容易驗證端到端的自動縮放過程。
為 Kinesis 創(chuàng)建 KEDA 定標器
這是ScaledObject定義。請注意,它的目標是kcl-consumer Deployment(我們剛剛創(chuàng)建的那個)并且shardCount設置為1:
apiVersion: keda.sh/v1alpha1kind: ScaledObjectmetadata: name: aws-kinesis-stream-scaledobjectspec: scaleTargetRef: name: kcl-consumer triggers: - type: aws-kinesis-stream metadata: # Required streamName: kinesis-keda-demo # Required awsRegion: "us-east-1" shardCount: "1" identityOwner: "operator"
創(chuàng)建KEDAKinesis 縮放器:
kubectl apply -f keda-kinesis-scaler.yaml
驗證 KCL 應用程序自動縮放
Pod我們從我們的一個 KCL 應用程序開始。但是,多虧了KEDA,我們現(xiàn)在應該看到第二次Pod出現(xiàn)了。
kubectl get pods -l=app=kcl-consumer -w# check logs of the new podkubectl logs -f <enter Pod name>
我們的應用程序能夠自動縮放到兩個,因為我們在定義中Pods指定了。這意味著Kinesis 流中的每個分片都會有一個。shardCount: "1"ScaledObjectPod
檢查kinesis-keda-demo控制表DynamoDB:您應該看到leaseOwner.
讓我們向 Kinesis 流發(fā)送更多數(shù)據(jù)。
export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user5@foo.com --data $(echo -n '{"name":"user5", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user6@foo.com --data $(echo -n '{"name":"user6", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user7@foo.com --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user8@foo.com --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64)
驗證屬性的值processed_by。由于我們已經(jīng)擴展到兩個Pods,每個記錄的值應該不同,因為每個記錄都Pod將處理來自 Kinesis 流的記錄子集。
增加 Kinesis 流容量
讓我們將分片數(shù)量從兩個擴展到三個,并繼續(xù)監(jiān)控KCL應用程序的自動擴展。
aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING
一旦 Kinesis 重新分片完成,KEDA縮放器將開始行動并將 KCL 應用程序擴展到三個Pods。
kubectl get pods -l=app=kcl-consumer -w
kinesis-keda-demo和之前一樣,在控制表中確認Kinesis shard lease已經(jīng)更新DynamoDB。檢查leaseOwner屬性。
繼續(xù)向 Kinesis 流發(fā)送更多數(shù)據(jù)。正如預期的那樣,Pods 將共享記錄處理,這將反映在表processed_by中的屬性中users。
export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user9@foo.com --data $(echo -n '{"name":"user9", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user10@foo.com --data $(echo -n '{"name":"user10", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user11@foo.com --data $(echo -n '{"name":"user11", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user12@foo.com --data $(echo -n '{"name":"user12", "city":"seattle"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user14@foo.com --data $(echo -n '{"name":"user14", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user15@foo.com --data $(echo -n '{"name":"user15", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user16@foo.com --data $(echo -n '{"name":"user16", "city":"seattle"}' | base64)
縮小
到目前為止,我們只在一個方向上進行了擴展。當我們減少 Kinesis 流的分片容量時會發(fā)生什么?親自嘗試一下:將分片數(shù)從三個減少到兩個,看看 KCL 應用程序會發(fā)生什么。
驗證端到端解決方案后,您應該清理資源以避免產(chǎn)生任何額外費用。
刪除資源
刪除 EKS 集群、Kinesis 流和 DynamoDB 表。
eksctl delete cluster --name keda-kinesis-demoaws kinesis delete-stream --stream-name kinesis-keda-demoaws dynamodb delete-table --table-name users
結論
在本文中,您學習了如何使用KEDA自動縮放KCL使用來自 Kinesis 流的數(shù)據(jù)的應用程序。
您可以根據(jù)您的應用要求配置 KEDA 定標器。例如,您可以將Kinesis 流中的每三個分片設置為shardCount一個3。Pod然而,如果你想維護一個一對一的映射,你可以設置為shardCount并1會KCL處理分布式協(xié)調和租約分配,從而確保每個Pod記錄處理器都有一個實例。這是一種有效的方法,可讓您擴展 Kinesis 流處理管道以滿足應用程序的需求。