自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Loki 源碼分析之日志寫入

開發(fā) 前端
這里面我們嘗試對 Loki 的源碼進(jìn)行一些簡單的分析,由于有很多模塊和實(shí)現(xiàn)細(xì)節(jié),這里我們主要是對核心功能進(jìn)行分析,希望對大家有所幫助。本文首先對日志的寫入過程進(jìn)行簡單分析。

 [[403097]]

前面我們介紹了 Loki 的一些基本使用配置,但是對 Loki 還是了解不夠深入,官方文檔寫得較為凌亂,而且沒有跟上新版本,為了能夠?qū)?Loki 有一個更深入的認(rèn)識,做到有的放矢,這里面我們嘗試對 Loki 的源碼進(jìn)行一些簡單的分析,由于有很多模塊和實(shí)現(xiàn)細(xì)節(jié),這里我們主要是對核心功能進(jìn)行分析,希望對大家有所幫助。本文首先對日志的寫入過程進(jìn)行簡單分析。

Distributor Push API

Promtail 通過 Loki 的 Push API 接口推送日志數(shù)據(jù),該接口在初始化 Distributor 的時候進(jìn)行初始化,在控制器基礎(chǔ)上包裝了兩個中間件,其中的 HTTPAuthMiddleware 就是獲取租戶 ID,如果開啟了認(rèn)證配置,則從 X-Scope-OrgID 這個請求 Header 頭里面獲取,如果沒有配置則用默認(rèn)的 fake 代替。

  1. // pkg/loki/modules.go 
  2. func (t *Loki) initDistributor() (services.Service, error) { 
  3.  ...... 
  4.  if t.cfg.Target != All { 
  5.   logproto.RegisterPusherServer(t.Server.GRPC, t.distributor) 
  6.  } 
  7.  
  8.  pushHandler := middleware.Merge( 
  9.   serverutil.RecoveryHTTPMiddleware, 
  10.   t.HTTPAuthMiddleware, 
  11.  ).Wrap(http.HandlerFunc(t.distributor.PushHandler)) 
  12.  
  13.  t.Server.HTTP.Handle("/api/prom/push", pushHandler) 
  14.  t.Server.HTTP.Handle("/loki/api/v1/push", pushHandler) 
  15.  return t.distributor, nil 

Push API 處理器實(shí)現(xiàn)如下所示,首先通過 ParseRequest 函數(shù)將 Http 請求轉(zhuǎn)換成 logproto.PushRequest,然后直接調(diào)用 Distributor 下面的 Push 函數(shù)來推送日志數(shù)據(jù):

  1. // pkg/distributor/http.go 
  2.  
  3. // PushHandler 從 HTTP body 中讀取一個 snappy 壓縮的 proto 
  4. func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { 
  5.  logger := util_log.WithContext(r.Context(), util_log.Logger) 
  6.  userID, _ := user.ExtractOrgID(r.Context()) 
  7.  req, err := ParseRequest(logger, userID, r) 
  8.  ...... 
  9.  _, err = d.Push(r.Context(), req) 
  10.  ...... 
  11.  
  12. func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) { 
  13.  var body lokiutil.SizeReader 
  14.  contentEncoding := r.Header.Get(contentEnc) 
  15.  switch contentEncoding { 
  16.  case ""
  17.   body = lokiutil.NewSizeReader(r.Body) 
  18.  case "snappy"
  19.   body = lokiutil.NewSizeReader(r.Body) 
  20.  case "gzip"
  21.   gzipReader, err := gzip.NewReader(r.Body) 
  22.   if err != nil { 
  23.    return nil, err 
  24.   } 
  25.   defer gzipReader.Close() 
  26.   body = lokiutil.NewSizeReader(gzipReader) 
  27.  default
  28.   return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding) 
  29.  } 
  30.  
  31.  contentType := r.Header.Get(contentType) 
  32.  var req logproto.PushRequest 
  33.  ...... 
  34.  switch contentType { 
  35.  case applicationJSON: 
  36.   var err error 
  37.   if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { 
  38.    err = unmarshal.DecodePushRequest(body, &req) 
  39.   } else { 
  40.    err = unmarshal_legacy.DecodePushRequest(body, &req) 
  41.   } 
  42.   if err != nil { 
  43.    return nil, err 
  44.   } 
  45.  default
  46.   // When no content-type header is set or when it is set to 
  47.   // `application/x-protobuf`: expect snappy compression. 
  48.   if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { 
  49.    return nil, err 
  50.   } 
  51.  } 
  52.  return &req, nil 

首先我們先了解下 PushRequest 的結(jié)構(gòu),PushRequest 就是一個 Stream 集合:

  1. // pkg/logproto/logproto.pb.go 
  2. type PushRequest struct { 
  3.  Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"
  4.  
  5. // pkg/logproto/types.go 
  6. // Stream 流包含一個唯一的標(biāo)簽集,作為一個字符串,然后還包含一組日志條目 
  7. type Stream struct { 
  8.  Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"
  9.  Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"
  10.  
  11. // Entry 是一個帶有時間戳的日志條目 
  12. type Entry struct { 
  13.  Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"
  14.  Line      string    `protobuf:"bytes,2,opt,name=line,proto3" json:"line"

 

然后查看 Distributor 下的 Push 函數(shù)實(shí)現(xiàn):

  1. // pkg/distributor/distributor.go 
  2. // Push 日志流集合 
  3. func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { 
  4.  // 獲取租戶ID 
  5.  userID, err := user.ExtractOrgID(ctx) 
  6.  ...... 
  7.  
  8.  // 首先把請求平鋪成一個樣本的列表 
  9.  streams := make([]streamTracker, 0, len(req.Streams)) 
  10.  keys := make([]uint32, 0, len(req.Streams)) 
  11.  var validationErr error 
  12.  validatedSamplesSize := 0 
  13.  validatedSamplesCount := 0 
  14.  
  15.  validationContext := d.validator.getValidationContextFor(userID) 
  16.  
  17.  for _, stream := range req.Streams { 
  18.   // 解析日志流標(biāo)簽 
  19.   stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream) 
  20.   ...... 
  21.   n := 0 
  22.   for _, entry := range stream.Entries { 
  23.    // 校驗(yàn)一個日志Entry實(shí)體 
  24.    if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil { 
  25.     validationErr = err 
  26.     continue 
  27.    } 
  28.    stream.Entries[n] = entry 
  29.    n++ 
  30.    // 校驗(yàn)成功的樣本大小和個數(shù) 
  31.    validatedSamplesSize += len(entry.Line) 
  32.    validatedSamplesCount++ 
  33.   } 
  34.   // 去掉校驗(yàn)失敗的實(shí)體 
  35.   stream.Entries = stream.Entries[:n] 
  36.  
  37.   if len(stream.Entries) == 0 { 
  38.    continue 
  39.   } 
  40.   // 為當(dāng)前日志流生成用于hash換的token值 
  41.   keys = append(keys, util.TokenFor(userID, stream.Labels)) 
  42.   streams = append(streams, streamTracker{ 
  43.    stream: stream, 
  44.   }) 
  45.  } 
  46.  
  47.  if len(streams) == 0 { 
  48.   return &logproto.PushResponse{}, validationErr 
  49.  } 
  50.  
  51.  now := time.Now() 
  52.  // 每個租戶有一個限速器,判斷可以正常傳輸?shù)娜罩敬笮∈欠駪?yīng)該被限制 
  53.  if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) { 
  54.   // 返回429表明客戶端被限速了 
  55.   ...... 
  56.   return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize) 
  57.  } 
  58.  
  59.  const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck 
  60.  var descs [maxExpectedReplicationSet]ring.InstanceDesc 
  61.  
  62.  samplesByIngester := map[string][]*streamTracker{} 
  63.  ingesterDescs := map[string]ring.InstanceDesc{} 
  64.  for i, key := range keys { 
  65.   // ReplicationSet 描述了一個指定的鍵與哪些 Ingesters 進(jìn)行對話,以及可以容忍多少個錯誤 
  66.   // 根據(jù) label hash 到 hash 環(huán)上獲取對應(yīng)的 ingester 節(jié)點(diǎn),一個節(jié)點(diǎn)可能有多個對等的 ingester 副本來做 HA 
  67.   replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil) 
  68.   ...... 
  69.   // 最小成功的實(shí)例樹 
  70.   streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors 
  71.   // 可容忍的最大故障實(shí)例數(shù) 
  72.   streams[i].maxFailures = replicationSet.MaxErrors 
  73.   // 將 Stream 按對應(yīng)的 ingester 進(jìn)行分組 
  74.   for _, ingester := range replicationSet.Ingesters { 
  75.    // 配置每個 ingester 副本對應(yīng)的日志流數(shù)據(jù) 
  76.    samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i]) 
  77.    ingesterDescs[ingester.Addr] = ingester 
  78.   } 
  79.  } 
  80.  
  81.  tracker := pushTracker{ 
  82.   done: make(chan struct{}), 
  83.   err:  make(chan error), 
  84.  } 
  85.  tracker.samplesPending.Store(int32(len(streams))) 
  86.  // 循環(huán)Ingesters 
  87.  for ingester, samples := range samplesByIngester { 
  88.   // 讓ingester并行處理通過hash環(huán)對應(yīng)的日志流列表 
  89.   go func(ingester ring.InstanceDesc, samples []*streamTracker) { 
  90.    ...... 
  91.    // 將日志流樣本數(shù)據(jù)下發(fā)給對應(yīng)的 ingester 節(jié)點(diǎn) 
  92.    d.sendSamples(localCtx, ingester, samples, &tracker) 
  93.   }(ingesterDescs[ingester], samples) 
  94.  } 
  95.  ...... 

Push 函數(shù)的核心就是根據(jù)日志流的標(biāo)簽來計(jì)算一個 Token 值,根據(jù)這個 Token 值去哈希環(huán)上獲取對應(yīng)的處理日志的 Ingester 實(shí)例,然后并行通過 Ingester 處理日志流數(shù)據(jù),通過 sendSamples 函數(shù)為單個 ingester 去發(fā)送日志樣本數(shù)據(jù):

  1. // pkg/distributor/distributor.go 
  2.  
  3. func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { 
  4.  err := d.sendSamplesErr(ctx, ingester, streamTrackers) 
  5.  ...... 
  6.  
  7. func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error { 
  8.  // 根據(jù) ingester 地址獲取 client 
  9.  c, err := d.pool.GetClientFor(ingester.Addr) 
  10.  ...... 
  11.  // 重新構(gòu)造 PushRequest 
  12.  req := &logproto.PushRequest{ 
  13.   Streams: make([]logproto.Stream, len(streams)), 
  14.  } 
  15.  for i, s := range streams { 
  16.   req.Streams[i] = s.stream 
  17.  } 
  18.  // 通過 Ingester 客戶端請求數(shù)據(jù) 
  19.  _, err = c.(logproto.PusherClient).Push(ctx, req) 
  20.  ...... 

Ingester 寫入日志

Ingester 客戶端中的 Push 函數(shù)實(shí)際上就是一個 gRPC 服務(wù)的客戶端:

  1. // pkg/ingester/ingester.go 
  2.  
  3. // Push 實(shí)現(xiàn) logproto.Pusher. 
  4. func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { 
  5.  // 獲取租戶ID 
  6.  instanceID, err := user.ExtractOrgID(ctx) 
  7.  ...... 
  8.  // 根據(jù)租戶ID獲取 instance 對象 
  9.  instance := i.getOrCreateInstance(instanceID) 
  10.  // 直接調(diào)用 instance 對象 Push 數(shù)據(jù) 
  11.  err = instance.Push(ctx, req) 
  12.  return &logproto.PushResponse{}, err 

instance 下的 Push 函數(shù):

  1. // pkg/ingester/instance.go 
  2.  
  3. func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { 
  4.  record := recordPool.GetRecord() 
  5.  record.UserID = i.instanceID 
  6.  defer recordPool.PutRecord(record) 
  7.  
  8.  i.streamsMtx.Lock() 
  9.  defer i.streamsMtx.Unlock() 
  10.  
  11.  var appendErr error 
  12.  for _, s := range req.Streams { 
  13.   // 獲取一個 stream 對象 
  14.   stream, err := i.getOrCreateStream(s, false, record) 
  15.   if err != nil { 
  16.    appendErr = err 
  17.    continue 
  18.   } 
  19.   // 真正用于數(shù)據(jù)處理的是 stream 對象中的 Push 函數(shù) 
  20.   if _, err := stream.Push(ctx, s.Entries, record); err != nil { 
  21.    appendErr = err 
  22.    continue 
  23.   } 
  24.  } 
  25.  ...... 
  26.  return appendErr 
  27.  
  28. func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, record *WALRecord) (*stream, error) { 
  29.  if lock { 
  30.   i.streamsMtx.Lock() 
  31.   defer i.streamsMtx.Unlock() 
  32.  } 
  33.  // 如果 streams 中包含當(dāng)前標(biāo)簽列表對應(yīng)的 stream 對象,則直接返回 
  34.  stream, ok := i.streams[pushReqStream.Labels] 
  35.  if ok { 
  36.   return stream, nil 
  37.  } 
  38.  // record 只在重放 WAL 時為 nil 
  39.  // 我們不希望在重放 WAL 后丟掉數(shù)據(jù) 
  40.  // 為 instance 降低 stream 流限制 
  41.  var err error 
  42.  if record != nil { 
  43.   // 限流器判斷 
  44.   // AssertMaxStreamsPerUser 確保與當(dāng)前輸入的流數(shù)量沒有達(dá)到限制 
  45.   err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams)) 
  46.  } 
  47.  ...... 
  48.  // 解析日志流標(biāo)簽集 
  49.  labels, err := logql.ParseLabels(pushReqStream.Labels) 
  50.  ...... 
  51.  // 獲取對應(yīng)標(biāo)簽集的指紋 
  52.  fp := i.getHashForLabels(labels) 
  53.  // 重新實(shí)例化一個 stream 對象,這里還會維護(hù)日志流的倒排索引 
  54.  sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(labels), fp) 
  55.  stream = newStream(i.cfg, fp, sortedLabels, i.metrics) 
  56.  // 將stream設(shè)置到streams中去 
  57.  i.streams[pushReqStream.Labels] = stream 
  58.  i.streamsByFP[fp] = stream 
  59.  
  60.  // 當(dāng)重放 wal 的時候 record 是 nil (我們不希望在重放時重寫 wal entries). 
  61.  if record != nil { 
  62.   record.Series = append(record.Series, tsdb_record.RefSeries{ 
  63.    Ref:    uint64(fp), 
  64.    Labels: sortedLabels, 
  65.   }) 
  66.  } else { 
  67.   // 如果 record 為 nil,這就是一個 WAL 恢復(fù) 
  68.   i.metrics.recoveredStreamsTotal.Inc() 
  69.  } 
  70.  ...... 
  71.  i.addTailersToNewStream(stream) 
  72.  return stream, nil 

這個里面涉及到 WAL 這一塊的設(shè)計(jì),比較復(fù)雜,我們可以先看 stream 下面的 Push 函數(shù)實(shí)現(xiàn),主要就是將收到的 []Entry 先 Append 到內(nèi)存中的 Chunk 流([]chunkDesc) 中:

  1. // pkg/ingester/stream.go 
  2. func (s *stream) Push(ctx context.Context, entries []logproto.Entry, record *WALRecord) (int, error) { 
  3.  s.chunkMtx.Lock() 
  4.  defer s.chunkMtx.Unlock() 
  5.  var bytesAdded int 
  6.  prevNumChunks := len(s.chunks) 
  7.  var lastChunkTimestamp time.Time 
  8.  // 如果之前的 chunks 列表為空,則創(chuàng)建一個新的 chunk 
  9.  if prevNumChunks == 0 { 
  10.   s.chunks = append(s.chunks, chunkDesc{ 
  11.    chunk: s.NewChunk(), 
  12.   }) 
  13.   chunksCreatedTotal.Inc() 
  14.  } else { 
  15.   // 獲取最新一個chunk的日志時間戳 
  16.   _, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds() 
  17.  } 
  18.  
  19.  var storedEntries []logproto.Entry 
  20.  failedEntriesWithError := []entryWithError{} 
  21.  
  22.  for i := range entries { 
  23.   // 如果這個日志條目與我們最后 append 的一行的時間戳和內(nèi)容相匹配,則忽略它 
  24.   if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content { 
  25.    continue 
  26.   } 
  27.  
  28.   // 最新的一個 chunk 
  29.   chunk := &s.chunks[len(s.chunks)-1] 
  30.   // 如果當(dāng)前chunk已經(jīng)關(guān)閉 或者 已經(jīng)達(dá)到設(shè)置的最大 Chunk 大小 
  31.   if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) { 
  32.    // 如果 chunk 沒有更多的空間,則調(diào)用 Close 來以確保 head block 中的數(shù)據(jù)都被切割和壓縮。 
  33.    err := chunk.chunk.Close() 
  34.    ...... 
  35.    chunk.closed = true 
  36.    ...... 
  37.    // Append 一個新的 Chunk 
  38.    s.chunks = append(s.chunks, chunkDesc{ 
  39.     chunk: s.NewChunk(), 
  40.    }) 
  41.    chunk = &s.chunks[len(s.chunks)-1] 
  42.    lastChunkTimestamp = time.Time{} 
  43.   } 
  44.   // 往 chunk 里面 Append 日志數(shù)據(jù) 
  45.   if err := chunk.chunk.Append(&entries[i]); err != nil { 
  46.    failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err}) 
  47.   } else { 
  48.    // 存儲添加到 chunk 中的日志數(shù)據(jù) 
  49.    storedEntries = append(storedEntries, entries[i]) 
  50.    // 配置最后日志行的數(shù)據(jù) 
  51.    lastChunkTimestamp = entries[i].Timestamp 
  52.    s.lastLine.ts = lastChunkTimestamp 
  53.    s.lastLine.content = entries[i].Line 
  54.    // 累計(jì)大小 
  55.    bytesAdded += len(entries[i].Line) 
  56.   } 
  57.   chunk.lastUpdated = time.Now() 
  58.  } 
  59.  
  60.  if len(storedEntries) != 0 { 
  61.   // 當(dāng)重放 wal 的時候 record 將為 nil(我們不希望在重放的時候重寫wal日志條目) 
  62.   if record != nil { 
  63.    record.AddEntries(uint64(s.fp), storedEntries...) 
  64.   } 
  65.   // 后續(xù)是用與tail日志的處理 
  66.   ...... 
  67.  } 
  68.  ...... 
  69.  // 如果新增了chunks 
  70.  if len(s.chunks) != prevNumChunks { 
  71.   memoryChunks.Add(float64(len(s.chunks) - prevNumChunks)) 
  72.  } 
  73.  return bytesAdded, nil 

Chunk 其實(shí)就是多條日志構(gòu)成的壓縮包,將日志壓成 Chunk 的可以直接存入對象存儲, 一個 Chunk 到達(dá)指定大小之前會不斷 Append 新的日志到里面,而在達(dá)到大小之后, Chunk 就會關(guān)閉等待持久化(強(qiáng)制持久化也會關(guān)閉 Chunk, 比如關(guān)閉 ingester 實(shí)例時就會關(guān)閉所有的 Chunk 并持久化)。Chunk 的大小控制很重要:

  • 假如 Chunk 容量過小: 首先是導(dǎo)致壓縮效率不高,同時也會增加整體的 Chunk 數(shù)量, 導(dǎo)致倒排索引過大,最后, 對象存儲的操作次數(shù)也會變多, 帶來額外的性能開銷
  • 假如 Chunk 過大: 一個 Chunk 的 open 時間會更長, 占用額外的內(nèi)存空間, 同時, 也增加了丟數(shù)據(jù)的風(fēng)險,Chunk 過大也會導(dǎo)致查詢讀放大

(圖片來源: https://aleiwu.com/post/grafana-loki/)

在將日志流追加到 Chunk 中過后,在 Ingester 初始化時會啟動兩個循環(huán)去處理 Chunk 數(shù)據(jù),分別從 chunks 數(shù)據(jù)取出存入優(yōu)先級隊(duì)列,另外一個循環(huán)定期檢查從內(nèi)存中刪除已經(jīng)持久化過后的數(shù)據(jù)。

首先是 Ingester 中定義了一個 flushQueues 屬性,是一個優(yōu)先級隊(duì)列數(shù)組,該隊(duì)列中存放的是 flushOp:

  1. // pkg/ingester/ingester.go 
  2. type Ingester struct { 
  3.  services.Service 
  4.  ...... 
  5.  // 每個 flush 線程一個隊(duì)列,指紋用來選擇隊(duì)列 
  6.  flushQueues     []*util.PriorityQueue  // 優(yōu)先級隊(duì)列數(shù)組 
  7.  flushQueuesDone sync.WaitGroup 
  8.  ...... 
  9.  
  10. // pkg/ingester/flush.go 
  11. // 優(yōu)先級隊(duì)列中存放的數(shù)據(jù) 
  12. type flushOp struct { 
  13.  from      model.Time 
  14.  userID    string 
  15.  fp        model.Fingerprint 
  16.  immediate bool 

在初始化 Ingester 的時候會根據(jù)傳遞的 ConcurrentFlushes 參數(shù)來實(shí)例化 flushQueues的大?。?/p>

  1. // pkg/ingester/ingester.go 
  2. func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) { 
  3.  ...... 
  4.  i := &Ingester{ 
  5.   ...... 
  6.   flushQueues:           make([]*util.PriorityQueue, cfg.ConcurrentFlushes), 
  7.   ...... 
  8.  } 
  9.  ...... 
  10.  i.Service = services.NewBasicService(i.starting, i.running, i.stopping) 
  11.  return i, nil 

然后通過 services.NewBasicService 實(shí)例化 Service 的時候指定了服務(wù)的 Starting、Running、Stopping 3 個狀態(tài),在其中的 staring 狀態(tài)函數(shù)中會啟動協(xié)程去消費(fèi)優(yōu)先級隊(duì)列中的數(shù)據(jù)

  1. // pkg/ingester/ingester.go 
  2. func (i *Ingester) starting(ctx context.Context) error { 
  3.  // todo,如果開啟了 WAL 的處理 
  4.  ...... 
  5.  // 初始化 flushQueues 
  6.  i.InitFlushQueues() 
  7.  ...... 
  8.  // 啟動循環(huán)檢查chunk數(shù)據(jù) 
  9.  i.loopDone.Add(1) 
  10.  go i.loop() 
  11.  return nil 

初始化 flushQueues 實(shí)現(xiàn)如下所示,其中 flushQueuesDone 是一個 WaitGroup,根據(jù)配置的并發(fā)數(shù)量并發(fā)執(zhí)行 flushLoop 操作:

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) InitFlushQueues() { 
  3.  i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) 
  4.  for j := 0; j < i.cfg.ConcurrentFlushes; j++ { 
  5.   // 為每個協(xié)程構(gòu)造一個優(yōu)先級隊(duì)列 
  6.   i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) 
  7.   go i.flushLoop(j) 
  8.  } 

每一個優(yōu)先級隊(duì)列循環(huán)消費(fèi)數(shù)據(jù):

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) flushLoop(j int) { 
  3.  ...... 
  4.  for { 
  5.   // 從隊(duì)列中根據(jù)優(yōu)先級取出數(shù)據(jù) 
  6.   o := i.flushQueues[j].Dequeue() 
  7.   if o == nil { 
  8.    return 
  9.   } 
  10.   op := o.(*flushOp) 
  11.   // 執(zhí)行真正的刷新用戶序列數(shù)據(jù) 
  12.   err := i.flushUserSeries(op.userID, op.fp, op.immediate) 
  13.   ...... 
  14.   // 如果退出時刷新失敗了,把失敗的操作放回到隊(duì)列中去。 
  15.   if op.immediate && err != nil { 
  16.    op.from = op.from.Add(flushBackoff) 
  17.    i.flushQueues[j].Enqueue(op) 
  18.   } 
  19.  } 

刷新用戶的序列操作,也就是要保存到存儲中去:

  1. // pkg/ingester/flush.go 
  2. // 根據(jù)用戶ID刷新用戶日志序列 
  3. func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { 
  4.  instance, ok := i.getInstanceByID(userID) 
  5.  ...... 
  6.  // 根據(jù)instance和fp指紋數(shù)據(jù)獲取需要刷新的chunks 
  7.  chunks, labels, chunkMtx := i.collectChunksToFlush(instance, fp, immediate) 
  8.  ...... 
  9.  // 執(zhí)行真正的刷新 chunks 操作 
  10.  err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx) 
  11.  ...... 
  12.  
  13. // 收集需要刷新的 chunks 
  14. func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) { 
  15.  instance.streamsMtx.Lock() 
  16.  // 根據(jù)指紋數(shù)據(jù)獲取 stream 
  17.  stream, ok := instance.streamsByFP[fp] 
  18.  instance.streamsMtx.Unlock() 
  19.  if !ok { 
  20.   return nil, nil, nil 
  21.  } 
  22.  
  23.  var result []*chunkDesc 
  24.  stream.chunkMtx.Lock() 
  25.  defer stream.chunkMtx.Unlock() 
  26.  // 循環(huán)所有chunks 
  27.  for j := range stream.chunks { 
  28.   // 判斷是否應(yīng)該刷新當(dāng)前chunk 
  29.   shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j]) 
  30.   if immediate || shouldFlush { 
  31.    // 確保不再對該塊進(jìn)行寫操作(如果沒有關(guān)閉,則設(shè)置為關(guān)閉狀態(tài)) 
  32.    if !stream.chunks[j].closed { 
  33.     stream.chunks[j].closed = true 
  34.    } 
  35.    // 如果該 chunk 還沒有被成功刷新,則刷新這個塊 
  36.    if stream.chunks[j].flushed.IsZero() { 
  37.     result = append(result, &stream.chunks[j]) 
  38.     ...... 
  39.    } 
  40.   } 
  41.  } 
  42.  return result, stream.labels, &stream.chunkMtx 

下面是判斷一個具體的 chunk 是否應(yīng)該被刷新的邏輯:

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) { 
  3.  // chunk關(guān)閉了也應(yīng)該刷新了 
  4.  if chunk.closed { 
  5.   if chunk.synced { 
  6.    return true, flushReasonSynced 
  7.   } 
  8.   return true, flushReasonFull 
  9.  } 
  10.  // chunk最后更新的時間超過了配置的 chunk 空閑時間 MaxChunkIdle 
  11.  if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle { 
  12.   return true, flushReasonIdle 
  13.  } 
  14.  
  15.  // chunk的邊界時間操過了配置的 chunk  最大時間 MaxChunkAge 
  16.  if fromto := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge { 
  17.   return true, flushReasonMaxAge 
  18.  } 
  19.  return false"" 

真正將 chunks 數(shù)據(jù)刷新保存到存儲中是 flushChunks 函數(shù)實(shí)現(xiàn)的:

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error { 
  3.  ...... 
  4.  wireChunks := make([]chunk.Chunk, len(cs)) 
  5.  // 下面的匿名函數(shù)用于生成保存到存儲中的chunk數(shù)據(jù) 
  6.  err = func() error { 
  7.   chunkMtx.Lock() 
  8.   defer chunkMtx.Unlock() 
  9.  
  10.   for j, c := range cs { 
  11.    if err := c.chunk.Close(); err != nil { 
  12.     return err 
  13.    } 
  14.    firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds()) 
  15.    ch := chunk.NewChunk( 
  16.     userID, fp, metric, 
  17.     chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize), 
  18.     firstTime, 
  19.     lastTime, 
  20.    ) 
  21.  
  22.    chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header 
  23.    start := time.Now() 
  24.    if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkSize))); err != nil { 
  25.     return err 
  26.    } 
  27.    wireChunks[j] = ch 
  28.   } 
  29.   return nil 
  30.  }() 
  31.  
  32.  
  33.  // 通過 store 接口保存 chunk 數(shù)據(jù) 
  34.  if err := i.store.Put(ctx, wireChunks); err != nil { 
  35.   return err 
  36.  } 
  37.  
  38.  ...... 
  39.  
  40.  chunkMtx.Lock() 
  41.  defer chunkMtx.Unlock() 
  42.  for i, wc := range wireChunks { 
  43.   // flush 成功,寫入刷新時間 
  44.   cs[i].flushed = time.Now() 
  45.   // 下是一些監(jiān)控?cái)?shù)據(jù)更新 
  46.   ...... 
  47.  } 
  48.  
  49.  return nil 

chunk 數(shù)據(jù)被寫入到存儲后,還有有一個協(xié)程會去定時清理本地的這些 chunk 數(shù)據(jù),在上面的 Ingester 的 staring 函數(shù)中最后有一個 go i.loop(),在這個 loop() 函數(shù)中會每隔 FlushCheckPeriod(默認(rèn) 30s,可以通過 --ingester.flush-check-period 進(jìn)行配置)時間就會去去調(diào)用 sweepUsers 函數(shù)進(jìn)行垃圾回收:

  1. // pkg/ingester/ingester.go 
  2. func (i *Ingester) loop() { 
  3.  defer i.loopDone.Done() 
  4.  
  5.  flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod) 
  6.  defer flushTicker.Stop() 
  7.  
  8.  for { 
  9.   select { 
  10.   case <-flushTicker.C: 
  11.    i.sweepUsers(falsetrue
  12.   case <-i.loopQuit: 
  13.    return 
  14.   } 
  15.  } 

sweepUsers 函數(shù)用于執(zhí)行將日志流數(shù)據(jù)加入到優(yōu)先級隊(duì)列中,并對沒有序列的用戶進(jìn)行垃圾回收:

  1. // pkg/ingester/flush.go 
  2. // sweepUsers 定期執(zhí)行 flush 操作,并對沒有序列的用戶進(jìn)行垃圾回收 
  3. func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) { 
  4.  instances := i.getInstances() 
  5.  for _, instance := range instances { 
  6.   i.sweepInstance(instance, immediate, mayRemoveStreams) 
  7.  } 
  8.  
  9. func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) { 
  10.  instance.streamsMtx.Lock() 
  11.  defer instance.streamsMtx.Unlock() 
  12.  for _, stream := range instance.streams { 
  13.   i.sweepStream(instance, stream, immediate) 
  14.   i.removeFlushedChunks(instance, stream, mayRemoveStreams) 
  15.  } 
  16.  
  17. // must hold streamsMtx 
  18. func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) { 
  19.  stream.chunkMtx.RLock() 
  20.  defer stream.chunkMtx.RUnlock() 
  21.  if len(stream.chunks) == 0 { 
  22.   return 
  23.  } 
  24.  // 最新的chunk 
  25.  lastChunk := stream.chunks[len(stream.chunks)-1] 
  26.  // 判斷是否應(yīng)該被flush 
  27.  shouldFlush, _ := i.shouldFlushChunk(&lastChunk) 
  28.  // 如果只有一個chunk并且不是強(qiáng)制持久化切最新的chunk還不應(yīng)該被flush,則直接返回 
  29.  if len(stream.chunks) == 1 && !immediate && !shouldFlush { 
  30.   return 
  31.  } 
  32.  // 根據(jù)指紋獲取用與處理的優(yōu)先級隊(duì)列索引 
  33.  flushQueueIndex := int(uint64(stream.fp) % uint64(i.cfg.ConcurrentFlushes)) 
  34.  firstTime, _ := stream.chunks[0].chunk.Bounds() 
  35.  // 加入到優(yōu)先級隊(duì)列中去 
  36.  i.flushQueues[flushQueueIndex].Enqueue(&flushOp{ 
  37.   model.TimeFromUnixNano(firstTime.UnixNano()), instance.instanceID, 
  38.   stream.fp, immediate, 
  39.  }) 
  40.  
  41. // 移除已經(jīng)flush過后的chunks數(shù)據(jù) 
  42. func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRemoveStream bool) { 
  43.  now := time.Now() 
  44.  
  45.  stream.chunkMtx.Lock() 
  46.  defer stream.chunkMtx.Unlock() 
  47.  prevNumChunks := len(stream.chunks) 
  48.  var subtracted int 
  49.  for len(stream.chunks) > 0 { 
  50.   // 如果chunk還沒有被刷新到存儲 或者 chunk被刷新到存儲到現(xiàn)在的時間還沒操過 RetainPeriod(默認(rèn)15分鐘,可以通過--ingester.chunks-retain-period 進(jìn)行配置)則忽略 
  51.   if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod { 
  52.    break 
  53.   } 
  54.   subtracted += stream.chunks[0].chunk.UncompressedSize() 
  55.   // 刪除引用,以便該塊可以被垃圾回收起來 
  56.   stream.chunks[0].chunk = nil 
  57.   // 移除chunk 
  58.   stream.chunks = stream.chunks[1:] 
  59.  } 
  60.  ...... 
  61.  // 如果stream中的所有chunk都被清空了,則清空該 stream 的相關(guān)數(shù)據(jù) 
  62.  if mayRemoveStream && len(stream.chunks) == 0 { 
  63.   delete(instance.streamsByFP, stream.fp) 
  64.   delete(instance.streams, stream.labelsString) 
  65.   instance.index.Delete(stream.labels, stream.fp) 
  66.   ...... 
  67.  } 

關(guān)于存儲或者查詢等模塊的實(shí)現(xiàn)在后文再繼續(xù)探索,包括 WAL 的實(shí)現(xiàn)也較為復(fù)雜。

 

責(zé)任編輯:姜華 來源: k8s技術(shù)圈
相關(guān)推薦

2024-02-04 00:00:00

Loki性能查詢

2010-09-14 10:46:59

2025-02-10 02:00:00

2011-02-22 16:23:20

VSFTPD

2011-08-15 11:31:27

iPhone開發(fā)日志

2014-04-21 15:53:59

iOS開源項(xiàng)目CocoaLumber

2023-12-25 11:18:12

OpenTeleme應(yīng)用日志Loki

2022-12-29 08:00:26

Loki網(wǎng)絡(luò)設(shè)備

2022-06-28 08:40:16

LokiPromtail日志報(bào)警

2021-05-18 07:30:36

開發(fā)Spring Boot日志

2021-09-13 08:20:13

Loki日志系統(tǒng)

2023-01-04 08:21:02

Loki配置日志

2024-03-11 00:01:00

PromtailLoki服務(wù)器

2024-02-01 09:48:17

2022-06-13 11:33:59

RedoMySQL

2023-12-05 07:21:17

IstioEnvoy

2011-03-15 11:33:18

iptables

2014-08-26 11:11:57

AsyncHttpCl源碼分析

2020-12-22 09:17:49

日志Loki服務(wù)

2025-03-26 08:01:18

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號