自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

實戰(zhàn):如何優(yōu)雅的從 Skywalking 切換到 OpenTelemetry

開發(fā) 前端
OpenTelemetry 是可觀測系統(tǒng)的新標準,基于它可以兼容以前使用的 Prometheus、 victoriametrics、skywalking 等系統(tǒng),同時還可以靈活擴展,不用與任何但一生態(tài)或技術棧進行綁定。

背景

最近公司將我們之前使用的鏈路工具切換為了 OpenTelemetry。

我們的技術棧是:

OTLP                               
Client──────────?Collect────────?StartRocks
(Agent)                               ▲    
                                      │    
                                      │    
                                   Jaeger

其中客戶端使用 OpenTelemetry 提供的 Java Agent 進行埋點收集數(shù)據(jù),再由 Agent 通過 OTLP(OpenTelemetry Protocol) 協(xié)議將數(shù)據(jù)發(fā)往 Collector,在 Collector 中我們可以自行任意處理數(shù)據(jù),并決定將這些數(shù)據(jù)如何存儲(這點在以往的 SkyWalking 體系中是很難自定義的)

這里我們將數(shù)據(jù)寫入 StartRocks 中,供之后的 UI 層進行查看。

OpenTelemetry 是可觀測系統(tǒng)的新標準,基于它可以兼容以前使用的 Prometheus、 victoriametrics、skywalking 等系統(tǒng),同時還可以靈活擴展,不用與任何但一生態(tài)或技術棧進行綁定。更多關于 OTel 的內(nèi)容會在今后介紹。

難點

其中有一個關鍵問題就是:如何在線上進行無縫切換。

雖然我們內(nèi)部的發(fā)布系統(tǒng)已經(jīng)支持重新發(fā)布后就會切換到新的鏈路,也可以讓業(yè)務自行發(fā)布然后逐步的切換到新的系統(tǒng),這樣也是最保險的方式。

但這樣會有幾個問題:

  • 當存在調(diào)用依賴的系統(tǒng)沒有全部切換為新鏈路時,再查詢的時候就會出現(xiàn)斷層,整個鏈路無法全部串聯(lián)起來。
  • 業(yè)務團隊沒有足夠的動力去推動發(fā)布,可能切換的周期較長。

所以最好的方式還是由我們在后臺統(tǒng)一發(fā)布,對外沒有任何感知就可以一鍵全部切換為 OpenTelemetry。

仔細一看貌似也沒什么難的,無非就是模擬用戶點擊發(fā)布按鈕而已。

但這事由我們自動來做就不一樣了,用戶點擊發(fā)布的時候會選擇他們認為可以發(fā)布的分支進行發(fā)布,我們不能自作主張的比如選擇 main 分支,有可能只是合并了但還不具備發(fā)布條件。

所以保險的方式還是得用當前項目上一次發(fā)布時所使用的 git hash 值重新打包發(fā)布。

但這也有幾個問題:

  • 重復打包發(fā)布太慢了,線上幾十上百個項目,每打包發(fā)布一次就得幾分鐘,雖然可以并發(fā),但考慮到 kubernetes 的壓力也不能調(diào)的太高。
  • 保不準業(yè)務鏡像中有單獨加入一些環(huán)境變量,這樣打包可能會漏。

切換方案

所以思來想去最保險的方法還是將業(yè)務鏡像拉取下來,然后手動刪除鏡像中的 skywalking 包以及 JVM 參數(shù),全部替換為 OpenTelemetry 的包和 JVM 參數(shù)。

整體的方案如下:

  • 遍歷 namespace 的 pod >0 的 deployment。
  • 遍歷 deployment 中的所有 container,獲得業(yè)務鏡像。
  1. 跳過 istio 和日志采集 container,獲取到業(yè)務容器。
  2. 判斷該容器是否需要替換,其實就是判斷環(huán)境變量中是否有 skywalking ,如果有就需要替換。
  3. 獲取業(yè)務容器的鏡像。
  • 基于該 Image 重新構建一個 OpenTelemetry 的鏡像 3.1 新的鏡像包含新的啟動腳本. 3.1.1 新的啟動腳本中會刪除原有的 skywalking agent 3.2 新鏡像會包含 OpenTelemetry 的 jar 包以及我們自定義的 OTel 擴展包 3.3 替換啟動命令為新的啟動腳本。
  • 修改 deployment 中的 JVM 啟動參數(shù)。
  • 修改 deployment 的鏡像后滾動更新。
  • 開啟一個 goroutine 定時檢測更新之后是否啟動成功。
  • 如果長時間 (比如五分鐘) 都沒有啟動成功,則執(zhí)行回滾流程。

具體代碼

因為需要涉及到操作 kubernetes,所以整體就使用 Golang 實現(xiàn)了。

