自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java代碼中,如何監(jiān)控MySQL的Binlog?

開發(fā) 后端
最近在工作中,遇到了這樣一個(gè)業(yè)務(wù)場(chǎng)景,我們需要關(guān)注一個(gè)業(yè)務(wù)系統(tǒng)數(shù)據(jù)庫中某幾張表的數(shù)據(jù),當(dāng)數(shù)據(jù)發(fā)生新增或修改時(shí),將它同步到另一個(gè)業(yè)務(wù)系統(tǒng)數(shù)據(jù)庫中的表中。

[[410677]]

最近在工作中,遇到了這樣一個(gè)業(yè)務(wù)場(chǎng)景,我們需要關(guān)注一個(gè)業(yè)務(wù)系統(tǒng)數(shù)據(jù)庫中某幾張表的數(shù)據(jù),當(dāng)數(shù)據(jù)發(fā)生新增或修改時(shí),將它同步到另一個(gè)業(yè)務(wù)系統(tǒng)數(shù)據(jù)庫中的表中。

一提到數(shù)據(jù)庫的同步,估計(jì)大家第一時(shí)間想到的就是基于binlog的主從復(fù)制了,但是放在我們的場(chǎng)景中,還有幾個(gè)問題:

  • 第一,并不是需要復(fù)制所有表的數(shù)據(jù),復(fù)制對(duì)象只有少量的幾張表
  • 第二,也是比較麻煩的,兩個(gè)業(yè)務(wù)系統(tǒng)數(shù)據(jù)庫表結(jié)構(gòu)可能不一致。例如,要同步數(shù)據(jù)庫1的A表中的某些字段到數(shù)據(jù)庫2的B表中,在這一過程中,A表和B表的字段并不是完全相同

這樣的話,我們只能通過代碼的方式,首先獲取到數(shù)據(jù)庫1表中數(shù)據(jù)的變動(dòng),再通過手動(dòng)映射的方式,插入到數(shù)據(jù)庫2的表中。但是,獲取變動(dòng)數(shù)據(jù)的這一過程,還是離不開binlog,因此我們就需要在代碼中對(duì)binlog進(jìn)行一下監(jiān)控。

先說結(jié)論,我們最終使用了一個(gè)開源工具mysql-binlog-connector-java,用來監(jiān)控binlog變化并獲取數(shù)據(jù),獲取數(shù)據(jù)后再手動(dòng)插入到另一個(gè)庫的表中,基于它來實(shí)現(xiàn)了數(shù)據(jù)的同步。這個(gè)工具的git項(xiàng)目地址如下:

  1. https://github.com/shyiko/mysql-binlog-connector-java 

在正式開始前,還是先簡單介紹一下mysql的binlog,binlog是一個(gè)二進(jìn)制文件,它保存在磁盤中,是用來記錄數(shù)據(jù)庫表結(jié)構(gòu)變更、表數(shù)據(jù)修改的二進(jìn)制日志。其實(shí)除了數(shù)據(jù)復(fù)制外,它還可以實(shí)現(xiàn)數(shù)據(jù)恢復(fù)、增量備份等功能。

啟動(dòng)項(xiàng)目前,首先需要確保mysql服務(wù)已經(jīng)啟用了binlog:

  1. show variables like 'log_bin'

如果為值為OFF,表示沒有啟用,那么需要首先啟用binlog,修改配置文件:

  1. log_bin=mysql-bin 
  2. binlog-format=ROW 
  3. server-id=1 

對(duì)參數(shù)做一個(gè)簡要說明:

  • 在配置文件中加入了log_bin配置項(xiàng)后,表示啟用了binlog
  • binlog-format是binlog的日志格式,支持三種類型,分別是STATEMENT、ROW、MIXED,我們?cè)谶@里使用ROW模式
  • server-id用于標(biāo)識(shí)一個(gè)sql語句是從哪一個(gè)server寫入的,這里一定要進(jìn)行設(shè)置,否則我們?cè)诤竺娴拇a中會(huì)無法正常監(jiān)聽到事件

在更改完配置文件后,重啟mysql服務(wù)。再次查看是否啟用binlog,返回為ON,表示已經(jīng)開啟成功。

在Java項(xiàng)目中,首先引入maven坐標(biāo):

  1. <dependency> 
  2.     <groupId>com.github.shyiko</groupId> 
  3.     <artifactId>mysql-binlog-connector-java</artifactId> 
  4.     <version>0.21.0</version> 
  5. </dependency> 

 

