Golang 語(yǔ)言編寫的消息隊(duì)列 NSQ 官方客戶端 go-nsq 怎么使用
01
介紹
NSQ 是 Golang 語(yǔ)言編寫的實(shí)時(shí)分布式消息傳遞平臺(tái)(也可以理解為消息隊(duì)列),它主要由三個(gè)守護(hù)進(jìn)程組成,分別是 nsqd
、 nsqlookupd
和 nsqadmin
。其中 nsqd
是核心組成部分,它負(fù)責(zé)處理客戶端的請(qǐng)求,比如生產(chǎn)、排序和消費(fèi)消息等; nsqlookupd
負(fù)責(zé)管理集群拓?fù)湫畔⒑吞峁┮粋€(gè)最終一致性的發(fā)現(xiàn)服務(wù), nsqadmin
是一個(gè) web 界面的管理平臺(tái),可以用于實(shí)時(shí)查看集群信息和執(zhí)行其他管理操作。單個(gè) nsqd
可以含有很多 topic,每個(gè) topic 可以含有很多 channel。
NSQ 支持跨平臺(tái)和多語(yǔ)言客戶端。使用 Mac 的讀者朋友們可以使用 brew 方便的安裝 NSQ。本文我們主要介紹 NSQ 官方提供的 golang 客戶端 go-nsq。關(guān)于 NSQ 的更多內(nèi)容,感興趣的讀者朋友們可以查閱官方文檔,限于篇幅,本文不再贅述。
使用 go-nsq
操作 NSQ,需要安裝 go-nsq
庫(kù),它提供了許多用于操作 NSQ 的函數(shù)和方法。
安裝方式:
- // Mac 安裝 nsq
- brew install nsq
- // 安裝 go-nsq
- go get -u github.com/nsqio/go-nsq
02
生產(chǎn)者
go-nsq 包中的 Producer 類型,用于向 NSQ 發(fā)送消息。首先,需要調(diào)用函數(shù) NewProducer 創(chuàng)建一個(gè) Producer 實(shí)例,接收參數(shù)是 string 類型的 addr 和指針類型的 nsq.Config(NSQ 配置信息),返回結(jié)果是一個(gè) Producer 實(shí)例的地址和 error。
需要注意的是,必須調(diào)用函數(shù) NewConfig 返回一個(gè)指針類型的 nsq.Config,它包含默認(rèn)配置信息??梢酝ㄟ^調(diào)用該實(shí)例的 Set 方法設(shè)置配置信息,并且必須在用于傳參之前設(shè)置,否則設(shè)置的配置信息將不會(huì)生效。
Producer 包含很多方法,本文主要介紹四個(gè)方法,分別是 Ping、String、Publish 和 Stop。其中 Ping 方法用于檢測(cè) Producer 是否連接成功它配置的 nsqd
;String 方法返回 Producer 連接的 nsqd
的地址;Publish 方法用于同步發(fā)送消息到指定的 topic;Stop 方法用于優(yōu)雅地停止 Producer。
示例代碼:
- // 默認(rèn)配置信息
- config := nsq.NewConfig()
- // 創(chuàng)建生產(chǎn)者
- producer, err := nsq.NewProducer("127.0.0.1:4150", config)
- if err != nil {
- log.Fatal(err)
- }
- // 驗(yàn)證生成者連接是否成功
- err = producer.Ping()
- if err != nil {
- log.Fatal(err)
- }
- // 返回生產(chǎn)者地址
- producerAddr := producer.String()
- log.Printf("producerAddr:%v", producerAddr)
- messageBody := []byte("hello")
- topicName := "topic"
- // 同步發(fā)送消息到指定 topic
- err = producer.Publish(topicName, messageBody)
- if err != nil {
- log.Fatal(err)
- }
- producer.Stop()
運(yùn)行結(jié)果:
- 2021/10/23 18:58:23 INF 1 (127.0.0.1:4150) connecting to nsqd
- 2021/10/23 18:58:23 producerAddr:127.0.0.1:4150
- 2021/10/23 18:58:23 INF 1 stopping
- 2021/10/23 18:58:23 INF 1 exiting router
03
消費(fèi)者
go-nsq 包中的 Consumer 類型,用于從 NSQ 消費(fèi)消息。首先,需要調(diào)用函數(shù) NewConsumer 創(chuàng)建一個(gè) Consumer 實(shí)例,接收參數(shù)是 string 類型的 topic 和 channel,指針類型的 nsq.Config(NSQ 配置信息),返回結(jié)果是一個(gè) Consumer 實(shí)例的地址和 error。
需要注意的是,必須調(diào)用函數(shù) NewConfig 返回一個(gè)指針類型的 nsq.Config,它包含默認(rèn)配置信息??梢酝ㄟ^調(diào)用該實(shí)例的 Set 方法設(shè)置配置信息,并且必須在用于傳參之前設(shè)置,否則設(shè)置的配置信息將不會(huì)生效。
Consumer 包含很多方法,本文主要介紹三個(gè)方法,分別是 Stats、AddHandler 和 ConnectToNSQD。其中 Stats 方法用于檢索 Consumer 的當(dāng)前連接和消息統(tǒng)計(jì)信息;AddHandler 用于為 Consumer 消費(fèi)的消息設(shè)置處理函數(shù),每個(gè)處理函數(shù)都獨(dú)立運(yùn)行在一個(gè) goroutine 中,如果需要啟動(dòng)多個(gè) goroutine 運(yùn)行處理函數(shù),可以多次調(diào)用 AddHandler;ConnectToNSQD 用于連接配置的 nsqd
。
示例代碼:
- config := nsq.NewConfig()
- // 創(chuàng)建 Consumer
- consumer, err := nsq.NewConsumer("topic", "channel", config)
- if err != nil {
- log.Fatal(err)
- }
- consumerStats := consumer.Stats()
- log.Printf("consumerStats:%+v", consumerStats)
- // 給 Consumer 添加處理器,可添加多個(gè),每個(gè) Handler 都運(yùn)行在單獨(dú)的 goroutine 中
- consumer.AddHandler(&myMessageHandler{})
- // 連接 nsqd
- err = consumer.ConnectToNSQD("127.0.0.1:4150")
- if err != nil {
- log.Fatal(err)
- }
- <-consumer.StopChan
運(yùn)行結(jié)果:
- 2021/10/23 18:59:30 consumerStats:&{MessagesReceived:0 MessagesFinished:0 MessagesRequeued:0 Connections:0}
- 2021/10/23 18:59:30 INF 1 [topic/channel] (127.0.0.1:4150) connecting to nsqd
- 2021/10/23 18:59:30 hello
04
總結(jié)
本文主要介紹 Golang 語(yǔ)言編寫的實(shí)時(shí)分布式消息平臺(tái) NSQ 的 golang 客戶端 go-nsq。它是 NSQ 官方提供的 NSQ Golang 客戶端。并且 分別介紹了 Producer 和 Consumer 的簡(jiǎn)單使用方法,大家可以根據(jù)自己的業(yè)務(wù)需求,使用 go-nsq 靈活運(yùn)用 NSQ。關(guān)于 go-nsq 的更多內(nèi)容,感興趣的讀者朋友們可以閱讀官方文檔。