自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一篇文章帶你學習etcd-wal模塊解析

數(shù)據(jù)庫 其他數(shù)據(jù)庫
etcd會判斷命令是否合法,然后Leader 收到提案后,通過 Raft 模塊的事件總線保存待發(fā)給 Follower 節(jié)點的消息和待持久化的日志條目,日志條目是封裝的entry。

[[408646]]

Part1常見的數(shù)據(jù)庫日志

傳統(tǒng)數(shù)據(jù)庫的日志,例如 redo log(重做日志),記錄的是修改后的數(shù)據(jù)。其實就是 MySQL 里經(jīng)常說到的 WAL 技術,它的關鍵點就是先寫日志,再寫磁盤。

  • redo log 是 InnoDB 引擎特有的;binlog 是 MySQL 的 Server 層實現(xiàn)的,所有引擎都可以使用。redo log 是物理日志,記錄的是“在某個數(shù)據(jù)頁上做了什么修改”;
  • binlog 是邏輯日志,記錄的是這個語句的原始邏輯,比如“給 ID=2 這一行的 c 字段加 1 ”。redo log 是循環(huán)寫的,空間固定會用完;
  • binlog 是可以追加寫入的。“追加寫”是指 binlog 文件寫到一定大小后會切換到下一個,并不會覆蓋以前的日志。

redis使用AOF(Append Only File),這樣做的好處是會在執(zhí)行命令成功后保存,不需要提前驗證命令是否正確。AOF會保存服務器執(zhí)行的所有寫操作到日志文件中,在服務重啟以后,會執(zhí)行這些命令來恢復數(shù)據(jù)。而 AOF 里記錄的是 Redis 收到的每一條命令,這些命令是以文本形式保存的。

etcd會判斷命令是否合法,然后Leader 收到提案后,通過 Raft 模塊的事件總線保存待發(fā)給 Follower 節(jié)點的消息和待持久化的日志條目,日志條目是封裝的entry。etcdserver 從 Raft 模塊獲取到以上消息和日志條目后,作為 Leader,它會將 put 提案消息廣播給集群各個節(jié)點,同時需要把集群 Leader 任期號、投票信息、已提交索引、提案內容持久化到一個 WAL(Write Ahead Log)日志文件中,用于保證集群的一致性、可恢復性。

Part2wal源碼分析

etcd server在啟動時,會根據(jù)是否wal目錄來確定之前etcd是否創(chuàng)建過wal,如果沒有創(chuàng)建wal,etcd會嘗試調用wal.Create方法,創(chuàng)建wal。否則使用wal.Open及wal.ReadAll方法是reload之前的wal,邏輯在etcd/etcdserver/server.go的NewServer方法里,存在wal時會調用restartNode,下面分創(chuàng)建wal和加載wal兩種情況作介紹。

wal的關鍵對象介紹如下

wal日志結構.png

dir:wal文件保存的路徑

dirFile:dir打開后的一個目錄fd對象

metadata:創(chuàng)建wal時傳入的字節(jié)序列,etcd里面主要是序列化的是節(jié)點id及集群id相關信息,后續(xù)每創(chuàng)建一個wal文件就會將其寫到wal的首部。

state:wal在append過程中保存的hardState信息,每次raft傳出的hardState有變化都會被更新,并會及時刷盤,在wal有切割時會在新的wal頭部保存最新的

