實時數(shù)倉 | 三分鐘搞定Flink Cdc
簡介
Flink CDC Connector 是ApacheFlink的一組數(shù)據(jù)源連接器,使用變化數(shù)據(jù)捕獲change data capture (CDC)從不同的數(shù)據(jù)庫中提取變更數(shù)據(jù)。Flink CDC連接器將Debezium集成為引擎來捕獲數(shù)據(jù)變更。因此,它可以充分利用Debezium的功能。
特點(diǎn)
- 支持讀取數(shù)據(jù)庫快照,并且能夠持續(xù)讀取數(shù)據(jù)庫的變更日志,即使發(fā)生故障,也支持exactly-once 的處理語義
- 對于DataStream API的CDC connector,用戶無需部署Debezium和Kafka,即可在單個作業(yè)中使用多個數(shù)據(jù)庫和表上的變更數(shù)據(jù)。
- 對于Table/SQL API 的CDC connector,用戶可以使用SQL DDL創(chuàng)建CDC數(shù)據(jù)源,來監(jiān)視單個表上的數(shù)據(jù)變更。
使用場景
- 數(shù)據(jù)庫之間的增量數(shù)據(jù)同步
- 審計日志
- 數(shù)據(jù)庫之上的實時物化視圖
- 基于CDC的維表join
- …
Flink提供的 table format
Flink提供了一系列可以用于table connector的table format,具體如下:
Formats | Supported Connectors |
---|---|
CSV | Apache Kafka, Filesystem |
JSON | Apache Kafka, Filesystem, Elasticsearch |
Apache Avro | Apache Kafka, Filesystem |
Debezium CDC | Apache Kafka |
Canal CDC | Apache Kafka |
Apache Parquet | Filesystem |
Apache ORC | Filesystem |
使用過程中的注意點(diǎn)
使用MySQL CDC的注意點(diǎn)
如果要使用MySQL CDC connector,對于程序而言,需要添加如下依賴:
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>1.0.0</version>
- </dependency>
如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。
使用canal-json的注意點(diǎn)
如果要使用Kafka的canal-json,對于程序而言,需要添加如下依賴:
- <!-- universal -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.11</artifactId>
- <version>1.11.0</version>
- </dependency>
如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。由于Flink1.11的安裝包 的lib目錄下并沒有提供該jar包,所以必須要手動添加依賴包,否則會報如下錯誤:
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
- Available factory identifiers are:
- datagen
- mysql-cdc
使用changelog-json的注意點(diǎn)
如果要使用Kafka的changelog-json Format,對于程序而言,需要添加如下依賴:
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-format-changelog-json</artifactId>
- <version>1.0.0</version>
- </dependency>
如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。
mysql-cdc的操作實踐
創(chuàng)建MySQL數(shù)據(jù)源表
在創(chuàng)建MySQL CDC表之前,需要先創(chuàng)建MySQL的數(shù)據(jù)表,如下:
- -- MySQL
- /*Table structure for table `order_info` */
- DROP TABLE IF EXISTS `order_info`;
- CREATE TABLE `order_info` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
- `consignee` varchar(100) DEFAULT NULL COMMENT '收貨人',
- `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人電話',
- `total_amount` decimal(10,2) DEFAULT NULL COMMENT '總金額',
- `order_status` varchar(20) DEFAULT NULL COMMENT '訂單狀態(tài),1表示下單,2表示支付',
- `user_id` bigint(20) DEFAULT NULL COMMENT '用戶id',
- `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
- `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送貨地址',
- `order_comment` varchar(200) DEFAULT NULL COMMENT '訂單備注',
- `out_trade_no` varchar(50) DEFAULT NULL COMMENT '訂單交易編號(第三方支付用)',
- `trade_body` varchar(200) DEFAULT NULL COMMENT '訂單描述(第三方支付用)',
- `create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時間',
- `operate_time` datetime DEFAULT NULL COMMENT '操作時間',
- `expire_time` datetime DEFAULT NULL COMMENT '失效時間',
- `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流單編號',
- `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父訂單編號',
- `img_url` varchar(200) DEFAULT NULL COMMENT '圖片路徑',
- `province_id` int(20) DEFAULT NULL COMMENT '地區(qū)',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表';
- -- ----------------------------
- -- Records of order_info
- -- ----------------------------
- INSERT INTO `order_info`
- VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
- INSERT INTO `order_info`
- VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
- INSERT INTO `order_info`
- VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
- /*Table structure for table `order_detail` */
- CREATE TABLE `order_detail` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
- `order_id` bigint(20) DEFAULT NULL COMMENT '訂單編號',
- `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
- `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱(冗余)',
- `img_url` varchar(200) DEFAULT NULL COMMENT '圖片名稱(冗余)',
- `order_price` decimal(10,2) DEFAULT NULL COMMENT '購買價格(下單時sku價格)',
- `sku_num` varchar(200) DEFAULT NULL COMMENT '購買個數(shù)',
- `create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時間',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單明細(xì)表';
- -- ----------------------------
- -- Records of order_detail
- -- ----------------------------
- INSERT INTO `order_detail`
- VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯(lián)通電信4G手機(jī) 雙卡雙待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
- INSERT INTO `order_detail`
- VALUES (1330, 477, 9, '榮耀10 GT游戲加速 AIS手持夜景 6GB+64GB 幻影藍(lán)全網(wǎng)通 移動聯(lián)通電信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
- INSERT INTO `order_detail`
- VALUES (1331, 478, 4, '小米Play 流光漸變AI雙攝 4GB+64GB 夢幻藍(lán) 全網(wǎng)通4G 雙卡雙待 小水滴全面屏拍照游戲智能手機(jī)', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
- INSERT INTO `order_detail`
- VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯(lián)通電信4G手機(jī) 雙卡雙待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
- INSERT INTO `order_detail`
- VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯(lián)通電信4G手機(jī) 雙卡雙待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');
Flink SQL Cli創(chuàng)建CDC數(shù)據(jù)源
啟動 Flink 集群,再啟動 SQL CLI,執(zhí)行下面命令:
- -- 創(chuàng)建訂單信息表
- CREATE TABLE order_info(
- id BIGINT,
- user_id BIGINT,
- create_time TIMESTAMP(0),
- operate_time TIMESTAMP(0),
- province_id INT,
- order_status STRING,
- total_amount DECIMAL(10, 5)
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = 'kms-1',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = '123qwe',
- 'database-name' = 'mydw',
- 'table-name' = 'order_info'
- );
在Flink SQL Cli中查詢該表的數(shù)據(jù):result-mode: tableau,+表示數(shù)據(jù)的insert。
在SQL CLI中創(chuàng)建訂單詳情表:
- CREATE TABLE order_detail(
- id BIGINT,
- order_id BIGINT,
- sku_id BIGINT,
- sku_name STRING,
- sku_num BIGINT,
- order_price DECIMAL(10, 5),
- create_time TIMESTAMP(0)
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = 'kms-1',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = '123qwe',
- 'database-name' = 'mydw',
- 'table-name' = 'order_detail'
- );
查詢結(jié)果如下:
執(zhí)行JOIN操作:
- SELECT
- od.id,
- oi.id order_id,
- oi.user_id,
- oi.province_id,
- od.sku_id,
- od.sku_name,
- od.sku_num,
- od.order_price,
- oi.create_time,
- oi.operate_time
- FROM
- (
- SELECT *
- FROM order_info
- WHERE
- order_status = '2'-- 已支付
- ) oi
- JOIN
- (
- SELECT *
- FROM order_detail
- ) od
- ON oi.id = od.order_id;
canal-json的操作實踐
關(guān)于cannal的使用方式,可以參考我的另一篇文章:基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一)。我已經(jīng)將下面的表通過canal同步到了kafka,具體格式為:
- {
- "data":[
- {
- "id":"1",
- "region_name":"華北"
- },
- {
- "id":"2",
- "region_name":"華東"
- },
- {
- "id":"3",
- "region_name":"東北"
- },
- {
- "id":"4",
- "region_name":"華中"
- },
- {
- "id":"5",
- "region_name":"華南"
- },
- {
- "id":"6",
- "region_name":"西南"
- },
- {
- "id":"7",
- "region_name":"西北"
- }
- ],
- "database":"mydw",
- "es":1597128441000,
- "id":102,
- "isDdl":false,
- "mysqlType":{
- "id":"varchar(20)",
- "region_name":"varchar(20)"
- },
- "old":null,
- "pkNames":null,
- "sql":"",
- "sqlType":{
- "id":12,
- "region_name":12
- },
- "table":"base_region",
- "ts":1597128441424,
- "type":"INSERT"
- }
在SQL CLI中創(chuàng)建該canal-json格式的表:
- CREATE TABLE region (
- id BIGINT,
- region_name STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'mydw.base_region',
- 'properties.bootstrap.servers' = 'kms-3:9092',
- 'properties.group.id' = 'testGroup',
- 'format' = 'canal-json' ,
- 'scan.startup.mode' = 'earliest-offset'
- );
查詢結(jié)果如下:
changelog-json的操作實踐
創(chuàng)建MySQL數(shù)據(jù)源
參見上面的order_info
Flink SQL Cli創(chuàng)建changelog-json表
- CREATE TABLE order_gmv2kafka (
- day_str STRING,
- gmv DECIMAL(10, 5)
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'order_gmv_kafka',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'kms-3:9092',
- 'format' = 'changelog-json'
- );
- INSERT INTO order_gmv2kafka
- SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
- FROM order_info
- WHERE order_status = '2' -- 訂單已支付
- GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');
查詢表看一下結(jié)果:
再查一下kafka的數(shù)據(jù):
- {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}
當(dāng)將另外兩個訂單的狀態(tài)order_status更新為2時,總金額=443+772+88=1293再觀察數(shù)據(jù):
再看kafka中的數(shù)據(jù):