從零動手寫數(shù)據(jù)庫系統(tǒng):數(shù)據(jù)庫系統(tǒng)的日志模塊實現(xiàn)
任何一個應(yīng)用只要冠以”系統(tǒng)“二字,那么它一定離不開一個模塊,那就是”日志“。既然我們要開發(fā)一個數(shù)據(jù)庫系統(tǒng),那么它必然要有自己的日志模塊。日志通常用于記錄系統(tǒng)的運行狀態(tài),有點類似于快照,一旦系統(tǒng)出現(xiàn)異常,那么管理員或者它的代碼本身可以通過掃描分析日志來確定問題所在,或者通過日志執(zhí)行錯誤恢復(fù),這點對數(shù)據(jù)庫系統(tǒng)更加重要。
數(shù)據(jù)庫系統(tǒng)經(jīng)常要往文件中讀寫大量數(shù)據(jù),在這個過程中很容易出現(xiàn)各種各樣的問題,例如在執(zhí)行一個交易時,網(wǎng)絡(luò)突然斷開,機器突然斷電,于是交易執(zhí)行到一半就會突然中斷,當(dāng)系統(tǒng)重新啟動時,整個數(shù)據(jù)庫就會處于一種錯誤狀態(tài),也就是有一部數(shù)據(jù)寫入,但還有一部分?jǐn)?shù)據(jù)丟失,這種情況對數(shù)據(jù)庫系統(tǒng)而言非常致命,倘若不能保證數(shù)據(jù)的一致性,那么這種數(shù)據(jù)系統(tǒng)就不會有人敢使用。那如何保證數(shù)據(jù)一致性呢,這就得靠日志來保證,數(shù)據(jù)庫在讀寫數(shù)據(jù)前,會先寫入日志,記錄相應(yīng)的操作,例如當(dāng)前操作是讀還是寫,然后記錄要讀寫的數(shù)據(jù)。
假設(shè)我們現(xiàn)在有個業(yè)務(wù),要把一百行數(shù)據(jù)寫入兩個表,前50行寫入表1,后50行寫入表2,于是日志就會記錄”表1寫入0到50行“;”表2寫入51到100行“,這類信息。假設(shè)在數(shù)據(jù)寫入前50行后突然斷電,機器重啟,數(shù)據(jù)庫系統(tǒng)重新啟動后,它自動掃描日志發(fā)現(xiàn)”表2寫入51到100行“這個操作沒有執(zhí)行,于是再次執(zhí)行這個操作,這樣數(shù)據(jù)的一致性就能得以保證。
本節(jié)我們在上一節(jié)實現(xiàn)文件系統(tǒng)的基礎(chǔ)上,看看如何實現(xiàn)日志模塊。對于日志模塊而言,日志就是一組字節(jié)數(shù)組,它只負責(zé)把數(shù)組內(nèi)容寫入內(nèi)存或是磁盤文件,數(shù)據(jù)到底有什么內(nèi)容,格式如何解析它一概不管。同時在日志寫入時采用”壓棧“模式,假設(shè)我們有3條日志,其長度分別為50字節(jié),100字節(jié),100字節(jié),現(xiàn)在我們有400字節(jié)的緩存可以寫入,那么寫入日志時我們將從緩存的末尾開始寫,例如存入第一條日志時,我們從緩存第350字節(jié)開始寫入,于是350字節(jié)到400字節(jié)就對應(yīng)第一條日志,然后我們把當(dāng)前可寫入的地址放置到緩存的開頭8字節(jié),例如第一條日志寫入后,下次可寫入的地址是350,于是我們在緩存開頭8字節(jié)存入數(shù)據(jù)350,當(dāng)要寫入第二條日志時,我們讀取緩存前8字節(jié),拿到數(shù)值350,由于第二條緩存長度100字節(jié),于是我們將其寫入緩存的地址為350-100=250,于是寫入后緩存的250到350部分的內(nèi)容就對應(yīng)第二條日志,然后我們將250寫入緩存開頭8字節(jié);當(dāng)寫入第三條日志時,系統(tǒng)讀取開頭8字節(jié)得到數(shù)值250,于是第三條日志的寫入地址就是250-100=150,于是系統(tǒng)將第三條日志寫入緩存偏移150字節(jié)處,于是從150字節(jié)到250字節(jié)這部分的數(shù)據(jù)就對應(yīng)第3條日志,同時把150存入開頭8字節(jié),以此類推。
廢話不多說,我們看看具體代碼實現(xiàn),首先創(chuàng)建文件夾log_manager,加入log_manager.go,輸入以下代碼:
用就是將寫入的日志先存儲在內(nèi)存塊中,一旦當(dāng)前內(nèi)存塊寫滿則將其寫入磁盤文件,然后生成新的內(nèi)存塊用于寫入新日志。每次日志管理器啟動時,它根據(jù)給定的目錄讀取目錄下的二進制文件,將文件尾部的區(qū)塊讀入內(nèi)存,這樣就能得到文件存儲的日志數(shù)據(jù)。
package log_manager
import (
fm "file_manager"
"sync"
)
const (
UINT64_LEN = 8
)
type LogManager struct {
file_manager *fm.FileManager
log_file string
log_page *fm.Page
current_blk *fm.BlockId
latest_lsn uint64 //當(dāng)前日志序列號
last_saved_lsn uint64 //上次存儲到磁盤的日志序列號
mu sync.Mutex
}
func (l *LogManager) appendNewBlock() (*fm.BlockId, error) {
blk, err := l.file_manager.Append(l.log_file)
if err != nil {
return nil, err
}
/*
添加日志時從內(nèi)存的底部往上走,例如內(nèi)存400字節(jié),日志100字節(jié),那么
日志將存儲在內(nèi)存的300到400字節(jié)處,因此我們需要把當(dāng)前內(nèi)存可用底部偏移
寫入頭8個字節(jié)
*/
l.log_page.SetInt(0, uint64(l.file_manager.BlockSize()))
l.file_manager.Write(&blk, l.log_page)
return &blk, nil
}
func NewLogManager(file_manager *fm.FileManager, log_file string) (*LogManager, error) {
log_mgr := LogManager{
file_manager: file_manager,
log_file: log_file,
log_page: fm.NewPageBySize(file_manager.BlockSize()),
last_saved_lsn: 0,
latest_lsn: 0,
}
log_size, err := file_manager.Size(log_file)
if err != nil {
return nil, err
}
if log_size == 0 { //如果文件為空則添加新區(qū)塊
blk, err := log_mgr.appendNewBlock()
if err != nil {
return nil, err
}
log_mgr.current_blk = blk
} else { //文件有數(shù)據(jù),則在文件末尾的區(qū)塊讀入內(nèi)存,最新的日志總會存儲在文件末尾
log_mgr.current_blk = fm.NewBlockId(log_mgr.log_file, log_size-1)
file_manager.Read(log_mgr.current_blk, log_mgr.log_page)
}
return &log_mgr, nil
}
func (l *LogManager) FlushByLSN(lsn uint64) error {
/*
將給定編號及其之前的日志寫入磁盤,注意這里會把與給定日志在同一個區(qū)塊,也就是Page中的
日志也寫入磁盤。例如調(diào)用FlushLSN(65)表示把編號65及其之前的日志寫入磁盤,如果編號為
66,67的日志也跟65在同一個Page里,那么它們也會被寫入磁盤
*/
if lsn > l.last_saved_lsn {
err := l.Flush()
if err != nil {
return err
}
l.last_saved_lsn = lsn
}
return nil
}
func (l *LogManager) Flush() error {
//將當(dāng)前區(qū)塊數(shù)據(jù)寫入寫入磁盤
_, err := l.file_manager.Write(l.current_blk, l.log_page)
if err != nil {
return err
}
return nil
}
func (l *LogManager) Append(log_record []byte) (uint64, error) {
//添加日志
l.mu.Lock()
defer l.mu.Unlock()
boundary := l.log_page.GetInt(0) //獲得可寫入的底部偏移
record_size := uint64(len(log_record))
bytes_need := record_size + UINT64_LEN
var err error
if int(boundary-bytes_need) < int(UINT64_LEN) {
//當(dāng)前容量不夠,先將當(dāng)前日志寫入磁盤
err = l.Flush()
if err != nil {
return l.latest_lsn, err
}
//生成新區(qū)塊用于寫新數(shù)據(jù)
l.current_blk, err = l.appendNewBlock()
if err != nil {
return l.latest_lsn, err
}
boundary = l.log_page.GetInt(0)
}
record_pos := boundary - bytes_need //我們從底部往上寫入
l.log_page.SetBytes(record_pos, log_record) //設(shè)置下次可以寫入的位置
l.log_page.SetInt(0, record_pos)
l.latest_lsn += 1 //記錄新加入日志的編號
return l.latest_lsn, err
}
func (l *LogManager) Iterator() *LogIterator {
//生成日志遍歷器
l.Flush()
return NewLogIterator(l.file_manager, l.current_blk)
}
為了更好的遍歷日志,我們要構(gòu)造一個日志遍歷器,在同一個目錄下創(chuàng)建log_iterator.go,然后寫入以下內(nèi)容:
package log_manager
import (
fm "file_manager"
)
/*
LogIterator用于遍歷給定區(qū)塊內(nèi)的記錄,由于記錄從底部往上寫,因此記錄1,2,3,4寫入后在區(qū)塊的排列為
4,3,2,1,因此LogIterator會從上往下遍歷記錄,于是得到的記錄就是4,3,2,1
*/
type LogIterator struct {
file_manager *fm.FileManager
blk *fm.BlockId
p *fm.Page
current_pos uint64
boundary uint64
}
func NewLogIterator(file_manager *fm.FileManager, blk *fm.BlockId) *LogIterator{
it := LogIterator{
file_manager: file_manager,
blk: blk ,
}
//現(xiàn)將給定區(qū)塊的數(shù)據(jù)讀入
it.p = fm.NewPageBySize(file_manager.BlockSize())
err := it.moveToBlock(blk)
if err != nil {
return nil
}
return &it
}
func (l *LogIterator) moveToBlock(blk *fm.BlockId) error {
//打開存儲日志數(shù)據(jù)的文件,遍歷到給定區(qū)塊,將數(shù)據(jù)讀入內(nèi)存
_, err := l.file_manager.Read(blk, l.p)
if err != nil {
return err
}
//獲得日志的起始地址
l.boundary = l.p.GetInt(0)
l.current_pos = l.boundary
return nil
}
func (l *LogIterator) Next() []byte {
//先讀取最新日志,也就是編號大的,然后依次讀取編號小的
if l.current_pos == l.file_manager.BlockSize() {
l.blk = fm.NewBlockId(l.blk.FileName(), l.blk.Number() - 1)
l.moveToBlock(l.blk)
}
record := l.p.GetBytes(l.current_pos)
l.current_pos += UINT64_LEN + uint64(len(record))
return record
}
func (l *LogIterator) HasNext() bool {
//如果當(dāng)前偏移位置小于區(qū)塊大那么還有數(shù)據(jù)可以從當(dāng)前區(qū)塊讀取
//如果當(dāng)前區(qū)塊數(shù)據(jù)已經(jīng)全部讀完,但是區(qū)塊號不為0,那么可以讀取前面區(qū)塊獲得老的日志數(shù)據(jù)
return l.current_pos < l.file_manager.BlockSize() || l.blk.Number() > 0
}
日志遍歷器的作用是逐條讀取日志,它先從最新的日志開始讀取,然后依次獲取老的日志。最后我們通過測試用例來理解當(dāng)前代碼的作用和邏輯,添加log_manager_test.go,代碼如下:
package log_manager
import (
fm "file_manager"
"fmt"
"github.com/stretchr/testify/require"
"testing"
)
func makeRecord(s string, n uint64) []byte {
//使用page提供接口來設(shè)置字節(jié)數(shù)組的內(nèi)容
p := fm.NewPageBySize(1)
npos := p.MaxLengthForString(s)
b := make([]byte, npos+UINT64_LEN)
p = fm.NewPageByBytes(b)
p.SetString(0, s)
p.SetInt(npos, n)
return b
}
func createRecords(lm *LogManager, start uint64, end uint64) {
for i := start; i <=
end; i++ {
//一條記錄包含兩個信息,一個是字符串record 一個是數(shù)值i
rec := makeRecord(fmt.Sprintf("record%d", i), i)
lm.Append(rec)
}
}
func TestLogManager(t *testing.T) {
file_manager, _ := fm.NewFileManager("logtest", 400)
log_manager, err := NewLogManager(file_manager, "logfile")
require.Nil(t, err)
createRecords(log_manager, 1, 35)
iter := log_manager.Iterator()
rec_num := uint64(35)
for iter.HasNext() {
rec := iter.Next()
p := fm.NewPageByBytes(rec)
s := p.GetString(0)
require.Equal(t, fmt.Sprintf("record%d", rec_num), s)
npos := p.MaxLengthForString(s)
val := p.GetInt(npos)
require.Equal(t, val, rec_num)
rec_num -= 1
}
createRecords(log_manager, 36, 70)
log_manager.FlushByLSN(65)
iter = log_manager.Iterator()
rec_num = uint64(70)
for iter.HasNext() {
rec := iter.Next()
p := fm.NewPageByBytes(rec)
s := p.GetString(0)
require.Equal(t, fmt.Sprintf("record%d", rec_num), s)
npos := p.MaxLengthForString(s)
val := p.GetInt(npos)
require.Equal(t, val, rec_num)
rec_num -= 1
}
}
用例首先創(chuàng)建日志管理器,然后寫入35條日志,每條日志后面又跟著日志的編號。例如第35條日志的內(nèi)容為”record35”,這個字符串會以字節(jié)數(shù)組的方式寫入到區(qū)塊中,然后再把讀入的數(shù)據(jù)重新讀取,同時判斷讀取的數(shù)據(jù)與寫入的是否一致。