自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

詳解 canal 同步 MySQL 增量數(shù)據(jù)到 ES

數(shù)據(jù)庫 MySQL
canal 是一個非常有趣的開源項目,很多公司使用 canal 構建數(shù)據(jù)傳輸服務( Data Transmission Service ,簡稱 DTS ) 。推薦大家閱讀這個開源項目,你可以從中學習到網(wǎng)絡編程、多線程模型、高性能隊列 Disruptor、 流程模型抽象等。

canal 是阿里知名的開源項目,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費。

這篇文章,我們手把手向同學們展示使用 canal 將 MySQL 增量數(shù)據(jù)同步到 ES 。

圖片

1 集群模式

圖片

圖中 server 對應一個 canal 運行實例 ,對應一個 JVM 。

server 中包含 1..n 個 instance , 我們可以將 instance 理解為配置任務。

instance 包含如下模塊 :

  • eventParser數(shù)據(jù)源接入,模擬 slave 協(xié)議和 master 進行交互,協(xié)議解析
  • eventSinkParser 和 Store 鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作
  • eventStore數(shù)據(jù)存儲
  • metaManager增量訂閱 & 消費信息管理器

真實場景中,canal 高可用依賴 zookeeper ,筆者將客戶端模式可以簡單劃分為:TCP 模式 和 MQ 模式 。

實戰(zhàn)中我們經(jīng)常會使用 MQ 模式 。因為 MQ 模式的優(yōu)勢在于解耦 ,canal server 將數(shù)據(jù)變更信息發(fā)送到消息隊列 kafka 或者 RocketMQ ,消費者消費消息,順序執(zhí)行相關邏輯即可。

順序消費:

對于指定的一個 Topic ,所有消息根據(jù) Sharding Key 進行區(qū)塊分區(qū),同一個分區(qū)內的消息按照嚴格的先進先出(FIFO)原則進行發(fā)布和消費。同一分區(qū)內的消息保證順序,不同分區(qū)之間的消息順序不做要求。

圖片

2 MySQL配置

1、對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf  中配置如下

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

注意:針對阿里云 RDS for MySQL , 默認打開了 binlog , 并且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步。

2、授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant 。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、創(chuàng)建數(shù)據(jù)庫商品表 t_product 。

CREATE TABLE `t_product` (
 `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
 `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
 `price` DECIMAL ( 10, 2 ) NOT NULL,
 `status` TINYINT ( 4 ) NOT NULL,
 `create_time` datetime NOT NULL,
 `update_time` datetime NOT NULL,
   PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

3 Elasticsearch配置

使用 Kibana 創(chuàng)建商品索引 。

PUT /t_product
{
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    },
    "mappings": {
            "properties": {
               "id": {
                    "type":"keyword"
                },
                "name": {
                    "type":"text"
                },
                "price": {
                    "type":"double"
                },
                "status": {
                    "type":"integer"
                },
                "createTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "updateTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                }
        }
    }
}

執(zhí)行完成,如圖所示 :

圖片

4 RocketMQ 配置

創(chuàng)建主題:product-syn-topic ,canal 會將 Binlog 的變化數(shù)據(jù)發(fā)送到該主題。

圖片圖片

5 canal 配置

我們選取 canal 版本 1.1.6 ,進入 conf 目錄。

1、配置 canal.properties

#集群模式 zk地址
canal.zkServers = localhost:2181
#本質是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的組件文件 生產(chǎn)環(huán)境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

######  以下部分是默認值 展示出來 
# Canal的batch size, 默認50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數(shù)據(jù)的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為 flat json格式對象
canal.mq.flatMessage = true

2、instance 配置文件

在 conf 目錄下創(chuàng)建實例目錄 product-syn  , 在 product-syn 目錄創(chuàng)建配置文件 :instance.properties。

#  按需修改成自己的數(shù)據(jù)庫信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,數(shù)據(jù)庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...

# table regex 
canal.instance.filter.regex=mytest.t_product

# mq config
canal.mq.topic=product-syn-topic
# 針對庫名或者表名發(fā)送動態(tài)topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partitinotallow=0
# hash partition config
#canal.mq.partitinotallow=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitinotallow=mytest.person:id,mytest.role:id
#################################################

3、服務啟動

啟動兩個 canal 服務,我們從 zookeeper gui 中查看服務運行情況 。

圖片

修改一條 t_product 表記錄,可以從 RocketMQ 控制臺中觀測到新的消息。

圖片

6 消費者

1、產(chǎn)品索引操作服務

圖片

2、消費監(jiān)聽器

圖片

消費者邏輯重點有兩點:

  • 順序消費監(jiān)聽器
  • 將消息數(shù)據(jù)轉換成  JSON 字符串,從 data 節(jié)點中獲取表最新數(shù)據(jù)(批量操作可能是多條)。然后根據(jù)操作類型 UPDATE、 INSERT、DELETE 執(zhí)行產(chǎn)品索引操作服務的方法。

7 寫到最后

canal 是一個非常有趣的開源項目,很多公司使用 canal 構建數(shù)據(jù)傳輸服務( Data Transmission Service ,簡稱 DTS ) 。

推薦大家閱讀這個開源項目,你可以從中學習到網(wǎng)絡編程、多線程模型、高性能隊列 Disruptor、 流程模型抽象等。

這篇文章涉及到的代碼已收錄到下面的工程中,有興趣的同學可以一看。

https://github.com/makemyownlife/rocketmq4-learning

圖片 圖片

責任編輯:武曉燕 來源: 勇哥java實戰(zhàn)分享
相關推薦

2020-09-21 11:30:28

CanalMySQL數(shù)據(jù)庫

2021-12-27 09:59:57

SpringCanal 中間件

2023-06-08 08:43:36

2023-02-02 09:46:24

2023-01-09 09:02:26

2023-05-31 08:56:24

2023-07-27 08:29:09

2023-02-17 07:54:39

2022-12-27 08:56:28

2023-09-26 08:11:22

Spring配置MySQL

2023-10-06 22:35:19

2022-01-10 06:53:00

自動數(shù)據(jù)MySQL

2020-02-20 19:34:24

JAVAMySQL框架

2024-08-02 09:36:03

2024-07-03 08:02:19

MySQL數(shù)據(jù)搜索

2021-01-06 10:36:55

MySQL數(shù)據(jù)庫Hive

2023-05-30 08:38:25

MySQL數(shù)據(jù)庫日志

2010-05-19 10:22:07

2025-01-15 07:55:30

2018-11-08 10:13:28

Rsync服務器備份
點贊
收藏

51CTO技術棧公眾號