自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

深入剖析Alertmanager:解鎖告警管理的核心邏輯

開發(fā) 前端
Alertmanager作為監(jiān)控體系中的關鍵組件,在告警處理方面展現(xiàn)出了強大的功能和高效的實現(xiàn)邏輯。通過對告警去重、分組、路由和抑制等核心功能的深入分析,以及對其源碼中關鍵數(shù)據(jù)結(jié)構和處理流程的解讀,我們清晰地認識到它如何在復雜的監(jiān)控環(huán)境中,將海量的告警信息進行有序管理和精準分發(fā)。

一、引言

在當今復雜的IT系統(tǒng)架構中,監(jiān)控體系對于保障系統(tǒng)的穩(wěn)定運行至關重要。而Alertmanager作為監(jiān)控體系里關鍵的一環(huán),在處理告警信息、確保相關人員及時響應等方面發(fā)揮著無可替代的作用。它就像是一個信息樞紐,接收來自各個監(jiān)控源的告警信息,經(jīng)過一系列智能處理后,精準地將關鍵信息傳遞給相關人員。

接下來,讓我們深入探索Alertmanager的實現(xiàn)邏輯,通過源碼分析,一窺其背后的運行機制。這不僅能幫助我們更好地理解和運用Alertmanager,還能為優(yōu)化監(jiān)控體系提供有力的支持。

二、Alertmanager初相識

(一)功能概覽

Alertmanager具備一系列強大的功能,旨在高效處理和管理告警信息。

告警去重:在復雜的監(jiān)控環(huán)境中,同一問題可能會產(chǎn)生多個重復的告警,這不僅會干擾運維人員的判斷,還可能導致重要信息被淹沒。Alertmanager通過獨特的算法,比較告警的標簽、內(nèi)容等關鍵信息,精準識別并去除重復的告警。例如,當某個服務器的CPU使用率持續(xù)過高,多個監(jiān)控指標可能會同時觸發(fā)告警,但Alertmanager能夠?qū)⑦@些重復告警合并為一個,確保運維人員只收到一次通知 ,有效減少了告警噪音,讓運維人員能夠?qū)W⒂谡嬲膯栴}。

告警分組:將相似的告警進行分組,是Alertmanager的又一核心功能。通過合理的分組策略,能夠?qū)⒋罅糠稚⒌母婢畔⒄沓捎行虻募希岣吒婢目勺x性和管理效率。比如,在一個大型電商系統(tǒng)中,可能會有多個與訂單處理相關的服務出現(xiàn)故障,如訂單創(chuàng)建失敗、訂單支付異常等告警。Alertmanager可以根據(jù)預先設定的規(guī)則,將這些與訂單處理相關的告警歸為一組,以單一通知的形式發(fā)送給負責訂單業(yè)務的運維團隊。這樣,運維人員可以一次性了解到訂單業(yè)務相關的所有問題,快速定位故障范圍,提高故障處理的效率。

告警路由:Alertmanager支持根據(jù)告警的標簽、內(nèi)容等屬性,將告警精準地路由到不同的接收器,如電子郵件、Slack、PagerDuty等。這一功能使得告警能夠及時送達至最合適的人員或團隊手中,確保問題得到及時處理。例如,對于與網(wǎng)絡相關的告警,可以配置Alertmanager將其發(fā)送給網(wǎng)絡運維團隊的Slack群組;而對于與數(shù)據(jù)庫相關的告警,則發(fā)送到數(shù)據(jù)庫管理員的郵箱。通過靈活的路由配置,實現(xiàn)了告警通知的個性化和精準化,大大提高了告警響應的及時性和準確性。

告警抑制:在某些情況下,一個告警的產(chǎn)生可能會引發(fā)一系列其他相關告警。為了避免在這種情況下運維人員收到過多冗余的告警通知,Alertmanager提供了告警抑制功能。通過設置抑制規(guī)則,當某個特定告警被觸發(fā)后,其他與之相關的告警可以被臨時抑制。例如,當整個數(shù)據(jù)中心的網(wǎng)絡出現(xiàn)故障時,可能會導致大量服務器和服務的連接異常告警。此時,可以配置Alertmanager,當數(shù)據(jù)中心網(wǎng)絡故障的告警被觸發(fā)后,抑制所有服務器和服務的連接異常告警,只保留網(wǎng)絡故障的告警通知,這樣可以有效避免告警風暴,讓運維人員能夠快速定位到問題的根源 。

(二)工作流程總覽

Alertmanager的工作流程從接收告警開始,歷經(jīng)多個關鍵環(huán)節(jié),最終將處理后的告警信息發(fā)送給相應的接收者。

首先,Alertmanager通過其HTTP API接收來自Prometheus或其他監(jiān)控系統(tǒng)發(fā)送的告警信息。這些告警信息包含了豐富的元數(shù)據(jù),如告警名稱、描述、標簽、發(fā)生時間等。例如,Prometheus在監(jiān)測到某個服務器的CPU使用率超過80%且持續(xù)5分鐘后,會向Alertmanager發(fā)送一條告警信息,其中包括告警名稱“High CPU Usage”、詳細描述“Server X's CPU usage has exceeded 80% for 5 minutes”,以及相關標簽如“server=X”“service=backend”等。

接收告警后,Alertmanager會進行去重處理。它會根據(jù)告警的標簽和內(nèi)容,判斷是否存在重復的告警事件。如果發(fā)現(xiàn)重復告警,會將其合并,確保在一定時間內(nèi),同一告警只會被通知一次。

