發(fā)件箱模式雖然聽上去可能很簡單,但是在平時開發(fā)中可能會忽略掉。如果還不能理解,我們可以將它類比到生活的場景,寄信人只需要寫好信件,放入收件箱,之后就不用管了。送信的人會來收件箱取走信件,根據(jù)信件里需要送到的地址,將信件送至目的地。這樣做的好處就是,寄信人寫好信之后,就不需要等待收信人有空的時候才能寄信,只需要往發(fā)件箱里丟就好了。
?前言
微服務架構如今非常的流行,這個架構下可能經(jīng)常會遇到“雙寫”的場景。雙寫是指您的應用程序需要在兩個不同的系統(tǒng)中更改數(shù)據(jù)的情況,比如它需要將數(shù)據(jù)存儲在數(shù)據(jù)庫中并向消息隊列發(fā)送事件。您需要保證這兩個操作都會成功。如果兩個操作之一失敗,您的系統(tǒng)可能會變得不一致。那針對這樣的情況有什么好的方法或者設計保證呢?本文就和大家分享一個“發(fā)件箱模式”, 可以很好的避免此類問題。
下訂單的例子
假設我們有一個 OrderService 類,它在創(chuàng)建新訂單時被調用,此時它應該將訂單實體保存在數(shù)據(jù)庫中并向交付微服務發(fā)送一個事件,以便交付部門可以開始計劃交付。
你的代碼可能是下面這樣子的:
@Service
public record OrderService(
IDeliveryMessageQueueService deliveryMessageQueueService,
IOrderRepository orderRepository,
TransactionTemplate transactionTemplate) implements IOrderService {
@Override
public void create(int id, String description) {
String message = buildMessage(id, description);
transactionTemplate.executeWithoutResult(transactionStatus -> {
// 保存訂單
orderRepository.save(id, description);
});
// 發(fā)送消息
deliveryMessageQueueService.send(message);
}
private String buildMessage(int id, String description) {
// ...
}
}
可以看到我們在事務中將訂單保存在數(shù)據(jù)庫中,然后我們使用消息隊列將事件發(fā)送到交付服務。這是雙寫的一個場景。
這么寫,會遇到什么問題呢?
首先,如果我們保存了訂單但是發(fā)送消息失敗了怎么辦?送貨服務永遠不會收到消息。
那你可能想到把保存訂單和發(fā)消息放到同一個事務中不就可以了嗎,就是是將 deliveryMessageQueueService#send? 移動到與 orderRepository#save相同的事務中,如下圖:
transactionTemplate.executeWithoutResult(transactionStatus -> {
// 保存訂單
orderRepository.save(id, description);
// 發(fā)送消息
deliveryMessageQueueService.send(message);
});
實際上,在數(shù)據(jù)庫事務內(nèi)部建立 TCP 連接是一種糟糕的做法,我們不應該這樣做。
有沒有更好的方法呢?
我們可以訂單表所在的同一數(shù)據(jù)庫中有一個表“發(fā)件箱”(在最簡單的情況下,它可以有一個列“消息”和當前時間戳)。保存訂單時,在同一個事務中,我們在“發(fā)件箱”表中保存了一條消息。消息一發(fā)送,我們就可以將其從發(fā)件箱表中刪除,代碼如下:
@Service
public record OrderService(
IDeliveryMessageQueueService deliveryMessageQueueService,
IOrderRepository orderRepository,
IOutboxRepository outboxRepository,
TransactionTemplate transactionTemplate) implements IOrderService {
@Override
public void create(int id, String description) {
UUID outboxId = UUID.randomUUID();
String message = buildMessage(id, description);
transactionTemplate.executeWithoutResult(transactionStatus -> {
// 保存訂單
orderRepository.save(id, description);
// 保存到發(fā)件箱
outboxRepository.save(new OutboxEntity(outboxId, message));
});
deliveryMessageQueueService.send(message);
// 刪除
outboxRepository.delete(outboxId);
}
private String buildMessage(int id, String description) {
// ...
}
}
可以看到,我們在一次事務中將訂單和發(fā)件箱實體保存在我們的數(shù)據(jù)庫中。然后我們發(fā)送一條消息,如果成功,我們刪除這條消息。
如果 deliveryMessageQueueService#send? 失敗會怎樣?(例如,您的應用程序被終止或消息隊列或數(shù)據(jù)庫不可用)。在這種情況下,outboxRepository#delete 將不會運行,我們必須重試發(fā)送消息。
它可以使用將在后臺運行的計劃任務來完成,該任務將嘗試發(fā)送在表發(fā)件箱中顯示超過 X 秒(例如 10 秒)的消息,如下面的代碼。
@Service
public record OutboxRetryTask(IOutboxRepository outboxRepository,
IDeliveryMessageQueueService deliveryMessageQueueService) {
@Scheduled(fixedDelayString = "10000")
public void retry() {
List<OutboxEntity> outboxEntities = outboxRepository.findAllBefore(Instant.now().minusSeconds(60));
for (OutboxEntity outbox : outboxEntities) {
deliveryMessageQueueService.send(outbox.message());
outboxRepository.delete(outbox.id());
}
}
}
在這里你可以看到,我們每 10 秒運行一個任務,并發(fā)送之前沒有發(fā)送過的消息。如果消息成功發(fā)送到消息隊列,但發(fā)件箱實體沒有從數(shù)據(jù)庫中刪除(例如因為數(shù)據(jù)庫問題),那么下次該后臺任務將嘗試再次將此消息發(fā)送到消息隊列。但這也意味著我們消息的消費者必須做好冪等處理,因為可能會多次接收相同的消息。
發(fā)件箱模式
通過上面的例子,我們可以抽象出“發(fā)件箱模式”。

- 在數(shù)據(jù)庫里面額外增加一個outbox表用于存儲需要發(fā)送的event
- 把直接發(fā)送event的步驟換成先把event存儲到數(shù)據(jù)庫outbox表
- 程序啟動一個 job 不斷去抓取 outbox 表里面的記錄,通過推送線程完成不同業(yè)務的推送
- 最后刪除發(fā)送成功的記錄
- 提醒消息消費端要做好冪等處理
總結
發(fā)件箱模式雖然聽上去可能很簡單,但是在平時開發(fā)中可能會忽略掉。如果還不能理解,我們可以將它類比到生活的場景,寄信人只需要寫好信件,放入收件箱,之后就不用管了。送信的人會來收件箱取走信件,根據(jù)信件里需要送到的地址,將信件送至目的地。這樣做的好處就是,寄信人寫好信之后,就不需要等待收信人有空的時候才能寄信,只需要往發(fā)件箱里丟就好了。