實時監(jiān)視同步數(shù)據(jù)庫變更,這個框架真是神器
我們數(shù)據(jù)庫中的數(shù)據(jù)一直在變化,有時候我們希望能監(jiān)聽數(shù)據(jù)庫數(shù)據(jù)的變化并根據(jù)變化做出一些反應,比如更新對應變化數(shù)據(jù)的緩存、增量同步到其它數(shù)據(jù)源、對數(shù)據(jù)進行檢測和審計等等。而這種技術就叫變更數(shù)據(jù)捕獲(Change Data Capture)。對于這種技術我們可能知道一個國內比較知名的框架Canal,非常好用!但是Canal有一個局限性就是只能用于Mysql的變更數(shù)據(jù)捕獲。今天來介紹另一種更加強大的分布式CDC框架Debezium。
Debezium
提起Debezium這個框架,相信大多數(shù)普通開發(fā)者都比較陌生,但是提及它所屬的公司大家一定不會陌生。
紅帽公司
沒錯就是開源界最成功的紅帽公司。Debezium是為捕獲數(shù)據(jù)更改的流式處理框架,開源免費。Debezium近乎實時地監(jiān)控數(shù)據(jù)庫行級別(row-level)的數(shù)據(jù)變更,并針對變更可以做出反應。而且只有已提交的變更才是可見的,所以不用擔心事務問題或者更改被回滾的問題。Debezium為所有的數(shù)據(jù)庫更改事件提供了一個統(tǒng)一的模型,所以不用擔心每種數(shù)據(jù)庫系統(tǒng)的復雜性。Debezium提供了對MongoDB、MySQL、PostgreSQL、SQL Server、Oracle、DB2等數(shù)據(jù)庫的支持。
另外借助于Kafka Connector可以開發(fā)出一個基于事件流的變更捕獲平臺,具有高容錯率和極強的擴展性。
Debezium Kafka 架構
如圖所示,部署了用于 MySQL 和 PostgresSQL 的 Debezium Kafka連接器以捕獲對這兩種類型數(shù)據(jù)庫的更改事件,然后將這些更改通過下游的Kafka Connector將記錄傳輸?shù)狡渌到y(tǒng)或者數(shù)據(jù)庫(例如 Elasticsearch、數(shù)據(jù)倉庫、分析系統(tǒng))或緩存。
另一種玩法就是將Debezium內置到應用程序中,來做一個類似消息總線的設施,將數(shù)據(jù)變更事件傳遞給訂閱的下游系統(tǒng)中。
Debezium內置服務器架構
Debezium對數(shù)據(jù)的完整性和可用性也是做了不少的工作。Debezium用持久化的、有副本備份的日志來記錄數(shù)據(jù)庫數(shù)據(jù)變化的歷史,因此,你的應用可以隨時停止再重啟,而不會錯過它停止運行時發(fā)生的事件,保證了所有的事件都能被正確地、完全地處理掉。
稍后我會演示一個Spring Boot集成Debezium的數(shù)據(jù)捕獲系統(tǒng)。
Spring Boot集成Debezium
理論介紹并不能讓你直觀感受到Debezium的能力,所以接下來我將使用嵌入式Debezium引擎來演示一下。
流程圖
如上圖所示,當我們變更MySQL數(shù)據(jù)庫中的某行數(shù)據(jù)時,通過Debezium實時監(jiān)聽到binlog日志的變化觸發(fā)捕獲變更事件,然后獲取到變更事件模型,并做出響應(消費)。接下來我們來搭建環(huán)境。
MySQL開啟binlog日志
為了方便這里使用MySQL的Docker容器,對應的腳本為:
- # 運行mysql容器
- docker run --name mysql-service -v d:/mysql/data:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00"
- # 設置binlog位置
- docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
- # 配置 mysql的server-id
- docker exec mysql-service bash -c "echo 'server-id=123454' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
上面的腳本運行了一個用戶名為root、密碼為123456并且將數(shù)據(jù)掛載到本地路徑d:/mysql/data的MySQL容器,同時開啟了binlog日志,并設置server-id為123454,這些信息后面配置會用。
請注意如果不使用root用戶的話,需要保證用戶具有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT四種權限。
Spring Boot集成嵌入式Debezium
Debezium依賴
Spring Boot的應用中加入下列依賴:
- <dependency>
- <groupId>io.debezium</groupId>
- <artifactId>debezium-api</artifactId>
- <version>${debezium.version}</version>
- </dependency>
- <dependency>
- <groupId>io.debezium</groupId>
- <artifactId>debezium-embedded</artifactId>
- <version>${debezium.version}</version>
- </dependency>
- <dependency>
- <groupId>io.debezium</groupId>
- <artifactId>debezium-connector-mysql</artifactId>
- <version>${debezium.version}</version>
- </dependency>
目前最新的版本號為1.5.2.Final。
聲明配置
然后聲明需要的配置:
- /**
- * Debezium 配置.
- *
- * @return configuration
- */
- @Bean
- io.debezium.config.Configuration debeziumConfig() {
- return io.debezium.config.Configuration.create()
- // 連接器的Java類名稱
- .with("connector.class", MySqlConnector.class.getName())
- // 偏移量持久化,用來容錯 默認值
- .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
- // 偏移量持久化文件路徑 默認/tmp/offsets.dat 如果路徑配置不正確可能導致無法存儲偏移量 可能會導致重復消費變更
- // 如果連接器重新啟動,它將使用最后記錄的偏移量來知道它應該恢復讀取源信息中的哪個位置。
- .with("offset.storage.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat")
- // 捕獲偏移量的周期
- .with("offset.flush.interval.ms", "6000")
- // 連接器的唯一名稱
- .with("name", "mysql-connector")
- // 數(shù)據(jù)庫的hostname
- .with("database.hostname", "localhost")
- // 端口
- .with("database.port", "3306")
- // 用戶名
- .with("database.user", "root")
- // 密碼
- .with("database.password", "123456")
- // 包含的數(shù)據(jù)庫列表
- .with("database.include.list", "etl")
- // 是否包含數(shù)據(jù)庫表結構層面的變更,建議使用默認值true
- .with("include.schema.changes", "false")
- // mysql.cnf 配置的 server-id
- .with("database.server.id", "123454")
- // MySQL 服務器或集群的邏輯名稱
- .with("database.server.name", "customer-mysql-db-server")
- // 歷史變更記錄
- .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
- // 歷史變更記錄存儲位置
- .with("database.history.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat")
- .build();
- }
配置分為兩部分:
- 一部分是Debezium Engine的配置屬性,參見Debezium Engine配置[1]。
- 一部分是Mysql Connector的配置屬性,參見Mysql Connector配置[2]。
實例化Debezium Engine
應用程序需要為運行的Mysql Connector啟動一個Debezium引擎,這個引擎會以異步線程的形式運行,它包裝了整個Mysql Connector連接器的生命周期。聲明一個引擎需要以下幾步:
聲明收到數(shù)據(jù)變更捕獲信息的格式,提供了JSON、Avro、Protobuf、Connect、CloudEvents等格式。
加載上面定義的配置。
聲明消費數(shù)據(jù)更改事件的函數(shù)方法。
聲明的偽代碼:
- DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
- .using(configuration.asProperties())
- .notifying(this::handlePayload)
- .build();
handlePayload方法為:
- private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
- recordChangeEvents.forEach(r -> {
- SourceRecord sourceRecord = r.record();
- Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
- if (sourceRecordChangeValue != null) {
- // 判斷操作的類型 過濾掉讀 只處理增刪改 這個其實可以在配置中設置
- Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
- if (operation != Envelope.Operation.READ) {
- String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
- // 獲取增刪改對應的結構體數(shù)據(jù)
- Struct struct = (Struct) sourceRecordChangeValue.get(record);
- // 將變更的行封裝為Map
- Map<String, Object> payload = struct.schema().fields().stream()
- .map(Field::name)
- .filter(fieldName -> struct.get(fieldName) != null)
- .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
- .collect(toMap(Pair::getKey, Pair::getValue));
- // 這里簡單打印一下
- System.out.println("payload = " + payload);
- }
- }
- });
- }
引擎的啟動和關閉正好契合Spring Bean的生命周期:
- @Data
- public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {
- private final Executor executor = Executors.newSingleThreadExecutor();
- private DebeziumEngine<?> debeziumEngine;
- @Override
- public void start() {
- executor.execute(debeziumEngine);
- }
- @SneakyThrows
- @Override
- public void stop() {
- debeziumEngine.close();
- }
- @Override
- public boolean isRunning() {
- return false;
- }
- @Override
- public void afterPropertiesSet() throws Exception {
- Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
- }
- }
啟動
啟動該Spring Boot項目,你可以采用各種手段往數(shù)據(jù)庫增刪改數(shù)據(jù),觀察會有類似下面的打?。?/p>
- payload = {user_id=1123213, username=felord.cn, age=11 , gender=0, enabled=1}
說明Debezium監(jiān)聽到了數(shù)據(jù)庫的變更。你可以想想這種技術在哪些場景有用武之地。好了今天的分享就到這里,感謝大家的支持,我是:碼農(nóng)小胖哥。原創(chuàng)不易,請多多關注、點贊、轉發(fā)、再看。
參考資料
[1]Debezium Engine配置: https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
[2]Mysql Connector配置: https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties
本文轉載自微信公眾號「碼農(nóng)小胖哥」,可以通過以下二維碼關注。轉載本文請聯(lián)系碼農(nóng)小胖哥公眾號。