自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

實時數(shù)倉 | 三分鐘搞定Flink Cdc

大數(shù)據(jù)
Flink CDC Connector 是ApacheFlink的一組數(shù)據(jù)源連接器,使用變化數(shù)據(jù)捕獲change data capture (CDC)從不同的數(shù)據(jù)庫中提取變更數(shù)據(jù)。Flink CDC連接器將Debezium集成為引擎來捕獲數(shù)據(jù)變更。

簡介

Flink CDC Connector 是ApacheFlink的一組數(shù)據(jù)源連接器,使用變化數(shù)據(jù)捕獲change data capture (CDC)從不同的數(shù)據(jù)庫中提取變更數(shù)據(jù)。Flink CDC連接器將Debezium集成為引擎來捕獲數(shù)據(jù)變更。因此,它可以充分利用Debezium的功能。

特點(diǎn)

  • 支持讀取數(shù)據(jù)庫快照,并且能夠持續(xù)讀取數(shù)據(jù)庫的變更日志,即使發(fā)生故障,也支持exactly-once 的處理語義
  • 對于DataStream API的CDC connector,用戶無需部署Debezium和Kafka,即可在單個作業(yè)中使用多個數(shù)據(jù)庫和表上的變更數(shù)據(jù)。
  • 對于Table/SQL API 的CDC connector,用戶可以使用SQL DDL創(chuàng)建CDC數(shù)據(jù)源,來監(jiān)視單個表上的數(shù)據(jù)變更。

使用場景

  • 數(shù)據(jù)庫之間的增量數(shù)據(jù)同步
  • 審計日志
  • 數(shù)據(jù)庫之上的實時物化視圖
  • 基于CDC的維表join

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具體如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用過程中的注意點(diǎn)

使用MySQL CDC的注意點(diǎn)

如果要使用MySQL CDC connector,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-connector-mysql-cdc</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

使用canal-json的注意點(diǎn)

如果要使用Kafka的canal-json,對于程序而言,需要添加如下依賴:

  1. <!-- universal --> 
  2. <dependency> 
  3.     <groupId>org.apache.flink</groupId> 
  4.     <artifactId>flink-connector-kafka_2.11</artifactId> 
  5.     <version>1.11.0</version> 
  6. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。由于Flink1.11的安裝包 的lib目錄下并沒有提供該jar包,所以必須要手動添加依賴包,否則會報如下錯誤:

  1. [ERROR] Could not execute SQL statement. Reason: 
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 
  3.  
  4. Available factory identifiers are: 
  5.  
  6. datagen 
  7. mysql-cdc 

使用changelog-json的注意點(diǎn)

如果要使用Kafka的changelog-json Format,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-format-changelog-json</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

mysql-cdc的操作實踐

創(chuàng)建MySQL數(shù)據(jù)源表