接著,進入告警分組環(huán)節(jié)。根據(jù)預先設定的分組規(guī)則,Alertmanager會將具有相同或相似標簽的告警歸為一組。例如,根據(jù)“service”標簽進行分組,所有與“backend”服務相關的告警會被合并成一個組。分組后的告警信息會以更清晰、有條理的方式呈現(xiàn)給運維人員,便于他們進行統(tǒng)一處理。

之后,告警路由開始發(fā)揮作用。Alertmanager會根據(jù)告警的屬性和配置的路由規(guī)則,將告警分發(fā)到相應的接收器。例如,對于“severity=critical”且“service=backend”的告警,會被路由到后端運維團隊的Slack群組;而對于“severity=warning”且“service=frontend”的告警,則會發(fā)送到前端運維團隊的電子郵件地址。

在整個過程中,Alertmanager還會根據(jù)配置的抑制規(guī)則進行抑制判斷。如果滿足抑制條件,某些相關告警的通知將被臨時抑制,避免過多冗余告警的干擾。

最后,通過配置好的接收器,如電子郵件、Slack等,將處理后的告警信息發(fā)送給相應的人員或團隊。這樣,運維人員能夠及時獲取到關鍵的告警信息,采取相應的措施進行處理,保障系統(tǒng)的穩(wěn)定運行。

三、核心功能的實現(xiàn)邏輯

(一)告警去重機制

1. 哈希算法原理

Alertmanager采用哈希算法實現(xiàn)告警去重。它通過對告警的標簽進行特定計算,生成一個哈希值。以代碼中的hashAlert函數(shù)為例,具體步驟如下:

func hashAlert(a *types.Alert) uint64 {  
    const sep = '\xff'  
  
    hb := hashBuffers.Get().(*hashBuffer)  
    defer hashBuffers.Put(hb)  
    b := hb.buf[:0]  
  
    names := make(model.LabelNames, 0, len(a.Labels))  
  
    for ln := range a.Labels {  
       names = append(names, ln)  
    }  
    sort.Sort(names)  
  
    for _, ln := range names {  
       b = append(b, string(ln)...)  
       b = append(b, sep)  
       b = append(b, string(a.Labels[ln])...)  
       b = append(b, sep)  
    }  
  
    hash := xxhash.Sum64(b)  
  
    return hash  
}

首先,它將告警a的所有標簽名提取出來,存入names切片中。接著,對names進行排序,這一步至關重要,確保了相同標簽集合無論以何種順序輸入,都能得到一致的處理結(jié)果。然后,遍歷排序后的names,將標簽名、分隔符sep、標簽值以及分隔符依次追加到字節(jié)切片b中。最后,使用xxhash.Sum64函數(shù)對字節(jié)切片b進行哈希計算,得到最終的哈希值。這個哈希值就像告警的“指紋”,用于唯一標識該告警。如果兩個告警的哈希值相同,那么在去重機制中,它們就被視為重復告警。

2. 去重流程

在Alertmanager的去重過程中,DedupStage階段起著關鍵作用。其核心邏輯如下:

func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {  
    gkey, ok := GroupKey(ctx)  
    if !ok {  
       return ctx, nil, errors.New("group key missing")  
    }  
  
    repeatInterval, ok := RepeatInterval(ctx)  
    if !ok {  
       return ctx, nil, errors.New("repeat interval missing")  
    }  
  
    firingSet := map[uint64]struct{}{}  
    resolvedSet := map[uint64]struct{}{}  
    firing := []uint64{}  
    resolved := []uint64{}  
  
    var hash uint64  
    for _, a := range alerts {  
       hash = n.hash(a)  
       if a.Resolved() {  
          resolved = append(resolved, hash)  
          resolvedSet[hash] = struct{}{}  
       } else {  
          firing = append(firing, hash)  
          firingSet[hash] = struct{}{}  
       }  
    }  
  
    ctx = WithFiringAlerts(ctx, firing)  
    ctx = WithResolvedAlerts(ctx, resolved)  
  
    entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))  
    if err != nil && !errors.Is(err, nflog.ErrNotFound) {  
       return ctx, nil, err  
    }  
  
    var entry *nflogpb.Entry  
    switch len(entries) {  
    case 0:  
    case 1:  
       entry = entries[0]  
    default:  
       return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))  
    }  
  
    if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {  
       return ctx, alerts, nil  
    }  
    return ctx, nil, nil  
}

在這個函數(shù)中,首先初始化了兩個用于存儲哈希值的集合firingSet和resolvedSet,分別用于記錄觸發(fā)狀態(tài)和已解決狀態(tài)的告警哈希值,同時初始化了兩個用于存儲哈希值的切片firing和resolved。然后,遍歷傳入的所有告警alerts,通過n.hash(a)計算每個告警的哈希值。根據(jù)告警的狀態(tài)(a.Resolved()判斷是否已解決),將哈希值分別存入對應的集合和切片中。接著,通過WithFiringAlerts和WithResolvedAlerts函數(shù)將計算得到的觸發(fā)和已解決的告警哈希值切片存入上下文ctx中,以便后續(xù)階段使用。之后,從日志nflog中查詢與當前告警組和接收器相關的記錄entries。根據(jù)查詢結(jié)果,判斷是否需要更新告警信息。如果n.needsUpdate返回true,則說明當前實例需要繼續(xù)發(fā)送這些告警,函數(shù)返回當前上下文ctx、原始告警列表alerts以及nil錯誤,表示處理成功;否則,返回當前上下文ctx、空的告警列表nil以及nil錯誤,意味著這些告警已被其他實例發(fā)送,本實例不再重復發(fā)送。通過這一系列步驟,Alertmanager有效地實現(xiàn)了告警去重,避免了重復告警對運維人員的干擾。

(二)告警分組策略

