自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBoot與Debezium整合,實(shí)現(xiàn)供應(yīng)鏈數(shù)據(jù)庫實(shí)時(shí)同步系統(tǒng)

開發(fā) 架構(gòu)
Debezium支持多種關(guān)系型數(shù)據(jù)庫,包括MySQL、PostgreSQL、MongoDB等。這意味著我們可以靈活地選擇適合業(yè)務(wù)需求的數(shù)據(jù)庫,而無需擔(dān)心數(shù)據(jù)捕獲的問題。

Debezium專門用于捕獲數(shù)據(jù)庫的變化并將這些變化以實(shí)時(shí)流的方式推送到消息隊(duì)列系統(tǒng)Kafka,從而實(shí)現(xiàn)高效、可靠的實(shí)時(shí)數(shù)據(jù)同步和流處理。

我們?yōu)槭裁催x擇Debezium?

實(shí)時(shí)數(shù)據(jù)同步

Debezium是一個(gè)開源的分布式平臺,專門用于捕獲數(shù)據(jù)庫的變化,并將這些變化以實(shí)時(shí)流的方式推送到Kafka或其他消息隊(duì)列系統(tǒng)。這對于需要實(shí)時(shí)更新庫存信息的應(yīng)用場景尤為重要。

支持多種數(shù)據(jù)庫

Debezium支持多種關(guān)系型數(shù)據(jù)庫,包括MySQL、PostgreSQL、MongoDB等。這意味著我們可以靈活地選擇適合業(yè)務(wù)需求的數(shù)據(jù)庫,而無需擔(dān)心數(shù)據(jù)捕獲的問題。

高性能和低延遲

Debezium通過使用數(shù)據(jù)庫的日志文件(如MySQL的二進(jìn)制日志)來捕獲數(shù)據(jù)變化,這種方式不僅高效而且延遲極低。這確保了即使在高并發(fā)環(huán)境下,也能快速響應(yīng)數(shù)據(jù)庫的變化。

結(jié)構(gòu)化數(shù)據(jù)輸出

Debezium將捕獲到的數(shù)據(jù)變化以結(jié)構(gòu)化的JSON格式輸出,便于下游系統(tǒng)解析和處理。這種標(biāo)準(zhǔn)化的數(shù)據(jù)格式使得集成變得更加簡單和可靠。

容錯(cuò)性和可靠性

Debezium具有強(qiáng)大的容錯(cuò)機(jī)制,能夠在網(wǎng)絡(luò)故障或服務(wù)器重啟后繼續(xù)從斷點(diǎn)處恢復(fù)數(shù)據(jù)捕獲。這確保了數(shù)據(jù)的一致性和完整性。

易于配置和部署

Debezium可以通過簡單的REST API進(jìn)行配置和管理,這大大簡化了部署過程。此外,Debezium與Kafka生態(tài)系統(tǒng)緊密集成,使得整個(gè)數(shù)據(jù)管道易于搭建和維護(hù)。

數(shù)據(jù)一致性保證

Debezium確保數(shù)據(jù)捕獲過程中的一致性,避免了因數(shù)據(jù)不同步導(dǎo)致的業(yè)務(wù)問題。這對于庫存管理系統(tǒng)來說尤為重要,因?yàn)槿魏螏齑鏀?shù)據(jù)的不一致都可能導(dǎo)致嚴(yán)重的后果。

