Spring Boot + MQTT:消息交互輕松實現
作者:一安
MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協議,專為資源受限的設備和低帶寬、高延遲的網絡環境設計。在物聯網(IoT)領域,MQTT因其低開銷和高效能而被廣泛應用。
介紹
MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協議,專為資源受限的設備和低帶寬、高延遲的網絡環境設計。在物聯網(IoT)領域,MQTT因其低開銷和高效能而被廣泛應用。
圖片
使用
安裝說明
圖片
默認賬號:admin/public,登錄后進入首頁:
圖片
依賴引入
<!-- Spring Boot starter for Spring Integration. No <version> is given here; presumably it is managed by the Spring Boot parent/BOM (confirm against the project's pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Spring Integration MQTT module. NOTE(review): the Java classes in this article use the Eclipse Paho types (MqttClient, MqttConnectOptions, MqttCallback) directly; presumably they arrive transitively through this artifact - confirm. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置
# MQTT configuration (bound to MqttProperties via the "mqtt" prefix)
mqtt:
  broker-url: tcp://localhost:1883   # MQTT broker address
  client-id: yian-mqtt-client        # client ID, must be unique
  username: admin                    # username (if required)
  password: 1qazxsw2                 # password (if required)
  topics:
    topic1: test/topic_1
    topic2: test/topic_2
    topic3: test/topic_3
  qos: 1                             # QoS level (0, 1 or 2)
  # A YAML comment needs whitespace before '#'; without it the '#...' text becomes
  # part of the value and the boolean binding fails.
  automatic-reconnect: true          # automatic reconnect
  clean-session: true                # whether to clear the session
  connection-timeout: 5000           # connection timeout in milliseconds (as documented on MqttProperties.connectionTimeout)
  keep-alive-interval: 30            # keep-alive interval (seconds)
  pool-config:
    core-size: 8
    max-size: 32
    queue-capacity: 1024
    # Key must match the PoolConfig field "threadPrefix"; "thread-name-prefix" has no matching field.
    thread-prefix: mqtt-worker-
/**
 * Settings for the MQTT client, bound from configuration keys under the {@code mqtt} prefix.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    /** Address of the MQTT broker. */
    private String brokerUrl;

    /** Client identifier. */
    private String clientId;

    /** User name. */
    private String username;

    /** Password. */
    private String password;

    /** QoS level (0/1/2); 1 unless configured otherwise. */
    private int qos = 1;

    /** Automatic reconnect switch; off unless configured. */
    private boolean automaticReconnect;

    /** Clean-session switch; off unless configured. */
    private boolean cleanSession;

    /** Connection timeout (ms). */
    private int connectionTimeout = 5_000;

    /** Keep-alive interval (seconds). */
    private int keepAliveInterval = 30;

    /** Topic names. */
    private Topics topics = new Topics();

    /** Thread-pool parameters. */
    private PoolConfig poolConfig = new PoolConfig();

    /** The no-argument constructor is declared explicitly on purpose. */
    public MqttProperties() {
    }

    /** Holder for the three configurable topic names. */
    @Data
    public static class Topics {

        /** topic1 */
        private String topic1;

        /** topic2 */
        private String topic2;

        /** topic3 */
        private String topic3;
    }

    /** Sizing and naming of the worker thread pool. */
    @Data
    public static class PoolConfig {
        private int coreSize = 8;
        private int maxSize = 16;
        private int queueCapacity = 1_000;
        private String threadPrefix = "mqtt-worker-";
    }
}
MQTT配置類
@Slf4j
@Configuration
public class MqttConfig {
private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");
private final MqttProperties mqttProperties;
private final MqttCallback mqttCallback;
public MqttConfig(MqttProperties mqttProperties, MqttCallback mqttCallback) {
this.mqttProperties = mqttProperties;
this.mqttCallback = mqttCallback;
}
@Bean
public MqttClient mqttClient() throws MqttException {
MqttClient client = createMqttClient();
MqttConnectOptions options = buildMqttConnectOptions();
try {
client.connect(options);
log.info("MQTT連接成功,Broker地址: {}", mqttProperties.getBrokerUrl());
subscribeTopics(client);
} catch (MqttException e) {
log.error("MQTT連接異常: {},錯誤碼: {}", e.getMessage(), e.getReasonCode(), e);
throw new RuntimeException("MQTT連接失敗", e);
}
client.setCallback(mqttCallback);
return client;
}
private MqttClient createMqttClient() throws MqttException {
return new MqttClient(
mqttProperties.getBrokerUrl(),
generateClientId(),
new MemoryPersistence()
);
}
private String generateClientId() {
return Optional.ofNullable(mqttProperties.getClientId())
.filter(StringUtils::hasText)
.orElseGet(() -> "CLIENT_" + System.currentTimeMillis());
}
private MqttConnectOptions buildMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
options.setCleanSession(mqttProperties.isCleanSession());
Optional.ofNullable(mqttProperties.getUsername())
.filter(StringUtils::hasText)
.ifPresent(options::setUserName);
Optional.ofNullable(mqttProperties.getPassword())
.filter(StringUtils::hasText)
.map(String::toCharArray)
.ifPresent(options::setPassword);
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
return options;
}
private void subscribeTopics(MqttClient client) throws MqttException {
List<String> topics = getTopicsToSubscribe();
for (String topic : topics) {
try {
client.subscribe(topic, mqttProperties.getQos());
log.info("成功訂閱主題: {}", topic);
} catch (MqttException e) {
log.error("訂閱主題[{}]失敗,錯誤碼: {}", topic, e.getReasonCode(), e);
throw e;
}
}
}
private List<String> getTopicsToSubscribe() {
return Optional.ofNullable(mqttProperties.getTopics())
.map(t -> Arrays.asList(t.getTopic1()))
.filter(list -> !list.contains(null))
.orElse(DEFAULT_TOPICS);
}
@Bean("mqttExecutor")
public Executor mqttExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(mqttProperties.getPoolConfig().getCoreSize());
executor.setMaxPoolSize(mqttProperties.getPoolConfig().getMaxSize());
executor.setQueueCapacity(mqttProperties.getPoolConfig().getQueueCapacity());
executor.setThreadNamePrefix(mqttProperties.getPoolConfig().getThreadPrefix());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
消息回調,處理接收的消息
@Slf4j
@Component
public class MqttMessageListener implements MqttCallback {
private static final int MAX_RETRY_ATTEMPTS = 10;
private static final long INITIAL_RETRY_DELAY = 1_000L;
private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");
private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger retryCounter = new AtomicInteger(0);
private final Map<String, MessageHandler> topicHandlers = new ConcurrentHashMap<>();
private final TestService testService;
private final MqttProperties mqttProperties;
@Lazy
@Autowired
private MqttClient mqttClient;
public MqttMessageListener(TestService testService, MqttProperties mqttProperties) {
this.testService = testService;
this.mqttProperties = mqttProperties;
initializeHandlers();
}
private void initializeHandlers() {
topicHandlers.put(mqttProperties.getTopics().getTopic1(), this::handleMessage2);
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT連接中斷,原因: {}", cause.getMessage());
scheduleReconnect();
}
private synchronized void scheduleReconnect() {
int attempt = retryCounter.incrementAndGet();
if (attempt > MAX_RETRY_ATTEMPTS) {
log.error("達(dá)到最大重連次數(shù)[{}],停止重連嘗試", MAX_RETRY_ATTEMPTS);
return;
}
long delay = INITIAL_RETRY_DELAY * (long) Math.pow(2, attempt - 1);
log.info("將在{}ms后嘗試第{}次重連...", delay, attempt);
reconnectScheduler.schedule(() -> {
try {
if (!mqttClient.isConnected()) {
mqttClient.reconnect();
}
subscribeTopics(mqttClient);
mqttClient.setCallback(this);
retryCounter.set(0);
log.info("MQTT連接恢復(fù)成功");
} catch (MqttException e) {
log.error("第{}次重連失敗: {}", attempt, e.getMessage(),e);
scheduleReconnect();
}
}, delay, TimeUnit.MILLISECONDS);
}
private void subscribeTopics(MqttClient client) throws MqttException {
List<String> topics = getTopicsToSubscribe();
for (String topic : topics) {
try {
client.subscribe(topic, mqttProperties.getQos());
log.info("成功訂閱主題: {}", topic);
} catch (MqttException e) {
log.error("訂閱主題[{}]失敗,錯誤碼: {}", topic, e.getReasonCode(), e);
throw e;
}
}
}
private List<String> getTopicsToSubscribe() {
return Optional.ofNullable(mqttProperties.getTopics())
.map(t -> Arrays.asList(t.getTopic1()))
.filter(list -> !list.contains(null))
.orElse(DEFAULT_TOPICS);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String payload = validatePayload(message.getPayload());
log.info("收到消息 [Topic:{}][QoS:{}] {}", topic, message.getQos(), payload);
MessageHandler handler = Optional.ofNullable(topicHandlers.get(topic))
.orElseThrow(() -> new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION));
asyncProcessMessage(() -> {
try {
handler.handle(payload);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (JSONException e) {
log.error("消息格式錯誤 [Topic:{}]: {}", topic, e.getMessage(), e);
} catch (MqttException e) {
log.error("消息處理失敗 [Topic:{}]: {}", topic, e.getMessage(), e);
} catch (Exception e) {
log.error("未知處理異常 [Topic:{}]: {}", topic, e.getMessage(), e);
}
}
private String validatePayload(byte[] payload) throws JSONException {
String json = new String(payload);
try {
JSON.parse(json); // 完整解析 JSON,捕獲格式異常
} catch (JSONException e) {
log.error("非法JSON格式: {}", json);
throw new JSONException("非法JSON格式", e);
}
return json;
}
@Async("mqttExecutor")
public void asyncProcessMessage(Runnable task) {
task.run();
}
private void handleMessage2(String payload) {
testService.handleMessage2(payload);
}
private void handleMessage3(String payload) {
testService.handleMessage3(payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
if (token.getException() != null) {
log.error("消息投遞失敗 [MessageId:{}]", token.getMessageId(), token.getException());
} else {
log.info("消息投遞成功 [MessageId:{}]", token.getMessageId());
}
} catch (Exception e) {
log.error("獲取投遞狀態(tài)失敗", e);
}
}
@FunctionalInterface
private interface MessageHandler {
void handle(String payload) throws Exception;
}
}
消息發布
@Slf4j
@Component
public class MqttPublisher {
@Autowired
@Lazy
private MqttClient mqttClient;
@Value("${mqtt.qos:1}")
private int qosLevel;
private final Object publishLock = new Object();
/**
* 發(fā)布消息
*/
public void publishMessage(String msg, String topic) {
try {
synchronized (publishLock) { // 線程安全鎖
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());
message.setQos(qosLevel);
message.setRetained(false); // 不保留消息
mqttClient.publish(topic, message);
log.info("Topic:{} 響應(yīng)已發(fā)送: {}", topic, msg);
}
} catch (MqttException e) {
log.error("MQTT發(fā)布失敗: {}", e.getMessage());
handlePublishFailure(msg, topic, e);
}
}
/**
* 消息失敗重試機(jī)制
*/
private void handlePublishFailure(String msg, String topic, MqttException e) {
try {
if (mqttClient.isConnected()) {
mqttClient.disconnect();
}
mqttClient.connect();
publishMessage(msg, topic); // 重試發(fā)布
} catch (MqttException ex) {
log.error("消息重試失敗: {}", ex.getMessage(),ex);
}
}
/**
* 消息持久化存儲(QoS 1保障)
*/
private void saveFailedMessage(JSONObject msg) {
// 實現(xiàn)數(shù)據(jù)庫存儲邏輯(此處需補(bǔ)充DAO操作)
log.warn("持久化失敗消息: {}", msg);
}
}
測試
啟動程序:
圖片
查看主題:
圖片
發布消息:
圖片
責任編輯:武曉燕
來源:
一安未來