自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBoot整合Flink CDC,實(shí)時(shí)追蹤數(shù)據(jù)變動(dòng),無縫同步至Redis

開發(fā) 前端
具體來說,F(xiàn)link CDC的應(yīng)用場景包括但不限于實(shí)時(shí)數(shù)據(jù)倉庫更新、實(shí)時(shí)數(shù)據(jù)同步和遷移、實(shí)時(shí)數(shù)據(jù)處理等。它還可以確保數(shù)據(jù)一致性,并在數(shù)據(jù)發(fā)生變更時(shí)能夠準(zhǔn)確地捕獲和處理。

環(huán)境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21

1. 簡介

Flink CDC(Flink Change Data Capture)是基于數(shù)據(jù)庫的日志CDC技術(shù),實(shí)現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。它搭配Flink計(jì)算框架,能夠高效實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時(shí)集成。Flink CDC的核心功能在于實(shí)時(shí)地監(jiān)視數(shù)據(jù)庫或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動(dòng),并將這些變動(dòng)抽取出來,以便進(jìn)一步的處理和分析。通過使用Flink CDC,用戶可以輕松地構(gòu)建實(shí)時(shí)數(shù)據(jù)管道,對數(shù)據(jù)變動(dòng)進(jìn)行實(shí)時(shí)響應(yīng)和處理,為實(shí)時(shí)分析、實(shí)時(shí)報(bào)表和實(shí)時(shí)決策等場景提供強(qiáng)大的支持。

具體來說,F(xiàn)link CDC的應(yīng)用場景包括但不限于實(shí)時(shí)數(shù)據(jù)倉庫更新、實(shí)時(shí)數(shù)據(jù)同步和遷移、實(shí)時(shí)數(shù)據(jù)處理等。它還可以確保數(shù)據(jù)一致性,并在數(shù)據(jù)發(fā)生變更時(shí)能夠準(zhǔn)確地捕獲和處理。此外,F(xiàn)link CDC支持與多種數(shù)據(jù)源進(jìn)行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應(yīng)的連接器,方便數(shù)據(jù)的捕獲和處理。

接下來將詳細(xì)的介紹關(guān)于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數(shù)據(jù)庫讀取快照數(shù)據(jù)和增量數(shù)據(jù)。

支持的數(shù)據(jù)庫

Connector

Database

Driver

mysql-cdc

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.27

2. 實(shí)戰(zhàn)案例

2.1 MySQL開啟Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini),需要在[mysqld]部分設(shè)置相關(guān)參數(shù)以開啟binlog功能,如下:

[mysqld]
server-id=1
# 格式,行級格式
binlog-format=Row
# binlog 日志文件的前綴
log-bin=mysql-bin
# 指定哪些數(shù)據(jù)庫需要記錄二進(jìn)制日志
binlog_do_db=testjpa

除了開啟binlog功能外,F(xiàn)link CDC還需要其他配置和權(quán)限來確保能夠正常連接到MySQL并讀取數(shù)據(jù)。例如,需要授予Flink CDC連接MySQL的用戶必要的權(quán)限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權(quán)限是Flink CDC讀取數(shù)據(jù)和元數(shù)據(jù)所必需的。

查看是否開啟了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

以上就對mysql相關(guān)的配置完成了。

2.2 依賴管理

<properties>
  <flink.version>1.19.0</flink.version>
</properties>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  <version>3.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-runtime</artifactId>
  <version>${flink.version}</version>
</dependency>

2.3 代碼實(shí)現(xiàn)

@Component
public class MonitorMySQLCDC implements InitializingBean {


  // 該隊(duì)列專門用來臨時(shí)保存變化的數(shù)據(jù)(實(shí)際生產(chǎn)環(huán)境,你應(yīng)該使用MQ相關(guān)的產(chǎn)品)
  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;
  