應(yīng)用場景

  • 博客平臺:當(dāng)新文章發(fā)布或現(xiàn)有文章更新時(shí),實(shí)時(shí)刷新前端頁面。
  • 論壇:實(shí)時(shí)顯示最新的帖子和評論,提升用戶體驗(yàn)。
  • 跨系統(tǒng)集成:將 CRM 系統(tǒng)、ERP 系統(tǒng)和其他業(yè)務(wù)系統(tǒng)的數(shù)據(jù)變化整合到一起,提供統(tǒng)一的數(shù)據(jù)視圖。
  • 增量加載:僅加載自上次同步以來發(fā)生變化的數(shù)據(jù),減少數(shù)據(jù)傳輸量和處理時(shí)間。
  • 庫存監(jiān)控:當(dāng)庫存低于閾值時(shí),立即觸發(fā)告警,提醒相關(guān)人員補(bǔ)充庫存。
  • 交易監(jiān)控:實(shí)時(shí)監(jiān)控金融交易數(shù)據(jù),檢測可疑活動并觸發(fā)安全措施。
  • 訂單管理系統(tǒng):當(dāng)訂單狀態(tài)發(fā)生變化時(shí),將變化事件發(fā)送給支付、物流等微服務(wù),觸發(fā)相應(yīng)的業(yè)務(wù)流程。
  • 用戶管理系統(tǒng):當(dāng)用戶信息更新時(shí),將變化事件通知權(quán)限管理、營銷等微服務(wù),保持?jǐn)?shù)據(jù)一致性。
  • 財(cái)務(wù)審計(jì):記錄所有財(cái)務(wù)交易的變化,供后續(xù)審計(jì)使用。
  • 大數(shù)據(jù)分析:將來自不同系統(tǒng)的數(shù)據(jù)變化收集到 Hadoop 或 Amazon S3 中,使用 Spark 等工具進(jìn)行復(fù)雜的數(shù)據(jù)分析。
  • 機(jī)器學(xué)習(xí)模型訓(xùn)練:實(shí)時(shí)收集和處理數(shù)據(jù),用于訓(xùn)練和更新機(jī)器學(xué)習(xí)模型。
  • 玩家行為分析:實(shí)時(shí)收集玩家的游戲行為數(shù)據(jù),分析玩家偏好和游戲平衡性。
  • 動態(tài)調(diào)整:根據(jù)玩家的行為數(shù)據(jù)動態(tài)調(diào)整游戲難度和獎(jiǎng)勵(lì)機(jī)制。

哪些公司使用Debezium?

Uber

  • 用途: Uber 使用 Debezium 捕獲訂單、司機(jī)位置等數(shù)據(jù)的變化,并將這些數(shù)據(jù)推送到 Kafka。
  • 描述: 這使得 Uber 能夠?qū)崟r(shí)監(jiān)控訂單狀態(tài)和司機(jī)位置,優(yōu)化調(diào)度算法并提高運(yùn)營效率。

LinkedIn

  • 用途: LinkedIn 使用 Debezium 來捕獲用戶活動和社交網(wǎng)絡(luò)數(shù)據(jù)的變化。
  • 描述: 通過 Debezium,LinkedIn 能夠?qū)崟r(shí)更新推薦系統(tǒng)和新聞推送,提供個(gè)性化的用戶體驗(yàn)。

Walmart

  • 用途: Walmart 使用 Debezium 實(shí)現(xiàn)其供應(yīng)鏈管理系統(tǒng)中的數(shù)據(jù)同步和實(shí)時(shí)分析。
  • 描述: 通過 Debezium,Walmart 能夠?qū)崟r(shí)監(jiān)控庫存水平和訂單狀態(tài),提高供應(yīng)鏈效率和客戶滿意度。

IBM

  • 用途: IBM 使用 Debezium 實(shí)現(xiàn)其混合云環(huán)境中的數(shù)據(jù)同步和流處理。
  • 描述: Debezium 幫助 IBM 在不同云平臺之間無縫傳輸數(shù)據(jù),確保業(yè)務(wù)連續(xù)性和數(shù)據(jù)一致性。

eBay

  • 用途: eBay 使用 Debezium 實(shí)現(xiàn)其電子商務(wù)平臺中的數(shù)據(jù)同步和實(shí)時(shí)分析。
  • 描述: 通過 Debezium,eBay 能夠?qū)崟r(shí)更新商品信息和庫存狀態(tài),提升購物體驗(yàn)和運(yùn)營效率。

PayPal

  • 用途: PayPal 使用 Debezium 捕獲支付交易數(shù)據(jù)的變化,并將其用于實(shí)時(shí)風(fēng)險(xiǎn)管理和合規(guī)性檢查。
  • 描述: 通過 Debezium,PayPal 能夠及時(shí)發(fā)現(xiàn)可疑交易行為,確保支付系統(tǒng)的安全性和可靠性。

