package com.doumee.service.business.impl.mqtt;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.doumee.core.utils.Constants;
import com.doumee.core.utils.DateUtil;
import com.doumee.dao.business.DeviceDataMapper;
import com.doumee.dao.business.DeviceMapper;
import com.doumee.dao.business.model.Device;
import com.doumee.dao.business.model.DeviceData;
import com.doumee.mqtt.config.MqttClientInit;
import com.doumee.mqtt.config.MqttConfig;
import com.doumee.mqtt.service.MqttBizService;
import com.doumee.service.business.DeviceService;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.util.DataUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * MQTT callback that logs every incoming message and persists the status
 * reports published on topics containing {@code /status/json}.
 *
 * <p>Expected topic shape (taken from the example in the original code):
 * {@code /dev/MTS/98CC4D121E5A/status/json}. After stripping {@code /dev/} and
 * {@code /status/json}, the first segment is matched against the device number
 * and the second against the device door number.
 *
 * @author T14
 */
@Service
@Slf4j
public class MqttPushCallback implements MqttCallback {

    /** Marker identifying a status report topic. */
    private static final String STATUS_JSON_SUFFIX = "/status/json";

    /** Leading part of the topic that is removed before the topic is split. */
    private static final String TOPIC_PREFIX = "/dev/";

    @Autowired
    private DeviceMapper deviceMapper;

    @Autowired
    private DeviceDataMapper deviceDataMapper;

    /**
     * Called when the connection to the broker is lost; logs the cause and asks
     * {@link MqttClientInit} to refresh the client.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Passing the cause as the last argument lets the logger print its stack trace.
        log.error("连接断开,重连中", cause);
        MqttClientInit.refreshClient();
    }

    /**
     * Called when delivery of a published message has completed; only logs the
     * completion flag of the token.
     *
     * @param token the delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // NOTE(review): logged at error level as in the original; confirm whether
        // that level is intentional before lowering it.
        log.error("deliveryComplete---------" + token.isComplete());
    }

    /**
     * Called for every message received on a subscribed topic. Logs the topic,
     * QoS and payload, then stores the data when the topic is a status report.
     *
     * <p>All exceptions are caught and logged here so that none escapes into the
     * MQTT client.
     *
     * @param topic   the topic the message was published to
     * @param message the received message
     * @throws Exception declared by the interface; not thrown by this implementation
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        try {
            Date date = new Date();
            log.error("接收消息主题 : " + DateUtil.getPlusTime2(date) + topic);
            log.error("接收消息Qos : " + DateUtil.getPlusTime2(date) + message.getQos());
            log.error("接收消息内容 : " + DateUtil.getPlusTime2(date) + payloadText(message));
            // Only status reports are persisted, e.g. /dev/MTS/98CC4D121E5A/status/json
            if (StringUtils.contains(topic, STATUS_JSON_SUFFIX)) {
                dealDeviceDataT30A(topic, message);
            }
        } catch (Exception e) {
            // Route the failure through the logger instead of printing to stderr.
            log.error("MQTT message handling failed, topic=" + topic, e);
        }
    }

    /**
     * Parses a status report and stores one {@link DeviceData} row per entry of
     * its {@code data} array that carries a module address ({@code busAddr}).
     *
     * <p>The message is ignored when the topic does not contain two segments, when
     * no matching device is found, when the device's {@code isUsed} flag is not
     * {@link Constants#ZERO}, or when the payload has no {@code data} entries or no
     * {@code timeMS} timestamp. Failures are logged and never propagated.
     *
     * @param topic   the topic the message was published to
     * @param message the received message holding the JSON payload
     */
    private void dealDeviceDataT30A(String topic, MqttMessage message) {
        try {
            Date receivedAt = new Date();
            String[] segments = topic.replace(TOPIC_PREFIX, "").replace(STATUS_JSON_SUFFIX, "").split("/");
            if (segments.length < 2) {
                // Previously surfaced as an ArrayIndexOutOfBoundsException in the generic catch.
                log.error("===============topic数据记录失败:" + topic + " unexpected topic format");
                return;
            }
            String no = segments[0];
            String doorNo = segments[1];

            // Fix: the soft-delete flag was compared against the device number.
            // Constants.ZERO is assumed to mean "not deleted" - confirm against the Device model.
            Device device = deviceMapper.selectOne(new QueryWrapper<Device>().lambda()
                    .eq(Device::getNo, no)
                    .eq(Device::getIsdeleted, Constants.ZERO)
                    .eq(Device::getDoorNo, doorNo));
            // Record reported data only when the device exists and is in use.
            if (device == null || !Constants.equalsInteger(device.getIsUsed(), Constants.ZERO)) {
                return;
            }

            JSONObject report = JSONObject.parseObject(payloadText(message));
            if (report == null) {
                // fastjson returns null for an empty payload.
                return;
            }
            JSONArray entries = report.getJSONArray("data");
            if (entries == null || entries.isEmpty()) {
                return;
            }
            Long timeMs = report.getLong("timeMS");
            if (timeMs == null) {
                // Previously an unboxing NullPointerException; the report is still dropped.
                log.error("===============topic数据记录失败:" + topic + " missing timeMS");
                return;
            }

            // Values shared by every row of this report.
            Date happenedAt = new Date(timeMs);
            String rawJson = report.toJSONString();

            List<DeviceData> dataList = new ArrayList<>();
            for (int i = 0; i < entries.size(); i++) {
                JSONObject entry = entries.getJSONObject(i);
                if (entry == null) {
                    continue;
                }
                DeviceData data = buildDeviceData(entry, rawJson, receivedAt, happenedAt, device);
                // Entries without a module address are not stored.
                if (StringUtils.isNotBlank(data.getVal1())) {
                    dataList.add(data);
                }
            }
            if (!dataList.isEmpty()) {
                // NOTE(review): relies on the mapper exposing an insert overload that
                // accepts a list, as the original call did.
                deviceDataMapper.insert(dataList);
            }
        } catch (Exception e) {
            // Keep the original message text and additionally log the stack trace.
            log.error("===============topic数据记录失败:" + topic + e.getMessage(), e);
        }
    }

