自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

六張圖告訴你 Kafka 是怎樣做數(shù)據(jù)采集和統(tǒng)計的!

開發(fā) 新聞
如果讓你統(tǒng)計前一分鐘內(nèi)的流速,你會怎么統(tǒng)計才能夠讓數(shù)字更加精確呢?

大家好,我是君哥。

在講解 Kafka的副本同步限流機制三部曲(源碼篇) 第二篇(原理篇) 之前

我想先講解一下 Kafka中的數(shù)據(jù)采集和統(tǒng)計機制 當你了解這個機制之后才會更容易理解限流機制 圖片

你會不會好奇,kafka監(jiān)控中,那些數(shù)據(jù)都是怎么計算出來的 比如下圖這些指標

圖片

LogiKM監(jiān)控圖

這些數(shù)據(jù)都是通過Jmx獲取的kafka監(jiān)控指標, 那么我們今天來探討一下,這些指標都是怎么被計算出來的

在開始分析之前,我們可以 自己思考一下

如果讓你統(tǒng)計前一分鐘內(nèi)的流速,你會怎么統(tǒng)計才能夠讓數(shù)字更加精確呢?

我相信你腦海中肯定出現(xiàn)了一個詞:滑動窗口

在kafka的數(shù)據(jù)采樣和統(tǒng)計中,也是用了這個方法, 通過多個樣本Sample進行采樣,并合并統(tǒng)計

當然這一個過程少不了滑動窗口的影子

采集和統(tǒng)計類圖

我們先看下整個Kafka的數(shù)據(jù)采集和統(tǒng)計機制的類圖

圖片

數(shù)據(jù)采集和統(tǒng)計全類圖

看著整個類圖好像很復雜,但是最核心的就是兩個Interface接口

Measurable: 可測量的、可統(tǒng)計的 Interface。這個Interface 有一個方法, 專門用來計算需要被統(tǒng)計的值的

/**
* 測量這個數(shù)量并將結果作為雙精度返回
* 參數(shù):
* config – 此指標的配置
* now – 進行測量的 POSIX 時間(以毫秒為單位)
* 返回:
* 測量值
*/
double measure(MetricConfig config, long now);

比如說返回 近一分鐘的bytesIn

Stat: 記錄數(shù)據(jù), 上面的是統(tǒng)計,但是統(tǒng)計需要數(shù)據(jù)來支撐, 這個Interface就是用來做記錄的,這個Interface有一個方法

/**
* 記錄給定的值
* 參數(shù):
* config – 用于該指標的配置
* value – 要記錄的值
* timeMs – 此值發(fā)生的 POSIX 時間(以毫秒為單位)
*/
void record(MetricConfig config, double value, long timeMs);

有了這兩個接口,就基本上可以記錄數(shù)據(jù)和數(shù)據(jù)統(tǒng)計了

當然這兩個接口都有一個 MetricConfig 對象

圖片

MetricConfig

這是一個統(tǒng)計配置類, 主要是定義采樣的樣本數(shù)、單個樣本的時間窗口大小、單個樣本的事件窗口大小、限流機制有了這樣一個配置了,就可以自由定義時間窗口的大小,和采樣的樣本數(shù)之類的影響最終數(shù)據(jù)精度的變量。

這里我需要對兩個參數(shù)重點說明一下

單個樣本的時間窗口大小: 當前記錄時間 - 當前樣本的開始時間 >= 此值  則需要使用下一個樣本。單個樣本的事件窗口大小: 當前樣本窗口時間次數(shù) >= 此值  則需要使用下一個樣本

在整個統(tǒng)計中,不一定是按照時間窗口來統(tǒng)計的, 也可以按照事件窗口來統(tǒng)計, 具體按照不同需求選擇配置

好了,大家腦海里面已經(jīng)有了最基本的概念了,我們接下來就以一個kafka內(nèi)部經(jīng)常使用的 SampledStat 記錄和統(tǒng)計的抽象類來好好的深入分析理解一下。

SampledStat 樣本記錄統(tǒng)計抽象類

這個記錄統(tǒng)計抽象類,是按照采樣的形式來計算的。里面使用了一個或者多個樣本進行采樣統(tǒng)計 List<Sample> samples; 當前使用的樣本: current樣本初始化的值: initialValue

SampledStat : 實現(xiàn)了MeasurableStat 的抽象類,說明它又能采集記錄數(shù)據(jù),又能統(tǒng)計分析數(shù)據(jù)

當然它自身也定義了有兩個抽象方法

/** 更新具體樣本的數(shù)值 (單個樣本)**/
protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);


/**組合所有樣本的數(shù)據(jù) 來統(tǒng)計出想要的數(shù)據(jù) **/
public abstract double combine(List<Sample> samples, MetricConfig config, long now);