Airbnb

  • 用途: Airbnb 使用 Debezium 實(shí)現(xiàn)其內(nèi)部系統(tǒng)的數(shù)據(jù)同步和實(shí)時(shí)監(jiān)控。
  • 描述: 通過 Debezium,Airbnb 能夠?qū)崟r(shí)更新房源信息和預(yù)訂狀態(tài),優(yōu)化住宿安排和客戶服務(wù)。

數(shù)據(jù)表

-- 創(chuàng)建 products 表,存儲產(chǎn)品信息。
CREATETABLE products (
    idINT AUTO_INCREMENT PRIMARY KEY,
    nameVARCHAR(255) NOTNULLCOMMENT'產(chǎn)品名稱',
    price DECIMAL(10, 2) NOTNULLCOMMENT'產(chǎn)品價(jià)格'
);

-- 創(chuàng)建 inventory 表,存儲產(chǎn)品的庫存信息。
CREATETABLE inventory (
    idINT AUTO_INCREMENT PRIMARY KEY,
    product_id INTCOMMENT'關(guān)聯(lián)的產(chǎn)品ID',
    quantity INTNOTNULLCOMMENT'庫存數(shù)量',
    last_updated TIMESTAMPDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'最后更新時(shí)間',
    FOREIGNKEY (product_id) REFERENCES products(id)
);

my.cnf文件配置

[mysqld]
log-bin=mysql-bin
binlog_format=ROW
server-id=1
expire_logs_days=10

確保啟用了二進(jìn)制日志, 記得記得要重啟MySQL服務(wù)?。?!

配置Debezium Connector

創(chuàng)建一個(gè)Debezium連接器配置文件 register-mysql.json

{
  "name": "inventory-connector",
"config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventorydb",
    "table.include.list": "inventorydb.products,inventorydb.inventory",
    "include.schema.changes": "false"
  }
}

使用curl命令注冊Debezium連接器:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

代碼實(shí)操

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/><!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>inventory-sync</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>inventory-sync</name>
    <description>Debezium Demo</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

# Kafka服務(wù)器地址
spring.kafka.bootstrap-servers=localhost:9092
# 消費(fèi)者組ID
spring.kafka.consumer.group-id=inventory-consumer-group
# 自動偏移重置策略
spring.kafka.consumer.auto-offset-reset=earliest
# 鍵反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 值反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.connect.json.JsonDeserializer

Debezium消息監(jiān)聽器

package com.example.inventorysync.listener;

import com.example.inventorysync.handler.DataChangeHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DebeziumEventListener {

    private static final Logger logger = LoggerFactory.getLogger(DebeziumEventListener.class);

    @Autowired
    private DataChangeHandler dataChangeHandler;

    /**
     * 監(jiān)聽Kafka主題中的消息
     *
     * @param record 接收到的Kafka消息記錄
     */
    @KafkaListener(topics = {"dbserver1.inventorydb.products", "dbserver1.inventorydb.inventory"})
    public void listen(ConsumerRecord<String, String> record) {
        logger.info("Received message: {}", record.value());
        dataChangeHandler.handleChange(record.value());
    }
}

數(shù)據(jù)處理器