遍歷 deployment 得到需要替換的容器鏡像

func ProcessDeployment(ctx context.Context, finish []string, deployment v1.Deployment, clientSet kubernetes.Interface) error {
 deploymentName := deployment.Name
 for _, s := range finish {
  if s == deploymentName {
   klog.Infof("Skip finish deployment:%s", deploymentName)
   return nil
  }
 }
 // Write finish deployment name to a file
 defer writeDeploymentName2File(deploymentName, fmt.Sprintf("finish-%s.log", deployment.Namespace))

 appName := deployment.GetObjectMeta().GetLabels()["appName"]
 klog.Infof("Begin to process deployment:%s, appName:%s", deploymentName, appName)

 upgrade, err := checkContainIstio(ctx, deployment, clientSet)
 if err != nil {
  return err
 }
 if upgrade == false {
  klog.Infof("Don't have istio, No need to upgrade deployment:%s appName:%s", deploymentName, appName)
  return nil
 }

 for i, container := range deployment.Spec.Template.Spec.Containers {
  if strings.HasPrefix(deploymentName, container.Name) {

   // Check if container has sw jvm
   for _, envVar := range container.Env {
    if envVar.Name == "CATALINA_OPTS" {
     if !strings.Contains(envVar.Value, "skywalking") {
      klog.Infof("Skip upgrade don't have sw jvm deployment:%s container:%s", deploymentName, container.Name)
      return nil
     }
    }
   }
   upgrade(container)

   // Check newDeployment status
   go checkNewDeploymentStatus(ctx, clientSet, newDeployment)

   // delete from image
   deleteImage(container.Image)

  }
 }

 return nil
}

這個函數(shù)需要傳入一個 deployment ,同時還有一個已經(jīng)完成了的列表進來。

已完成列表用于多次運行的時候可以快速跳過已經(jīng)執(zhí)行的 deployment。

checkContainIstio() 函數(shù)很簡單,判斷是否包含了 Istio 容器,如果沒有包含說明不是后端應用(可能是前端、大數(shù)據(jù)之類的任務),就可以直接跳過了。

而判斷是否需要替換的前提這事判斷環(huán)境變量 CATALINA_OPTS 中是否包含了 skywalking 的內(nèi)容,如果包含則說明需要進行替換。

Upgrade 核心函數(shù)

func upgrade(container Container){
 klog.Infof("Begin to upgrade deployment:%s container:%s", deploymentName, container.Name)
 newImageName := fmt.Sprintf("%s-otel-%s", container.Image, generateRandomString(4))
 err := BuildNewOtelImage(container.Image, newImageName)
 if err != nil {
  return err
 }

 // Update deployment jvm ENV
 for e, envVar := range container.Env {
  if envVar.Name == "CATALINA_OPTS" {
   otelJVM := replaceSWAgent2OTel(envVar.Value, appName)
   deployment.Spec.Template.Spec.Containers[i].Env[e].Value = otelJVM
  }
 }
 // Update deployment image
 deployment.Spec.Template.Spec.Containers[i].Image = newImageName

 newDeployment, err := clientSet.AppsV1().Deployments(deployment.Namespace).Update(ctx, &deployment, metav1.UpdateOptions{})
 if err != nil {
  return err
 }
 klog.Infof("Finish upgrade deployment:%s container:%s", deploymentName, container.Name)
}

這里一共分為以下幾部:

  • 基于老鏡像構建新鏡像。
  • 更新原有的 CATALINA_OPTS 環(huán)境變量,也就是替換 skywalking 的參數(shù)。
  • 更新 deployment 鏡像,觸發(fā)滾動更新。

構建新鏡像

dockerfile = fmt.Sprintf(`FROM %s
COPY %s /home/admin/%s
COPY otel.tar.gz /home/admin/otel.tar.gz
RUN tar -zxvf /home/admin/otel.tar.gz -C /home/admin
RUN rm -rf /home/admin/skywalking-agent
ENTRYPOINT ["/bin/sh", "/home/admin/start.sh"]
`, fromImage, script, script)

 idx := strings.LastIndex(newImageName, "/") + 1
 dockerFileName := newImageName[idx:]
 create, err := os.Create(fmt.Sprintf("Dockerfile-%s", dockerFileName))
 if err != nil {
  return err
 }
 defer func() {
  create.Close()
  os.Remove(create.Name())
 }()
 _, err = create.WriteString(dockerfile)
 if err != nil {
  return err
 }

 cmd := exec.Command("docker", "build", ".", "-f", create.Name(), "-t", newImageName)
 cmd.Stdin = strings.NewReader(dockerfile)
 if err := cmd.Run(); err != nil {
  return err
 }

其實這里的重點就是構建這個新鏡像,從這個 dockerfile 中也能看出具體的邏輯,也就是上文提到的刪除原有的 skywalking 資源同時將新的 OpenTelemetry 資源打包進去。