寫一段簡單的示例,看看它的具體使用方式:

  1. public static void main(String[] args) { 
  2.     BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "hydra""123456"); 
  3.     client.setServerId(2); 
  4.  
  5.     client.registerEventListener(event -> { 
  6.         EventData data = event.getData(); 
  7.         if (data instanceof TableMapEventData) { 
  8.             System.out.println("Table:"); 
  9.             TableMapEventData tableMapEventData = (TableMapEventData) data; 
  10.             System.out.println(tableMapEventData.getTableId()+": ["+tableMapEventData.getDatabase() + "-" + tableMapEventData.getTable()+"]"); 
  11.         } 
  12.         if (data instanceof UpdateRowsEventData) { 
  13.             System.out.println("Update:"); 
  14.             System.out.println(data.toString()); 
  15.         } else if (data instanceof WriteRowsEventData) { 
  16.             System.out.println("Insert:"); 
  17.             System.out.println(data.toString()); 
  18.         } else if (data instanceof DeleteRowsEventData) { 
  19.             System.out.println("Delete:"); 
  20.             System.out.println(data.toString()); 
  21.         } 
  22.     }); 
  23.  
  24.     try { 
  25.         client.connect(); 
  26.     } catch (IOException e) { 
  27.         e.printStackTrace(); 
  28.     } 

首先,創(chuàng)建一個(gè)BinaryLogClient客戶端對(duì)象,初始化時(shí)需要傳入mysql的連接信息,創(chuàng)建完成后,給客戶端注冊(cè)一個(gè)監(jiān)聽器,來實(shí)現(xiàn)它對(duì)binlog的監(jiān)聽和解析。在監(jiān)聽器中,我們暫時(shí)只對(duì)4種類型的事件數(shù)據(jù)進(jìn)行了處理,除了WriteRowsEventData、DeleteRowsEventData、UpdateRowsEventData對(duì)應(yīng)增刪改操作類型的事件數(shù)據(jù)外,還有一個(gè)TableMapEventData類型的數(shù)據(jù),包含了表的對(duì)應(yīng)關(guān)系,在后面的例子中再具體說明。

在這里,客戶端監(jiān)聽到的是數(shù)據(jù)庫級(jí)別的所有事件,并且可以監(jiān)聽到表的DML語句和DDL語句,所以我們只需要處理我們關(guān)心的事件數(shù)據(jù)就行,否則會(huì)收到大量的冗余數(shù)據(jù)。

啟動(dòng)程序,控制臺(tái)輸出:

  1. com.github.shyiko.mysql.binlog.BinaryLogClient openChannelToBinaryLogStream 
  2. 信息: Connected to 127.0.0.1:3306 at mysql-bin.000002/1046 (sid:2, cid:10) 

信息: Connected to 127.0.0.1:3306 at mysql-bin.000002/1046 (sid:2, cid:10)

連接mysql的binlog成功,接下來,我們?cè)跀?shù)據(jù)庫中插入一條數(shù)據(jù),這里操作的數(shù)據(jù)庫名字是tenant,表是dept:

  1. insert into dept VALUES(8,"人力","","1"); 

這時(shí),控制臺(tái)就會(huì)打印監(jiān)聽到事件的數(shù)據(jù):

  1. Table
  2. 108: [tenant-dept] 
  3. Insert
  4. WriteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[ 
  5.     [8, 人力, , 1] 
  6. ]} 

我們監(jiān)聽到的事件類型數(shù)據(jù)有兩類,第一類是TableMapEventData,通過它可以獲取操作的數(shù)據(jù)庫名稱、表名稱以及表的id。之所以我們要監(jiān)聽這個(gè)事件,是因?yàn)橹蟊O(jiān)聽的實(shí)際操作中返回?cái)?shù)據(jù)中包含了表的id,而沒有表名等信息,所以如果我們想知道具體的操作是在哪一張表的話,就要先維護(hù)一個(gè)id與表的對(duì)應(yīng)關(guān)系。

第二個(gè)打印出來的監(jiān)聽事件數(shù)據(jù)是WriteRowsEventData,其中記錄了insert語句作用的表,插入涉及到的列,以及實(shí)際插入的數(shù)據(jù)。另外,如果我們只需要對(duì)特定的一張或幾張表進(jìn)行處理的話,也可以提前設(shè)置表的名單,在這里根據(jù)表id到表名的映射關(guān)系,實(shí)現(xiàn)數(shù)據(jù)的過濾,

接下來,我們?cè)賵?zhí)行一條update語句:

  1. update dept set tenant_id=3 where id=8 or id=9 

控制臺(tái)輸出:

  1. Table
  2. 108: [tenant-dept] 
  3. Update
  4. UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1, 2, 3}, includedColumns={0, 1, 2, 3}, rows=[ 
  5.     {before=[8, 人力, , 1], after=[8, 人力, , 3]}, 
  6.     {before=[9, 人力, , 1], after=[9, 人力, , 3]} 
  7. ]} 

在執(zhí)行update語句時(shí),可能會(huì)作用于多條數(shù)據(jù),因此在實(shí)際修改的數(shù)據(jù)中,可能包含多行記錄,這一點(diǎn)體現(xiàn)在上面的rows中,包含了id為8和9的兩條數(shù)據(jù)。

