
Spring Boot + MQTT: Message Exchange Made Easy


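Introduction

MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for resource-constrained devices and for low-bandwidth, high-latency networks. It is widely used in the Internet of Things (IoT) because of its low overhead and efficiency.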
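To make "low overhead" concrete: in MQTT 3.1.1 every control packet starts with a fixed header made of one control byte plus a Remaining Length field of 1 to 4 bytes, so the smallest header is 2 bytes. The sketch below shows that variable-length encoding. The class name `RemainingLength` is illustrative only and is not part of any MQTT library.

```java
import java.io.ByteArrayOutputStream;

/** Sketch of the MQTT 3.1.1 "Remaining Length" variable-length encoding. */
final class RemainingLength {
    private RemainingLength() {}

    /** Encodes a length in the range 0..268,435,455 into 1 to 4 bytes. */
    static byte[] encode(int length) {
        if (length < 0 || length > 268_435_455) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4);
        int x = length;
        do {
            int digit = x % 128;   // the low 7 bits carry data
            x /= 128;
            if (x > 0) {
                digit |= 0x80;     // continuation bit: more bytes follow
            }
            out.write(digit);
        } while (x > 0);
        return out.toByteArray();
    }

    /** Size of the fixed header: one control byte plus the length field. */
    static int fixedHeaderSize(int remainingLength) {
        return 1 + encode(remainingLength).length;
    }
}
```

For any packet whose body is at most 127 bytes, `fixedHeaderSize` returns 2, which is the whole per-packet framing cost at the protocol level.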


Usage

Installation instructions

https://docs.emqx.com/zh/emqx/v4.3/getting-started/install.html#zip-%E5%8E%8B%E7%BC%A9%E5%8C%85%E5%AE%89%E8%A3%85-linux%E3%80%81macos%E3%80%81windows

The default account is admin/public. After logging in, you land on the home page.

Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

Configuration

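# MQTT settings
mqtt:
  broker-url: tcp://localhost:1883 # MQTT broker address
  client-id: yian-mqtt-client # client ID; must be unique on the broker
  username: admin # username (if required)
  password: 1qazxsw2 # password (if required)
  topics:
    topic1: test/topic_1
    topic2: test/topic_2
    topic3: test/topic_3
  qos: 1 # quality of service level (0, 1, or 2)
  automatic-reconnect: true # reconnect automatically (a space is required before '#')
  clean-session: true # whether to start with a clean session
  connection-timeout: 5 # connection timeout in seconds (Paho's setConnectionTimeout takes seconds)
  keep-alive-interval: 30 # keep-alive interval in seconds
  pool-config:
    core-size: 8
    max-size: 32
    queue-capacity: 1024
    thread-prefix: mqtt-worker- # binds to PoolConfig.threadPrefix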
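
A few of these settings are easy to get wrong (a broker URL without a port, a QoS outside 0 to 2), and the failure then only shows up at connect time. A minimal sketch of a startup check follows. `MqttSettingsCheck` is a hypothetical helper, not part of Spring or Paho, and accepting only the `tcp` and `ssl` schemes is an assumption made for this sketch.

```java
import java.net.URI;

/** Hypothetical startup check for the mqtt.* settings; not part of Spring or Paho. */
final class MqttSettingsCheck {
    private MqttSettingsCheck() {}

    /** Returns the broker port, or throws if the URL lacks a supported scheme, host, or port. */
    static int brokerPort(String brokerUrl) {
        URI uri = URI.create(brokerUrl);
        String scheme = uri.getScheme();
        // Assumption: this sketch accepts plain TCP and TLS only.
        if (!"tcp".equals(scheme) && !"ssl".equals(scheme)) {
            throw new IllegalArgumentException("unsupported scheme: " + scheme);
        }
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("host and port are required: " + brokerUrl);
        }
        return uri.getPort();
    }

    /** MQTT defines exactly three QoS levels: 0, 1, and 2. */
    static int requireValidQos(int qos) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("qos must be 0, 1, or 2 but was " + qos);
        }
        return qos;
    }
}
```

Calling `MqttSettingsCheck.brokerPort("tcp://localhost:1883")` yields the port 1883, while a value such as `localhost:1883` is rejected because it has no `tcp://` or `ssl://` scheme.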

These settings are bound to a properties class:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    /**
     * MQTT broker address
     */
    private String brokerUrl;

    /**
     * Client ID
     */
    private String clientId;

    /**
     * Username
     */
    private String username;

    /**
     * Password
     */
    private String password;

    /**
     * QoS level (0/1/2)
     */
    private int qos = 1;
    /**
     * Automatic reconnect
     */
    private boolean automaticReconnect = false;
    /**
     * Clean session
     */
    private boolean cleanSession = false;

    /**
     * Connection timeout in seconds (Paho's setConnectionTimeout takes seconds, not milliseconds)
     */
    private int connectionTimeout = 30;

    /**
     * Keep-alive interval in seconds
     */
    private int keepAliveInterval = 30;

    /**
     * Topic settings
     */
    private Topics topics = new Topics();
    /**
     * Thread pool settings
     */
    private PoolConfig poolConfig = new PoolConfig();

    @Data
    public static class PoolConfig {
        private int coreSize = 8;
        private int maxSize = 16;
        private int queueCapacity = 1000;
        // Bound from mqtt.pool-config.thread-prefix
        private String threadPrefix = "mqtt-worker-";
    }

    @Data
    public static class Topics {
        /**
         * topic1
         */
        private String topic1;

        /**
         * topic2
         */
        private String topic2;

        /**
         * topic3
         */
        private String topic3;

    }

    // Explicit no-arg constructor (optional: the compiler generates one when no other constructor exists)
    public MqttProperties() {
    }
}
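
Spring Boot's relaxed binding maps kebab-case keys to camelCase fields, so a key only binds when the converted name equals the field name. For example, `thread-prefix` matches the `threadPrefix` field, while `thread-name-prefix` would map to `threadNamePrefix` and be silently ignored. The sketch below is a simplified illustration of that naming rule. It is not Spring's actual implementation, and `KebabCase` is a hypothetical helper.

```java
/** Simplified illustration of kebab-case to camelCase mapping; not Spring's own algorithm. */
final class KebabCase {
    private KebabCase() {}

    /** Converts a key such as "keep-alive-interval" to "keepAliveInterval". */
    static String toCamelCase(String kebab) {
        StringBuilder sb = new StringBuilder(kebab.length());
        boolean upperNext = false;
        for (char c : kebab.toCharArray()) {
            if (c == '-') {
                upperNext = true;                      // drop the dash, capitalize what follows
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

Running the YAML keys through `toCamelCase` and comparing the result with the field names is a quick way to spot a key that will never reach its field.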

MQTT configuration class

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.StringUtils;

@Slf4j
@Configuration
public class MqttConfig {
    private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");

    private final MqttProperties mqttProperties;
    private final MqttCallback mqttCallback;

    public MqttConfig(MqttProperties mqttProperties, MqttCallback mqttCallback) {
        this.mqttProperties = mqttProperties;
        this.mqttCallback = mqttCallback;
    }

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = createMqttClient();
        MqttConnectOptions options = buildMqttConnectOptions();

        // Register the callback before connecting so that no message arrives without a handler.
        client.setCallback(mqttCallback);

        try {
            client.connect(options);
            log.info("MQTT connected, broker: {}", mqttProperties.getBrokerUrl());
            subscribeTopics(client);
        } catch (MqttException e) {
            log.error("MQTT connection error: {}, reason code: {}", e.getMessage(), e.getReasonCode(), e);
            throw new RuntimeException("MQTT connection failed", e);
        }

        return client;
    }

    private MqttClient createMqttClient() throws MqttException {
        return new MqttClient(
                mqttProperties.getBrokerUrl(),
                generateClientId(),
                new MemoryPersistence()
        );
    }

    // Uses the configured client ID, or a timestamp-based one when none is set.
    private String generateClientId() {
        return Optional.ofNullable(mqttProperties.getClientId())
                .filter(StringUtils::hasText)
                .orElseGet(() -> "CLIENT_" + System.currentTimeMillis());
    }

    private MqttConnectOptions buildMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        options.setCleanSession(mqttProperties.isCleanSession());

        // Credentials are optional; set them only when configured.
        Optional.ofNullable(mqttProperties.getUsername())
                .filter(StringUtils::hasText)
                .ifPresent(options::setUserName);

        Optional.ofNullable(mqttProperties.getPassword())
                .filter(StringUtils::hasText)
                .map(String::toCharArray)
                .ifPresent(options::setPassword);

        // Both values are in seconds.
        options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        return options;
    }

    private void subscribeTopics(MqttClient client) throws MqttException {
        List<String> topics = getTopicsToSubscribe();
        for (String topic : topics) {
            try {
                client.subscribe(topic, mqttProperties.getQos());
                log.info("Subscribed to topic: {}", topic);
            } catch (MqttException e) {
                log.error("Failed to subscribe to topic [{}], reason code: {}", topic, e.getReasonCode(), e);
                throw e;
            }
        }
    }

    // Only topic1 is subscribed here; the default topic is used when topic1 is not configured.
    private List<String> getTopicsToSubscribe() {
        return Optional.ofNullable(mqttProperties.getTopics())
                .map(MqttProperties.Topics::getTopic1)
                .filter(StringUtils::hasText)
                .map(Collections::singletonList)
                .orElse(DEFAULT_TOPICS);
    }

    @Bean("mqttExecutor")
    public Executor mqttExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(mqttProperties.getPoolConfig().getCoreSize());
        executor.setMaxPoolSize(mqttProperties.getPoolConfig().getMaxSize());
        executor.setQueueCapacity(mqttProperties.getPoolConfig().getQueueCapacity());
        executor.setThreadNamePrefix(mqttProperties.getPoolConfig().getThreadPrefix());
        // When the pool and queue are full, the submitting thread runs the task itself.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
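
The configuration above subscribes to exact topic names such as `test/topic_1`. MQTT topic filters also allow two wildcards: `+` matches exactly one level and `#` matches all remaining levels. If you switch to wildcard subscriptions, the client side needs the same matching rule to route incoming messages. The following is a minimal sketch of that rule. `TopicFilter` is a hypothetical helper, and the special handling of topics beginning with `$` is deliberately left out.

```java
/** Sketch of MQTT topic filter matching ('+' and '#'); '$'-prefixed topics are not special-cased. */
final class TopicFilter {
    private TopicFilter() {}

    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);   // -1 keeps empty trailing levels
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return i == f.length - 1;     // '#' is only valid as the last level
            }
            if (i >= t.length) {
                return false;                 // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;                 // a literal level must match exactly
            }
        }
        return f.length == t.length;          // no unmatched topic levels may remain
    }
}
```

With this rule, `test/+` matches `test/topic_1` but not `test/a/b`, while `test/#` matches both.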

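Message callback: handling received messages

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Assumption: JSON and JSONException come from fastjson 1.x.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Slf4j
@Component
public class MqttMessageListener implements MqttCallback {
    private static final int MAX_RETRY_ATTEMPTS = 10;
    private static final long INITIAL_RETRY_DELAY = 1_000L;
    private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");

    private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger retryCounter = new AtomicInteger(0);

    private final Map<String, MessageHandler> topicHandlers = new ConcurrentHashMap<>();

    // TestService is the application's own service; it is not shown in this article.
    private final TestService testService;
    private final MqttProperties mqttProperties;

    // Injected lazily to break the cycle MqttConfig -> MqttCallback -> MqttClient.
    @Lazy
    @Autowired
    private MqttClient mqttClient;

    // Used directly: a self-invoked @Async method bypasses the Spring proxy and would run synchronously.
    @Lazy
    @Autowired
    @Qualifier("mqttExecutor")
    private Executor mqttExecutor;

    public MqttMessageListener(TestService testService, MqttProperties mqttProperties) {
        this.testService = testService;
        this.mqttProperties = mqttProperties;
        initializeHandlers();
    }

    // Routes every subscribed topic (topic1, or the default) to handleMessage2.
    private void initializeHandlers() {
        getTopicsToSubscribe().forEach(topic -> topicHandlers.put(topic, this::handleMessage2));
    }

    @Override
    public void connectionLost(Throwable cause) {
        log.error("MQTT connection lost", cause);
        scheduleReconnect();
    }

    private synchronized void scheduleReconnect() {
        int attempt = retryCounter.incrementAndGet();
        if (attempt > MAX_RETRY_ATTEMPTS) {
            log.error("Reached the maximum of {} reconnect attempts, giving up", MAX_RETRY_ATTEMPTS);
            return;
        }
        // Exponential backoff: 1s, 2s, 4s, ... up to 512s on the tenth attempt.
        long delay = INITIAL_RETRY_DELAY << (attempt - 1);
        log.info("Reconnect attempt {} scheduled in {} ms", attempt, delay);

        reconnectScheduler.schedule(() -> {
            try {
                if (!mqttClient.isConnected()) {
                    mqttClient.reconnect();
                }
                // Fails while the connection is still being established, which triggers the next attempt.
                subscribeTopics(mqttClient);
                mqttClient.setCallback(this);
                retryCounter.set(0);
                log.info("MQTT connection restored");
            } catch (MqttException e) {
                log.error("Reconnect attempt {} failed: {}", attempt, e.getMessage(), e);
                scheduleReconnect();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    private void subscribeTopics(MqttClient client) throws MqttException {
        List<String> topics = getTopicsToSubscribe();
        for (String topic : topics) {
            try {
                client.subscribe(topic, mqttProperties.getQos());
                log.info("Subscribed to topic: {}", topic);
            } catch (MqttException e) {
                log.error("Failed to subscribe to topic [{}], reason code: {}", topic, e.getReasonCode(), e);
                throw e;
            }
        }
    }

    // Only topic1 is used here; the default topic applies when topic1 is not configured.
    private List<String> getTopicsToSubscribe() {
        return Optional.ofNullable(mqttProperties.getTopics())
                .map(MqttProperties.Topics::getTopic1)
                .filter(StringUtils::hasText)
                .map(Collections::singletonList)
                .orElse(DEFAULT_TOPICS);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String payload = validatePayload(message.getPayload());
            log.info("Message received [Topic:{}][QoS:{}] {}", topic, message.getQos(), payload);

            MessageHandler handler = Optional.ofNullable(topicHandlers.get(topic))
                    .orElseThrow(() -> new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION));

            asyncProcessMessage(() -> {
                try {
                    handler.handle(payload);
                } catch (Exception e) {
                    // Log here: an exception thrown on a pool thread would not reach the catch blocks below.
                    log.error("Handler failed [Topic:{}]: {}", topic, e.getMessage(), e);
                }
            });

        } catch (JSONException e) {
            log.error("Malformed message [Topic:{}]: {}", topic, e.getMessage(), e);
        } catch (MqttException e) {
            log.error("No handler for message [Topic:{}]: {}", topic, e.getMessage(), e);
        } catch (Exception e) {
            log.error("Unexpected error [Topic:{}]: {}", topic, e.getMessage(), e);
        }
    }

    private String validatePayload(byte[] payload) {
        String json = new String(payload, StandardCharsets.UTF_8);
        try {
            JSON.parse(json); // full parse, so malformed JSON is rejected
        } catch (JSONException e) {
            throw new JSONException("Invalid JSON payload: " + json, e);
        }
        return json;
    }

    // Hands the task to the mqttExecutor pool so the Paho callback thread is not blocked.
    private void asyncProcessMessage(Runnable task) {
        mqttExecutor.execute(task);
    }

    private void handleMessage2(String payload) {
        testService.handleMessage2(payload);
    }

    // Not registered yet; map it to a topic in initializeHandlers when needed.
    private void handleMessage3(String payload) {
        testService.handleMessage3(payload);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            if (token.getException() != null) {
                log.error("Message delivery failed [MessageId:{}]", token.getMessageId(), token.getException());
            } else {
                log.info("Message delivered [MessageId:{}]", token.getMessageId());
            }
        } catch (Exception e) {
            log.error("Failed to read the delivery status", e);
        }
    }

    @FunctionalInterface
    private interface MessageHandler {
        void handle(String payload) throws Exception;
    }
}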
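
Incoming messages are meant to be processed on the `mqttExecutor` pool, which is configured with `CallerRunsPolicy`. It is worth knowing what that policy does under load: once all workers are busy and the queue is full, the thread that submits the task runs it itself. The sketch below reproduces this with a deliberately tiny pool (one worker, queue capacity one). The class name and the task labels are illustrative only.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows which thread runs each task when a pool with CallerRunsPolicy is saturated. */
final class CallerRunsDemo {
    private CallerRunsDemo() {}

    /** Returns a map from task label to the name of the thread that executed it. */
    static Map<String, String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                r -> new Thread(r, "mqtt-worker-1"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Map<String, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);

        // task1 occupies the only worker until the latch is released.
        pool.execute(() -> {
            ranOn.put("task1", Thread.currentThread().getName());
            awaitQuietly(release);
        });
        // task2 fills the single queue slot.
        pool.execute(() -> ranOn.put("task2", Thread.currentThread().getName()));
        // task3 is rejected, so CallerRunsPolicy runs it on the submitting thread.
        pool.execute(() -> ranOn.put("task3", Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the MQTT setting, the submitting thread is the client's callback thread, so a saturated pool slows message intake instead of dropping messages. That is a reasonable form of backpressure, but it also means slow handlers can delay the delivery of later messages.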

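Publishing messages

import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MqttPublisher {

    @Autowired
    @Lazy
    private MqttClient mqttClient;

    @Value("${mqtt.qos:1}")
    private int qosLevel;

    private final Object publishLock = new Object();

    /**
     * Publishes a message, retrying once on failure.
     */
    public void publishMessage(String msg, String topic) {
        publishMessage(msg, topic, true);
    }

    private void publishMessage(String msg, String topic, boolean retryOnFailure) {
        try {
            synchronized (publishLock) { // serialize publishes
                MqttMessage message = new MqttMessage();
                message.setPayload(msg.getBytes(StandardCharsets.UTF_8));
                message.setQos(qosLevel);
                message.setRetained(false); // do not retain the message

                mqttClient.publish(topic, message);
                log.info("Topic:{} response sent: {}", topic, msg);
            }
        } catch (MqttException e) {
            log.error("MQTT publish failed: {}", e.getMessage(), e);
            if (retryOnFailure) {
                handlePublishFailure(msg, topic);
            } else {
                saveFailedMessage(msg, topic);
            }
        }
    }

    /**
     * Retry on failure. The retry is limited to one attempt so that a broken
     * connection cannot cause unbounded recursion.
     */
    private void handlePublishFailure(String msg, String topic) {
        if (mqttClient.isConnected()) {
            publishMessage(msg, topic, false); // single retry
        } else {
            // Reconnecting is left to the callback: calling connect() without options
            // here would drop the configured credentials and session settings.
            saveFailedMessage(msg, topic);
        }
    }

    /**
     * Persists a failed message (to back the QoS 1 guarantee).
     */
    private void saveFailedMessage(String msg, String topic) {
        // Implement database storage here (DAO code still to be added)
        log.warn("Persisting failed message [Topic:{}]: {}", topic, msg);
    }
}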
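
The publisher's retry and the listener's reconnect logic follow the same pattern: try, wait, and try again up to a limit. The listener computes its wait as 1,000 ms times 2^(attempt - 1), which reaches 512 seconds on the tenth attempt. A generic version of that pattern, with a cap on the delay, could look like the sketch below. `Retry` and its parameter names are hypothetical and are not taken from Paho or Spring.

```java
import java.util.concurrent.Callable;

/** Sketch of bounded retries with capped exponential backoff. */
final class Retry {
    private Retry() {}

    /** Delay before retrying after the given 1-based attempt: initial * 2^(attempt-1), capped. */
    static long backoffDelayMs(int attempt, long initialDelayMs, long maxDelayMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Assumes initialDelayMs is in the seconds range, so a shift of up to 30 cannot overflow.
        int shift = Math.min(attempt - 1, 30);
        return Math.min(initialDelayMs << shift, maxDelayMs);
    }

    /** Runs op up to maxAttempts times and rethrows the last failure if all attempts fail. */
    static <T> T call(Callable<T> op, int maxAttempts, long initialDelayMs, long maxDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffDelayMs(attempt, initialDelayMs, maxDelayMs));
                }
            }
        }
        throw last;
    }
}
```

A publish could then be wrapped as `Retry.call(() -> { client.publish(topic, message); return null; }, 3, 1_000L, 30_000L)`, where `client`, `topic`, and `message` stand for the objects used in the publisher. The caller decides what to do once the last attempt fails, for example by persisting the message.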

Testing

Start the application.

Check the topics.

Publish a message.

Editor: 武曉燕. Source: 一安未來