From f9691d544e62d6c04dbfe45d05a6c7bc5e004291 Mon Sep 17 00:00:00 2001
From: jiangping <jp@doumee.com>
Date: 星期五, 29 十二月 2023 11:52:29 +0800
Subject: [PATCH] 服务商

---
 server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java |   64 +++++++++++++++++++++++++-------
 1 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java b/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java
index 8085cba..43e47d2 100644
--- a/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java
+++ b/server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java
@@ -1,21 +1,37 @@
 package com.doumee.core.mqtt.config;
 
+import com.doumee.config.SpringContextUtil;
+import com.doumee.core.constants.Constants;
 import com.doumee.core.mqtt.service.MqttPushCallback;
+import com.doumee.core.mqtt.service.MqttSubService;
+import com.doumee.core.mqtt.service.MqttToolService;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttTopic;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
 
 public class MqttClientInit {
         static MqttClient client;
-
-        public static synchronized MqttClient getInstance(MqttConfig config,MqttPushCallback callBack){
+        static MqttClient subClient;
+        public static boolean isNeedReSub =false ;
+        public static synchronized MqttClient getInstance(MqttConfig config ){
             if(client !=null){
+                if(!client.isConnected()){
+                    try {
+                        client.reconnect();
+                    } catch (MqttException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
                 return  client;
             }
+
             try {
                 // host涓轰富鏈哄悕锛宑lientid鍗宠繛鎺QTT鐨勫鎴风ID锛屼竴鑸互鍞竴鏍囪瘑绗﹁〃绀猴紝MemoryPersistence璁剧疆clientid鐨勪繚瀛樺舰寮忥紝榛樿涓轰互鍐呭瓨淇濆瓨
-                client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), config.getClientid() ,new MemoryPersistence());
+                String clientId =config.getClientid() +config.getVersion();
+                client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), clientId,new MemoryPersistence());
                 // MQTT鐨勮繛鎺ヨ缃�
                 MqttConnectOptions   options = new MqttConnectOptions();
                 // 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛岃缃负true琛ㄧず姣忔杩炴帴鍒版湇鍔″櫒閮戒互鏂扮殑韬唤杩炴帴
@@ -28,11 +44,9 @@
                 options.setConnectionTimeout(10);
                 // 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒�
                 options.setKeepAliveInterval(20);
-                // 璁剧疆鍥炶皟
-                client.setCallback(callBack);
                 //璁剧疆鏂紑鍚庨噸鏂拌繛鎺�
                 options.setAutomaticReconnect(true);
-                MqttTopic topic = client.getTopic(config.getClientid()+"_close");
+                MqttTopic topic = client.getTopic(clientId+"_close");
                 //閬楀槺
                 options.setWill(topic, "close".getBytes(), 1, true);
                 client.connect(options);
@@ -41,13 +55,15 @@
             }
             return  client;
         }
-        public static synchronized MqttClient refreshClient(MqttConfig config,MqttPushCallback callBack){
-            if(client !=null){
-                return  client;
+        public static synchronized MqttClient getSubInstance(MqttConfig config,MqttPushCallback callBack){
+            if(subClient !=null){
+                refreshClient();
+                return  subClient;
             }
             try {
                 // host涓轰富鏈哄悕锛宑lientid鍗宠繛鎺QTT鐨勫鎴风ID锛屼竴鑸互鍞竴鏍囪瘑绗﹁〃绀猴紝MemoryPersistence璁剧疆clientid鐨勪繚瀛樺舰寮忥紝榛樿涓轰互鍐呭瓨淇濆瓨
-                client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), config.getClientid() ,new MemoryPersistence());
+                String clientId =config.getSubclientid() +config.getVersion();
+                subClient = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), clientId,new MemoryPersistence());
                 // MQTT鐨勮繛鎺ヨ缃�
                 MqttConnectOptions   options = new MqttConnectOptions();
                 // 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛岃缃负true琛ㄧず姣忔杩炴帴鍒版湇鍔″櫒閮戒互鏂扮殑韬唤杩炴帴
@@ -61,17 +77,37 @@
                 // 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒�
                 options.setKeepAliveInterval(20);
                 // 璁剧疆鍥炶皟
-                client.setCallback(callBack);
+                subClient.setCallback(callBack);
                 //璁剧疆鏂紑鍚庨噸鏂拌繛鎺�
                 options.setAutomaticReconnect(true);
-                MqttTopic topic = client.getTopic(config.getClientid()+"_close");
+                MqttTopic topic = subClient.getTopic(clientId+"_close");
                 //閬楀槺
                 options.setWill(topic, "close".getBytes(), 1, true);
-                client.connect(options);
+                subClient.connect(options);
             } catch (Exception e) {
                 e.printStackTrace();
             }
-            return  client;
+            return  subClient;
+        }
+
+
+        public static synchronized void isSubClientValid()   {
+            if(isNeedReSub){
+                MqttSubService service = SpringContextUtil.getBean(MqttSubService.class);
+                if(service!=null){
+                    service.startSubcribe();
+                }
+            }
+        }
+
+        public static synchronized void refreshClient() {
+            try {
+                if(subClient !=null &&  !subClient.isConnected()){
+                    subClient.reconnect();
+                }
+            }catch (Exception e){
+                e.printStackTrace();
+            }
         }
 
 }

--
Gitblit v1.9.3