package com.doumee.jtt808.web.config; 
 | 
  
 | 
import io.github.yezhihao.netmc.session.Session; 
 | 
import io.github.yezhihao.protostar.SchemaManager; 
 | 
import io.netty.buffer.ByteBuf; 
 | 
import io.netty.buffer.ByteBufUtil; 
 | 
import org.slf4j.Logger; 
 | 
import org.slf4j.LoggerFactory; 
 | 
import org.springframework.http.codec.ServerSentEvent; 
 | 
import org.yzh.protocol.basics.JTMessage; 
 | 
import org.yzh.protocol.codec.JTMessageAdapter; 
 | 
import org.yzh.protocol.codec.JTMessageDecoder; 
 | 
import org.yzh.protocol.codec.JTMessageEncoder; 
 | 
import org.yzh.protocol.commons.JT808; 
 | 
import reactor.core.publisher.FluxSink; 
 | 
  
 | 
import java.util.HashMap; 
 | 
import java.util.HashSet; 
 | 
import java.util.Set; 
 | 
  
 | 
public class WebLogAdapter extends JTMessageAdapter { 
 | 
  
 | 
    protected static final Logger log = LoggerFactory.getLogger(WebLogAdapter.class); 
 | 
    public static final HashMap<String, Set<FluxSink<Object>>> clientIds = new HashMap<>(); 
 | 
    public static final HashSet<Integer> ignoreMsgs = new HashSet<>(); 
 | 
  
 | 
    static { 
 | 
        ignoreMsgs.add(JT808.定位数据批量上传); 
 | 
    } 
 | 
    public WebLogAdapter(SchemaManager schemaManager) { 
 | 
        super(schemaManager); 
 | 
    } 
 | 
  
 | 
    public WebLogAdapter(JTMessageEncoder messageEncoder, JTMessageDecoder messageDecoder) { 
 | 
        super(messageEncoder, messageDecoder); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void encodeLog(Session session, JTMessage message, ByteBuf output) { 
 | 
        Set<FluxSink<Object>> emitters = clientIds.get(message.getClientId()); 
 | 
        if (emitters != null) { 
 | 
            ServerSentEvent<Object> event = ServerSentEvent.builder().event(message.getClientId()) 
 | 
                    .data(message + "hex:" + ByteBufUtil.hexDump(output, 0, output.writerIndex())).build(); 
 | 
            for (FluxSink<Object> emitter : emitters) { 
 | 
                emitter.next(event); 
 | 
            } 
 | 
        } 
 | 
        if ((!ignoreMsgs.contains(message.getMessageId())) && (emitters != null || clientIds.isEmpty())) 
 | 
            super.encodeLog(session, message, output); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void decodeLog(Session session, JTMessage message, ByteBuf input) { 
 | 
        if (message != null) { 
 | 
            Set<FluxSink<Object>> emitters = clientIds.get(message.getClientId()); 
 | 
            if (emitters != null) { 
 | 
                ServerSentEvent<Object> event = ServerSentEvent.builder().event(message.getClientId()) 
 | 
                        .data(message + "hex:" + ByteBufUtil.hexDump(input, 0, input.writerIndex())).build(); 
 | 
                for (FluxSink<Object> emitter : emitters) { 
 | 
                    emitter.next(event); 
 | 
                } 
 | 
            } 
 | 
            if (!ignoreMsgs.contains(message.getMessageId()) && (emitters != null || clientIds.isEmpty())) 
 | 
                super.decodeLog(session, message, input); 
 | 
  
 | 
            if (!message.isVerified()) 
 | 
                log.error("<<<<<校验码错误session={},payload={}", session, ByteBufUtil.hexDump(input, 0, input.writerIndex())); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public static void clearMessage() { 
 | 
        synchronized (ignoreMsgs) { 
 | 
            ignoreMsgs.clear(); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public static void addMessage(int messageId) { 
 | 
        if (!ignoreMsgs.contains(messageId)) { 
 | 
            synchronized (ignoreMsgs) { 
 | 
                ignoreMsgs.add(messageId); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public static void removeMessage(int messageId) { 
 | 
        if (ignoreMsgs.contains(messageId)) { 
 | 
            synchronized (ignoreMsgs) { 
 | 
                ignoreMsgs.remove(messageId); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public static void clearClient() { 
 | 
        synchronized (clientIds) { 
 | 
            clientIds.clear(); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public static void addClient(String clientId, FluxSink<Object> emitter) { 
 | 
        synchronized (clientIds) { 
 | 
            clientIds.computeIfAbsent(clientId, k -> new HashSet<>()).add(emitter); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public static void removeClient(String clientId, FluxSink<Object> emitter) { 
 | 
        synchronized (clientIds) { 
 | 
            Set<FluxSink<Object>> emitters = clientIds.get(clientId); 
 | 
            if (emitters != null) { 
 | 
                emitters.remove(emitter); 
 | 
                if (emitters.isEmpty()) { 
 | 
                    clientIds.remove(clientId); 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
} 
 |