我是如何一步一步監(jiān)控公司MySQL的每一個操作?
作為一個程序員,閑下來還是喜歡學(xué)習(xí)鉆研一些新奇的技術(shù),canal就成了很好的研究對象,一不小心就監(jiān)控了公司MySQL的一舉一動的
一、canal是個啥?
canal是阿里開發(fā)的一款基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱與消費的框架,整個框架純JAVA開發(fā),目前僅支持Mysql和MariaDB(和mysql類似)。
那什么是數(shù)據(jù)庫增量日志?
MySQL的日志種類是比較多的,主要包含:錯誤日志、查詢?nèi)罩?、慢查詢?nèi)罩?、事?wù)日志、二進制日志。而MySQL數(shù)據(jù)庫所發(fā)生的數(shù)據(jù)變更(DML(data manipulation language)數(shù)據(jù)操縱語言,也就是我們熟悉的增刪改),都會以二進制日志(binary log)形式存儲。
二、canal原理
在介紹canal原理之前,我們先來回顧一下MySQL主從同步的原理,這或許會讓你更好的理解canal的工作機制。
1、MySQL主從同步原理:
MySQL主從同步也叫讀寫分離,可以提升數(shù)據(jù)庫的負(fù)載和容錯能力,實現(xiàn)數(shù)據(jù)庫的高可用
先來分析一張MySQL主從同步原理圖:
以上圖片源自網(wǎng)絡(luò),如有侵權(quán)聯(lián)系刪除
master節(jié)點操作過程:
當(dāng)master節(jié)點數(shù)據(jù)發(fā)生更改后(delete、update、insert,還是創(chuàng)建函數(shù)、存儲過程等操作),向binary log中寫入記錄日志,這些記錄又叫做二進制日志事件(binary log events)。
- show binlog events
這些事件會按照順序?qū)懭隻in log中。當(dāng)slave節(jié)點啟動連接到master節(jié)點的時候,master節(jié)點會為slave節(jié)點開啟binlog dump線程(負(fù)責(zé)傳輸binlog數(shù)據(jù))。
一旦master節(jié)點的bin log發(fā)生變化時,bin logdump線程會通知slave節(jié)點有可以傳輸?shù)腷inlog,并將相應(yīng)的bin log內(nèi)容發(fā)送給slave節(jié)點。
slave節(jié)點操作過程:
slave節(jié)點上會創(chuàng)建兩個線程:一個I/O線程,一個SQL線程。I/O線程連接到master節(jié)點,master節(jié)點上的binlog dump 線程會將binlog的內(nèi)容發(fā)送給該I\O線程。
該I/O線程接收到binlog內(nèi)容后,再將內(nèi)容寫入到本地的relay log。而sql線程讀取到I/O線程寫入的ralay log,將relay log中的內(nèi)容寫入slave數(shù)據(jù)庫。
2、canal原理
懂了上邊MySQL的主從同步原理,canal的工作機制就很好理解了。
其實canal是模擬了MySQL數(shù)據(jù)庫中,slave節(jié)點與master節(jié)點的交互協(xié)議,偽裝自己為MySQL slave節(jié)點,向MySQL master節(jié)點發(fā)送dump協(xié)議,MySQL master節(jié)點收到dump請求,開始推送binary log給slave節(jié)點(也就是canal)。
以上圖片源自網(wǎng)絡(luò),如有侵權(quán)聯(lián)系刪除
光說不練假把式,開干!
三、canal實現(xiàn)“監(jiān)控”MySQL
在寫代碼前我們先對MySQL進行一下改造,安裝MySQL就不再細說了,基本操作。
1、查看一下MySQL是否開啟了binary log功能
- show binary logs
如果沒有開啟是圖中的狀態(tài),一般用戶是沒有這個命令權(quán)限的,不過我有,嘖嘖嘖!
如果沒有需要手動開啟,并且在my.cnf文件中配置binlog-format 為Row模式
- log-bin=mysq-bin
- binlog-format=Row
log-bin是binlog文件存放位置
binlog-format 設(shè)置MySQL復(fù)制log-bin的方式
MySQL的三種復(fù)制方式:
基于SQL語句的復(fù)制(statement-based replication, SBR)
- 優(yōu)點:將修改數(shù)據(jù)的sql保存在binlog,不需要記錄每一條sql和數(shù)據(jù)變化,binlog體量會很小,IO開銷少,性能好
- 缺點:會導(dǎo)致master-slave中的數(shù)據(jù)不一致
基于行的復(fù)制(row-based replication, RBR)
- 優(yōu)點:不記錄每條sql語句的上下文信息,僅需記錄哪條數(shù)據(jù)被修改了,修改成什么樣了
- 缺點:binlog體積很大,尤其是在alter table屬性時,會產(chǎn)生大量binlog數(shù)據(jù)
混合模式復(fù)制(mixed-based replication, MBR)
- 對應(yīng)的,binlog的格式也有三種:STATEMENT,ROW,MIXED。
2、為canal 創(chuàng)建一個有權(quán)限操作MySQL的用戶
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
- FLUSH PRIVILEGES;
3、安裝canal
下載地址:https://github.com/alibaba/canal/releases
下載后選擇版本例如:canal.deployer-xxx.tar.gz
4、配置canal
修改instance.properties文件,需要添加監(jiān)聽數(shù)據(jù)庫和表的規(guī)則,canal可以全量監(jiān)聽數(shù)據(jù)庫,也可以針對某個表進行監(jiān)聽,比較靈活。
- vim conf/example/instance.properties
- #################################################
- ## mysql serverId
- canal.instance.mysql.slaveId = 2020
- # position info 修改自己的數(shù)據(jù)庫(canal要監(jiān)聽的數(shù)據(jù)庫 地址 )
- canal.instance.master.address = 127.0.0.1:3306
- canalcanal.instance.master.journal.name =
- canal.instance.master.position =
- canal.instance.master.timestamp =
- #canal.instance.standby.address =
- #canal.instance.standby.journal.name =
- #canal.instance.standby.position =
- #canal.instance.standby.timestamp =
- # username/password 修改成自己 數(shù)據(jù)庫信息的賬號 (單獨開一個 準(zhǔn)備階段創(chuàng)建的賬號)
- canalcanal.instance.dbUsername = canal
- canalcanal.instance.dbPassword = canal
- canalcanal.instance.defaultDatabaseName =
- canal.instance.connectionCharset = UTF-8
- # table regex 表的監(jiān)聽規(guī)則
- # canal.instance.filter.regex = blogs\.blog_info
- canal.instance.filter.regex = .\*\\\\..\*
- # table black regex
- canal.instance.filter.black.regex =
啟動canal
- sh bin/startup.sh
看一下server日志,確認(rèn)一下canal是否正常啟動
- vi logs/canal/canal.log
顯示canal server is running now即為成功
- 2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
- 2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
- 2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
5、編寫Java客戶端代碼,實現(xiàn)canal監(jiān)聽
引入依賴包
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.0</version>
- </dependency>
這里只是簡單實現(xiàn)
- public class MainApp {
- public static void main(String... args) throws Exception {
- /**
- * 創(chuàng)建與
- */
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
- 11111), "example", "", "");
- int batchSize = 1000;
- int emptyCount = 0;
- try {
- connector.connect();
- /**
- * 監(jiān)控數(shù)據(jù)庫中所有表
- */
- connector.subscribe(".*\\..*");
- /**
- * 指定要監(jiān)控的表,庫名.表名
- */
- //connector.subscribe("xin-master.jk_order");
- connector.rollback();
- //120次心跳過后未檢測到,跳出
- int totalEmptyCount = 120;
- while (emptyCount < totalEmptyCount) {
- Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- emptyCount++;
- System.out.println("empty count : " + emptyCount);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- } else {
- emptyCount = 0;
- // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
- printEntry(message.getEntries());
- }
- /**
- * 提交確認(rèn)
- */
- connector.ack(batchId);
- /**
- * 處理失敗, 回滾數(shù)據(jù)
- */
- connector.rollback(batchId);
- }
- System.out.println("empty too many times, exit");
- } finally {
- connector.disconnect();
- /**
- * 手動開啟事務(wù)回滾
- */
- //TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
- }
- }
- private static void printEntry(List<CanalEntry.Entry> entrys) {
- for (CanalEntry.Entry entry : entrys) {
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry
- .EntryType
- .TRANSACTIONEND) {
- continue;
- }
- CanalEntry.RowChange rowChage = null;
- try {
- rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
- CanalEntry.EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
- for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == CanalEntry.EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- } else if (eventType == CanalEntry.EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- } else {
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
- }
- }
- }
- }
- private static void printColumn(List<CanalEntry.Column> columns) {
- for (CanalEntry.Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
- }
代碼到這就編寫完成了,我們啟動服務(wù)看下是什么效果,由于并沒有操作數(shù)據(jù)庫,所以監(jiān)聽的結(jié)果都是空的。
接下來我們在數(shù)據(jù)庫執(zhí)行一條update語句試試
- update jk_orderset order_no = '1111' where id = 40
控制臺檢測到了數(shù)據(jù)庫的修改,并生成binlog 日志文件mysql-bin.000009:3830
那么生成的binlog 文件該怎么用,如何解析成SQl語句呢?
- <!-- mysql binlog解析 -->
- <dependency>
- <groupId>com.github.shyiko</groupId>
- <artifactId>mysql-binlog-connector-java</artifactId>
- <version>0.13.0</version>
- </dependency>
將剛才的binlog文件下載本地測試一下
- public static void main(String[] args) throws IOException {
- String filePath = "C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000009:3830";
- File binlogFile = new File(filePath);
- EventDeserializer eventDeserializer = new EventDeserializer();
- eventDeserializer.setChecksumType(ChecksumType.CRC32);
- BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
- try {
- for (Event event; (event = reader.readEvent()) != null; ) {
- System.out.println(event.toString());
- }
- } finally {
- reader.close();
- }
- }
查看一下執(zhí)行結(jié)果,發(fā)現(xiàn)數(shù)據(jù)庫最近的一次操作是加了一個idx_index索引
- Event{header=EventHeaderV4{timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0}, data=null}
- Event{header=EventHeaderV4{timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0}, data=QueryEventData{threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order`
- DROP INDEX `idx_index` ,
- ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'}}
- Event{header=EventHeaderV4{timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0}, data=null}
至此我們就已經(jīng)實現(xiàn)了監(jiān)控MySQL
四、canal應(yīng)用場景
canal應(yīng)用場景大致有以下:
- 解決MySQL主從同步延遲的問題
- 實現(xiàn)數(shù)據(jù)庫實時備份
- 多級索引 (賣家和買家各自分庫索引)
- 實現(xiàn)業(yè)務(wù)cache刷新
- 價格變化等重要業(yè)務(wù)消息
重點分析一下canal是如何解決MySQL主從同步延遲的問題
生產(chǎn)環(huán)境下MySQL的主從同步模式(maser-slave)很常見,但對于跨機房部署的集群,會出現(xiàn)同步延時的情況。舉個栗子:
一條訂單狀態(tài)是未付款,master節(jié)點修改成已付款,可由于某些原因出現(xiàn)延遲數(shù)據(jù)未能及時同步到slave,這時用戶立即查看訂單狀態(tài)(查詢走slave)顯示還是未付款,哪個用戶看到這種情況不得慌啊。
為什么會出現(xiàn)主從同步延遲呢?
當(dāng)主庫master的TPS并發(fā)較高時,master節(jié)點并發(fā)產(chǎn)生的修改操作,而slave節(jié)點的sql線程是單線程處理同步數(shù)據(jù),延時自然而言就產(chǎn)生了。
不過造成主從同步的原因不止這些,由于主從服務(wù)器存在跨機器并且跨機房,除了網(wǎng)絡(luò)帶寬原因之外,網(wǎng)絡(luò)的穩(wěn)定性以及機器之間的同步,都是主從同步應(yīng)該考慮的主要原因。
總結(jié)
本文只是簡單實現(xiàn)canal監(jiān)聽數(shù)據(jù)庫的功能,旨在給大家提供一種解決問題的思路,還是反復(fù)絮叨的那句話,解決問題的技術(shù)方法很對,具體如何應(yīng)用還需結(jié)合具體業(yè)務(wù)。