jiangping
2023-10-13 50c6151c164d8d6bd4d052257c75ba424883fecb
mqtt
已添加3个文件
已修改13个文件
246 ■■■■ 文件已修改
server/platform/src/main/java/com/doumee/api/common/TestController.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/platform/src/main/java/com/doumee/config/shiro/ShiroConfig.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/platform/src/main/java/com/doumee/service/MqttSubService.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/platform/src/main/java/com/doumee/service/impl/MqttSubServiceImpl.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/platform/src/main/resources/application.yml 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/core/mqtt/config/MqttConfig.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/core/mqtt/service/MqttPushCallback.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/core/mqtt/service/MqttToolService.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/dao/business/model/MqttLog.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/service/business/DeviceService.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/service/business/DeviceSubcribeService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/service/business/impl/DeviceServiceImpl.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/service/business/impl/DeviceSubscribeServiceImpl.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/services/src/main/java/com/doumee/service/business/impl/MemberRidesServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/自行车mqtt协议.md 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/platform/src/main/java/com/doumee/api/common/TestController.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,62 @@
package com.doumee.api.common;
import com.alibaba.fastjson.JSONObject;
import com.doumee.api.BaseController;
import com.doumee.biz.system.SystemDictDataBiz;
import com.doumee.core.annotation.trace.Trace;
import com.doumee.core.constants.Constants;
import com.doumee.core.utils.DateUtil;
import com.doumee.core.utils.aliyun.ALiYunUtil;
import com.doumee.service.business.DeviceService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.multipart.commons.CommonsMultipartResolver;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
/**
 * @author Eva.Caesar Liu
 * @date 2023/02/14 11:14
 */
@Api(tags = "测试接口工具")
@Trace(exclude = true)
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController extends BaseController {
    @Autowired
    private DeviceService deviceService;
    @ApiOperation(value = "测试mqtt发布消息", notes = "上传", httpMethod = "POST", position = 6)
    @ApiImplicitParams({
        @ApiImplicitParam(name = "topic", value = "主题", required = true, paramType = "query", dataType = "String", dataTypeClass = String.class),
        @ApiImplicitParam(name = "json", value = "内容", required = true, paramType = "query", dataType = "String", dataTypeClass = String.class),
    })
    @PostMapping(value = "/testPush" )
    public void testPush(@RequestParam String topic,@RequestParam String json, HttpServletRequest request, HttpServletResponse response) throws Exception {
        deviceService.testPush(topic,json);
    }
}
server/platform/src/main/java/com/doumee/config/shiro/ShiroConfig.java
@@ -89,13 +89,6 @@
        Map<String, String> map = new LinkedHashMap<>();
        // è·¯å¾„拦截配置
        // è·¯å¾„拦截配置
//        map.put("/system/dictData/companyUserRules","anon");
//        map.put("/system/login", "anon");
//        map.put("/system/logout", "anon");
//        map.put("/common/captcha", "anon");
//        map.put("/business/areas/*", "anon");
//        map.put("/public/uploadPicture","anon");
//        map.put("/public/uploadLocal","anon");
        map.put("/system/dictData/companyUserRules","anon");
        map.put("/system/login", "anon");
@@ -103,17 +96,9 @@
        map.put("/system/loginH5", "anon");
        map.put("/common/captcha", "anon");
        map.put("/business/areas/*", "anon");
        map.put("/test/testPush","anon");
        map.put("/public/uploadPicture","anon");
        map.put("/public/uploadLocal","anon");
//        map.put("/business/company/register", "anon");
//        map.put("/business/labels/page","anon");
//        map.put("/business/*/list","anon");
//        map.put("/business/goods/goodsPage","anon");
//        map.put("/business/goods/h5Image","anon");
//        map.put("/business/goods/export","anon");
//        map.put("/business/goods/listForH5","anon");
        // - æ”¾è¡Œswagger
server/platform/src/main/java/com/doumee/service/MqttSubService.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,16 @@
package com.doumee.service;
import com.doumee.dao.business.model.Locks;
import com.doumee.dao.business.model.MqttLog;
/**
 * ä¸Žç¡¬ä»¶å¯¹æŽ¥æœåŠ¡
 * @author æ±Ÿè¹„蹄
 * @date 2023/10/09 18:06
 */
