From 4fabfe4dbd2eb28d07a4350597d314958cc1c281 Mon Sep 17 00:00:00 2001 From: MrShi <1878285526@qq.com> Date: 星期四, 09 十月 2025 11:16:43 +0800 Subject: [PATCH] 优化 --- server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java | 64 +++++++++++++++++++++++++------- 1 files changed, 50 insertions(+), 14 deletions(-) diff --git a/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java b/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java index 8085cba..43e47d2 100644 --- a/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java +++ b/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java @@ -1,21 +1,37 @@ package com.doumee.core.mqtt.config; +import com.doumee.config.SpringContextUtil; +import com.doumee.core.constants.Constants; import com.doumee.core.mqtt.service.MqttPushCallback; +import com.doumee.core.mqtt.service.MqttSubService; +import com.doumee.core.mqtt.service.MqttToolService; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; public class MqttClientInit { static MqttClient client; - - public static synchronized MqttClient getInstance(MqttConfig config,MqttPushCallback callBack){ + static MqttClient subClient; + public static boolean isNeedReSub =false ; + public static synchronized MqttClient getInstance(MqttConfig config ){ if(client !=null){ + if(!client.isConnected()){ + try { + client.reconnect(); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } return client; } + try { // host涓轰富鏈哄悕锛宑lientid鍗宠繛鎺QTT鐨勫鎴风ID锛屼竴鑸互鍞竴鏍囪瘑绗﹁〃绀猴紝MemoryPersistence璁剧疆clientid鐨勪繚瀛樺舰寮忥紝榛樿涓轰互鍐呭瓨淇濆瓨 - client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), config.getClientid() ,new MemoryPersistence()); + String clientId =config.getClientid() +config.getVersion(); + client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), clientId,new MemoryPersistence()); // MQTT鐨勮繛鎺ヨ缃� MqttConnectOptions options = new MqttConnectOptions(); // 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛岃缃负true琛ㄧず姣忔杩炴帴鍒版湇鍔″櫒閮戒互鏂扮殑韬唤杩炴帴 @@ -28,11 +44,9 @@ options.setConnectionTimeout(10); // 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒� options.setKeepAliveInterval(20); - // 璁剧疆鍥炶皟 - client.setCallback(callBack); //璁剧疆鏂紑鍚庨噸鏂拌繛鎺� options.setAutomaticReconnect(true); - MqttTopic topic = client.getTopic(config.getClientid()+"_close"); + MqttTopic topic = client.getTopic(clientId+"_close"); //閬楀槺 options.setWill(topic, "close".getBytes(), 1, true); client.connect(options); @@ -41,13 +55,15 @@ } return client; } - public static synchronized MqttClient refreshClient(MqttConfig config,MqttPushCallback callBack){ - if(client !=null){ - return client; + public static synchronized MqttClient getSubInstance(MqttConfig config,MqttPushCallback callBack){ + if(subClient !=null){ + refreshClient(); + return subClient; } try { // host涓轰富鏈哄悕锛宑lientid鍗宠繛鎺QTT鐨勫鎴风ID锛屼竴鑸互鍞竴鏍囪瘑绗﹁〃绀猴紝MemoryPersistence璁剧疆clientid鐨勪繚瀛樺舰寮忥紝榛樿涓轰互鍐呭瓨淇濆瓨 - client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), config.getClientid() ,new MemoryPersistence()); + String clientId =config.getSubclientid() +config.getVersion(); + subClient = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), clientId,new MemoryPersistence()); // MQTT鐨勮繛鎺ヨ缃� MqttConnectOptions options = new MqttConnectOptions(); // 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛岃缃负true琛ㄧず姣忔杩炴帴鍒版湇鍔″櫒閮戒互鏂扮殑韬唤杩炴帴 @@ -61,17 +77,37 @@ // 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒� options.setKeepAliveInterval(20); // 璁剧疆鍥炶皟 - client.setCallback(callBack); + subClient.setCallback(callBack); //璁剧疆鏂紑鍚庨噸鏂拌繛鎺� options.setAutomaticReconnect(true); - MqttTopic topic = client.getTopic(config.getClientid()+"_close"); + MqttTopic topic = subClient.getTopic(clientId+"_close"); //閬楀槺 options.setWill(topic, "close".getBytes(), 1, true); - client.connect(options); + subClient.connect(options); } catch (Exception e) { e.printStackTrace(); } - return client; + return subClient; + } + + + public static synchronized void isSubClientValid() { + if(isNeedReSub){ + MqttSubService service = SpringContextUtil.getBean(MqttSubService.class); + if(service!=null){ + service.startSubcribe(); + } + } + } + + public static synchronized void refreshClient() { + try { + if(subClient !=null && !subClient.isConnected()){ + subClient.reconnect(); + } + }catch (Exception e){ + e.printStackTrace(); + } } } -- Gitblit v1.9.3