package com.doumee.jtt808.web.config;
|
|
import io.github.yezhihao.netmc.session.Session;
|
import io.github.yezhihao.protostar.SchemaManager;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBufUtil;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.http.codec.ServerSentEvent;
|
import org.yzh.protocol.basics.JTMessage;
|
import org.yzh.protocol.codec.JTMessageAdapter;
|
import org.yzh.protocol.codec.JTMessageDecoder;
|
import org.yzh.protocol.codec.JTMessageEncoder;
|
import org.yzh.protocol.commons.JT808;
|
import reactor.core.publisher.FluxSink;
|
|
import java.util.HashMap;
|
import java.util.HashSet;
|
import java.util.Set;
|
|
public class WebLogAdapter extends JTMessageAdapter {
|
|
protected static final Logger log = LoggerFactory.getLogger(WebLogAdapter.class);
|
public static final HashMap<String, Set<FluxSink<Object>>> clientIds = new HashMap<>();
|
public static final HashSet<Integer> ignoreMsgs = new HashSet<>();
|
|
static {
|
ignoreMsgs.add(JT808.定位数据批量上传);
|
}
|
public WebLogAdapter(SchemaManager schemaManager) {
|
super(schemaManager);
|
}
|
|
public WebLogAdapter(JTMessageEncoder messageEncoder, JTMessageDecoder messageDecoder) {
|
super(messageEncoder, messageDecoder);
|
}
|
|
@Override
|
public void encodeLog(Session session, JTMessage message, ByteBuf output) {
|
Set<FluxSink<Object>> emitters = clientIds.get(message.getClientId());
|
if (emitters != null) {
|
ServerSentEvent<Object> event = ServerSentEvent.builder().event(message.getClientId())
|
.data(message + "hex:" + ByteBufUtil.hexDump(output, 0, output.writerIndex())).build();
|
for (FluxSink<Object> emitter : emitters) {
|
emitter.next(event);
|
}
|
}
|
if ((!ignoreMsgs.contains(message.getMessageId())) && (emitters != null || clientIds.isEmpty()))
|
super.encodeLog(session, message, output);
|
}
|
|
@Override
|
public void decodeLog(Session session, JTMessage message, ByteBuf input) {
|
if (message != null) {
|
Set<FluxSink<Object>> emitters = clientIds.get(message.getClientId());
|
if (emitters != null) {
|
ServerSentEvent<Object> event = ServerSentEvent.builder().event(message.getClientId())
|
.data(message + "hex:" + ByteBufUtil.hexDump(input, 0, input.writerIndex())).build();
|
for (FluxSink<Object> emitter : emitters) {
|
emitter.next(event);
|
}
|
}
|
if (!ignoreMsgs.contains(message.getMessageId()) && (emitters != null || clientIds.isEmpty()))
|
super.decodeLog(session, message, input);
|
|
if (!message.isVerified())
|
log.error("<<<<<校验码错误session={},payload={}", session, ByteBufUtil.hexDump(input, 0, input.writerIndex()));
|
}
|
}
|
|
public static void clearMessage() {
|
synchronized (ignoreMsgs) {
|
ignoreMsgs.clear();
|
}
|
}
|
|
public static void addMessage(int messageId) {
|
if (!ignoreMsgs.contains(messageId)) {
|
synchronized (ignoreMsgs) {
|
ignoreMsgs.add(messageId);
|
}
|
}
|
}
|
|
public static void removeMessage(int messageId) {
|
if (ignoreMsgs.contains(messageId)) {
|
synchronized (ignoreMsgs) {
|
ignoreMsgs.remove(messageId);
|
}
|
}
|
}
|
|
public static void clearClient() {
|
synchronized (clientIds) {
|
clientIds.clear();
|
}
|
}
|
|
public static void addClient(String clientId, FluxSink<Object> emitter) {
|
synchronized (clientIds) {
|
clientIds.computeIfAbsent(clientId, k -> new HashSet<>()).add(emitter);
|
}
|
}
|
|
public static void removeClient(String clientId, FluxSink<Object> emitter) {
|
synchronized (clientIds) {
|
Set<FluxSink<Object>> emitters = clientIds.get(clientId);
|
if (emitters != null) {
|
emitters.remove(emitter);
|
if (emitters.isEmpty()) {
|
clientIds.remove(clientId);
|
}
|
}
|
}
|
}
|
}
|