最后再將這個鏡像上傳到私服。

其中的替換 JVM 參數(shù)也比較簡單,直接刪除 skywalking 的內(nèi)容,然后再追加上 OpenTelemetry 需要的參數(shù)即可。

定時檢測替換是否成功

func checkNewDeploymentStatus(ctx context.Context, clientSet kubernetes.Interface, newDeployment *v1.Deployment) error {
 ready := true
 tick := time.Tick(10 * time.Second)
 for i := 0; i < 30; i++ {
  <-tick
  originPodList, err := clientSet.CoreV1().Pods(newDeployment.Namespace).List(ctx, metav1.ListOptions{
   LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
    MatchLabels: newDeployment.Spec.Selector.MatchLabels,
   }),
  })
  if err != nil {
   return err
  }

  // Check if there are any Pods
  if len(originPodList.Items) == 0 {
   klog.Infof("No Pod in deployment:%s, Skip", newDeployment.Name)
  }
  for _, item := range originPodList.Items {
   // Check Pod running
   for _, status := range item.Status.ContainerStatuses {
    if status.RestartCount > 0 {
     ready = false
     break
    }
   }
  }
  klog.Infof("Check deployment:%s namespace:%s status:%t", newDeployment.Name, newDeployment.Namespace, ready)
  if ready == false {
   break
  }
 }

 if ready == false {
  // rollback
  klog.Infof("=======Rollback deployment:%s namespace:%s", newDeployment.Name, newDeployment.Namespace)
  writeDeploymentName2File(newDeployment.Name, fmt.Sprintf("rollback-%s.log", newDeployment.Namespace))
 }

 return nil
}

這里會啟動一個 10s 執(zhí)行一次的定時任務,每次都會檢測是否有容器發(fā)生了重啟(正常情況下是不會出現(xiàn)重啟的)

如果檢測了 30 次都沒有重啟的容器,那就說明本次替換成功了,不然就記錄一個日志文件,然后人工處理。

這種通常是原有的鏡像與 OpenTelemetry 不兼容,比如里面寫死了一些 skywalking 的 API,導致啟動失敗。

所以替換任務跑完之后我還會檢測這個 rollback-$namespace 的日志文件,人工處理這些失敗的應用。

分批處理 deployment

最后講講如何單個調(diào)用剛才的 ProcessDeployment() 函數(shù)。

考慮到不能對 kubernetes 產(chǎn)生影響,所以我們需要限制并發(fā)處理 deployment 的數(shù)量(我這里的限制是 10 個)。

所以就得分批進行替換,每次替換 10 個,而且其中有一個執(zhí)行失敗就得暫停后續(xù)任務,由人工檢測失敗原因再決定是否繼續(xù)處理。

畢竟處理的是線上應用,需要小心謹慎。

所以觸發(fā)的代碼如下:

func ProcessDeploymentList(ctx context.Context, data []v1.Deployment, clientSet kubernetes.Interface) error {
 file, err := os.ReadFile(fmt.Sprintf("finish-%s.log", data[0].Namespace))
 if err != nil {
  return err
 }
 split := strings.Split(string(file), "\n")

 batchSize := 10
 start := 0

 for start < len(data) {

  end := start + batchSize
  if end > len(data) {
   end = len(data)
  }

  batch := data[start:end]

  //等待goroutine結束
  var wg sync.WaitGroup
  klog.Infof("Start process batch size %d", len(batch))

  errs := make(chan error, len(batch))

  wg.Add(len(batch))
  for _, item := range batch {
   d := item
   go func() {
    defer wg.Done()
    if err := ProcessDeployment(ctx, split, d, clientSet); err != nil {
     klog.Errorf("!!!Process deployment name:%s error: %v", d.Name, err)
     errs <- err
     return
    }
   }()
  }

  go func() {
   wg.Wait()
   close(errs)
  }()

  //任何一個失敗就返回
  for err := range errs {
   if err != nil {
    return err
   }
  }

  start = end
  klog.Infof("Deal next batch")
 }

 return nil

}

使用 WaitGroup 來控制一組任務,使用一個 chan 來傳遞異常;這類分批處理的代碼在一些批處理框架中還蠻常見的。

總結

最后只需要查詢某個 namespace 下的所有 deployment 列表傳入這個批處理函數(shù)即可。

不過整個過程中還是有幾個點需要注意:

  • 因為需要替換鏡像的前提是要把現(xiàn)有的鏡像拉取到本地,所以跑這個任務的客戶端需要有充足的磁盤,同時和鏡像服務器的網(wǎng)絡條件較好。
  • 不然執(zhí)行的過程會比較慢,同時磁盤占用滿了也會影響任務。