SampledStat圖形化展示

如上圖所示, 是一個SampledStat 的圖形化展示, 其中定義了 若干個樣本 Sample

記錄數(shù)據(jù)

@Override
public void record(MetricConfig config, double value, long timeMs) {
Sample sample = current(timeMs);
if (sample.isComplete(timeMs, config))
sample = advance(config, timeMs);
update(sample, config, value, timeMs);
sample.eventCount += 1;
}
  1. 獲取當前的Sample號,如果沒有則創(chuàng)建一個新的Sample,  創(chuàng)建的時候設置 初始化值 和 Sample起始時間(當前時間) ,并保存到樣品列表里面
  2. 判斷這個Sample是否完成(超過窗口期),判斷的邏輯是 當前時間 - 當前Sample的開始時間 >= 配置的時間窗口值 或者 事件總數(shù) >= 配置的事件窗口值
/** 當前時間 - 當前Sample的開始時間 >= 配置的時間窗口值 或者  事件總數(shù) >= 配置的事件窗口值 **/
public boolean isComplete(long timeMs, MetricConfig config) {
return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
}

3.如果這個Sample已經(jīng)完成(超過窗口期), 則開始選擇下一個窗口,如果下一個還沒創(chuàng)建則創(chuàng)建新的,如果下一個已經(jīng)存在,則重置這個Sample

4.拿到最終要使用的Sample后, 將數(shù)據(jù)記錄到這個Sample中。具體怎么記錄是讓具體的實現(xiàn)類來實現(xiàn)的,因為想要最終統(tǒng)計的數(shù)據(jù)可以不一樣,比如你只想記錄Sample中的最大值,那么更新的時候判斷是不是比之前的值大則更新,如果你想統(tǒng)計平均值,那么這里就讓單個Sample中所有的值累加(最終會 除以 Sample數(shù)量 求平均數(shù)的)

5.記錄事件次數(shù)+1。

記錄數(shù)據(jù)的展示圖

統(tǒng)計數(shù)據(jù)

/** 測量  統(tǒng)計 數(shù)據(jù)**/
@Override
public double measure(MetricConfig config, long now) {
// 重置過期樣本
purgeObsoleteSamples(config, now);
// 組合所有樣本數(shù)據(jù),并展示最終統(tǒng)計數(shù)據(jù),具體實現(xiàn)類來實現(xiàn)該方法
return combine(this.samples, config, now);
}

先重置 過期樣本 , 過期樣本的意思是:當前時間 - 每個樣本的起始事件 > 樣本數(shù)量 * 每個樣本的窗口時間 ; 就是滑動窗口的概念,只統(tǒng)計這個滑動窗口的樣本數(shù)據(jù), 過期的樣本數(shù)據(jù)會被重置(過期數(shù)據(jù)不采納), 如下圖所示

滑動窗口重置過期數(shù)據(jù)

組合所有樣本數(shù)據(jù)并進行不同維度的統(tǒng)計并返回數(shù)值, 因為不同場景想要得到的數(shù)據(jù)不同,所以這個只是一個抽象方法,需要實現(xiàn)類來實現(xiàn)這個計算邏輯,比如如果是計算平均值 Avg, 它的計算邏輯就是把所有的樣本數(shù)據(jù)值累加并除以累積的次數(shù)

那我們再來看看不同的統(tǒng)計實現(xiàn)類

Avg 計算平均值

一個簡單的SampledStat實現(xiàn)類 它統(tǒng)計所有樣本最終的平均值 每個樣本都會累加每一次的記錄值, 最后把所有樣本數(shù)據(jù)疊加 / 總共記錄的次數(shù)

圖片

在這里插入圖片描述

Max 計算最大值

每個樣本都保存這個樣本的最大值, 然后最后再對比所有樣本值的最大值

圖片

在這里插入圖片描述

WindowedSum 所有樣本窗口總和值

每個樣本累積每一次的記錄值, 統(tǒng)計的時候 把所有樣本的累計值 再累積返回

圖片

在這里插入圖片描述

Rate 樣本記錄統(tǒng)計求速率

Rate 也是實現(xiàn)了 MeasurableStat接口的,說明 它也有 記錄record 和 統(tǒng)計 measure 的方法, 實際上這個類 是一個組合類 ,里面組合了 SampledStat 和TimeUnit unit ,這不是很明顯了么, SampledStat負責記錄和統(tǒng)計, 得到的數(shù)據(jù) 跟時間TimeUnit做一下處理就得出來速率了, 比如SampledStat的實現(xiàn)類AVG可以算出來 被統(tǒng)計的 評價值, 但是如果我們再除以 一個時間維度, 是不是就可以得出 平均速率 了

如何計算統(tǒng)計的有效時間呢

