譯者 | 李睿
審校 | 重樓
本文將深入探討一個(gè)至關(guān)重要的問題:當(dāng)系統(tǒng)出現(xiàn)問題時(shí),應(yīng)當(dāng)如何有效地監(jiān)控服務(wù)?
一方面,可以借助具備提醒功能的Prometheus,以及集成儀表板和其他實(shí)用功能的Kibana來增強(qiáng)監(jiān)控能力。另一方面,在日志收集方面,ELK堆棧無疑是首選方案。然而,簡(jiǎn)單的日志記錄往往不足以滿足需求,因?yàn)樗鼰o法提供覆蓋整個(gè)組件生態(tài)系統(tǒng)的請(qǐng)求流程的整體視圖。
如果直觀地展示請(qǐng)求流程呢?或者需要在系統(tǒng)間追蹤關(guān)聯(lián)的請(qǐng)求,該怎么辦?這既適用于微服務(wù),也適用于單體服務(wù)——有多少服務(wù)并不重要;重要的是如何管理它們的延遲。
事實(shí)上,每個(gè)用戶請(qǐng)求可能要經(jīng)過由獨(dú)立服務(wù)、數(shù)據(jù)庫(kù)、消息隊(duì)列和外部API組成的復(fù)雜鏈路。
在這種復(fù)雜的環(huán)境中,很難準(zhǔn)確地確定延遲發(fā)生的位置,確定鏈路的哪一部分是性能瓶頸,并在發(fā)生故障時(shí)快速找到其根本原因。
為了有效地應(yīng)對(duì)這些挑戰(zhàn),需要一個(gè)集中的、一致的系統(tǒng)來收集遙測(cè)數(shù)據(jù)——包括跟蹤、指標(biāo)和日志。這正是OpenTelemetry和Jaeger發(fā)揮重要作用的地方。
了解基礎(chǔ)知識(shí)
人們必須理解以下兩個(gè)主要術(shù)語:
Trace ID
Trace ID是一個(gè)16字節(jié)的標(biāo)識(shí)符,通常表示為32個(gè)字符的十六進(jìn)制字符串。它在跟蹤開始時(shí)自動(dòng)生成,并在由特定請(qǐng)求創(chuàng)建的所有跨度中保持不變。這樣可以很容易地看到請(qǐng)求是如何通過系統(tǒng)中的不同服務(wù)或組件傳遞的。
Span ID
跟蹤中的每個(gè)單獨(dú)操作都有自己的Span ID,它通常是一個(gè)隨機(jī)生成的64位值。Span共享相同的Trace ID,但是每個(gè)Span都有一個(gè)唯一的Span ID,因此可以確定每個(gè)Span代表工作流的哪個(gè)部分(如數(shù)據(jù)庫(kù)查詢或?qū)α硪粋€(gè)微服務(wù)的調(diào)用)。
它們之間有何關(guān)聯(lián)?
Trace ID和Span ID是相輔相成的。
當(dāng)發(fā)起請(qǐng)求時(shí),會(huì)生成一個(gè)Trace ID,并將其傳遞給所有相關(guān)服務(wù)。每個(gè)服務(wù)又會(huì)創(chuàng)建一個(gè)與Trace ID關(guān)聯(lián)的、具有唯一Span ID的Span,從而能夠可視化請(qǐng)求從開始到結(jié)束的完整生命周期。
那么,為什么不直接使用Jaeger呢?為什么需要OpenTelemetry(OTEL)及其所有規(guī)范?這是一個(gè)很好的問題!以下逐步分析。
- Jaeger是一個(gè)用于存儲(chǔ)和可視化分布式跟蹤的系統(tǒng)。它收集、存儲(chǔ)、搜索和顯示數(shù)據(jù),顯示請(qǐng)求如何通過服務(wù)“傳輸”。
- OpenTelemetry (OTEL)是一個(gè)標(biāo)準(zhǔn)(以及一組庫(kù)),用于從應(yīng)用程序和基礎(chǔ)設(shè)施中收集遙測(cè)數(shù)據(jù)(跟蹤、指標(biāo)、日志)。它不依賴于任何單一的可視化工具或后端。
簡(jiǎn)而言之:
- OTEL就像一種“通用語言”和一組遙測(cè)收集庫(kù)。
- Jaeger是用于查看和分析分布式跟蹤的后端和用戶界面。
如果已經(jīng)有了Jaeger,為什么還需要OTEL?
1.單一的收集標(biāo)準(zhǔn)
在過去,有像OpenTracing和OpenCensus這樣的項(xiàng)目。OpenTelemetry將這些收集指標(biāo)和跟蹤的方法統(tǒng)一到一個(gè)通用標(biāo)準(zhǔn)中。
2.易于集成
采用Go(或其他語言)編寫代碼,為自動(dòng)注入攔截器和跨度添加OTEL庫(kù),就這樣完成。之后,無論想把數(shù)據(jù)發(fā)送到哪里并不重要——Jaeger、Tempo、Zipkin、Datadog、自定義后端——OpenTelemetry都會(huì)負(fù)責(zé)管道。只需更換導(dǎo)出器即可。
3.不僅僅是跟蹤
OpenTelemetry不僅涵蓋跟蹤,還處理指標(biāo)和日志。最終,可以使用一個(gè)工具集來滿足所有遙測(cè)需求,而不僅僅是跟蹤。
4.以Jaeger為后端
如果主要對(duì)分布式跟蹤可視化感興趣,Jaeger是一個(gè)很好的選擇。但默認(rèn)情況下,它不提供跨語言檢測(cè)。另一方面,OpenTetry提供了一種標(biāo)準(zhǔn)化的數(shù)據(jù)收集方式,然后可以決定將數(shù)據(jù)發(fā)送到哪里(包括Jaeger)。
在實(shí)踐中,它們經(jīng)常協(xié)同工作:
應(yīng)用程序使用OpenTelemetry→通過OTLP協(xié)議通信→進(jìn)入OpenTelemetry收集器(HTTP或gRPC)→導(dǎo)出到Jaeger進(jìn)行可視化。
技術(shù)部分
系統(tǒng)設(shè)計(jì)(簡(jiǎn)要介紹)
以下快速勾勒出幾個(gè)服務(wù),這些服務(wù)將執(zhí)行以下操作:
1.購(gòu)買服務(wù)——處理付款并記錄在MongoDB中。
2.CDC與Debezium——監(jiān)聽MongoDB表中的更改,并將它們發(fā)送到Kafka。
3.購(gòu)買處理器——使用來自Kafka的消息并調(diào)用Auth服務(wù)查找user_id進(jìn)行驗(yàn)證。
4.認(rèn)證服務(wù)——一種簡(jiǎn)單的用戶服務(wù)。
總結(jié):
- 3 Go services
- Kafka
- CDC (Debezium)
- MongoDB
代碼部分
從基礎(chǔ)設(shè)施開始。為了將所有內(nèi)容匯集到一個(gè)系統(tǒng)中,將創(chuàng)建一個(gè)大型的DockerCompose文件,并從設(shè)置遙測(cè)開始。
注:所有代碼都可以通過本文末尾的鏈接獲得,包括基礎(chǔ)設(shè)施。
YAML
services:
jaeger:
image: jaegertracing/all-in-one:1.52
ports:
- "6831:6831/udp" # UDP port for the Jaeger agent
- "16686:16686" # Web UI
- "14268:14268" # HTTP port for spans
networks:
- internal
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
ports:
- "9090:9090"
depends_on:
- kafka
- jaeger
- otel-collector
command:
--config.file=/etc/prometheus/prometheus.yml
networks:
- internal
otel-collector:
image: otel/opentelemetry-collector-contrib:0.91.0
command: ['--cnotallow=/etc/otel-collector.yaml']
ports:
- "4317:4317" # OTLP gRPC receiver
volumes:
- ./otel-collector.yaml:/etc/otel-collector.yaml
depends_on:
- jaeger
networks:
- internal
還將配置收集器——收集遙測(cè)數(shù)據(jù)的組件。
在這里選擇gRPC進(jìn)行數(shù)據(jù)傳輸,這意味著通信將通過HTTP/2進(jìn)行:
YAML
receivers:
# Add the OTLP receiver listening on port 4317.
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
processors:
batch:
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/memorylimiterprocessor
memory_limiter:
check_interval: 1s
limit_percentage: 80
spike_limit_percentage: 15
extensions:
health_check: {}
exporters:
otlp:
endpoint: "jaeger:4317"
tls:
insecure: true
prometheus:
endpoint: 0.0.0.0:9090
debug:
verbosity: detailed
service:
extensions: [health_check]
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [otlp]
metrics:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [prometheus]
確保根據(jù)需要調(diào)整任何地址,這樣就完成了基本配置。
OpenTelemetry (OTEL)使用兩個(gè)關(guān)鍵概念——Trace ID和Span ID,它們有助于跟蹤和監(jiān)控分布式系統(tǒng)中的請(qǐng)求。
代碼實(shí)現(xiàn)
現(xiàn)在了解如何讓它在Go代碼中實(shí)現(xiàn)這一點(diǎn)。需要以下導(dǎo)入:
Go
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
然后,當(dāng)應(yīng)用程序啟動(dòng)時(shí),在main()中添加一個(gè)函數(shù)來初始化跟蹤器:
Go
func InitTracer(ctx context.Context) func() {
exp, err := otlptrace.New(
ctx,
otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint(endpoint),
otlptracegrpc.WithInsecure(),
),
)
if err != nil {
log.Fatalf("failed to create OTLP trace exporter: %v", err)
}
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String("auth-service"),
semconv.ServiceVersionKey.String("1.0.0"),
semconv.DeploymentEnvironmentKey.String("stg"),
),
)
if err != nil {
log.Fatalf("failed to create resource: %v", err)
}
tp := trace.NewTracerProvider(
trace.WithBatcher(exp),
trace.WithResource(res),
)
otel.SetTracerProvider(tp)
return func() {
err := tp.Shutdown(ctx)
if err != nil {
log.Printf("error shutting down tracer provider: %v", err)
}
}
}
在設(shè)置跟蹤之后,只需要在代碼中放置span來跟蹤調(diào)用。例如,如果想測(cè)量數(shù)據(jù)庫(kù)調(diào)用(因?yàn)檫@通常是尋找性能問題的第一個(gè)地方),可以這樣寫:
Go
tracer := otel.Tracer("auth-service")
ctx, span := tracer.Start(ctx, "GetUserInfo")
defer span.End()
tracedLogger := logging.AddTraceContextToLogger(ctx)
tracedLogger.Info("find user info",
zap.String("operation", "find user"),
zap.String("username", username),
)
user, err := s.userRepo.GetUserInfo(ctx, username)
if err != nil {
s.logger.Error(errNotFound)
span.RecordError(err)
span.SetStatus(otelCodes.Error, "Failed to fetch user info")
return nil, status.Errorf(grpcCodes.NotFound, errNotFound, err)
}
span.SetStatus(otelCodes.Ok, "User info retrieved successfully")
在服務(wù)層進(jìn)行跟蹤,這太棒了!但可以更深入地分析數(shù)據(jù)庫(kù)層:
Go
func (r *UserRepository) GetUserInfo(ctx context.Context, username string) (*models.User, error) {
tracer := otel.Tracer("auth-service")
ctx, span := tracer.Start(ctx, "UserRepository.GetUserInfo",
trace.WithAttributes(
attribute.String("db.statement", query),
attribute.String("db.user", username),
),
)
defer span.End()
var user models.User
// Some code that queries the DB...
// err := doDatabaseCall()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Failed to execute query")
return nil, fmt.Errorf("failed to fetch user info: %w", err)
}
span.SetStatus(codes.Ok, "Query executed successfully")
return &user, nil
}
現(xiàn)在,你對(duì)請(qǐng)求過程有了完整的了解。前往Jaeger UI,查詢auth-service下的最后20條跟蹤記錄,將會(huì)在一個(gè)界面中看到所有的Span以及它們之間的關(guān)聯(lián)方式。
現(xiàn)在,一切都是可見的。如果需要,可以將整個(gè)查詢包含在標(biāo)記中。需要記住,不應(yīng)該使遙測(cè)過載——故意添加數(shù)據(jù)。在這里只是在演示什么是可能的,但包括完整的查詢,通常不推薦這種方式。
gRPC客戶機(jī)-服務(wù)器
如果希望查看跨越兩個(gè)gRPC服務(wù)的跟蹤,這很簡(jiǎn)單。需要做的就是從庫(kù)中添加開箱即用的攔截器。例如,在服務(wù)器端:
Go
server := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
pb.RegisterAuthServiceServer(server, authService)
在客戶端,代碼也很短:
Go
shutdown := tracing.InitTracer(ctx)
defer shutdown()
conn, err := grpc.Dial(
"auth-service:50051",
grpc.WithInsecure(),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
if err != nil {
logger.Fatal("error", zap.Error(err))
}
就是這樣!確保導(dǎo)出器配置正確,當(dāng)客戶端調(diào)用服務(wù)器時(shí),將看到這些服務(wù)上記錄的單個(gè)Trace ID。
處理CDC事件和跟蹤
也想變更數(shù)據(jù)捕獲 (CDC)的事嗎?一個(gè)簡(jiǎn)單的方法是將Trace ID嵌入到MongoDB存儲(chǔ)的對(duì)象中。這樣,當(dāng)Debezium捕獲更改并將其發(fā)送給Kafka時(shí),Trace ID已經(jīng)是記錄的一部分。
例如,如果使用的是MongoDB,可以這樣做:
Go
func (r *mongoPurchaseRepo) SavePurchase(ctx context.Context, purchase entity.Purchase) error {
span := r.handleTracing(ctx, purchase)
defer span.End()
// Insert the record into MongoDB, including the current span's Trace ID
_, err := r.collection.InsertOne(ctx, bson.M{
"_id": purchase.ID,
"user_id": purchase.UserID,
"username": purchase.Username,
"amount": purchase.Amount,
"currency": purchase.Currency,
"payment_method": purchase.PaymentMethod,
// ...
"trace_id": span.SpanContext().TraceID().String(),
})
return err
}
然后Debezium拾取這個(gè)對(duì)象(包括trace_id)并將其發(fā)送給Kafka。在消費(fèi)者端,只需解析傳入消息,提取trace_id,并將其合并到跟蹤上下文中:
Go
// If we find a Trace ID in the payload, attach it to the context
newCtx := ctx
if traceID != "" {
log.Printf("Found Trace ID: %s", traceID)
newCtx = context.WithValue(ctx, "trace-id", traceID)
}
// Create a new span
tracer := otel.Tracer("purchase-processor")
newCtx, span := tracer.Start(newCtx, "handler.processPayload")
defer span.End()
if traceID != "" {
span.SetAttributes(
attribute.String("trace.id", traceID),
)
}
// Parse the "after" field into a Purchase struct...
var purchase model.Purchase
if err := mapstructure.Decode(afterDoc, &purchase); err != nil {
log.Printf("Failed to map 'after' payload to Purchase struct: %v", err)
return err
}
Go:
// If we find a Trace ID in the payload, attach it to the context
newCtx := ctx
if traceID != "" {
log.Printf("Found Trace ID: %s", traceID)
newCtx = context.WithValue(ctx, "trace-id", traceID)
}
// Create a new span
tracer := otel.Tracer("purchase-processor")
newCtx, span := tracer.Start(newCtx, "handler.processPayload")
defer span.End()
if traceID != "" {
span.SetAttributes(
attribute.String("trace.id", traceID),
)
}
// Parse the "after" field into a Purchase struct...
var purchase model.Purchase
if err := mapstructure.Decode(afterDoc, &purchase); err != nil {
log.Printf("Failed to map 'after' payload to Purchase struct: %v", err)
return err
}
替代方案:使用Kafka標(biāo)頭
有時(shí),將Trace ID存儲(chǔ)在Kafka標(biāo)頭中比存儲(chǔ)在負(fù)載本身中更容易。對(duì)于CDC工作流來說,這可能無法開箱使用——Debezium可能限制添加到標(biāo)題中的內(nèi)容。但是如果控制了生產(chǎn)者端(或者如果使用的是標(biāo)準(zhǔn)的Kafka生產(chǎn)者),那么可以使用Sarama等工具執(zhí)行以下操作:
將Trace ID注入標(biāo)頭
Go
// saramaHeadersCarrier is a helper to set/get headers in a Sarama message.
type saramaHeadersCarrier *[]sarama.RecordHeader
func (c saramaHeadersCarrier) Get(key string) string {
for _, h := range *c {
if string(h.Key) == key {
return string(h.Value)
}
}
return ""
}
func (c saramaHeadersCarrier) Set(key string, value string) {
*c = append(*c, sarama.RecordHeader{
Key: []byte(key),
Value: []byte(value),
})
}
// Before sending a message to Kafka:
func produceMessageWithTraceID(ctx context.Context, producer sarama.SyncProducer, topic string, value []byte) error {
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID().String()
headers := make([]sarama.RecordHeader, 0)
carrier := saramaHeadersCarrier(&headers)
carrier.Set("trace-id", traceID)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(value),
Headers: headers,
}
_, _, err := producer.SendMessage(msg)
return err
}
在消費(fèi)者端提取Trace ID
Go
for message := range claim.Messages() {
// Extract the trace ID from headers
var traceID string
for _, hdr := range message.Headers {
if string(hdr.Key) == "trace-id" {
traceID = string(hdr.Value)
}
}
// Now continue your normal tracing workflow
if traceID != "" {
log.Printf("Found Trace ID in headers: %s", traceID)
// Attach it to the context or create a new span with this info
}
}
根據(jù)用例和CDC管道的設(shè)置方式,可以選擇最有效的方法:
1.在數(shù)據(jù)庫(kù)記錄中嵌入Trace ID,使其通過CDC自然流動(dòng)。
2.如果對(duì)生產(chǎn)者有更多的控制權(quán),或者想避免增加消息有效載荷的大小,可以使用Kafka標(biāo)頭。
無論哪種方式,都可以確??缍鄠€(gè)服務(wù)的跟蹤信息保持一致,即使事件是通過Kafka和Debezium異步處理的。
結(jié)論
使用OpenTelemetry和Jaeger提供詳細(xì)的請(qǐng)求跟蹤信息,幫助確定分布式系統(tǒng)中延遲發(fā)生的位置和原因。
在添加Prometheus之后,可以通過指標(biāo)(性能和穩(wěn)定性的關(guān)鍵指標(biāo))完善監(jiān)控體系。這些工具共同構(gòu)成了一個(gè)全面的可觀測(cè)性堆棧,能夠更快地檢測(cè)和解決問題、優(yōu)化性能以及提高系統(tǒng)的整體可靠性。
可以說,這種方法在微服務(wù)環(huán)境中顯著加快了故障排除的速度,是在項(xiàng)目中最先實(shí)施的事項(xiàng)之一。
參考鏈接
原文標(biāo)題:Control Your Services With OTEL, Jaeger, and Prometheus,作者:Ilia Ivankin