一篇文章帶你學習etcd-wal模塊解析
Part1常見的數(shù)據(jù)庫日志
傳統(tǒng)數(shù)據(jù)庫的日志,例如 redo log(重做日志),記錄的是修改后的數(shù)據(jù)。其實就是 MySQL 里經(jīng)常說到的 WAL 技術,它的關鍵點就是先寫日志,再寫磁盤。
- redo log 是 InnoDB 引擎特有的;binlog 是 MySQL 的 Server 層實現(xiàn)的,所有引擎都可以使用。redo log 是物理日志,記錄的是“在某個數(shù)據(jù)頁上做了什么修改”;
- binlog 是邏輯日志,記錄的是這個語句的原始邏輯,比如“給 ID=2 這一行的 c 字段加 1 ”。redo log 是循環(huán)寫的,空間固定會用完;
- binlog 是可以追加寫入的。“追加寫”是指 binlog 文件寫到一定大小后會切換到下一個,并不會覆蓋以前的日志。
redis使用AOF(Append Only File),這樣做的好處是會在執(zhí)行命令成功后保存,不需要提前驗證命令是否正確。AOF會保存服務器執(zhí)行的所有寫操作到日志文件中,在服務重啟以后,會執(zhí)行這些命令來恢復數(shù)據(jù)。而 AOF 里記錄的是 Redis 收到的每一條命令,這些命令是以文本形式保存的。
etcd會判斷命令是否合法,然后Leader 收到提案后,通過 Raft 模塊的事件總線保存待發(fā)給 Follower 節(jié)點的消息和待持久化的日志條目,日志條目是封裝的entry。etcdserver 從 Raft 模塊獲取到以上消息和日志條目后,作為 Leader,它會將 put 提案消息廣播給集群各個節(jié)點,同時需要把集群 Leader 任期號、投票信息、已提交索引、提案內容持久化到一個 WAL(Write Ahead Log)日志文件中,用于保證集群的一致性、可恢復性。
Part2wal源碼分析
etcd server在啟動時,會根據(jù)是否wal目錄來確定之前etcd是否創(chuàng)建過wal,如果沒有創(chuàng)建wal,etcd會嘗試調用wal.Create方法,創(chuàng)建wal。否則使用wal.Open及wal.ReadAll方法是reload之前的wal,邏輯在etcd/etcdserver/server.go的NewServer方法里,存在wal時會調用restartNode,下面分創(chuàng)建wal和加載wal兩種情況作介紹。
wal的關鍵對象介紹如下
wal日志結構.png
dir:wal文件保存的路徑
dirFile:dir打開后的一個目錄fd對象
metadata:創(chuàng)建wal時傳入的字節(jié)序列,etcd里面主要是序列化的是節(jié)點id及集群id相關信息,后續(xù)每創(chuàng)建一個wal文件就會將其寫到wal的首部。
state:wal在append過程中保存的hardState信息,每次raft傳出的hardState有變化都會被更新,并會及時刷盤,在wal有切割時會在新的wal頭部保存最新的
hardState信息,etcd重啟后會讀取最后一次保存的hardState用來恢復宕機或機器重啟時storage中hardState狀態(tài)信息,hardState的結構如下:
- type HardState struct {
- Term uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
- Vote uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
- Commit uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
- XXX_unrecognized []byte `json:"-"`
- }
start:記錄最后一次保存的snapshot的元數(shù)據(jù)信息,主要是snapshot中最后一條日志Entry的index和Term,walpb.Snapshot的結構如:
- type Snapshot struct {
- Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
- Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
- XXX_unrecognized []byte `json:"-"`
- }
decoder:負責在讀取WAL日志文件時,將protobuf反序列化成Record實例。
readClose:用于關閉decoder關聯(lián)的reader,關閉wal讀模式,通過是在readALL之后調用該函數(shù)實現(xiàn)的
enti:最后一次保存到wal中的日志Entry的index
encoder:負責將寫入WAL日志文件的Record實例進行序列化成protobuf。
size :創(chuàng)建臨時文件時預分配空間的大小,默認是 64MB (由wal.SegmentSizeBytes指定,該值也是每個日志文件的大小)。
locks:當前WAL實例管理的所有WAL日志文件對應的句柄。
fp:filePipeline實例負責創(chuàng)建新的臨時文件。
WAL創(chuàng)建
先來看一下wal.Create()方法,該方法不僅會創(chuàng)建WAL實例,而是做了很多初始化工作,其大致步驟如下:
(1)創(chuàng)建臨時目錄,并在臨時目錄中創(chuàng)建編號為“0-0”的WAL日志文件,WAL日志文件名由兩部分組成,一部分是seq(單調遞增),另一部分是該日志文件中的第一條日志記錄的索引值。
(2)嘗試為該WAL日志文件預分配磁盤空間。
(3)向該WAL日志文件中寫入一條crcType類型的日志記錄、一條metadataType類型的日志記錄及一條snapshotType類型的日志記錄。
(4)創(chuàng)建WAL實例關聯(lián)的filePipeline實例。
(5)將臨時目錄重命名為WAL.dir字段指定的名稱。這里之所以先使用臨時目錄完成初始化操作再將其重命名的方式,主要是為了讓整個初始化過程看上去是一個原子操作。這樣上層模塊只需要檢查wal的目錄是否存在。
wal.Create()方法的具體實現(xiàn)如下:
- // Create creates a WAL ready for appending records. The given metadata is
- // recorded at the head of each WAL file, and can be retrieved(檢索) with ReadAll.
- func Create(dirpath string, metadata []byte) (*WAL, error) {
- // keep temporary wal directory so WAL initialization appears atomic
- //先使用臨時目錄完成初始化操作再將其重命名的方式,主要是為了讓整個初始化過程看上去是一個原子操作。
- tmpdirpath := filepath.Clean(dirpath) + ".tmp"
- if fileutil.Exist(tmpdirpath) {
- if err := os.RemoveAll(tmpdirpath); err != nil {
- return nil, err
- }
- }
- if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
- return nil, err
- }
- // dir/filename ,filename從walName獲取 seq-index.wal
- p := filepath.Join(tmpdirpath, walName(0, 0))
- // 創(chuàng)建對文件上互斥鎖
- f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
- if err != nil {
- return nil, err
- }
- // 定位到文件末尾
- if _, err = f.Seek(0, io.SeekEnd); err != nil {
- return nil, err
- }
- // 預分配文件,大小為SegmentSizeBytes(64MB)
- if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
- return nil, err
- }
- // 新建WAL結構
- w := &WAL{
- dir: dirpath,
- metadata: metadata,// metadata 可為nil
- }
- // 在這個wal文件上創(chuàng)建一個encoder
- w.encoder, err = newFileEncoder(f.File, 0)
- if err != nil {
- return nil, err
- }
- // 把這個上了互斥鎖的文件加入到locks數(shù)組中
- w.locks = append(w.locks, f)
- if err = w.saveCrc(0); err != nil {
- return nil, err
- }
- // 將metadataType類型的record記錄在wal的header處
- if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
- return nil, err
- }
- // 保存空的snapshot
- if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
- return nil, err
- }
- // 之前以.tmp結尾的文件,初始化完成之后重命名
- if w, err = w.renameWal(tmpdirpath); err != nil {
- return nil, err
- }
- // directory was renamed; sync parent dir to persist rename
- pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
- if perr != nil {
- return nil, perr
- }
- // 將parent dir 進行同步到磁盤
- if perr = fileutil.Fsync(pdir); perr != nil {
- return nil, perr
- }
- return w, nil
- }
WAL日志文件遵循一定的命名規(guī)則,由walName實現(xiàn),格式為"序號--raft日志索引.wal"。
- // 根據(jù)seq和index產生wal文件名
- func walName(seq, index uint64) string {
- return fmt.Sprintf("%016x-%016x.wal", seq, index)
- }
在創(chuàng)建的過程中,Create函數(shù)還向WAL日志中寫入了兩條數(shù)據(jù),一條就是記錄metadata,一條是記錄snapshot,WAL中的數(shù)據(jù)都是以Record為單位保存的,結構定義如下:
- // 存儲在wal穩(wěn)定存儲中的消息一共有兩種,這是第一種普通記錄的格式
- type Record struct {
- Type int64 `protobuf:"varint,1,opt,name=type" json:"type"`
- Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
- Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
- XXX_unrecognized []byte `json:"-"`
- }
Record類型
其中Type字段表示該Record的類型,取值可以是以下幾種:
- const (
- metadataType int64 = iota + 1
- entryType
- stateType
- crcType
- snapshotType
- // warnSyncDuration is the amount of time allotted to an fsync before
- // logging a warning
- warnSyncDuration = time.Second
- )
對應于raft中的Snapshot(應用狀態(tài)機的Snapshot),WAL中也會記錄一些Snapshot的信息(但是它不會記錄完整的應用狀態(tài)機的Snapshot數(shù)據(jù)),WAL中的Snapshot格式定義如下:
- // 存儲在wal中的第二種Record消息,snapshot
- type Snapshot struct {
- Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
- Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
- XXX_unrecognized []byte `json:"-"`
- }
在保存Snapshot的(注意這里的Snapshot是WAL里的Record類型,不是raft中的應用狀態(tài)機的Snapshot)SaveSnapshot函數(shù)中:
- // 持久化walpb.Snapshot
- func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
- // pb序列化,此時的e可為空的
- b := pbutil.MustMarshal(&e)
- w.mu.Lock()
- defer w.mu.Unlock()
- // 創(chuàng)建snapshotType類型的record
- rec := &walpb.Record{Type: snapshotType, Data: b}
- // 持久化到wal中
- if err := w.encoder.encode(rec); err != nil {
- return err
- }
- // update enti only when snapshot is ahead of last index
- if w.enti < e.Index {
- // index of the last entry saved to the wal
- // e.Index來自應用狀態(tài)機的Index
- w.enti = e.Index
- }
- // 同步刷新磁盤
- return w.sync()
- }
一條Record需要先把序列化后才能持久化,這個是通過encode函數(shù)完成的(encoder.go),代碼如下:
- // 將Record序列化后持久化到WAL文件
- func (e *encoder) encode(rec *walpb.Record) error {
- e.mu.Lock()
- defer e.mu.Unlock()
- e.crc.Write(rec.Data)
- // 生成數(shù)據(jù)的crc
- rec.Crc = e.crc.Sum32()
- var (
- data []byte
- err error
- n int
- )
- if rec.Size() > len(e.buf) {
- // 如果超過預分配的buf,就使用動態(tài)分配
- data, err = rec.Marshal()
- if err != nil {
- return err
- }
- } else {
- // 否則就使用與分配的buf
- n, err = rec.MarshalTo(e.buf)
- if err != nil {
- return err
- }
- data = e.buf[:n]
- }
- lenField, padBytes := encodeFrameSize(len(data))
- // 先寫recode編碼后的長度
- if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil {
- return err
- }
- if padBytes != 0 {
- // 如果有追加數(shù)據(jù)(對齊需求)
- data = append(data, make([]byte, padBytes)...)
- }
- // 寫recode內容
- _, err = e.bw.Write(data)
- return err
- }
從代碼可以看到,一個Record被序列化之后(這里為protobuf格式),會以一個Frame的格式持久化。Frame首先是一個長度字段(encodeFrameSize完成,在encoder.go文件),64bit,56bit存數(shù)據(jù)。其中MSB表示這個Frame是否有padding字節(jié),接下來才是真正的序列化后的數(shù)據(jù)。一般一個page是4096字節(jié),對齊到8字節(jié),不會出現(xiàn)一個double被拆到兩個page的情況,在cache中,也不會被拆開:
- func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
- lenField = uint64(dataBytes)
- // force 8 byte alignment so length never gets a torn write
- padBytes = (8 - (dataBytes % 8)) % 8
- if padBytes != 0 {
- // lenField的高56記錄padding的長度
- lenField |= uint64(0x80|padBytes) << 56 // 最高位為1用于表示含有padding,方便在decode的時候判斷
- }
- return lenField, padBytes
- }
最終,下圖展示了包含了兩個WAL文件的示例圖。
filePipeline類型
filePipeline采用“餓漢式”,即提前創(chuàng)建一些文件備用,這樣可以加快文件的創(chuàng)建速度。filePipeline它負責預創(chuàng)建日志文件并為日志文件預分配空間。在filePipeline中會啟動一個獨立的后臺goroutine來創(chuàng)建“.tmp”結尾的臨時文件,當進行日志文件切換時,直接將臨時文件進行重命名即可使用。結構體filePipeline中各個字段的含義如下。
dir(string類型):存放臨時文件的目錄。
size (int64 類型):創(chuàng)建臨時文件時預分配空間的大小,默認是 64MB (由wal.SegmentSizeBytes指定,該值也是每個日志文件的大小)。
count(int類型):當前filePipeline實例創(chuàng)建的臨時文件數(shù)。
filec(chan*fileutil.LockedFile 類型):新建的臨時文件句柄會通過 filec 通道返回給WAL實例使用。
errc(chan error類型):當創(chuàng)建臨時文件出現(xiàn)異常時,則將異常傳遞到errc通道中。
donec(chan struct{}類型):當filePipeline.Close()被調用時會關閉donec通道,從而通知filePipeline實例刪除最后一次創(chuàng)建的臨時文件。
在newFilePipeline()方法中,除了創(chuàng)建filePipeline實例,還會啟動一個后臺goroutine來執(zhí)行filePipeline.run()方法,該后臺goroutine中會創(chuàng)建新的臨時文件并將其句柄傳遞到filec通道中。filePipeline.run()方法的具體實現(xiàn)如下:
- // filePipeline pipelines allocating disk space
- type filePipeline struct {
- // dir to put files
- dir string
- // size of files to make, in bytes
- size int64
- // count number of files generated
- count int
- filec chan *fileutil.LockedFile
- errc chan error
- donec chan struct{}
- }
- func newFilePipeline(dir string, fileSize int64) *filePipeline {
- fp := &filePipeline{
- dir: dir,
- size: fileSize,
- filec: make(chan *fileutil.LockedFile),
- errc: make(chan error, 1),
- donec: make(chan struct{}),
- }
- // 一直執(zhí)行預分配
- go fp.run()
- return fp
- }
- // Open returns a fresh file for writing. Rename the file before calling
- // Open again or there will be file collisions.
- func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
- select {
- case f = <-fp.filec: // 從filec通道中獲取文件描述符,并返回
- case err = <-fp.errc: // 如果創(chuàng)建失敗,從errc通道中獲取,并返回
- }
- return f, err
- }
- func (fp *filePipeline) Close() error {
- close(fp.donec)
- return <-fp.errc //出現(xiàn)錯誤,關閉donec通道并向errc通到中發(fā)送錯誤
- }
- func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
- // count % 2 so this file isn't the same as the one last published
- // 創(chuàng)建臨時文件的編號是0或者1。
- fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
- //創(chuàng)建臨時文件,注意文件的模式與權限。
- if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
- return nil, err
- }
- // 嘗試預分配,如果當前文件系統(tǒng)不支持預分配空間,則不會報錯。
- if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
- f.Close() //如果出現(xiàn)異常,則會關閉donec通道
- return nil, err
- }
- // 已經(jīng)分配的文件個數(shù)
- fp.count++
- return f, nil //返回創(chuàng)建的臨時文件
- }
- // goroutine
- func (fp *filePipeline) run() {
- defer close(fp.errc)
- for {
- // 調用alloc()執(zhí)行創(chuàng)建臨時文件
- f, err := fp.alloc()
- if err != nil {
- fp.errc <- err
- return
- }
- select {
- case fp.filec <- f: // 等待消費方從這個channel中取出這個預創(chuàng)建的被鎖的文件
- case <-fp.donec: //關閉時觸發(fā),刪除最后一次創(chuàng)建的臨時文件
- os.Remove(f.Name())
- f.Close()
- return
- }
- }
- }
本文轉載自微信公眾號「 運維開發(fā)故事」,可以通過以下二維碼關注。轉載本文請聯(lián)系 運維開發(fā)故事公眾號。