Prometheus Alerting Rule Management
What Is a Rule
Prometheus supports user-defined rules, which come in two kinds: Recording Rules and Alerting Rules. PromQL lets you query, aggregate, and otherwise compute over the samples Prometheus has collected, in real time. When an expression is complex and expensive, however, evaluating it directly at query time can make Prometheus time out. What is needed then is something like a background batch job that performs these heavy computations ahead of time, so users only have to query the precomputed results. Recording Rules provide exactly this kind of background computation, which optimizes the performance of complex queries and makes them more efficient.
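Here is a minimal Go sketch of that idea. It assumes an in-memory map stands in for Prometheus's TSDB. A background evaluation computes an aggregate once and stores it under a new series name, and readers then do a cheap lookup instead of re-running the aggregation. `store`, `evaluateRecordingRule`, and the sample values are hypothetical. A real recording rule is configured with a `record:` name and a PromQL `expr:`, and the rule manager described later evaluates it.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical in-memory stand-in for the TSDB: series name -> latest value.
type store struct {
	mu     sync.RWMutex
	series map[string]float64
}

func newStore() *store { return &store{series: map[string]float64{}} }

func (s *store) set(name string, v float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.series[name] = v
}

func (s *store) get(name string) (float64, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.series[name]
	return v, ok
}

// evaluateRecordingRule plays the role of one background evaluation: it runs
// the "expensive" aggregation (here a mean) once and records the result under
// a new series name. Empty input records nothing.
func evaluateRecordingRule(s *store, record string, raw []float64) {
	if len(raw) == 0 {
		return
	}
	sum := 0.0
	for _, v := range raw {
		sum += v
	}
	s.set(record, sum/float64(len(raw)))
}

func main() {
	s := newStore()
	// Raw latency samples that would otherwise be aggregated on every query.
	raw := []float64{0.2, 0.4, 0.6}
	evaluateRecordingRule(s, "job:request_latency_seconds:mean5m", raw)

	// Readers only look up the precomputed value.
	if v, ok := s.get("job:request_latency_seconds:mean5m"); ok {
		fmt.Printf("%.1f\n", v)
	}
}
```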
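This post focuses on alerting rules. An alerting rule in Prometheus defines a trigger condition as a PromQL expression. The Prometheus server evaluates these rules periodically, and when a condition is met it sends an alert notification.
What Is an Alerting Rule
Alerting is one of Prometheus's core features. The rest of this post walks through how alerts are executed, from the point of view of the source code.
How to Define an Alerting Rule
A typical alerting rule looks like this: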
```yaml
groups:
- name: example
  rules:
  - alert: HighErrorRate
    # The metric must stay above 0.5 for 10 minutes before the alert fires.
    expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
    for: 10m
    labels:
      severity: page
    annotations:
      summary: High request latency
      description: description info
```
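In a rule file, related rules are grouped under a `group`, and each group can contain multiple alerting rules. An alerting rule consists mainly of the following parts:
- alert: the name of the alerting rule.
- expr: the trigger condition, written as a PromQL expression. It is evaluated to find the time series that satisfy the condition.
- for: an optional wait time. The alert is sent only after the trigger condition has held continuously for this long. While the wait is in progress, a newly generated alert is in the pending state.
- labels: custom labels, that is, a set of additional labels to attach to the alert.
- annotations: a set of additional information, such as text describing the alert in detail. Annotations are sent to Alertmanager together with the alert when it is generated.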
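The Rule Manager
The rule manager evaluates the PromQL trigger condition of each configured rule to find the time series that satisfy it. When the condition is met, it sends the alert to the alerting service (Alertmanager). The manager is defined as follows: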
```go
type Manager struct {
	opts     *ManagerOptions   // external dependencies
	groups   map[string]*Group // current rule groups
	mtx      sync.RWMutex      // read/write lock guarding groups
	block    chan struct{}     // closed by Run() so groups can start evaluating
	done     chan struct{}     // closed when the manager is stopped
	restored bool

	logger log.Logger
}
```
- opts (*ManagerOptions): the other modules the Manager depends on, such as the storage module and the notify module.
- groups (map[string]*Group): all rules.Group instances, keyed by a combination of the group's name and the configuration file it comes from.
- mtx (sync.RWMutex): must be held whenever the groups field is read or written.
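The `groups` map and its lock follow a common Go pattern. Below is a minimal sketch that assumes a stripped-down group type. Writers (such as a reload) take the write lock and readers take the read lock. The key combines the file and the group name, so groups with the same name in different files do not collide. In Prometheus the key is built by `GroupKey(file, name)`. The `;` separator used here matches that helper as far as I know, but treat it as an assumption. `miniManager` and `miniGroup` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// groupKey combines file and group name into a single map key.
func groupKey(file, name string) string { return file + ";" + name }

// miniGroup is a hypothetical stand-in for rules.Group.
type miniGroup struct {
	file, name string
}

// miniManager mirrors the groups/mtx pair of rules.Manager.
type miniManager struct {
	mtx    sync.RWMutex
	groups map[string]*miniGroup
}

// replaceGroups swaps in a new set of groups under the write lock.
func (m *miniManager) replaceGroups(gs []*miniGroup) {
	next := make(map[string]*miniGroup, len(gs))
	for _, g := range gs {
		next[groupKey(g.file, g.name)] = g
	}
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.groups = next
}

// keys lists the current group keys under the read lock, sorted for stable output.
func (m *miniManager) keys() []string {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	out := make([]string, 0, len(m.groups))
	for k := range m.groups {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func main() {
	m := &miniManager{}
	m.replaceGroups([]*miniGroup{
		{file: "a.yml", name: "example"},
		{file: "b.yml", name: "example"}, // same name, different file: distinct key
	})
	fmt.Println(m.keys())
}
```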
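Loading Rule Group Configuration
During Prometheus Server startup, and again on every configuration reload, Manager.Update() is called to load and parse the rule files. The process is roughly as follows.
- Call Manager.LoadGroups() to load and parse the rule files. This produces a set of rules.Group instances.
- Stop the old rules.Group instances and start the new ones. Each rules.Group runs in its own goroutine, which periodically evaluates all of the PromQL queries (rules) in that group. Groups whose definition has not changed are left running as they are.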
```go
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// Load the rules from the given files.
	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
	if errs != nil {
		for _, e := range errs {
			level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
		}
		return errors.New("error loading rules, previous rule set restored")
	}
	m.restored = true

	var wg sync.WaitGroup
	// Iterate over the newly loaded groups.
	for _, newg := range groups {
		// If there is an old group with the same identifier,
		// check if new group equals with the old group, if yes then skip it.
		// If not equals, stop it and wait for it to finish the current iteration.
		// Then copy it into the new group.
		// Build the group key from the new group's file and name.
		gn := GroupKey(newg.file, newg.name)
		// Look up the old group with that key and remove it from m.groups.
		oldg, ok := m.groups[gn]
		delete(m.groups, gn)

		if ok && oldg.Equals(newg) {
			// Unchanged: keep the running old group instead of the new one.
			groups[gn] = oldg
			continue
		}

		wg.Add(1)
		// Start one goroutine per rules.Group.
		go func(newg *Group) {
			if ok {
				oldg.stop()
				// Copy the old group's state into the new group.
				newg.CopyState(oldg)
			}
			wg.Done()
			// Wait with starting evaluation until the rule manager
			// is told to run. This is necessary to avoid running
			// queries against a bootstrapping storage.
			<-m.block
			// Group.run() evaluates the group's PromQL periodically.
			newg.run(m.opts.Context)
		}(newg)
	}

	// Stop remaining old groups (those no longer in the configuration).
	wg.Add(len(m.groups))
	for n, oldg := range m.groups {
		go func(n string, g *Group) {
			g.markStale = true
			g.stop()
			if m := g.metrics; m != nil {
				m.IterationsMissed.DeleteLabelValues(n)
				m.IterationsScheduled.DeleteLabelValues(n)
				m.EvalTotal.DeleteLabelValues(n)
				m.EvalFailures.DeleteLabelValues(n)
				m.GroupInterval.DeleteLabelValues(n)
				m.GroupLastEvalTime.DeleteLabelValues(n)
				m.GroupLastDuration.DeleteLabelValues(n)
				m.GroupRules.DeleteLabelValues(n)
				m.GroupSamples.DeleteLabelValues(n)
			}
			wg.Done()
		}(n, oldg)
	}

	wg.Wait()
	// Replace the manager's rule groups with the new set.
	m.groups = groups

	return nil
}
```
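With goroutines, locking, and metrics stripped away, `Update` reconciles the running groups with the freshly loaded ones. The sketch below models only that decision. It assumes each group definition can be compared as a plain string, standing in for `Group.Equals`. `plan` and `planUpdate` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// plan describes what a reload does with each group key.
type plan struct {
	keep    []string // unchanged: the running old group is reused as-is
	restart []string // changed: stop the old group, copy its state, start the new one
	start   []string // brand new: just start it
	stop    []string // no longer configured: mark stale and stop
}

// planUpdate compares old and new groups; map values stand in for group definitions.
func planUpdate(old, next map[string]string) plan {
	var p plan
	for k, def := range next {
		prev, ok := old[k]
		switch {
		case ok && prev == def:
			p.keep = append(p.keep, k)
		case ok:
			p.restart = append(p.restart, k)
		default:
			p.start = append(p.start, k)
		}
	}
	for k := range old {
		if _, ok := next[k]; !ok {
			p.stop = append(p.stop, k)
		}
	}
	// Sort for deterministic output; map iteration order is random.
	for _, s := range [][]string{p.keep, p.restart, p.start, p.stop} {
		sort.Strings(s)
	}
	return p
}

func main() {
	old := map[string]string{"f;a": "v1", "f;b": "v1", "f;c": "v1"}
	next := map[string]string{"f;a": "v1", "f;b": "v2", "f;d": "v1"}
	fmt.Printf("%+v\n", planUpdate(old, next))
}
```

In the real method, the `restart` case is where `oldg.stop()` and `newg.CopyState(oldg)` run, so state such as active alerts carries over when a changed group is reloaded.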
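Running the Rule Group Scheduler
Group startup (Group.run): on entering Group.run, the group first waits for an initial period so that its evaluations fall on consistently slotted times spaced g.interval apart. It then defines the evaluation function iter, which is invoked every g.interval. iter calls g.Eval, which evaluates the individual rules one level down.
The evaluation interval g.interval is set by `evaluation_interval` in the `global` section of prometheus.yml (`[ evaluation_interval: <duration> | default = 1m ]`). A rule group can override it with its own `interval` field. The implementation is as follows: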
```go
func (g *Group) run(ctx context.Context) {
	defer close(g.terminated)

	// Wait an initial amount to have consistently slotted intervals.
	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
	select {
	case <-time.After(time.Until(evalTimestamp)): // initial wait
	case <-g.done:
		return
	}

	ctx = promql.NewOriginContext(ctx, map[string]interface{}{
		"ruleGroup": map[string]string{
			"file": g.File(),
			"name": g.Name(),
		},
	})

	// Define the function that evaluates the whole group once.
	iter := func() {
		g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()

		start := time.Now()
		// Entry point for evaluating the group's rules.
		g.Eval(ctx, evalTimestamp)
		timeSinceStart := time.Since(start)

		g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
		g.setEvaluationTime(timeSinceStart)
		g.setLastEvaluation(start)
	}

	// The assumption here is that since the ticker was started after having
	// waited for `evalTimestamp` to pass, the ticks will trigger soon
	// after each `evalTimestamp + N * g.interval` occurrence.
	tick := time.NewTicker(g.interval) // ticker that drives rule evaluation
	defer tick.Stop()

	defer func() {
		if !g.markStale {
			return
		}
		go func(now time.Time) {
			for _, rule := range g.seriesInPreviousEval {
				for _, r := range rule {
					g.staleSeries = append(g.staleSeries, r)
				}
			}
			// That can be garbage collected at this point.
			g.seriesInPreviousEval = nil
			// Wait for 2 intervals to give the opportunity to renamed rules
			// to insert new series in the tsdb. At this point if there is a
			// renamed rule, it should already be started.
			select {
			case <-g.managerDone:
			case <-time.After(2 * g.interval):
				g.cleanupStaleSeries(ctx, now)
			}
		}(time.Now())
	}()

	// First evaluation of the group.
	iter()
	if g.shouldRestore {
		// If we have to restore, we wait for another Eval to finish.
		// The reason behind this is, during first eval (or before it)
		// we might not have enough data scraped, and recording rules would not
		// have updated the latest values, on which some alerts might depend.
		select {
		case <-g.done:
			return
		case <-tick.C:
			missed := (time.Since(evalTimestamp) / g.interval) - 1
			if missed > 0 {
				g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
				g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
			}
			evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
			iter()
		}

		g.RestoreForState(time.Now())
		g.shouldRestore = false
	}

	for {
		select {
		case <-g.done:
			return
		default:
			select {
			case <-g.done:
				return
			case <-tick.C:
				missed := (time.Since(evalTimestamp) / g.interval) - 1
				if missed > 0 {
					g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
					g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
				}
				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
				// Evaluate the group again.
				iter()
			}
		}
	}
}
```
Running Rule Evaluation
Group.Eval evaluates the individual rules of a group. It passes each rule's PromQL through QueryFunc to the query engine (queryEngine). If the rule is an AlertingRule, the resulting alerts are sent to the alerting service through NotifyFunc. In both cases, the rule's output vector is then appended to Prometheus's storage. For a RecordingRule this is the recorded series, and for an AlertingRule it is the synthetic ALERTS series. Series that have disappeared since the previous evaluation are marked stale.
```go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	var samplesTotal float64
	// Iterate over every rule in the group.
	for i, rule := range g.rules {
		select {
		case <-g.done:
			return
		default:
		}

		func(i int, rule Rule) {
			sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
			sp.SetTag("name", rule.Name())
			defer func(t time.Time) {
				sp.Finish()
				// Update metrics: how long this rule took to evaluate.
				since := time.Since(t)
				g.metrics.EvalDuration.Observe(since.Seconds())
				rule.SetEvaluationDuration(since)
				// Record when this evaluation started.
				rule.SetEvaluationTimestamp(t)
			}(time.Now())

			// Count rule evaluations.
			g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

			// Evaluate the rule.
			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
			if err != nil {
				// On error, mark the rule unhealthy and stop processing it.
				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
				// Count failed evaluations.
				g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
				// Canceled queries are intentional termination of queries. This normally
				// happens on shutdown and thus we skip logging of any errors here.
				if _, ok := err.(promql.ErrQueryCanceled); !ok {
					level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
				}
				return
			}
			samplesTotal += float64(len(vector))

			// For alerting rules, send the alerts.
			if ar, ok := rule.(*AlertingRule); ok {
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}
			var (
				numOutOfOrder = 0
				numDuplicates = 0
			)

			// Get a storage appender: the output vector of every rule is written to storage.
			app := g.opts.Appendable.Appender(ctx)
			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
			defer func() {
				if err := app.Commit(); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)
					g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
					level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
					return
				}
				g.seriesInPreviousEval[i] = seriesReturned
			}()

			for _, s := range vector {
				if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)

					switch errors.Cause(err) {
					// Handle the different errors returned by the storage.
					case storage.ErrOutOfOrderSample:
						numOutOfOrder++
						level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					case storage.ErrDuplicateSampleForTimestamp:
						numDuplicates++
						level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					default:
						level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					}
				} else {
					// Remember which series were written in this evaluation.
					seriesReturned[s.Metric.String()] = s.Metric
				}
			}
			if numOutOfOrder > 0 {
				level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
			}
			if numDuplicates > 0 {
				level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
			}

			for metric, lset := range g.seriesInPreviousEval[i] {
				if _, ok := seriesReturned[metric]; !ok {
					// Series no longer exposed, mark it stale.
					_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
					switch errors.Cause(err) {
					case nil:
					case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
						// Do not count these in logging, as this is expected if series
						// is exposed from a different rule.
					default:
						level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
					}
				}
			}
		}(i, rule)
	}
	if g.metrics != nil {
		g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
	}
	g.cleanupStaleSeries(ctx, ts)
}
```
Next comes the evaluation of an individual rule. Here we look only at the AlertingRule path, starting with the structure of AlertingRule:
```go
// An AlertingRule generates alerts from its vector expression.
type AlertingRule struct {
	// The name of the alert.
	name string
	// The vector expression from which to generate alerts.
	vector parser.Expr
	// The duration for which a labelset needs to persist in the expression
	// output vector before an alert transitions from Pending to Firing state.
	holdDuration time.Duration
	// Extra labels to attach to the resulting alert sample vectors.
	labels labels.Labels
	// Non-identifying key/value pairs.
	annotations labels.Labels
	// External labels from the global config.
	externalLabels map[string]string
	// true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
	// only after the restoration.
	restored bool
	// Protects the below.
	mtx sync.Mutex
	// Time in seconds taken to evaluate rule.
	evaluationDuration time.Duration
	// Timestamp of last evaluation of rule.
	evaluationTimestamp time.Time
	// The health of the alerting rule.
	health RuleHealth
	// The last error seen by the alerting rule.
	lastError error
	// A map of alerts which are currently active (Pending or Firing), keyed by
	// the fingerprint of the labelset they correspond to.
	active map[uint64]*Alert

	logger log.Logger
}
```
The most important field here is active. It holds the alerts produced by evaluating the rule, but whether each of them is actually sent depends on a further series of checks. The evaluation logic is:
```go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
	res, err := query(ctx, r.vector.String(), ts)
	if err != nil {
		r.SetHealth(HealthBad)
		r.SetLastError(err)
		return nil, err
	}
	// ......
}
```
This step runs the rule's expr through the QueryFunc that was passed in when the Manager was created. The result is the set of series that satisfy the expression. For example, if the rule is:
```
cpu_usage > 90
```
then the query might return:
```
cpu_usage{instance="192.168.0.11"} 91
cpu_usage{instance="192.168.0.12"} 92
```
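Without the `bool` modifier, a PromQL comparison such as `cpu_usage > 90` acts as a filter. Samples for which the comparison is false are dropped, and the remaining samples keep their original values. The rule then identifies each remaining series by a hash of its label set. Below is a minimal sketch that assumes FNV-1a over sorted `name=value` pairs in place of Prometheus's own label hash. `sample`, `filterGreater`, and `fingerprint` are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// sample is a hypothetical instant-vector element: a label set plus a value.
type sample struct {
	labels map[string]string
	value  float64
}

// filterGreater mimics `expr > threshold` without the bool modifier:
// samples above threshold are kept unchanged, all others are dropped.
func filterGreater(in []sample, threshold float64) []sample {
	var out []sample
	for _, s := range in {
		if s.value > threshold {
			out = append(out, s)
		}
	}
	return out
}

// fingerprint hashes the label set in sorted order, so the same series always
// maps to the same key regardless of map iteration order.
func fingerprint(lbls map[string]string) uint64 {
	names := make([]string, 0, len(lbls))
	for n := range lbls {
		names = append(names, n)
	}
	sort.Strings(names)
	h := fnv.New64a()
	for _, n := range names {
		h.Write([]byte(n))
		h.Write([]byte{0xff}) // separator so "ab"+"c" differs from "a"+"bc"
		h.Write([]byte(lbls[n]))
		h.Write([]byte{0xff})
	}
	return h.Sum64()
}

func main() {
	vec := []sample{
		{map[string]string{"__name__": "cpu_usage", "instance": "192.168.0.10"}, 50},
		{map[string]string{"__name__": "cpu_usage", "instance": "192.168.0.11"}, 91},
		{map[string]string{"__name__": "cpu_usage", "instance": "192.168.0.12"}, 92},
	}
	for _, s := range filterGreater(vec, 90) {
		fmt.Println(s.labels["instance"], s.value, fingerprint(s.labels))
	}
}
```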
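The rule then iterates over the query result. For each series it computes a hash from the series' labels and checks whether that hash is already in the active map, that is, whether the same series was returned before and its alert is not yet inactive. If so, it updates the stored value and annotations. If not, it creates a new alert and stores it in the rule's active map. Next, it walks over the active alerts and updates each one's state. The update depends on the rule's `for` duration, the time the alert became active, its current state, and whether it is still present in this query result. The rules are:
If the alert existed before but is absent from this evaluation:
- If its state is StatePending, or more than 15 minutes have passed since it was resolved (15 minutes is a hard-coded constant, resolvedRetention), it is removed from the active map.
- If its state is not StateInactive, it is set to StateInactive and its resolution time is recorded.
If the alert existed before and is still present in this evaluation:
- If its state is StatePending and at least the configured `for` duration has elapsed since it became active, its state changes to StateFiring.
A series that appears for the first time, or whose previous alert has already resolved to StateInactive, gets a new alert in StatePending. Alerts that are already pending or firing otherwise keep their state.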
The step above only updates alert states; nothing has been sent yet. The actual sending happens here:
```go
// If the rule is an alerting rule, send its alerts (ar.sendAlerts decides which are actually sent).
if ar, ok := rule.(*AlertingRule); ok {
	ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
// .......

func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
	alerts := []*Alert{}
	r.ForEachActiveAlert(func(alert *Alert) {
		if alert.needsSending(ts, resendDelay) {
			alert.LastSentAt = ts
			// Allow for two Eval or Alertmanager send failures.
			delta := resendDelay
			if interval > resendDelay {
				delta = interval
			}
			alert.ValidUntil = ts.Add(4 * delta)
			anew := *alert
			alerts = append(alerts, &anew)
		}
	})
	notifyFunc(ctx, r.vector.String(), alerts...)
}

func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
	if a.State == StatePending {
		return false
	}

	// if an alert has been resolved since the last send, resend it
	if a.ResolvedAt.After(a.LastSentAt) {
		return true
	}

	return a.LastSentAt.Add(resendDelay).Before(ts)
}
```
In summary:
- An alert in StatePending is never sent.
- If an alert has been resolved since it was last sent, it is sent once more so that the receiver learns it has been resolved.
- Otherwise, the alert is sent only if more time than the configured resend delay (ResendDelay) has passed since it was last sent.
That is the complete Prometheus alerting flow. The main reason to study it is to be able to extend Prometheus's rule handling. For example, we could modify LoadGroups() so that it also loads rules defined in MySQL, which would allow alerting rules to be updated dynamically.
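As a starting point for that kind of extension, here is a hedged sketch of the reload side. It assumes rules are fetched from an external store such as a MySQL table. The `RuleSource` interface, `memorySource`, `digest`, and `syncRules` are all hypothetical. A real implementation would query the database (for example through `database/sql` and a MySQL driver) and hand the result to the rule manager, much as `Manager.Update` does for files. The content digest avoids rebuilding groups when nothing has changed.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// RuleSource is a hypothetical abstraction over wherever rules are stored (file, MySQL, ...).
type RuleSource interface {
	// Rules returns alert name -> PromQL expression.
	Rules() (map[string]string, error)
}

// memorySource is an in-memory RuleSource used here in place of a database.
type memorySource struct{ rules map[string]string }

func (m *memorySource) Rules() (map[string]string, error) { return m.rules, nil }

// digest produces a stable content hash of a rule set (names sorted first).
func digest(rules map[string]string) string {
	names := make([]string, 0, len(rules))
	for n := range rules {
		names = append(names, n)
	}
	sort.Strings(names)
	var b strings.Builder
	for _, n := range names {
		b.WriteString(n + "\x00" + rules[n] + "\x00")
	}
	sum := sha256.Sum256([]byte(b.String()))
	return hex.EncodeToString(sum[:])
}

// syncRules loads rules and calls apply only when their content differs from lastDigest.
// It returns the digest to pass in next time; on error the previous rule set is kept.
func syncRules(src RuleSource, lastDigest string, apply func(map[string]string)) (string, error) {
	rules, err := src.Rules()
	if err != nil {
		return lastDigest, err
	}
	d := digest(rules)
	if d != lastDigest {
		apply(rules)
	}
	return d, nil
}

func main() {
	src := &memorySource{rules: map[string]string{"HighCPU": "cpu_usage > 90"}}
	apply := func(r map[string]string) { fmt.Println("reloading", len(r), "rule(s)") }

	d, _ := syncRules(src, "", apply) // first load: applies
	d, _ = syncRules(src, d, apply)   // unchanged: no reload
	src.rules["HighMem"] = "mem_usage > 90"
	_, _ = syncRules(src, d, apply) // changed: applies again
}
```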