SpringBoot與Pulsar整合,實(shí)現(xiàn)金融交易場(chǎng)景下的消息有序性保障
Apache Pulsar 設(shè)計(jì)用于大規(guī)模實(shí)時(shí)數(shù)據(jù)處理,支持多種消息傳遞模型(發(fā)布/訂閱、隊(duì)列等),并提供了強(qiáng)大的功能來(lái)確保消息的可靠性和性能。
優(yōu)勢(shì)
1. 強(qiáng)大的消息模型
- 發(fā)布/訂閱 (Pub/Sub): 支持多個(gè)消費(fèi)者同時(shí)從同一個(gè)主題接收消息,適合實(shí)時(shí)數(shù)據(jù)分析和通知系統(tǒng)。
- 獨(dú)占訂閱 (Exclusive Subscription): 確保只有一個(gè)消費(fèi)者能夠消費(fèi)某個(gè)分區(qū)的消息,從而保證消息的嚴(yán)格順序。
- 共享訂閱 (Shared Subscription): 多個(gè)消費(fèi)者可以負(fù)載均衡地消費(fèi)消息,提高吞吐量。
- 故障域感知路由: 根據(jù)地理位置和網(wǎng)絡(luò)拓?fù)鋬?yōu)化消息路由,確保高效的數(shù)據(jù)傳輸。
2. 持久化與存儲(chǔ)
- 持久化消息: 所有消息都被持久化到磁盤,確保消息不會(huì)丟失。
- 分層存儲(chǔ): 使用分層存儲(chǔ)策略,結(jié)合內(nèi)存和磁盤存儲(chǔ),提高讀寫效率。
- 自動(dòng)清理: 定期清理過(guò)期或不再需要的消息,節(jié)省存儲(chǔ)空間。
3. 事務(wù)支持
- 事務(wù)消息: 支持事務(wù)性的消息發(fā)送和確認(rèn)機(jī)制,確保數(shù)據(jù)一致性。
- 兩階段提交: 實(shí)現(xiàn)ACID特性,保證消息的一致性和可靠性。
4. 死信隊(duì)列
- 死信隊(duì)列 (Dead Letter Queue, DLQ): 對(duì)于無(wú)法成功處理的消息,將其放入死信隊(duì)列以便后續(xù)排查和處理。
- 重試機(jī)制: 在消息處理失敗時(shí),進(jìn)行一定次數(shù)的重試(默認(rèn)最多3次),如果仍然失敗,則將消息放入死信隊(duì)列。
應(yīng)用場(chǎng)景
用戶發(fā)起一筆交易請(qǐng)求,系統(tǒng)需要將該請(qǐng)求發(fā)送到交易處理系統(tǒng),并確保請(qǐng)求按順序被處理。而使用Pulsar的獨(dú)占訂閱模式確保交易請(qǐng)求按順序被單一消費(fèi)者處理,避免亂序?qū)е碌馁~務(wù)錯(cuò)誤。
啟動(dòng)Pulsar:
bin/pulsar standalone
代碼實(shí)操
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache Pulsar Client -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.1</version>
</dependency>
<!-- Lombok for cleaner Java code -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- JUnit for testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件
在application.properties
文件中配置Pulsar的相關(guān)屬性:
# Pulsar broker URL
pulsar.service.url=pulsar://localhost:6650
# Topic name
pulsar.topic.name=finance-transaction-topic
# Dead letter topic name
pulsar.dead-letter.topic.name=dead-letter-topic
# Max redelivery count before sending to dead letter queue
pulsar.max.redeliver.count=3
服務(wù)類
創(chuàng)建一個(gè)服務(wù)類來(lái)處理生產(chǎn)和消費(fèi)消息,包括事務(wù)消息和死信隊(duì)列的處理邏輯。
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.CompletableFuture;
@Service
@Slf4j
public class FinanceTransactionService {
@Value("${pulsar.service.url}")
private String serviceUrl;
@Value("${pulsar.topic.name}")
private String topicName;
@Value("${pulsar.dead-letter.topic.name}")
private String deadLetterTopicName;
@Value("${pulsar.max.redeliver.count}")
private int maxRedeliverCount;
private PulsarClient client;
private Producer<String> producer;
private Consumer<String> consumer;
/**
* 初始化Pulsar客戶端、生產(chǎn)者和消費(fèi)者
*/
@PostConstruct
public void init() throws Exception {
// 初始化Pulsar客戶端
client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
// 創(chuàng)建生產(chǎn)者
producer = client.newProducer(Schema.STRING)
.topic(topicName)
.sendTimeout(0, java.util.concurrent.TimeUnit.SECONDS)
.enableBatching(false)
.create();
// 創(chuàng)建消費(fèi)者
consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("finance-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.maxDelayMs(60_000)
.minDelayMs(1_000)
.multiplier(2)
.build())
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliverCount)
.deadLetterTopic(deadLetterTopicName)
.build())
.subscribe();
// 開始消費(fèi)消息
consumeMessages();
}
/**
* 關(guān)閉Pulsar客戶端、生產(chǎn)者和消費(fèi)者
*/
@PreDestroy
public void close() throws Exception {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
if (client != null) {
client.close();
}
}
/**
* 發(fā)送事務(wù)消息
*
* @param message 消息內(nèi)容
* @return 消息ID的CompletableFuture對(duì)象
*/
public CompletableFuture<MessageId> sendTransactionalMessage(String message) {
return producer.sendAsync(message);
}
/**
* 消費(fèi)消息并處理
*/
private void consumeMessages() {
new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Message<String> msg = consumer.receive();
log.info("Received message: {}", msg.getValue());
// 處理消息
boolean processSuccess = processMessage(msg.getValue());
if (processSuccess) {
// 確認(rèn)消息
consumer.acknowledgeAsync(msg.getMessageId());
} else {
// 負(fù)確認(rèn)消息,觸發(fā)重試機(jī)制
consumer.negativeAcknowledge(msg.getMessageId(), new CustomException("Processing failed"));
}
} catch (Exception e) {
log.error("Error processing message", e);
}
}
}).start();
}
/**
* 模擬消息處理邏輯
*
* @param message 消息內(nèi)容
* @return 處理是否成功
*/
private boolean processMessage(String message) {
// 模擬消息處理邏輯
// 對(duì)于每三條消息中的一條模擬處理失敗
long messageId = Long.parseLong(message.split(":")[1]);
return messageId % 3 != 0;
}
static class CustomException extends Exception {
public CustomException(String message) {
super(message);
}
}
// Getter methods for configuration properties (for testing purposes)
public String getServiceUrl() {
return serviceUrl;
}
public String getTopicName() {
return topicName;
}
public String getDeadLetterTopicName() {
return deadLetterTopicName;
}
public int getMaxRedeliverCount() {
return maxRedeliverCount;
}
}
控制器類
創(chuàng)建一個(gè)控制器類來(lái)暴露API端點(diǎn)用于發(fā)送消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
@RestController
public class FinanceTransactionController {
@Autowired
private FinanceTransactionService financeTransactionService;
/**
* 發(fā)送消息到Pulsar主題
*
* @param message 消息內(nèi)容
* @return 發(fā)送結(jié)果
*/
@PostMapping("/send-message")
public String sendMessage(@RequestParam String message) {
try {
financeTransactionService.sendTransactionalMessage(message).get();
return"Message sent successfully";
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to send message", e);
return"Failed to send message";
}
}
}
單元測(cè)試
為了驗(yàn)證上述功能是否正常工作,我們寫了一些測(cè)試用例。
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.ResponseEntity;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class FinanceTransactionControllerTest {
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private FinanceTransactionService financeTransactionService;
/**
* 清空主題中的消息,確保每次測(cè)試前環(huán)境干凈
*/
@BeforeEach
public void setUp() throws Exception {
clearTopic(financeTransactionService.getTopicName());
clearTopic(financeTransactionService.getDeadLetterTopicName());
}
/**
* 關(guān)閉資源
*/
@AfterEach
public void tearDown() throws Exception {
financeTransactionService.close();
}
/**
* 測(cè)試成功發(fā)送的消息是否正確地出現(xiàn)在主主題中,并且沒(méi)有出現(xiàn)在死信隊(duì)列中
*/
@Test
public void testSendMessage_Success() {
ResponseEntity<String> response = restTemplate.postForEntity("/send-message?message=transaction:1", null, String.class);
assertEquals("Message sent successfully", response.getBody());
response = restTemplate.postForEntity("/send-message?message=transaction:2", null, String.class);
assertEquals("Message sent successfully", response.getBody());
response = restTemplate.postForEntity("/send-message?message=transaction:4", null, String.class);
assertEquals("Message sent successfully", response.getBody());
// 驗(yàn)證消息在主主題中
assertMessageInTopic("transaction:1");
assertMessageInTopic("transaction:2");
assertMessageInTopic("transaction:4");
// 驗(yàn)證死信隊(duì)列中沒(méi)有消息
assertNoMessagesInTopic(financeTransactionService.getDeadLetterTopicName());
}
/**
* 測(cè)試失敗發(fā)送的消息是否在達(dá)到最大重試次數(shù)后進(jìn)入死信隊(duì)列
*/
@Test
public void testSendMessage_Failure() {
ResponseEntity<String> response = restTemplate.postForEntity("/send-message?message=transaction:3", null, String.class);
assertEquals("Message sent successfully", response.getBody());
// 驗(yàn)證消息在死信隊(duì)列中(經(jīng)過(guò)多次重試)
assertMessageInTopicWithRetries("transaction:3", financeTransactionService.getMaxRedeliverCount());
}
/**
* 清空指定主題中的所有消息
*
* @param topicName 主題名稱
*/
private void clearTopic(String topicName) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create();
while (reader.hasMessageAvailable()) {
reader.readNext();
}
reader.close();
client.close();
}
/**
* 驗(yàn)證指定主題中是否存在特定消息
*
* @param expectedMessage 預(yù)期消息內(nèi)容
*/
private void assertMessageInTopic(String expectedMessage) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(financeTransactionService.getTopicName())
.startMessageId(MessageId.earliest)
.create()) {
while (reader.hasMessageAvailable()) {
Message<String> msg = reader.readNext();
if (msg.getValue().equals(expectedMessage)) {
return;
}
}
fail("Expected message not found in topic: " + expectedMessage);
} catch (Exception e) {
fail("Failed to read from topic: " + e.getMessage());
}
}
/**
* 驗(yàn)證指定主題中沒(méi)有消息
*
* @param topicName 主題名稱
*/
private void assertNoMessagesInTopic(String topicName) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create()) {
assertFalse(reader.hasMessageAvailable(), "Unexpected messages found in topic: " + topicName);
} catch (Exception e) {
fail("Failed to read from topic: " + e.getMessage());
}
}
/**
* 驗(yàn)證指定主題中是否存在特定消息(帶有重試機(jī)制)
*
* @param expectedMessage 預(yù)期消息內(nèi)容
* @param maxRetries 最大重試次數(shù)
*/
private void assertMessageInTopicWithRetries(String expectedMessage, int maxRetries) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(financeTransactionService.getDeadLetterTopicName())
.startMessageId(MessageId.earliest)
.create()) {
int retryCount = 0;
while (retryCount < maxRetries) {
if (reader.hasMessageAvailable()) {
Message<String> msg = reader.readNext();
if (msg.getValue().equals(expectedMessage)) {
return;
}
}
retryCount++;
Thread.sleep(1000); // 等待1秒后重試
}
fail("Expected message not found in dead letter topic after retries: " + expectedMessage);
} catch (Exception e) {
fail("Failed to read from dead letter topic: " + e.getMessage());
}
}
}
測(cè)試結(jié)果
發(fā)送消息:
curl -X POST http://localhost:8080/send-message\?message\=transaction\:1
curl -X POST http://localhost:8080/send-message\?message\=transaction\:2
curl -X POST http://localhost:8080/send-message\?message\=transaction\:3
curl -X POST http://localhost:8080/send-message\?message\=transaction\:4
日志:
Received message: transaction:1
Received message: transaction:2
Received message: transaction:3
Received message: transaction:4