其實這個功能依然有提升空間,考慮到后續(xù)會升級 OpenTelemetry  agent 的版本,甚至也需要增減一些 JVM 參數(shù)。

所以最后有一個統(tǒng)一的工具,可以直接升級 Agent,而不是每次我都需要修改這里的代碼。

后來在網(wǎng)上看到了得物的相關分享,他們可以遠程加載配置來解決這個問題。

這也是一種解決方案,直到我們看到了 OpenTelemetry 社區(qū)提供了 Operator,其中也包含了注入 agent 的功能。

apiVersion: opentelemetry.io/v1alpha1  
kind: Instrumentation  
metadata:  
  name: my-instrumentation  
spec:  
  exporter:  
    endpoint: http://otel-collector:4317  
  propagators:  
    - tracecontext  
    - baggage  
    - b3  
  sampler:  
    type: parentbased_traceidratio  
    argument: "0.25"  
  java:  
    image: private/autoinstrumentation-java:1.32.0-1

我們可以使用他提供的 CRD 來配置我們 agent,只要維護好自己的鏡像就好了。

使用起來也很簡單,只要安裝好了 OpenTelemetry-operator ,然后再需要注入 Java Agent 的 Pod 中使用注解:

instrumentation.opentelemetry.io/inject-java: "true"

operator 就會自動從剛才我們配置的鏡像中讀取 agent,然后復制到我們的業(yè)務容器。

再配置上環(huán)境變量 $JAVA_TOOL_OPTIONS=/otel/javaagent.java, 這是一個 Java 內(nèi)置的環(huán)境變量,應用啟動的時候會自動識別,這樣就可以自動注入 agent 了。

envJavaToolsOptions   = "JAVA_TOOL_OPTIONS"

// set env value
idx := getIndexOfEnv(container.Env, envJavaToolsOptions)  
if idx == -1 {  
    container.Env = append(container.Env, corev1.EnvVar{  
       Name:  envJavaToolsOptions,  
       Value: javaJVMArgument,  
    })} else {  
    container.Env[idx].Value = container.Env[idx].Value + javaJVMArgument  
}

// copy javaagent.jar
pod.Spec.InitContainers = append(pod.Spec.InitContainers, corev1.Container{  
    Name:      javaInitContainerName,  
    Image:     javaSpec.Image,  
    Command:   []string{"cp", "/javaagent.jar", javaInstrMountPath + "/javaagent.jar"},  
    Resources: javaSpec.Resources,  
    VolumeMounts: []corev1.VolumeMount{{  
       Name:      javaVolumeName,  
       MountPath: javaInstrMountPath,  
    }},})

大致的運行原理是當有 Pod 的事件發(fā)生了變化(重啟、重新部署等),operator 就會檢測到變化,此時會判斷是否開啟了剛才的注解:

instrumentation.opentelemetry.io/inject-java: "true"

接著會寫入環(huán)境變量 JAVA_TOOL_OPTIONS,同時將 jar 包從 InitContainers 中復制到業(yè)務容器中。

這里使用到了 kubernetes 的初始化容器,該容器是用于做一些準備工作的,比如依賴安裝、配置檢測或者是等待其他一些組件啟動成功后再啟動業(yè)務容器。

目前這個 operator 還處于使用階段,同時部分功能還不滿足(比如支持自定義擴展),今后有時間也可以分析下它的運行原理。

參考鏈接:

  • https://xie.infoq.cn/article/e6def1e245e9d67735bd00dd5。
  • https://github.com/open-telemetry/opentelemetry-operator/#opentelemetry-auto-instrumentation-injection。
責任編輯:姜華 來源: crossoverJie
相關推薦

2020-02-11 15:50:51

WindowsLinux命令行

2020-04-17 14:37:19

WindowsLinux微軟

2022-09-29 09:58:30

Colima開源

2021-08-06 15:15:09

Windows 11Dev頻道Beta頻道

2024-08-28 08:09:13

contextmetrics類型

2023-02-08 13:01:20

Debian測試版

2022-07-27 07:24:32

Debian系統(tǒng)

2023-04-20 16:48:22

PandasPolarsPython

2024-04-16 08:09:36

JavapulsarAPI

2020-06-28 16:07:03

HomebrewMacLinux

2024-08-21 08:09:17

2024-09-04 08:09:51

2019-12-02 10:50:30

Python 2Python 3編程語言

2010-11-26 15:56:23

mysql環(huán)境變量

2009-12-03 10:05:26

Ubuntu超級用戶

2009-06-23 18:19:54

NetBeans英文界面

2010-05-24 09:41:31

2020-07-03 07:54:13

MacLinux操作系統(tǒng)

2021-11-22 09:56:13

FedoraLinux

2013-06-18 09:53:50

Git開源
點贊
收藏

51CTO技術棧公眾號