SpringBoot整合RabbitMQ延遲隊列&優(yōu)先級隊列詳解
延遲隊列
延遲隊列:簡單說就是發(fā)送出去的消息經過給定的時間后,消費者才能看見消息(消費消息)。
這里簡單說下步驟:
- 創(chuàng)建一個隊列,如:bs-queue, 設置死信交換機(死信交換機路由key(這是可選的))及隊列,如:dead-exchange; 消息的消費端監(jiān)聽該dead-queue隊列。設置消息有效期參數x-message-ttl參數(值為自己需要延遲的時間,單位:毫秒)。
- 發(fā)送消息發(fā)送到bs-queue上。由于消息消費端監(jiān)聽的是死信隊列,所以只需要等待指定的時間后消息會自動被轉發(fā)到死信隊列上(dead-queue)。
- 消息的消費端監(jiān)聽dead-queu隊列即可。
優(yōu)先級隊列
優(yōu)先級隊列是在RabbitMQ3.5.0之后的版本才支持的。
具有高優(yōu)先級的隊列具有高的優(yōu)先權,優(yōu)先級高的消息具備優(yōu)先被消費的特權。
隊列的優(yōu)先級通過x-max-priority參數設置。
建立一個priority-exchange交換機,類型:direct。
圖片
建立一個priority-queue隊列,并與priority-exchange綁定。
圖片
設置x-max-priority參數的值為100,表示最大優(yōu)先級為100。
注意:x-max-priority參數的值應該介于1到255。建議使用1到10之間的隊列。如果設置的優(yōu)先級更大將使用更多的Erlang進程消耗更多的CPU資源。運行時調度也會受到影響。
接下來演示優(yōu)先級隊列
我們先只發(fā)送消息,然后再把消息的消費功能打開。
發(fā)送消息接口:
@GetMapping("/sendPriority")
public Object sendPriority(String msg, Integer priority) {
ms.sendPriorityQueue(msg, priority) ;
return "success" ;
}
public void sendPriorityQueue(String msg, Integer priority) {
logger.info("準備發(fā)送消息:{}", msg);
Message message = MessageBuilder.withBody(msg.getBytes()).setPriority(priority).build() ;
rabbitTemplate.convertAndSend("priority-exchange", "pe.msg", message) ;
}
發(fā)送4條消息:
// 第一條消息
msg=第一條消息&priority=2
// 第二條消息
msg=第二條消息&priority=10
// 第三條消息
msg=第三條消息&priority=1
// 第四條消息
msg=第四條消息&priority=7
查看消息隊列:
圖片
消息消費端:
@RabbitListener(queues = { "priority-queue" })
@RabbitHandler
public void listenerPriority(Message message, Channel channel) {
System.out.println("接受到消息.....income");
byte[] body = message.getBody();
MessageProperties mps = message.getMessageProperties();
String content = new String(body, Charset.forName("UTF-8"));
try {
System.out.println("接受到消息來自交換機: 【" + mps.getReceivedExchange() + "】, 隊列:【" + mps.getConsumerQueue()+ "】:\n內容: " + content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
e.printStackTrace();
try {
channel.basicReject(mps.getDeliveryTag(), false);
} catch (IOException e1) {
e1.printStackTrace() ;
}
}
}
啟動服務
圖片
根據打印出的結果,正好是我們設置優(yōu)先級的順序輸出。
上面設置的消息優(yōu)先級都是在指定的范圍<100,如果消息的優(yōu)先級超過這個值會怎么樣呢?
發(fā)送8條消息:
// 第一條消息
msg=第一條消息&priority=2
// 第二條消息
msg=第二條消息&priority=10
// 第三條消息
msg=第三條消息&priority=1
// 第四條消息
msg=第四條消息&priority=7
// 第五條消息
msg=第五條消息&priority=101
消費消息:
圖片
同樣是按照順序輸出的。