這個有效時間 的計算會影響著最終速率的結果

public long windowSize(MetricConfig config, long now) {
// 將過期的樣本給重置掉
stat.purgeObsoleteSamples(config, now);
// 總共運行的時候 = 當前時間 - 最早的樣本的開始時間
long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
// 總時間/單個創(chuàng)建時間 = 多少個完整的窗口時間
int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
int minFullWindows = config.samples() - 1;
// If the available windows are less than the minimum required, add the difference to the totalElapsedTime
if (numFullWindows < minFullWindows)
totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
return totalElapsedTimeMs;
}

這是Rate的有效時間的計算邏輯,當然Rate 還有一個子類是 SampleRate

圖片

SampleRate的窗口Size計算邏輯

這個子類,將 有效時間的計算邏輯改的更簡單, 如果運行時間<一個樣本窗口的時間 則他的運行時間就是單個樣本的窗口時間, 否則就直接用這個運行的時間, 這個計算邏輯更簡單 它跟Rate的區(qū)別就是, 不考慮采樣的時間是否足夠多,我們用圖來簡單描述一下

SampleRate

圖片

SampleRate 速率邏輯

Rate

圖片

Rate 速率邏輯

Meter 包含速率和累積總指標的復合統(tǒng)計數(shù)據(jù)

這是一個CompoundStat的實現(xiàn)類, 說明它是一個復合統(tǒng)計, 可以統(tǒng)計很多指標在這里面 它包含速率指標和累積總指標的復合統(tǒng)計數(shù)據(jù)

底層實現(xiàn)的邏輯還是上面講解過的

副本Fetch流量的速率統(tǒng)計 案例分析

我們知道 在分區(qū)副本重分配過程中,有一個限流機制,就是指定某個限流值,副本同步過程不能超過這個閾值。做限流,那么肯定首先就需要統(tǒng)計 副本同步 的流速;那么上面我們講了這么多,你應該很容易能夠想到如果統(tǒng)計了吧?流速  bytes/s , 統(tǒng)計一秒鐘同步了多少流量, 那么我們可以把樣本窗口設置為 1s,然后多設置幾個樣本窗口求平均值。

接下來我們看看 Kafka是怎么統(tǒng)計的, 首先找到記錄 Follower Fetch 副本流量的地方如下

ReplicaFetcherThread#processPartitionData
if(quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)

圖片

設置時間窗口配置

這里設置的timeWindowMs 單個樣本窗口時間= 1 snumQuotaSamples 樣本數(shù) = 11 當然這些都是可以配置的

圖片

查看使用了哪個實現(xiàn)類

我們可以看到最終是使用了 SampleRate 來統(tǒng)計流量 !

Gauge 瞬時讀數(shù)的指標

上面我們起始是主要講解了Measurable接口, 它的父類是MetricValueProvider<Double> ,它沒有方法,只是定義,當還有一個子接口是 Gauge ,它并不是上面那種采樣的形式來統(tǒng)計數(shù)據(jù), 它返回的是當前的值, 瞬時值它提供的方法是 value() , Measurable提供的是measure()

這個在kafka中使用場景很少,就不詳細介紹了。

好了,這一篇我們主要講解了一下 Kafka中的數(shù)據(jù)采集和統(tǒng)計機制

那么 接下來下一篇,我們來聊聊 Kafka的監(jiān)控機制, 如何把這些采集

到的信息給保存起來并對外提供!!!

責任編輯:張燕妮 來源: 君哥聊技術
相關推薦

2022-08-15 10:45:34

RocketMQ消息隊列

2022-09-26 10:43:13

RocketMQ保存消息

2022-02-16 18:00:19

動態(tài)代理代碼靜態(tài)代理

2016-05-04 11:29:16

VR投資

2024-02-22 12:20:23

Linux零拷貝技術

2018-05-28 21:17:57

大數(shù)據(jù)分析軟件

2025-03-27 03:00:00

toB分析客戶畫像LTC模型

2021-09-07 05:04:53

HTTPHTTP3.0面試

2012-07-20 17:24:51

HTML5

2022-07-18 14:45:22

Kubernetes暴露方案

2022-08-01 10:43:11

RocketMQZookeeper注冊中心

2024-10-21 10:30:00

2019-05-08 14:24:04

區(qū)塊鏈CosmosPolkadot

2021-04-13 15:51:46

服務治理流量

2021-04-13 18:16:07

多線程安全代碼

2015-09-02 11:44:39

視頻云華為

2015-09-24 09:32:39

大數(shù)據(jù)白富美

2017-05-31 15:27:54

2012-03-14 20:59:32

iPad

2020-09-09 08:30:42

內(nèi)網(wǎng)隱蔽端口
點贊
收藏

51CTO技術棧公眾號