public interface MqttSubService {
    void startSubcribe();
}
server/platform/src/main/java/com/doumee/service/impl/MqttSubServiceImpl.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,36 @@
package com.doumee.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.doumee.core.constants.Constants;
import com.doumee.core.mqtt.config.MqttConfig;
import com.doumee.core.mqtt.service.MqttToolService;
import com.doumee.dao.business.MqttLogMapper;
import com.doumee.dao.business.model.Locks;
import com.doumee.dao.business.model.MqttLog;
import com.doumee.service.MqttSubService;
import com.doumee.service.business.DeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Date;
/**
 *  ä¸Žç¡¬ä»¶å¯¹æŽ¥æœåŠ¡
 * @author æ±Ÿè¹„蹄
 * @date 2023/10/09 18:06
 */
@Service
public class MqttSubServiceImpl implements MqttSubService {
    @Autowired
    private MqttToolService mqttToolService;
    @Autowired
    private MqttLogMapper mqttLogMapper;
    @Autowired
    private MqttConfig mqttConfig;
    @Override
    @PostConstruct
    public void startSubcribe() {
        mqttToolService.subscribe(new String[]{ Constants.MqttTopic.lockInfo, Constants.MqttTopic.closeLock});
    }
}
server/platform/src/main/resources/application.yml
@@ -62,4 +62,5 @@
mqtt:
  clientid: doumeetestplat
  subclientid: doumeetestplatSub