package com.example.inventorysync.handler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class DataChangeHandler {

    private static final Logger logger = LoggerFactory.getLogger(DataChangeHandler.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * 處理數(shù)據(jù)變更事件
     *
     * @param jsonData 變更事件的JSON字符串
     */
    public void handleChange(String jsonData) {
        try {
            // 解析JSON數(shù)據(jù)
            JsonNode rootNode = objectMapper.readTree(jsonData);
            JsonNode payloadNode = rootNode.path("payload");

            if (!payloadNode.isMissingNode()) {
                String op = payloadNode.path("op").asText();
                switch (op) {
                    case"c":
                        handleCreate(payloadNode);
                        break;
                    case"u":
                        handleUpdate(payloadNode);
                        break;
                    case"d":
                        handleDelete(payloadNode);
                        break;
                    default:
                        logger.warn("Unsupported operation type: {}", op);
                }
            } else {
                logger.error("Payload node is missing in the JSON data");
            }
        } catch (Exception e) {
            logger.error("Error processing data change event", e);
        }
    }

    /**
     * 處理插入操作
     *
     * @param payloadNode 包含插入數(shù)據(jù)的JSON節(jié)點(diǎn)
     */
    private void handleCreate(JsonNode payloadNode) throws Exception {
        // 獲取after節(jié)點(diǎn)的數(shù)據(jù)
        JsonNode afterNode = payloadNode.path("after");
        logger.info("Handling CREATE event: {}", afterNode.toString());

        if (afterNode.has("id")) {
            int id = afterNode.path("id").asInt();
            String tableName = getTableName(afterNode);
            if ("products".equals(tableName)) {
                String name = afterNode.path("name").asText();
                double price = afterNode.path("price").asDouble();
                logger.info("Product created: ID={}, Name={}, Price={}", id, name, price);
            } elseif ("inventory".equals(tableName)) {
                int productId = afterNode.path("product_id").asInt();
                int quantity = afterNode.path("quantity").asInt();
                String lastUpdated = afterNode.path("last_updated").asText();
                logger.info("Inventory created: ID={}, Product ID={}, Quantity={}, Last Updated={}", id, productId, quantity, lastUpdated);
            }
        }
    }

    /**
     * 處理更新操作
     *
     * @param payloadNode 包含更新前后數(shù)據(jù)的JSON節(jié)點(diǎn)
     */
    private void handleUpdate(JsonNode payloadNode) throws Exception {
        // 獲取before和after節(jié)點(diǎn)的數(shù)據(jù)
        JsonNode beforeNode = payloadNode.path("before");
        JsonNode afterNode = payloadNode.path("after");
        logger.info("Handling UPDATE event: Before - {}, After - {}", beforeNode.toString(), afterNode.toString());

        if (afterNode.has("id")) {
            int id = afterNode.path("id").asInt();
            String tableName = getTableName(afterNode);
            if ("products".equals(tableName)) {
                String name = afterNode.path("name").asText();
                double price = afterNode.path("price").asDouble();
                logger.info("Product updated: ID={}, Name={}, Price={}", id, name, price);
            } elseif ("inventory".equals(tableName)) {
                int productId = afterNode.path("product_id").asInt();
                int quantity = afterNode.path("quantity").asInt();
                String lastUpdated = afterNode.path("last_updated").asText();
                logger.info("Inventory updated: ID={}, Product ID={}, Quantity={}, Last Updated={}", id, productId, quantity, lastUpdated);
            }
        }
    }

    /**
     * 處理刪除操作
     *
     * @param payloadNode 包含刪除前數(shù)據(jù)的JSON節(jié)點(diǎn)
     */
    private void handleDelete(JsonNode payloadNode) throws Exception {
        // 獲取before節(jié)點(diǎn)的數(shù)據(jù)
        JsonNode beforeNode = payloadNode.path("before");
        logger.info("Handling DELETE event: {}", beforeNode.toString());

        if (beforeNode.has("id")) {
            int id = beforeNode.path("id").asInt();
            String tableName = getTableName(beforeNode);
            if ("products".equals(tableName)) {
                logger.info("Product deleted: ID={}", id);
            } elseif ("inventory".equals(tableName)) {
                logger.info("Inventory deleted: ID={}", id);
            }
        }
    }

    /**
     * 獲取表名
     *
     * @param node 包含表名的JSON節(jié)點(diǎn)
     * @return 表名
     */
    private String getTableName(JsonNode node) {
        return node.path("table").asText();
    }
}

Application

package com.example.inventorysync;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class InventorySyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(InventorySyncApplication.class, args);
    }
}

測試

插入數(shù)據(jù)

執(zhí)行SQL語句插入產(chǎn)品和庫存數(shù)據(jù):

-- 插入產(chǎn)品數(shù)據(jù)
INSERT INTO products (name, price) VALUES ('Laptop', 999.99);

