自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBoot與Canal整合,實現(xiàn)金融交易系統(tǒng)的實時數(shù)據(jù)同步功能

數(shù)據(jù)庫 其他數(shù)據(jù)庫
Canal是阿里巴巴開源的一個用于高效抓取 MySQL 數(shù)據(jù)庫增量變更日志(binlog)并進行處理的中間件。它可以將 MySQL 的 binlog 解析為結(jié)構(gòu)化的 JSON 格式,并提供多種方式將這些數(shù)據(jù)推送到下游系統(tǒng)。

Canal是阿里巴巴開源的一個用于高效抓取 MySQL 數(shù)據(jù)庫增量變更日志(binlog)并進行處理的中間件。它可以將 MySQL 的 binlog 解析為結(jié)構(gòu)化的 JSON 格式,并提供多種方式將這些數(shù)據(jù)推送到下游系統(tǒng)。

我們?yōu)槭裁催x擇Canal?

  • 實時性: Canal基于MySQL的binlog機制,能夠在毫秒級內(nèi)完成數(shù)據(jù)同步。
  • 批量獲取數(shù)據(jù):Canal支持批量獲取數(shù)據(jù)庫變更數(shù)據(jù),減少網(wǎng)絡(luò)開銷和處理時間。
  • 多線程處理:Canal可以配置多線程來處理不同的數(shù)據(jù)變更事件,提高整體吞吐量。
  • 斷點續(xù)傳:Canal支持從斷點繼續(xù)消費數(shù)據(jù),確保數(shù)據(jù)不會丟失。
  • 持久化存儲:Canal可以將消費進度持久化到ZooKeeper中,保證在故障恢復(fù)后能夠繼續(xù)正常工作。
  • 容錯機制:Canal內(nèi)置了多種容錯機制,如重試策略和自動恢復(fù)功能,提高了系統(tǒng)的可靠性。
  • 標準協(xié)議:Canal使用標準化的binlog協(xié)議,易于與其他系統(tǒng)集成。
  • 過濾機制:Canal支持靈活的過濾規(guī)則,可以選擇性地訂閱特定的數(shù)據(jù)庫和表。
  • 動態(tài)配置:Canal支持動態(tài)配置,可以根據(jù)實際需求調(diào)整監(jiān)控范圍和處理邏輯。
  • 自定義處理:Canal允許開發(fā)者編寫自定義的處理器,實現(xiàn)復(fù)雜的數(shù)據(jù)處理邏輯。
  • 精確同步:Canal能夠精確地捕獲和同步數(shù)據(jù)庫的每一行變更,確保數(shù)據(jù)的一致性。
  • 事務(wù)支持:Canal能夠處理復(fù)雜的事務(wù)場景,確保事務(wù)的原子性和完整性。
  • 沖突解決:Canal提供了多種沖突解決策略,避免數(shù)據(jù)同步過程中的沖突問題。

哪些公司使用了Canal?

  • 阿里巴巴 :Canal 被用于多個業(yè)務(wù)部門的數(shù)據(jù)同步需求。
  • 騰訊 :在社交網(wǎng)絡(luò)、游戲等業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
  • 美團:在餐飲外賣、酒店預(yù)訂等多個業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
  • 小米 :在智能家居、手機銷售等多種業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
  • 滴滴出行:在網(wǎng)約車、共享單車等多種業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
  • 網(wǎng)易:在游戲、音樂等多種業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。

代碼實操

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>canal-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>canal-demo</name>
    <description>Demo project for Spring Boot with Canal</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

# 數(shù)據(jù)源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example

交易實體類

package com.example.canaldemo.model;

import lombok.Data;

@Data
public class Transaction {
    private Long id;          // 主鍵ID
    private String transactionId; // 交易ID
    private Double amount;      // 交易金額
    private String status;      // 交易狀態(tài)
}

create table

CREATE TABLE transaction (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    transaction_id VARCHAR(50) NOT NULL,
    amount DECIMAL(18, 2) NOT NULL,
    status VARCHAR(20) NOT NULL
);

交易Mapper接口

package com.example.canaldemo.mapper;

import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
 * 交易Mapper接口
 */
@Mapper
public interface TransactionMapper {

    /**
     * 插入一條新的交易記錄
     *
     * @param transaction 交易對象
     */
    @Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
    void insert(@Param("transaction") Transaction transaction);

    /**
     * 更新一條交易記錄
     *
     * @param transaction 交易對象
     */
    @Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
    void update(@Param("transaction") Transaction transaction);
}

Canal監(jiān)聽器類

