自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

實(shí)戰(zhàn)!Spring Boot 整合 阿里開源中間件 Canal 實(shí)現(xiàn)數(shù)據(jù)增量同步!

開發(fā) 架構(gòu)
數(shù)據(jù)同步一直是一個(gè)令人頭疼的問題。在業(yè)務(wù)量小,場景不多,數(shù)據(jù)量不大的情況下我們可能會選擇在項(xiàng)目中直接寫一些定時(shí)任務(wù)手動處理數(shù)據(jù),例如從多個(gè)表將數(shù)據(jù)查出來,再匯總處理,再插入到相應(yīng)的地方。

[[442171]]

本文轉(zhuǎn)載自微信公眾號「碼猿技術(shù)專欄」,作者不才陳某 。轉(zhuǎn)載本文請聯(lián)系碼猿技術(shù)專欄公眾號。

數(shù)據(jù)同步一直是一個(gè)令人頭疼的問題。在業(yè)務(wù)量小,場景不多,數(shù)據(jù)量不大的情況下我們可能會選擇在項(xiàng)目中直接寫一些定時(shí)任務(wù)手動處理數(shù)據(jù),例如從多個(gè)表將數(shù)據(jù)查出來,再匯總處理,再插入到相應(yīng)的地方。

但是隨著業(yè)務(wù)量增大,數(shù)據(jù)量變多以及各種復(fù)雜場景下的分庫分表的實(shí)現(xiàn),使數(shù)據(jù)同步變得越來越困難。

今天這篇文章使用阿里開源的中間件Canal解決數(shù)據(jù)增量同步的痛點(diǎn)。

文章目錄如下:

Canal是什么?

canal譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。

從這句話理解到了什么?

基于MySQL,并且通過MySQL日志進(jìn)行的增量解析,這也就意味著對原有的業(yè)務(wù)代碼完全是無侵入性的。

工作原理:解析MySQL的binlog日志,提供增量數(shù)據(jù)。

基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括

  • 數(shù)據(jù)庫鏡像
  • 數(shù)據(jù)庫實(shí)時(shí)備份
  • 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)
  • 業(yè)務(wù) cache 刷新
  • 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理

當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

官方文檔:https://github.com/alibaba/canal

Canal數(shù)據(jù)如何傳輸?

先來一張官方圖:

Canal分為服務(wù)端和客戶端,這也是阿里常用的套路,比如前面講到的注冊中心Nacos:

  • 服務(wù)端:負(fù)責(zé)解析MySQL的binlog日志,傳遞增量數(shù)據(jù)給客戶端或者消息中間件
  • 客戶端:負(fù)責(zé)解析服務(wù)端傳過來的數(shù)據(jù),然后定制自己的業(yè)務(wù)處理。

目前為止支持的消息中間件很全面了,比如Kafka、RocketMQ,RabbitMQ。

數(shù)據(jù)同步還有其他中間件嗎?

有,當(dāng)然有,還有一些開源的中間件也是相當(dāng)不錯(cuò)的,比如Bifrost。

常見的幾款中間件的區(qū)別如下:

當(dāng)然要我選擇的話,首選阿里的中間件Canal。

Canal服務(wù)端安裝

服務(wù)端需要下載壓縮包,下載地址:https://github.com/alibaba/canal/releases

目前最新的是v1.1.5,點(diǎn)擊下載:

下載完成解壓,目錄如下:

本文使用Canal+RabbitMQ進(jìn)行數(shù)據(jù)的同步,因此下面步驟完全按照這個(gè)base進(jìn)行。

1、打開MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

  1. [mysqld] 
  2. log-bin=mysql-bin # 開啟 binlog 
  3. binlog-format=ROW # 選擇 ROW 模式 
  4. server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù) 

2、設(shè)置MySQL的配置

需要設(shè)置服務(wù)端配置文件中的MySQL配置,這樣Canal才能知道需要監(jiān)聽哪個(gè)庫、哪個(gè)表的日志文件。

一個(gè) Server 可以配置多個(gè)實(shí)例監(jiān)聽 ,Canal 功能默認(rèn)自帶的有個(gè) example 實(shí)例,本篇就用 example 實(shí)例 。如果增加實(shí)例,復(fù)制 example 文件夾內(nèi)容到同級目錄下,然后在 canal.properties 指定添加實(shí)例的名稱。

修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

  1. # url 
  2. canal.instance.master.address=127.0.0.1:3306 
  3. # username/password 
  4. canal.instance.dbUsername=root 
  5. canal.instance.dbPassword=root 
  6. # 監(jiān)聽的數(shù)據(jù)庫 
  7. canal.instance.defaultDatabaseName=test 
  8.  
  9. # 監(jiān)聽的表,可以指定,多個(gè)用逗號分割,這里正則是監(jiān)聽所有 
  10. canal.instance.filter.regex=.*\\..* 

3、設(shè)置RabbitMQ的配置

服務(wù)端默認(rèn)的傳輸方式是tcp,需要在配置文件中設(shè)置MQ的相關(guān)信息。

