package com.doumee.service.business.impl.mqtt;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.doumee.core.utils.Constants;
|
import com.doumee.core.utils.DateUtil;
|
import com.doumee.dao.business.DeviceDataMapper;
|
import com.doumee.dao.business.DeviceMapper;
|
import com.doumee.dao.business.model.Device;
|
import com.doumee.dao.business.model.DeviceData;
|
import com.doumee.mqtt.config.MqttClientInit;
|
import com.doumee.mqtt.config.MqttConfig;
|
import com.doumee.mqtt.service.MqttBizService;
|
import com.doumee.service.business.DeviceService;
|
import lombok.extern.slf4j.Slf4j;
|
import me.chanjar.weixin.common.util.DataUtils;
|
import org.apache.commons.lang3.StringUtils;
|
import org.eclipse.paho.client.mqttv3.*;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.List;
|
|
/**
|
* @author T14
|
*/
|
@Service
|
@Slf4j
|
public class MqttPushCallback implements MqttCallback {
|
|
|
@Autowired
|
private DeviceMapper deviceMapper;
|
@Autowired
|
private DeviceDataMapper deviceDataMapper;
|
@Override
|
public void connectionLost(Throwable cause) {
|
// 连接丢失后,一般在这里面进行重连
|
log.error("连接断开,重连中");
|
MqttClientInit.refreshClient();
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
log.error("deliveryComplete---------" + token.isComplete());
|
}
|
@Override
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
// subscribe后得到的消息会执行到这里面
|
try {
|
Date date = new Date();
|
log.error("接收消息主题 : " + DateUtil.getPlusTime2(date) + topic);
|
log.error("接收消息Qos : " + DateUtil.getPlusTime2(date) + message.getQos());
|
log.error("接收消息内容 : " + DateUtil.getPlusTime2(date) + new String(message.getPayload()));
|
// ------------TODO-----------监听数据
|
//-----------------/dev/MTS/98CC4D121E5A/status/json
|
if(StringUtils.contains(topic,"/status/json")){
|
dealDeviceDataT30A(topic,message);
|
}
|
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}
|
|
private void dealDeviceDataT30A(String topic, MqttMessage message) {
|
try {
|
Date date = new Date();
|
//如果是空开上报数据
|
String t[] = topic.replace("/dev/","").replace("/status/json","").split("/");
|
String no =t[0];
|
String doorNo = t[1];
|
Device device = deviceMapper.selectOne(new QueryWrapper<Device>().lambda()
|
.eq(Device::getNo,no)
|
.eq(Device::getIsdeleted,no)
|
.eq(Device::getDoorNo,doorNo));
|
if(device!=null && Constants.equalsInteger(device.getIsUsed(),Constants.ZERO)){
|
//如果设备存在并且设备在用,记录上报数据
|
List<DeviceData> dataList = new ArrayList<>();
|
JSONObject jsob = JSONObject.parseObject(new String(message.getPayload()));
|
if(jsob.getJSONArray("data")!=null && jsob.getJSONArray("data").size()>0){
|
for (int i = 0; i < jsob.getJSONArray("data").size(); i++) {
|
JSONObject jsonObject = jsob.getJSONArray("data").getJSONObject(i);
|
DeviceData data = new DeviceData();
|
data.setDataJson(jsob.toJSONString());
|
data.setCreateDate(date);
|
data.setEditDate(date);
|
data.setVal2("0");//电流值
|
data.setVal3("0");
|
data.setVal4("0");
|
data.setVal5("0");
|
data.setVal6("0");
|
data.setHappenTime(DateUtil.getPlusTime2(new Date(jsob.getLong("timeMS"))));
|
//模块序号
|
if(jsonObject.getJSONObject("mcbComSta")!=null
|
&& jsonObject.getJSONObject("mcbComSta").getString("busAddr")!=null){
|
data.setVal1(jsonObject.getJSONObject("mcbComSta").getString("busAddr"));
|
//<0>表示分闸,<1>表示合闸,无符号
|
data.setVal7(jsonObject.getJSONObject("mcbComSta").getString("on"));
|
}
|
if(jsonObject.getJSONObject("mcbSta")!=null){
|
//电流
|
if(jsonObject.getJSONObject("mcbSta").getString("cur")!=null){
|
data.setVal2(jsonObject.getJSONObject("mcbSta").getString("cur"));
|
}
|
//电压
|
if(jsonObject.getJSONObject("mcbSta").getString("vol")!=null){
|
data.setVal3(jsonObject.getJSONObject("mcbSta").getString("vol"));
|
}
|
//温度
|
if(jsonObject.getJSONObject("mcbSta").getString("tmp")!=null){
|
data.setVal4(jsonObject.getJSONObject("mcbSta").getString("tmp"));
|
}
|
//有功功率值,单位kW
|
if(jsonObject.getJSONObject("mcbSta").getString("pwrP")!=null){
|
data.setVal5(jsonObject.getJSONObject("mcbSta").getString("pwrP"));
|
}
|
//N相电流值,单位A
|
if(jsonObject.getJSONObject("mcbSta").getString("curN")!=null){
|
data.setVal6(jsonObject.getJSONObject("mcbSta").getString("curN"));
|
}
|
}
|
data.setDeviceId(device.getId().toString());
|
data.setDataType(Constants.ZERO);
|
if(StringUtils.isNotBlank(data.getVal1())){
|
dataList.add(data );
|
}
|
}
|
}
|
if(dataList.size()>0){
|
deviceDataMapper.insert(dataList);
|
}
|
}
|
}catch (Exception e){
|
log.error("===============topic数据记录失败:"+topic+e.getMessage());
|
}
|
}
|
}
|