package com.doumee.service.business.impl;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.doumee.core.constants.Constants;
|
import com.doumee.core.exception.BusinessException;
|
import com.doumee.core.mqtt.config.MqttConfig;
|
import com.doumee.dao.business.MqttLogMapper;
|
import com.doumee.dao.business.model.*;
|
import com.doumee.service.business.ActionLogService;
|
import com.doumee.service.business.DeviceSubcribeService;
|
import com.doumee.service.business.MemberRidesService;
|
import com.doumee.service.business.SitesService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Lazy;
|
import org.springframework.stereotype.Service;
|
|
import java.util.Date;
|
|
/**
|
* 与硬件对接服务
|
* @author 江蹄蹄
|
* @date 2023/10/09 18:06
|
*/
|
@Service
|
@Slf4j
|
public class DeviceSubscribeServiceImpl implements DeviceSubcribeService {
|
@Autowired
|
@Lazy
|
MemberRidesService memberRidesService;
|
@Lazy
|
@Autowired
|
SitesService sitesService;
|
@Autowired
|
private MqttLogMapper mqttLogMapper;
|
@Autowired
|
@Lazy
|
private ActionLogService actionLogService;
|
@Autowired
|
private MqttConfig mqttConfig;
|
@Override
|
public void listener(String msgId,String param,String topic) {
|
log.info("mqtt消息订阅==================="+param);
|
String info = Constants.MqttTopic.sub_lockInfo.substring(Constants.MqttTopic.sub_lockInfo.lastIndexOf("/")+1) ;
|
String closeLock = Constants.MqttTopic.sub_closeLock.substring(Constants.MqttTopic.sub_closeLock.lastIndexOf("/")+1) ;
|
String health = Constants.MqttTopic.sub_health.substring(Constants.MqttTopic.sub_health.lastIndexOf("/")+1) ;
|
String adminCard = Constants.MqttTopic.sub_adminCard.substring(Constants.MqttTopic.sub_adminCard.lastIndexOf("/")+1) ;
|
String siteInfo = Constants.MqttTopic.sub_siteInfo.substring(Constants.MqttTopic.sub_siteInfo.lastIndexOf("/")+1) ;
|
String brokers = "brokers";
|
if(!StringUtils.contains(topic, brokers)){
|
if(topic.indexOf(Constants.MqttTopic.topic_index)!=0
|
||topic.split("/").length < 4
|
|| (!StringUtils.contains(topic, info)
|
&&!StringUtils.contains(topic, siteInfo)
|
&&!StringUtils.contains(topic, adminCard)
|
&&!StringUtils.contains(topic, health)
|
&&!StringUtils.contains(topic,closeLock))){
|
log.error("mqtt消息订阅===========无效数据========"+param);
|
return;
|
}
|
}
|
MqttLog mqttLog = new MqttLog();
|
mqttLog.setMsgId(msgId);
|
mqttLog.setTopic(topic);
|
long msgCount =mqttLogMapper.selectCount(new QueryWrapper<MqttLog>().lambda().eq(MqttLog::getMsg, param).eq(MqttLog::getType, Constants.ZERO));
|
if(msgCount>0){
|
log.error("mqtt消息订阅==============已消费数据====="+param);
|
return;
|
}
|
String logId =Constants.getUUID();
|
String logInfo = "";
|
int result =0;
|
try {
|
if(!topic.contains(brokers)){
|
String[] ss = topic.split("/");
|
String siteid = ss[2];//站点编码
|
if(StringUtils.isBlank(siteid) ){
|
//如果锁头编码为空
|
log.error("mqtt消息订阅==============无效数据====="+topic+param);
|
return;
|
}
|
if(StringUtils.contains(topic, info)){
|
//如果锁头信息上报
|
Integer lockid = Constants.formatIntegerFromStr(ss[3]) ;//锁头编码
|
if( lockid == null){
|
//如果锁头编码为空
|
log.error("mqtt消息订阅==============无效数据====="+topic+param);
|
return;
|
}
|
Locks locks = JSONObject.parseObject(param, Locks.class);
|
locks.setSiteId(siteid);
|
locks.setCode(lockid);
|
locks.setInfo(logId);
|
result = memberRidesService.mqttLockInfoEvent(locks);
|
logInfo = "mqtt消息订阅锁头信息";
|
log.info("mqtt消息订阅=========锁信息==========成功");
|
}else if(StringUtils.contains(topic, closeLock)){
|
//如果还车上报
|
Integer lockid = Constants.formatIntegerFromStr(ss[3]);//锁头编码
|
if( lockid == null){
|
//如果锁头编码为空
|
log.error("mqtt消息订阅==============无效数据====="+topic+param);
|
return;
|
}
|
JSONObject pjson = JSONObject.parseObject(param);
|
MemberRides bikes = new MemberRides();
|
bikes.setBikeCode(pjson.getString("bikeCode"));
|
bikes.setBackLockId( lockid);
|
bikes.setBackSiteId( siteid);
|
bikes.setBackCommondId(logId);
|
bikes.setStatus(pjson.getInteger("status"));
|
result = memberRidesService.mqttCloseBikeEvent(bikes);
|
logInfo = "mqtt消息订阅还车消息";
|
log.info("mqtt消息订阅=========还车==========成功");
|
}else if(StringUtils.contains(topic, health)){
|
//心跳消息=
|
Sites site = new Sites();
|
site.setId(siteid);
|
site.setOnline(Constants.ZERO);
|
site.setLastLinkDate(new Date());
|
sitesService.updateByMqtt(site);//更新站点状态
|
logInfo = "mqtt消息订阅心跳消息";
|
log.info("mqtt消息订阅=========心跳消息==========成功");
|
|
}else if(StringUtils.contains(topic, adminCard)){
|
//管理员刷卡开锁
|
JSONObject pjson = JSONObject.parseObject(param);
|
ActionLog actionLog = new ActionLog();
|
actionLog.setParam3(pjson.getString("bikeCode"));
|
actionLog.setParam2(pjson.getString("lockId"));
|
actionLog.setTitle(pjson.getString("cardNo"));
|
actionLog.setParam(siteid);
|
actionLog.setType(Constants.TWO);
|
actionLog.setIsdeleted(Constants.ZERO);
|
actionLog.setCreateDate(new Date());
|
actionLogService.create(actionLog);
|
logInfo = "管理员刷卡开锁";
|
log.info("mqtt消息订阅=========管理员刷卡开锁==========成功");
|
} else if(StringUtils.contains(topic, siteInfo)){
|
//如果锁头全量信息上报
|
JSONObject pjson = JSONObject.parseObject(param);
|
Sites sites = new Sites();
|
sites.setId(siteid);
|
sites.setClientVersion(pjson.getString("version"));
|
sites.setLockNum(pjson.getInteger("locks"));
|
sitesService.dealSiteLocks(sites);
|
logInfo = "mqtt消息订阅锁头全量信息上报";
|
log.info("mqtt消息订阅=========锁头全量信息上报==========成功");
|
}
|
}else {
|
//如果站点上下线消息
|
JSONObject pjson = JSONObject.parseObject(param);
|
String clientId = String.valueOf(pjson.get("clientid"));
|
if(clientId.contains("doumee")){
|
log.error("mqtt消息订阅==============无效订阅状态====="+topic+param);
|
return;
|
}
|
Sites site = sitesService.findById(clientId);
|
if(site == null){
|
log.error("mqtt消息订阅==============无效订阅状态====="+topic+param);
|
return;
|
}
|
if (topic.endsWith("disconnected")) {
|
if(Constants.formatIntegerNum(site.getOnline()) == Constants.ONE){
|
log.error("mqtt消息订阅==============已订阅状态掉线====="+param);
|
return;
|
}
|
site.setOnline(Constants.ONE);
|
} else {
|
if(Constants.formatIntegerNum(site.getOnline()) == Constants.ZERO){
|
log.error("mqtt消息订阅==============已订阅状态上线====="+param);
|
return;
|
}
|
site.setOnline(Constants.ZERO);
|
}
|
site.setLastLinkDate(new Date());
|
sitesService.updateByMqtt(site);//更新站点状态
|
log.info("mqtt消息订阅=========站点上下线==========成功");
|
}
|
}catch (BusinessException e){
|
e.printStackTrace();
|
logInfo = "mqtt消息订阅错误==="+e.getMessage();
|
result =1;
|
}catch (Exception e){
|
e.printStackTrace();
|
logInfo = "mqtt消息订阅异常==="+e.getMessage();
|
result =1;
|
}
|
createSubLog(topic,msgId,logId,result,param,logInfo);
|
}
|
|
|
|
private void createSubLog(String topic, String msgId,String logId, int result,String param,String info) {
|
MqttLog log = new MqttLog();
|
log.setId(logId);
|
log.setCreateDate(new Date());
|
log.setResult(result);
|
log.setTopic(topic);
|
log.setClientid(mqttConfig.getClientid());
|
log.setHostInfo(mqttConfig.getHost());
|
log.setInfo(JSONObject.toJSONString(mqttConfig));
|
log.setType(Constants.ZERO);
|
log.setMsg(param);
|
log.setInfo(info);
|
log.setIsdeleted(Constants.ZERO);
|
log.setMsgId(msgId);
|
mqttLogMapper.insert(log);
|
}
|
private String getLockIdFromTopic(String topic) {
|
String[] ss = topic.split("/");
|
if(ss.length>2){
|
return ss[ss.length-2];
|
}
|
return null;
|
// topic = topic.substring(0,topic.lastIndexOf("/"));
|
// String id = topic.substring( topic.lastIndexOf("/")+1);
|
// return id;
|
}
|
}
|