    /**
     * Maps one entry of the report's {@code data} array to a {@link DeviceData} row.
     *
     * @param entry      one element of the {@code data} array
     * @param rawJson    the complete report as JSON, stored on every row
     * @param receivedAt time the message was processed; used as create and edit date
     * @param happenedAt time taken from the report's {@code timeMS} field
     * @param device     the device the report belongs to
     * @return the populated row; {@code val1} stays unset when the entry has no {@code busAddr}
     */
    private DeviceData buildDeviceData(JSONObject entry, String rawJson, Date receivedAt,
                                       Date happenedAt, Device device) {
        DeviceData data = new DeviceData();
        data.setDataJson(rawJson);
        data.setCreateDate(receivedAt);
        data.setEditDate(receivedAt);
        // Measurements default to "0" when the report does not provide them.
        data.setVal2("0");
        data.setVal3("0");
        data.setVal4("0");
        data.setVal5("0");
        data.setVal6("0");
        data.setHappenTime(DateUtil.getPlusTime2(happenedAt));

        JSONObject comSta = entry.getJSONObject("mcbComSta");
        if (comSta != null && comSta.getString("busAddr") != null) {
            // Module address
            data.setVal1(comSta.getString("busAddr"));
            // Switch state: 0 = open (tripped), 1 = closed, unsigned
            data.setVal7(comSta.getString("on"));
        }

        JSONObject sta = entry.getJSONObject("mcbSta");
        if (sta != null) {
            // Current
            if (sta.getString("cur") != null) {
                data.setVal2(sta.getString("cur"));
            }
            // Voltage
            if (sta.getString("vol") != null) {
                data.setVal3(sta.getString("vol"));
            }
            // Temperature
            if (sta.getString("tmp") != null) {
                data.setVal4(sta.getString("tmp"));
            }
            // Active power, in kW
            if (sta.getString("pwrP") != null) {
                data.setVal5(sta.getString("pwrP"));
            }
            // Neutral (N-phase) current, in A
            if (sta.getString("curN") != null) {
                data.setVal6(sta.getString("curN"));
            }
        }

        data.setDeviceId(device.getId().toString());
        data.setDataType(Constants.ZERO);
        return data;
    }

    /**
     * Decodes the message payload as UTF-8 text so the result does not depend on
     * the platform default charset.
     *
     * @param message the received message
     * @return the payload as a string
     */
    private static String payloadText(MqttMessage message) {
        return new String(message.getPayload(), StandardCharsets.UTF_8);
    }
}