自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一口氣搞懂Flink Metrics監(jiān)控指標和性能優(yōu)化,全靠這33張圖和7千字

安全 應用安全
最近在公司做 Flink 推理任務的性能測試,要對 job 的全鏈路吞吐、全鏈路時延、吞吐時延指標進行監(jiān)控和調優(yōu),其中要使用 Flink Metrics 對指標進行監(jiān)控。

[[426743]]

本文轉載自微信公眾號「3分鐘秒懂大數據」,作者在IT中穿梭旅行。轉載本文請聯系3分鐘秒懂大數據公眾號。

前言

大家好,我是土哥。

最近在公司做 Flink 推理任務的性能測試,要對 job 的全鏈路吞吐、全鏈路時延、吞吐時延指標進行監(jiān)控和調優(yōu),其中要使用 Flink Metrics 對指標進行監(jiān)控。

接下來這篇文章,干貨滿滿,我將帶領讀者全面了解 Flink Metrics 指標監(jiān)控,并通過實戰(zhàn)案例,對全鏈路吞吐、全鏈路時延、吞吐時延的指標進行性能優(yōu)化,徹底掌握 Flink Metrics 性能調優(yōu)的方法和 Metrics 的使用。大綱目錄如下:

1 Flink Metrics 簡介

Flink Metrics 是 Flink 集群運行中的各項指標,包含機器系統指標,比如:CPU、內存、線程、JVM、網絡、IO、GC 以及任務運行組件(JM、TM、Slot、作業(yè)、算子)等相關指標。

Flink Metrics 包含兩大作用:

  • 實時采集監(jiān)控數據。在 Flink 的 UI 界面上,用戶可以看到自己提交的任務狀態(tài)、時延、監(jiān)控信息等等。
  • 對外提供數據收集接口。用戶可以將整個 Flink 集群的監(jiān)控數據主動上報至第三方監(jiān)控系統,如:prometheus、grafana 等,下面會介紹。

1.1 Flink Metric Types

Flink 一共提供了四種監(jiān)控指標:分別為 Counter、Gauge、Histogram、Meter。

1. Count 計數器

統計一個 指標的總量。寫過 MapReduce 的開發(fā)人員就應該很熟悉 Counter,其實含義都是一樣的,就是對一個計數器進行累加,即對于多條數據和多兆數據一直往上加的過程。其中 Flink 算子的接收記錄總數 (numRecordsIn) 和發(fā)送記錄總數 (numRecordsOut) 屬于 Counter 類型。

使用方式:可以通過調用 counter(String name)來創(chuàng)建和注冊 MetricGroup

2. Gauge 指標瞬時值

Gauge 是最簡單的 Metrics ,它反映一個指標的瞬時值。比如要看現在 TaskManager 的 JVM heap 內存用了多少,就可以每次實時的暴露一個 Gauge,Gauge 當前的值就是 heap 使用的量。

使用前首先創(chuàng)建一個實現 org.apache.flink.metrics.Gauge 接口的類。返回值的類型沒有限制。您可以通過在 MetricGroup 上調用 gauge。

3. Meter 平均值

用來記錄一個指標在某個時間段內的平均值。Flink 中的指標有 Task 算子中的 numRecordsInPerSecond,記錄此 Task 或者算子每秒接收的記錄數。

使用方式:通過 markEvent() 方法注冊事件的發(fā)生。通過markEvent(long n) 方法注冊同時發(fā)生的多個事件。

4. Histogram 直方圖

Histogram 用于統計一些數據的分布,比如說 Quantile、Mean、StdDev、Max、Min 等,其中最重要一個是統計算子的延遲。此項指標會記錄數據處理的延遲信息,對任務監(jiān)控起到很重要的作用。

使用方式:通過調用 histogram(String name, Histogram histogram) 來注冊一個 MetricGroup。

1.2 Scope

