物聯(lián)網(wǎng)網(wǎng)關(guān)開發(fā):基于MQTT消息總線的設(shè)計過程(下)
一、前言
在上一篇文章中物聯(lián)網(wǎng)網(wǎng)關(guān)開發(fā):基于MQTT消息總線的設(shè)計過程(上),我們聊了在一個物聯(lián)網(wǎng)系統(tǒng)的網(wǎng)關(guān)中,如何利用 MQTT 消息總線,在嵌入式系統(tǒng)內(nèi)部實現(xiàn)多個進程之間的相互通信問題。
這個通信模型的最大幾個優(yōu)點是:
- 模塊之間解耦合;
- 各模塊之間可以并行開發(fā);
- 把 TCP 鏈接和粘包問題交給消息總線處理,我們只需要處理業(yè)務(wù)層的東西;
- 調(diào)試方便;
以上只是描述了在一個嵌入式系統(tǒng)內(nèi)部,進程之間的通信方式,那么網(wǎng)關(guān)如何與云平臺進行交互呢?
在上一篇文章中已經(jīng)提到過:網(wǎng)關(guān)與云平臺之間的通信方式一般都是客戶指定的,就那么幾種(阿里云、華為云、騰訊云、亞馬遜AWS平臺)。一般都要求網(wǎng)關(guān)與云平臺之間處于長連接的狀態(tài),這樣云端的各種指令就可以隨時發(fā)送到網(wǎng)關(guān)。
這一篇文章,我們就來聊一聊這部分內(nèi)容。
在公眾號回復(fù):mqtt,獲取示例代碼的網(wǎng)盤地址。
二、與云平臺之間的 MQTT 連接
目前的幾大物聯(lián)網(wǎng)云平臺,都提供了不同的接入方式。對于網(wǎng)關(guān)來說,應(yīng)用最多的就是 MQTT 接入。
我們知道,MQTT 只是一個協(xié)議而已,不同的編程語言中都有實現(xiàn),在 C 語言中也有好幾個實現(xiàn)。
在網(wǎng)關(guān)內(nèi)部,運行著一個后臺 deamon: MQTT Broker,其實就是 mosquitto 這個可執(zhí)行程序,它充當(dāng)著消息總線的功能。這里請大家注意:因為這個消息總線是運行在嵌入式系統(tǒng)的內(nèi)部,接入總線的客戶端就是需要相互通信的那些進程。這些進程的數(shù)量是有限的,即使是一個比較復(fù)雜的系統(tǒng),最多十幾個進程也就差不多了。因此,mosquitto 這個實現(xiàn)是完全可以支撐系統(tǒng)負(fù)載的。
那么,如果在云端部署一個 MQTT Broker,理論上是可以直接使用 mosquitto 這個實現(xiàn)來作為消息總線的,但是你要評估接入的客戶端(也就是網(wǎng)關(guān))在一個什么樣的數(shù)量級,考慮到并發(fā)的問題,一定要做壓力測試。
對于后臺開發(fā),我的經(jīng)驗不多,不敢(也不能)多言,誤導(dǎo)大家就罪過了。不過,對于一般的學(xué)習(xí)和測試來說,在云端直接部署 mosquitto 作為消息總線,是沒有問題的。
三、Proc_Bridge 進程:外部和內(nèi)部消息總線之間的橋接器
下面這張圖,說明了 Proc_Bridge 進程在這個模型中的作用:

- 從云平臺消息總線接收到的消息,需要轉(zhuǎn)發(fā)到內(nèi)部的消息總線;
- 從內(nèi)部消息總線接收到的消息,需要轉(zhuǎn)發(fā)到云平臺的消息總線;
如果用 mosquitto 來實現(xiàn),應(yīng)該如何來實現(xiàn)呢?
1. mosquitto 的 API 接口
mosquitto 這個實現(xiàn)是基于回調(diào)函數(shù)的機制來運行的,例如:
- // 連接成功時的回調(diào)函數(shù)
- void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
- {
- // ...
- }
- // 連接失敗時的回調(diào)函數(shù)
- void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result)
- {
- // ...
- }
- // 接收到消息時的回調(diào)函數(shù)
- void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
- {
- // ..
- }
- int main()
- {
- // 其他代碼
- // ...
- // 創(chuàng)建一個 mosquitto 對象
- struct mosquitto g_mosq = mosquitto_new("client_name", true, NULL);
- // 注冊回調(diào)函數(shù)
- mosquitto_connect_callback_set(g_mosq, my_connect_callback);
- mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback);
- mosquitto_message_callback_set(g_mosq, my_message_callback);
- // 這里還有其他的回調(diào)函數(shù)設(shè)置
- // 開始連接到消息總線
- mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60);
- while(1)
- {
- int rc = mosquitto_loop(g_mosq, -1, 1);
- if (rc) {
- printf("mqtt_portal: mosquitto_loop rc = %d \n", rc);
- sleep(1);
- mosquitto_reconnect(g_mosq);
- }
- }
- mosquitto_destroy(g_mosq);
- mosquitto_lib_cleanup();
- return 0;
- }
以上代碼就是一個 mosquitto 客戶端的最簡代碼了,使用回調(diào)函數(shù)的機制,讓程序的開發(fā)非常簡單。
mosquitto 把底層的細(xì)節(jié)問題都幫助我們處理了,只要我們注冊的函數(shù)被調(diào)用了,就說明發(fā)生了我們感興趣的事件。
這樣的回調(diào)機制在各種開源軟件中使用的比較多,比如:glib 里的定時器、libevent通訊處理、libmodbus 里的數(shù)據(jù)處理、linux 內(nèi)核中的驅(qū)動開發(fā)和定時器,都是這個套路,一通百通!
在網(wǎng)關(guān)中的每個進程,只需要添加上面這部分代碼,就可以掛載到消息總線上,從而可以與其它進程進行收發(fā)數(shù)據(jù)了。
2. 利用 UserData 指針,實現(xiàn)多個 MQTT 連接
上面的實例僅僅是連接到一個消息總線上,對于一個普通的進程來說,達(dá)到了通信的目的。
但是對于 Proc_Bridge 進程來說,還沒有達(dá)到目的,因為這個進程處于橋接的位置,需要同時連接到遠(yuǎn)程和本地這兩個消息總線上。那么應(yīng)該如何實現(xiàn)呢?
看一下 mosquitto_new 這個函數(shù)的簽名:
- /*
- * obj - A user pointer that will be passed as an argument to any
- * callbacks that are specified.
- */
- /*
- 最后一個參數(shù)的作用是:可以設(shè)置一個用戶自己的數(shù)據(jù)(作為指針傳入),那么
- mosquitto 在回調(diào)我們的注冊的任何一個函數(shù)時,都會把這個指針傳入。
- 因此,我們可以利用這個參數(shù)來區(qū)分這個連接是遠(yuǎn)程連接?還是本地連接。
- */
- libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);
所以,我們可以定義一個結(jié)構(gòu)體變量,把一個 MQTT 連接的所有信息都記錄在這里,然后注冊給 mosquitto。當(dāng) mosquitto 回調(diào)函數(shù)時,把這個結(jié)構(gòu)體變量的指針回傳給我們,這樣就拿到了這個連接的所有數(shù)據(jù),在某種程度上來說,這也是一種面向?qū)ο蟮乃枷搿?/p>
- // 從來表示一個 MQTT 連接的結(jié)構(gòu)體
- typedef struct{
- char *id;
- char *name;
- char *pw;
- char *host;
- int port;
- pthread_t tHandle;
- struct mosquitto *mosq;
- int mqtt_num;
- }MQData;
完整的代碼已經(jīng)放到網(wǎng)盤里了,為了讓你先從原理上看明白,我把關(guān)鍵幾個地方的代碼貼在這里:
- // 分配結(jié)構(gòu)體變量
- MQData userData = (MQData *)malloc(sizeof(MQData));
- // 設(shè)置屬于這里連接的參數(shù): id, name 等等
- // 創(chuàng)建 mosquitto 對象時,傳入 userData。
- struct mosquitto *mosq = mosquitto_new(userData->id, true, userData);
- // 在回調(diào)函數(shù)中,把 obj 指針前轉(zhuǎn)成 MQData 指針
- static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
- {
- MQData *userData = (MQData *)obj;
- // 此時就可以根據(jù) userData 指針中的內(nèi)容分辨出這是哪一個鏈接了
- }
另外一個問題:不知道你是否注意到示例中的 mosquitto_loop() 這個函數(shù)?這個函數(shù)需要放在 while 死循環(huán)中不停的調(diào)用,才能出發(fā) mosuiqtto 內(nèi)部的事件。(其實在 mosuiqtto 中,還提供了另一個簡化的函數(shù) mosquitto_loop_forever)。
也就是說:在每個連接中,需要持續(xù)的觸發(fā) mosquitto 底層的事件,才能讓消息系統(tǒng)順利的收發(fā)。因此,在示例代碼中,使用兩個線程分別連接到云平臺的總線和內(nèi)部的總線。
四、總結(jié)
經(jīng)過這兩篇文章,基本上把一個物聯(lián)網(wǎng)系統(tǒng)的網(wǎng)關(guān)中,最基本的通信模型聊完了,相當(dāng)于是一個程序的骨架吧,剩下的事情就是處理業(yè)務(wù)層的細(xì)節(jié)問題了。
萬里長征,這才是第一步!
對于一個網(wǎng)關(guān)來說,還有其他更多的問題需要處理,比如:MQTT 連接的鑒權(quán)(用戶名+密碼,證書)、通信數(shù)據(jù)的序列化和反序列化、加密和解密等等,以后慢慢聊吧,希望我們一路前行!