server/services/src/main/java/com/doumee/core/mqtt/config/MqttClientInit.java
@@ -10,14 +10,49 @@
public class MqttClientInit {
        static MqttClient client;
        static MqttClient subClient;
        public static synchronized MqttClient getInstance(MqttConfig config,MqttPushCallback callBack){
        public static synchronized MqttClient getInstance(MqttConfig config ){
            if(client !=null){
                return  client;
            }
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), config.getClientid()+ Constants.getUUID(),new MemoryPersistence());
//                String clientId =config.getClientid()+ Constants.getUUID();
                String clientId =config.getClientid();
                client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), clientId,new MemoryPersistence());
                // MQTT的连接设置
                MqttConnectOptions   options = new MqttConnectOptions();
                // è®¾ç½®æ˜¯å¦æ¸…空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(false);
                // è®¾ç½®è¿žæŽ¥çš„用户名
                options.setUserName(config.getUsername());
                // è®¾ç½®è¿žæŽ¥çš„密码
                options.setPassword(config.getPassword().toCharArray());
                // è®¾ç½®è¶…æ—¶æ—¶é—´ å•位为秒
                options.setConnectionTimeout(10);
                // è®¾ç½®ä¼šè¯å¿ƒè·³æ—¶é—´ å•位为秒 æœåŠ¡å™¨ä¼šæ¯éš”1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                //设置断开后重新连接
                options.setAutomaticReconnect(true);
                MqttTopic topic = client.getTopic(clientId+"_close");
                //遗嘱
                options.setWill(topic, "close".getBytes(), 1, true);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return  client;
        }
        public static synchronized MqttClient getSubInstance(MqttConfig config,MqttPushCallback callBack){
            if(subClient !=null){
                return  subClient;
            }
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
//                String clientId =config.getClientid()+ Constants.getUUID();
                String clientId =config.getSubclientid();
                subClient = new org.eclipse.paho.client.mqttv3.MqttClient(config.getHost(), clientId,new MemoryPersistence());
                // MQTT的连接设置
                MqttConnectOptions   options = new MqttConnectOptions();
                // è®¾ç½®æ˜¯å¦æ¸…空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
@@ -31,18 +66,21 @@
                // è®¾ç½®ä¼šè¯å¿ƒè·³æ—¶é—´ å•位为秒 æœåŠ¡å™¨ä¼šæ¯éš”1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // è®¾ç½®å›žè°ƒ
                client.setCallback(callBack);
                subClient.setCallback(callBack);
                //设置断开后重新连接
                options.setAutomaticReconnect(true);
                MqttTopic topic = client.getTopic(config.getClientid()+"_close");
                MqttTopic topic = subClient.getTopic(clientId+"_close");
                //遗嘱
                options.setWill(topic, "close".getBytes(), 1, true);
                client.connect(options);
                subClient.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return  client;
            return  subClient;
        }
        public static synchronized void refreshClient() throws MqttException {
            if(client !=null){
                boolean result = client.isConnected();
server/services/src/main/java/com/doumee/core/mqtt/config/MqttConfig.java
@@ -25,6 +25,10 @@
         */
        private String clientid;
        /**
         * å®¢æˆ·ç«¯ç¼–码用户订阅
         */
        private String subclientid;
        /**
         * æŽˆæƒè´¦å·
         */
        private String username ;
server/services/src/main/java/com/doumee/core/mqtt/service/MqttPushCallback.java
@@ -38,9 +38,13 @@
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // subscribe后得到的消息会执行到这里面
            try {
            System.out.println("接收消息主题 : " + topic);
            System.out.println("接收消息Qos : " + message.getQos());
            System.out.println("接收消息内容 : " + new String(message.getPayload()));
            deviceSubcribeService.listener(new String(message.getPayload()),topic);
                deviceSubcribeService.listener(message.getId()+"",new String(message.getPayload()),topic);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
}
server/services/src/main/java/com/doumee/core/mqtt/service/MqttToolService.java
@@ -26,7 +26,7 @@
            for (int i = 0; i < Qos.length; i++) {
                Qos[i] = 1;
            }
            MqttClientInit.getInstance(config,callBack).subscribe(topics, Qos);
            MqttClientInit.getSubInstance(config,callBack).subscribe(topics, Qos);
        } catch (Exception e) {
            e.printStackTrace();
        }
@@ -42,7 +42,7 @@
        mess.setRetained(true);
        mess.setPayload(message.getBytes());
        try {
            MqttClientInit.getInstance(config,callBack).publish(topic, mess);
            MqttClientInit.getInstance(config).publish(topic, mess);
            return  1;
        } catch (Exception e) {
            //LOGGER.error(e.getLocalizedMessage());
server/services/src/main/java/com/doumee/dao/business/model/MqttLog.java
@@ -18,7 +18,7 @@
 */
@Data
@ApiModel("系统行为操作记录表")
@TableName("`mqtt_log`")
@TableName("\"mqtt_log\"")
public class MqttLog {
    @ApiModelProperty(value = "编码")
@@ -73,6 +73,9 @@
    @ApiModelProperty(value = "消息内容")
    @ExcelColumn(name="消息内容")
    private String msg;
    @ApiModelProperty(value = "mqtt消息类容")
    @ExcelColumn(name="mqtt消息类容")
    private String msgId;
    @ApiModelProperty(value = "结果 0成功 1失败", example = "1")
    @ExcelColumn(name="结果 0成功 1失败")
server/services/src/main/java/com/doumee/service/business/DeviceService.java
@@ -17,8 +17,7 @@
     */
    MqttLog openLock(Locks locks);
    void startSubcribe();
    MqttLog getLockInfo(Locks locks);
    void testPush(String topic, String json);
}
server/services/src/main/java/com/doumee/service/business/DeviceSubcribeService.java
@@ -13,6 +13,6 @@
     * è®¾å¤‡ä¸ŠæŠ¥æ•°æ®ï¼ˆä¸ŠæŠ¥ï¼‰
     * @param param ä¸ŠæŠ¥å‚æ•°
     */
    void listener(String param ,String topic);
    void listener(String msgId,String param ,String topic);
}
server/services/src/main/java/com/doumee/service/business/impl/DeviceServiceImpl.java
@@ -9,6 +9,7 @@
import com.doumee.dao.business.model.MqttLog;
import com.doumee.service.business.DeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Date;
@@ -26,12 +27,6 @@
    private MqttLogMapper mqttLogMapper;
    @Autowired
    private MqttConfig mqttConfig;
    @Override
    @PostConstruct
    public void startSubcribe() {
        mqttToolService.subscribe(new String[]{ Constants.MqttTopic.openLock, Constants.MqttTopic.closeLock});
    }
    /**
     * å‘起开锁指令
     * @param locks
@@ -56,6 +51,11 @@
        MqttLog mqttLog = createPushLog(topic,result,"实时查询锁信息_"+locks.getId());
        return mqttLog;
    }
    @Override
    @Async
    public void testPush(String topic, String json){
        int result = mqttToolService.pubMessage(json,topic);
    }
    private MqttLog createPushLog(String topic, int result,String info) {
        MqttLog log = new MqttLog();
server/services/src/main/java/com/doumee/service/business/impl/DeviceSubscribeServiceImpl.java
@@ -1,6 +1,7 @@
package com.doumee.service.business.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.doumee.core.constants.Constants;
import com.doumee.core.exception.BusinessException;
import com.doumee.core.mqtt.config.MqttConfig;
@@ -36,10 +37,10 @@
    @Autowired
    private MqttConfig mqttConfig;
    @Override
    public void listener(String param,String topic) {
    public void listener(String msgId,String param,String topic) {
       log.info("mqtt消息订阅==================="+param);
        String info = Constants.MqttTopic.lockInfo.substring(Constants.MqttTopic.lockInfo.lastIndexOf("/")+1) ;
        String closeLock = Constants.MqttTopic.lockInfo.substring(Constants.MqttTopic.closeLock.lastIndexOf("/")+1) ;
        String closeLock = Constants.MqttTopic.closeLock.substring(Constants.MqttTopic.closeLock.lastIndexOf("/")+1) ;
        if(topic.indexOf(Constants.MqttTopic.topic_index)!=0
                || (!StringUtils.contains(topic, info)
                   &&!StringUtils.contains(topic,closeLock))){
@@ -50,6 +51,12 @@
        if(StringUtils.isBlank(lockid)){
            //如果锁头编码为空
            log.error("mqtt消息订阅==============无效数据====="+param);
            return;
        }
        MqttLog mqttLog = new MqttLog();
        mqttLog.setMsgId(msgId);
        if(mqttLogMapper.selectCount(new QueryWrapper<MqttLog>().lambda().eq(MqttLog::getMsgId, msgId)) >0){
            log.error("mqtt消息订阅==============已消费数据====="+param);
            return;
        }
        String logId =Constants.getUUID();
@@ -71,7 +78,7 @@
                MemberRides bikes = new MemberRides();
                bikes.setBikeCode(pjson.getString("bikeCode"));
                bikes.setBackLockId( pjson.getString("lockId"));
                bikes.setBackLockId( pjson.getString("siteId"));
                bikes.setBackSiteId( pjson.getString("siteId"));
                bikes.setBackCommondId(logId);
                result = memberRidesService.mqttCloseBikeEvent(bikes);
                logInfo = "mqtt消息订阅还车消息";
@@ -84,9 +91,9 @@
            logInfo = "mqtt消息订阅异常===";
            result =1;
        }
        createSubLog(topic,logId,result,param,logInfo);
        createSubLog(topic,msgId,logId,result,param,logInfo);
    }
    private void createSubLog(String topic, String logId, int result,String param,String info) {
    private void createSubLog(String topic, String msgId,String logId, int result,String param,String info) {
        MqttLog log = new MqttLog();
        log.setId(logId);
        log.setCreateDate(new Date());
@@ -98,6 +105,7 @@
        log.setType(Constants.ZERO);
        log.setMsg(param);
        log.setInfo(info);
        log.setMsgId(msgId);
        mqttLogMapper.insert(log);
    }
    private String getLockIdFromTopic(String topic) {
server/services/src/main/java/com/doumee/service/business/impl/MemberRidesServiceImpl.java
@@ -598,7 +598,8 @@
    //TODO-----JP------------根据车辆code分析车辆类型,待确认方案-------------------
    private String getBileTypeByCode(String bikeCode) {
        return null;
        int temp=1+(int)(Math.random()*(8));
        return temp+"";
    }
    private Locks dealLockAndSite(Locks locks) {
@@ -653,7 +654,7 @@
    @Override
    public  int mqttCloseBikeEvent(MemberRides bikes){
        //免费骑行时长查询,数据字典配置
        if(StringUtils.isBlank(bikes.getBikeCode()) ||StringUtils.isBlank(bikes.getRentLockId())){
        if(StringUtils.isBlank(bikes.getBikeCode()) ||StringUtils.isBlank(bikes.getBackLockId())||StringUtils.isBlank(bikes.getBackSiteId())){
          throw new BusinessException(ResponseStatus.BAD_REQUEST.getCode(),"还车上报参数错误!");
        }
        QueryWrapper<MemberRides> wrapper = new QueryWrapper<>();
server/×ÔÐгµmqttЭÒé.md
@@ -16,6 +16,8 @@
    "status": 1, // çŠ¶æ€ï¼Œ0闭合, 1打开,2运行中, 3异常
    "bikeCode": "1234567890" // è‡ªè¡Œè½¦ic卡号,无车为空
}
## {"bikeCode":"1234567890","id":"kjflksjlkfsjdlk","siteId": 456,"status":0,"code":"789"}
```
## pub: device/lock/{id}/getInfo
@@ -40,6 +42,7 @@
    "bikeCode": "1234567890",
    "lockId": 123,
    "siteId": 234,
    "time": "2023-10-13 10:12:90" // ç«™ç‚¹ä¸Šä½æœºæ”¶åˆ°è¿˜è½¦æŒ‡ä»¤çš„æ—¶é—´ï¼Œä»…做参考,请以服务器时间为准
    "time": "2023-10-13 10:12:90"
}
## {"bikeCode":"1234567890","lockId":789,"siteId": 456,"time":"2023-10-13 10:12:90"}
```