SpringBoot與BookKeeper整合,實現(xiàn)金融級別的日志存儲系統(tǒng)
作者:Java知識日歷
選擇Apache BookKeeper作為金融級日志存儲系統(tǒng)的核心組件,主要是因為它具備高性能、高可靠性和良好的可擴展性,能夠有效滿足金融機構對日志存儲的要求。
選擇Apache BookKeeper作為金融級日志存儲系統(tǒng)的核心組件,主要是因為它具備高性能、高可靠性和良好的可擴展性,能夠有效滿足金融機構對日志存儲的要求。
BookKeeper的優(yōu)勢
高吞吐量和低延遲
- 分布式架構: Apache BookKeeper采用分布式的架構設計,能夠支持高并發(fā)的寫入和讀取操作。
- 批量寫入: 支持批量寫入日志條目,顯著提高寫入效率。
- 異步I/O: 使用異步I/O操作,減少等待時間,提升整體性能。
數(shù)據(jù)一致性和持久性
- 強一致性保證: BookKeeper提供強一致性保證,確保所有寫入的數(shù)據(jù)都能被正確讀取。
- 多副本復制: 數(shù)據(jù)在多個Bookies(BookKeeper節(jié)點)上進行多副本復制,防止單點故障導致的數(shù)據(jù)丟失。
- 自動恢復: 在節(jié)點故障時,BookKeeper能夠自動檢測并恢復數(shù)據(jù),確保系統(tǒng)的連續(xù)運行。
水平擴展能力
- 動態(tài)擴展: 可以通過增加Bookies來擴展集群規(guī)模,適應不斷增長的業(yè)務需求。
- 負載均衡: 自動分配負載,確保各節(jié)點之間的工作負載平衡,避免熱點問題。
- 靈活性: 支持多種部署方式,包括本地部署、云部署等。
數(shù)據(jù)加密和訪問控制
- 數(shù)據(jù)加密: 支持對存儲的日志數(shù)據(jù)進行加密處理,防止未授權訪問。
- 認證和授權: 提供細粒度的權限管理機制,限制不同角色的訪問權限。
- 審計日志: 記錄所有對系統(tǒng)的訪問和操作,便于追蹤和審計。
哪些公司采用了BookKeeper?
Intel
- 用途: Intel在其物聯(lián)網(wǎng)(IoT)解決方案中使用BookKeeper來收集和存儲傳感器數(shù)據(jù)。
- 優(yōu)勢: 多副本復制和自動恢復機制,確保數(shù)據(jù)的可靠性和完整性。
阿里巴巴集團
- 用途: 阿里巴巴在多個核心系統(tǒng)中使用BookKeeper,包括交易日志存儲、監(jiān)控系統(tǒng)和大數(shù)據(jù)平臺。
- 優(yōu)勢: 成熟的社區(qū)支持和與現(xiàn)有生態(tài)系統(tǒng)的良好集成,提升了開發(fā)效率和系統(tǒng)穩(wěn)定性。
Baidu
- 用途: Baidu在其搜索引擎和推薦系統(tǒng)中使用BookKeeper來存儲大量的日志和索引數(shù)據(jù)。
- 優(yōu)勢: 高效的數(shù)據(jù)檢索能力和靈活的配置選項,適應不同的應用場景。
Microsoft Azure
- 用途: Microsoft Azure在其云平臺上使用BookKeeper來支持各種分布式系統(tǒng)和服務。
- 優(yōu)勢: 高性能和可擴展性,滿足不同規(guī)模的應用需求。
PayPal
- 用途: PayPal使用BookKeeper來存儲支付交易日志,確保每一筆交易的完整記錄和快速查詢。
- 優(yōu)勢: 數(shù)據(jù)加密和訪問控制,保障金融數(shù)據(jù)的安全性。
Yahoo!
- 用途: Yahoo!在其多個分布式系統(tǒng)中使用BookKeeper,包括搜索引擎日志記錄和流處理系統(tǒng)。
- 優(yōu)勢: 強一致性保證和高可用性,支持復雜的數(shù)據(jù)處理需求。
- 用途: Twitter在其基礎設施中使用BookKeeper來處理大量實時數(shù)據(jù)流,包括推文事件和用戶活動日志。
- 優(yōu)勢: 支持高并發(fā)寫入和讀取操作,能夠應對快速增長的業(yè)務需求。
eBay
- 用途: eBay在其電商平臺中使用BookKeeper來存儲交易日志和其他關鍵數(shù)據(jù)。
- 優(yōu)勢: 安全的數(shù)據(jù)加密和嚴格的訪問控制,保護敏感信息。
記得啟動ZooKeeper服務器
因為BookKeeper依賴于ZooKeeper來進行元數(shù)據(jù)管理和協(xié)調!?。?/span>
我這邊的本地環(huán)境已運行了ZooKeeper。
代碼實操
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>bookkeeper-springboot-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>bookkeeper-springboot-example</name>
<description>Demo project for Spring Boot and Apache BookKeeper integration</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache BookKeeper Client -->
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>4.18.0</version>
</dependency>
<!-- Jackson Databind for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Lombok for reducing boilerplate code -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# ZooKeeper 連接字符串
bookkeeper.zk.connectString=localhost:2181
server.port=8080
配置類
package com.example.bookkeeperspringbootexample.config;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.io.IOException;
@Configuration
publicclass BookKeeperConfig {
privatestaticfinal Logger logger = LoggerFactory.getLogger(BookKeeperConfig.class);
@Value("${bookkeeper.zk.connectString}")
private String zkConnectString;
private BookKeeper bookKeeper;
private LedgerHandle ledgerHandle;
/**
* 初始化BookKeeper客戶端
*
* @return BookKeeper實例
* @throws IOException 如果初始化失敗
*/
@Bean
public BookKeeper bookKeeper() throws IOException {
ClientConfiguration conf = new ClientConfiguration();
conf.setZkServers(zkConnectString);
bookKeeper = new BookKeeper(conf);
logger.info("BookKeeper客戶端已初始化。");
return bookKeeper;
}
/**
* 創(chuàng)建一個新的Ledger
*
* @param bookKeeper BookKeeper實例
* @return LedgerHandle實例
* @throws Exception 如果創(chuàng)建Ledger失敗
*/
@Bean
public LedgerHandle ledgerHandle(BookKeeper bookKeeper) throws Exception {
ledgerHandle = bookKeeper.createLedger(
BookKeeper.DigestType.CRC32,
"password".getBytes()
);
logger.info("Ledger已創(chuàng)建,ID: {}", ledgerHandle.getId());
return ledgerHandle;
}
/**
* 關閉BookKeeper客戶端和Ledger
*/
@PreDestroy
public void shutdown() throws InterruptedException, BookKeeper.BKException {
if (ledgerHandle != null) {
ledgerHandle.close();
logger.info("Ledger已關閉。");
}
if (bookKeeper != null) {
bookKeeper.close();
logger.info("BookKeeper客戶端已關閉。");
}
}
}
交易的數(shù)據(jù)模型
package com.example.bookkeeperspringbootexample.model;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 表示交易的數(shù)據(jù)模型
*/
@Data
public class Transaction {
private Long transactionId; // 交易ID
private Double amount; // 交易金額
private LocalDateTime timestamp; // 時間戳
}
服務類
package com.example.bookkeeperspringbootexample.service;
import com.example.bookkeeperspringbootexample.model.Transaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@Service
publicclass BookKeeperService {
privatestaticfinal Logger logger = LoggerFactory.getLogger(BookKeeperService.class);
@Autowired
private LedgerHandle ledgerHandle;
@Autowired
private ObjectMapper objectMapper;
/**
* 異步添加交易到BookKeeper
*
* @param transaction 交易對象
* @return CompletableFuture<Long> 包含新條目的entryId
*/
public CompletableFuture<Long> addTransaction(Transaction transaction) {
try {
byte[] logData = objectMapper.writeValueAsBytes(transaction); // 將交易對象轉換為字節(jié)數(shù)組
return CompletableFuture.supplyAsync(() -> {
try {
long entryId = ledgerHandle.addEntry(logData); // 將字節(jié)數(shù)組添加到Ledger
logger.info("已添加交易,entryId: {}", entryId);
return entryId;
} catch (BKException | InterruptedException e) {
thrownew RuntimeException(e);
}
});
} catch (IOException e) {
thrownew RuntimeException(e);
}
}
/**
* 異步從BookKeeper讀取交易
*
* @param entryId 條目ID
* @return CompletableFuture<Transaction> 包含讀取的交易對象
*/
public CompletableFuture<Transaction> readTransaction(long entryId) {
return CompletableFuture.supplyAsync(() -> {
try {
LedgerSequence seq = ledgerHandle.readEntries(entryId, entryId); // 讀取指定entryId的條目
if (seq.hasMoreElements()) {
LedgerEntry entry = seq.nextElement(); // 獲取條目
byte[] data = entry.getEntryBytes(); // 獲取條目的字節(jié)數(shù)組
logger.info("已讀取交易,entryId: {}", entryId);
return objectMapper.readValue(data, Transaction.class); // 將字節(jié)數(shù)組轉換為交易對象
}
thrownew IllegalArgumentException("未找到ID為 " + entryId + " 的交易");
} catch (BKException | InterruptedException | ExecutionException | IOException e) {
thrownew RuntimeException(e);
}
});
}
}
Controller
package com.example.bookkeeperspringbootexample.controller;
import com.example.bookkeeperspringbootexample.model.Transaction;
import com.example.bookkeeperspringbootexample.service.BookKeeperService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/transactions")
publicclass TransactionController {
@Autowired
private BookKeeperService bookKeeperService;
/**
* 添加新的交易
*
* @param transaction 交易對象
* @return ResponseEntity<Long> 包含新條目的entryId
*/
@PostMapping("/")
public ResponseEntity<Long> addTransaction(@RequestBody Transaction transaction) {
CompletableFuture<Long> futureEntryId = bookKeeperService.addTransaction(transaction); // 異步添加交易
try {
Long entryId = futureEntryId.get(); // 獲取結果
return ResponseEntity.ok(entryId); // 返回成功的HTTP響應
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt(); // 中斷線程
return ResponseEntity.internalServerError().build(); // 返回內部服務器錯誤
}
}
/**
* 根據(jù)entryId讀取交易
*
* @param entryId 條目ID
* @return ResponseEntity<Transaction> 包含讀取的交易對象
*/
@GetMapping("/{entryId}")
public ResponseEntity<Transaction> getTransaction(@PathVariable long entryId) {
CompletableFuture<Transaction> futureTransaction = bookKeeperService.readTransaction(entryId); // 異步讀取交易
try {
Transaction transaction = futureTransaction.get(); // 獲取結果
return ResponseEntity.ok(transaction); // 返回成功的HTTP響應
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt(); // 中斷線程
return ResponseEntity.notFound().build(); // 返回未找到資源
}
}
}
Application
package com.example.bookkeeperspringbootexample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BookKeeperSpringBootExampleApplication {
public static void main(String[] args) {
SpringApplication.run(BookKeeperSpringBootExampleApplication.class, args);
}
}
測試
添加交易
curl -X POST http://localhost:8080/transactions/ \
-H "Content-Type: application/json" \
-d '{"transactionId": 1, "amount": 100.50, "timestamp": "2025-03-19T21:36:06"}'
Respons:
1
讀取交易
curl -X GET http://localhost:8080/transactions/1
Respons:
{"transactionId":1,"amount":100.5,"timestamp":"2025-03-19T21:36:06"}
責任編輯:武曉燕
來源:
Java知識日歷