LSM-Tree from Beginner to Obsessed: Implementing a High-Performance Key-Value Store from Scratch
Contents
I. Introduction
II. Overview of LSM-Tree Core Features
III. Implementing the Core Features
1. MemTable
2. SSTable
3. Write
4. Iterators
5. Read/Scan
6. Compaction
IV. Summary
I. Introduction
LSM-Tree (Log-Structured Merge Tree) is an efficient key-value storage data structure, widely used in NoSQL databases and big-data processing systems. Its core idea is to exploit the speed of sequential disk writes by organizing data in sorted layers: writes are optimized, and some read performance is traded for higher write throughput.
Across internet infrastructure, whether databases, caches, or big-data frameworks, the LSM-Tree shows up everywhere.
I use this kind of storage engine every day, yet my understanding of it was superficial, so I decided to implement an LSM-Tree myself to deepen it.
This implementation uses Zig and covers only the core features of an LSM-Tree (reads and writes, compaction, persistence; MVCC is out of scope).
Zig is an emerging systems programming language designed to offer modern features while keeping complexity low.
This project was heavily inspired by Mini-LSM, which I strongly recommend studying!
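II. Overview of LSM-Tree Core Features
Before writing any code, let me briefly introduce the architecture of the LSM-Tree (Log-Structured Merge Tree) and its read and write paths.
An LSM-Tree combines characteristics of a log and an index to optimize both writes and reads. Every write is append-only, so write performance is high.
The price is space amplification: append-only writes leave stale versions behind. An LSM-Tree stacks data on disk as multiple levels of SSTables, so a merge-and-compact process is needed to reclaim the excess space.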
[Figures omitted: write path | read path]
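III. Implementing the Core Features
MemTable Implementation
First, we implement the in-memory structure of the LSM storage engine: the MemTable. We use a skip list as the MemTable's data structure because skip lists lend themselves to lock-free concurrent reads and writes (the version shown here simply guards the list with a read-write lock). We won't go into how skip lists work (anyone familiar with Redis will have met them); in short, a skip list is an easy-to-implement sorted key-value map.
The skip list itself is simple. I used Zig's comptime capabilities to implement a generic version in src/skiplist.zig; take a look at the repository if you are interested.
On top of the skip list we can wrap the basic MemTable functionality.
Our LSM supports a WAL (write-ahead log): before a record is written to the in-memory table it is first written to an on-disk log, so data can be recovered after an unexpected crash and restart.
I didn't want to write the WAL myself, so I borrowed a C implementation from the internet (Zig integrates with C very easily; see "Interoperating with C").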
map: Map,
lock: RwLock,
wal: ?Wal,
id: usize,
allocator: std.mem.Allocator,
arena: std.heap.ArenaAllocator,
approximate_size: atomic.Value(usize) = atomic.Value(usize).init(0),

fn putToList(self: *Self, key: []const u8, value: []const u8) !void {
    {
        self.lock.lock();
        defer self.lock.unlock();
        // Assumption: kk and vv are arena-owned copies of key and value,
        // so the skip list never points at caller-owned memory.
        const kk = try self.arena.allocator().dupe(u8, key);
        const vv = try self.arena.allocator().dupe(u8, value);
        try self.map.insert(kk, vv);
    }
    _ = self.approximate_size.fetchAdd(@intCast(key.len + value.len), .monotonic);
}

fn putToWal(self: *Self, key: []const u8, value: []const u8) !void {
    // [key-size: 4bytes][key][value-size: 4bytes][value]
    if (self.wal) |w| {
        var buf = std.ArrayList(u8).init(self.arena.allocator());
        var bw = buf.writer();
        try bw.writeInt(u32, @intCast(key.len), .big);
        _ = try bw.write(key);
        try bw.writeInt(u32, @intCast(value.len), .big);
        _ = try bw.write(value);
        try w.append(buf.items);
    }
}

// Write to the MemTable: WAL first, then the skip list
pub fn put(self: *Self, key: []const u8, value: []const u8) !void {
    try self.putToWal(key, value);
    try self.putToList(key, value);
}

pub fn get(self: *Self, key: []const u8, val: *[]const u8) !bool {
    self.lock.lockShared();
    defer self.lock.unlockShared();
    var vv: []const u8 = undefined;
    if (try self.map.get(key, &vv)) {
        val.* = vv;
        return true;
    }
    return false;
}
Note that there is no delete operation here. Borrowing the tombstone idea from RocksDB, an empty value represents a deletion, so a delete becomes put(key, "").
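To make the WAL record layout from the comment in putToWal concrete, here is a minimal sketch of encoding and decoding one record in a fixed buffer, together with the empty-value tombstone check. The names WalRecord, encodeRecord, and decodeRecord are hypothetical and not taken from the project; the code assumes a Zig version that uses the lowercase `.big` endian tag, as the article's code does.

```zig
const std = @import("std");

/// One decoded WAL record; both slices point into the input buffer.
pub const WalRecord = struct {
    key: []const u8,
    value: []const u8,

    /// An empty value is the tombstone convention used in this article.
    pub fn isTombstone(self: WalRecord) bool {
        return self.value.len == 0;
    }
};

/// Encodes [key-size: 4 bytes][key][value-size: 4 bytes][value] into `out`.
/// Returns the number of bytes written.
pub fn encodeRecord(out: []u8, key: []const u8, value: []const u8) !usize {
    const total = 4 + key.len + 4 + value.len;
    if (out.len < total) return error.NoSpaceLeft;
    const k_len: u32 = @intCast(key.len);
    const v_len: u32 = @intCast(value.len);
    const v_off = 4 + key.len;
    std.mem.writeInt(u32, out[0..4], k_len, .big);
    @memcpy(out[4..v_off], key);
    std.mem.writeInt(u32, out[v_off..][0..4], v_len, .big);
    @memcpy(out[v_off + 4 .. total], value);
    return total;
}

/// Decodes the record at the start of `buf`; returns it with the bytes consumed.
pub fn decodeRecord(buf: []const u8) !struct { record: WalRecord, consumed: usize } {
    if (buf.len < 4) return error.Truncated;
    const k_len: usize = std.mem.readInt(u32, buf[0..4], .big);
    if (buf.len < 4 + k_len + 4) return error.Truncated;
    const v_off = 4 + k_len;
    const v_len: usize = std.mem.readInt(u32, buf[v_off..][0..4], .big);
    const total = v_off + 4 + v_len;
    if (buf.len < total) return error.Truncated;
    return .{
        .record = .{ .key = buf[4..v_off], .value = buf[v_off + 4 .. total] },
        .consumed = total,
    };
}
```

Replaying a WAL after a crash would then amount to calling decodeRecord repeatedly, advancing by `consumed` each time, and re-inserting each record into a fresh MemTable.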
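SSTable
Next, we implement the other key element of an LSM-Tree: the SSTable.
An SSTable (Sorted String Table) is an immutable on-disk file that stores key-value pairs sorted by key. Once generated, an SSTable file is never modified; updates and deletes are expressed by appending new records or deletion markers, and redundant data is eventually cleaned up by compaction.
Whenever a MemTable in the LSM-Tree exceeds its size threshold, its in-memory data is written out as an SSTable.
Each SSTable consists of multiple blocks, and each block packs a group of key-value pairs.
The block encoding format is as follows:
[Figure omitted: block layout. As the code below shows, each entry is stored as key overlap (u16), remaining key length (u16), key suffix, value length (u16), and value, and the block keeps one u16 offset per entry.]
To build a block we implement a BlockBuilder module; see src/block.zig for this code: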
pub const Block = struct {
data_v: std.ArrayList(u8),
offset_v: std.ArrayList(u16),
};
pub const BlockBuilder = struct {
allocator: std.mem.Allocator,
offset_v: std.ArrayList(u16),
data_v: std.ArrayList(u8),
block_size: usize,
first_key: []u8,
pub fn add(self: *Self, key: []const u8, value: ?[]const u8) !bool {
std.debug.assert(key.len > 0); // key must not be empty
const vSize = if (value) |v| v.len else 0;
if ((self.estimatedSize() + key.len + vSize + 3 * @sizeOf(u16) > self.block_size) and !self.isEmpty()) {
return false;
}
try self.doAdd(key, value);
if (self.first_key.len == 0) {
self.first_key = try self.allocator.dupe(u8, key);
}
return true;
}
fn doAdd(self: *Self, key: []const u8, value: ?[]const u8) !void {
// add the offset of the data into the offset array
try self.offset_v.append(@intCast(self.data_v.items.len));
const overlap = calculate_overlap(self.first_key, key);
var dw = self.data_v.writer();
// encode key overlap
try dw.writeInt(u16, @intCast(overlap), .big);
// encode key length
try dw.writeInt(u16, @intCast(key.len - overlap), .big);
// encode key content
_ = try dw.write(key[overlap..]);
// encode value length
if (value) |v| {
try dw.writeInt(u16, @intCast(v.len), .big);
// encode value content
_ = try dw.write(v);
} else {
try dw.writeInt(u16, 0, .big);
}
}
pub fn build(self: *Self) !Block {
if (self.isEmpty()) {
@panic("block is empty");
}
return Block.init(
try self.data_v.clone(),
try self.offset_v.clone(),
);
}
};
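You may have noticed that we do not write the key verbatim: we only write the suffix that does not overlap with the first key of the current block. Keys within a block are sorted, so they are very likely to share a prefix, which makes this a handy space optimization. For example:
Keys: foo, foo1, foo2, foo3 ....
When writing the block we only need to store:
foo|1|2|3|....
Many sorted-table implementations use this trick.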
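The BlockBuilder code calls calculate_overlap without showing it. A minimal sketch of what such a helper could look like, plus the inverse step a reader needs to rebuild the full key, is given here; commonPrefixLen and restoreKey are hypothetical names, not the project's API.

```zig
const std = @import("std");

/// Length of the common prefix of `first_key` and `key`.
/// Hypothetical stand-in for the article's `calculate_overlap`.
pub fn commonPrefixLen(first_key: []const u8, key: []const u8) usize {
    const n = @min(first_key.len, key.len);
    var i: usize = 0;
    while (i < n and first_key[i] == key[i]) : (i += 1) {}
    return i;
}

/// Rebuilds a full key from the block's first key, the stored overlap, and the
/// stored suffix. `out` must hold at least `overlap + suffix.len` bytes.
pub fn restoreKey(out: []u8, first_key: []const u8, overlap: usize, suffix: []const u8) []const u8 {
    const total = overlap + suffix.len;
    @memcpy(out[0..overlap], first_key[0..overlap]);
    @memcpy(out[overlap..total], suffix);
    return out[0..total];
}
```

With first key foo, the key foo2 has an overlap of 3, so only the suffix 2 is stored next to the two u16 length fields.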
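With blocks in place we can define the SSTable format. An SSTable consists of multiple blocks, the block metadata, and a Bloom filter.
A Bloom filter is a probabilistic data structure that maintains a set of keys. You can add keys to it, and for any key it tells you either that the key may be in the set or that it is definitely not.
Adding a Bloom filter to each SSTable makes key lookups noticeably cheaper, because tables that definitely do not contain the key can be skipped.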
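The "may contain / definitely not" behavior can be illustrated with a very small Bloom filter. This is only a sketch under assumed parameters (1024 bits, 3 probes, Wyhash with two seeds); the project's BloomFilter type is not shown in the article and may differ in every detail.

```zig
const std = @import("std");

/// A tiny fixed-size Bloom filter: 1024 bits, 3 probes per key (illustrative sizes).
pub const TinyBloom = struct {
    bits: [128]u8 = [_]u8{0} ** 128,

    const num_bits: u64 = 128 * 8;
    const num_probes: u64 = 3;

    fn probe(key: []const u8, i: u64) usize {
        // Double hashing: h1 + i * h2 derives several probe positions from two hashes.
        const h1 = std.hash.Wyhash.hash(0, key);
        const h2 = std.hash.Wyhash.hash(1, key) | 1;
        return @intCast((h1 +% i *% h2) % num_bits);
    }

    pub fn insert(self: *TinyBloom, key: []const u8) void {
        var i: u64 = 0;
        while (i < num_probes) : (i += 1) {
            const bit = probe(key, i);
            const shift: u3 = @intCast(bit % 8);
            self.bits[bit / 8] |= @as(u8, 1) << shift;
        }
    }

    /// false: the key was definitely never inserted. true: it may have been.
    pub fn mayContain(self: *const TinyBloom, key: []const u8) bool {
        var i: u64 = 0;
        while (i < num_probes) : (i += 1) {
            const bit = probe(key, i);
            const shift: u3 = @intCast(bit % 8);
            if ((self.bits[bit / 8] & (@as(u8, 1) << shift)) == 0) return false;
        }
        return true;
    }
};
```

A read path would call mayContain before opening a table; a false answer lets it skip the table entirely, while a true answer still requires the real lookup because false positives are possible.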
The metadata records each block's first and last key and the block's offset within the SST, so that later lookups can quickly determine which block a key falls in.
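As a rough illustration of how the metadata supports that lookup, the sketch below binary-searches the blocks' first keys for the last block that starts at or before the target key. Meta and locateBlock are hypothetical names; the project's own findBlockIndex is not shown in the article and may be implemented differently.

```zig
const std = @import("std");

/// Minimal block metadata for this sketch.
pub const Meta = struct {
    offset: usize,
    first_key: []const u8,
    last_key: []const u8,
};

/// Index of the last block whose first_key <= key, or 0 if key sorts before
/// every block. Assumes `metas` is non-empty and sorted by first_key.
pub fn locateBlock(metas: []const Meta, key: []const u8) usize {
    var lo: usize = 0;
    var hi: usize = metas.len;
    // Invariant: blocks before `lo` start at or before `key`;
    // blocks from `hi` onward start after it.
    while (lo < hi) {
        const mid = lo + (hi - lo) / 2;
        if (std.mem.order(u8, metas[mid].first_key, key) != .gt) {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }
    return if (lo == 0) 0 else lo - 1;
}
```

The returned block is only a candidate: if the key is larger than that block's last_key, a seek inside it comes up empty and the iterator moves on to the next block.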
Following the same pattern, we build SSTables through an SsTableBuilder; see src/ss_table.zig for part of the code:
pub const SsTableBuilder = struct {
    allocator: std.mem.Allocator,
    builder: BlockBuilder, // the block builder implemented above
    first_key: ?[]const u8,
    last_key: ?[]const u8,
    meta: std.ArrayList(BlockMeta),
    block_size: usize,
    data: std.ArrayList(u8),
    bloom: BloomFilterPtr, // Bloom filter

    pub fn add(self: *Self, key: []const u8, value: []const u8) !void {
        try self.setFirstKey(key);
        try self.bloom.get().insert(key); // record the key in the Bloom filter
        if (try self.builder.add(key, value)) {
            try self.setLastKey(key);
            return;
        }
        // block is full
        try self.finishBlock();
        std.debug.assert(try self.builder.add(key, value));
        try self.resetFirstKey(key);
        try self.setLastKey(key);
    }

    // Write out the data of one block
    fn finishBlock(self: *Self) !void {
        if (self.builder.isEmpty()) {
            return;
        }
        var bo = self.builder;
        // reset block
        defer bo.reset();
        self.builder = BlockBuilder.init(self.allocator, self.block_size);
        var blk = try bo.build();
        defer blk.deinit();
        const encoded_block = try blk.encode(self.allocator); // serialize the block
        defer self.allocator.free(encoded_block);
        // record the block's metadata
        try self.meta.append(.{
            .allocator = self.allocator,
            .offset = self.data.items.len,
            .first_key = try self.allocator.dupe(u8, self.first_key.?),
            .last_key = try self.allocator.dupe(u8, self.last_key.?),
        });
        const cksm = hash.Crc32.hash(encoded_block); // 4-byte checksum appended after the block
        try self.data.appendSlice(encoded_block);
        try self.data.writer().writeInt(u32, cksm, .big);
    }

    // Build the SSTable
    pub fn build(
        self: *Self,
        id: usize,
        block_cache: ?BlockCachePtr, // cache of decoded blocks, avoids repeated deserialization
        path: []const u8,
    ) !SsTable {
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const allocator = arena.allocator();
        try self.finishBlock();
        const w = self.data.writer();
        // write the metadata followed by its offset
        const meta_offset = self.data.items.len;
        const meta_b = try BlockMeta.batchEncode(self.meta.items, allocator);
        _ = try w.write(meta_b);
        try w.writeInt(u32, @intCast(meta_offset), .big);
        // write the Bloom filter followed by its offset
        const bloom_offset = self.data.items.len;
        const encoded_bloom = try self.bloom.get().encode(allocator);
        _ = try w.write(encoded_bloom);
        try w.writeInt(u32, @intCast(bloom_offset), .big);
        const file = try FileObject.init(path, self.data.items);
        errdefer file.deinit();
        const fk = self.meta.items[0].first_key;
        const lk = self.meta.getLast().last_key;
        return .{
            .allocator = self.allocator,
            .file = file,
            .block_metas = try self.meta.toOwnedSlice(),
            .meta_offset = meta_offset,
            .block_cache = block_cache,
            .bloom = self.bloom.clone(),
            .id = id,
            .first_key = try self.allocator.dupe(u8, fk),
            .last_key = try self.allocator.dupe(u8, lk),
            .max_ts = 0,
        };
    }
};
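The build function above writes each section followed by its offset, which implies the file layout [blocks][meta][meta_offset: u32][bloom][bloom_offset: u32]. A reader can therefore find both sections by walking backwards from the end of the file. The sketch below shows that idea only; Footer and readFooter are hypothetical names and the project's actual open routine is not shown in the article.

```zig
const std = @import("std");

pub const Footer = struct { meta_offset: usize, bloom_offset: usize };

/// Reads the two trailing offsets of an encoded SSTable laid out as
/// [blocks][meta][meta_offset: u32][bloom][bloom_offset: u32].
pub fn readFooter(sst: []const u8) !Footer {
    if (sst.len < 8) return error.Truncated;
    // The last 4 bytes say where the Bloom filter section starts.
    const bloom_offset: usize = std.mem.readInt(u32, sst[sst.len - 4 ..][0..4], .big);
    if (bloom_offset < 4 or bloom_offset > sst.len - 4) return error.Corrupt;
    // The meta offset sits in the 4 bytes just before the Bloom filter section.
    const meta_offset: usize = std.mem.readInt(u32, sst[bloom_offset - 4 ..][0..4], .big);
    if (meta_offset > bloom_offset - 4) return error.Corrupt;
    return .{ .meta_offset = meta_offset, .bloom_offset = bloom_offset };
}
```

Given the footer, the metadata bytes are sst[meta_offset .. bloom_offset - 4] and the encoded Bloom filter is sst[bloom_offset .. sst.len - 4].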
Write
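With SSTable and MemTable in hand we have the two most important building blocks of an LSM-Tree; the read and write paths that follow are just combinations of them.
Before implementing writes, let us sketch the data the LSM-Tree needs to hold:
- First, a structure that stores the current MemTable, the frozen (immutable) MemTables, and the multi-level SSTs.
- Second, a lock that synchronizes reads and writes of that structure.
- We also need an auto-incrementing SSTable id.
- Finally, some necessary configuration, such as the storage path and a thread manager.
The resulting LSM data structures look like this: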
pub const StorageState = struct {
    allocator: std.mem.Allocator,
    mem_table: MemTablePtr, // the MemTable currently being written
    imm_mem_tables: std.ArrayList(MemTablePtr), // frozen (immutable) MemTables
    l0_sstables: std.ArrayList(usize), // SSTable ids in level 0
    levels: std.ArrayList(std.ArrayList(usize)), // SSTable ids in the deeper levels
    sstables: std.AutoHashMap(usize, SsTablePtr), // sst_id => SSTable
};

pub const StorageInner = struct {
    const Self = @This();
    allocator: std.mem.Allocator,
    state: StorageState,
    state_lock: std.Thread.RwLock = .{},
    next_sst_id: atomic.Value(usize),
    path: []const u8,
    options: StorageOptions,
    compaction_controller: CompactionController,
    block_cache: BlockCachePtr,
    terminate: std.Thread.ResetEvent = .{},
    wg: std.Thread.WaitGroup = .{},
};
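Leaving level-by-level compaction aside and considering only a single level of SSTables, the write logic reduces to the following flow:
- Write into the MemTable held in the state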
pub fn writeBatch(self: *Self, records: []const WriteBatchRecord) !void {
    for (records) |record| {
        switch (record) {
            .put => |pp| {
                try self.state.getMemTable().put(pp.key, pp.value);
            },
            .delete => |dd| {
                // we use "" as the tombstone value
                try self.state.getMemTable().put(dd, "");
            },
        }
        // try to freeze the current MemTable into the immutable list
        try self.tryFreeze(self.state.getMemTable().getApproximateSize());
    }
}
- When the MemTable exceeds its size threshold, push it onto the immutable MemTable array and reset the current MemTable
fn forceFreezeMemtable(self: *Self) !void {
    const next_sst_id = self.getNextSstId();
    // create a new MemTable
    var new_mm: MemTable = undefined;
    {
        if (self.options.enable_wal) {
            const mm_path = try pathOfWal(self.allocator, self.path, next_sst_id);
            defer self.allocator.free(mm_path);
            new_mm = MemTable.init(next_sst_id, self.allocator, mm_path);
        } else {
            new_mm = MemTable.init(next_sst_id, self.allocator, null);
        }
    }
    errdefer new_mm.deinit();
    var old_mm: *MemTable = undefined;
    {
        self.state_lock.lock();
        defer self.state_lock.unlock();
        var old_mm_ptr = self.state.mem_table;
        old_mm = old_mm_ptr.get();
        defer old_mm_ptr.release();
        self.state.mem_table = try MemTablePtr.create(self.allocator, new_mm);
        // push the full MemTable onto the immutable list
        try self.state.imm_mem_tables.append(old_mm_ptr.clone()); // newer memtable is inserted at the end
    }
    // TIP: sync to disk outside the lock to keep the critical section small
    try old_mm.syncWal();
}
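- When the number of immutable MemTables exceeds the configured threshold, a flush is triggered: the oldest MemTable is popped, written to disk as an SSTable, and recorded in the L0 SSTable array. This runs periodically in a background thread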
pub fn flushNextMemtable(self: *Self) !void {
std.debug.assert(self.state.imm_mem_tables.items.len > 0);
var to_flush_table: *MemTable = undefined;
{
self.state_lock.lockShared();
defer self.state_lock.unlockShared();
// oldest memtable is at the index 0
to_flush_table = self.state.imm_mem_tables.items[0].load();
}
// build an SSTable from the oldest MemTable
var builder = try SsTableBuilder.init(self.allocator, self.options.block_size);
defer builder.deinit();
const sst_id = to_flush_table.id;
try to_flush_table.flush(&builder);
const sst_path = try self.pathOfSst(sst_id);
defer self.allocator.free(sst_path);
var sst = try builder.build(sst_id, self.block_cache.clone(), sst_path);
errdefer sst.deinit();
// add the flushed table to l0_sstables
{
self.state_lock.lock();
defer self.state_lock.unlock();
var m = self.state.imm_mem_tables.orderedRemove(0);
defer m.deinit();
std.debug.assert(m.load().id == sst_id);
// newest sstable is at the end
try self.state.l0_sstables.append(sst_id);
try self.state.sstables.put(sst.id, try SsTablePtr.create(self.allocator, sst));
}
}
Of course, this is only half of the write logic: the data stops in the L0 SSTs, and the deeper SST levels are not used yet.
The other half is covered in the compaction section.
Iterators
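The write path is fairly easy to follow, but reads are a bit more involved. Taking the result of the writes above as an example, our data ends up in a three-layer structure; how do we search it efficiently?
Like the write path, the read path is a combination of the reads of the basic units (MemTable, SSTable, Block). To make them composable, we first unify the read behavior of every module.
In the LSM-Tree, all reads are defined in terms of the following interface (Zig has no trait or interface construct, so the sample code here is written in Rust):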
pub trait StorageIterator {
/// Get the current value.
fn value(&self) -> &[u8];
/// Get the current key.
fn key(&self) -> &[u8];
/// Check if the current iterator is empty.
fn is_empty(&self) -> bool;
/// Move to the next position.
fn next(&mut self) -> anyhow::Result<()>;
/// Number of underlying active iterators for this iterator.
fn num_active_iterators(&self) -> usize {
1
}
}
We first implement this read interface for MemTable, SSTable, and Block; see src/MemTable.zig, src/block.zig, and src/ss_table.zig. Here I only walk through the SSTable iterator; the other modules follow the same idea, and interested readers can go straight to the source.
pub const SsTableIterator = struct {
allocator: std.mem.Allocator,
table: SsTablePtr,
blk: BlockPtr,
blk_iterator: BlockIteratorPtr,
blk_idx: usize,
const Self = @This();
pub fn initAndSeekToFirst(allocator: std.mem.Allocator, table: SsTablePtr) !Self {
const s = try seekToFirstInner(allocator, table);
return .{
.allocator = allocator,
.table = table,
.blk_iterator = s.blk_iter,
.blk = s.blk,
.blk_idx = 0,
};
}
pub fn initAndSeekToKey(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !Self {
const b = try seekToKeyInner(allocator, table, k);
return .{
.allocator = allocator,
.table = table,
.blk_iterator = b.blk_iter,
.blk_idx = b.blk_idx,
.blk = b.blk,
};
}
fn seekToFirstInner(allocator: std.mem.Allocator, table: SsTablePtr) !struct {
blk: BlockPtr,
blk_iter: BlockIteratorPtr,
} {
var blk = try table.get().readBlockCached(0, allocator); // read the first block
errdefer blk.release();
var blk_iter = try BlockIterator.createAndSeekToFirst(allocator, blk.clone());
errdefer blk_iter.deinit();
return .{
.blk = blk,
.blk_iter = try BlockIteratorPtr.create(allocator, blk_iter), // the SSTable iterator delegates to this block iterator
};
}
fn seekToKeyInner(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !struct {
blk_idx: usize,
blk: BlockPtr,
blk_iter: BlockIteratorPtr,
} {
const table_ptr = table.get();
var blk_idx = try table_ptr.findBlockIndex(k);
var blk = try table_ptr.readBlockCached(blk_idx, allocator);
errdefer blk.deinit();
var blk_iter = try BlockIterator.createAndSeekToKey(allocator, blk.clone(), k);
errdefer blk_iter.deinit();
var blk_iter_ptr = try BlockIteratorPtr.create(allocator, blk_iter);
errdefer blk_iter_ptr.release();
// if the current block is exhausted, move to the next block and create its block iterator
if (blk_iter.isEmpty()) {
blk_idx += 1;
if (blk_idx < table_ptr.numBlocks()) {
{
blk.deinit();
blk_iter.deinit();
}
var blk2 = try table_ptr.readBlockCached(blk_idx, allocator);
errdefer blk2.deinit();
var blk_iter2 = try BlockIterator.createAndSeekToFirst(allocator, blk2.clone());
errdefer blk_iter2.deinit();
return .{
.blk_idx = blk_idx,
.blk_iter = try BlockIteratorPtr.create(allocator, blk_iter2),
.blk = blk2,
};
}
}
return .{
.blk_idx = blk_idx,
.blk_iter = blk_iter_ptr,
.blk = blk,
};
}
pub fn key(self: Self) []const u8 {
return self.blk_iterator.get().key();
}
pub fn value(self: Self) []const u8 {
return self.blk_iterator.get().value();
}
pub fn isEmpty(self: Self) bool {
return self.blk_iterator.get().isEmpty();
}
pub fn next(self: *Self) !void {
try self.blk_iterator.get().next();
// if the current block is exhausted, move to the next block and create its block iterator
if (self.blk_iterator.get().isEmpty()) {
self.blk_idx += 1;
if (self.blk_idx < self.table.get().numBlocks()) {
self.reset();
const blk = try self.table.get().readBlockCached(self.blk_idx, self.allocator);
const blk_iter = try BlockIterator.createAndSeekToFirst(self.allocator, blk.clone());
self.blk = blk;
self.blk_iterator = try BlockIteratorPtr.create(self.allocator, blk_iter);
}
}
}
};
With read interfaces for the basic elements in place, we hit the first problem: how do we search across multiple MemTables?
For this we need a new data structure that merges several iterators into one: the MergeIterator.
MergeIterator maintains a binary heap internally. Entries in the heap are prioritized as follows:
When the iterators' current keys differ, the iterator with the smallest key wins. When several iterators share the same current key, the newest iterator wins.
Suppose we have the following MemTables (iter1 is the newest, iter3 the oldest):
- iter1: b->del, c->4, d->5
- iter2: a->1, b->2, c->3
- iter3: e->4
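After merging, the iterator should proceed as follows:
- a is the smallest key, so iter2 goes first
- after iter2 advances once, iter1 and iter2 share the key b; iter1 takes priority and b->2 is skipped
- c is the smallest; iter1 takes priority and c->3 in iter2 is skipped
- d is the smallest; iter1 goes
- only iter3 is left, so iterate iter3
Final result: a->1, b->del, c->4, d->5, e->4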
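The priority rule can be checked on the example above with a small stand-alone sketch. To stay short it uses a linear scan over the runs instead of a binary heap, and it models each iterator as a sorted slice plus a cursor; Entry and mergeRuns are hypothetical names, and the convention that runs[0] is the newest is an assumption made for this sketch.

```zig
const std = @import("std");

pub const Entry = struct { key: []const u8, value: []const u8 };

/// Merges sorted runs into `out` and returns the filled prefix.
/// `runs[0]` is the newest; for equal keys only the newest version is kept.
/// Assumes keys are unique within a run and `out` is large enough.
pub fn mergeRuns(runs: []const []const Entry, cursors: []usize, out: []Entry) []Entry {
    var n: usize = 0;
    while (true) {
        // Pick the run whose current key is smallest; ties go to the lowest index (newest).
        var best: ?usize = null;
        for (runs, 0..) |run, i| {
            if (cursors[i] >= run.len) continue;
            if (best == null or
                std.mem.order(u8, run[cursors[i]].key, runs[best.?][cursors[best.?]].key) == .lt)
            {
                best = i;
            }
        }
        const b = best orelse break;
        const chosen = runs[b][cursors[b]];
        out[n] = chosen;
        n += 1;
        // Advance every run positioned on the chosen key, which skips stale versions.
        for (runs, 0..) |run, i| {
            if (cursors[i] < run.len and std.mem.eql(u8, run[cursors[i]].key, chosen.key)) {
                cursors[i] += 1;
            }
        }
    }
    return out[0..n];
}
```

A heap replaces the linear scan with an O(log n) pick per step, which matters once many iterators are merged; the ordering rule itself stays the same.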
The implementation is as follows:
// The standard library provides a binary heap
const IteratorHeap = std.PriorityQueue(*HeapWrapper, Comparer.Context, Comparer.cmp);

allocator: std.mem.Allocator,
q: IteratorHeap,
current: ?*HeapWrapper,

pub fn init(allocator: std.mem.Allocator, iters: std.ArrayList(StorageIteratorPtr)) !Self {
    var q = IteratorHeap.init(allocator, .{});
    if (iters.items.len == 0) {
        return Self{
            .allocator = allocator,
            .q = q,
            .current = null,
        };
    }
    // PS: the last iter has the highest priority
    // push the iterators into the binary heap in order
    for (iters.items, 0..) |sp, i| {
        if (!sp.load().isEmpty()) {
            const hw = try allocator.create(HeapWrapper);
            errdefer allocator.destroy(hw);
            hw.* = HeapWrapper.init(i, sp.clone());
            try q.add(hw);
        }
    }
    const cc = q.removeOrNull();
    return Self{
        .allocator = allocator,
        .q = q,
        .current = cc,
    };
}

pub fn key(self: Self) []const u8 {
    return self.current.?.key();
}

pub fn value(self: Self) []const u8 {
    return self.current.?.value();
}

pub fn isEmpty(self: Self) bool {
    if (self.current) |cc| {
        return cc.isEmpty();
    }
    return true;
}

pub fn next(self: *Self) !void {
    const cc = self.current.?;
    // While the iterator at the top of the heap sits on the same key as the
    // current iterator, advance it past that duplicate key.
    while (self.q.peek()) |ii| {
        std.debug.assert(!ii.isEmpty());
        if (!std.mem.eql(u8, cc.key(), ii.key())) break;
        // Pop before advancing: changing the key of an element that is still
        // inside the heap would break the heap order.
        _ = self.q.remove();
        try ii.next();
        if (ii.isEmpty()) {
            ii.deinit();
            self.allocator.destroy(ii);
        } else {
            try self.q.add(ii);
        }
    }
    try cc.next(); // advance the current iterator
    // if the current iterator is exhausted, pop the next best iterator from the heap
    if (cc.isEmpty()) {
        defer {
            cc.deinit();
            self.allocator.destroy(cc);
        }
        if (self.q.removeOrNull()) |h| {
            self.current = h;
        } else {
            self.current = null;
        }
        return;
    }
    // push the current iterator back into the heap and pick the best one again
    try self.q.add(cc);
    self.current = self.q.removeOrNull();
}
With MergeIterator we can iterate over multiple MemTables and over multiple SSTables. One problem remains: we now have two MergeIterators, so how do we iterate across both of them?
Here we introduce another data structure, TwoMergeIterator, a simplified MergeIterator for exactly two elements.
A TwoMergeIterator is made of two iterators, one with higher priority and one with lower priority. At each step it advances whichever has the smaller key, and when the keys are equal the higher-priority iterator wins. The implementation:
pub const TwoMergeIterator = struct {
a: StorageIteratorPtr,
b: StorageIteratorPtr,
choose_a: bool,
// returns true when iterator a holds the smaller key and should be used
fn chooseA(a: *StorageIterator, b: *StorageIterator) bool {
if (a.isEmpty()) {
return false;
}
if (b.isEmpty()) {
return true;
}
return std.mem.lessThan(u8, a.key(), b.key());
}
// when the keys are equal, skip the key in the lower-priority iterator
fn skipB(self: *TwoMergeIterator) !void {
const ap = self.a.load();
const bp = self.b.load();
if (!ap.isEmpty() and !bp.isEmpty() and std.mem.eql(u8, ap.key(), bp.key())) try bp.next();
}
pub fn init(a: StorageIteratorPtr, b: StorageIteratorPtr) !TwoMergeIterator {
var iter = TwoMergeIterator{
.a = a,
.b = b,
.choose_a = false,
};
try iter.skipB();
iter.choose_a = chooseA(iter.a.load(), iter.b.load());
return iter;
}
pub fn deinit(self: *TwoMergeIterator) void {
self.a.release();
self.b.release();
}
pub fn key(self: TwoMergeIterator) []const u8 {
if (self.choose_a) {
std.debug.assert(!self.a.load().isEmpty());
return self.a.load().key();
}
std.debug.assert(!self.b.load().isEmpty());
return self.b.load().key();
}
pub fn value(self: TwoMergeIterator) []const u8 {
if (self.choose_a) {
std.debug.assert(!self.a.load().isEmpty());
return self.a.load().value();
}
std.debug.assert(!self.b.load().isEmpty());
return self.b.load().value();
}
pub fn isEmpty(self: TwoMergeIterator) bool {
if (self.choose_a) {
return self.a.load().isEmpty();
}
return self.b.load().isEmpty();
}
pub fn next(self: *TwoMergeIterator) !void {
if (self.choose_a) {
try self.a.load().next();
} else {
try self.b.load().next();
}
try self.skipB();
self.choose_a = chooseA(self.a.load(), self.b.load());
}
};
With that, our arsenal for the read path is complete!
Read/Scan
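Recall the architecture of the LSM-Tree.
We label each data layer with a priority. Because the LSM-Tree is append-only, the higher a layer's priority, the newer its data.
The read strategy is therefore straightforward: search the layers in priority order, P0 to P2 (the MemTable, then the immutable MemTables, then the SSTables, matching the three steps below). See src/storage.zig for this code.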
- Read the MemTable
// search in memtable
if (try self.state.getMemTable().get(key, value)) {
    if (value.*.len == 0) {
        // tombstone
        return false;
    }
    return true;
}
- Read the immutable MemTables
// search in imm_memtables, newest first (newer memtables are appended at the end)
self.state_lock.lockShared();
defer self.state_lock.unlockShared();
var i = self.state.imm_mem_tables.items.len;
while (i > 0) {
    i -= 1;
    const imm_table = self.state.imm_mem_tables.items[i];
    if (try imm_table.load().get(key, value)) {
        if (value.*.len == 0) {
            // tombstone
            return false;
        }
        return true;
    }
}
- Read the SSTables from L0 to Lmax
// collect iterators over L0
var l0_iters = std.ArrayList(StorageIteratorPtr).init(self.allocator);
defer {
for (l0_iters.items) |iter| {
var ii = iter;
ii.release();
}
l0_iters.deinit();
}
{
self.state_lock.lockShared();
defer self.state_lock.unlockShared();
for (self.state.l0_sstables.items) |sst_id| {
const sst = self.state.sstables.get(sst_id).?;
if (try sst.load().mayContain(key)) {
var ss_iter = try SsTableIterator.initAndSeekToKey(self.allocator, sst.clone(), key);
errdefer ss_iter.deinit();
try l0_iters.append(try StorageIteratorPtr.create(self.allocator, .{ .ss_table_iter = ss_iter }));
}
}
}
// collect iterators over the deeper levels
var level_iters: std.ArrayList(StorageIteratorPtr) = undefined;
{
self.state_lock.lockShared();
defer self.state_lock.unlockShared();
level_iters = try std.ArrayList(StorageIteratorPtr).initCapacity(
self.allocator,
self.state.levels.items.len,
);
for (self.state.levels.items) |level| {
var level_ssts = try std.ArrayList(SsTablePtr).initCapacity(self.allocator, level.items.len);
errdefer level_ssts.deinit();
for (level.items) |sst_id| {
const sst = self.state.sstables.get(sst_id).?;
if (try mayWithinTable(key, sst)) {
try level_ssts.append(sst.clone());
}
}
if (level_ssts.items.len > 0) {
var level_iter = try SstConcatIterator.initAndSeekToKey(
self.allocator,
level_ssts,
key,
);
errdefer level_iter.deinit();
try level_iters.append(try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = level_iter }));
}
}
}
// combine the iterators into a single TwoMergeIterator
var l0_merge_iter = try MergeIterators.init(self.allocator, l0_iters);
errdefer l0_merge_iter.deinit();
var levels_merge_iter = try MergeIterators.init(self.allocator, level_iters);
errdefer levels_merge_iter.deinit();
var iter = try TwoMergeIterator.init(
try StorageIteratorPtr.create(self.allocator, .{ .merge_iterators = l0_merge_iter }),
try StorageIteratorPtr.create(self.allocator, .{ .merge_iterators = levels_merge_iter }),
);
defer iter.deinit();
if (iter.isEmpty()) {
return false;
}
if (std.mem.eql(u8, iter.key(), key) and iter.value().len > 0) {
value.* = iter.value();
return true;
}
return false;
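Compaction
In the write section we implemented stacking SSTables from the in-memory table into level 0.
As writes continue, L0 accumulates more and more SSTables. At some point we need to merge the data in L0 into L1, and then repeat the process level by level until the data reaches the deepest level. This level-by-level merging is compaction.
Compaction in an LSM-Tree works roughly as follows:
[Figure omitted: compaction flow]
See src/compact.zig and src/storage.zig for the implementation.
Simple leveled compaction is similar to the compaction strategy in the original LSM paper. It maintains multiple levels for the LSM tree; when a level grows too large, all SSTs in that level are merged with the next level. The strategy is controlled by three parameters:
- size_ratio_percent: the ratio [number of files in the lower level / number of files in the upper level]. Compaction is triggered when the computed value falls below this threshold. Suppose we set it to 60%: with 2 SSTs in L0 and 1 SST in L1, the ratio is 1/2 = 50% < 60%, so L0 should be compacted into L1.
- level0_file_num_compaction_trigger: the number of SSTables in the first level (L0) at which compaction is triggered. Because this is the topmost level and there is no level above to compare against, compaction can only be triggered by a fixed count.
- max_levels: as the name suggests, the maximum number of levels.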
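The two trigger conditions reduce to a few lines of integer arithmetic. The sketch below isolates them as pure functions so the 50% < 60% example can be checked directly; the function names are hypothetical, while the rule itself follows the parameter descriptions above.

```zig
const std = @import("std");

/// True when lower / upper, expressed in percent, falls below `size_ratio_percent`.
/// An empty upper level never triggers compaction.
pub fn ratioTriggersCompaction(upper_files: usize, lower_files: usize, size_ratio_percent: usize) bool {
    if (upper_files == 0) return false;
    return lower_files * 100 / upper_files < size_ratio_percent;
}

/// True when L0 has reached the fixed file-count trigger.
pub fn l0TriggersCompaction(l0_files: usize, level0_file_num_compaction_trigger: usize) bool {
    return l0_files >= level0_file_num_compaction_trigger;
}
```

Note that the ratio uses integer division, so 1 lower file against 3 upper files gives 33, not 33.3; the comparison against the threshold is unaffected in practice.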
With this groundwork in place, we can implement the compaction logic step by step:
- Generate a compaction task:
pub const SimpleLeveledCompactionController = struct {
    options: SimpleLeveledCompactionOptions,

    pub fn generateCompactionTask(self: SimpleLeveledCompactionController, state: *storage.StorageState) !?SimpleLeveledCompactionTask {
        if (self.options.max_levels == 1) {
            return null;
        }
        var level_sizes = std.ArrayList(usize).init(state.allocator);
        defer level_sizes.deinit();
        try level_sizes.append(state.l0_sstables.items.len);
        for (state.levels.items) |level| {
            try level_sizes.append(level.items.len);
        }
        // if L0 holds at least the threshold number of SSTs, trigger an L0 compaction
        if (state.l0_sstables.items.len >= self.options.level0_file_num_compaction_trigger) {
            std.debug.print("compaction of L0 to L1 because L0 has {d} SSTS >= {d}\n", .{ state.l0_sstables.items.len, self.options.level0_file_num_compaction_trigger });
            return .{
                .upper_level = null,
                .upper_level_sst_ids = try state.l0_sstables.clone(),
                .lower_level = 1,
                .lower_level_sst_ids = try state.levels.items[0].clone(),
                .is_lower_level_bottom = false,
            };
        }
        // compute L[n+1] / L[n]; if the ratio is below the threshold, compact L[n]
        for (1..self.options.max_levels) |level| {
            const lower_level = level + 1;
            if (level_sizes.items[level] == 0) {
                continue;
            }
            const size_ration = level_sizes.items[lower_level] * 100 / level_sizes.items[level];
            if (size_ration < self.options.size_ration_percent) {
                std.debug.print("compaction of L{d} to L{d} because L{d} size ratio {d} < {d}\n", .{ level, lower_level, level, size_ration, self.options.size_ration_percent });
                return .{
                    .upper_level = level,
                    .upper_level_sst_ids = try state.levels.items[level - 1].clone(),
                    .lower_level = lower_level,
                    .lower_level_sst_ids = try state.levels.items[lower_level - 1].clone(),
                    .is_lower_level_bottom = lower_level == self.options.max_levels,
                };
            }
        }
        return null;
    }
};
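- Execute the compaction task:
After the walkthrough of the read path in the previous section, compacting multiple levels is easy to understand.
For example, to merge the SSTables of L1 and L2 into L2, we put the data of L1 and L2 behind a single iterator and keep reading from it while writing into new SSTables. This guarantees that the data in the new SSTables is sorted and free of duplicates.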
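Stripped of iterators, builders, and files, the merge step can be sketched on two sorted slices. The upper (newer) level wins on equal keys, and tombstones (empty values) are dropped only when the output is the bottom level. Entry and compactTwo are hypothetical names; this is a simplified model, not the project's code.

```zig
const std = @import("std");

pub const Entry = struct { key: []const u8, value: []const u8 };

/// Merges `upper` (newer) and `lower` (older), both sorted by key, into `out`.
/// On equal keys the upper entry wins. When `to_bottom` is true, tombstones
/// are dropped because no older level can still hold the key.
/// Assumes `out` has room for upper.len + lower.len entries.
pub fn compactTwo(upper: []const Entry, lower: []const Entry, to_bottom: bool, out: []Entry) []Entry {
    var i: usize = 0;
    var j: usize = 0;
    var n: usize = 0;
    while (i < upper.len or j < lower.len) {
        var e: Entry = undefined;
        if (j >= lower.len) {
            e = upper[i];
            i += 1;
        } else if (i >= upper.len) {
            e = lower[j];
            j += 1;
        } else {
            switch (std.mem.order(u8, upper[i].key, lower[j].key)) {
                .lt => {
                    e = upper[i];
                    i += 1;
                },
                .gt => {
                    e = lower[j];
                    j += 1;
                },
                .eq => {
                    // Same key in both levels: keep the newer one, skip the stale one.
                    e = upper[i];
                    i += 1;
                    j += 1;
                },
            }
        }
        if (to_bottom and e.value.len == 0) continue;
        out[n] = e;
        n += 1;
    }
    return out[0..n];
}
```

Keeping tombstones in intermediate levels matters: dropping one too early would let an older version of the key in a deeper level become visible again.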
fn compactSimple(self: *Self, task: SimpleLeveledCompactionTask) !std.ArrayList(SsTablePtr) {
if (task.upper_level) |_| {
var upper_ssts = try std.ArrayList(SsTablePtr).initCapacity(
self.allocator,
task.upper_level_sst_ids.items.len,
);
var lower_ssts = try std.ArrayList(SsTablePtr).initCapacity(
self.allocator,
task.lower_level_sst_ids.items.len,
);
self.state_lock.lockShared();
for (task.upper_level_sst_ids.items) |sst_id| {
const sst = self.state.sstables.get(sst_id).?;
try upper_ssts.append(sst.clone());
}
for (task.lower_level_sst_ids.items) |sst_id| {
const sst = self.state.sstables.get(sst_id).?;
try lower_ssts.append(sst.clone());
}
self.state_lock.unlockShared();
var upper_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, upper_ssts);
errdefer upper_iter.deinit();
var lower_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, lower_ssts);
errdefer lower_iter.deinit();
var iter = try TwoMergeIterator.init(
try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = upper_iter }),
try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = lower_iter }),
);
defer iter.deinit();
return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
} else {
// compact l0_sstables to l1_sstables
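// ..... the logic is largely the same as the LN compaction above, except that the SSTables in L0 are not ordered relative to each other (their key ranges may overlap) and need special handling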
return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
}
}
fn compactGenerateSstFromIter(self: *Self, iter: *TwoMergeIterator, compact_to_bottom_level: bool) !std.ArrayList(SsTablePtr) {
var builder: SsTableBuilder = try SsTableBuilder.init(self.allocator, self.options.block_size);
defer builder.deinit();
var new_ssts = std.ArrayList(SsTablePtr).init(self.allocator);
// keep draining the iterator
while (!iter.isEmpty()) {
// when compacting into the bottom level, tombstones no longer need to be kept
if (compact_to_bottom_level) {
if (iter.value().len > 0) {
try builder.add(iter.key(), iter.value());
}
} else {
try builder.add(iter.key(), iter.value());
}
// once an SSTable is full, reset the builder and queue the finished SSTable
if (builder.estimatedSize() >= self.options.target_sst_size) {
// reset builder
defer builder.reset() catch unreachable;
const sst_id = self.getNextSstId();
const path = try self.pathOfSst(sst_id);
defer self.allocator.free(path);
var sst = try builder.build(sst_id, self.block_cache.clone(), path);
errdefer sst.deinit();
var sst_ptr = try SsTablePtr.create(self.allocator, sst);
errdefer sst_ptr.deinit();
try new_ssts.append(sst_ptr);
}
try iter.next();
}
// whatever is left goes into one more SSTable
if (builder.estimatedSize() > 0) {
const sst_id = self.getNextSstId();
const path = try self.pathOfSst(sst_id);
defer self.allocator.free(path);
var sst = try builder.build(sst_id, self.block_cache.clone(), path);
errdefer sst.deinit();
var sst_ptr = try SsTablePtr.create(self.allocator, sst);
errdefer sst_ptr.deinit();
try new_ssts.append(sst_ptr);
}
return new_ssts;
}
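- Replace the compacted SSTs
This part is not complicated: remove the two levels of data involved in the compaction task and install the newly merged SSTables in the lower level.
One thing to watch out for: compaction runs in its own thread, and the LSM-Tree's state may have changed while it was running. When removing SSTables we must therefore filter out newly added data so that valid data is not overwritten.
Concurrency is where bugs congregate!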
pub fn applyCompactionResult(
_: SimpleLeveledCompactionController,
state: *storage.StorageState,
task: SimpleLeveledCompactionTask,
output: []usize,
) !std.ArrayList(usize) {
var files_to_remove = std.ArrayList(usize).init(state.allocator);
errdefer files_to_remove.deinit();
if (task.upper_level) |upper_level| {
// remove the upper level's SSTables; this level does not change during compaction, so it is safe to drop
std.debug.assert(sliceEquals(
task.upper_level_sst_ids.items,
state.levels.items[upper_level - 1].items,
));
try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
state.levels.items[upper_level - 1].clearAndFree();
} else {
// removing L0 data requires care
try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
var new_l0_sstables = std.ArrayList(usize).init(state.allocator);
errdefer new_l0_sstables.deinit();
{
var l0_sst_compacted = std.AutoHashMap(usize, struct {}).init(state.allocator);
defer l0_sst_compacted.deinit();
for (task.upper_level_sst_ids.items) |sst_id| {
try l0_sst_compacted.put(sst_id, .{});
}
for (state.l0_sstables.items) |sst_id| {
if (!l0_sst_compacted.remove(sst_id)) { // SSTs that were not part of the compaction task must be kept
try new_l0_sstables.append(sst_id);
}
}
std.debug.assert(l0_sst_compacted.count() == 0);
}
state.l0_sstables.deinit();
state.l0_sstables = new_l0_sstables;
}
// the old SSTables of the lower level are removed outright
try files_to_remove.appendSlice(task.lower_level_sst_ids.items);
state.levels.items[task.lower_level - 1].clearAndFree();
try state.levels.items[task.lower_level - 1].appendSlice(output);
return files_to_remove;
}
// Caller side: register the new SSTs, apply the compaction result, then delete the obsolete files
// sst to remove
var ssts_to_remove = std.ArrayList(SsTablePtr).init(self.allocator);
{
var new_sst_ids = std.ArrayList(usize).init(self.allocator);
defer new_sst_ids.deinit();
self.state_lock.lock();
defer self.state_lock.unlock();
for (sstables.items) |sst| {
const id: usize = @intCast(sst.get().sstId());
try new_sst_ids.append(id);
try self.state.sstables.put(id, sst.clone());
}
var file_to_remove = try self.compaction_controller.applyCompactionResult(
&self.state,
task,
output.items,
);
defer file_to_remove.deinit();
for (file_to_remove.items) |id| {
if (self.state.sstables.fetchRemove(id)) |kv| {
try ssts_to_remove.append(kv.value);
}
}
try self.syncDir();
}
for (ssts_to_remove.items) |sst| {
const path = try self.pathOfSst(sst.get().sstId());
defer self.allocator.free(path);
try std.fs.cwd().deleteFile(path);
}
try self.syncDir();
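IV. Summary
We implemented the core of an LSM-Tree in Zig: the MemTable, the SSTable, the write path, the various iterators, and compaction. I took away quite a lot from this project.
Understanding the core flows of the LSM-Tree
I had long heard about the multi-level SST design and the write path of the LSM, but never quite understood how reads were implemented. This project answered that long-standing question; the algorithm behind MergeIterator in particular is very elegant.
Working out a smart pointer for Zig
Zig makes no memory-safety guarantees. To keep pointers from flying around and leaking everywhere, I implemented a simple smart pointer with help from DeepSeek, which greatly reduced the mental burden of memory management.
Engineering lessons
- Write as many assertions as you can; they expose many bugs early.
- A large multi-module project must have unit tests; otherwise you cannot isolate a bug to a single module.
- Never perform I/O while holding a lock; it hurts performance badly!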