Prometheus時(shí)序數(shù)據(jù)庫-數(shù)據(jù)的查詢
前言
在之前的博客里,筆者詳細(xì)闡述了Prometheus數(shù)據(jù)的插入過程。但我們最常見的打交道的是數(shù)據(jù)的查詢。Prometheus提供了強(qiáng)大的Promql來滿足我們千變?nèi)f化的查詢需求。在這篇文章里面,筆者就以一個(gè)簡單的Promql為例,講述下Prometheus查詢的過程。
Promql
一個(gè)Promql表達(dá)式可以計(jì)算為下面四種類型:
- 瞬時(shí)向量(Instant Vector) - 一組同樣時(shí)間戳的時(shí)間序列(取自不同的時(shí)間序列,例如不同機(jī)器同一時(shí)間的CPU idle)
- 區(qū)間向量(Range vector) - 一組在一段時(shí)間范圍內(nèi)的時(shí)間序列
- 標(biāo)量(Scalar) - 一個(gè)浮點(diǎn)型的數(shù)據(jù)值
- 字符串(String) - 一個(gè)簡單的字符串
我們還可以在Promql中使用svm/avg等集合表達(dá)式,不過只能用在瞬時(shí)向量(Instant Vector)上面。為了闡述Prometheus的聚合計(jì)算以及篇幅原因,筆者在本篇文章只詳細(xì)分析瞬時(shí)向量(Instant Vector)的執(zhí)行過程。
瞬時(shí)向量(Instant Vector)
前面說到,瞬時(shí)向量是一組擁有同樣時(shí)間戳的時(shí)間序列。但是實(shí)際過程中,我們對不同Endpoint采樣的時(shí)間是不可能精確一致的。所以,Prometheus采取了距離指定時(shí)間戳之前最近的數(shù)據(jù)(Sample)。如下圖所示:
當(dāng)然,如果是距離當(dāng)前時(shí)間戳1個(gè)小時(shí)的數(shù)據(jù)直觀看來肯定不能納入到我們的返回結(jié)果里面。
所以Prometheus通過一個(gè)指定的時(shí)間窗口來過濾數(shù)據(jù)(通過啟動(dòng)參數(shù)—query.lookback-delta指定,默認(rèn)5min)。
對一條簡單的Promql進(jìn)行分析
好了,解釋完Instant Vector概念之后,我們可以著手進(jìn)行分析了。直接上一條帶有聚合函數(shù)的Promql吧。
- SUM BY (group) (http_requests{job="api-server",group="production"})
首先,對于這種有語法結(jié)構(gòu)的語句肯定是將其Parse一把,構(gòu)造成AST樹了。調(diào)用
- promql.ParseExpr
由于Promql較為簡單,所以Prometheus直接采用了LL語法分析。在這里直接給出上述Promql的AST樹結(jié)構(gòu)。
Prometheus對于語法樹的遍歷過程都是通過vistor模式,具體到代碼為:
- ast.go vistor設(shè)計(jì)模式
- func Walk(v Visitor, node Node, path []Node) error {
- var err error
- if v, err = v.Visit(node, path); v == nil || err != nil {
- return err
- }
- path = append(path, node)
- for _, e := range Children(node) {
- if err := Walk(v, e, path); err != nil {
- return err
- }
- }
- _, err = v.Visit(nil, nil)
- return err
- }
- func (f inspector) Visit(node Node, path []Node) (Visitor, error) {
- if err := f(node, path); err != nil {
- return nil, err
- }
- return f, nil
- }
通過golang里非常方便的函數(shù)式功能,直接傳遞求值函數(shù)inspector進(jìn)行不同情況下的求值。
- type inspector func(Node, []Node) error
求值過程
具體的求值過程核心函數(shù)為:
- func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, storage.Warnings, error) {
- ......
- querier, warnings, err := ng.populateSeries(ctxPrepare, query.queryable, s) // 這邊拿到對應(yīng)序列的數(shù)據(jù)
- ......
- val, err := evaluator.Eval(s.Expr) // here 聚合計(jì)算
- ......
- }
populateSeries
首先通過populateSeries的計(jì)算出VectorSelector Node所對應(yīng)的series(時(shí)間序列)。這里直接給出求值函數(shù)
- func(node Node, path []Node) error {
- ......
- querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
- ......
- case *VectorSelector:
- .......
- set, wrn, err = querier.Select(params, n.LabelMatchers...)
- ......
- n.unexpandedSeriesSet = set
- ......
- case *MatrixSelector:
- ......
- }
- return nil
可以看到這個(gè)求值函數(shù),只對VectorSelector/MatrixSelector進(jìn)行操作,針對我們的Promql也就是只對葉子節(jié)點(diǎn)VectorSelector有效。
select
獲取對應(yīng)數(shù)據(jù)的核心函數(shù)就在querier.Select。我們先來看下qurier是如何得到的.
- querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
根據(jù)時(shí)間戳范圍去生成querier,里面最重要的就是計(jì)算出哪些block在這個(gè)時(shí)間范圍內(nèi),并將他們附著到querier里面。具體見函數(shù)
- func (db *DB) Querier(mint, maxt int64) (Querier, error) {
- for _, b := range db.blocks {
- ......
- // 遍歷blocks挑選block
- }
- // 如果maxt>head.mint(即內(nèi)存中的block),那么也加入到里面querier里面。
- if maxt >= db.head.MinTime() {
- blocks = append(blocks, &rangeHead{
- head: db.head,
- mint: mint,
- maxt: maxt,
- })
- }
- ......
- }
知道數(shù)據(jù)在哪些block里面,我們就可以著手進(jìn)行計(jì)算VectorSelector的數(shù)據(jù)了。
- // labelMatchers {job:api-server} {__name__:http_requests} {group:production}
- querier.Select(params, n.LabelMatchers...)
有了matchers我們很容易的就能夠通過倒排索引取到對應(yīng)的series。為了篇幅起見,我們假設(shè)數(shù)據(jù)都在headBlock(也就是內(nèi)存里面)。那么我們對于倒排的計(jì)算就如下圖所示:
這樣,我們的VectorSelector節(jié)點(diǎn)就已經(jīng)有了最終的數(shù)據(jù)存儲地址信息了,例如圖中的memSeries refId=3和4。
如果想了解在磁盤中的數(shù)據(jù)尋址,可以詳見筆者之前的博客
- <<Prometheus時(shí)序數(shù)據(jù)庫-磁盤中的存儲結(jié)構(gòu)>>
通過populateSeries找到對應(yīng)的數(shù)據(jù),那么我們就可以通過evaluator.Eval獲取最終的結(jié)果了。計(jì)算采用后序遍歷,等下層節(jié)點(diǎn)返回?cái)?shù)據(jù)后才開始上層節(jié)點(diǎn)的計(jì)算。那么很自然的,我們先計(jì)算VectorSelector。
- func (ev *evaluator) eval(expr Expr) Value {
- ......
- case *VectorSelector:
- // 通過refId拿到對應(yīng)的Series
- checkForSeriesSetExpansion(ev.ctx, e)
- // 遍歷所有的series
- for i, s := range e.series {
- // 由于我們這邊考慮的是instant query,所以只循環(huán)一次
- for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
- // 獲取距離ts最近且小于ts的最近的sample
- _, v, ok := ev.vectorSelectorSingle(it, e, ts)
- if ok {
- if ev.currentSamples < ev.maxSamples {
- // 注意,這邊的v對應(yīng)的原始t被替換成了ts,也就是instant query timeStamp
- ss.Points = append(ss.Points, Point{V: v, T: ts})
- ev.currentSamples++
- } else {
- ev.error(ErrTooManySamples(env))
- }
- }
- ......
- }
- }
- }
如代碼注釋中看到,當(dāng)我們找到一個(gè)距離ts最近切小于ts的sample時(shí)候,只用這個(gè)sample的value,其時(shí)間戳則用ts(Instant Query指定的時(shí)間戳)代替。
其中vectorSelectorSingle值得我們觀察一下:
- func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *VectorSelector, ts int64) (int64, float64, bool){
- ......
- // 這一步是獲取>=refTime的數(shù)據(jù),也就是我們instant query傳入的
- ok := it.Seek(refTime)
- ......
- if !ok || t > refTime {
- // 由于我們需要的是<=refTime的數(shù)據(jù),所以這邊回退一格,由于同一memSeries同一時(shí)間的數(shù)據(jù)只有一條,所以回退的數(shù)據(jù)肯定是<=refTime的
- t, v, ok = it.PeekBack(1)
- if !ok || t < refTime-durationMilliseconds(LookbackDelta) {
- return 0, 0, false
- }
- }
- }
就這樣,我們找到了series 3和4距離Instant Query時(shí)間最近且小于這個(gè)時(shí)間的兩條記錄,并保留了記錄的標(biāo)簽。這樣,我們就可以在上層進(jìn)行聚合。
SUM by聚合
葉子節(jié)點(diǎn)VectorSelector得到了對應(yīng)的數(shù)據(jù)后,我們就可以對上層節(jié)點(diǎn)AggregateExpr進(jìn)行聚合計(jì)算了。代碼棧為:
- evaluator.rangeEval
- |->evaluate.eval.func2
- |->evelator.aggregation grouping key為group
具體的函數(shù)如下圖所示:
- func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector {
- ......
- // 對所有的sample
- for _, s := range vec {
- metric := s.Metric
- ......
- group, ok := result[groupingKey]
- // 如果此group不存在,則新加一個(gè)group
- if !ok {
- ......
- result[groupingKey] = &groupedAggregation{
- labels: m, // 在這里我們的m=[group:production]
- value: s.V,
- mean: s.V,
- groupCount: 1,
- }
- ......
- }
- switch op {
- // 這邊就是對SUM的最終處理
- case SUM:
- group.value += s.V
- .....
- }
- }
- .....
- for _, aggr := range result {
- enh.out = append(enh.out, Sample{
- Metric: aggr.labels,
- Point: Point{V: aggr.value},
- })
- }
- ......
- return enh.out
- }
好了,有了上面的處理,我們聚合的結(jié)果就變?yōu)?
這個(gè)和我們的預(yù)期結(jié)果一致,一次查詢的過程就到此結(jié)束了。
總結(jié)
Promql是非常強(qiáng)大的,可以滿足我們的各種需求。其運(yùn)行原理自然也激起了筆者的好奇心,本篇文章雖然只分析了一條簡單的Promql,但萬變不離其宗,任何Promql都是類似的運(yùn)行邏輯。希望本文對讀者能有所幫助。