OpenHarmony中使用MQTT
想了解更多關(guān)于開源的內(nèi)容,請(qǐng)?jiān)L問(wèn):
一、前景摘要
- DES版本:DevEco Studio 3.0 Release
- SDK版本:3.2.2.5 ( API9)
- npm版本:6.14.16
- EMQX:Linux(Ubuntu)
- MQTTX:Version: v1.9.2
二、了解MQTT
1、什么是MQTT?
MQTT**(**消息隊(duì)列遙測(cè)傳輸)是ISO 標(biāo)準(zhǔn)(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在TCP/IP協(xié)議族上,是為硬件性能低下的遠(yuǎn)程設(shè)備以及網(wǎng)絡(luò)狀況糟糕的情況下而設(shè)計(jì)的發(fā)布/訂閱型消息協(xié)議,為此,它需要一個(gè)消息中間件。
2、MQTT特性
MQTT協(xié)議是為大量計(jì)算能力有限,且工作在低帶寬、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計(jì)的協(xié)議,它具有以下主要的幾項(xiàng)特性:
使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合;
- 對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸。
- 使用 TCP/IP 提供網(wǎng)絡(luò)連接。
- 有三種消息發(fā)布服務(wù)質(zhì)量:QoS(定閱等級(jí)),分0、1、2三個(gè)等級(jí),簡(jiǎn)單來(lái)說(shuō)是等級(jí)越高越可靠。
- 小型傳輸,開銷很?。ü潭ㄩL(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量。
- 使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機(jī)制。
3、應(yīng)用場(chǎng)景(多客戶端,少量消息)
- 車聯(lián)網(wǎng)
- 工業(yè)互聯(lián)網(wǎng)
- 智能家居
- 視頻直播彈幕
- IM實(shí)時(shí)聊天(一對(duì)一聊天,群組聊天)
- 推送服務(wù),比如推送實(shí)時(shí)新聞
- 金融交易數(shù)據(jù)訂閱推送
4、基本名詞
MQTT協(xié)議中的三種身份
- MQTT Broker:代理服務(wù)器
- Publish:發(fā)布者,發(fā)布消息
- Subscribe:訂閱者,訂閱消息
MQTT傳輸?shù)南?/h4>
主題(Topic),負(fù)載(payload)和QoS
- Topic:消息的類型,訂閱者訂閱后就會(huì)收到該主題的消息內(nèi)容(payload).
- Payload:消息的內(nèi)容,指訂閱者具體要使用的內(nèi)容。
- QoS:服務(wù)質(zhì)量.
“至多一次”(QoS0):消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無(wú)所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。即是推送之后就完事了,至于對(duì)方有沒(méi)有收到,收到是什么,數(shù)據(jù)有沒(méi)有丟失,都不管。
“至少一次”(QoS1):確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。即是你收到推送后,你還得返回一個(gè)puback給對(duì)方,告訴對(duì)方收到了,不然對(duì)方會(huì)以為你沒(méi)收到,隔一段時(shí)間后重新給你推送,直到你給對(duì)方返回一個(gè)Puback為止。
“只有一次”(QoS2):確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。
異常中斷的機(jī)制
使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端。
- Last Will:即遺言機(jī)制,用于通知同一主題下的其他設(shè)備發(fā)送遺言的設(shè)備已經(jīng)斷開了連接。
- Testament:遺言機(jī)制,功能類似于Last Will。
三、MQTT的簡(jiǎn)單使用
1、搭建MQTT服務(wù)器EMQX
MQTT服務(wù)器有很多種,且部署方式也不一樣。
Linux (Ubuntu)
安裝命令:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
sudo apt-get install emqx :安裝
sudo systemctl start emqx :?jiǎn)?dòng)
下載鏈接:https://www.emqx.io/zh/downloads?os=Ubuntu
Docker:https://emqx.io/zh/downloads?os=Docker
Linux(CentOS):https://www.emqx.io/zh/downloads?os=CentOS
Windows:https://www.emqx.io/zh/downloads?os=Windows
測(cè)試:確保emqx已正常運(yùn)行后,可在Linux本地瀏覽器中輸入: http://127.0.0.1:18083。
注意:未登錄需修改密碼,兩次保持一致。
局域網(wǎng)其他電腦訪問(wèn)須知Linux的IP地址。
2、MQTTX
創(chuàng)建連接
創(chuàng)建subscription
收發(fā)消息
#代表通配符,代表訂閱所有該類型的topic。
3、在OpenHarmony使用MQTT
安裝依賴
依賴地址:https://gitee.com/openharmony-tpc/ohos_mqtt。
有所不同的是我用的npm安裝的依賴npm install @ohos/mqtt。
創(chuàng)建Mqtt客戶端
創(chuàng)建mqtt客戶端并建立連接:
const clientOptions: MqttClientOptions = {
url: '192.168.xxx.xxx/:1883',
clientId: 'client_id_' + new Date().getTime(),
persistenceType: 1,
}
const connectOptions: MqttConnectOptions = {
userName: '',
password: '',
connectTimeout: 30,
}
this.mqClient = MqttAsync.createMqtt(this.clientOptions);
this.mqClient.connect(this.connectOptions, (data: MqttResponse) => {
console.log(TAG+" data: "+JSON.stringify(data));
if (data.code == 0) {
this.messageArrived();
this.subscribe('主設(shè)備號(hào)/#');
}
});
監(jiān)聽
//接收消息,使用此接口后,當(dāng)訂閱的主題有消息發(fā)布時(shí),會(huì)自動(dòng)接收到消息。
public messageArrived(): void {
this.mqClient.messageArrived((err, data) => {
console.log(TAG+"messageArrived!!!!!!!!!!!");
console.log(TAG+"messageArrived data:"+JSON.stringify(data));
});
}
訂閱消息
// 訂閱消息
public subscribe(topic: string, qos: QoS = 1): void {
const subscribeOption: MqttSubscribeOptions = { topic, qos };
this.mqClient.subscribe(subscribeOption, (err, data)=>{
this.handleMessage(data)
});
}
發(fā)布消息
// 發(fā)布消息
public publish<T>(topic: string, payload: string | Record<string, any>, qos: QoS = 0): void {
if (typeof payload !== 'string') {
payload = JSON.stringify(payload)
}
const payloadLen = payload.length;
const publishOption: MqttPublishOptions = { topic, payload, qos, payloadLen };
console.log(TAG, 'publishOption data: ' + JSON.stringify(publishOption));
this.mqClient.publish(publishOption, (err, data)=>{
console.log(TAG+"publish!!!!!!!!!!!");
console.log(TAG+"publish data:"+JSON.stringify(data));
}));
}
根據(jù)訂閱的消息的主題的不同進(jìn)行不同的處理
handleMessage(data:any){
console.log(TAG+"subscribe!!!!!!!!!!!");
console.log(TAG+"subscribe data:"+JSON.stringify(data));
//根據(jù)data的不同進(jìn)行不同的處理
}
四、擴(kuò)展
mqtt與MQ中間件的關(guān)系。
消息中間件是基于隊(duì)列與消息傳遞技術(shù),在網(wǎng)絡(luò)環(huán)境中為應(yīng)用系統(tǒng)提供同步或異步、可靠的消息傳輸?shù)闹涡攒浖到y(tǒng)。
MQTT 與消息隊(duì)列有一定的區(qū)別,隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),消息隊(duì)列常用于應(yīng)用服務(wù)層面,實(shí)現(xiàn)參考如 RabbitMQ Kafka RocketMQ。
MQTT 是傳輸協(xié)議,絕大部分 MQTT Broker 不保證消息順序(Queue),常用語(yǔ)物聯(lián)網(wǎng)、消息傳輸?shù)取?/p>