這里需要修改兩處配置文件,如下;

1)、canal.deployer-1.1.5\conf\canal.properties

這個(gè)配置文件主要是設(shè)置MQ相關(guān)的配置,比如URL,用戶名、密碼...

  1. # 傳輸方式:tcp, kafka, rocketMQ, rabbitMQ 
  2. canal.serverMode = rabbitMQ 
  3. ################################################## 
  4. #########           RabbitMQ         ############# 
  5. ################################################## 
  6. rabbitmq.host = 127.0.0.1 
  7. rabbitmq.virtual.host =/ 
  8. # exchange 
  9. rabbitmq.exchange =canal.exchange 
  10. # 用戶名、密碼 
  11. rabbitmq.username =guest 
  12. rabbitmq.password =guest 
  13. ## 是否持久化 
  14. rabbitmq.deliveryMode = 2 

2)、canal.deployer-1.1.5\conf\example\instance.properties

這個(gè)文件設(shè)置MQ的路由KEY,這樣才能路由到指定的隊(duì)列中,如下:

  1. canal.mq.topic=canal.routing.key 

4、RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一個(gè)canal.exchange(必須和配置中的相同)的exchange和一個(gè)名稱為 canal.queue(名稱隨意)的隊(duì)列。

其中綁定的路由KEY為:canal.routing.key(必須和配置中的相同),如下圖:

5、啟動服務(wù)端

點(diǎn)擊bin目錄下的腳本,windows直接雙擊startup.bat,啟動成功如下:

6、測試

在本地?cái)?shù)據(jù)庫test中的oauth_client_details插入一條數(shù)據(jù),如下:

  1. INSERT INTO `oauth_client_details` VALUES ('myjszl''res1''$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W''all''password,refresh_token,authorization_code,client_credentials,implicit''http://www.baidu.com'NULL, 1000, 1000, NULL'false'); 

此時(shí)查看MQ中的canal.queue已經(jīng)有了數(shù)據(jù),如下:

其實(shí)就是一串JSON數(shù)據(jù),這個(gè)JSON如下:

  1.  "data": [{ 
  2.   "client_id""myjszl"
  3.   "resource_ids""res1"
  4.   "client_secret""$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W"
  5.   "scope""all"
  6.   "authorized_grant_types""password,refresh_token,authorization_code,client_credentials,implicit"
  7.   "web_server_redirect_uri""http://www.baidu.com"
  8.   "authorities"null
  9.   "access_token_validity""1000"
  10.   "refresh_token_validity""1000"
  11.   "additional_information"null
  12.   "autoapprove""false" 
  13.  }], 
  14.  "database""test"
  15.  "es": 1640337532000, 
  16.  "id": 7, 
  17.  "isDdl"false
  18.  "mysqlType": { 
  19.   "client_id""varchar(48)"
  20.   "resource_ids""varchar(256)"
  21.   "client_secret""varchar(256)"
  22.   "scope""varchar(256)"
  23.   "authorized_grant_types""varchar(256)"
  24.   "web_server_redirect_uri""varchar(256)"
  25.   "authorities""varchar(256)"
  26.   "access_token_validity""int(11)"
  27.   "refresh_token_validity""int(11)"
  28.   "additional_information""varchar(4096)"
  29.   "autoapprove""varchar(256)" 
  30.  }, 
  31.  "old"null
  32.  "pkNames": ["client_id"], 
  33.  "sql"""
  34.  "sqlType": { 
  35.   "client_id": 12, 
  36.   "resource_ids": 12, 
  37.   "client_secret": 12, 
  38.   "scope": 12, 
  39.   "authorized_grant_types": 12, 
  40.   "web_server_redirect_uri": 12, 
  41.   "authorities": 12, 
  42.   "access_token_validity": 4, 
  43.   "refresh_token_validity": 4, 
  44.   "additional_information": 12, 
  45.   "autoapprove": 12 
  46.  }, 
  47.  "table""oauth_client_details"
  48.  "ts": 1640337532520, 
  49.  "type""INSERT" 

每個(gè)字段的意思已經(jīng)很清楚了,有表名稱、方法、參數(shù)、參數(shù)類型、參數(shù)值.....

客戶端要做的就是監(jiān)聽MQ獲取JSON數(shù)據(jù),然后將其解析出來,處理自己的業(yè)務(wù)邏輯。

Canal客戶端搭建

客戶端很簡單實(shí)現(xiàn),要做的就是消費(fèi)Canal服務(wù)端傳遞過來的消息,監(jiān)聽canal.queue這個(gè)隊(duì)列。

1、創(chuàng)建消息實(shí)體類

