實(shí)戰(zhàn)!Spring Boot 整合 阿里開源中間件 Canal 實(shí)現(xiàn)數(shù)據(jù)增量同步!
本文轉(zhuǎn)載自微信公眾號「碼猿技術(shù)專欄」,作者不才陳某 。轉(zhuǎn)載本文請聯(lián)系碼猿技術(shù)專欄公眾號。
數(shù)據(jù)同步一直是一個(gè)令人頭疼的問題。在業(yè)務(wù)量小,場景不多,數(shù)據(jù)量不大的情況下我們可能會選擇在項(xiàng)目中直接寫一些定時(shí)任務(wù)手動處理數(shù)據(jù),例如從多個(gè)表將數(shù)據(jù)查出來,再匯總處理,再插入到相應(yīng)的地方。
但是隨著業(yè)務(wù)量增大,數(shù)據(jù)量變多以及各種復(fù)雜場景下的分庫分表的實(shí)現(xiàn),使數(shù)據(jù)同步變得越來越困難。
今天這篇文章使用阿里開源的中間件Canal解決數(shù)據(jù)增量同步的痛點(diǎn)。
文章目錄如下:
Canal是什么?
canal譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。
從這句話理解到了什么?
基于MySQL,并且通過MySQL日志進(jìn)行的增量解析,這也就意味著對原有的業(yè)務(wù)代碼完全是無侵入性的。
工作原理:解析MySQL的binlog日志,提供增量數(shù)據(jù)。
基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括
- 數(shù)據(jù)庫鏡像
- 數(shù)據(jù)庫實(shí)時(shí)備份
- 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)
- 業(yè)務(wù) cache 刷新
- 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理
當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
官方文檔:https://github.com/alibaba/canal
Canal數(shù)據(jù)如何傳輸?
先來一張官方圖:
Canal分為服務(wù)端和客戶端,這也是阿里常用的套路,比如前面講到的注冊中心Nacos:
- 服務(wù)端:負(fù)責(zé)解析MySQL的binlog日志,傳遞增量數(shù)據(jù)給客戶端或者消息中間件
- 客戶端:負(fù)責(zé)解析服務(wù)端傳過來的數(shù)據(jù),然后定制自己的業(yè)務(wù)處理。
目前為止支持的消息中間件很全面了,比如Kafka、RocketMQ,RabbitMQ。
數(shù)據(jù)同步還有其他中間件嗎?
有,當(dāng)然有,還有一些開源的中間件也是相當(dāng)不錯(cuò)的,比如Bifrost。
常見的幾款中間件的區(qū)別如下:
當(dāng)然要我選擇的話,首選阿里的中間件Canal。
Canal服務(wù)端安裝
服務(wù)端需要下載壓縮包,下載地址:https://github.com/alibaba/canal/releases
目前最新的是v1.1.5,點(diǎn)擊下載:
下載完成解壓,目錄如下:
本文使用Canal+RabbitMQ進(jìn)行數(shù)據(jù)的同步,因此下面步驟完全按照這個(gè)base進(jìn)行。
1、打開MySQL的binlog日志
修改MySQL的日志文件,my.cnf 配置如下:
- [mysqld]
- log-bin=mysql-bin # 開啟 binlog
- binlog-format=ROW # 選擇 ROW 模式
- server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)
2、設(shè)置MySQL的配置
需要設(shè)置服務(wù)端配置文件中的MySQL配置,這樣Canal才能知道需要監(jiān)聽哪個(gè)庫、哪個(gè)表的日志文件。
一個(gè) Server 可以配置多個(gè)實(shí)例監(jiān)聽 ,Canal 功能默認(rèn)自帶的有個(gè) example 實(shí)例,本篇就用 example 實(shí)例 。如果增加實(shí)例,復(fù)制 example 文件夾內(nèi)容到同級目錄下,然后在 canal.properties 指定添加實(shí)例的名稱。
修改canal.deployer-1.1.5\conf\example\instance.properties配置文件
- # url
- canal.instance.master.address=127.0.0.1:3306
- # username/password
- canal.instance.dbUsername=root
- canal.instance.dbPassword=root
- # 監(jiān)聽的數(shù)據(jù)庫
- canal.instance.defaultDatabaseName=test
- # 監(jiān)聽的表,可以指定,多個(gè)用逗號分割,這里正則是監(jiān)聽所有
- canal.instance.filter.regex=.*\\..*
3、設(shè)置RabbitMQ的配置
服務(wù)端默認(rèn)的傳輸方式是tcp,需要在配置文件中設(shè)置MQ的相關(guān)信息。
這里需要修改兩處配置文件,如下;
1)、canal.deployer-1.1.5\conf\canal.properties
這個(gè)配置文件主要是設(shè)置MQ相關(guān)的配置,比如URL,用戶名、密碼...
- # 傳輸方式:tcp, kafka, rocketMQ, rabbitMQ
- canal.serverMode = rabbitMQ
- ##################################################
- ######### RabbitMQ #############
- ##################################################
- rabbitmq.host = 127.0.0.1
- rabbitmq.virtual.host =/
- # exchange
- rabbitmq.exchange =canal.exchange
- # 用戶名、密碼
- rabbitmq.username =guest
- rabbitmq.password =guest
- ## 是否持久化
- rabbitmq.deliveryMode = 2
2)、canal.deployer-1.1.5\conf\example\instance.properties
這個(gè)文件設(shè)置MQ的路由KEY,這樣才能路由到指定的隊(duì)列中,如下:
- canal.mq.topic=canal.routing.key
4、RabbitMQ新建exchange和Queue
在RabbitMQ中需要新建一個(gè)canal.exchange(必須和配置中的相同)的exchange和一個(gè)名稱為 canal.queue(名稱隨意)的隊(duì)列。
其中綁定的路由KEY為:canal.routing.key(必須和配置中的相同),如下圖:
5、啟動服務(wù)端
點(diǎn)擊bin目錄下的腳本,windows直接雙擊startup.bat,啟動成功如下:
6、測試
在本地?cái)?shù)據(jù)庫test中的oauth_client_details插入一條數(shù)據(jù),如下:
- INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');
此時(shí)查看MQ中的canal.queue已經(jīng)有了數(shù)據(jù),如下:
其實(shí)就是一串JSON數(shù)據(jù),這個(gè)JSON如下:
- {
- "data": [{
- "client_id": "myjszl",
- "resource_ids": "res1",
- "client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
- "scope": "all",
- "authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
- "web_server_redirect_uri": "http://www.baidu.com",
- "authorities": null,
- "access_token_validity": "1000",
- "refresh_token_validity": "1000",
- "additional_information": null,
- "autoapprove": "false"
- }],
- "database": "test",
- "es": 1640337532000,
- "id": 7,
- "isDdl": false,
- "mysqlType": {
- "client_id": "varchar(48)",
- "resource_ids": "varchar(256)",
- "client_secret": "varchar(256)",
- "scope": "varchar(256)",
- "authorized_grant_types": "varchar(256)",
- "web_server_redirect_uri": "varchar(256)",
- "authorities": "varchar(256)",
- "access_token_validity": "int(11)",
- "refresh_token_validity": "int(11)",
- "additional_information": "varchar(4096)",
- "autoapprove": "varchar(256)"
- },
- "old": null,
- "pkNames": ["client_id"],
- "sql": "",
- "sqlType": {
- "client_id": 12,
- "resource_ids": 12,
- "client_secret": 12,
- "scope": 12,
- "authorized_grant_types": 12,
- "web_server_redirect_uri": 12,
- "authorities": 12,
- "access_token_validity": 4,
- "refresh_token_validity": 4,
- "additional_information": 12,
- "autoapprove": 12
- },
- "table": "oauth_client_details",
- "ts": 1640337532520,
- "type": "INSERT"
- }
每個(gè)字段的意思已經(jīng)很清楚了,有表名稱、方法、參數(shù)、參數(shù)類型、參數(shù)值.....
客戶端要做的就是監(jiān)聽MQ獲取JSON數(shù)據(jù),然后將其解析出來,處理自己的業(yè)務(wù)邏輯。
Canal客戶端搭建
客戶端很簡單實(shí)現(xiàn),要做的就是消費(fèi)Canal服務(wù)端傳遞過來的消息,監(jiān)聽canal.queue這個(gè)隊(duì)列。
1、創(chuàng)建消息實(shí)體類
MQ傳遞過來的是JSON數(shù)據(jù),當(dāng)然要創(chuàng)建個(gè)實(shí)體類接收數(shù)據(jù),如下:
- /**
- * @author 公眾號 碼猿技術(shù)專欄
- * Canal消息接收實(shí)體類
- */
- @NoArgsConstructor
- @Data
- public class CanalMessage<T> {
- @JsonProperty("type")
- private String type;
- @JsonProperty("table")
- private String table;
- @JsonProperty("data")
- private List<T> data;
- @JsonProperty("database")
- private String database;
- @JsonProperty("es")
- private Long es;
- @JsonProperty("id")
- private Integer id;
- @JsonProperty("isDdl")
- private Boolean isDdl;
- @JsonProperty("old")
- private List<T> old;
- @JsonProperty("pkNames")
- private List<String> pkNames;
- @JsonProperty("sql")
- private String sql;
- @JsonProperty("ts")
- private Long ts;
- }
2、MQ消息監(jiān)聽業(yè)務(wù)
接下來就是監(jiān)聽隊(duì)列,一旦有Canal服務(wù)端有數(shù)據(jù)推送能夠及時(shí)的消費(fèi)。
代碼很簡單,只是給出個(gè)接收的案例,具體的業(yè)務(wù)邏輯可以根據(jù)業(yè)務(wù)實(shí)現(xiàn),如下:
- import cn.hutool.json.JSONUtil;
- import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- /**
- * 監(jiān)聽MQ獲取Canal增量的數(shù)據(jù)消息
- */
- @Component
- @Slf4j
- @RequiredArgsConstructor
- public class CanalRabbitMQListener {
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(value = "canal.queue", durable = "true"),
- exchange = @Exchange(value = "canal.exchange"),
- key = "canal.routing.key"
- )
- })
- public void handleDataChange(String message) {
- //將message轉(zhuǎn)換為CanalMessage
- CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
- String tableName = canalMessage.getTable();
- log.info("Canal 監(jiān)聽 {} 發(fā)生變化;明細(xì):{}", tableName, message);
- //TODO 業(yè)務(wù)邏輯自己完善...............
- }
- }
3、測試
下面向表中插入數(shù)據(jù),看下接收的消息是什么樣的,SQL如下:
- INSERT INTO `oauth_client_details`
- VALUES
- ( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );
客戶端轉(zhuǎn)換后的消息如下圖:
上圖可以看出所有的數(shù)據(jù)都已經(jīng)成功接收到,只需要根據(jù)數(shù)據(jù)完善自己的業(yè)務(wù)邏輯即可。
客戶端案例源碼已經(jīng)上傳GitHub,關(guān)注公眾號:碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:9530 獲取!
總結(jié)
數(shù)據(jù)增量同步的開源工具并不只有Canal一種,根據(jù)自己的業(yè)務(wù)需要選擇合適的組件。