package com.example.canaldemo.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Canal監(jiān)聽器類,用于監(jiān)聽數(shù)據(jù)庫的變化并進行相應(yīng)的處理
 */
@Component
public class CanalListener {

    private final String destination = "example"; // 這個值需要與Canal配置中的destination一致
    private final String serverIp = "127.0.0.1";
    private final int port = 11111;

    @Autowired
    private TransactionMapper transactionMapper;

    /**
     * 在Bean初始化后啟動Canal監(jiān)聽器
     */
    @PostConstruct
    public void start() {
        // 創(chuàng)建Canal連接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
        try {
            // 連接到Canal服務(wù)器
            connector.connect();
            // 訂閱所有數(shù)據(jù)庫的所有表
            connector.subscribe(".*\\..*");
            // 回滾到上次中斷的位置
            connector.rollback();

            while (true) {
                // 獲取一批消息,最多100條
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 如果沒有消息,則等待1秒
                    Thread.sleep(1000);
                } else {
                    // 處理消息
                    processMessage(message.getEntries());
                }
                // 提交確認
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 斷開連接
            connector.disconnect();
        }
    }

    /**
     * 處理Canal發(fā)送過來的消息
     *
     * @param entryList 消息列表
     */
    private void processMessage(List<CanalEntry.Entry> entryList) {
        for (CanalEntry.Entry entry : entryList) {
            // 忽略事務(wù)開始和結(jié)束事件
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage;
            try {
                // 解析RowChange數(shù)據(jù)
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            // 打印日志
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 處理每一行數(shù)據(jù)變化
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
                if (eventType == CanalEntry.EventType.DELETE) {
                    // 處理刪除事件(如果需要)
                } elseif (eventType == CanalEntry.EventType.INSERT) {
                    // 插入新記錄
                    transactionMapper.insert(transaction);
                } else {
                    // 更新現(xiàn)有記錄
                    transactionMapper.update(transaction);
                }
            }
        }
    }

    /**
     * 將Canal列數(shù)據(jù)轉(zhuǎn)換為Transaction對象
     *
     * @param columns 列數(shù)據(jù)列表
     * @return 轉(zhuǎn)換后的Transaction對象
     */
    private Transaction convertToTransaction(List<CanalEntry.Column> columns) {
        Transaction transaction = new Transaction();
        for (CanalEntry.Column column : columns) {
            switch (column.getName()) {
                case"id":
                    transaction.setId(Long.parseLong(column.getValue()));
                    break;
                case"transaction_id":
                    transaction.setTransactionId(column.getValue());
                    break;
                case"amount":
                    transaction.setAmount(Double.parseDouble(column.getValue()));
                    break;
                case"status":
                    transaction.setStatus(column.getValue());
                    break;
            }
        }
        return transaction;
    }
}

Application

package com.example.canaldemo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

測試

插入一條交易記錄

curl -X POST http://localhost:8080/api/transactions \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'

更新一條交易記錄

curl -X PUT http://localhost:8080/api/transactions/TX123 \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'

觀察后臺日志

================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1    update=true
transaction_id : TX123    update=true
amount : 100.00    update=true
status : PENDING    update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : PENDING    update=false
-------> after
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : COMPLETED    update=true
責(zé)任編輯:武曉燕 來源: Java知識日歷
相關(guān)推薦

2020-06-12 12:49:52

數(shù)據(jù)

2025-02-20 18:17:41

2020-09-21 11:30:28

CanalMySQL數(shù)據(jù)庫

2025-03-20 08:57:54

Spring日志存儲系統(tǒng)

2025-04-01 08:38:41

2023-09-26 08:11:22

Spring配置MySQL

2024-12-06 11:58:16

2023-05-31 08:56:24

2021-11-07 15:01:16

區(qū)塊鏈金融技術(shù)

2024-09-02 09:14:36

SpringRabbitMQ數(shù)據(jù)

2025-04-25 08:34:52

2011-04-29 14:35:53

惠普工作站

2025-03-11 09:28:34

2025-03-17 08:39:08

SpringApache數(shù)據(jù)

2017-02-20 20:04:05

系統(tǒng)超輕量日志實現(xiàn)

2024-04-09 10:02:13

Spring數(shù)據(jù)Redis

2024-10-30 08:15:18

2021-12-27 09:59:57

SpringCanal 中間件

2009-05-14 10:02:59

實時數(shù)據(jù)SQL Server商業(yè)智能

2025-04-23 08:50:00

SpringBootCurator分布式鎖
點贊
收藏

51CTO技術(shù)棧公眾號