1. 分組依據(jù)

Alertmanager主要依據(jù)告警的標簽來進行分組。通過配置group_by參數(shù),可以指定按照哪些標簽進行分組。例如,當配置group_by: ['alertname', 'cluster']時,具有相同alertname和cluster標簽值的告警會被歸為一組。假設在一個分布式系統(tǒng)中,有多個服務實例運行在不同的集群上。當某個服務出現(xiàn)故障時,會產(chǎn)生多個告警,每個告警都帶有alertname(如ServiceDown)和cluster(如cluster1、cluster2等)標簽。根據(jù)上述配置,所有alertname為ServiceDown且cluster相同的告警會被分到一組。這樣,運維人員可以清晰地看到每個集群中該服務的故障情況,而不是面對大量零散的告警信息,大大提高了故障排查的效率。如果在一個電商系統(tǒng)中,訂單服務出現(xiàn)問題,可能會觸發(fā)多個與訂單相關的告警,如訂單創(chuàng)建失敗、訂單支付失敗等。通過合理設置group_by參數(shù),將這些告警按照訂單服務相關的標簽進行分組,運維人員可以快速了解訂單服務整體的故障狀況,而不是被眾多單獨的告警所困擾。

2. 分組時間控制

分組時間控制涉及到group_wait和group_interval兩個重要參數(shù)。

group_wait表示在發(fā)送一個告警組的通知之前,Alertmanager等待新告警加入同一組的時間。例如,當設置group_wait: 30s時,如果在這30秒內(nèi),有新的符合分組條件的告警產(chǎn)生,它們會被添加到當前組中,然后一起發(fā)送通知。這就好比在收集貨物準備發(fā)貨,在30秒的等待時間內(nèi),不斷有新的貨物(新告警)到達,都被裝進同一個包裹(告警組),30秒后,這個包裹被寄出(發(fā)送告警組通知)。這樣可以避免短時間內(nèi)頻繁發(fā)送小的告警組通知,減少通知的數(shù)量,提高運維人員的處理效率。

group_interval則定義了在一個告警組已經(jīng)發(fā)送通知后,再次發(fā)送該組更新通知之前需要等待的時間。例如,設置group_interval: 5m,當一個告警組在某一時刻發(fā)送了通知后,在接下來的5分鐘內(nèi),即使該組有新的告警加入或狀態(tài)發(fā)生變化,Alertmanager也不會立即發(fā)送更新通知。只有在5分鐘之后,才會重新評估該組是否需要發(fā)送新的通知。這有助于防止在短時間內(nèi)對同一問題進行過度通知,避免運維人員被頻繁的告警更新所打擾。比如,某個服務的短暫波動可能會在短時間內(nèi)產(chǎn)生多個告警,但通過合理設置group_interval,可以將這些告警的更新合并在一個適當?shù)臅r間點發(fā)送,讓運維人員能夠更有效地處理告警信息。

(三)告警路由規(guī)則

1. 路由樹結(jié)構

Alertmanager的路由規(guī)則以路由樹的形式組織。路由樹的每個節(jié)點都包含了一系列的配置信息,用于決定如何處理接收到的告警。節(jié)點的核心構成包括匹配條件和Continue字段。

匹配條件用于判斷告警是否與該節(jié)點匹配。例如,通過設置match: {severity: 'critical'},表示當告警的severity標簽值為critical時,該告警與這個節(jié)點匹配。還可以使用正則表達式進行更靈活的匹配,如match_re: {service: '^backend.*'},這會匹配所有service標簽以backend開頭的告警。

Continue字段則決定了告警在匹配當前節(jié)點后的行為。當Continue為false(默認值)時,一旦告警匹配到當前節(jié)點,就會停止在路由樹中的進一步匹配,直接按照該節(jié)點的配置進行處理,如將告警發(fā)送到指定的接收器。例如,在一個簡單的路由配置中,頂級節(jié)點配置了receiver: 'default_receiver',一個子節(jié)點配置為match: {severity: 'critical'}, receiver: 'critical_receiver', Continue: false。當一個severity為critical的告警到達時,它會匹配到這個子節(jié)點,由于Continue為false,告警將直接被發(fā)送到critical_receiver,不再繼續(xù)檢查其他子節(jié)點。而當Continue為true時,告警在匹配當前節(jié)點后,會繼續(xù)在路由樹中向下匹配其他節(jié)點,直到找到最合適的處理方式。假設存在另一個子節(jié)點配置為match: {service: 'database'}, receiver: 'database_receiver', Continue: true,且告警的service標簽為database,即使它先匹配到了前面的critical節(jié)點,由于該節(jié)點Continue為true,告警仍會繼續(xù)匹配到這個database節(jié)點,并最終可能被發(fā)送到database_receiver,具體取決于后續(xù)的匹配情況。通過這種靈活的路由樹結(jié)構和配置,Alertmanager能夠根據(jù)告警的各種屬性,將其準確地路由到相應的接收器,實現(xiàn)高效的告警分發(fā)。

2. 路由匹配過程

當Alertmanager接收到一個告警時,會從路由樹的頂級節(jié)點開始進行匹配。以如下代碼所示的路由配置為例:

route:
  group_by: ['alertname']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'default_receiver'
  routes:
  - match:
      severity: 'critical'
      service: 'backend'
    receiver: 'backend_critical_receiver'
    continue: false
  - match:
      severity: 'warning'
      service: 'backend'
    receiver: 'backend_warning_receiver'
    continue: true
  - match:
      service: 'frontend'
    receiver: 'frontend_receiver'

