111
k94314517
2025-03-04 bfd4f08b4304cac2822db86de3712e1c6b37f6ab
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.doumee.jtt808.web.controller;
 
import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.netmc.session.SessionManager;
import io.github.yezhihao.netmc.util.AdapterCollection;
import io.github.yezhihao.protostar.util.Explain;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.yzh.commons.model.APIResult;
import org.yzh.protocol.codec.JTMessageDecoder;
import com.doumee.jtt808.web.config.WebLogAdapter;
import com.doumee.jtt808.web.model.entity.DeviceDO;
import com.doumee.jtt808.web.model.enums.SessionKey;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
 
import javax.servlet.http.HttpSession;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
 
@RestController
@RequestMapping
@Api(tags = "其他通信接口")
public class OtherController {
 
    @Autowired
    private SessionManager sessionManager;
    @Autowired
    private JTMessageDecoder decoder;
 
    @Operation(summary = "终端实时信息查询")
    @GetMapping("device/all")
    public APIResult<Collection<Session>> all() {
        Collection<Session> all = sessionManager.all();
        return APIResult.ok(all);
    }
 
    @Operation(summary = "获得当前所有在线设备信息")
    @GetMapping("device/option")
    public APIResult<Collection<DeviceDO>> getClientId(HttpSession httpSession) {
        AdapterCollection<Session, DeviceDO> result = new AdapterCollection<>(sessionManager.all(), session -> {
            DeviceDO device = SessionKey.getDevice(session);
            if (device != null)
                return device;
            return new DeviceDO().mobileNo(session.getClientId());
        });
        return APIResult.ok(result);
    }
 
    @Operation(summary = "设备订阅")
    @PostMapping(value = "device/sse", produces = MediaType.TEXT_PLAIN_VALUE)
    public String sseSub(HttpSession httpSession, @RequestParam String clientId, @RequestParam boolean sub) {
        FluxSink<Object> emitter = (FluxSink<Object>) httpSession.getAttribute("emitter");
        if (emitter == null) {
            return "0";
        }
        if (sub) {
            WebLogAdapter.addClient(clientId, emitter);
            ((Set<String>) httpSession.getAttribute("clientIds")).add(clientId);
        } else {
            WebLogAdapter.removeClient(clientId, emitter);
            ((Set<String>) httpSession.getAttribute("clientIds")).remove(clientId);
        }
        return "1";
    }
 
    @Operation(summary = "设备监控")
    @GetMapping(value = "device/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Object> sseConnect(HttpSession httpSession, String clientId) {
        return Flux.create(emitter -> {
            Set<String> clientIds = new HashSet<>();
            if (clientId != null) {
                WebLogAdapter.addClient(clientId, emitter);
                clientIds.add(clientId);
            }
            httpSession.setAttribute("clientIds", clientIds);
            httpSession.setAttribute("emitter", emitter);
            emitter.onDispose(() -> clientIds.forEach(id -> WebLogAdapter.removeClient(id, emitter)));
        });
    }
 
    @Operation(summary = "808协议分析工具")
    @RequestMapping(value = "message/explain", method = {RequestMethod.POST, RequestMethod.GET})
    public String decode(@Parameter(description = "16进制报文") @RequestParam String hex) {
        Explain explain = new Explain();
        hex = hex.replace(" ", "");
        String[] lines = hex.split("\n");
        for (String line : lines) {
            String[] msgs = line.split("7e7e");
            for (String msg : msgs) {
                ByteBuf byteBuf = Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump(msg));
                decoder.decode(byteBuf, explain);
            }
        }
        return explain.toString();
    }
 
    @Operation(summary = "原始消息发送")
    @PostMapping("device/raw")
    public Mono<String> postRaw(@Parameter(description = "终端手机号") @RequestParam String clientId,
                                @Parameter(description = "16进制报文") @RequestParam String message) {
        Session session = sessionManager.get(clientId);
        if (session != null) {
            ByteBuf byteBuf = Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump(message));
 
            return session.notify(byteBuf).map(unused -> "success")
                    .timeout(Duration.ofSeconds(10), Mono.just("timeout"))
                    .onErrorResume(throwable -> Mono.just("fail"));
        }
        return Mono.just("offline");
    }
 
 
}