在創(chuàng)建MySQL CDC表之前,需要先創(chuàng)建MySQL的數(shù)據(jù)表,如下:

  1. -- MySQL 
  2. /*Table structure for table `order_info` */ 
  3. DROP TABLE IF EXISTS `order_info`; 
  4. CREATE TABLE `order_info` ( 
  5.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  6.   `consignee` varchar(100) DEFAULT NULL COMMENT '收貨人'
  7.   `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人電話'
  8.   `total_amount` decimal(10,2) DEFAULT NULL COMMENT '總金額'
  9.   `order_status` varchar(20) DEFAULT NULL COMMENT '訂單狀態(tài),1表示下單,2表示支付'
  10.   `user_id` bigint(20) DEFAULT NULL COMMENT '用戶id'
  11.   `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式'
  12.   `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送貨地址'
  13.   `order_comment` varchar(200) DEFAULT NULL COMMENT '訂單備注'
  14.   `out_trade_no` varchar(50) DEFAULT NULL COMMENT '訂單交易編號(第三方支付用)'
  15.   `trade_body` varchar(200) DEFAULT NULL COMMENT '訂單描述(第三方支付用)'
  16.   `create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時間'
  17.   `operate_time` datetime DEFAULT NULL COMMENT '操作時間'
  18.   `expire_time` datetime DEFAULT NULL COMMENT '失效時間'
  19.   `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流單編號'
  20.   `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父訂單編號'
  21.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片路徑'
  22.   `province_id` int(20) DEFAULT NULL COMMENT '地區(qū)'
  23.   PRIMARY KEY (`id`) 
  24. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表'
  25. -- ---------------------------- 
  26. -- Records of order_info 
  27. -- ---------------------------- 
  28. INSERT INTO `order_info`  
  29. VALUES (476, 'lAXjcL''13408115089', 433.00, '2', 10, '2''OYyAdSdLxedceqovndCD''ihjAYsSjrgJMQVdFQnSy''8728720206''''2020-06-18 02:21:38'NULLNULLNULLNULLNULL, 9); 
  30. INSERT INTO `order_info` 
  31. VALUES (477, 'QLiFDb''13415139984', 772.00, '1', 90, '2''OizYrQbKuWvrvdfpkeSZ''wiBhhqhMndCCgXwmWVQq''1679381473''''2020-06-18 09:12:25'NULLNULLNULLNULLNULL, 3); 
  32. INSERT INTO `order_info` 
  33. VALUES (478, 'iwKjQD''13320383859', 88.00, '1', 107, '1''cbXLKtNHWOcWzJVBWdAs''njjsnknHxsxhuCCeNDDi''0937074290''''2020-06-18 15:56:34'NULLNULLNULLNULLNULL, 7); 
  34.  
  35. /*Table structure for table `order_detail` */ 
  36. CREATE TABLE `order_detail` ( 
  37.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  38.   `order_id` bigint(20) DEFAULT NULL COMMENT '訂單編號'
  39.   `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id'
  40.   `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱(冗余)'
  41.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片名稱(冗余)'
  42.   `order_price` decimal(10,2) DEFAULT NULL COMMENT '購買價格(下單時sku價格)'
  43.   `sku_num` varchar(200) DEFAULT NULL COMMENT '購買個數(shù)'
  44.   `create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時間'
  45.   PRIMARY KEY (`id`) 
  46. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單明細(xì)表'
  47.  
  48. -- ---------------------------- 
  49. -- Records of order_detail 
  50. -- ---------------------------- 
  51. INSERT INTO `order_detail`  
  52. VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯(lián)通電信4G手機(jī) 雙卡雙待''http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3''2020-06-18 02:21:38'); 
  53. INSERT INTO `order_detail`  
  54. VALUES (1330, 477, 9, '榮耀10 GT游戲加速 AIS手持夜景 6GB+64GB 幻影藍(lán)全網(wǎng)通 移動聯(lián)通電信''http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4''2020-06-18 09:12:25'); 
  55. INSERT INTO `order_detail` 
  56. VALUES (1331, 478, 4, '小米Play 流光漸變AI雙攝 4GB+64GB 夢幻藍(lán) 全網(wǎng)通4G 雙卡雙待 小水滴全面屏拍照游戲智能手機(jī)''http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1''2020-06-18 15:56:34'); 
  57. INSERT INTO `order_detail`  
  58. VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯(lián)通電信4G手機(jī) 雙卡雙待''http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3''2020-06-18 15:56:34'); 
  59. INSERT INTO `order_detail`  
  60. VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯(lián)通電信4G手機(jī) 雙卡雙待''http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1''2020-06-18 15:56:34'); 

Flink SQL Cli創(chuàng)建CDC數(shù)據(jù)源

啟動 Flink 集群,再啟動 SQL CLI,執(zhí)行下面命令:

  1. -- 創(chuàng)建訂單信息表 
  2. CREATE TABLE order_info( 
  3.     id BIGINT
  4.     user_id BIGINT
  5.     create_time TIMESTAMP(0), 
  6.     operate_time TIMESTAMP(0), 
  7.     province_id INT
  8.     order_status STRING, 
  9.     total_amount DECIMAL(10, 5) 
  10.   ) WITH ( 
  11.     'connector' = 'mysql-cdc'
  12.     'hostname' = 'kms-1'
  13.     'port' = '3306'
  14.     'username' = 'root'
  15.     'password' = '123qwe'
  16.     'database-name' = 'mydw'
  17.     'table-name' = 'order_info' 
  18. ); 

在Flink SQL Cli中查詢該表的數(shù)據(jù):result-mode: tableau,+表示數(shù)據(jù)的insert。

在SQL CLI中創(chuàng)建訂單詳情表:

  1. CREATE TABLE order_detail( 
  2.     id BIGINT
  3.     order_id BIGINT
  4.     sku_id BIGINT
  5.     sku_name STRING, 
  6.     sku_num BIGINT
  7.     order_price DECIMAL(10, 5), 
  8.  create_time TIMESTAMP(0) 
  9.  ) WITH ( 
  10.     'connector' = 'mysql-cdc'
  11.     'hostname' = 'kms-1'
  12.     'port' = '3306'
  13.     'username' = 'root'
  14.     'password' = '123qwe'
  15.     'database-name' = 'mydw'
  16.     'table-name' = 'order_detail' 
  17. ); 

查詢結(jié)果如下:

執(zhí)行JOIN操作:

  1. SELECT 
  2.     od.id, 
  3.     oi.id order_id, 
  4.     oi.user_id, 
  5.     oi.province_id, 
  6.     od.sku_id, 
  7.     od.sku_name, 
  8.     od.sku_num, 
  9.     od.order_price, 
  10.     oi.create_time, 
  11.     oi.operate_time 
  12. FROM 
  13.    ( 
  14.     SELECT *  
  15.     FROM order_info 
  16.     WHERE  
  17.       order_status = '2'-- 已支付 
  18.    ) oi 
  19.    JOIN 
  20.   ( 
  21.     SELECT * 
  22.     FROM order_detail 
  23.   ) od  
  24.   ON oi.id = od.order_id; 

canal-json的操作實踐

關(guān)于cannal的使用方式,可以參考我的另一篇文章:基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一)。我已經(jīng)將下面的表通過canal同步到了kafka,具體格式為:

  1.     "data":[ 
  2.         { 
  3.             "id":"1"
  4.             "region_name":"華北" 
  5.         }, 
  6.         { 
  7.             "id":"2"
  8.             "region_name":"華東" 
  9.         }, 
  10.         { 
  11.             "id":"3"
  12.             "region_name":"東北" 
  13.         }, 
  14.         { 
  15.             "id":"4"
  16.             "region_name":"華中" 
  17.         }, 
  18.         { 
  19.             "id":"5"
  20.             "region_name":"華南" 
  21.         }, 
  22.         { 
  23.             "id":"6"
  24.             "region_name":"西南" 
  25.         }, 
  26.         { 
  27.             "id":"7"
  28.             "region_name":"西北" 
  29.         } 
  30.     ], 
  31.     "database":"mydw"
  32.     "es":1597128441000, 
  33.     "id":102, 
  34.     "isDdl":false
  35.     "mysqlType":{ 
  36.         "id":"varchar(20)"
  37.         "region_name":"varchar(20)" 
  38.     }, 
  39.     "old":null
  40.     "pkNames":null
  41.     "sql":""
  42.     "sqlType":{ 
  43.         "id":12, 
  44.         "region_name":12 
  45.     }, 
  46.     "table":"base_region"
  47.     "ts":1597128441424, 
  48.     "type":"INSERT" 

在SQL CLI中創(chuàng)建該canal-json格式的表:

  1. CREATE TABLE region ( 
  2.   id BIGINT
  3.   region_name STRING 
  4. WITH ( 
  5.  'connector' = 'kafka'
  6.  'topic' = 'mydw.base_region'
  7.  'properties.bootstrap.servers' = 'kms-3:9092'
  8.  'properties.group.id' = 'testGroup'
  9.  'format' = 'canal-json' , 
  10.  'scan.startup.mode' = 'earliest-offset'  
  11. ); 

查詢結(jié)果如下:

changelog-json的操作實踐

創(chuàng)建MySQL數(shù)據(jù)源

參見上面的order_info

Flink SQL Cli創(chuàng)建changelog-json表

  1. CREATE TABLE order_gmv2kafka ( 
  2.   day_str STRING, 
  3.   gmv DECIMAL(10, 5) 
  4. WITH ( 
  5.     'connector' = 'kafka'
  6.     'topic' = 'order_gmv_kafka'
  7.     'scan.startup.mode' = 'earliest-offset'
  8.     'properties.bootstrap.servers' = 'kms-3:9092'
  9.     'format' = 'changelog-json' 
  10. ); 
  11.  
  12. INSERT INTO order_gmv2kafka 
  13. SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd'as day_str, SUM(total_amount) as gmv 
  14. FROM order_info 
  15. WHERE order_status = '2' -- 訂單已支付 
  16. GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');  

查詢表看一下結(jié)果:

再查一下kafka的數(shù)據(jù):

  1. {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"

當(dāng)將另外兩個訂單的狀態(tài)order_status更新為2時,總金額=443+772+88=1293再觀察數(shù)據(jù):

再看kafka中的數(shù)據(jù):

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)技術(shù)與數(shù)倉
相關(guān)推薦

2009-11-05 16:04:19

Oracle用戶表

2020-11-20 08:36:59

Jpa數(shù)據(jù)代碼

2024-05-16 11:13:16

Helm工具release

2022-02-16 19:42:25

Spring配置開發(fā)

2009-11-09 12:55:43

WCF事務(wù)

2024-12-18 10:24:59

代理技術(shù)JDK動態(tài)代理

2022-02-17 09:24:11

TypeScript編程語言javaScrip

2021-04-20 13:59:37

云計算

2023-12-27 08:15:47

Java虛擬線程

2024-01-16 07:46:14

FutureTask接口用法

2024-08-30 08:50:00

2009-11-12 09:16:15

ADO.NET數(shù)據(jù)庫連

2013-06-28 14:30:26

棱鏡計劃棱鏡棱鏡監(jiān)控項目

2020-06-30 10:45:28

Web開發(fā)工具

2021-12-17 07:47:37

IT風(fēng)險框架

2024-10-15 09:18:30

2024-01-12 07:38:38

AQS原理JUC

2024-07-05 09:31:37

2021-02-03 14:31:53

人工智能人臉識別

2020-06-29 07:42:20

邊緣計算云計算技術(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號