當一個告警到達后,首先會檢查其是否滿足頂級節(jié)點的條件(這里頂級節(jié)點主要是設置了一些默認參數(shù)和分組相關配置)。然后,依次檢查子節(jié)點的匹配條件。假設一個告警的標簽為severity: 'critical', service: 'backend',它會首先匹配到第一個子節(jié)點,由于該子節(jié)點的Continue為false,告警將直接被發(fā)送到backend_critical_receiver,不再繼續(xù)檢查其他子節(jié)點。若一個告警的標簽為severity: 'warning', service: 'backend',它會匹配到第二個子節(jié)點,因為Continue為true,告警會繼續(xù)檢查后續(xù)子節(jié)點。如果后續(xù)沒有其他更匹配的節(jié)點,它將根據(jù)該節(jié)點的配置被發(fā)送到backend_warning_receiver。再假設一個告警的標簽為service: 'frontend',它不會匹配到前兩個子節(jié)點,但會匹配到第三個子節(jié)點,最終被發(fā)送到frontend_receiver。通過這樣逐步的匹配過程,Alertmanager能夠根據(jù)告警的具體屬性,將其精準地路由到最合適的接收器,確保告警能夠被及時、準確地處理。

(四)告警抑制功能

1. 抑制規(guī)則定義

在 Alertmanager 的配置文件中,通過 inhibit_rules 部分定義抑制規(guī)則。每個抑制規(guī)則包含以下三個主要部分:

  • source_match 和 **source_match_re**:定義觸發(fā)抑制的告警的匹配條件。這些告警通常是高等級的告警。source_match 用于精確匹配標簽值,而 source_match_re 用于正則表達式匹配。
  • target_match 和 **target_match_re**:定義將被抑制的告警的匹配條件。這些告警通常是低等級的告警。同樣,target_match 用于精確匹配,target_match_re 用于正則表達式匹配。
  • **equal**:定義觸發(fā)抑制的告警和被抑制的告警之間必須匹配的標簽。只有當這些標簽的值相同時,抑制規(guī)則才會生效。

2、抑制邏輯的處理流程

  • 告警接收與存儲:Alertmanager 通過 API 接收來自 Prometheus 的告警信息,并將其存儲在內(nèi)存中的 Alert Provider 中。
  • 告警匹配與分組:Dispatcher 組件從 Alert Provider 訂閱告警信息,并根據(jù)配置的路由規(guī)則(route)對告警進行匹配和分組。
  • 抑制規(guī)則的匹配:在告警分組后,進入 Notification Pipeline 組件的 InhibitStage 階段。此階段會檢查當前告警是否滿足抑制規(guī)則:

a.首先,檢查當前告警是否匹配 target_match 和 target_match_re 定義的條件。

b.然后,檢查是否存在已觸發(fā)的告警(即 source 告警),且該 source 告警滿足 source_match 和 source_match_re 定義的條件。

c.最后,檢查 source 告警和當前告警的 equal 標簽值是否相同。如果所有條件都滿足,則當前告警被標記為抑制狀態(tài),不會發(fā)送通知。

d.后續(xù)處理:被抑制的告警不會進入后續(xù)的通知發(fā)送階段,從而避免了不必要的告警通知。

2. 告警抑制的實現(xiàn)源碼

  • 抑制規(guī)則的定義與加載 在 Alertmanager 配置文件加載時,會解析 inhibit_rules 配置,并將其轉(zhuǎn)換為內(nèi)部的抑制規(guī)則結(jié)構。相關代碼如下:
// inhibit/inhibitor.go
func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMarker, logger *slog.Logger) *Inhibitor {  
    ih := &Inhibitor{  
       alerts: ap,  
       marker: mk,  
       logger: logger,  
    }  
    for _, cr := range rs {  
       r := NewInhibitRule(cr)  
       ih.rules = append(ih.rules, r)  
    }  
    return ih  
}

這里,config.InhibitRule 是從配置文件中解析出的抑制規(guī)則結(jié)構,Inhibitor 結(jié)構體用于管理這些規(guī)則。

  • 抑制規(guī)則的匹配 在 InhibitStage 階段,會調(diào)用 Inhibitor 的 Mutes 方法來檢查告警是否滿足抑制規(guī)則。相關代碼如下:
// inhibit/inhibit.go
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {  
    fp := lset.Fingerprint()  
  
    for _, r := range ih.rules {  
       if !r.TargetMatchers.Matches(lset) {  
          // If target side of rule doesn't match, we don't need to look any further.  
          continue  
       }  
       // If we are here, the target side matches. If the source side matches, too, we  
       // need to exclude inhibiting alerts for which the same is true.       
       if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {  
          ih.marker.SetInhibited(fp, inhibitedByFP.String())  
          return true  
       }  
    }  
    ih.marker.SetInhibited(fp)  
  
    return false  
}

Mutes 方法會遍歷所有定義的抑制規(guī)則,調(diào)用每個規(guī)則的 Mutes 方法來判斷當前告警是否滿足該規(guī)則。

告警抑制的方法在notify/notify.go中的抑制階段MuteStage結(jié)構體中進行調(diào)用,如下:

// MuteStage filters alerts through a Muter.type MuteStage struct {  
    muter   types.Muter  
    metrics *Metrics  
}  
  
// NewMuteStage return a new MuteStage.  
func NewMuteStage(m types.Muter, metrics *Metrics) *MuteStage {  
    return &MuteStage{muter: m, metrics: metrics}  
}  
  
