Listening for and Displaying MQTT Messages on HarmonyOS
For more information, visit:
The HarmonyOS technical community jointly built by 51CTO and Huawei
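Approach
I have not found a ready-made MQTT package for JS on HarmonyOS so far, so the MQTT messages are received in Java, and the JS side periodically calls into Java to fetch the received message and display it.
The first part is calling Java from JS. A JS FA (Feature Ability) can call a Java PA (Particle Ability) in two ways, Ability and Internal Ability; this example uses the first, Ability.
The second part is receiving MQTT messages on the Java side, using the third-party Paho library. When the page starts, the JS side calls the Java side to begin receiving. The receiver runs asynchronously on its own thread and caches each message as it arrives, so every later call from JS returns the most recently cached message.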
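The caching idea described above can be illustrated without any MQTT dependency. The following is a minimal sketch, assuming a hypothetical class `LatestMessageCache` that is not part of the original project: it holds only the most recent payload in an `AtomicReference`, so the receiving thread can overwrite it and the polling call can read it without either side blocking.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of the original project: keeps only the
// most recent message so a poller always reads one consistent value.
final class LatestMessageCache {
    // null means that no message has been received yet.
    private final AtomicReference<String> latest = new AtomicReference<>(null);

    // Called from the receiving thread, for example inside messageArrived.
    void update(String payload) {
        latest.set(payload);
    }

    // Called from the polling side; returns null until the first update.
    String read() {
        return latest.get();
    }

    // True once at least one message has been cached.
    boolean hasMessage() {
        return latest.get() != null;
    }
}
```

With a holder like this, the polling side would call `read()` on each request and fall back to a placeholder text while the result is still `null`.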
Code
HML page:
- <div class="container">
- <div>
- <text class="title">
- {{ title }}
- </text>
- </div>
- <div>
- <text class="title" onclick="mqttMessage">
- Start MQTT
- </text>
- </div>
- <div>
- <text class="title" onclick="stopMqtt">
- Stop MQTT
- </text>
- </div>
- </div>
JS code:
- const ABILITY_TYPE_EXTERNAL = 0;
- const ACTION_SYNC = 0;
- const ACTION_MESSAGE_CODE_START_MQTT = 1001;
- const ACTION_MESSAGE_CODE_MQTT_MESSAGE = 1002;
- const BUNDLE_NAME = 'com.example.mqttapplication';
- const ABILITY_NAME = 'com.example.mqttapplication.PlayAbility';
- export const playAbility = {
- startMqtt: async function() {
- FeatureAbility.callAbility({
- messageCode: ACTION_MESSAGE_CODE_START_MQTT,
- abilityType: ABILITY_TYPE_EXTERNAL,
- syncOption: ACTION_SYNC,
- bundleName: BUNDLE_NAME,
- abilityName: ABILITY_NAME
- });
- },
- mqttMessage: async function(that) {
- var result = await FeatureAbility.callAbility({
- messageCode: ACTION_MESSAGE_CODE_MQTT_MESSAGE,
- abilityType: ABILITY_TYPE_EXTERNAL,
- syncOption: ACTION_SYNC,
- bundleName: BUNDLE_NAME,
- abilityName: ABILITY_NAME
- });
- var ret = JSON.parse(result);
- if (ret.code == 0) {
- console.info('mqtt is:' + JSON.stringify(ret.abilityResult));
- that.title = 'mqtt is:' + JSON.stringify(ret.abilityResult);
- } else {
- console.error('mqtt error code:' + JSON.stringify(ret.code));
- }
- }
- }
- export default {
- data: {
- title: "",
- timer: null
- },
- task() {
- playAbility.mqttMessage(this);
- },
- mqttMessage() {
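- this.title = "Fetching MQTT messages";
- this.task();
- // Wrap the call in an arrow function so that `this` still refers to the page when the timer fires
- this.timer = setInterval(() => this.task(), 200);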
- },
- stopMqtt() {
- clearInterval(this.timer)
- }
- }
- // Start MQTT message reception on the Java side
- playAbility.startMqtt()
Java code (receiving MQTT messages asynchronously)
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import java.util.List;
- public class MqttThread implements Runnable {
- /** Broker address */
- public static final String MQTT_BROKER_HOST = "tcp://xxx.xxx.xxx.xxx:1883";
- /** Unique client identifier */
- public static final String MQTT_CLIENT_ID = "client";
- /** Topic to subscribe to */
- public static final String MQTT_TOPIC = "HarmonyTest";
- /** Client */
- private volatile static MqttClient mqttClient;
- /** Connection options */
- private static MqttConnectOptions options;
- /** Cache holding the latest message */
- private final List<String> message;
- public MqttThread(List<String> message) {
- this.message = message;
- }
- public void run() {
- try {
- mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
- options = new MqttConnectOptions();
- options.setCleanSession(true);
- options.setConnectionTimeout(20);
- options.setKeepAliveInterval(20);
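- // Register the callback before connecting so that no message arrives without a handler
- mqttClient.setCallback(new MqttCallback() {
- @Override
- public void connectionLost(Throwable throwable) { }
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) {
- // Keep only the latest message; readers that lock the same list never see it between clear() and add()
- synchronized (message) {
- message.clear();
- message.add(mqttMessage.toString());
- }
- System.out.println("Received MQTT message: " + mqttMessage.toString());
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { }
- });
- mqttClient.connect(options);
- mqttClient.subscribe(MQTT_TOPIC);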
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
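The callback above turns the payload into text with `mqttMessage.toString()`, which in the Paho Java client builds the string from the payload bytes using the platform default charset. If payloads may contain non-ASCII text, decoding the bytes explicitly is one option. The following is a minimal sketch, assuming a hypothetical helper `PayloadDecoder` that is not part of the original project and assuming publishers send UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project.
final class PayloadDecoder {
    private PayloadDecoder() { }

    // Decodes raw payload bytes (for example from MqttMessage.getPayload())
    // as UTF-8, independent of the device's default charset.
    // Assumption: the publisher encodes its payloads as UTF-8.
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

Inside `messageArrived`, this would be used as `PayloadDecoder.decodeUtf8(mqttMessage.getPayload())` in place of `mqttMessage.toString()`.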
Java code (Particle Ability)
- import com.example.mqttapplication.mqtt.MqttThread;
- import ohos.aafwk.ability.Ability;
- import ohos.aafwk.content.Intent;
- import ohos.hiviewdfx.HiLog;
- import ohos.hiviewdfx.HiLogLabel;
- import ohos.rpc.*;
- import ohos.utils.zson.ZSONObject;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- public class PlayAbility extends Ability {
- static final HiLogLabel label = new HiLogLabel(HiLog.LOG_APP, 1, "MY_TAG");
- private static final int ERROR = -1;
- private static final int SUCCESS = 0;
- private static final int START_MQTT = 1001;
- private static final int MQTT_MESSAGE = 1002;
- @Override
- protected void onStart(Intent intent) {
- super.onStart(intent);
- }
- @Override
- protected IRemoteObject onConnect(Intent intent) {
- super.onConnect(intent);
- PlayRemote remote = new PlayRemote();
- return remote.asObject();
- }
- static class PlayRemote extends RemoteObject implements IRemoteBroker {
- private List<String> message;
- private Thread thread;
- public PlayRemote() {
- super("PlayRemote");
- }
- @Override
- public boolean onRemoteRequest(int code, MessageParcel data, MessageParcel reply, MessageOption option) {
- // Start MQTT
- if (code == START_MQTT) {
- Map<String, Object> result = new HashMap<>();
- result.put("code", SUCCESS);
- result.put("abilityResult", "MQTT started");
- try {
- message = new ArrayList<>();
- MqttThread mqttThread = new MqttThread(message);
- thread = new Thread(mqttThread);
- thread.start();
- System.out.println("MQTT started successfully");
- }
- catch (Exception e) {
- result.put("code", ERROR);
- result.put("abilityResult", "Failed to start MQTT");
- }
- reply.writeString(ZSONObject.toZSONString(result));
- }
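- // Fetch the latest MQTT message
- else if (code == MQTT_MESSAGE) {
- Map<String, Object> result = new HashMap<>();
- result.put("code", SUCCESS);
- // message is still null if START_MQTT has not been handled yet
- String latest = null;
- if (message != null) {
- // Read under the list's lock (effective when the writer locks the same list)
- synchronized (message) {
- latest = message.isEmpty() ? null : message.get(0);
- }
- }
- if (latest == null) {
- result.put("abilityResult", "No MQTT message received yet");
- }
- else {
- ZSONObject zsonObject = ZSONObject.stringToZSON(latest);
- result.put("abilityResult", zsonObject.getString("message"));
- }
- reply.writeString(ZSONObject.toZSONString(result));
- }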
- else {
- Map<String, Object> result = new HashMap<>();
- result.put("abilityError", ERROR);
- reply.writeString(ZSONObject.toZSONString(result));
- return false;
- }
- return true;
- }
- @Override
- public IRemoteObject asObject() {
- return this;
- }
- }
- }
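In the ability above, every `START_MQTT` request creates a new `MqttThread`, and each one connects with the same client ID `client`. Under MQTT, a broker closes an existing connection when another client connects with the same ID, so repeated start requests could make the connections evict each other. One way to avoid that is to make the start step run only once. The following is a minimal sketch, assuming a hypothetical class `StartOnce` that is not part of the original project.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard, not part of the original project: runs the start
// action at most once, even if the start request arrives repeatedly.
final class StartOnce {
    private final AtomicBoolean started = new AtomicBoolean(false);

    // Returns true if this call ran the action, false if it had already run.
    // Note: the flag stays set even if the action throws, so a failed start
    // is not retried by this sketch.
    boolean runIfFirst(Runnable startAction) {
        if (started.compareAndSet(false, true)) {
            startAction.run();
            return true;
        }
        return false;
    }
}
```

In the `START_MQTT` branch, the thread creation could be wrapped in a call such as `guard.runIfFirst(() -> new Thread(mqttThread).start())`, where `guard` is a field that outlives individual requests.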
Network access also requires declaring the following permissions in config.json:
- {
- ...
- "module": {
- ...
- "reqPermissions": [
- {
- "name": "ohos.permission.GET_NETWORK_INFO"
- },
- {
- "name": "ohos.permission.INTERNET"
- },
- {
- "name": "ohos.permission.SET_NETWORK_INFO"
- },
- {
- "name": "ohos.permission.MANAGE_WIFI_CONNECTION"
- },
- {
- "name": "ohos.permission.SET_WIFI_INFO"
- },
- {
- "name": "ohos.permission.GET_WIFI_INFO"
- }
- ]
- }
- }
Finally, I wrote a Python script to publish MQTT messages. It is very simple, a single publish call:
- import paho.mqtt.publish as publish
- publish.single('HarmonyTest', '{"message":"BongShakalaka"}', hostname='xxx.xxx.xxx.xxx')
Note: MQTT messaging requires an MQTT broker; set one up yourself or pay for a hosted one.