工作中這樣用MQ,很香!
前言
消息隊(duì)列(MQ)是分布式系統(tǒng)中不可或缺的技術(shù)之一。
對很多小伙伴來說,剛接觸MQ時(shí),可能覺得它只是個(gè)“傳話工具”,但用著用著,你會(huì)發(fā)現(xiàn)它簡直是系統(tǒng)的“潤滑劑”。
無論是解耦、削峰,還是異步任務(wù)處理,都離不開MQ的身影。
下面我結(jié)合實(shí)際場景,從簡單到復(fù)雜,逐一拆解MQ的10種經(jīng)典使用方式,希望對你會(huì)有所幫助。
圖片
1. 異步處理:讓系統(tǒng)輕松一點(diǎn)
場景
小伙伴們是不是經(jīng)常遇到這樣的情況:用戶提交一個(gè)操作,比如下單,然后要發(fā)送短信通知。
如果直接在主流程里調(diào)用短信接口,一旦短信服務(wù)響應(yīng)慢,就會(huì)拖累整個(gè)操作。
用戶等得不耐煩,心態(tài)直接崩了。
解決方案
用MQ,把非關(guān)鍵流程抽出來異步處理。下單時(shí),直接把“發(fā)短信”這件事丟給MQ,訂單服務(wù)就能立刻響應(yīng)用戶,而短信的事情讓MQ和消費(fèi)者去搞定。
示例代碼
// 訂單服務(wù):生產(chǎn)者
Order order = createOrder(); // 訂單生成邏輯
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("訂單已生成,發(fā)短信任務(wù)交給MQ");
// 短信服務(wù):消費(fèi)者
@RabbitListener(queues = "sms_queue")
public void sendSms(Order order) {
System.out.println("發(fā)送短信,訂單ID:" + order.getId());
// 調(diào)用短信服務(wù)接口
}
深度解析
這種方式的好處是:主流程解耦,不受慢服務(wù)的拖累。訂單服務(wù)只管自己的事,短信服務(wù)掛了也沒關(guān)系,MQ會(huì)把消息暫存,等短信服務(wù)恢復(fù)后繼續(xù)處理。
2. 流量削峰:穩(wěn)住系統(tǒng)別崩
場景
每年的“雙十一”電商大促,用戶秒殺商品時(shí)一窩蜂沖進(jìn)來。
突然涌入的高并發(fā)請求,不僅會(huì)壓垮應(yīng)用服務(wù),還會(huì)直接讓數(shù)據(jù)庫“趴窩”。
解決方案
秒殺請求先寫入MQ,后端服務(wù)以穩(wěn)定的速度從MQ中消費(fèi)消息,處理訂單。
這樣既能避免系統(tǒng)被瞬時(shí)流量壓垮,還能提升處理的平穩(wěn)性。
示例代碼
// 用戶提交秒殺請求:生產(chǎn)者
rabbitTemplate.convertAndSend("seckill_exchange", "seckill_key", userRequest);
System.out.println("用戶秒殺請求已進(jìn)入隊(duì)列");
// 秒殺服務(wù):消費(fèi)者
@RabbitListener(queues = "seckill_queue")
public void processSeckill(UserRequest request) {
System.out.println("處理秒殺請求,用戶ID:" + request.getUserId());
// 執(zhí)行秒殺邏輯
}
深度解析
MQ在這里相當(dāng)于一個(gè)緩沖池,把瞬時(shí)流量均勻分布到一段時(shí)間內(nèi)處理。系統(tǒng)穩(wěn)定性提升,用戶體驗(yàn)更好。
3. 服務(wù)解耦:減少相互牽制
場景
比如一個(gè)訂單系統(tǒng)需要通知庫存系統(tǒng)扣減庫存,還要通知支付系統(tǒng)完成扣款。
如果直接用同步接口調(diào)用,服務(wù)間的依賴性很強(qiáng),一個(gè)服務(wù)掛了,整個(gè)鏈條都會(huì)被拖垮。
解決方案
訂單服務(wù)只負(fù)責(zé)把消息丟到MQ里,庫存服務(wù)和支付服務(wù)各自從MQ中消費(fèi)消息。
這樣訂單服務(wù)不需要直接依賴它們。
示例代碼
// 訂單服務(wù):生產(chǎn)者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("訂單生成消息已發(fā)送");
// 庫存服務(wù):消費(fèi)者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
System.out.println("扣減庫存,訂單ID:" + order.getId());
}
// 支付服務(wù):消費(fèi)者
@RabbitListener(queues = "payment_queue")
public void processPayment(Order order) {
System.out.println("處理支付,訂單ID:" + order.getId());
}
深度解析
通過MQ,各個(gè)服務(wù)之間可以實(shí)現(xiàn)松耦合。
即使庫存服務(wù)掛了,也不會(huì)影響訂單生成的流程,大幅提升系統(tǒng)的容錯(cuò)能力。
4. 分布式事務(wù):保證數(shù)據(jù)一致性
場景
訂單服務(wù)需要同時(shí)生成訂單和扣減庫存,這涉及兩個(gè)不同的數(shù)據(jù)庫操作。
如果一個(gè)成功一個(gè)失敗,就會(huì)導(dǎo)致數(shù)據(jù)不一致。
解決方案
通過MQ實(shí)現(xiàn)分布式事務(wù)。
訂單服務(wù)生成訂單后,將扣減庫存的任務(wù)交給MQ,最終實(shí)現(xiàn)數(shù)據(jù)的一致性。
示例代碼
// 訂單服務(wù):生產(chǎn)者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("訂單創(chuàng)建消息已發(fā)送");
// 庫存服務(wù):消費(fèi)者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
System.out.println("更新庫存,訂單ID:" + order.getId());
// 執(zhí)行扣減庫存邏輯
}
深度解析
通過“最終一致性”解決了分布式事務(wù)的難題,雖然短時(shí)間內(nèi)可能有數(shù)據(jù)不一致,但最終狀態(tài)一定是正確的。
5. 廣播通知:一條消息,通知多個(gè)服務(wù)
場景
比如商品價(jià)格調(diào)整,庫存、搜索、推薦服務(wù)都需要同步更新。
如果每個(gè)服務(wù)都要單獨(dú)通知,工作量會(huì)很大。
解決方案
MQ的廣播模式(Fanout)可以讓多個(gè)消費(fèi)者訂閱同一條消息,實(shí)現(xiàn)消息的“一發(fā)多收”。
示例代碼
// 生產(chǎn)者:廣播消息
rabbitTemplate.convertAndSend("price_update_exchange", "", priceUpdate);
System.out.println("商品價(jià)格更新消息已廣播");
// 消費(fèi)者1:庫存服務(wù)
@RabbitListener(queues = "stock_queue")
public void updateStockPrice(PriceUpdate priceUpdate) {
System.out.println("庫存價(jià)格更新:" + priceUpdate.getProductId());
}
// 消費(fèi)者2:搜索服務(wù)
@RabbitListener(queues = "search_queue")
public void updateSearchPrice(PriceUpdate priceUpdate) {
System.out.println("搜索價(jià)格更新:" + priceUpdate.getProductId());
}
深度解析
這種模式讓多個(gè)服務(wù)都能接收到同一條消息,擴(kuò)展性非常強(qiáng)。
6. 日志收集:分布式日志集中化
場景
多個(gè)服務(wù)產(chǎn)生的日志需要統(tǒng)一存儲(chǔ)和分析。
如果直接寫數(shù)據(jù)庫,可能導(dǎo)致性能瓶頸。
解決方案
各服務(wù)將日志寫入MQ,日志分析系統(tǒng)從MQ中消費(fèi)消息并統(tǒng)一處理。
示例代碼
// 服務(wù)端:生產(chǎn)者
rabbitTemplate.convertAndSend("log_exchange", "log_key", logEntry);
System.out.println("日志已發(fā)送");
// 日志分析服務(wù):消費(fèi)者
@RabbitListener(queues = "log_queue")
public void processLog(LogEntry log) {
System.out.println("日志處理:" + log.getMessage());
// 存儲(chǔ)或分析邏輯
}
7. 延遲任務(wù):定時(shí)觸發(fā)操作
場景
用戶下單后,如果30分鐘內(nèi)未支付,需要自動(dòng)取消訂單。
解決方案
使用MQ的延遲隊(duì)列功能,設(shè)置消息延遲消費(fèi)的時(shí)間。
示例代碼
// 生產(chǎn)者:發(fā)送延遲消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", order, message -> {
message.getMessageProperties().setDelay(30 * 60 * 1000); // 延遲30分鐘
return message;
});
System.out.println("訂單取消任務(wù)已設(shè)置");
// 消費(fèi)者:處理延遲消息
@RabbitListener(queues = "delay_queue")
public void cancelOrder(Order order) {
System.out.println("取消訂單:" + order.getId());
// 取消訂單邏輯
}
8. 數(shù)據(jù)同步:跨系統(tǒng)保持?jǐn)?shù)據(jù)一致
場景
在一個(gè)分布式系統(tǒng)中,多個(gè)服務(wù)依賴同一份數(shù)據(jù)源。
例如,電商平臺(tái)的訂單狀態(tài)更新后,需要同步到緩存系統(tǒng)和推薦系統(tǒng)。
如果讓每個(gè)服務(wù)直接從數(shù)據(jù)庫拉取數(shù)據(jù),會(huì)增加數(shù)據(jù)庫壓力,還可能出現(xiàn)延遲或不一致的問題。
解決方案
利用MQ進(jìn)行數(shù)據(jù)同步。訂單服務(wù)更新訂單狀態(tài)后,將更新信息發(fā)送到MQ,緩存服務(wù)和推薦服務(wù)從MQ中消費(fèi)消息并同步數(shù)據(jù)。
示例代碼
訂單服務(wù):生產(chǎn)者
// 更新訂單狀態(tài)后,將消息發(fā)送到MQ
Order order = updateOrderStatus(orderId, "PAID"); // 更新訂單狀態(tài)為已支付
rabbitTemplate.convertAndSend("order_exchange", "order_status_key", order);
System.out.println("訂單狀態(tài)更新消息已發(fā)送:" + order.getId());
緩存服務(wù):消費(fèi)者
@RabbitListener(queues = "cache_update_queue")
public void updateCache(Order order) {
System.out.println("更新緩存,訂單ID:" + order.getId() + " 狀態(tài):" + order.getStatus());
// 更新緩存邏輯
cacheService.update(order.getId(), order.getStatus());
}
推薦服務(wù):消費(fèi)者
@RabbitListener(queues = "recommendation_queue")
public void updateRecommendation(Order order) {
System.out.println("更新推薦系統(tǒng),訂單ID:" + order.getId() + " 狀態(tài):" + order.getStatus());
// 更新推薦服務(wù)邏輯
recommendationService.updateOrderStatus(order);
}
深度解析
通過MQ實(shí)現(xiàn)數(shù)據(jù)同步的好處是:
- 減輕數(shù)據(jù)庫壓力:避免多個(gè)服務(wù)同時(shí)查詢數(shù)據(jù)庫。
- 最終一致性:即使某個(gè)服務(wù)處理延遲,MQ也能保障消息不丟失,最終所有服務(wù)的數(shù)據(jù)狀態(tài)一致。
9. 分布式任務(wù)調(diào)度
場景
有些任務(wù)需要定時(shí)執(zhí)行,比如每天凌晨清理過期訂單。
這些訂單可能分布在多個(gè)服務(wù)中,如果每個(gè)服務(wù)獨(dú)立運(yùn)行定時(shí)任務(wù),可能會(huì)出現(xiàn)重復(fù)處理或任務(wù)遺漏的問題。
解決方案
使用MQ統(tǒng)一分發(fā)調(diào)度任務(wù),每個(gè)服務(wù)根據(jù)自身的業(yè)務(wù)需求,從MQ中消費(fèi)任務(wù)并執(zhí)行。
示例代碼
任務(wù)調(diào)度服務(wù):生產(chǎn)者
// 定時(shí)任務(wù)生成器
@Scheduled(cron = "0 0 0 * * ?") // 每天凌晨觸發(fā)
public void generateTasks() {
List<Task> expiredTasks = taskService.getExpiredTasks();
for (Task task : expiredTasks) {
rabbitTemplate.convertAndSend("task_exchange", "task_routing_key", task);
System.out.println("任務(wù)已發(fā)送:" + task.getId());
}
}
訂單服務(wù):消費(fèi)者
@RabbitListener(queues = "order_task_queue")
public void processOrderTask(Task task) {
System.out.println("處理訂單任務(wù):" + task.getId());
// 執(zhí)行訂單清理邏輯
orderService.cleanExpiredOrder(task);
}
庫存服務(wù):消費(fèi)者
@RabbitListener(queues = "stock_task_queue")
public void processStockTask(Task task) {
System.out.println("處理庫存任務(wù):" + task.getId());
// 執(zhí)行庫存釋放邏輯
stockService.releaseStock(task);
}
深度解析
分布式任務(wù)調(diào)度可以解決:
- 重復(fù)執(zhí)行:每個(gè)服務(wù)只處理自己隊(duì)列中的任務(wù)。
- 任務(wù)遺漏:MQ確保任務(wù)可靠傳遞,防止任務(wù)丟失。
10. 文件處理:異步執(zhí)行大文件任務(wù)
場景
用戶上傳一個(gè)大文件后,需要對文件進(jìn)行處理(如格式轉(zhuǎn)換、壓縮等)并存儲(chǔ)。
如果同步執(zhí)行這些任務(wù),前端頁面可能會(huì)一直加載,導(dǎo)致用戶體驗(yàn)差。
解決方案
用戶上傳文件后,立即將任務(wù)寫入MQ,后臺(tái)異步處理文件,處理完成后通知用戶或更新狀態(tài)。
示例代碼
上傳服務(wù):生產(chǎn)者
// 上傳文件后,將任務(wù)寫入MQ
FileTask fileTask = new FileTask();
fileTask.setFileId(fileId);
fileTask.setOperation("COMPRESS");
rabbitTemplate.convertAndSend("file_task_exchange", "file_task_key", fileTask);
System.out.println("文件處理任務(wù)已發(fā)送,文件ID:" + fileId);
文件處理服務(wù):消費(fèi)者
@RabbitListener(queues = "file_task_queue")
public void processFileTask(FileTask fileTask) {
System.out.println("處理文件任務(wù):" + fileTask.getFileId() + " 操作:" + fileTask.getOperation());
// 模擬文件處理邏輯
if ("COMPRESS".equals(fileTask.getOperation())) {
fileService.compressFile(fileTask.getFileId());
} else if ("CONVERT".equals(fileTask.getOperation())) {
fileService.convertFileFormat(fileTask.getFileId());
}
// 更新任務(wù)狀態(tài)
taskService.updateTaskStatus(fileTask.getFileId(), "COMPLETED");
}
前端輪詢或回調(diào)通知
// 前端輪詢文件處理狀態(tài)
setInterval(() => {
fetch(`/file/status?fileId=${fileId}`)
.then(response => response.json())
.then(status => {
if (status === "COMPLETED") {
alert("文件處理完成!");
}
});
}, 5000);
深度解析
異步文件處理的優(yōu)勢:
- 提升用戶體驗(yàn):主線程迅速返回,減少用戶等待時(shí)間。
- 后臺(tái)任務(wù)靈活擴(kuò)展:支持多種操作邏輯,適應(yīng)復(fù)雜文件處理需求。
總結(jié)
消息隊(duì)列不只是傳遞消息的工具,更是系統(tǒng)解耦、提升穩(wěn)定性和擴(kuò)展性的利器。
在這10種經(jīng)典場景中,每一種都能解決特定的業(yè)務(wù)痛點(diǎn)。