// Exec implements the Stage interface.func (n *MuteStage) Exec(ctx context.Context, logger *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {  
    var (  
       filtered []*types.Alert  
       muted    []*types.Alert  
    )  
    for _, a := range alerts {  
       // TODO(fabxc): increment total alerts counter.  
       // Do not send the alert if muted.  
       if n.muter.Mutes(a.Labels) {  
          muted = append(muted, a)  
       } else {  
          filtered = append(filtered, a)  
       }  
       // TODO(fabxc): increment muted alerts counter if muted.  
    }  
    if len(muted) > 0 {  
  
       var reason string  
       switch n.muter.(type) {  
       case *silence.Silencer:  
          reason = SuppressedReasonSilence  
       case *inhibit.Inhibitor:  
          reason = SuppressedReasonInhibition  
       default:  
       }  
       n.metrics.numNotificationSuppressedTotal.WithLabelValues(reason).Add(float64(len(muted)))  
       logger.Debug("Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", muted), "reason", reason)  
    }  
  
    return ctx, filtered, nil  
}

通過n.muter.Mutes(a.Labels)來判斷告警是否被抑制,如果未被抑制,則存入filtered結(jié)構體進行后續(xù)處理,反之存入muted結(jié)構體,這些告警不發(fā)送通知。

通過以上源碼分析可以看出,Alertmanager 的告警抑制功能是通過配置文件定義規(guī)則,然后在告警處理流程中逐條檢查告警是否滿足抑制規(guī)則來實現(xiàn)的。

四、源碼深度剖析

(一)關鍵數(shù)據(jù)結(jié)構

1. Alert結(jié)構

在Alertmanager的源碼中,Alert結(jié)構體是表示告警信息的核心數(shù)據(jù)結(jié)構。其定義如下:

type Alert struct {
 GeneratorURL strfmt.URI `json:"generatorURL,omitempty"`
 Labels LabelSet `json:"labels"`
}

Labels字段是一個LabelSet類型的鍵值對集合,用于唯一標識告警,并在告警分組、路由和去重等操作中發(fā)揮關鍵作用。例如,在告警分組時,通過比較不同告警的Labels中指定的標簽,將具有相同標簽值的告警歸為一組。假設在一個分布式系統(tǒng)中,有多個服務實例,每個服務實例的告警都帶有service、instance等標簽。當配置group_by: ['service']時,Labels中service標簽值相同的告警會被分到同一組。

2. 其他重要結(jié)構

除了Alert結(jié)構,還有一些與告警處理緊密相關的重要結(jié)構。

Group結(jié)構在告警分組中起著關鍵作用。它將多個相關的Alert組合在一起,方便進行統(tǒng)一處理和通知。一個Group通常包含一組具有相同或相似特征的告警,這些特征由配置的group_by標簽決定。例如,在一個包含多個微服務的系統(tǒng)中,根據(jù)service標簽進行分組,所有與user-service相關的告警會被歸到同一個Group中。在代碼實現(xiàn)中,Group結(jié)構可能包含Labels(用于標識該組告警的公共標簽)、Alerts(該組內(nèi)的告警列表)等字段,如下所示:

type AlertGroup struct {  
    Alerts []*GettableAlert `json:"alerts"`
    Labels LabelSet `json:"labels"`
    Receiver *Receiver `json:"receiver"`
}

在告警分組過程中,通過計算告警的標簽指紋,將具有相同指紋(即相同group_by標簽組合)的告警添加到同一個Group中。這使得運維人員在收到通知時,可以一次性了解到與某個特定服務或場景相關的所有告警情況,而不是面對大量零散的告警信息,大大提高了告警處理的效率。

Route結(jié)構則用于定義告警的路由規(guī)則。它以樹狀結(jié)構組織,每個節(jié)點都包含了一系列的配置信息,用于決定如何處理接收到的告警。其核心構成包括Matchers(匹配條件)、Receiver(接收器)和Continue字段。

type Route struct {  
    parent *Route  
  
    // The configuration parameters for matches of this route.  
    RouteOpts RouteOpts  
  
    // Matchers an alert has to fulfill to match    // this route.    
    Matchers labels.Matchers  
  
    // If true, an alert matches further routes on the same level.  
    Continue bool  
  
    // Children routes of this route.  
    Routes []*Route  
}

Matchers用于判斷告警是否與該節(jié)點匹配。例如,通過設置Matchers: labels.Matchers{{Name: "severity", Value: "critical"}},表示當告警的severity標簽值為critical時,該告警與這個節(jié)點匹配。Receiver指定了匹配該節(jié)點的告警要發(fā)送到的接收器,如電子郵件地址、Slack頻道等。Continue字段決定了告警在匹配當前節(jié)點后的行為。當Continue為false(默認值)時,一旦告警匹配到當前節(jié)點,就會停止在路由樹中的進一步匹配,直接按照該節(jié)點的配置進行處理,如將告警發(fā)送到指定的接收器。而當Continue為true時,告警在匹配當前節(jié)點后,會繼續(xù)在路由樹中向下匹配其他節(jié)點,以尋找更合適的處理方式。通過這種靈活的路由樹結(jié)構和配置,Alertmanager能夠根據(jù)告警的各種屬性,將其準確地路由到相應的接收器,實現(xiàn)高效的告警分發(fā)。

(二)核心處理流程

1. API接收告警

在Alertmanager中,API接收告警的功能主要由api/v2/api.go文件中的代碼實現(xiàn)。當Alertmanager接收到來自Prometheus或其他監(jiān)控系統(tǒng)發(fā)送的告警時,會調(diào)用postAlertsHandler函數(shù),其代碼如下:

func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {  
    logger := api.requestLogger(params.HTTPRequest)  
  
    alerts := OpenAPIAlertsToAlerts(params.Alerts)  
    now := time.Now()  
  
    api.mtx.RLock()  
    resolveTimeout := time.Duration(api.alertmanagerConfig.Global.ResolveTimeout)  
    api.mtx.RUnlock()  
  
    for _, alert := range alerts {  
       alert.UpdatedAt = now  
  
       // Ensure StartsAt is set.  
       if alert.StartsAt.IsZero() {  
          if alert.EndsAt.IsZero() {  
             alert.StartsAt = now  
          } else {  
             alert.StartsAt = alert.EndsAt  
          }  
       }  
       // If no end time is defined, set a timeout after which an alert  
       // is marked resolved if it is not updated.       if alert.EndsAt.IsZero() {  
          alert.Timeout = true  
          alert.EndsAt = now.Add(resolveTimeout)  
       }  
       if alert.EndsAt.After(time.Now()) {  
          api.m.Firing().Inc()  
       } else {  
          api.m.Resolved().Inc()  
       }  
    }  
  
    // Make a best effort to insert all alerts that are valid.  
    var (  
       validAlerts    = make([]*types.Alert, 0, len(alerts))  
       validationErrs = &types.MultiError{}  
    )  
    for _, a := range alerts {  
       removeEmptyLabels(a.Labels)  
  
       if err := a.Validate(); err != nil {  
          validationErrs.Add(err)  
          api.m.Invalid().Inc()  
          continue  
       }  
       validAlerts = append(validAlerts, a)  
    }  
    if err := api.alerts.Put(validAlerts...); err != nil {  
       logger.Error("Failed to create alerts", "err", err)  
       return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())  
    }  
  
    if validationErrs.Len() > 0 {  
       logger.Error("Failed to validate alerts", "err", validationErrs.Error())  
       return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())  
    }  
  
    return alert_ops.NewPostAlertsOK()  
}

首先,函數(shù)將接收到的告警數(shù)據(jù)params.Alerts通過OpenAPIAlertsToAlerts函數(shù)進行轉(zhuǎn)換,得到alerts列表。接著,對每個告警a進行處理。調(diào)用removeEmptyLabels函數(shù)清理告警標簽中的空值,確保標簽數(shù)據(jù)的有效性。然后,通過a.Validate()方法對告警進行全面校驗,包括檢查標簽的格式是否正確、是否包含必要的標簽等。如果校驗失敗,將錯誤信息添加到validationErrs中,并增加無效告警的統(tǒng)計計數(shù)api.m.Invalid().Inc(),同時跳過該無效告警,繼續(xù)處理下一個。

經(jīng)過校驗后,將所有有效的告警validAlerts通過api.alerts.Put方法存入alerts存儲結(jié)構中。這里的api.alerts是一個實現(xiàn)了Alerts接口的實例,通常是基于內(nèi)存的mem.Alerts實現(xiàn)。在mem.Alerts的Put方法中,會為每個告警生成唯一的指紋(基于標簽計算),并將告警存儲到內(nèi)部的store.Alerts結(jié)構(本質(zhì)是一個map[model.Fingerprint]*types.Alert)中。如果存儲過程中發(fā)生錯誤,記錄錯誤日志并返回500 Internal Server Error響應給發(fā)送方,告知告警接收失敗。若所有操作成功,則返回200 OK響應,表明告警已成功接收并存儲。通過這一系列嚴謹?shù)牟襟E,Alertmanager實現(xiàn)了對告警的準確接收、校驗和存儲,為后續(xù)的告警處理流程提供了可靠的數(shù)據(jù)基礎。

2. Dispatcher分發(fā)告警

Dispatcher在Alertmanager中承擔著分發(fā)告警的重要職責,其核心邏輯在dispatch/dispatch.go文件中。Dispatcher通過訂閱alerts存儲結(jié)構(通常是mem.Alerts)的Subscribe方法,獲取到新的告警信息。其Run函數(shù)如下:

func (d *Dispatcher) Run() {  
    d.done = make(chan struct{})  
  
    d.mtx.Lock()  
    d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}  
    d.aggrGroupsNum = 0  
    d.metrics.aggrGroups.Set(0)  
    d.ctx, d.cancel = context.WithCancel(context.Background())  
    d.mtx.Unlock()  
  
    d.run(d.alerts.Subscribe())  
    close(d.done)  
}

在run函數(shù)中,通過不斷從it.Next()獲取新的告警alert,并針對每個接收到的告警,查找匹配的路由規(guī)則:

func (d *Dispatcher) run(it provider.AlertIterator) {  
    maintenance := time.NewTicker(30 * time.Second)  
    defer maintenance.Stop()  
  
    defer it.Close()  
  
    for {  
       select {  
       case alert, ok := <-it.Next():  
          if !ok {  
             // Iterator exhausted for some reason.  
             if err := it.Err(); err != nil {  
                d.logger.Error("Error on alert update", "err", err)  
             }  
             return  
          }  
  
          d.logger.Debug("Received alert", "alert", alert)  
  
          // Log errors but keep trying.  
          if err := it.Err(); err != nil {  
             d.logger.Error("Error on alert update", "err", err)  
             continue  
          }  
  
          now := time.Now()  
          for _, r := range d.route.Match(alert.Labels) {  
             d.processAlert(alert, r)  
          }  
          d.metrics.processingDuration.Observe(time.Since(now).Seconds())  
  
       case <-maintenance.C:  
          d.doMaintenance()  
       case <-d.ctx.Done():  
          return  
       }  
    }  
}

d.route.Match(alert.Labels)函數(shù)會遍歷路由樹,查找與告警標簽匹配的路由節(jié)點r。一旦找到匹配的路由節(jié)點,就調(diào)用processAlert函數(shù)對告警進行處理。

processAlert函數(shù)的主要工作是將告警分配到對應的聚合組aggrGroup中,代碼如下:

