package com.doumee.jtt808.web.config;

import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.protostar.SchemaManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.codec.ServerSentEvent;
import org.yzh.protocol.basics.JTMessage;
import org.yzh.protocol.codec.JTMessageAdapter;
import org.yzh.protocol.codec.JTMessageDecoder;
import org.yzh.protocol.codec.JTMessageEncoder;
import org.yzh.protocol.commons.JT808;
import reactor.core.publisher.FluxSink;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

/**
 * Message adapter that mirrors encoded/decoded messages to registered
 * {@link FluxSink} subscribers as {@link ServerSentEvent}s, and filters which
 * messages are passed on to the parent adapter's logging.
 *
 * <p>Subscribers are grouped per client id in {@link #clientIds}; message ids
 * listed in {@link #ignoreMsgs} are never forwarded to the parent's
 * {@code encodeLog}/{@code decodeLog}.
 *
 * <p>Thread-safety: every access made by this class to {@link #clientIds} and
 * {@link #ignoreMsgs} happens while holding the monitor of the respective
 * collection. External code touching these public fields directly must
 * synchronize on the same object.
 */
public class WebLogAdapter extends JTMessageAdapter {

    protected static final Logger log = LoggerFactory.getLogger(WebLogAdapter.class);

    /** Subscribers keyed by client id. Guarded by its own monitor. */
    public static final HashMap<String, Set<FluxSink<Object>>> clientIds = new HashMap<>();

    /** Message ids excluded from the parent adapter's logging. Guarded by its own monitor. */
    public static final HashSet<Integer> ignoreMsgs = new HashSet<>();

    static {
        ignoreMsgs.add(JT808.定位数据批量上传);
    }

    public WebLogAdapter(SchemaManager schemaManager) {
        super(schemaManager);
    }

    public WebLogAdapter(JTMessageEncoder messageEncoder, JTMessageDecoder messageDecoder) {
        super(messageEncoder, messageDecoder);
    }

    /**
     * Pushes the outgoing message to the subscribers of its client id and, unless
     * filtered out, delegates to the parent's encode logging.
     *
     * @param session the session the message is written to
     * @param message the message that was encoded
     * @param output  buffer holding the encoded bytes; dumped from index 0 to its writer index
     */
    @Override
    public void encodeLog(Session session, JTMessage message, ByteBuf output) {
        if (publishAndCheckLoggable(message, output)) {
            super.encodeLog(session, message, output);
        }
    }

    /**
     * Pushes the incoming message to the subscribers of its client id, delegates
     * to the parent's decode logging unless filtered out, and reports messages
     * that are not verified. A {@code null} message is ignored entirely.
     *
     * @param session the session the message was read from
     * @param message the decoded message, may be {@code null}
     * @param input   buffer holding the received bytes; dumped from index 0 to its writer index
     */
    @Override
    public void decodeLog(Session session, JTMessage message, ByteBuf input) {
        if (message == null) {
            return;
        }
        if (publishAndCheckLoggable(message, input)) {
            super.decodeLog(session, message, input);
        }
        if (!message.isVerified()) {
            log.error("<<<<<校验码错误session={},payload={}", session,
                    ByteBufUtil.hexDump(input, 0, input.writerIndex()));
        }
    }

    /**
     * Sends the message to every subscriber registered for its client id and
     * decides whether the parent adapter should log it.
     *
     * <p>The parent logs a message when its id is not in {@link #ignoreMsgs} and
     * either the client has subscribers or no client has any subscriber at all.
     *
     * @param message the message to publish, must not be {@code null}
     * @param buffer  buffer whose bytes from 0 to the writer index are hex-dumped into the event
     * @return {@code true} if the caller should delegate to the parent's logging
     */
    private static boolean publishAndCheckLoggable(JTMessage message, ByteBuf buffer) {
        String clientId = message.getClientId();

        Set<FluxSink<Object>> subscribers = null;
        boolean noSubscriptions;
        synchronized (clientIds) {
            Set<FluxSink<Object>> registered = clientIds.get(clientId);
            if (registered != null) {
                // Copy so that iteration below cannot race with addClient/removeClient.
                subscribers = new HashSet<>(registered);
            }
            noSubscriptions = clientIds.isEmpty();
        }

        if (subscribers != null) {
            // Emit outside the lock: sink callbacks are foreign code.
            ServerSentEvent<Object> event = ServerSentEvent.builder()
                    .event(clientId)
                    .data(message + "hex:" + ByteBufUtil.hexDump(buffer, 0, buffer.writerIndex()))
                    .build();
            for (FluxSink<Object> subscriber : subscribers) {
                subscriber.next(event);
            }
        }

        boolean ignored;
        synchronized (ignoreMsgs) {
            ignored = ignoreMsgs.contains(message.getMessageId());
        }
        return !ignored && (subscribers != null || noSubscriptions);
    }

    /** Removes every message id from the ignore list. */
    public static void clearMessage() {
        synchronized (ignoreMsgs) {
            ignoreMsgs.clear();
        }
    }

    /**
     * Adds a message id to the ignore list; a no-op if it is already present.
     *
     * @param messageId id of the message type to ignore
     */
    public static void addMessage(int messageId) {
        // Set.add is already idempotent, so no unsynchronized pre-check is needed.
        synchronized (ignoreMsgs) {
            ignoreMsgs.add(messageId);
        }
    }

    /**
     * Removes a message id from the ignore list; a no-op if it is absent.
     *
     * @param messageId id of the message type to stop ignoring
     */
    public static void removeMessage(int messageId) {
        synchronized (ignoreMsgs) {
            ignoreMsgs.remove(messageId);
        }
    }

    /** Drops all subscribers of all clients. */
    public static void clearClient() {
        synchronized (clientIds) {
            clientIds.clear();
        }
    }

    /**
     * Registers a subscriber for the given client id.
     *
     * @param clientId client whose messages the subscriber wants to receive
     * @param emitter  sink that receives the server-sent events
     */
    public static void addClient(String clientId, FluxSink<Object> emitter) {
        synchronized (clientIds) {
            clientIds.computeIfAbsent(clientId, k -> new HashSet<>()).add(emitter);
        }
    }

    /**
     * Unregisters a subscriber; the client entry is removed once its last
     * subscriber is gone.
     *
     * @param clientId client the subscriber was registered for
     * @param emitter  sink to unregister
     */
    public static void removeClient(String clientId, FluxSink<Object> emitter) {
        synchronized (clientIds) {
            Set<FluxSink<Object>> emitters = clientIds.get(clientId);
            if (emitters != null) {
                emitters.remove(emitter);
                if (emitters.isEmpty()) {
                    clientIds.remove(clientId);
                }
            }
        }
    }
}