Webman使用RabbitMQ消息中間件實現(xiàn)系統(tǒng)異步解耦實戰(zhàn)教程
簡介
RabbitMQ是一個開源的消息代理軟件,它使用高級消息隊列協(xié)議(AMQP)來實現(xiàn)消息的發(fā)送和接收。RabbitMQ支持多種消息協(xié)議,包括STOMP、MQTT等,并且能夠與多種編程語言和平臺集成,如Java、.NET、Python等。
AMQP 即 Advanced Message Queuing Protocol(高級消息隊列協(xié)議),是一個網(wǎng)絡協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
基本架構設計
圖片來源:https://blog.csdn.net/cuierdan/article/details/123824300
基本概念
- Message Broker:(消息代理服務器)是一個虛擬的概念,而RabbitMQ是Message Broker的一個實例。
- Producer:(生產(chǎn)者)產(chǎn)生數(shù)據(jù)并將數(shù)據(jù)發(fā)送到消息代理服務器(Message Broker)的程序被稱作消息的生產(chǎn)者。
- Connection:(連接)生產(chǎn)者與消費者通過TCP協(xié)議與消息代理服務器(Message Broker)創(chuàng)建的連接。
- Channel:(信道)創(chuàng)建在Connection中的虛擬連接,類似于連接數(shù)據(jù)庫時的連接池的概念,生產(chǎn)者和消費者并不是直接與MQ通過Connection進行通訊的,而是通過Channel進行連接通訊的,數(shù)據(jù)的流動是在Channel中進行的。
- VirtualHost:(虛擬消息服務器)就像mysql數(shù)據(jù)庫中有數(shù)據(jù)庫實例的概念,并且可以指定用戶對庫和表等操作的設置權限。也可以類別成LINUX系統(tǒng)中的不同用戶,不同用戶之間是相互獨立的。每個VirtualHost相當于一個相對獨立mini的RabbitMQ服務器。每個VirtutalHost之間是相互隔離的,exchange,queue,message等不能互通。
- Exchange:(交換機)交換機直接與Channel(信道)連接,接收來自于消息生產(chǎn)者產(chǎn)生的數(shù)據(jù),在由Exchange將消息路由到一個或多個Queue中(或者丟棄)。Exchange并不存儲消息。RabbitMQ的交換機有fanout(扇出),direct(直接),topic(主題),headers(標題)四種類型,每種交換機類型都對應著不同的路由規(guī)則,根據(jù)不同的路由規(guī)則,交換機會將消息路由到不同的隊列中。
- Binding: (綁定)交換機與隊列之間的虛擬連接,在這個綁定中可以設置Binding Key,一個綁定就是用一個Binding Key將交換器和隊列連接起來,設置的Binding Key存在著一定的規(guī)則,Exchange會將消息中攜帶的Routing Key與Binding Key 中設置的規(guī)則進行匹配,將消息發(fā)送到相應的隊列中。Binding信息被保存到Exchange中的查詢表中,用于Exchange將消息分發(fā)到隊列的依據(jù)。
- Routing Key:(路由鍵)用于匹配路由規(guī)則的依據(jù),生產(chǎn)者在將消息發(fā)送到Exchange時,一般會指定一個Routing Key,交換機會根據(jù)Routing Key 來匹配Binding中設置的路由規(guī)則,將符合規(guī)則的消息發(fā)送到指定的隊列中。
- Queue:(消息隊列)RabbitMQ中的內(nèi)部對象用于存放消息的容器,RabbitMQ會將消息按照RabbitMQ的六大模式中的一種將隊列中的消息發(fā)送給消費者,RabbitMQ會根據(jù)選擇模式的不同將隊列中的消息發(fā)送給一個或多個消費者,在連接到消費者之前,消息一直在等待消費者到隊列中將消息取走。
- Consumer:(消費者)消息的消費者,表示一個從隊列中取消息的應用程序。
特點
- 可靠性:RabbitMQ使用一些機制來保證可靠性, 如持久化、傳輸確認及發(fā)布確認等。
- 靈活的路由:在消息進入隊列之前,通過交換器來路由消息。
- 擴展性:多個RabbitMQ節(jié)點可以組成一個集群,也可以根據(jù)實際業(yè)務情況動態(tài)地擴展 集群中節(jié)點。
- 高可用性:隊列可以在集群中的機器上設置鏡像,使得在部分節(jié)點出現(xiàn)問題的情況下隊 列仍然可用。
- 多種協(xié)議:RabbitMQ除了原生支持AMQP協(xié)議,還支持STOMP, MQTT等多種消息 中間件協(xié)議。
- 支持多語言客戶端:RabbitMQ 幾乎支持所有常用語言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。
主要功能
- 消息隊列:允許應用程序?qū)⑾l(fā)送到隊列中,然后由另一個應用程序從隊列中取出并處理。
- 消息路由:支持將消息從發(fā)送者路由到一個或多個接收者。
- 消息持久化:確保消息在系統(tǒng)故障后不會丟失。
- 消息確認:確保消息被正確處理,如果處理失敗,可以重新發(fā)送。
- 集群:支持在多個節(jié)點上運行,以提供高可用性和負載均衡。
安裝
這里使用1Panel安裝,1Panel 是一個現(xiàn)代化、開源的 Linux 服務器運維管理面板
應用商店
圖片
安裝鏡像
圖片
注:這里需要勾選【端口外部訪問】,方便本地調(diào)試
鏡像
安裝成功后,就可以在鏡像中找到已安裝好的RabbitMQ鏡像容器
圖片
外網(wǎng)可訪問地址:http://{{你的公網(wǎng)ip}}:15672,如果是在云服務器,記得安全策略放開端口15672
圖片
- RabbitMQ管理界面端口:15672。是一個Web應用程序,用于管理和監(jiān)控RabbitMQ消息代理
- AMQP默認端口:5672。是一種網(wǎng)絡協(xié)議,用于在應用程序之間傳遞消息,通常用于消息隊列系統(tǒng)。
控制臺
登陸控制臺,賬號和密碼都是rabbitmq
圖片
使用
這里使用webman插件RabbitMQ客戶端,插件地址:https://www.workerman.net/plugin/67,
非常感謝兔子大佬的插件貢獻!??
非常感謝兔子大佬的插件貢獻!??
非常感謝兔子大佬的插件貢獻!??
插件特點
- 支持5種消費模式:簡單隊列、workQueue、routing、pub/sub、exchange;
- 支持延遲隊列(RabbitMQ須安裝插件);
- 異步無阻塞消費、異步無阻塞生產(chǎn)、同步阻塞生產(chǎn)
安裝插件
通過composer包管理安裝:
composer require workbunny/webman-rabbitmq
注:安裝該插件前,請確保你已經(jīng)安裝好webman框架,相關安裝文檔:https://www.workerman.net/doc/webman/install.html
配置
插件所有配置文件路徑:config/plugin/workbunny/webman-rabbitmq/app.php
<?php
return [
'enable' => true,
'host' => '120.120.120.74',
'vhost' => '/',
'port' => 5672,
'username' => 'rabbitmq',
'password' => 'rabbitmq',
'mechanisms' => 'AMQPLAIN',
...
];
- host 修改為服務器公網(wǎng)ip
- port 修改為15672
消費者
這里使用命令創(chuàng)建一個擁有單進程消費者的RestyBuilder
./webman workbunny:rabbitmq-builder resty --mode=queue
?? Config updated.
?? Builder created.
? Builder RestyBuilder created successfully.
創(chuàng)建完成后完整的消費者文件位置process/workbunny/rabbitmq/RestyBuilder.php
<?php declare(strict_types=1);
namespace process\workbunny\rabbitmq;
use Bunny\Channel as BunnyChannel;
use Bunny\Async\Client as BunnyClient;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Constants;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
class RestyBuilder extends QueueBuilder
{
/**
* @var array = [
* 'name' => 'example',
* 'delayed' => false,
* 'prefetch_count' => 1,
* 'prefetch_size' => 0,
* 'is_global' => false,
* 'routing_key' => '',
* ]
*/
protected array $queueConfig = [
// 隊列名稱 ,默認由類名自動生成
'name' => 'process.workbunny.rabbitmq.RestyBuilder',
// 是否延遲
'delayed' => false,
// QOS 數(shù)量
'prefetch_count' => 0,
// QOS size
'prefetch_size' => 0,
// QOS 全局
'is_global' => false,
// 路由鍵
'routing_key' => '',
];
/** @var string 交換機類型 */
protected string $exchangeType = Constants::DIRECT;
/** @var string|null 交換機名稱,默認由類名自動生成 */
protected ?string $exchangeName = 'process.workbunny.rabbitmq.RestyBuilder';
/** @inheritDoc */
public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string
{
echo '[RabbitMQ][隊列消費] Tag:' .$message->consumerTag. PHP_EOL;
echo '[RabbitMQ][隊列消費著] Content:' .$message->content. PHP_EOL;
echo '[RabbitMQ][隊列消費著] Exchange:' .$message->exchange. PHP_EOL;
return Constants::ACK;
}
}
啟動webman
php start.php start
Workerman[start.php] start in DEBUG mode
----------------------------------------------------------------------- WORKERMAN -----------------------------------------------------------------------
Workerman version:4.1.15 PHP version:8.2.18 Event-Loop:\Workerman\Events\Event
------------------------------------------------------------------------ WORKERS ------------------------------------------------------------------------
proto user worker listen processes status
tcp root webman http://0.0.0.0:8217 2 [OK]
tcp root monitor none 1 [OK]
tcp root plugin.workbunny.webman-rabbitmq.process.workbunny.rabbitmq.RestyBuilder none 1 [OK]
---------------------------------------------------------------------------------------------------------------------------------------------------------
生產(chǎn)者
use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\RestyBuilder;
sync_publish(RestyBuilder::instance(), '兔子大佬你好呀!'); # return bool
消費結(jié)果
[RabbitMQ][隊列消費] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][隊列消費著] Content:開源技術小棧你好呀!
[RabbitMQ][隊列消費著] Exchange:process.workbunny.rabbitmq.RestyBuilder
...
[RabbitMQ][隊列消費] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][隊列消費著] Content:兔子大佬你好呀!
[RabbitMQ][隊列消費著] Exchange:process.workbunny.rabbitmq.RestyBuilder
圖片
RabbitMQ管理界面
通過RabbitMQ管理界面端發(fā)送消息
消費者消費情況