func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {  
    groupLabels := getGroupLabels(alert, route)  
  
    fp := groupLabels.Fingerprint()  
  
    d.mtx.Lock()  
    defer d.mtx.Unlock()  
  
    routeGroups, ok := d.aggrGroupsPerRoute[route]  
    if !ok {  
       routeGroups = map[model.Fingerprint]*aggrGroup{}  
       d.aggrGroupsPerRoute[route] = routeGroups  
    }  
  
    ag, ok := routeGroups[fp]  
    if ok {  
       ag.insert(alert)  
       return  
    }  
  
    // If the group does not exist, create it. But check the limit first.  
    if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {  
       d.metrics.aggrGroupLimitReached.Inc()  
       d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())  
       return  
    }  
  
    ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)  
    routeGroups[fp] = ag  
    d.aggrGroupsNum++  
    d.metrics.aggrGroups.Inc()  
  
    // Insert the 1st alert in the group before starting the group's run()  
    // function, to make sure that when the run() will be executed the 1st    
    // alert is already there.    
    ag.insert(alert)  
  
    go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {  
       _, _, err := d.stage.Exec(ctx, d.logger, alerts...)  
       if err != nil {  
          logger := d.logger.With("num_alerts", len(alerts), "err", err)  
          if errors.Is(ctx.Err(), context.Canceled) {  
             // It is expected for the context to be canceled on  
             // configuration reload or shutdown. In this case, the             
             // message should only be logged at the debug level.             
             logger.Debug("Notify for alerts failed")  
          } else {  
             logger.Error("Notify for alerts failed")  
          }  
       }  
       return err == nil  
    })  
}

首先,根據(jù)告警alert和匹配的路由route,通過getGroupLabels函數(shù)獲取用于分組的標簽groupLabels,并計算其指紋fp。然后,從d.aggrGroupsPerRoute中查找與該路由route對應的聚合組集合routeGroups。如果該集合不存在,則創(chuàng)建一個新的空集合。接著,在routeGroups中查找是否已存在指紋為fp的聚合組ag。若存在,直接將告警插入該聚合組ag.insert(alert)。若不存在,則創(chuàng)建一個新的聚合組ag,使用newAggrGroup函數(shù),傳入上下文d.ctx、分組標簽groupLabels、路由route、超時設置d.timeout和日志記錄器d.logger。創(chuàng)建完成后,將新的聚合組添加到routeGroups中,并增加聚合組的統(tǒng)計計數(shù)。最后,將告警插入新創(chuàng)建的聚合組,并啟動一個新的協(xié)程go ag.run(...),該協(xié)程會調(diào)用d.stage.Exec方法,將聚合組中的告警傳遞給后續(xù)的處理階段(如Notify模塊)進行處理。通過這樣的流程,Dispatcher實現(xiàn)了將告警準確地分發(fā)到相應的聚合組,為后續(xù)的告警處理和通知發(fā)送做好準備。

3. Notify模塊發(fā)送通知

Notify模塊負責將處理后的告警信息發(fā)送給相應的接收者,其核心處理流程在notify/notify.go文件中定義。Notify模塊通過構建一個pipeline來處理告警,pipeline由多個階段(Stage)組成,每個階段執(zhí)行特定的操作。

PipelineBuilder用于構建pipeline,其New函數(shù)如下:

func (pb *PipelineBuilder) New(  
    receivers map[string][]Integration,  
    wait func() time.Duration,  
    inhibitor *inhibit.Inhibitor,  
    silencer *silence.Silencer,  
    intervener *timeinterval.Intervener,  
    marker types.GroupMarker,  
    notificationLog NotificationLog,  
    peer Peer,  
) RoutingStage {  
    rs := make(RoutingStage, len(receivers))  
  
    ms := NewGossipSettleStage(peer)  
    is := NewMuteStage(inhibitor, pb.metrics)  
    tas := NewTimeActiveStage(intervener, marker, pb.metrics)  
    tms := NewTimeMuteStage(intervener, marker, pb.metrics)  
    ss := NewMuteStage(silencer, pb.metrics)  
  
    for name := range receivers {  
       st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)  
       rs[name] = MultiStage{ms, is, tas, tms, ss, st}  
    }  
  
    pb.metrics.InitializeFor(receivers)  
  
    return rs  
}

在構建pipeline時,首先創(chuàng)建了一些通用的階段,如NewGossipSettleStage用于處理集群節(jié)點間的狀態(tài)同步(確保各個節(jié)點對告警處理的一致性),NewMuteStage(分別基于inhibitor和silencer)用于實現(xiàn)告警抑制和靜默功能,NewTimeMuteStage和NewTimeActiveStage用于根據(jù)時間區(qū)間對告警進行靜音或激活處理。然后,針對每個接收器name,通過createReceiverStage函數(shù)創(chuàng)建一個包含特定接收器處理流程的階段st。這個階段通常包含Wait(等待階段,用于控制發(fā)送頻率)、Dedup(去重階段,避免重復發(fā)送相同告警)、Retry(重試階段,確保告警能成功發(fā)送)等子階段。最后,將這些階段組合成一個MultiStage結(jié)構,并添加到rs中,形成完整的pipeline。

以RetryStage為例,其執(zhí)行邏輯如下:

func (r RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {  
    var sent []*types.Alert  
  
    // If we shouldn't send notifications for resolved alerts, but there are only  
    // resolved alerts, report them all as successfully notified (we still want the    // notification log to log them for the next run of DedupStage).    
    if !r.integration.SendResolved() {  
       firing, ok := FiringAlerts(ctx)  
       if !ok {  
          return ctx, nil, errors.New("firing alerts missing")  
       }  
       if len(firing) == 0 {  
          return ctx, alerts, nil  
       }  
       for _, a := range alerts {  
          if a.Status() != model.AlertResolved {  
             sent = append(sent, a)  
          }  
       }  
    } else {  
       sent = alerts  
    }  
  
    b := backoff.NewExponentialBackOff()  
    b.MaxElapsedTime = 0 // Always retry.  
  
    tick := backoff.NewTicker(b)  
    defer tick.Stop()  
  
    var (  
       i    = 0  
       iErr error  
    )  
  
    l = l.With("receiver", r.groupName, "integration", r.integration.String())  
    if groupKey, ok := GroupKey(ctx); ok {  
       l = l.With("aggrGroup", groupKey)  
    }  
  
    for {  
       i++  
       // Always check the context first to not notify again.  
       select {  
       case <-ctx.Done():  
          if iErr == nil {  
             iErr = ctx.Err()  
             if errors.Is(iErr, context.Canceled) {  
                iErr = NewErrorWithReason(ContextCanceledReason, iErr)  
             } else if errors.Is(iErr, context.DeadlineExceeded) {  
                iErr = NewErrorWithReason(ContextDeadlineExceededReason, iErr)  
             }  
          }  
  
          if iErr != nil {  
             return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr)  
          }  
          return ctx, nil, nil  
       default:  
       }  
  
       select {  
       case <-tick.C:  
          now := time.Now()  
          retry, err := r.integration.Notify(ctx, sent...)  
          dur := time.Since(now)  
          r.metrics.notificationLatencySeconds.WithLabelValues(r.labelValues...).Observe(dur.Seconds())  
          r.metrics.numNotificationRequestsTotal.WithLabelValues(r.labelValues...).Inc()  
          if err != nil {  
             r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc()  
             if !retry {  
                return ctx, alerts, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err)  
             }  
             if ctx.Err() == nil {  
                if iErr == nil || err.Error() != iErr.Error() {  
                   // Log the error if the context isn't done and the error isn't the same as before.  
                   l.Warn("Notify attempt failed, will retry later", "attempts", i, "err", err)  
                }  
                // Save this error to be able to return the last seen error by an  
                // integration upon context timeout.                
                iErr = err  
             }  
          } else {  
             l := l.With("attempts", i, "duration", dur)  
             if i <= 1 {  
                l = l.With("alerts", fmt.Sprintf("%v", alerts))  
                l.Debug("Notify success")  
             } else {  
                l.Info("Notify success")  
             }  
  
             return ctx, alerts, nil  
          }  
       case <-ctx.Done():  
          if iErr == nil {  
             iErr = ctx.Err()  
             if errors.Is(iErr, context.Canceled) {  
                iErr = NewErrorWithReason(ContextCanceledReason, iErr)  
             } else if errors.Is(iErr, context.DeadlineExceeded) {  
                iErr = NewErrorWithReason(ContextDeadlineExceededReason, iErr)  
             }  
          }  
          if iErr != nil {  
             return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr)  
          }  
          return ctx, nil, nil  
       }  
    }  
}

在RetryStage中,使用backoff.NewExponentialBackOff創(chuàng)建一個指數(shù)退避策略的重試機制。通過backoff.NewTicker啟動一個定時器tick,定時嘗試發(fā)送告警。每次嘗試發(fā)送時,調(diào)用r.integration.Notify(ctx, alerts...)方法,通過具體的接收器(如電子郵件、Slack等集成方式)發(fā)送告警。在整個過程中,如果上下文ctx被取消,且存在未處理的錯誤iErr,則返回錯誤信息。通過這樣的pipeline處理流程,Notify模塊實現(xiàn)了對告警的去重、按策略發(fā)送和重試等功能,確保告警能夠準確、可靠地發(fā)送到相應的接收者手中。

五、最后

Alertmanager作為監(jiān)控體系中的關鍵組件,在告警處理方面展現(xiàn)出了強大的功能和高效的實現(xiàn)邏輯。通過對告警去重、分組、路由和抑制等核心功能的深入分析,以及對其源碼中關鍵數(shù)據(jù)結(jié)構和處理流程的解讀,我們清晰地認識到它如何在復雜的監(jiān)控環(huán)境中,將海量的告警信息進行有序管理和精準分發(fā)。它不僅有效減少了告警噪音,提高了運維人員的工作效率,還確保了關鍵告警能夠及時送達相關人員手中,為保障系統(tǒng)的穩(wěn)定運行發(fā)揮了重要作用。

責任編輯:武曉燕 來源: 運維開發(fā)故事
相關推薦

2009-09-25 09:36:55

Hibernate核心

2021-02-18 15:36:13

PrometheusAlertmanageGrafana

2024-01-16 16:39:33

PythonPyPy

2025-04-09 08:05:00

運維告警Prometheus

2024-09-11 08:37:39

2021-11-29 12:11:09

npm包管理器工具

2010-12-15 15:46:43

SharePoint

2011-12-25 15:33:13

ibmdwKVM

2017-11-24 11:38:05

2010-05-25 12:59:00

Subversion

2009-09-14 15:12:40

LINQ to XML

2009-07-06 10:44:45

JSP charset

2009-09-27 17:13:36

Hibernate V

2010-06-03 13:08:51

2011-06-03 13:48:18

JavaScript重構

2023-01-13 08:35:29

告警降噪系統(tǒng)

2021-03-06 22:41:06

內(nèi)核源碼CAS

2010-09-07 13:40:02

DIV標簽

2009-09-28 14:54:33

Hibernate映射

2010-06-17 14:35:03

設計模式 UML
點贊
收藏

51CTO技術棧公眾號