hardState信息,etcd重啟后會讀取最后一次保存的hardState用來恢復宕機或機器重啟時storage中hardState狀態(tài)信息,hardState的結構如下:

  1. type HardState struct { 
  2.  Term             uint64 `protobuf:"varint,1,opt,name=term" json:"term"
  3.  Vote             uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"
  4.  Commit           uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"
  5.  XXX_unrecognized []byte `json:"-"

start:記錄最后一次保存的snapshot的元數(shù)據(jù)信息,主要是snapshot中最后一條日志Entry的index和Term,walpb.Snapshot的結構如:

  1. type Snapshot struct { 
  2.  Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"
  3.  Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"
  4.  XXX_unrecognized []byte `json:"-"

decoder:負責在讀取WAL日志文件時,將protobuf反序列化成Record實例。

readClose:用于關閉decoder關聯(lián)的reader,關閉wal讀模式,通過是在readALL之后調用該函數(shù)實現(xiàn)的

enti:最后一次保存到wal中的日志Entry的index

encoder:負責將寫入WAL日志文件的Record實例進行序列化成protobuf。

size :創(chuàng)建臨時文件時預分配空間的大小,默認是 64MB (由wal.SegmentSizeBytes指定,該值也是每個日志文件的大小)。

locks:當前WAL實例管理的所有WAL日志文件對應的句柄。

fp:filePipeline實例負責創(chuàng)建新的臨時文件。

WAL創(chuàng)建

先來看一下wal.Create()方法,該方法不僅會創(chuàng)建WAL實例,而是做了很多初始化工作,其大致步驟如下:

(1)創(chuàng)建臨時目錄,并在臨時目錄中創(chuàng)建編號為“0-0”的WAL日志文件,WAL日志文件名由兩部分組成,一部分是seq(單調遞增),另一部分是該日志文件中的第一條日志記錄的索引值。

(2)嘗試為該WAL日志文件預分配磁盤空間。

(3)向該WAL日志文件中寫入一條crcType類型的日志記錄、一條metadataType類型的日志記錄及一條snapshotType類型的日志記錄。

(4)創(chuàng)建WAL實例關聯(lián)的filePipeline實例。

(5)將臨時目錄重命名為WAL.dir字段指定的名稱。這里之所以先使用臨時目錄完成初始化操作再將其重命名的方式,主要是為了讓整個初始化過程看上去是一個原子操作。這樣上層模塊只需要檢查wal的目錄是否存在。

wal.Create()方法的具體實現(xiàn)如下:

  1. // Create creates a WAL ready for appending records. The given metadata is 
  2. // recorded at the head of each WAL file, and can be retrieved(檢索) with ReadAll. 
  3. func Create(dirpath string, metadata []byte) (*WAL, error) { 
  4.      
  5.  // keep temporary wal directory so WAL initialization appears atomic 
  6.     //先使用臨時目錄完成初始化操作再將其重命名的方式,主要是為了讓整個初始化過程看上去是一個原子操作。 
  7.  tmpdirpath := filepath.Clean(dirpath) + ".tmp" 
  8.  if fileutil.Exist(tmpdirpath) { 
  9.   if err := os.RemoveAll(tmpdirpath); err != nil { 
  10.    return nil, err 
  11.   } 
  12.  } 
  13.  if err := fileutil.CreateDirAll(tmpdirpath); err != nil { 
  14.   return nil, err 
  15.  } 
  16.  // dir/filename  ,filename從walName獲取  seq-index.wal 
  17.  p := filepath.Join(tmpdirpath, walName(0, 0)) 
  18.  // 創(chuàng)建對文件上互斥鎖 
  19.  f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode) 
  20.  if err != nil { 
  21.   return nil, err 
  22.  } 
  23.  // 定位到文件末尾 
  24.  if _, err = f.Seek(0, io.SeekEnd); err != nil { 
  25.   return nil, err 
  26.  } 
  27.  // 預分配文件,大小為SegmentSizeBytes(64MB) 
  28.  if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { 
  29.   return nil, err 
  30.  } 
  31.  // 新建WAL結構 
  32.  w := &WAL{ 
  33.   dir:      dirpath, 
  34.   metadata: metadata,// metadata 可為nil 
  35.  } 
  36.  // 在這個wal文件上創(chuàng)建一個encoder 
  37.  w.encoder, err = newFileEncoder(f.File, 0) 
  38.  if err != nil { 
  39.   return nil, err 
  40.  } 
  41.  // 把這個上了互斥鎖的文件加入到locks數(shù)組中 
  42.  w.locks = append(w.locks, f) 
  43.  if err = w.saveCrc(0); err != nil { 
  44.   return nil, err 
  45.  } 
  46.  // 將metadataType類型的record記錄在wal的header處 
  47.  if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { 
  48.   return nil, err 
  49.  } 
  50.  // 保存空的snapshot 
  51.  if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { 
  52.   return nil, err 
  53.  } 
  54.  // 之前以.tmp結尾的文件,初始化完成之后重命名 
  55.  if w, err = w.renameWal(tmpdirpath); err != nil { 
  56.   return nil, err 
  57.  } 
  58.  // directory was renamed; sync parent dir to persist rename 
  59.  pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir)) 
  60.  if perr != nil { 
  61.   return nil, perr 
  62.  } 
  63.     // 將parent dir 進行同步到磁盤 
  64.  if perr = fileutil.Fsync(pdir); perr != nil { 
  65.   return nil, perr 
  66.  } 
  67.  
  68.     return w, nil 

WAL日志文件遵循一定的命名規(guī)則,由walName實現(xiàn),格式為"序號--raft日志索引.wal"。

  1. // 根據(jù)seq和index產生wal文件名 
  2. func walName(seq, index uint64) string { 
  3.  return fmt.Sprintf("%016x-%016x.wal", seq, index

在創(chuàng)建的過程中,Create函數(shù)還向WAL日志中寫入了兩條數(shù)據(jù),一條就是記錄metadata,一條是記錄snapshot,WAL中的數(shù)據(jù)都是以Record為單位保存的,結構定義如下:

  1. // 存儲在wal穩(wěn)定存儲中的消息一共有兩種,這是第一種普通記錄的格式 
  2. type Record struct { 
  3.  Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"
  4.  Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"
  5.  Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"
  6.  XXX_unrecognized []byte `json:"-"

Record類型

其中Type字段表示該Record的類型,取值可以是以下幾種:

  1. const ( 
  2.  metadataType int64 = iota + 1 
  3.  entryType 
  4.  stateType 
  5.  crcType 
  6.  snapshotType 
  7.  // warnSyncDuration is the amount of time allotted to an fsync before 
  8.  // logging a warning 
  9.  warnSyncDuration = time.Second 

對應于raft中的Snapshot(應用狀態(tài)機的Snapshot),WAL中也會記錄一些Snapshot的信息(但是它不會記錄完整的應用狀態(tài)機的Snapshot數(shù)據(jù)),WAL中的Snapshot格式定義如下:

  1. // 存儲在wal中的第二種Record消息,snapshot 
  2. type Snapshot struct { 
  3.  Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"
  4.  Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"
  5.  XXX_unrecognized []byte `json:"-"

在保存Snapshot的(注意這里的Snapshot是WAL里的Record類型,不是raft中的應用狀態(tài)機的Snapshot)SaveSnapshot函數(shù)中:

  1. // 持久化walpb.Snapshot 
  2. func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { 
  3.  // pb序列化,此時的e可為空的 
  4.  b := pbutil.MustMarshal(&e) 
  5.  w.mu.Lock() 
  6.  defer w.mu.Unlock() 
  7.     // 創(chuàng)建snapshotType類型的record 
  8.  rec := &walpb.Record{Type: snapshotType, Data: b} 
  9.  // 持久化到wal中 
  10.  if err := w.encoder.encode(rec); err != nil { 
  11.   return err 
  12.  } 
  13.  // update enti only when snapshot is ahead of last index 
  14.  if w.enti < e.Index { 
  15.   // index of the last entry saved to the wal 
  16.   // e.Index來自應用狀態(tài)機的Index 
  17.   w.enti = e.Index 
  18.  } 
  19.  // 同步刷新磁盤 
  20.  return w.sync() 

一條Record需要先把序列化后才能持久化,這個是通過encode函數(shù)完成的(encoder.go),代碼如下:

  1. // 將Record序列化后持久化到WAL文件 
  2. func (e *encoder) encode(rec *walpb.Record) error { 
  3.  e.mu.Lock() 
  4.  defer e.mu.Unlock() 
  5.  e.crc.Write(rec.Data) 
  6.  // 生成數(shù)據(jù)的crc 
  7.  rec.Crc = e.crc.Sum32() 
  8.  var ( 
  9.   data []byte 
  10.   err  error 
  11.   n    int 
  12.  ) 
  13.  if rec.Size() > len(e.buf) { 
  14.   // 如果超過預分配的buf,就使用動態(tài)分配 
  15.   data, err = rec.Marshal() 
  16.   if err != nil { 
  17.    return err 
  18.   } 
  19.  } else { 
  20.   // 否則就使用與分配的buf 
  21.   n, err = rec.MarshalTo(e.buf) 
  22.   if err != nil { 
  23.    return err 
  24.   } 
  25.   data = e.buf[:n] 
  26.  } 
  27.  lenField, padBytes := encodeFrameSize(len(data)) 
  28.  // 先寫recode編碼后的長度 
  29.  if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil { 
  30.   return err 
  31.  } 
  32.  if padBytes != 0 { 
  33.   // 如果有追加數(shù)據(jù)(對齊需求) 
  34.   data = append(data, make([]byte, padBytes)...) 
  35.  } 
  36.  // 寫recode內容 
  37.  _, err = e.bw.Write(data) 
  38.  return err 

從代碼可以看到,一個Record被序列化之后(這里為protobuf格式),會以一個Frame的格式持久化。Frame首先是一個長度字段(encodeFrameSize完成,在encoder.go文件),64bit,56bit存數(shù)據(jù)。其中MSB表示這個Frame是否有padding字節(jié),接下來才是真正的序列化后的數(shù)據(jù)。一般一個page是4096字節(jié),對齊到8字節(jié),不會出現(xiàn)一個double被拆到兩個page的情況,在cache中,也不會被拆開:

  1. func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) { 
  2.  lenField = uint64(dataBytes) 
  3.  // force 8 byte alignment so length never gets a torn write 
  4.  padBytes = (8 - (dataBytes % 8)) % 8 
  5.  if padBytes != 0 { 
  6.   // lenField的高56記錄padding的長度 
  7.   lenField |= uint64(0x80|padBytes) << 56 // 最高位為1用于表示含有padding,方便在decode的時候判斷 
  8.  } 
  9.  return lenField, padBytes 

最終,下圖展示了包含了兩個WAL文件的示例圖。

filePipeline類型

filePipeline采用“餓漢式”,即提前創(chuàng)建一些文件備用,這樣可以加快文件的創(chuàng)建速度。filePipeline它負責預創(chuàng)建日志文件并為日志文件預分配空間。在filePipeline中會啟動一個獨立的后臺goroutine來創(chuàng)建“.tmp”結尾的臨時文件,當進行日志文件切換時,直接將臨時文件進行重命名即可使用。結構體filePipeline中各個字段的含義如下。

dir(string類型):存放臨時文件的目錄。

size (int64 類型):創(chuàng)建臨時文件時預分配空間的大小,默認是 64MB (由wal.SegmentSizeBytes指定,該值也是每個日志文件的大小)。

count(int類型):當前filePipeline實例創(chuàng)建的臨時文件數(shù)。

filec(chan*fileutil.LockedFile 類型):新建的臨時文件句柄會通過 filec 通道返回給WAL實例使用。

errc(chan error類型):當創(chuàng)建臨時文件出現(xiàn)異常時,則將異常傳遞到errc通道中。

donec(chan struct{}類型):當filePipeline.Close()被調用時會關閉donec通道,從而通知filePipeline實例刪除最后一次創(chuàng)建的臨時文件。

在newFilePipeline()方法中,除了創(chuàng)建filePipeline實例,還會啟動一個后臺goroutine來執(zhí)行filePipeline.run()方法,該后臺goroutine中會創(chuàng)建新的臨時文件并將其句柄傳遞到filec通道中。filePipeline.run()方法的具體實現(xiàn)如下:

  1. // filePipeline pipelines allocating disk space 
  2. type filePipeline struct { 
  3.  // dir to put files 
  4.  dir string 
  5.  // size of files to make, in bytes 
  6.  size int64 
  7.  // count number of files generated 
  8.  count int 
  9.  filec chan *fileutil.LockedFile 
  10.  errc  chan error 
  11.  donec chan struct{} 
  12. func newFilePipeline(dir string, fileSize int64) *filePipeline { 
  13.  fp := &filePipeline{ 
  14.   dir:   dir, 
  15.   size:  fileSize, 
  16.   filec: make(chan *fileutil.LockedFile), 
  17.   errc:  make(chan error, 1), 
  18.   donec: make(chan struct{}), 
  19.  } 
  20.  // 一直執(zhí)行預分配 
  21.  go fp.run() 
  22.  return fp 
  23. // Open returns a fresh file for writing. Rename the file before calling 
  24. // Open again or there will be file collisions. 
  25. func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) { 
  26.  select { 
  27.  case f = <-fp.filec: // 從filec通道中獲取文件描述符,并返回 
  28.  case err = <-fp.errc: // 如果創(chuàng)建失敗,從errc通道中獲取,并返回 
  29.  } 
  30.  return f, err 
  31.  
  32. func (fp *filePipeline) Close() error { 
  33.  close(fp.donec) 
  34.  return <-fp.errc //出現(xiàn)錯誤,關閉donec通道并向errc通到中發(fā)送錯誤 
  35.  
  36. func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) { 
  37.  // count % 2 so this file isn't the same as the one last published   
  38.     // 創(chuàng)建臨時文件的編號是0或者1。 
  39.  fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2)) 
  40.     //創(chuàng)建臨時文件,注意文件的模式與權限。 
  41.  if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil { 
  42.   return nil, err 
  43.  } 
  44.  // 嘗試預分配,如果當前文件系統(tǒng)不支持預分配空間,則不會報錯。 
  45.  if err = fileutil.Preallocate(f.File, fp.sizetrue); err != nil { 
  46.   f.Close() //如果出現(xiàn)異常,則會關閉donec通道 
  47.   return nil, err 
  48.  } 
  49.  // 已經(jīng)分配的文件個數(shù) 
  50.  fp.count++ 
  51.  return f, nil //返回創(chuàng)建的臨時文件 
  52.  
  53. // goroutine 
  54. func (fp *filePipeline) run() { 
  55.  defer close(fp.errc) 
  56.  for { 
  57.   // 調用alloc()執(zhí)行創(chuàng)建臨時文件 
  58.   f, err := fp.alloc() 
  59.   if err != nil { 
  60.    fp.errc <- err 
  61.    return 
  62.   } 
  63.   select { 
  64.   case fp.filec <- f:  // 等待消費方從這個channel中取出這個預創(chuàng)建的被鎖的文件 
  65.   case <-fp.donec:    //關閉時觸發(fā),刪除最后一次創(chuàng)建的臨時文件 
  66.    os.Remove(f.Name()) 
  67.    f.Close() 
  68.    return 
  69.   } 
  70.  } 

本文轉載自微信公眾號「 運維開發(fā)故事」,可以通過以下二維碼關注。轉載本文請聯(lián)系 運維開發(fā)故事公眾號。

 

責任編輯:姜華 來源: 運維開發(fā)故事
相關推薦

2021-05-15 09:18:04

Python進程

2021-06-16 14:44:32

etcd-raftRaftLeader

2021-07-13 11:37:47

cpu架構Linux

2021-11-17 10:11:08

PythonLogging模塊

2021-11-10 09:19:41

PythonShutil模塊

2022-02-21 09:44:45

Git開源分布式

2023-05-12 08:19:12

Netty程序框架

2021-06-30 00:20:12

Hangfire.NET平臺

2023-07-30 15:18:54

JavaScript屬性

2023-05-08 08:21:15

JavaNIO編程

2021-01-26 23:46:32

JavaScript數(shù)據(jù)結構前端

2021-03-09 14:04:01

JavaScriptCookie數(shù)據(jù)

2021-06-24 09:05:08

JavaScript日期前端

2021-09-27 09:18:30

ListIterato接口方法

2023-09-06 14:57:46

JavaScript編程語言

2024-01-30 13:47:45

2024-04-19 14:23:52

SwitchJavaScript開發(fā)

2020-12-08 08:09:49

SVG圖標Web

2021-03-05 18:04:15

JavaScript循環(huán)代碼

2021-05-18 08:30:42

JavaScript 前端JavaScript時
點贊
收藏

51CTO技術棧公眾號