三分鐘學會在 RabbitMQ 中實現(xiàn)發(fā)布訂閱模式
在這個充滿挑戰(zhàn)和收獲的60天學習之旅中,你將迅速提升成為一名全棧工程師。專注于Spring Boot框架,我們將深入研究高級特性,從項目初始化到微服務架構(gòu),再到性能優(yōu)化和持續(xù)集成部署。無論你是初學者還是有一定經(jīng)驗的開發(fā)者,這個專題都將帶你穿越從零到全面掌握Spring Boot的學習曲線。
Day 32 ~ Springboot3.1.x|3分鐘學會在 RabbitMQ 中實現(xiàn)發(fā)布訂閱模式
實現(xiàn)發(fā)布與訂閱消息模式
發(fā)布-訂閱模式是一種消息傳遞方式,其中發(fā)送者(發(fā)布者)不會將消息直接發(fā)送到特定的接收者(訂閱者)。發(fā)布者類別定義了哪些訂閱者因為訂閱者匹配了發(fā)布者的類別而接收消息。
以下是使用RabbitMQ實現(xiàn)發(fā)布-訂閱模式的一種例子,我們將使用RabbitMQ的Fanout Exchange。
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Log message...";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("Sent '" + message + "'");
}
}
}
在上述代碼的channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),我們聲明一個名為"log"的exchange,同時我們定義其類型為"fanout",意味著它會將接收到的所有消息廣播給所有它所知道的隊列。
Consumer
每一個訂閱者都需要擁有一個queue,因此,我們需要在客戶端中創(chuàng)建queue。
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
在這個例子中,我們聲明一個新的queue,并將其與"logs"的exchange綁定。然后我們定義了消息的接收以及處理方式。
處理消息發(fā)送失敗的情況
在使用消息中間件的過程中,消息發(fā)送失敗是無法避免的情況。因此,我們需要對此進行正確的處理以避免因此而導致的系統(tǒng)問題。
對于消息發(fā)送失敗的處理,有以下幾種常用的方案:
- 重試: 對于有些暫時的問題,比如網(wǎng)絡(luò)波動,可以通過簡單的重試來解決。
- 消息持久化:將消息存儲在某處(例如數(shù)據(jù)庫),只有當消息成功發(fā)送后,再刪除它。
- 死信隊列:把無法處理的消息放入"死信隊列",然后由專門的消費者來進行處理。
RabbitMQ中的消息確認(publisher confirms)和消費者應答(Consumer Acknowledgements)就是為了解決此類問題。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {
Channel channel = connection.createChannel();
String queueName = "test";
String message = "Hello world";
try {
channel.queueDeclare(queueName, false, false, false, null);
channel.confirmSelect();
channel.basicPublish("", queueName, null, message.getBytes());
if (!channel.waitForConfirms()) {
System.out.println("消息發(fā)送失敗");
}
} catch (Exception e) {
System.out.println("錯誤: " + e.getMessage());
}
}
上述代碼中執(zhí)行channel.confirmSelect();后,當前channel被設(shè)置為publisher confirm模式。在此模式下,當消息被RabbitMQ成功接收后,會發(fā)送一個確認給生產(chǎn)者。如果RabbitMQ沒有發(fā)送確認,那么生產(chǎn)者可以認定該消息發(fā)送失敗。
結(jié)論:掌握發(fā)布-訂閱模式和消息發(fā)送失敗處理策略,對于掌握消息隊列的使用至關(guān)重要,可為系統(tǒng)的穩(wěn)定性和擴展性提供保障。