doum
7 天以前 308bc8e5c0db7298a127f3478995ddc6167328e9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.doumee.service.business.impl.mqtt;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.doumee.core.utils.Constants;
import com.doumee.core.utils.DateUtil;
import com.doumee.dao.business.DeviceDataMapper;
import com.doumee.dao.business.DeviceMapper;
import com.doumee.dao.business.model.Device;
import com.doumee.dao.business.model.DeviceData;
import com.doumee.mqtt.config.MqttClientInit;
import com.doumee.mqtt.config.MqttConfig;
import com.doumee.mqtt.service.MqttBizService;
import com.doumee.service.business.DeviceService;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.util.DataUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
/**
 * @author T14
 */
@Service
@Slf4j
public class MqttPushCallback implements MqttCallback {
 
 
        @Autowired
        private DeviceMapper deviceMapper;
        @Autowired
        private DeviceDataMapper deviceDataMapper;
        @Override
        public void connectionLost(Throwable cause) {
            // 连接丢失后,一般在这里面进行重连
           log.error("连接断开,重连中");
            MqttClientInit.refreshClient();
        }
 
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
           log.error("deliveryComplete---------" + token.isComplete());
        }
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // subscribe后得到的消息会执行到这里面
            try {
                Date date = new Date();
               log.error("接收消息主题 : " + DateUtil.getPlusTime2(date) + topic);
               log.error("接收消息Qos : " + DateUtil.getPlusTime2(date) +  message.getQos());
               log.error("接收消息内容 : " +  DateUtil.getPlusTime2(date) + new String(message.getPayload()));
                //  ------------TODO-----------监听数据
                //-----------------/dev/MTS/98CC4D121E5A/status/json
                if(StringUtils.contains(topic,"/status/json")){
                    dealDeviceDataT30A(topic,message);
                }
 
            }catch (Exception e){
                e.printStackTrace();
            }
        }
 
    private void dealDeviceDataT30A(String topic, MqttMessage message) {
        try {
            Date date = new Date();
            //如果是空开上报数据
            String t[] = topic.replace("/dev/","").replace("/status/json","").split("/");
            String no =t[0];
            String doorNo = t[1];
            Device device = deviceMapper.selectOne(new QueryWrapper<Device>().lambda()
                    .eq(Device::getNo,no)
                    .eq(Device::getIsdeleted,no)
                    .eq(Device::getDoorNo,doorNo));
            if(device!=null && Constants.equalsInteger(device.getIsUsed(),Constants.ZERO)){
                //如果设备存在并且设备在用,记录上报数据
                List<DeviceData> dataList = new ArrayList<>();
                JSONObject jsob = JSONObject.parseObject(new String(message.getPayload()));
                if(jsob.getJSONArray("data")!=null && jsob.getJSONArray("data").size()>0){
                    for (int i = 0; i <  jsob.getJSONArray("data").size(); i++) {
                        JSONObject jsonObject = jsob.getJSONArray("data").getJSONObject(i);
                        DeviceData data = new DeviceData();
                        data.setDataJson(jsob.toJSONString());
                        data.setCreateDate(date);
                        data.setEditDate(date);
                        data.setVal2("0");//电流值
                        data.setVal3("0");
                        data.setVal4("0");
                        data.setVal5("0");
                        data.setVal6("0");
                        data.setHappenTime(DateUtil.getPlusTime2(new Date(jsob.getLong("timeMS"))));
                        //模块序号
                        if(jsonObject.getJSONObject("mcbComSta")!=null
                                && jsonObject.getJSONObject("mcbComSta").getString("busAddr")!=null){
                            data.setVal1(jsonObject.getJSONObject("mcbComSta").getString("busAddr"));
                            //<0>表示分闸,<1>表示合闸,无符号
                            data.setVal7(jsonObject.getJSONObject("mcbComSta").getString("on"));
                        }
                        if(jsonObject.getJSONObject("mcbSta")!=null){
                            //电流
                            if(jsonObject.getJSONObject("mcbSta").getString("cur")!=null){
                                data.setVal2(jsonObject.getJSONObject("mcbSta").getString("cur"));
                            }
                            //电压
                            if(jsonObject.getJSONObject("mcbSta").getString("vol")!=null){
                                data.setVal3(jsonObject.getJSONObject("mcbSta").getString("vol"));
                            }
                            //温度
                            if(jsonObject.getJSONObject("mcbSta").getString("tmp")!=null){
                                data.setVal4(jsonObject.getJSONObject("mcbSta").getString("tmp"));
                            }
                            //有功功率值,单位kW
                            if(jsonObject.getJSONObject("mcbSta").getString("pwrP")!=null){
                                data.setVal5(jsonObject.getJSONObject("mcbSta").getString("pwrP"));
                            }
                            //N相电流值,单位A
                            if(jsonObject.getJSONObject("mcbSta").getString("curN")!=null){
                                data.setVal6(jsonObject.getJSONObject("mcbSta").getString("curN"));
                            }
                        }
                        data.setDeviceId(device.getId().toString());
                        data.setDataType(Constants.ZERO);
                        if(StringUtils.isNotBlank(data.getVal1())){
                            dataList.add(data );
                        }
                    }
                }
                if(dataList.size()>0){
                    deviceDataMapper.insert(dataList);
                }
            }
        }catch (Exception e){
            log.error("===============topic数据记录失败:"+topic+e.getMessage());
        }
    }
}