MQ傳遞過來的是JSON數(shù)據(jù),當(dāng)然要創(chuàng)建個(gè)實(shí)體類接收數(shù)據(jù),如下:

  1. /** 
  2.  * @author 公眾號 碼猿技術(shù)專欄 
  3.  * Canal消息接收實(shí)體類 
  4.  */ 
  5. @NoArgsConstructor 
  6. @Data 
  7. public class CanalMessage<T> { 
  8.     @JsonProperty("type"
  9.     private String type; 
  10.  
  11.     @JsonProperty("table"
  12.     private String table
  13.  
  14.     @JsonProperty("data"
  15.     private List<T> data; 
  16.  
  17.     @JsonProperty("database"
  18.     private String database
  19.  
  20.     @JsonProperty("es"
  21.     private Long es; 
  22.  
  23.     @JsonProperty("id"
  24.     private Integer id; 
  25.  
  26.     @JsonProperty("isDdl"
  27.     private Boolean isDdl; 
  28.  
  29.     @JsonProperty("old"
  30.     private List<T> old; 
  31.  
  32.     @JsonProperty("pkNames"
  33.     private List<String> pkNames; 
  34.  
  35.     @JsonProperty("sql"
  36.     private String sql; 
  37.  
  38.     @JsonProperty("ts"
  39.     private Long ts; 

2、MQ消息監(jiān)聽業(yè)務(wù)

接下來就是監(jiān)聽隊(duì)列,一旦有Canal服務(wù)端有數(shù)據(jù)推送能夠及時(shí)的消費(fèi)。

代碼很簡單,只是給出個(gè)接收的案例,具體的業(yè)務(wù)邏輯可以根據(jù)業(yè)務(wù)實(shí)現(xiàn),如下:

  1. import cn.hutool.json.JSONUtil; 
  2. import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage; 
  3. import lombok.RequiredArgsConstructor; 
  4. import lombok.extern.slf4j.Slf4j; 
  5. import org.springframework.amqp.rabbit.annotation.Exchange; 
  6. import org.springframework.amqp.rabbit.annotation.Queue; 
  7. import org.springframework.amqp.rabbit.annotation.QueueBinding; 
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener; 
  9. import org.springframework.stereotype.Component; 
  10.  
  11. /** 
  12.  * 監(jiān)聽MQ獲取Canal增量的數(shù)據(jù)消息 
  13.  */ 
  14. @Component 
  15. @Slf4j 
  16. @RequiredArgsConstructor 
  17. public class CanalRabbitMQListener { 
  18.  
  19.     @RabbitListener(bindings = { 
  20.             @QueueBinding( 
  21.                     value = @Queue(value = "canal.queue", durable = "true"), 
  22.                     exchange = @Exchange(value = "canal.exchange"), 
  23.                     key = "canal.routing.key" 
  24.             ) 
  25.     }) 
  26.     public void handleDataChange(String message) { 
  27.         //將message轉(zhuǎn)換為CanalMessage 
  28.         CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class); 
  29.         String tableName = canalMessage.getTable(); 
  30.         log.info("Canal 監(jiān)聽 {} 發(fā)生變化;明細(xì):{}", tableName, message); 
  31.         //TODO 業(yè)務(wù)邏輯自己完善............... 
  32.     } 

3、測試

下面向表中插入數(shù)據(jù),看下接收的消息是什么樣的,SQL如下:

  1. INSERT INTO `oauth_client_details` 
  2. VALUES 
  3.  ( 'myjszl''res1''$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W''all''password,refresh_token,authorization_code,client_credentials,implicit''http://www.baidu.com'NULL, 1000, 1000, NULL'false' ); 

客戶端轉(zhuǎn)換后的消息如下圖:

上圖可以看出所有的數(shù)據(jù)都已經(jīng)成功接收到,只需要根據(jù)數(shù)據(jù)完善自己的業(yè)務(wù)邏輯即可。

客戶端案例源碼已經(jīng)上傳GitHub,關(guān)注公眾號:碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:9530 獲取!

總結(jié)

 

數(shù)據(jù)增量同步的開源工具并不只有Canal一種,根據(jù)自己的業(yè)務(wù)需要選擇合適的組件。

 

責(zé)任編輯:武曉燕 來源: 碼猿技術(shù)專欄
相關(guān)推薦

2023-10-06 22:35:19

2022-12-27 08:56:28

2023-02-17 07:54:39

2023-07-27 08:29:09

2024-12-06 08:29:29

2023-08-31 08:32:52

2022-08-09 08:31:29

RocketMQ消息中間件

2011-10-28 09:20:36

dorado

2014-06-20 09:18:54

Dustjs中間件

2024-11-11 10:02:37

Spring搜索數(shù)據(jù)

2011-10-24 07:41:38

SOA中間件應(yīng)用服務(wù)器

2021-09-09 09:05:30

開源字節(jié)跳動CloudWeGo

2022-04-11 09:15:44

中間件開源

2024-07-03 11:33:02

2015-12-21 14:56:12

Go語言Http網(wǎng)絡(luò)協(xié)議

2021-02-11 08:21:02

中間件開發(fā)CRUD

2011-05-24 15:10:48

2025-04-29 08:36:28

SpringCanal數(shù)據(jù)庫

2013-05-17 17:01:32

紅帽OpenShifPaaS云

2019-12-13 10:32:56

開源消息中間件
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號