111
k94314517
2025-03-04 bfd4f08b4304cac2822db86de3712e1c6b37f6ab
server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java
@@ -1,21 +1,33 @@
package com.doumee.core.mqtt.config;
import com.doumee.config.SpringContextUtil;
import com.doumee.core.constants.Constants;
import com.doumee.core.mqtt.service.MqttPushCallback;
import com.doumee.core.mqtt.service.MqttSubService;
import com.doumee.core.mqtt.service.MqttToolService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
public class MqttClientInit {
        static MqttClient client;
        static MqttClient subClient;
        public static boolean isNeedReSub =false ;
        public static synchronized MqttClient getInstance(MqttConfig config ){
            if(client !=null){
                if(!client.isConnected()){
                    try {
                        client.reconnect();
                    } catch (MqttException e) {
                        throw new RuntimeException(e);
                    }
                }
                return  client;
            }
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                String clientId =config.getClientid() +config.getVersion();
@@ -45,6 +57,7 @@
        }
        public static synchronized MqttClient getSubInstance(MqttConfig config,MqttPushCallback callBack){
            if(subClient !=null){
                refreshClient();
                return  subClient;
            }
            try {
@@ -78,11 +91,22 @@
        }
        public static synchronized void isSubClientValid()   {
            if(isNeedReSub){
                MqttSubService service = SpringContextUtil.getBean(MqttSubService.class);
                if(service!=null){
                    service.startSubcribe();
                }
            }
        }
        public static synchronized void refreshClient() throws MqttException {
            if(client !=null){
                boolean result = client.isConnected();
                   client.reconnect();
        public static synchronized void refreshClient() {
            try {
                if(subClient !=null &&  !subClient.isConnected()){
                    subClient.reconnect();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }