超詳細(xì)的RabbitMQ入門與實戰(zhàn)介紹,看這篇文章就夠了
一、前情提示
上一篇文章《?教你面試的時候如何迅速完成90%以上的海量數(shù)據(jù)處理題?》,我們已經(jīng)給出了一整套的數(shù)據(jù)一致性的保障方案。
我們從如下三個角度,給出了方案如何實現(xiàn)。并且通過數(shù)據(jù)平臺和電商系統(tǒng)進(jìn)行了舉例分析。
- 核心數(shù)據(jù)的監(jiān)控。
- 數(shù)據(jù)鏈路追蹤。
- 自動化數(shù)據(jù)鏈路分析。
目前為止,我們的架構(gòu)圖大概如下所示:
并且咱們之前對于這種架構(gòu)下,如何基于MQ進(jìn)行解耦的實現(xiàn)也做了詳細(xì)的說明。
那么這篇文章,我們就基于這個架構(gòu),在數(shù)據(jù)一致性方面做進(jìn)一步的說明。同樣,我們以RabbitMQ這個消息中間件來舉例。
二、選擇性的訂閱部分核心數(shù)據(jù)
?首先一個基于MQ實現(xiàn)的細(xì)節(jié)點就在于,比如對數(shù)據(jù)監(jiān)控系統(tǒng)而言,他可能僅僅只是要從MQ里訂閱部分?jǐn)?shù)據(jù)來消費罷了。
這個是啥意思呢?因為比如實時計算平臺他是會將自己計算出來的所有的數(shù)據(jù)指標(biāo)都投遞到MQ里去的。
但是這些數(shù)據(jù)指標(biāo)可能是多達(dá)幾十個甚至是幾百個的,這里面不可能所有數(shù)據(jù)指標(biāo)都是核心數(shù)據(jù)吧?
基本上按照我們過往經(jīng)驗而言,對于這種數(shù)據(jù)類的系統(tǒng)核心數(shù)據(jù)指標(biāo),大概就占到10%左右的比例而已。
然后對于數(shù)據(jù)查詢平臺而言,他可能是需要把所有的數(shù)據(jù)指標(biāo)都消費出來,然后落地到自己的存儲里去的。
但是對于數(shù)據(jù)監(jiān)控系統(tǒng)而言,他只需要過濾出10%的核心數(shù)據(jù)指標(biāo)即可,所以他需要的是有選擇性的訂閱數(shù)據(jù)。
咱們看看下面的圖,立馬就明白是什么意思了。?
三、RabbitMQ的queue與exchange的綁定
不知道大家是否還記得之前講解基于RabbitMQ實現(xiàn)多系統(tǒng)訂閱同一份數(shù)據(jù)的場景。
我們采用的是每個系統(tǒng)使用自己的一個queue,但是都綁定到一個fanout exchange上去,然后生產(chǎn)者直接投遞數(shù)據(jù)到fanout exchange。
fanout exchange會分發(fā)一份數(shù)據(jù),綁定到自己的所有queue上去,然后各個系統(tǒng)都會從自己的queue里拿到相同的一份數(shù)據(jù)。
大家再看看下面的圖回顧一下。
在這里有一個關(guān)鍵的代碼如下所示:
?也就是說,把自己創(chuàng)建的queue綁定到exchange上去,這個綁定關(guān)系在RabbitMQ里有一個專業(yè)的術(shù)語叫做:binding。
四、direct exchange實現(xiàn)消息路由
如果僅僅使用之前的fanout exchange,那么是無法實現(xiàn)不同的系統(tǒng)按需訂閱數(shù)據(jù)的,如果要實現(xiàn)允許不同的系統(tǒng)按需訂閱數(shù)據(jù),那么需要使用direct exchange。
direct exchange允許你在投遞消息的時候,給每個消息打上一個routing key。同時direct exchange還允許binding到自己的queue指定一個binding key。
這樣,direct exchange就會根據(jù)消息的routing key將這個消息路由到相同binding key對應(yīng)的queue里去,這樣就可以實現(xiàn)不同的系統(tǒng)按需訂閱數(shù)據(jù)了。
說了這么多,是不是感覺有點暈,老規(guī)矩,咱們來一張圖,直觀的感受一下怎么回事兒:
而且一個queue是可以使用多個binding key的,比如說使用“k1”和“k2”兩個binding key的話,那么routing key為“k1”和“k2”的消息都會路由到那個queue里去。
同時不同的queue也可以指定相同的ruoting key,這個時候就跟fanout exchange其實是一樣的了,一個消息會同時路由到多個queue里去。
五、按需訂閱的代碼實現(xiàn)
首先在生產(chǎn)者那塊,比如說實時計算平臺吧,他就應(yīng)該是要定義一個direct exchange了。
如下代碼所示,所有的數(shù)據(jù)都是投遞到這個exchange里去,比如我們這里使用的exchange名字就是“rt_data”,意思就是實時數(shù)據(jù)計算結(jié)果,類型是“direct”:
channel.exchangeDeclare(
"rt_data",
"direct");
而且,在投遞消息的時候,要給一個消息打上標(biāo)簽,也就是他的routing key,表明這個消息是普通數(shù)據(jù)還是核心數(shù)據(jù),這樣才能實現(xiàn)路由,如下代碼所示:
上面第一個參數(shù)是指定要投遞到哪個exchange里去,第二個參數(shù)就是routing key,這里的“common_data”代表了是普通數(shù)據(jù),也可以用“core_data”代表核心數(shù)據(jù),實時計算平臺根據(jù)自己的情況指定普通或者核心數(shù)據(jù)。
然后消費者在進(jìn)行queue和exchange的binding的時候,需要指定binding key,代碼如下所示:
上面第一行就是在消費者那里,比如數(shù)據(jù)監(jiān)控系統(tǒng)那里,也是定義一下direct exchange。
然后第二行就是定義一個“rt_data_monitor“這個queue。
第三行就是對queue和exchange進(jìn)行綁定,指定了binding key是“core_data”。
如果是數(shù)據(jù)查詢系統(tǒng),他是普通數(shù)據(jù)和核心數(shù)據(jù)都要的,那么就可以在binding key里指定多個值,用逗號隔開,如下所示:
channel.queueBind(
"rt_data_query",
"rt_data",
"common_data, core_data");
到這里,大家就明白如何對數(shù)據(jù)打上不同的標(biāo)簽(也就是routing key),然后讓不同的系統(tǒng)按需訂閱自己需要的數(shù)據(jù)了(也就是指定binding key),這種方式用到了direct exchange這種類型,非常的靈活。
最后,再看看之前畫的那幅圖,大家再來感受一下即可:
六、更加強(qiáng)大而且靈活的按需訂閱
?RabbitMQ 還支持更加強(qiáng)大而且靈活的按需數(shù)據(jù)訂閱,也就是使用topic exchange,其實跟direct exchange是類似的,只不過功能更加的強(qiáng)大罷了。
比如說你定義一個topic exchange,然后routing key就需要指定為用點號隔開的多個單詞,如下所示:?
然后,你在設(shè)置binding key的時候,他是支持通配符的。 * 匹配一個單詞,# 匹配0個或者多個單詞,比如說你的binding key可以這么來設(shè)置:
這個product.*.* ,就會跟“product.common.data”匹配上,意思就是,可能某個系統(tǒng)就是對商品類的數(shù)據(jù)指標(biāo)感興趣,不管是普通數(shù)據(jù)還是核心數(shù)據(jù)。
所以到這里,大家就應(yīng)該很容易明白了,通過RabbitMQ的direct、topic兩種exchange,我們可以輕松實現(xiàn)各種強(qiáng)大的數(shù)據(jù)按需訂閱的功能。
通過本文,我們就將最近講的數(shù)據(jù)一致性保障方案里的一些MQ中間件落地的細(xì)節(jié)給大家說明白了。