一口氣搞懂Flink Metrics監(jiān)控指標和性能優(yōu)化,全靠這33張圖和7千字
本文轉載自微信公眾號「3分鐘秒懂大數據」,作者在IT中穿梭旅行。轉載本文請聯系3分鐘秒懂大數據公眾號。
前言
大家好,我是土哥。
最近在公司做 Flink 推理任務的性能測試,要對 job 的全鏈路吞吐、全鏈路時延、吞吐時延指標進行監(jiān)控和調優(yōu),其中要使用 Flink Metrics 對指標進行監(jiān)控。
接下來這篇文章,干貨滿滿,我將帶領讀者全面了解 Flink Metrics 指標監(jiān)控,并通過實戰(zhàn)案例,對全鏈路吞吐、全鏈路時延、吞吐時延的指標進行性能優(yōu)化,徹底掌握 Flink Metrics 性能調優(yōu)的方法和 Metrics 的使用。大綱目錄如下:
1 Flink Metrics 簡介
Flink Metrics 是 Flink 集群運行中的各項指標,包含機器系統指標,比如:CPU、內存、線程、JVM、網絡、IO、GC 以及任務運行組件(JM、TM、Slot、作業(yè)、算子)等相關指標。
Flink Metrics 包含兩大作用:
- 實時采集監(jiān)控數據。在 Flink 的 UI 界面上,用戶可以看到自己提交的任務狀態(tài)、時延、監(jiān)控信息等等。
- 對外提供數據收集接口。用戶可以將整個 Flink 集群的監(jiān)控數據主動上報至第三方監(jiān)控系統,如:prometheus、grafana 等,下面會介紹。
1.1 Flink Metric Types
Flink 一共提供了四種監(jiān)控指標:分別為 Counter、Gauge、Histogram、Meter。
1. Count 計數器
統計一個 指標的總量。寫過 MapReduce 的開發(fā)人員就應該很熟悉 Counter,其實含義都是一樣的,就是對一個計數器進行累加,即對于多條數據和多兆數據一直往上加的過程。其中 Flink 算子的接收記錄總數 (numRecordsIn) 和發(fā)送記錄總數 (numRecordsOut) 屬于 Counter 類型。
使用方式:可以通過調用 counter(String name)來創(chuàng)建和注冊 MetricGroup
2. Gauge 指標瞬時值
Gauge 是最簡單的 Metrics ,它反映一個指標的瞬時值。比如要看現在 TaskManager 的 JVM heap 內存用了多少,就可以每次實時的暴露一個 Gauge,Gauge 當前的值就是 heap 使用的量。
使用前首先創(chuàng)建一個實現 org.apache.flink.metrics.Gauge 接口的類。返回值的類型沒有限制。您可以通過在 MetricGroup 上調用 gauge。
3. Meter 平均值
用來記錄一個指標在某個時間段內的平均值。Flink 中的指標有 Task 算子中的 numRecordsInPerSecond,記錄此 Task 或者算子每秒接收的記錄數。
使用方式:通過 markEvent() 方法注冊事件的發(fā)生。通過markEvent(long n) 方法注冊同時發(fā)生的多個事件。
4. Histogram 直方圖
Histogram 用于統計一些數據的分布,比如說 Quantile、Mean、StdDev、Max、Min 等,其中最重要一個是統計算子的延遲。此項指標會記錄數據處理的延遲信息,對任務監(jiān)控起到很重要的作用。
使用方式:通過調用 histogram(String name, Histogram histogram) 來注冊一個 MetricGroup。
1.2 Scope
Flink 的指標體系按樹形結構劃分,域相當于樹上的頂點分支,表示指標大的分類。每個指標都會分配一個標識符,該標識符將基于 3 個組件進行匯報:
- 注冊指標時用戶提供的名稱;
- 可選的用戶自定義域;
- 系統提供的域。
例如,如果 A.B 是系統域,C.D 是用戶域,E 是名稱,那么指標的標識符將是 A.B.C.D.E. 你可以通過設置 conf/flink-conf.yam 里面的 metrics.scope.delimiter 參數來配置標識符的分隔符(默認“.”)。
舉例說明:以算子的指標組結構為例,其默認為:
算子的輸入記錄數指標為:
hlinkui.taskmanager.1234.wordcount.flatmap.0.numRecordsIn
1.3 Metrics 運行機制
在生產環(huán)境下,為保證對Flink集群和作業(yè)的運行狀態(tài)進行監(jiān)控,Flink 提供兩種集成方式:
1.3.1 主動方式 MetricReport
Flink Metrics 通過在 conf/flink-conf.yaml 中配置一個或者一些 reporters,將指標暴露給一個外部系統.這些 reporters 將在每個 job 和 task manager 啟動時被實例化。
1.3.2 被動方式 RestAPI
通過提供 Rest 接口,被動接收外部系統調用,可以返回集群、組件、作業(yè)、Task、算子的狀態(tài)。Rest API 實現類是 WebMonitorEndpoint
2 Flink Metrics 監(jiān)控系統搭建
Flink 主動方式共提供了 8 種 Report。
我們使用 PrometheusPushGatewayReporter 方式 通過 prometheus + pushgateway + grafana 組件搭建 Flink On Yarn 可視化監(jiān)控。
當 用戶 使用 Flink 通過 session 模式向 yarn 集群提交一個 job 后,Flink 會通過 PrometheusPushGatewayReporter 將 metrics push 到 pushgateway 的 9091 端口上,然后使用外部系統 prometheus 從 pushgateway 進行 pull 操作,將指標采集過來,通過 Grafana可視化工具展示出來。原理圖如下:
首先,我們先在 Flink On Yarn 集群中提交一個 Job 任務,讓其運行起來,然后執(zhí)行下面的操作。
2.1 配置 Reporter
下面所有工具、jar 包已經全部下載好,需要的朋友在公眾號后臺回復:02,可以全部獲取到。
2.1.1 導包
將 flink-metrics-prometheus_2.11-1.13.2.jar 包導入 flink-1.13.2/bin 目錄下。
2.1.2 配置 Reporter
選取 PrometheusPushGatewayReporter 方式,通過在官網查詢 Flink 1.13.2 Metrics 的配置后,在 flink-conf.yaml 設置,配置如下:
- metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
- metrics.reporter.promgateway.host: 192.168.244.129
- metrics.reporter.promgateway.port: 9091
- metrics.reporter.promgateway.jobName: myJob
- metrics.reporter.promgateway.randomJobNameSuffix: true
- metrics.reporter.promgateway.deleteOnShutdown: false
- metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
- metrics.reporter.promgateway.interval: 60 SECONDS
2.2 部署 pushgateway
Pushgateway 是一個獨立的服務,Pushgateway 位于應用程序發(fā)送指標和 Prometheus 服務器之間。
Pushgateway 接收指標,然后將其作為目標被 Prometheus 服務器拉取??梢詫⑵淇醋鞔矸?,或者與 blackbox exporter 的行為相反,它接收度量,而不是探測它們。
2.2.1 解壓 pushgateway
2.2.2. 啟動 pushgateway
進入到 pushgateway-1.4.1 目錄下
- ./pushgateway &
查看是否在后臺啟動成功
- ps aux|grep pushgateway
2.2.3. 登錄 pushgateway webui
2.3 部署 prometheus
Prometheus(普羅米修斯)是一個最初在 SoundCloud 上構建的監(jiān)控系統。自 2012 年成為社區(qū)開源項目,擁有非?;钴S的開發(fā)人員和用戶社區(qū)。為強調開源及獨立維護,Prometheus 于 2016 年加入云原生云計算基金會(CNCF),成為繼Kubernetes 之后的第二個托管項目。
2.3.1 解壓prometheus-2.30.0
2.3.2 編寫配置文件
- scrape_configs:
- - job_name: 'prometheus'
- static_configs:
- - targets: ['192.168.244.129:9090']
- labels:
- instance: 'prometheus'
- - job_name: 'linux'
- static_configs:
- - targets: ['192.168.244.129:9100']
- labels:
- instance: 'localhost'
- - job_name: 'pushgateway'
- static_configs:
- - targets: ['192.168.244.129:9091']
- labels:
- instance: 'pushgateway'
2.3.3 啟動prometheus
- ./prometheus --config.file=prometheus.yml &
啟動完后,可以通過 ps 查看一下端口:
- ps aux|grep prometheus
2.3.4 登錄prometheus webui
2.4 部署 grafana
Grafana 是一個跨平臺的開源的度量分析和可視化工具,可以通過將采集的數據查詢然后可視化的展示,并及時通知。它主要有以下六大特點:
- 展示方式:快速靈活的客戶端圖表,面板插件有許多不同方式的可視化指標和日志,官方庫中具有豐富的儀表盤插件,比如熱圖、折線圖、圖表等多種展示方式;
- 數據源:Graphite,InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch 和 KairosDB 等;
- 通知提醒:以可視方式定義最重要指標的警報規(guī)則,Grafana將不斷計算并發(fā)送通知,在數據達到閾值時通過 Slack、PagerDuty 等獲得通知;
- 混合展示:在同一圖表中混合使用不同的數據源,可以基于每個查詢指定數據源,甚至自定義數據源;
- 注釋:使用來自不同數據源的豐富事件注釋圖表,將鼠標懸停在事件上會顯示完整的事件元數據和標記;
- 過濾器:Ad-hoc 過濾器允許動態(tài)創(chuàng)建新的鍵/值過濾器,這些過濾器會自動應用于使用該數據源的所有查詢。
2.4.1 解壓grafana-8.1.5
2.4.2 啟動grafana-8.1.5
- ./bin/grafana-server web &
2.4.3 登錄 grafana
登錄用戶名和密碼都是 admin
grafana 配置中文教程:
https://grafana.com/docs/grafana/latest/datasources/prometheus/
2.4.4 配置數據源、創(chuàng)建系統負載監(jiān)控
要訪問 Prometheus 設置,請將鼠標懸停在配置(齒輪)圖標上,然后單擊數據源,然后單擊 Prometheus 數據源,根據下圖進行操作。
操作完成后,點擊進行驗證。
2.4.5 添加儀表盤
點擊最左側 + 號,選擇 DashBoard,選擇新建一個 pannel。
至此,Flink 的 metrics 的指標展示在 Grafana 中了。
flink 指標對應的指標名比較長,可以在 Legend 中配置顯示內容,在{{key}} 將 key 換成對應需要展示的字段即可,如:{{job_name}},{{operator_name}}。
3 指標性能測試
上述監(jiān)控系統搭建好了之后,我們可以進行性能指標監(jiān)控了?,F在以一個實戰(zhàn)案例進行介紹:
3.1 業(yè)務場景介紹
金融風控場景
3.1.1 業(yè)務需求:
Flink Source 從 data kafka topic 中讀取推理數據,通過 sql 預處理成模型推理要求的數據格式,在進行 keyBy 分組后流入下游 connect 算子,與模型 connect 后進入 Co-FlatMap 算子再進行推理,原理圖如下:
3.1.2 業(yè)務要求:
根據模型的復雜程度,要求推理時延到達 20ms 以內,全鏈路耗時 50ms 以內, 吞吐量達到每秒 1.2w 條以上。
3.1.3 業(yè)務數據:
推理數據:3000w,推理字段 495 個,機器學習 Xgboost 模型字段:495。
3.2 指標解析
由于性能測試要求全鏈路耗時 50ms 以內,應該使用 Flink Metrics 的 Latency Marker 進行計算。
3.2.1 全鏈路時延計算方式 :
全鏈路時延指的是一條推理數據進入 source 算子到數據預處理算子直到最后一個算子輸出結果的耗時,即處理一條數據需要多長時間,包含算子內處理邏輯時間,算子間數據傳遞時間,緩沖區(qū)內等待時間。
全鏈路時延要使用 latency metric 計算。latency metric 是由 source 算子根據當前本地時間生成的一個 marker ,并不參與各個算子的邏輯計算,僅僅跟著數據往下游算子流動,每到達一個算子則算出當前本地時間戳并與 source 生成的時間戳相減,得到 source 算子到當前算子的耗時,當到達 sink 算子或者說最后一個算子時,算出當前本地時間戳與 source 算子生成的時間戳相減,即得到全鏈路時延。原理圖如下:
由于使用到 Lateny marker,所有需要在 flink-conf.yaml 配置參數。
- latency.metrics.interval
系統配置截圖如下:
3.2.2 全鏈路吞吐計算方式 :
全鏈路吞吐 = 單位時間處理數據數量 / 單位時間。
3.3 提交任務到Flink on Yarn集群
**3.3.1 直接提交 Job **
- # -m jobmanager 的地址
- # -yjm 1024 指定 jobmanager 的內存信息
- # -ytm 1024 指定 taskmanager 的內存信息
- bin/flink run \
- -t yarn-per-job -yjm 4096 -ytm 8800 -s 96 \
- --detached -c com.threeknowbigdata.datastream.XgboostModelPrediction \
- examples/batch/WordCount.jar \
提交完成后,我們通過 Flink WEBUI 可以看到 job 運行的任務結果如下:
因為推理模型只是一個 model,存在狀態(tài)中,所以全鏈路吞吐考慮的是每秒有多少條推理數據進入 source 算子到倒數第二個算子(最后一個算子只是指標匯總)流出,這個條數就是全鏈路吞吐。
可以看到在處理 2000W 條數據時,代碼直接統計輸出的數值和 flink webUI 的統計數值基本一致,所以統計數值是可信的。
Flink WEBUI 跑的結果數據
打開 Prometheus 在對話框輸入全鏈路時延計算公式
- 計算公式:
- avg(flink_taskmanager_job_latency_source_id_
- operator_id _operator_subtask_index_latency{
- source_id="cbc357ccb763df2852fee8c4fc7d55f2",
- operator_id="c9c0ca46716e76f6b700eddf4366d243",quantile="0.999"})
3.4 優(yōu)化前性能分析
在將任務提交到集群后,經過全鏈路時延計算公式、吞吐時延計算公式,最后得到優(yōu)化前的結果時延指標統計圖如下:
吞吐指標統計圖如下:
通過本次測試完后,從圖中可以發(fā)現:
時延指標:加并行度,吞吐量也跟隨高,但是全鏈路時延大幅增長( 1并行至32并行,時延從 110ms 增加至 3287ms )
這遠遠沒有達到要求的結果。
3.5 問題分析
通過 Prometheus分析后,結果如下:
3.5.1 并行度問題 :
反壓現象:在 Flink WEB-UI 上,可以看到應用存在著非常嚴重的反壓,這說明鏈路中存在較為耗時的算子,阻塞了整個鏈路;
數據處理慢于拉取數據:數據源消費數據的速度,大于下游數據處理速度;
增加計算并行度:所以在接下來的測試中會調大推理算子并行度,相當于提高下游數據處理能力。
3.5.2 Buffer 超時問題 :
Flink 雖是純流式框架,但默認開啟了緩存機制(上游累積部分數據再發(fā)送到下游);
緩存機制可以提高應用的吞吐量,但是也增大了時延;
推理場景:為獲取最好的時延指標,第二輪測試超時時間置 0,記錄吞吐量。
3.5.3 Buffer 數量問題 :
同上,Flink 中的 Buffer 數量是可以配置的;
Buffer 數量越多,能緩存的數據也就越多;
推理場景:為獲取最好的時延指標,第二輪測試:減小 Flink 的 Buffer 數量來優(yōu)化時延指標。
3.5.4 調優(yōu)參數配置
SOURCE 與 COFLATMAP 的并行度按照 1:12 配置;
Buffer 超時時間配置為 0ms (默認100ms);
- //在代碼中設置
- senv.setBufferTimeout(0);
Buffer 數量的配置如下:
修改flink-conf.yaml
- memory.buffers-per-channel: 2
- memory.float-buffers-per-gate: 2
- memory.max-buffers-per-channel: 2
配置截圖如下:
3.6 優(yōu)化后性能分析
經過修改配置后,將任務再次提交到集群后,經過全鏈路時延計算公式、吞吐時延計算公式,最后得到優(yōu)化后的結果。
時延指標統計圖如下:
吞吐指標統計圖如下:
優(yōu)化后 LGB 推理測試總結 :
時延指標:并行度提升,時延也會增加,但幅度很小(可接受)。實際上,在測試過程中存在一定反壓,若調大 SOURCE 與 COFLATMAP 的并行度比例,全鏈路時延可進一步降低;吞吐量指標:隨著并行度的增加,吞吐量也隨著提高,當并行度提高至 96 時,吞吐量可以達到 1.3W,此時的時延維持在 50ms 左右(比較穩(wěn)定)。
3.7 優(yōu)化前后 LGB 分析總結
如下圖所示:
3.7.1吞吐量---影響因素:
內存:對吞吐和時延沒什么影響, 并行度與吞吐成正相關。
- 增大 kafka 分區(qū),吞吐增加
- 增大 source、維表 source 并行度
- 增大 flatmap 推理并行度
3.7.2全鏈路時延---影響因素:
- Buffer 超時越短、個數越少、時延越低。
- 整個鏈路是否有算子堵塞(車道排隊模型)。
- 調大推理算子并行度,時延降低,吞吐升高(即增加了推理的處理能力)。