SpringBoot與Debezium整合,實(shí)現(xiàn)供應(yīng)鏈數(shù)據(jù)庫實(shí)時(shí)同步系統(tǒng)
Debezium專門用于捕獲數(shù)據(jù)庫的變化并將這些變化以實(shí)時(shí)流的方式推送到消息隊(duì)列系統(tǒng)Kafka,從而實(shí)現(xiàn)高效、可靠的實(shí)時(shí)數(shù)據(jù)同步和流處理。
我們?yōu)槭裁催x擇Debezium?
實(shí)時(shí)數(shù)據(jù)同步
Debezium是一個(gè)開源的分布式平臺,專門用于捕獲數(shù)據(jù)庫的變化,并將這些變化以實(shí)時(shí)流的方式推送到Kafka或其他消息隊(duì)列系統(tǒng)。這對于需要實(shí)時(shí)更新庫存信息的應(yīng)用場景尤為重要。
支持多種數(shù)據(jù)庫
Debezium支持多種關(guān)系型數(shù)據(jù)庫,包括MySQL、PostgreSQL、MongoDB等。這意味著我們可以靈活地選擇適合業(yè)務(wù)需求的數(shù)據(jù)庫,而無需擔(dān)心數(shù)據(jù)捕獲的問題。
高性能和低延遲
Debezium通過使用數(shù)據(jù)庫的日志文件(如MySQL的二進(jìn)制日志)來捕獲數(shù)據(jù)變化,這種方式不僅高效而且延遲極低。這確保了即使在高并發(fā)環(huán)境下,也能快速響應(yīng)數(shù)據(jù)庫的變化。
結(jié)構(gòu)化數(shù)據(jù)輸出
Debezium將捕獲到的數(shù)據(jù)變化以結(jié)構(gòu)化的JSON格式輸出,便于下游系統(tǒng)解析和處理。這種標(biāo)準(zhǔn)化的數(shù)據(jù)格式使得集成變得更加簡單和可靠。
容錯(cuò)性和可靠性
Debezium具有強(qiáng)大的容錯(cuò)機(jī)制,能夠在網(wǎng)絡(luò)故障或服務(wù)器重啟后繼續(xù)從斷點(diǎn)處恢復(fù)數(shù)據(jù)捕獲。這確保了數(shù)據(jù)的一致性和完整性。
易于配置和部署
Debezium可以通過簡單的REST API進(jìn)行配置和管理,這大大簡化了部署過程。此外,Debezium與Kafka生態(tài)系統(tǒng)緊密集成,使得整個(gè)數(shù)據(jù)管道易于搭建和維護(hù)。
數(shù)據(jù)一致性保證
Debezium確保數(shù)據(jù)捕獲過程中的一致性,避免了因數(shù)據(jù)不同步導(dǎo)致的業(yè)務(wù)問題。這對于庫存管理系統(tǒng)來說尤為重要,因?yàn)槿魏螏齑鏀?shù)據(jù)的不一致都可能導(dǎo)致嚴(yán)重的后果。
應(yīng)用場景
- 博客平臺:當(dāng)新文章發(fā)布或現(xiàn)有文章更新時(shí),實(shí)時(shí)刷新前端頁面。
- 論壇:實(shí)時(shí)顯示最新的帖子和評論,提升用戶體驗(yàn)。
- 跨系統(tǒng)集成:將 CRM 系統(tǒng)、ERP 系統(tǒng)和其他業(yè)務(wù)系統(tǒng)的數(shù)據(jù)變化整合到一起,提供統(tǒng)一的數(shù)據(jù)視圖。
- 增量加載:僅加載自上次同步以來發(fā)生變化的數(shù)據(jù),減少數(shù)據(jù)傳輸量和處理時(shí)間。
- 庫存監(jiān)控:當(dāng)庫存低于閾值時(shí),立即觸發(fā)告警,提醒相關(guān)人員補(bǔ)充庫存。
- 交易監(jiān)控:實(shí)時(shí)監(jiān)控金融交易數(shù)據(jù),檢測可疑活動并觸發(fā)安全措施。
- 訂單管理系統(tǒng):當(dāng)訂單狀態(tài)發(fā)生變化時(shí),將變化事件發(fā)送給支付、物流等微服務(wù),觸發(fā)相應(yīng)的業(yè)務(wù)流程。
- 用戶管理系統(tǒng):當(dāng)用戶信息更新時(shí),將變化事件通知權(quán)限管理、營銷等微服務(wù),保持?jǐn)?shù)據(jù)一致性。
- 財(cái)務(wù)審計(jì):記錄所有財(cái)務(wù)交易的變化,供后續(xù)審計(jì)使用。
- 大數(shù)據(jù)分析:將來自不同系統(tǒng)的數(shù)據(jù)變化收集到 Hadoop 或 Amazon S3 中,使用 Spark 等工具進(jìn)行復(fù)雜的數(shù)據(jù)分析。
- 機(jī)器學(xué)習(xí)模型訓(xùn)練:實(shí)時(shí)收集和處理數(shù)據(jù),用于訓(xùn)練和更新機(jī)器學(xué)習(xí)模型。
- 玩家行為分析:實(shí)時(shí)收集玩家的游戲行為數(shù)據(jù),分析玩家偏好和游戲平衡性。
- 動態(tài)調(diào)整:根據(jù)玩家的行為數(shù)據(jù)動態(tài)調(diào)整游戲難度和獎(jiǎng)勵(lì)機(jī)制。
哪些公司使用Debezium?
Uber
- 用途: Uber 使用 Debezium 捕獲訂單、司機(jī)位置等數(shù)據(jù)的變化,并將這些數(shù)據(jù)推送到 Kafka。
- 描述: 這使得 Uber 能夠?qū)崟r(shí)監(jiān)控訂單狀態(tài)和司機(jī)位置,優(yōu)化調(diào)度算法并提高運(yùn)營效率。
- 用途: LinkedIn 使用 Debezium 來捕獲用戶活動和社交網(wǎng)絡(luò)數(shù)據(jù)的變化。
- 描述: 通過 Debezium,LinkedIn 能夠?qū)崟r(shí)更新推薦系統(tǒng)和新聞推送,提供個(gè)性化的用戶體驗(yàn)。
Walmart
- 用途: Walmart 使用 Debezium 實(shí)現(xiàn)其供應(yīng)鏈管理系統(tǒng)中的數(shù)據(jù)同步和實(shí)時(shí)分析。
- 描述: 通過 Debezium,Walmart 能夠?qū)崟r(shí)監(jiān)控庫存水平和訂單狀態(tài),提高供應(yīng)鏈效率和客戶滿意度。
IBM
- 用途: IBM 使用 Debezium 實(shí)現(xiàn)其混合云環(huán)境中的數(shù)據(jù)同步和流處理。
- 描述: Debezium 幫助 IBM 在不同云平臺之間無縫傳輸數(shù)據(jù),確保業(yè)務(wù)連續(xù)性和數(shù)據(jù)一致性。
eBay
- 用途: eBay 使用 Debezium 實(shí)現(xiàn)其電子商務(wù)平臺中的數(shù)據(jù)同步和實(shí)時(shí)分析。
- 描述: 通過 Debezium,eBay 能夠?qū)崟r(shí)更新商品信息和庫存狀態(tài),提升購物體驗(yàn)和運(yùn)營效率。
PayPal
- 用途: PayPal 使用 Debezium 捕獲支付交易數(shù)據(jù)的變化,并將其用于實(shí)時(shí)風(fēng)險(xiǎn)管理和合規(guī)性檢查。
- 描述: 通過 Debezium,PayPal 能夠及時(shí)發(fā)現(xiàn)可疑交易行為,確保支付系統(tǒng)的安全性和可靠性。
Airbnb
- 用途: Airbnb 使用 Debezium 實(shí)現(xiàn)其內(nèi)部系統(tǒng)的數(shù)據(jù)同步和實(shí)時(shí)監(jiān)控。
- 描述: 通過 Debezium,Airbnb 能夠?qū)崟r(shí)更新房源信息和預(yù)訂狀態(tài),優(yōu)化住宿安排和客戶服務(wù)。
數(shù)據(jù)表
-- 創(chuàng)建 products 表,存儲產(chǎn)品信息。
CREATETABLE products (
idINT AUTO_INCREMENT PRIMARY KEY,
nameVARCHAR(255) NOTNULLCOMMENT'產(chǎn)品名稱',
price DECIMAL(10, 2) NOTNULLCOMMENT'產(chǎn)品價(jià)格'
);
-- 創(chuàng)建 inventory 表,存儲產(chǎn)品的庫存信息。
CREATETABLE inventory (
idINT AUTO_INCREMENT PRIMARY KEY,
product_id INTCOMMENT'關(guān)聯(lián)的產(chǎn)品ID',
quantity INTNOTNULLCOMMENT'庫存數(shù)量',
last_updated TIMESTAMPDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'最后更新時(shí)間',
FOREIGNKEY (product_id) REFERENCES products(id)
);
my.cnf文件配置
[mysqld]
log-bin=mysql-bin
binlog_format=ROW
server-id=1
expire_logs_days=10
“
確保啟用了二進(jìn)制日志, 記得記得要重啟MySQL服務(wù)?。?!
配置Debezium Connector
創(chuàng)建一個(gè)Debezium連接器配置文件 register-mysql.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventorydb",
"table.include.list": "inventorydb.products,inventorydb.inventory",
"include.schema.changes": "false"
}
}
使用curl命令注冊Debezium連接器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
代碼實(shí)操
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>inventory-sync</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>inventory-sync</name>
<description>Debezium Demo</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# Kafka服務(wù)器地址
spring.kafka.bootstrap-servers=localhost:9092
# 消費(fèi)者組ID
spring.kafka.consumer.group-id=inventory-consumer-group
# 自動偏移重置策略
spring.kafka.consumer.auto-offset-reset=earliest
# 鍵反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 值反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.connect.json.JsonDeserializer
Debezium消息監(jiān)聽器
package com.example.inventorysync.listener;
import com.example.inventorysync.handler.DataChangeHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class DebeziumEventListener {
private static final Logger logger = LoggerFactory.getLogger(DebeziumEventListener.class);
@Autowired
private DataChangeHandler dataChangeHandler;
/**
* 監(jiān)聽Kafka主題中的消息
*
* @param record 接收到的Kafka消息記錄
*/
@KafkaListener(topics = {"dbserver1.inventorydb.products", "dbserver1.inventorydb.inventory"})
public void listen(ConsumerRecord<String, String> record) {
logger.info("Received message: {}", record.value());
dataChangeHandler.handleChange(record.value());
}
}
數(shù)據(jù)處理器
package com.example.inventorysync.handler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class DataChangeHandler {
private static final Logger logger = LoggerFactory.getLogger(DataChangeHandler.class);
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 處理數(shù)據(jù)變更事件
*
* @param jsonData 變更事件的JSON字符串
*/
public void handleChange(String jsonData) {
try {
// 解析JSON數(shù)據(jù)
JsonNode rootNode = objectMapper.readTree(jsonData);
JsonNode payloadNode = rootNode.path("payload");
if (!payloadNode.isMissingNode()) {
String op = payloadNode.path("op").asText();
switch (op) {
case"c":
handleCreate(payloadNode);
break;
case"u":
handleUpdate(payloadNode);
break;
case"d":
handleDelete(payloadNode);
break;
default:
logger.warn("Unsupported operation type: {}", op);
}
} else {
logger.error("Payload node is missing in the JSON data");
}
} catch (Exception e) {
logger.error("Error processing data change event", e);
}
}
/**
* 處理插入操作
*
* @param payloadNode 包含插入數(shù)據(jù)的JSON節(jié)點(diǎn)
*/
private void handleCreate(JsonNode payloadNode) throws Exception {
// 獲取after節(jié)點(diǎn)的數(shù)據(jù)
JsonNode afterNode = payloadNode.path("after");
logger.info("Handling CREATE event: {}", afterNode.toString());
if (afterNode.has("id")) {
int id = afterNode.path("id").asInt();
String tableName = getTableName(afterNode);
if ("products".equals(tableName)) {
String name = afterNode.path("name").asText();
double price = afterNode.path("price").asDouble();
logger.info("Product created: ID={}, Name={}, Price={}", id, name, price);
} elseif ("inventory".equals(tableName)) {
int productId = afterNode.path("product_id").asInt();
int quantity = afterNode.path("quantity").asInt();
String lastUpdated = afterNode.path("last_updated").asText();
logger.info("Inventory created: ID={}, Product ID={}, Quantity={}, Last Updated={}", id, productId, quantity, lastUpdated);
}
}
}
/**
* 處理更新操作
*
* @param payloadNode 包含更新前后數(shù)據(jù)的JSON節(jié)點(diǎn)
*/
private void handleUpdate(JsonNode payloadNode) throws Exception {
// 獲取before和after節(jié)點(diǎn)的數(shù)據(jù)
JsonNode beforeNode = payloadNode.path("before");
JsonNode afterNode = payloadNode.path("after");
logger.info("Handling UPDATE event: Before - {}, After - {}", beforeNode.toString(), afterNode.toString());
if (afterNode.has("id")) {
int id = afterNode.path("id").asInt();
String tableName = getTableName(afterNode);
if ("products".equals(tableName)) {
String name = afterNode.path("name").asText();
double price = afterNode.path("price").asDouble();
logger.info("Product updated: ID={}, Name={}, Price={}", id, name, price);
} elseif ("inventory".equals(tableName)) {
int productId = afterNode.path("product_id").asInt();
int quantity = afterNode.path("quantity").asInt();
String lastUpdated = afterNode.path("last_updated").asText();
logger.info("Inventory updated: ID={}, Product ID={}, Quantity={}, Last Updated={}", id, productId, quantity, lastUpdated);
}
}
}
/**
* 處理刪除操作
*
* @param payloadNode 包含刪除前數(shù)據(jù)的JSON節(jié)點(diǎn)
*/
private void handleDelete(JsonNode payloadNode) throws Exception {
// 獲取before節(jié)點(diǎn)的數(shù)據(jù)
JsonNode beforeNode = payloadNode.path("before");
logger.info("Handling DELETE event: {}", beforeNode.toString());
if (beforeNode.has("id")) {
int id = beforeNode.path("id").asInt();
String tableName = getTableName(beforeNode);
if ("products".equals(tableName)) {
logger.info("Product deleted: ID={}", id);
} elseif ("inventory".equals(tableName)) {
logger.info("Inventory deleted: ID={}", id);
}
}
}
/**
* 獲取表名
*
* @param node 包含表名的JSON節(jié)點(diǎn)
* @return 表名
*/
private String getTableName(JsonNode node) {
return node.path("table").asText();
}
}
Application
package com.example.inventorysync;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class InventorySyncApplication {
public static void main(String[] args) {
SpringApplication.run(InventorySyncApplication.class, args);
}
}
測試
插入數(shù)據(jù)
執(zhí)行SQL語句插入產(chǎn)品和庫存數(shù)據(jù):
-- 插入產(chǎn)品數(shù)據(jù)
INSERT INTO products (name, price) VALUES ('Laptop', 999.99);
-- 插入庫存數(shù)據(jù)
INSERT INTO inventory (product_id, quantity) VALUES (1, 100);
日志:
2025-03-31 21:01:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"name":"Laptop","price":999.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307260000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":1234,"row":0,"thread":2,"query":null},"ts_ms":1680307260000}}
2025-03-31 21:01:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"name":"Laptop","price":999.99}
2025-03-31 21:01:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product created: ID=1, Name=Laptop, Price=999.99
2025-03-31 21:01:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307261000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5678,"row":0,"thread":2,"query":null},"ts_ms":1680307261000}}
2025-03-31 21:01:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}
2025-03-31 21:01:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory created: ID=1, Product ID=1, Quantity=100, Last Updated=2025-03-31T21:01:01.000Z
更新數(shù)據(jù)
執(zhí)行SQL語句更新產(chǎn)品和庫存數(shù)據(jù):
-- 更新產(chǎn)品數(shù)據(jù)
UPDATE products SET price = 899.99 WHERE id = 1;
-- 更新庫存數(shù)據(jù)
UPDATE inventory SET quantity = 90 WHERE id = 1;
日志:
2025-03-31 21:02:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"name":"Laptop","price":999.99},"after":{"id":1,"name":"Laptop","price":899.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307320000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":9012,"row":0,"thread":2,"query":null},"ts_ms":1680307320000}}
2025-03-31 21:02:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"name":"Laptop","price":999.99}, After - {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:02:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product updated: ID=1, Name=Laptop, Price=899.99
2025-03-31 21:02:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"after":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307321000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":13456,"row":0,"thread":2,"query":null},"ts_ms":1680307321000}}
2025-03-31 21:02:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}, After - {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:02:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory updated: ID=1, Product ID=1, Quantity=90, Last Updated=2025-03-31T21:02:01.000Z
刪除數(shù)據(jù)
執(zhí)行SQL語句刪除產(chǎn)品和庫存數(shù)據(jù):
-- 刪除庫存數(shù)據(jù)
DELETE FROM inventory WHERE id = 1;
-- 刪除產(chǎn)品數(shù)據(jù)
DELETE FROM products WHERE id = 1;
日志:
2025-03-31 21:03:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307380000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":17890,"row":0,"thread":2,"query":null},"ts_ms":1680307380000}}
2025-03-31 21:03:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:03:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory deleted: ID=1
2025-03-31 21:03:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"name":"Laptop","price":899.99},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307381000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":22345,"row":0,"thread":2,"query":null},"ts_ms":1680307381000}}
2025-03-31 21:03:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:03:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product delete