基于Topic的消息發(fā)布與消費模式
閑話
朋友們,好久不見,不知道你們最近怎樣,但相信你們一定都挺好。已經(jīng)有一段時間沒有更新了,個中原因不好細說,但是歸根結(jié)底也許是自己懶。這個不好,大家不要學(xué)。今天主要就是想分享一下關(guān)于消息處理機制的一些想法。
基本概念
1.Topic
同一個topic下消息的格式一致,例如topic為order-update-message消息的格式都是一個統(tǒng)一的OrderUpdateMessage的結(jié)構(gòu)
2.key主鍵
同一主鍵下的消息列表具有順序性,例如key為訂單號order-0001的消息列表(Queue)下,可能包含的消息列表(Queue)如下:
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="modifying", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
3.Group消費者組
同一個topic下同一個group下的消費者,對這個group下的消息隊列進行搶占式消費。例如同一個消費者組group-1下的消費者consumer-1和消費者consumer-2,以及另外一個消費者組group-2下的消費者consumer-3,消息消費的結(jié)果可能如下:
// consumer-1消費的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
// consumer-2消費的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
// consumer-3消費的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
Kafka的消息處理機制就是以這樣的形式實現(xiàn)的。
4.優(yōu)勢
生產(chǎn)者和消費者完全解耦,生產(chǎn)者無需關(guān)注是否有消費者在消費,消費者也無需知道生產(chǎn)者是否在生成新的消息。
生產(chǎn)者只關(guān)注消息是否成功的發(fā)送到消息處理中間件,消費者只關(guān)注能否從消息處理中間件消費到消息。
消費者可以按組消費,同組內(nèi)的消費者進行搶占式消費。
RabbitMq中的優(yōu)秀實踐
1.RabbitMq消息處理機制
生產(chǎn)者講帶有指定RoutingKey的消息發(fā)送到對應(yīng)的Exchange上,Exchange通過Binding定義的路由規(guī)格,將消息按照BindingKey分發(fā)到不同的Queue上,消費者從Queue拉取消息消費。
- Exchange & RoutingKey & Topic:RoutingKey決定了消息會被發(fā)送到哪個Exchange上,這和topic是類似的概念。
- Bind & BindingKey & Group:Exchange根據(jù)Binding定義的路由規(guī)格,將消息按照BindingKey分發(fā)到不同的Queue上,這里可以認為是對應(yīng)了Group的概念。
- Queue & Group:Queue則是維護了一個Group下的某個隊列下的所有消息。
優(yōu)秀實踐
因此如果要以RabbitMq實現(xiàn)基于Topic和Group實現(xiàn)的消息生產(chǎn)和消費的機制,可以將消息定義成以下類似的結(jié)構(gòu):
// Exchange: {value="order-update", type="fanout"}
// binding1: {value="promotion-service", bindingKey="order.*.paid"}
// binding2: {value='inventory-service', bindingKey="order.*"}
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
假設(shè)此時有promotion-service(1個實例)和inventory-service(2個實例)兩個消費者消費消息,則對應(yīng)的消息消費的結(jié)果可能是:
// inventory-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value=Queue('inventory-service'), bindingKey="order.*"}
// inventory-service實例1消費到的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
// inventory-service實例2消費到的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
// promotion-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value="promotion-service", bindingKey="order.*.paid"}
// promotion-service實例1消費到的消息
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
總結(jié)
RabbitMQ的Exchange支持不同類型(Direct, Fanout, Topic, Headers),以及Binding可以對消息以更靈活的通配符的方式將消息分發(fā)到對應(yīng)的Queue上,因此其消息處理機制更加靈活。
基于Topic的消息發(fā)布與消費模式,能夠?qū)⑾M者和生產(chǎn)者完全解耦,相對RabbitMQ中的所支持的靈活處理消息的方式,更加簡單且易于理解,這也是Kafka的消息處理機制。
通過對比不同的中間件的消息處理機制也許能找到更好的實踐方式。