探索 MySQL Binlog 的奧秘與應(yīng)用
在 MySQL 的廣袤世界中,有一個至關(guān)重要的存在,它宛如數(shù)據(jù)庫運(yùn)行軌跡的忠實記錄者,默默見證著每一次數(shù)據(jù)的變更與操作,它就是 binlog。Binlog 如同一個神秘而強(qiáng)大的寶庫,承載著數(shù)據(jù)庫操作的關(guān)鍵信息,為數(shù)據(jù)的恢復(fù)、復(fù)制以及系統(tǒng)的穩(wěn)定性提供著堅實的支撐。當(dāng)我們深入探索 MySQL 的奧秘時,binlog 無疑是其中閃耀著獨特光芒的關(guān)鍵一環(huán)。
1. bin log是什么?作用是什么呢?
bin log實際上是一個物理日志,當(dāng)我們對某個數(shù)據(jù)頁進(jìn)行修改操作時我們就會將這個操作寫到bin log中,當(dāng)我們數(shù)據(jù)庫需要進(jìn)行主備、主從復(fù)制等操作時,都可以基于bin log保證數(shù)據(jù)一致性。
2. bin log緩沖區(qū)
bin log緩沖區(qū)和我們的redo log和undo log緩沖區(qū)有所不同,redo log和undo log緩存都在存儲引擎的共享緩沖區(qū)緩沖區(qū)buffer pool中,而bin log則是為每個工作線程獨立分配一個內(nèi)存作為bin log緩沖區(qū):
bin log之所以是在每個線程中,是為保證不同存儲引擎的兼容性,bin log是innodb獨有的,如果將bin log放到共享緩沖區(qū)時很可能導(dǎo)致兼容性問題,將bin log緩沖區(qū)設(shè)置為每個線程獨享也保證了事務(wù)并發(fā)的安全性。
3. bin.log對應(yīng)的三種記錄格式
(1) row:這種格式主要用于保證數(shù)據(jù)實時性的,例如我們執(zhí)行下面這段SQL
update table set time=now() where id=1;
如果我們將其存到bin log之后很長一段時間才提交事務(wù),那么時間就會有所延遲,所以MySQL為了保證數(shù)據(jù)實時性,就會將寫入bin log中的SQL用row格式,如下圖所示,可以看到row格式的SQL語句時間是當(dāng)前時間的具體值,并且where條件寫死了當(dāng)前條件列,確保數(shù)據(jù)實時一致性:
當(dāng)然這樣做的缺點也很明顯,如果涉及大批量操作,那么針對每條數(shù)據(jù)對應(yīng)的都會生成對應(yīng)的row語句,那么對于內(nèi)存的占用就很高,進(jìn)行恢復(fù)和同步時的IO和SQL執(zhí)行時間也是非常不友好的。
(2) stament:這種同步策略即執(zhí)行的SQL是什么,對應(yīng)傳輸過去的時對應(yīng)的語句就是什么樣的,這就會導(dǎo)致我們上文所說的一致性問題:
(3) mixed:這種格式就是為了上述兩種方案的混合體,如果操作可能出現(xiàn)數(shù)據(jù)不一致問題則用row格式,反之使用stament格式。
4. bin log文件日志格式
我們可以通過下面這條SQL語句看到我們本地的bin log文件:
show binary logs;
輸出結(jié)果如下所示,可以看到bin log的格式基本都是mysql-bin.0000xxx:
mysql-bin.001606 440052 No
mysql-bin.001607 111520 No
5. bin log是如何完成寫入
當(dāng)我們開始事務(wù)時,將修改寫入bin log cache中,一旦事務(wù)提交,就會將bin log通過write寫入到文件系統(tǒng)緩存的page cache中,然后根據(jù)我們配置的刷盤參數(shù)將cache內(nèi)容調(diào)用操作系統(tǒng)內(nèi)核方法fsync將結(jié)果寫入到bin log 物理文件中:
而調(diào)用系統(tǒng)函數(shù)fsync的實際是根據(jù)MySQL系統(tǒng)參數(shù)決定的,這個系統(tǒng)變量查詢SQL如下:
SHOW VARIABLES LIKE 'sync_binlog';
而sync_binlog值分別三種:
- 當(dāng)配置為了0時,每次事務(wù)提交都只會write,fsync調(diào)用時機(jī)是由系統(tǒng)決定的。
- 當(dāng)配置設(shè)置為1時,每次事務(wù)提交都會調(diào)用fsync。
- 當(dāng)配置為N,代表提交了N個事務(wù)之后就會將page cache中的數(shù)據(jù)通過fsync進(jìn)行刷盤。
6. bin log和redo log的區(qū)別
這個問題我們可以從以下幾個場景來表述一下:
- 從使用場景來說:bin log常用于數(shù)據(jù)災(zāi)備或數(shù)據(jù)同步到其他異構(gòu)程序中的場景。redo log常用于故障恢復(fù)保證數(shù)據(jù)持久性。
- 從數(shù)據(jù)內(nèi)容來說:redo log存儲的物理日志,即修改的數(shù)據(jù)內(nèi)容,對應(yīng)的redo block結(jié)構(gòu)體針對各種偏移量和修改涉及的頁都有及其復(fù)雜的涉及,這里就不多做贅述。 而bin log則是記錄可以是statment語句也可以是原生修改的row,具體可以通過查看binlog_format知曉。
- 生成范圍:bin log是MySQL server生成的事務(wù)日志,任何存儲引擎都可以使用redo log只有innodb這個存儲引擎支持。
7. (實踐)基于flink cdc同步數(shù)據(jù)
接下來我們就基于spring boot演示一下如何基于flink cdc訂閱bin.log完成db庫中的tb_1和tb_2的數(shù)據(jù)訂閱和同步:
之所以筆者使用flink cdc而不是canel大體有以下幾個原因:
- flink cdc支持全量和增量同步以及斷點續(xù)傳等功能,尤其是斷點續(xù)傳這一點對于需要保證異構(gòu)數(shù)據(jù)庫的數(shù)據(jù)一致性是非常好的。
- 性能表現(xiàn)更出色,按照阿里云的說法:
我們將全增量一體化框架與 Debezium 1.6 版本做 簡單的 TPC-DS 讀取測試對比,customer 單表數(shù)據(jù)量 6500 萬,在 Flink CDC 用 8 個并發(fā)的情況下,吞吐提升了 6.8 倍,耗時僅 13 分鐘,得益于并發(fā)讀取的支持,如果用戶需要更快的讀取速度,用戶可以增加并發(fā)實現(xiàn)。
話不說我們給出基礎(chǔ)的集成步驟,首先是引入flink cdc和MySQL的依賴,這里筆者為了文章的簡練只給出的flink cdc相關(guān)的pom依賴:
<properties>
<flink.version>1.13.6</flink.version>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql -cdc-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
然后我們在yml或者properties文件中給出MySQL配置即可,然后我們聲明一個CdcInfo記錄從bin.log中同步的數(shù)據(jù):
@Data
publicclass CdcInfo {
/**
* 變更前數(shù)據(jù)
*/
private JSONObject beforeData;
/**
* 變更后數(shù)據(jù)
*/
private JSONObject afterData;
private String operation;
/**
* binlog 文件名
*/
private String binLogName;
/**
* binlog當(dāng)前讀取點位
*/
private Integer filePos;
/**
* 數(shù)據(jù)庫名
*/
private String dbName;
/**
* 表名
*/
private String tbName;
/**
* 變更時間
*/
private Long changeTime;
}
然后我們編寫一個關(guān)于bin.log通知事件的監(jiān)聽,針對flink cdc配置筆者都基于CommandLineRunner 這個拓展點完成配置,這里面涉及眾多的flink cdc配置參數(shù),可以看到筆者的程序同步模式配置的是initial即啟動后會進(jìn)行全量同步再進(jìn)行增量同步,同時通過表達(dá)式db.tb_[1-2]+指明僅僅處理tb_1和tb_2表的數(shù)據(jù)更新變化。
@Component
publicclass MysqlCdcEventListener implements CommandLineRunner {
//數(shù)據(jù)接收器用于應(yīng)用架構(gòu)更改和將更改數(shù)據(jù)寫入外部系統(tǒng)
privatefinal CdcSink cdcSink;
public MysqlCdcEventListener(CdcSink cdcSink) {
this.cdcSink = cdcSink;
}
@Override
public void run(String... args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置并行度
env.setParallelism(Runtime.getRuntime().availableProcessors());
DebeziumSourceFunction<CdcInfo> debeziumSource = buildDebeziumSource();
DataStream<CdcInfo> streamSource = env
.addSource(debeziumSource, "mysql-source")
.setParallelism(1);
//將流數(shù)據(jù)交給
streamSource.addSink(cdcSink);
env.execute("mysql-stream-cdc");
}
/**
* 構(gòu)造變更數(shù)據(jù)源
*/
private DebeziumSourceFunction<CdcInfo> buildDebeziumSource() {
Properties debeziumProperties = new Properties();
//設(shè)置快照為無鎖
debeziumProperties.put("snapshot.locking.mode", "none");
return MySqlSource.<CdcInfo>builder()
.hostname("xxxx")
.port(3306)
.databaseList("db")
//監(jiān)聽db庫中的[1-2]表
.tableList("db.tb_[1-2]+")
.username("xxxx")
.password("xxxx")
//設(shè)置為 initial:在第一次啟動時對受監(jiān)視的數(shù)據(jù)庫表執(zhí)行初始快照,并繼續(xù)讀取最新的 binlog
.startupOptions(StartupOptions.initial())
//設(shè)置序列化配置
.deserializer(new MysqlDeserialization())
.serverTimeZone("GMT+8")
.debeziumProperties(debeziumProperties)
.build();
}
}
上文代碼示例中給出一個涉及反序列化生產(chǎn)CdcInfo的操作,筆者指明了MysqlDeserialization 這里也給出對應(yīng)的源碼示例:
public class MysqlDeserialization implements DebeziumDeserializationSchema<CdcInfo> {
publicstaticfinal String TS_MS = "ts_ms";
publicstaticfinal String BIN_FILE = "file";
publicstaticfinal String POS = "pos";
publicstaticfinal String CREATE = "CREATE";
publicstaticfinal String BEFORE = "before";
publicstaticfinal String AFTER = "after";
publicstaticfinal String SOURCE = "source";
publicstaticfinal String UPDATE = "UPDATE";
@Override
public void deserialize(SourceRecord sourceRecord, Collector<CdcInfo> collector) {
//獲取bin.log訂閱到的信息
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct struct = (Struct) sourceRecord.value();
final Struct source = struct.getStruct(SOURCE);
CdcInfo tbCdcInfo = new CdcInfo();
//獲取前后變化數(shù)據(jù)
tbCdcInfo.setBeforeData(convert2JsonObj(struct, BEFORE));
tbCdcInfo.setAfterData(convert2JsonObj(struct, AFTER));
//5.獲取操作類型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toUpperCase();
tbCdcInfo.setOperation(type);
tbCdcInfo.setBinLogName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
tbCdcInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
tbCdcInfo.setDbName(database);
tbCdcInfo.setTbName(tableName);
tbCdcInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
//7.輸出數(shù)據(jù)
collector.collect(tbCdcInfo);
}
/**
* 從原始數(shù)據(jù)獲取出變更之前或之后的數(shù)據(jù)
*/
private JSONObject convert2JsonObj(Struct value, String fieldElement) {
Struct element = value.getStruct(fieldElement);
JSONObject jsonObject = new JSONObject();
if (element != null) {
Schema afterSchema = element.schema();
List<Field> fieldList = afterSchema.fields();
for (Field field : fieldList) {
Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);
}
}
return jsonObject;
}
@Override
public TypeInformation<CdcInfo> getProducedType() {
return TypeInformation.of(CdcInfo.class);
}
}
此時我們啟動程序后針對數(shù)據(jù)表進(jìn)行修改操作就會收到數(shù)據(jù)消息的訂閱了:
訂閱到的數(shù)據(jù):CdcInfo(beforeData={"id":1,"name":"xiaoming"}, afterData={"id":1,"name":"xiaoming1"}, operation=UPDATE, binLogName=binlog.000156, filePos=1256, dbName=db, tbName=tb_2, changeTime=1734622269654)
小結(jié)
MySQL Binlog(二進(jìn)制日志)作為 MySQL 數(shù)據(jù)庫中至關(guān)重要的組成部分,蘊(yùn)含著眾多奧秘且具備豐富多樣的應(yīng)用場景。
- 從原理層面來看,Binlog 以二進(jìn)制的形式記錄了數(shù)據(jù)庫中數(shù)據(jù)變更的相關(guān)事件,如 INSERT、UPDATE、DELETE 等操作。它采用順序追加的方式寫入,這種特性不僅保證了日志記錄的完整性和連續(xù)性,還為后續(xù)的恢復(fù)和復(fù)制提供了堅實基礎(chǔ)。不同的日志格式(STATEMENT、ROW、MIXED)各有優(yōu)劣,開發(fā)人員和數(shù)據(jù)庫管理員可以根據(jù)實際需求進(jìn)行靈活選擇,以平衡數(shù)據(jù)一致性、性能和存儲空間等多方面因素。
- 在應(yīng)用領(lǐng)域,Binlog 展現(xiàn)出了巨大的價值。在數(shù)據(jù)恢復(fù)場景中,基于全量備份結(jié)合 Binlog 可以實現(xiàn)精準(zhǔn)的時間點恢復(fù)(PITR),確保在面對數(shù)據(jù)丟失或損壞時,能夠?qū)?shù)據(jù)庫還原到指定的歷史時刻,最大程度減少數(shù)據(jù)損失。
- 在主從復(fù)制方面,主庫將 Binlog 發(fā)送給從庫,從庫通過重放這些日志來同步數(shù)據(jù),從而實現(xiàn)數(shù)據(jù)的多副本存儲和讀寫分離,提升系統(tǒng)的可用性和性能。此外,Binlog 還在數(shù)據(jù)遷移、數(shù)據(jù)審計以及實時數(shù)據(jù)處理等領(lǐng)域發(fā)揮著重要作用。例如,通過解析 Binlog 可以獲取數(shù)據(jù)的實時變化,將這些變化推送至其他系統(tǒng)進(jìn)行進(jìn)一步處理,實現(xiàn)系統(tǒng)間的數(shù)據(jù)同步和業(yè)務(wù)邏輯的聯(lián)動。
然而,在使用 Binlog 的過程中,也需要關(guān)注一些問題。例如,Binlog 的記錄會占用一定的磁盤空間,需要合理規(guī)劃存儲空間和清理策略;同時,在進(jìn)行主從復(fù)制時,Binlog 的傳輸和重放可能會受到網(wǎng)絡(luò)延遲、服務(wù)器性能等因素的影響,導(dǎo)致數(shù)據(jù)同步延遲或出現(xiàn)錯誤,這就需要建立有效的監(jiān)控和故障處理機(jī)制。
總之,深入理解 MySQL Binlog 的奧秘,并合理運(yùn)用其各項特性,對于保障數(shù)據(jù)庫的高可用性、數(shù)據(jù)一致性以及實現(xiàn)多樣化的業(yè)務(wù)需求都具有重要意義。無論是數(shù)據(jù)庫管理員進(jìn)行日常運(yùn)維管理,還是開發(fā)人員設(shè)計架構(gòu)和開發(fā)應(yīng)用程序,都應(yīng)該充分認(rèn)識到 Binlog 的價值,并謹(jǐn)慎處理與之相關(guān)的各種問題。