Flink 的指標體系按樹形結構劃分,域相當于樹上的頂點分支,表示指標大的分類。每個指標都會分配一個標識符,該標識符將基于 3 個組件進行匯報:

  • 注冊指標時用戶提供的名稱;
  • 可選的用戶自定義域;
  • 系統提供的域。

例如,如果 A.B 是系統域,C.D 是用戶域,E 是名稱,那么指標的標識符將是 A.B.C.D.E. 你可以通過設置 conf/flink-conf.yam 里面的 metrics.scope.delimiter 參數來配置標識符的分隔符(默認“.”)。

舉例說明:以算子的指標組結構為例,其默認為:

.taskmanager....

算子的輸入記錄數指標為:

hlinkui.taskmanager.1234.wordcount.flatmap.0.numRecordsIn

1.3 Metrics 運行機制

在生產環(huán)境下,為保證對Flink集群和作業(yè)的運行狀態(tài)進行監(jiān)控,Flink 提供兩種集成方式:

1.3.1 主動方式 MetricReport

Flink Metrics 通過在 conf/flink-conf.yaml 中配置一個或者一些 reporters,將指標暴露給一個外部系統.這些 reporters 將在每個 job 和 task manager 啟動時被實例化。

1.3.2 被動方式 RestAPI

通過提供 Rest 接口,被動接收外部系統調用,可以返回集群、組件、作業(yè)、Task、算子的狀態(tài)。Rest API 實現類是 WebMonitorEndpoint

2 Flink Metrics 監(jiān)控系統搭建

Flink 主動方式共提供了 8 種 Report。

我們使用 PrometheusPushGatewayReporter 方式 通過 prometheus + pushgateway + grafana 組件搭建 Flink On Yarn 可視化監(jiān)控。

當 用戶 使用 Flink 通過 session 模式向 yarn 集群提交一個 job 后,Flink 會通過 PrometheusPushGatewayReporter 將 metrics push 到 pushgateway 的 9091 端口上,然后使用外部系統 prometheus 從 pushgateway 進行 pull 操作,將指標采集過來,通過 Grafana可視化工具展示出來。原理圖如下:

首先,我們先在 Flink On Yarn 集群中提交一個 Job 任務,讓其運行起來,然后執(zhí)行下面的操作。

2.1 配置 Reporter

下面所有工具、jar 包已經全部下載好,需要的朋友在公眾號后臺回復:02,可以全部獲取到。

2.1.1 導包

將 flink-metrics-prometheus_2.11-1.13.2.jar 包導入 flink-1.13.2/bin 目錄下。

2.1.2 配置 Reporter

選取 PrometheusPushGatewayReporter 方式,通過在官網查詢 Flink 1.13.2 Metrics 的配置后,在 flink-conf.yaml 設置,配置如下:

  • metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
  • metrics.reporter.promgateway.host: 192.168.244.129
  • metrics.reporter.promgateway.port: 9091
  • metrics.reporter.promgateway.jobName: myJob
  • metrics.reporter.promgateway.randomJobNameSuffix: true
  • metrics.reporter.promgateway.deleteOnShutdown: false
  • metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
  • metrics.reporter.promgateway.interval: 60 SECONDS

2.2 部署 pushgateway

Pushgateway 是一個獨立的服務,Pushgateway 位于應用程序發(fā)送指標和 Prometheus 服務器之間。

Pushgateway 接收指標,然后將其作為目標被 Prometheus 服務器拉取??梢詫⑵淇醋鞔矸?,或者與 blackbox exporter 的行為相反,它接收度量,而不是探測它們。

2.2.1 解壓 pushgateway

2.2.2. 啟動 pushgateway

進入到 pushgateway-1.4.1 目錄下

  1. ./pushgateway & 

查看是否在后臺啟動成功

  1. ps aux|grep pushgateway 

2.2.3. 登錄 pushgateway webui

2.3 部署 prometheus

