.NET 下 RabbitMQ 隊(duì)列、死信隊(duì)列、延時(shí)隊(duì)列及小應(yīng)用
引言
RabbitMQ 是一款廣泛使用的開源消息代理軟件,它基于 AMQP 協(xié)議,提供了可靠、靈活的消息傳遞服務(wù)。在 .NET 應(yīng)用程序中,我們可以利用 RabbitMQ 來實(shí)現(xiàn)異步通信、解耦服務(wù)、平衡負(fù)載等功能。本文將詳細(xì)介紹如何在 .NET 中使用 RabbitMQ 的隊(duì)列、死信隊(duì)列、延時(shí)隊(duì)列,以及一些實(shí)際應(yīng)用場(chǎng)景。
RabbitMQ 隊(duì)列基礎(chǔ)
安裝 RabbitMQ.Client
在 .NET 項(xiàng)目中使用 RabbitMQ,首先需要安裝 RabbitMQ.Client 庫。可以通過 NuGet 包管理器來安裝:
- 使用包管理器控制臺(tái):
Install-Package RabbitMQ.Client
- 使用 .NET CLI:
dotnet add package RabbitMQ.Client
創(chuàng)建生產(chǎn)者和消費(fèi)者
生產(chǎn)者
生產(chǎn)者負(fù)責(zé)發(fā)送消息到 RabbitMQ 服務(wù)器。以下是一個(gè)簡(jiǎn)單的生產(chǎn)者示例:
using RabbitMQ.Client;
using System.Text;
class Producer
{
public static void SendMessage(string message)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
消費(fèi)者
消費(fèi)者負(fù)責(zé)從 RabbitMQ 服務(wù)器接收消息。以下是一個(gè)簡(jiǎn)單的消費(fèi)者示例:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class Consumer
{
public static void ReceiveMessage()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
死信隊(duì)列
死信隊(duì)列(Dead Letter Queue,簡(jiǎn)稱 DLQ)用于存儲(chǔ)和處理那些因?yàn)槟承┰驘o法被正常消費(fèi)的消息。以下是幾種常見的死信隊(duì)列形成場(chǎng)景:
- 消息 TTL(Time To Live)過期
- 隊(duì)列達(dá)到最大長(zhǎng)度
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false
實(shí)現(xiàn)死信隊(duì)列
以下是一個(gè)使用死信隊(duì)列的示例:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class DeadLetterQueueExample
{
public static void Setup()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 聲明死信交換機(jī)和死信隊(duì)列
channel.ExchangeDeclare("dead_letter_exchange", ExchangeType.Direct);
channel.QueueDeclare("dead_letter_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
// 聲明普通隊(duì)列,并設(shè)置死信交換機(jī)和死信路由鍵
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "dead_letter_exchange" },
{ "x-dead-letter-routing-key", "dead_letter_routing_key" }
};
channel.QueueDeclare("normal_queue", durable: false, exclusive: false, autoDelete: false, arguments: args);
// 發(fā)送消息到普通隊(duì)列
var body = Encoding.UTF8.GetBytes("This message will be dead lettered.");
channel.BasicPublish("", "normal_queue", null, body);
}
}
public static void ConsumeDeadLetter()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received dead letter message: {message}");
};
channel.BasicConsume("dead_letter_queue", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
延時(shí)隊(duì)列
RabbitMQ 本身沒有直接支持延時(shí)隊(duì)列的功能,但可以通過 TTL(Time To Live)+ 死信隊(duì)列的組合來實(shí)現(xiàn)。以下是實(shí)現(xiàn)延時(shí)隊(duì)列的步驟:
- 創(chuàng)建一個(gè)普通隊(duì)列,并設(shè)置其死信交換機(jī)和死信路由鍵。
- 將需要延遲處理的消息發(fā)送到這個(gè)隊(duì)列,并設(shè)置消息的過期時(shí)間(TTL)。
- 當(dāng)消息過期后,RabbitMQ 會(huì)將其發(fā)送到死信隊(duì)列,而死信隊(duì)列可以由消費(fèi)者按照正常的方式進(jìn)行處理。
實(shí)現(xiàn)延時(shí)隊(duì)列
以下是一個(gè)使用延時(shí)隊(duì)列的示例:
using RabbitMQ.Client;
using System.Text;
class DelayQueueExample
{
public static void SendMessage(string message, int delayMilliseconds)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 聲明死信交換機(jī)和死信隊(duì)列
channel.ExchangeDeclare("delay_exchange", ExchangeType.Direct);
channel.QueueDeclare("delay_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind("delay_queue", "delay_exchange", "delay_routing_key");
// 聲明延時(shí)隊(duì)列,并設(shè)置死信交換機(jī)和死信路由鍵
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "delay_exchange" },
{ "x-dead-letter-routing-key", "delay_routing_key" }
};
channel.QueueDeclare("normal_queue", durable: false, exclusive: false, autoDelete: false, arguments: args);
// 發(fā)送消息到普通隊(duì)列,并設(shè)置 TTL
var properties = channel.CreateBasicProperties();
properties.Expiration = delayMilliseconds.ToString();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "normal_queue", properties, body);
}
}
}
小應(yīng)用示例
訂單超時(shí)自動(dòng)取消
假設(shè)我們有一個(gè)在線商城,用戶下單后需要在指定時(shí)間內(nèi)完成支付,否則訂單將自動(dòng)取消。我們可以使用延時(shí)隊(duì)列來實(shí)現(xiàn)這一功能:
- 用戶下單時(shí),將訂單信息發(fā)送到延時(shí)隊(duì)列,并設(shè)置 TTL 為指定的超時(shí)時(shí)間。
- 如果用戶在超時(shí)時(shí)間內(nèi)完成支付,可以從延時(shí)隊(duì)列中移除該訂單的消息。
- 如果用戶未在超時(shí)時(shí)間內(nèi)完成支付,訂單消息將被發(fā)送到死信隊(duì)列。
- 一個(gè)專門的消費(fèi)者監(jiān)聽死信隊(duì)列,當(dāng)收到訂單消息時(shí),自動(dòng)取消該訂單,并進(jìn)行相應(yīng)的后續(xù)處理。
日志記錄
在分布式系統(tǒng)中,日志記錄是一個(gè)重要的功能。我們可以使用 RabbitMQ 的隊(duì)列來實(shí)現(xiàn)日志的異步記錄:
- 各個(gè)服務(wù)在生成日志時(shí),將日志信息發(fā)送到一個(gè)日志隊(duì)列。
- 一個(gè)專門的日志服務(wù)監(jiān)聽日志隊(duì)列,當(dāng)收到日志消息時(shí),將其存儲(chǔ)到日志數(shù)據(jù)庫或文件系統(tǒng)中。
任務(wù)調(diào)度
RabbitMQ 可以用于實(shí)現(xiàn)任務(wù)調(diào)度系統(tǒng):
- 將需要執(zhí)行的任務(wù)發(fā)送到任務(wù)隊(duì)列,每個(gè)任務(wù)可以包含任務(wù)的詳細(xì)信息和執(zhí)行時(shí)間。
- 任務(wù)消費(fèi)者從任務(wù)隊(duì)列中獲取任務(wù),并根據(jù)任務(wù)的執(zhí)行時(shí)間將其放入延時(shí)隊(duì)列。
- 當(dāng)任務(wù)的執(zhí)行時(shí)間到達(dá)時(shí),任務(wù)消息從延時(shí)隊(duì)列中釋放,并被任務(wù)消費(fèi)者獲取。
- 任務(wù)消費(fèi)者執(zhí)行任務(wù),并將任務(wù)的執(zhí)行結(jié)果發(fā)送到結(jié)果隊(duì)列。
結(jié)論
RabbitMQ 提供了強(qiáng)大的消息隊(duì)列功能,在 .NET 應(yīng)用程序中,我們可以利用其隊(duì)列、死信隊(duì)列、延時(shí)隊(duì)列等特性,實(shí)現(xiàn)異步通信、任務(wù)調(diào)度、日志記錄。