最后,再執(zhí)行一條delete語句:

  1. delete from dept where tenant_id=3 

控制臺(tái)打印如下,rows中同樣返回了生效的兩條數(shù)據(jù):

  1. Table
  2. 108: [tenant-dept] 
  3. Delete
  4. DeleteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[ 
  5.     [8, 人力, , 3], 
  6.     [9, 人力, , 3] 
  7. ]} 

簡單的使用原理介紹完成后,再回到我們?cè)鹊男枨笊?,需要將一張表中新增或修改的?shù)據(jù)同步到另一張表中,問題還有一個(gè),就是如何將返回的數(shù)據(jù)對(duì)應(yīng)到所在的列上。這時(shí)應(yīng)該怎么實(shí)現(xiàn)呢?

以u(píng)pdate操作為例,我們要對(duì)提取的數(shù)據(jù)后進(jìn)行一下處理,更改上面例子中的方法:

  1. if (data instanceof UpdateRowsEventData) { 
  2.     System.out.println("Update:"); 
  3.     UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data; 
  4.     for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) { 
  5.         List<Serializable> entries = Arrays.asList(row.getValue()); 
  6.         System.out.println(entries); 
  7.         JSONObject dataObject = getDataObject(entries); 
  8.         System.out.println(dataObject); 
  9.     } 

在將data強(qiáng)制轉(zhuǎn)換為UpdateRowsEventData后,可以使用getRows方法獲取到更新的行數(shù)據(jù),并且能夠取到每一列的值。

之后,調(diào)用了一個(gè)自己實(shí)現(xiàn)的getDataObject方法,用它來實(shí)現(xiàn)數(shù)據(jù)到列的綁定過程:

  1. private static JSONObject getDataObject(List message) { 
  2.     JSONObject resultObject = new JSONObject(); 
  3.     String format = "{\"id\":\"0\",\"dept_name\":\"1\",\"comment\":\"2\",\"tenant_id\":\"3\"}"
  4.     JSONObject json = JSON.parseObject(format); 
  5.     for (String key : json.keySet()) { 
  6.         resultObject.put(key, message.get(json.getInteger(key))); 
  7.     } 
  8.     return resultObject; 

在format字符串中,提前維護(hù)了一個(gè)數(shù)據(jù)庫表的字段順序的字符串,標(biāo)識(shí)了每個(gè)字段位于順序中的第幾個(gè)位置。通過上面這個(gè)函數(shù),能夠?qū)崿F(xiàn)數(shù)據(jù)到列的填裝過程,我們?cè)賵?zhí)行一條update語句來查看一下結(jié)果:

  1. update dept set tenant_id=3,comment="1" where id=8 

控制臺(tái)打印結(jié)果如下:

  1. Table
  2. 108: [tenant-dept] 
  3. Update
  4. [8, 人力, 1, 3] 
  5. {"tenant_id":3,"dept_name":"人力","comment":"1","id":8} 

可以看到,將修改后的這一條記錄中的屬性填裝到了它對(duì)應(yīng)的列中,之后我們?cè)俑鶕?jù)具體的業(yè)務(wù)邏輯,就可以根據(jù)字段名取出數(shù)據(jù),將數(shù)據(jù)同步到其他的表了。

本文轉(zhuǎn)載自微信公眾號(hào)「碼農(nóng)參上」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系碼農(nóng)參上公眾號(hào)。

 

責(zé)任編輯:武曉燕 來源: 碼農(nóng)參上
相關(guān)推薦

2024-08-28 13:09:50

2011-04-01 16:30:49

cacti監(jiān)控Mysql

2020-09-18 11:00:28

MySQLbinlogrelay-log

2024-03-14 14:18:58

MySQL業(yè)務(wù)設(shè)計(jì)事務(wù)

2025-01-22 16:00:00

MySQL數(shù)據(jù)庫Binlog

2024-02-26 07:39:16

2010-05-18 12:24:16

MySQL binlo

2017-04-20 21:00:06

MySQLbinlog主從復(fù)制

2022-09-23 13:24:21

MySQL數(shù)據(jù)庫

2016-11-03 18:39:39

JavaMySQL

2015-12-11 11:49:19

java

2023-06-08 07:37:35

MySQLbinlog機(jī)制

2021-01-15 05:36:48

MySQL錯(cuò)位數(shù)據(jù)庫

2024-05-30 08:03:17

2021-09-18 15:05:58

MySQL數(shù)據(jù)庫監(jiān)控

2022-03-15 11:31:17

MySQL日志格式

2020-12-07 11:12:16

MySOLBinlogOtter

2011-07-11 14:36:10

BinlogMysql

2023-11-16 08:00:00

Datadog部署實(shí)時(shí)監(jiān)控

2023-01-06 18:31:46

準(zhǔn)確命名
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)