-- 插入庫存數(shù)據(jù)
INSERT INTO inventory (product_id, quantity) VALUES (1, 100);

日志:

2025-03-31 21:01:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"name":"Laptop","price":999.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307260000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":1234,"row":0,"thread":2,"query":null},"ts_ms":1680307260000}}
2025-03-31 21:01:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"name":"Laptop","price":999.99}
2025-03-31 21:01:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product created: ID=1, Name=Laptop, Price=999.99

2025-03-31 21:01:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307261000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5678,"row":0,"thread":2,"query":null},"ts_ms":1680307261000}}
2025-03-31 21:01:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}
2025-03-31 21:01:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory created: ID=1, Product ID=1, Quantity=100, Last Updated=2025-03-31T21:01:01.000Z

更新數(shù)據(jù)

執(zhí)行SQL語句更新產(chǎn)品和庫存數(shù)據(jù):

-- 更新產(chǎn)品數(shù)據(jù)
UPDATE products SET price = 899.99 WHERE id = 1;

-- 更新庫存數(shù)據(jù)
UPDATE inventory SET quantity = 90 WHERE id = 1;

日志:

2025-03-31 21:02:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"name":"Laptop","price":999.99},"after":{"id":1,"name":"Laptop","price":899.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307320000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":9012,"row":0,"thread":2,"query":null},"ts_ms":1680307320000}}
2025-03-31 21:02:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"name":"Laptop","price":999.99}, After - {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:02:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product updated: ID=1, Name=Laptop, Price=899.99

2025-03-31 21:02:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"after":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307321000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":13456,"row":0,"thread":2,"query":null},"ts_ms":1680307321000}}
2025-03-31 21:02:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}, After - {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:02:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory updated: ID=1, Product ID=1, Quantity=90, Last Updated=2025-03-31T21:02:01.000Z

刪除數(shù)據(jù)

執(zhí)行SQL語句刪除產(chǎn)品和庫存數(shù)據(jù):

-- 刪除庫存數(shù)據(jù)
DELETE FROM inventory WHERE id = 1;

-- 刪除產(chǎn)品數(shù)據(jù)
DELETE FROM products WHERE id = 1;

日志:

2025-03-31 21:03:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307380000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":17890,"row":0,"thread":2,"query":null},"ts_ms":1680307380000}}
2025-03-31 21:03:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:03:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory deleted: ID=1

2025-03-31 21:03:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"name":"Laptop","price":899.99},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307381000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":22345,"row":0,"thread":2,"query":null},"ts_ms":1680307381000}}
2025-03-31 21:03:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:03:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product delete


責(zé)任編輯:武曉燕 來源: Java知識日歷
相關(guān)推薦

2025-04-25 08:34:52

2023-02-23 07:52:20

2022-11-16 11:51:32

微軟人工智能

2023-10-31 15:40:12

2021-09-14 10:55:53

數(shù)據(jù)中心數(shù)據(jù)中心架構(gòu)供應(yīng)鏈

2025-04-18 08:54:30

2020-09-21 11:30:28

CanalMySQL數(shù)據(jù)庫

2017-03-07 10:46:05

供應(yīng)鏈大數(shù)據(jù)堆疊

2022-07-06 12:57:23

大數(shù)據(jù)供應(yīng)鏈

2025-03-11 09:28:34

2023-09-18 10:37:36

數(shù)字化供應(yīng)鏈數(shù)字化轉(zhuǎn)型

2022-04-26 10:47:15

智能供應(yīng)鏈供應(yīng)鏈

2022-11-14 10:32:56

供應(yīng)鏈技術(shù)

2017-01-23 11:18:16

戴爾

2014-07-10 09:51:54

供應(yīng)鏈

2020-12-07 13:53:01

區(qū)塊鏈疫苗

2020-06-23 14:12:23

大數(shù)據(jù)IT技術(shù)

2025-03-26 08:43:17

2018-01-24 10:19:08

大數(shù)據(jù)供應(yīng)鏈系統(tǒng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號