package com.doumee.jtt808.web.controller; import io.github.yezhihao.netmc.session.Session; import io.github.yezhihao.netmc.session.SessionManager; import io.github.yezhihao.netmc.util.AdapterCollection; import io.github.yezhihao.protostar.util.Explain; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.swagger.annotations.Api; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import org.yzh.commons.model.APIResult; import org.yzh.protocol.codec.JTMessageDecoder; import com.doumee.jtt808.web.config.WebLogAdapter; import com.doumee.jtt808.web.model.entity.DeviceDO; import com.doumee.jtt808.web.model.enums.SessionKey; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import javax.servlet.http.HttpSession; import java.time.Duration; import java.util.Collection; import java.util.HashSet; import java.util.Set; @RestController @RequestMapping @Api(tags = "其他通信接口") public class OtherController { @Autowired private SessionManager sessionManager; @Autowired private JTMessageDecoder decoder; @Operation(summary = "终端实时信息查询") @GetMapping("device/all") public APIResult> all() { Collection all = sessionManager.all(); return APIResult.ok(all); } @Operation(summary = "获得当前所有在线设备信息") @GetMapping("device/option") public APIResult> getClientId(HttpSession httpSession) { AdapterCollection result = new AdapterCollection<>(sessionManager.all(), session -> { DeviceDO device = SessionKey.getDevice(session); if (device != null) return device; return new DeviceDO().mobileNo(session.getClientId()); }); return APIResult.ok(result); } @Operation(summary = "设备订阅") @PostMapping(value = "device/sse", produces = MediaType.TEXT_PLAIN_VALUE) public String sseSub(HttpSession httpSession, @RequestParam String clientId, @RequestParam boolean sub) { FluxSink emitter = (FluxSink) httpSession.getAttribute("emitter"); if (emitter == null) { return "0"; } if (sub) { WebLogAdapter.addClient(clientId, emitter); ((Set) httpSession.getAttribute("clientIds")).add(clientId); } else { WebLogAdapter.removeClient(clientId, emitter); ((Set) httpSession.getAttribute("clientIds")).remove(clientId); } return "1"; } @Operation(summary = "设备监控") @GetMapping(value = "device/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux sseConnect(HttpSession httpSession, String clientId) { return Flux.create(emitter -> { Set clientIds = new HashSet<>(); if (clientId != null) { WebLogAdapter.addClient(clientId, emitter); clientIds.add(clientId); } httpSession.setAttribute("clientIds", clientIds); httpSession.setAttribute("emitter", emitter); emitter.onDispose(() -> clientIds.forEach(id -> WebLogAdapter.removeClient(id, emitter))); }); } @Operation(summary = "808协议分析工具") @RequestMapping(value = "message/explain", method = {RequestMethod.POST, RequestMethod.GET}) public String decode(@Parameter(description = "16进制报文") @RequestParam String hex) { Explain explain = new Explain(); hex = hex.replace(" ", ""); String[] lines = hex.split("\n"); for (String line : lines) { String[] msgs = line.split("7e7e"); for (String msg : msgs) { ByteBuf byteBuf = Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump(msg)); decoder.decode(byteBuf, explain); } } return explain.toString(); } @Operation(summary = "原始消息发送") @PostMapping("device/raw") public Mono postRaw(@Parameter(description = "终端手机号") @RequestParam String clientId, @Parameter(description = "16进制报文") @RequestParam String message) { Session session = sessionManager.get(clientId); if (session != null) { ByteBuf byteBuf = Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump(message)); return session.notify(byteBuf).map(unused -> "success") .timeout(Duration.ofSeconds(10), Mono.just("timeout")) .onErrorResume(throwable -> Mono.just("fail")); } return Mono.just("offline"); } }