Prometheus(普羅米修斯)是一個最初在 SoundCloud 上構建的監(jiān)控系統。自 2012 年成為社區(qū)開源項目,擁有非?;钴S的開發(fā)人員和用戶社區(qū)。為強調開源及獨立維護,Prometheus 于 2016 年加入云原生云計算基金會(CNCF),成為繼Kubernetes 之后的第二個托管項目。

2.3.1 解壓prometheus-2.30.0

2.3.2 編寫配置文件

  1. scrape_configs: 
  2.   - job_name: 'prometheus' 
  3.     static_configs: 
  4.       - targets: ['192.168.244.129:9090'
  5.         labels: 
  6.           instance: 'prometheus' 
  7.   - job_name: 'linux' 
  8.     static_configs: 
  9.       - targets: ['192.168.244.129:9100'
  10.         labels: 
  11.           instance: 'localhost' 
  12.   - job_name: 'pushgateway' 
  13.     static_configs: 
  14.       - targets: ['192.168.244.129:9091'
  15.         labels: 
  16.           instance: 'pushgateway' 

2.3.3 啟動prometheus

  1. ./prometheus --config.file=prometheus.yml & 

啟動完后,可以通過 ps 查看一下端口:

  1. ps aux|grep prometheus 

2.3.4 登錄prometheus webui

2.4 部署 grafana

Grafana 是一個跨平臺的開源的度量分析和可視化工具,可以通過將采集的數據查詢然后可視化的展示,并及時通知。它主要有以下六大特點:

  • 展示方式:快速靈活的客戶端圖表,面板插件有許多不同方式的可視化指標和日志,官方庫中具有豐富的儀表盤插件,比如熱圖、折線圖、圖表等多種展示方式;
  • 數據源:Graphite,InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch 和 KairosDB 等;
  • 通知提醒:以可視方式定義最重要指標的警報規(guī)則,Grafana將不斷計算并發(fā)送通知,在數據達到閾值時通過 Slack、PagerDuty 等獲得通知;
  • 混合展示:在同一圖表中混合使用不同的數據源,可以基于每個查詢指定數據源,甚至自定義數據源;
  • 注釋:使用來自不同數據源的豐富事件注釋圖表,將鼠標懸停在事件上會顯示完整的事件元數據和標記;
  • 過濾器:Ad-hoc 過濾器允許動態(tài)創(chuàng)建新的鍵/值過濾器,這些過濾器會自動應用于使用該數據源的所有查詢。

2.4.1 解壓grafana-8.1.5

2.4.2 啟動grafana-8.1.5

  1. ./bin/grafana-server web & 

2.4.3 登錄 grafana

登錄用戶名和密碼都是 admin

grafana 配置中文教程:

https://grafana.com/docs/grafana/latest/datasources/prometheus/

2.4.4 配置數據源、創(chuàng)建系統負載監(jiān)控

要訪問 Prometheus 設置,請將鼠標懸停在配置(齒輪)圖標上,然后單擊數據源,然后單擊 Prometheus 數據源,根據下圖進行操作。

操作完成后,點擊進行驗證。

2.4.5 添加儀表盤

點擊最左側 + 號,選擇 DashBoard,選擇新建一個 pannel。

至此,Flink 的 metrics 的指標展示在 Grafana 中了。

flink 指標對應的指標名比較長,可以在 Legend 中配置顯示內容,在{{key}} 將 key 換成對應需要展示的字段即可,如:{{job_name}},{{operator_name}}。

3 指標性能測試

上述監(jiān)控系統搭建好了之后,我們可以進行性能指標監(jiān)控了?,F在以一個實戰(zhàn)案例進行介紹:

3.1 業(yè)務場景介紹

金融風控場景

3.1.1 業(yè)務需求:

Flink Source 從 data kafka topic 中讀取推理數據,通過 sql 預處理成模型推理要求的數據格式,在進行 keyBy 分組后流入下游 connect 算子,與模型 connect 后進入 Co-FlatMap 算子再進行推理,原理圖如下:

3.1.2 業(yè)務要求:

根據模型的復雜程度,要求推理時延到達 20ms 以內,全鏈路耗時 50ms 以內, 吞吐量達到每秒 1.2w 條以上。

3.1.3 業(yè)務數據:

推理數據:3000w,推理字段 495 個,機器學習 Xgboost 模型字段:495。

3.2 指標解析

由于性能測試要求全鏈路耗時 50ms 以內,應該使用 Flink Metrics 的 Latency Marker 進行計算。

3.2.1 全鏈路時延計算方式 :

全鏈路時延指的是一條推理數據進入 source 算子到數據預處理算子直到最后一個算子輸出結果的耗時,即處理一條數據需要多長時間,包含算子內處理邏輯時間,算子間數據傳遞時間,緩沖區(qū)內等待時間。

全鏈路時延要使用 latency metric 計算。latency metric 是由 source 算子根據當前本地時間生成的一個 marker ,并不參與各個算子的邏輯計算,僅僅跟著數據往下游算子流動,每到達一個算子則算出當前本地時間戳并與 source 生成的時間戳相減,得到 source 算子到當前算子的耗時,當到達 sink 算子或者說最后一個算子時,算出當前本地時間戳與 source 算子生成的時間戳相減,即得到全鏈路時延。原理圖如下:

由于使用到 Lateny marker,所有需要在 flink-conf.yaml 配置參數。

  1. latency.metrics.interval 

系統配置截圖如下:

3.2.2 全鏈路吞吐計算方式 :

全鏈路吞吐 = 單位時間處理數據數量 / 單位時間。

3.3 提交任務到Flink on Yarn集群

**3.3.1 直接提交 Job **

  1. # -m jobmanager 的地址 
  2. # -yjm 1024 指定 jobmanager 的內存信息 
  3. # -ytm 1024 指定 taskmanager 的內存信息 
  4. bin/flink run \ 
  5. -t yarn-per-job -yjm 4096 -ytm  8800 -s 96  \ 
  6. --detached  -c com.threeknowbigdata.datastream.XgboostModelPrediction \ 
  7. examples/batch/WordCount.jar  \ 

提交完成后,我們通過 Flink WEBUI 可以看到 job 運行的任務結果如下:

因為推理模型只是一個 model,存在狀態(tài)中,所以全鏈路吞吐考慮的是每秒有多少條推理數據進入 source 算子到倒數第二個算子(最后一個算子只是指標匯總)流出,這個條數就是全鏈路吞吐。

可以看到在處理 2000W 條數據時,代碼直接統計輸出的數值和 flink webUI 的統計數值基本一致,所以統計數值是可信的。

Flink WEBUI 跑的結果數據

打開 Prometheus 在對話框輸入全鏈路時延計算公式

  1. 計算公式: 
  2. avg(flink_taskmanager_job_latency_source_id_ 
  3. operator_id _operator_subtask_index_latency{ 
  4. source_id="cbc357ccb763df2852fee8c4fc7d55f2"
  5. operator_id="c9c0ca46716e76f6b700eddf4366d243",quantile="0.999"}) 

3.4 優(yōu)化前性能分析

在將任務提交到集群后,經過全鏈路時延計算公式、吞吐時延計算公式,最后得到優(yōu)化前的結果時延指標統計圖如下:

吞吐指標統計圖如下:

通過本次測試完后,從圖中可以發(fā)現:

時延指標:加并行度,吞吐量也跟隨高,但是全鏈路時延大幅增長( 1并行至32并行,時延從 110ms 增加至 3287ms )

這遠遠沒有達到要求的結果。

3.5 問題分析

通過 Prometheus分析后,結果如下:

3.5.1 并行度問題 :

反壓現象:在 Flink WEB-UI 上,可以看到應用存在著非常嚴重的反壓,這說明鏈路中存在較為耗時的算子,阻塞了整個鏈路;

數據處理慢于拉取數據:數據源消費數據的速度,大于下游數據處理速度;

增加計算并行度:所以在接下來的測試中會調大推理算子并行度,相當于提高下游數據處理能力。

3.5.2 Buffer 超時問題 :

Flink 雖是純流式框架,但默認開啟了緩存機制(上游累積部分數據再發(fā)送到下游);

緩存機制可以提高應用的吞吐量,但是也增大了時延;

推理場景:為獲取最好的時延指標,第二輪測試超時時間置 0,記錄吞吐量。

3.5.3 Buffer 數量問題 :

同上,Flink 中的 Buffer 數量是可以配置的;

Buffer 數量越多,能緩存的數據也就越多;

推理場景:為獲取最好的時延指標,第二輪測試:減小 Flink 的 Buffer 數量來優(yōu)化時延指標。

3.5.4 調優(yōu)參數配置

SOURCE 與 COFLATMAP 的并行度按照 1:12 配置;

Buffer 超時時間配置為 0ms (默認100ms);

  1. //在代碼中設置 
  2.  
  3. senv.setBufferTimeout(0); 

Buffer 數量的配置如下:

修改flink-conf.yaml

  1. memory.buffers-per-channel: 2 
  2. memory.float-buffers-per-gate: 2 
  3. memory.max-buffers-per-channel: 2 

配置截圖如下:

3.6 優(yōu)化后性能分析

經過修改配置后,將任務再次提交到集群后,經過全鏈路時延計算公式、吞吐時延計算公式,最后得到優(yōu)化后的結果。

時延指標統計圖如下:

吞吐指標統計圖如下:

優(yōu)化后 LGB 推理測試總結 :

時延指標:并行度提升,時延也會增加,但幅度很小(可接受)。實際上,在測試過程中存在一定反壓,若調大 SOURCE 與 COFLATMAP 的并行度比例,全鏈路時延可進一步降低;吞吐量指標:隨著并行度的增加,吞吐量也隨著提高,當并行度提高至 96 時,吞吐量可以達到 1.3W,此時的時延維持在 50ms 左右(比較穩(wěn)定)。

3.7 優(yōu)化前后 LGB 分析總結

如下圖所示:

3.7.1吞吐量---影響因素:

內存:對吞吐和時延沒什么影響, 并行度與吞吐成正相關。

  • 增大 kafka 分區(qū),吞吐增加
  • 增大 source、維表 source 并行度
  • 增大 flatmap 推理并行度

3.7.2全鏈路時延---影響因素:

  • Buffer 超時越短、個數越少、時延越低。
  • 整個鏈路是否有算子堵塞(車道排隊模型)。
  • 調大推理算子并行度,時延降低,吞吐升高(即增加了推理的處理能力)。

 

責任編輯:武曉燕 來源: 3分鐘秒懂大數據
相關推薦

2020-10-22 12:30:33

MySQL

2021-05-18 09:03:16

Gomapslice

2020-03-31 08:12:25

Kafka架構數據庫

2021-06-08 22:43:07

IPC方式Qt

2021-03-29 12:22:25

微信iOS蘋果

2024-03-26 09:42:27

分片算法應用

2021-12-06 08:30:49

SpringSpring Bean面試題

2019-12-23 14:17:18

紅黑樹二叉數據

2020-04-14 13:32:56

@Transacti失效場景

2023-12-18 23:09:25

開源優(yōu)化引擎

2020-09-24 09:08:04

分布式系統架構

2022-05-24 11:50:46

延時消息分布式

2020-07-08 07:45:44

OAuth2.0授權

2024-01-29 00:29:49

通信技術行業(yè)

2024-04-26 09:40:10

項目精度丟失javascrip

2021-03-01 18:52:39

工具在線瀏覽器

2021-01-04 11:23:21

手機無線電通訊

2020-08-12 09:55:07

附近的人數據庫MySQL

2020-04-16 12:42:42

附近的人共享單車App

2020-10-21 06:39:21

CPU寄存器架構
點贊
收藏

51CTO技術棧公眾號