SpringBoot與Canal整合,實現(xiàn)金融交易系統(tǒng)的實時數(shù)據(jù)同步功能
作者:Java知識日歷
Canal是阿里巴巴開源的一個用于高效抓取 MySQL 數(shù)據(jù)庫增量變更日志(binlog)并進行處理的中間件。它可以將 MySQL 的 binlog 解析為結(jié)構(gòu)化的 JSON 格式,并提供多種方式將這些數(shù)據(jù)推送到下游系統(tǒng)。
Canal是阿里巴巴開源的一個用于高效抓取 MySQL 數(shù)據(jù)庫增量變更日志(binlog)并進行處理的中間件。它可以將 MySQL 的 binlog 解析為結(jié)構(gòu)化的 JSON 格式,并提供多種方式將這些數(shù)據(jù)推送到下游系統(tǒng)。
我們?yōu)槭裁催x擇Canal?
- 實時性: Canal基于MySQL的binlog機制,能夠在毫秒級內(nèi)完成數(shù)據(jù)同步。
- 批量獲取數(shù)據(jù):Canal支持批量獲取數(shù)據(jù)庫變更數(shù)據(jù),減少網(wǎng)絡(luò)開銷和處理時間。
- 多線程處理:Canal可以配置多線程來處理不同的數(shù)據(jù)變更事件,提高整體吞吐量。
- 斷點續(xù)傳:Canal支持從斷點繼續(xù)消費數(shù)據(jù),確保數(shù)據(jù)不會丟失。
- 持久化存儲:Canal可以將消費進度持久化到ZooKeeper中,保證在故障恢復(fù)后能夠繼續(xù)正常工作。
- 容錯機制:Canal內(nèi)置了多種容錯機制,如重試策略和自動恢復(fù)功能,提高了系統(tǒng)的可靠性。
- 標準協(xié)議:Canal使用標準化的binlog協(xié)議,易于與其他系統(tǒng)集成。
- 過濾機制:Canal支持靈活的過濾規(guī)則,可以選擇性地訂閱特定的數(shù)據(jù)庫和表。
- 動態(tài)配置:Canal支持動態(tài)配置,可以根據(jù)實際需求調(diào)整監(jiān)控范圍和處理邏輯。
- 自定義處理:Canal允許開發(fā)者編寫自定義的處理器,實現(xiàn)復(fù)雜的數(shù)據(jù)處理邏輯。
- 精確同步:Canal能夠精確地捕獲和同步數(shù)據(jù)庫的每一行變更,確保數(shù)據(jù)的一致性。
- 事務(wù)支持:Canal能夠處理復(fù)雜的事務(wù)場景,確保事務(wù)的原子性和完整性。
- 沖突解決:Canal提供了多種沖突解決策略,避免數(shù)據(jù)同步過程中的沖突問題。
哪些公司使用了Canal?
- 阿里巴巴 :Canal 被用于多個業(yè)務(wù)部門的數(shù)據(jù)同步需求。
- 騰訊 :在社交網(wǎng)絡(luò)、游戲等業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
- 美團:在餐飲外賣、酒店預(yù)訂等多個業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
- 小米 :在智能家居、手機銷售等多種業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
- 滴滴出行:在網(wǎng)約車、共享單車等多種業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
- 網(wǎng)易:在游戲、音樂等多種業(yè)務(wù)中使用 Canal 進行數(shù)據(jù)同步。
代碼實操
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>canal-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>canal-demo</name>
<description>Demo project for Spring Boot with Canal</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# 數(shù)據(jù)源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example
交易實體類
package com.example.canaldemo.model;
import lombok.Data;
@Data
public class Transaction {
private Long id; // 主鍵ID
private String transactionId; // 交易ID
private Double amount; // 交易金額
private String status; // 交易狀態(tài)
}
create table
CREATE TABLE transaction (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
transaction_id VARCHAR(50) NOT NULL,
amount DECIMAL(18, 2) NOT NULL,
status VARCHAR(20) NOT NULL
);
交易Mapper接口
package com.example.canaldemo.mapper;
import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
/**
* 交易Mapper接口
*/
@Mapper
public interface TransactionMapper {
/**
* 插入一條新的交易記錄
*
* @param transaction 交易對象
*/
@Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
void insert(@Param("transaction") Transaction transaction);
/**
* 更新一條交易記錄
*
* @param transaction 交易對象
*/
@Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
void update(@Param("transaction") Transaction transaction);
}
Canal監(jiān)聽器類
package com.example.canaldemo.listener;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
/**
* Canal監(jiān)聽器類,用于監(jiān)聽數(shù)據(jù)庫的變化并進行相應(yīng)的處理
*/
@Component
public class CanalListener {
private final String destination = "example"; // 這個值需要與Canal配置中的destination一致
private final String serverIp = "127.0.0.1";
private final int port = 11111;
@Autowired
private TransactionMapper transactionMapper;
/**
* 在Bean初始化后啟動Canal監(jiān)聽器
*/
@PostConstruct
public void start() {
// 創(chuàng)建Canal連接器
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
try {
// 連接到Canal服務(wù)器
connector.connect();
// 訂閱所有數(shù)據(jù)庫的所有表
connector.subscribe(".*\\..*");
// 回滾到上次中斷的位置
connector.rollback();
while (true) {
// 獲取一批消息,最多100條
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 如果沒有消息,則等待1秒
Thread.sleep(1000);
} else {
// 處理消息
processMessage(message.getEntries());
}
// 提交確認
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 斷開連接
connector.disconnect();
}
}
/**
* 處理Canal發(fā)送過來的消息
*
* @param entryList 消息列表
*/
private void processMessage(List<CanalEntry.Entry> entryList) {
for (CanalEntry.Entry entry : entryList) {
// 忽略事務(wù)開始和結(jié)束事件
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;
try {
// 解析RowChange數(shù)據(jù)
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
// 打印日志
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
// 處理每一行數(shù)據(jù)變化
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
if (eventType == CanalEntry.EventType.DELETE) {
// 處理刪除事件(如果需要)
} elseif (eventType == CanalEntry.EventType.INSERT) {
// 插入新記錄
transactionMapper.insert(transaction);
} else {
// 更新現(xiàn)有記錄
transactionMapper.update(transaction);
}
}
}
}
/**
* 將Canal列數(shù)據(jù)轉(zhuǎn)換為Transaction對象
*
* @param columns 列數(shù)據(jù)列表
* @return 轉(zhuǎn)換后的Transaction對象
*/
private Transaction convertToTransaction(List<CanalEntry.Column> columns) {
Transaction transaction = new Transaction();
for (CanalEntry.Column column : columns) {
switch (column.getName()) {
case"id":
transaction.setId(Long.parseLong(column.getValue()));
break;
case"transaction_id":
transaction.setTransactionId(column.getValue());
break;
case"amount":
transaction.setAmount(Double.parseDouble(column.getValue()));
break;
case"status":
transaction.setStatus(column.getValue());
break;
}
}
return transaction;
}
}
Application
package com.example.canaldemo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
測試
插入一條交易記錄
curl -X POST http://localhost:8080/api/transactions \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'
更新一條交易記錄
curl -X PUT http://localhost:8080/api/transactions/TX123 \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'
觀察后臺日志
================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1 update=true
transaction_id : TX123 update=true
amount : 100.00 update=true
status : PENDING update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : PENDING update=false
-------> after
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : COMPLETED update=true
責(zé)任編輯:武曉燕
來源:
Java知識日歷