  private final StringRedisTemplate stringRedisTemplate ;
  // 保存到redis中key的前綴
  private final String PREFIX = "users:" ;
  // 數(shù)據(jù)發(fā)生變化后的sink處理
  private final CustomSink customSink ;
  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
    this.customSink = customSink ;
    this.stringRedisTemplate = stringRedisTemplate ;
  }
  
  @Override
  public void afterPropertiesSet() throws Exception {
    // 啟動(dòng)異步線程,實(shí)時(shí)處理隊(duì)列中的數(shù)據(jù)
    new Thread(() -> {
      while(true) {
        try {
          Map<String, Object> result = queue.take();
          this.doAction(result) ;
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start() ;
    Properties jdbcProperties = new Properties() ;
    jdbcProperties.setProperty("useSSL", "false") ;
    MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("127.0.0.1")
        .port(3306)
        // 可配置多個(gè)數(shù)據(jù)庫
        .databaseList("testjpa")
        // 可配置多個(gè)表
        .tableList("testjpa.users")
        .username("root")
        .password("123123")
        .jdbcProperties(jdbcProperties)
        // 包括schema的改變
        .includeSchemaChanges(true)
        // 反序列化設(shè)置
        // .deserializer(new StringDebeziumDeserializationSchema())
        .deserializer(new JsonDebeziumDeserializationSchema(true))
        // 啟動(dòng)模式;關(guān)于啟動(dòng)模式下面詳細(xì)介紹
        .startupOptions(StartupOptions.initial())
        .build() ;
    // 環(huán)境配置
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
    // 設(shè)置 6s 的 checkpoint 間隔
    env.enableCheckpointing(6000) ;
    // 設(shè)置 source 節(jié)點(diǎn)的并行度為 4
    env.setParallelism(4) ;
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
        // 添加Sink
        .addSink(this.customSink) ;
    env.execute() ;
  }
  
  @SuppressWarnings("unchecked")
  private void doAction(Map<String, Object> result) throws Exception {
    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;
    String op = (String) payload.get("op") ;
    switch (op) {
      // 更新和插入操作
      case "u", "c" -> {
        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;
        String id = after.get("id").toString();
        System.out.printf("操作:%s, ID: %s%n", op, id) ;
        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;
      }
      // 刪除操作
      case "d" -> {
        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;
        String id = after.get("id").toString();
        stringRedisTemplate.delete(PREFIX + id) ;
      } 
    }
  }
  
}

啟動(dòng)模式:

  • initial (默認(rèn)):在第一次啟動(dòng)時(shí)對受監(jiān)視的數(shù)據(jù)庫表執(zhí)行初始快照,并繼續(xù)讀取最新的 binlog。
  • earliest-offset:跳過快照階段,從可讀取的最早 binlog 位點(diǎn)開始讀取
  • latest-offset:首次啟動(dòng)時(shí),從不對受監(jiān)視的數(shù)據(jù)庫表執(zhí)行快照, 連接器僅從 binlog 的結(jié)尾處開始讀取,這意味著連接器只能讀取在連接器啟動(dòng)之后的數(shù)據(jù)更改。
  • specific-offset:跳過快照階段,從指定的 binlog 位點(diǎn)開始讀取。位點(diǎn)可通過 binlog 文件名和位置指定,或者在 GTID 在集群上啟用時(shí)通過 GTID 集合指定。
  • timestamp:跳過快照階段,從指定的時(shí)間戳開始讀取 binlog 事件。

數(shù)據(jù)處理Sink

@Component
public class CustomSink extends RichSinkFunction<String> {


  private ObjectMapper mapper = new ObjectMapper();


  @Override
  public void invoke(String value, Context context) throws Exception {
    System.out.printf("數(shù)據(jù)發(fā)生變化: %s%n", value);
    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {
    };
    Map<String, Object> result = mapper.readValue(value, valueType);
    Map<String, Object> payload = (Map<String, Object>) result.get("payload");
    String op = (String) payload.get("op") ;
    // 不對讀操作處理
    if (!"r".equals(op)) {
      MonitorMySQLCDC.queue.put(result);
    }
  }
}

以上就是實(shí)現(xiàn)通過FlinkCDC實(shí)時(shí)通過數(shù)據(jù)到Redis的所有代碼。

2.4 Web監(jiān)控頁面

引入flink web依賴

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web</artifactId>
  <version>${flink.version}</version>
</dependency>

環(huán)境配置

Configuration config = new Configuration() ;
config.set(RestOptions.PORT, 9090) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web監(jiān)聽9090端口。

圖片圖片

通過web控制臺(tái)你可以管理查看到更多的信息。

責(zé)任編輯:武曉燕 來源: Spring全家桶實(shí)戰(zhàn)案例源碼
相關(guān)推薦

2022-07-20 23:15:11

Flink數(shù)據(jù)集CDC

2024-10-18 11:39:55

MySQL數(shù)據(jù)檢索

2023-05-03 08:58:46

數(shù)據(jù)庫開源

2021-06-04 07:24:14

Flink CDC數(shù)據(jù)

2025-04-01 08:38:41

2013-05-13 13:49:43

大數(shù)據(jù)

2021-08-17 06:48:43

SpringbootKafkaStream

2021-07-07 23:25:18

RedisFlinkSQL

2022-01-05 18:18:01

Flink 數(shù)倉連接器

2024-02-01 12:32:35

MySQL數(shù)據(jù)鎖數(shù)據(jù)庫

2023-09-08 10:13:30

開發(fā)技術(shù)

2020-01-10 15:42:13

SpringBootRedis數(shù)據(jù)庫

2020-06-29 07:43:12

緩存RedisSpringBoot

2025-04-25 08:34:52

2023-05-31 08:56:24

2015-06-09 22:25:06

SAP大道至簡

2022-05-23 08:23:24

鏈路追蹤SleuthSpring

2022-05-25 08:23:32

ZipKinTwitter開源項(xiàng)目

2024-12-26 17:16:59

2022-06-09 14:19:46

順豐數(shù)據(jù)集成Flink
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號