新選擇!基于Spring Boot監(jiān)聽MySQL日志Binlog實現(xiàn)數(shù)據(jù)實時同步
1. 簡介
MySQL與Redis數(shù)據(jù)實時同步是將MySQL數(shù)據(jù)庫中的數(shù)據(jù)變化實時地反映到Redis緩存系統(tǒng)中的過程。MySQL是一款穩(wěn)定的關(guān)系型數(shù)據(jù)庫,適合做持久化存儲;而Redis是一個高性能的內(nèi)存數(shù)據(jù)庫,適合做緩存和實時數(shù)據(jù)處理。將兩者結(jié)合使用,可以充分發(fā)揮各自的優(yōu)勢,提升系統(tǒng)性能和穩(wěn)定性。
實現(xiàn)MySQL與Redis數(shù)據(jù)實時同步有多種方法,如使用MySQL的二進(jìn)制日志(Binlog)配合Canal或Debezium等工具。此外,還可以在應(yīng)用層進(jìn)行雙寫操作或使用消息隊列實現(xiàn)數(shù)據(jù)同步。
MySQL與Redis數(shù)據(jù)實時同步的主要目的是優(yōu)化性能和保持?jǐn)?shù)據(jù)一致性。通過將熱點數(shù)據(jù)存儲在Redis中,可以大大提高系統(tǒng)的訪問速度,同時確保MySQL中的數(shù)據(jù)變化能夠?qū)崟r反映到Redis中,避免數(shù)據(jù)不一致的問題。這種同步機制在電商、社交等需要高并發(fā)訪問和實時數(shù)據(jù)更新的場景中尤為重要。
本篇文章我將介紹另外一款非常不錯開源的組件mysql-binlog-connector-java。通過名稱就能知道他是通過連接MySQL binlog日志來實現(xiàn)數(shù)據(jù)監(jiān)聽的。該組件不僅僅是能夠?qū)崟r監(jiān)聽binlog的變化,而且你還可以直接去讀取binlog日志文件解析其內(nèi)容。
該組件具備以下特性:
- 自動解析二進(jìn)制日志文件名/位置 | GTID 解析
- 斷開連接可恢復(fù)
- 插件化的故障轉(zhuǎn)移策略
- 支持 binlog_checksum=CRC32(適用于 MySQL 5.6.2+ 用戶)
- 通過 TLS 進(jìn)行安全通信
- 友好的Java管理擴展(JMX)
- 實時統(tǒng)計
- 在 Maven Central 上可用
- 無第三方依賴,跨不同版本的 MySQL 發(fā)行版的測試套件
接下來,我將通過如下幾方面介紹該組件在項目中的使用:
- 編程解析binlog日志
- 實時監(jiān)聽binlog日志
- 通過JMX暴露binlog客戶端
2. 實戰(zhàn)案例
環(huán)境準(zhǔn)備
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.30.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
當(dāng)前mysql-binlog-connector-java最新版本為0.30.1。你可以通過下面地址查看倉庫版本情況:
https://mvnrepository.com/artifact/com.zendesk/mysql-binlog-connector-java
注意:你的確定你開啟了binlog日志
SHOW VARIABLES LIKE '%log_bin%'
圖片
通過上面的命令查看狀態(tài)。
2.1 編程讀取binlog日志
public static void main(String[] args) throws Exception {
File binlogFile = new File("C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000032") ;
EventDeserializer eventDeserializer = new EventDeserializer() ;
// 設(shè)置兼容性模式
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
try {
for (Event event; (event = reader.readEvent()) != null;) {
EventData data = event.getData() ;
// 判斷事件的類型
if (data instanceof WriteRowsEventData ed) {
List<Serializable[]> rows = ed.getRows() ;
rows.forEach(row -> {
for (Serializable s : row) {
if (s instanceof byte[] bs) {
System.err.print(new String(bs) + "\t") ;
} else {
System.err.print(s + "\t") ;
}
}
System.out.println() ;
});
} else if (data instanceof QueryEventData ed) {
System.out.printf("查詢事件:%s%n", ed.getSql()) ;
} else if (data instanceof DeleteRowsEventData ed) {
System.err.println("刪除事件") ;
} else if (data instanceof TableMapEventData ed) {
String database = ed.getDatabase() ;
String table = ed.getTable() ;
System.out.printf("數(shù)據(jù)庫: %s, 表名: %s%n", database, table) ;
}
}
} finally {
reader.close();
}
}
該組件定義了如下的事件類型
圖片
上面程序輸出結(jié)果:
查詢事件:BEGIN
數(shù)據(jù)庫: testjpa, 表名: t_person
刪除事件
查詢事件:BEGIN
數(shù)據(jù)庫: testjpa, 表名: t_person
2520 30 姓名 - 30
查詢事件:BEGIN
數(shù)據(jù)庫: testjpa, 表名: t_person
2521 44 姓名 - 44
查詢事件:BEGIN
數(shù)據(jù)庫: testjpa, 表名: t_person
2522 92 姓名 - 92
查詢事件:BEGIN
數(shù)據(jù)庫: testjpa, 表名: t_person
2523 71 姓名 - 71
正確的讀取binlog日志中的信息。
2.2 實時監(jiān)聽Binlog日志
@Component
public class MySQLToRedisComponent implements CommandLineRunner {
public void listener() {
BinaryLogClient client = new BinaryLogClient("118.24.111.33", 3307, "test", "root", "123123");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new EventListener() {
@Override
public void onEvent(Event event) {
EventHeader header = event.getHeader() ;
switch(header.getEventType()) {
case EXT_WRITE_ROWS:
WriteRowsEventData writeData = event.getData() ;
List<Serializable[]> rows = writeData.getRows() ;
for (Serializable row : rows) {
if (row.getClass().isArray()) {
printRow(row);
}
}
break ;
case EXT_UPDATE_ROWS:
UpdateRowsEventData updateData = event.getData() ;
BitSet columns = updateData.getIncludedColumns() ;
System.err.printf("更新列: %s%n", columns) ;
List<Entry<Serializable[], Serializable[]>> updateRows = updateData.getRows() ;
for (Entry<Serializable[], Serializable[]> entry : updateRows) {
printRow(entry.getKey()) ;
System.out.println(">>>>>>>>>>>>>>>>>>>>>before") ;
printRow(entry.getValue()) ;
System.out.println(">>>>>>>>>>>>>>>>>>>>>after") ;
}
break ;
case EXT_DELETE_ROWS:
DeleteRowsEventData deleteData = event.getData() ;
List<Serializable[]> deleteRow = deleteData.getRows() ;
for (Serializable row : deleteRow) {
if (row.getClass().isArray()) {
printRow(row);
}
}
break ;
case TABLE_MAP:
TableMapEventData data = event.getData() ;
System.out.printf("變更表: %s.%s%n", data.getDatabase(), data.getTable()) ;
break ;
default:
break ;
}
}
private void printRow(Serializable row) {
Serializable[] ss = (Serializable[]) row ;
for (Serializable s : ss) {
if (s.getClass().isArray()) {
System.out.print(new String((byte[])s) + "\t") ;
} else {
System.out.print(s + "\t") ;
}
}
System.out.println() ;
}
});
client.connect();
}
public void run(String... args) throws Exception {
this.listener() ;
}
}
以上監(jiān)聽程序,我們僅對部分事件進(jìn)行了監(jiān)聽處理。當(dāng)數(shù)據(jù)發(fā)生變化后,輸出如下:
變更表: test.t_person
更新列: {0, 1, 2}
1 張三 66
>>>>>>>>>>>>>>>>>>>>>before
1 張三 22
>>>>>>>>>>>>>>>>>>>>>after
更序列使用了BitSet表示,所以如果你要與具體的列想對應(yīng),你還應(yīng)該執(zhí)行如下的語句來確定具體的列名:
mysql> describe t_person;
+-------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| name | varchar(255) | YES | | NULL | |
| age | int | YES | | NULL | |
+-------+--------------+------+-----+---------+----------------+
你可以通過JDBC的方式執(zhí)行該語句獲取對應(yīng)的列,也可以通過Socket方式發(fā)送命令獲取結(jié)果。推薦還是JDBC。
2.3 JMX暴露Binlog客戶端
在Spring Boot中,我們可以非常方便的通過JMX暴露binlog客戶端的相關(guān)操作,如下示例:
@Bean
BinaryLogClient client() {
return new BinaryLogClient("118.24.111.33", 3307, "test", "root", "123123") ;
}
@Bean
MBeanExporter exporterClient(BinaryLogClient client) {
MBeanExporter exporter = new MBeanExporter();
exporter.setBeans(Map.of("mysql.binlog:type=BinaryLogClient", client)) ;
return exporter;
}
@Bean
MBeanExporter exporterClientStatistics(BinaryLogClient client) {
MBeanExporter exporter = new MBeanExporter();
BinaryLogClientStatistics stats = new BinaryLogClientStatistics(client);
exporter.setBeans(Map.of("mysql.binlog:type=BinaryLogClientStatistics", stats)) ;
return exporter;
}
啟動應(yīng)用后,通過JConsole查看。