package org.yzh.protocol.codec; 
 | 
  
 | 
import io.github.yezhihao.protostar.SchemaManager; 
 | 
import io.netty.buffer.ByteBuf; 
 | 
import org.slf4j.Logger; 
 | 
import org.slf4j.LoggerFactory; 
 | 
import org.yzh.protocol.basics.JTMessage; 
 | 
  
 | 
import java.util.Map; 
 | 
import java.util.WeakHashMap; 
 | 
import java.util.concurrent.ConcurrentHashMap; 
 | 
  
 | 
  
 | 
/** 
 | 
 * 分包消息管理 
 | 
 * @author yezhihao 
 | 
 * https://gitee.com/yezhihao/jt808-server 
 | 
 */ 
 | 
public class MultiPacketDecoder extends JTMessageDecoder { 
 | 
  
 | 
    private static final Logger log = LoggerFactory.getLogger(MultiPacketDecoder.class); 
 | 
  
 | 
    private final Map<String, MultiPacket> multiPacketsMap; 
 | 
  
 | 
    private final MultiPacketListener multiPacketListener; 
 | 
  
 | 
    public MultiPacketDecoder(String... basePackages) { 
 | 
        this(new SchemaManager(basePackages)); 
 | 
    } 
 | 
  
 | 
    public MultiPacketDecoder(SchemaManager schemaManager) { 
 | 
        this(schemaManager, null); 
 | 
    } 
 | 
  
 | 
    public MultiPacketDecoder(SchemaManager schemaManager, MultiPacketListener multiPacketListener) { 
 | 
        super(schemaManager); 
 | 
        if (multiPacketListener == null) { 
 | 
            this.multiPacketsMap = new WeakHashMap<>(); 
 | 
            this.multiPacketListener = null; 
 | 
        } else { 
 | 
            this.multiPacketsMap = new ConcurrentHashMap<>(); 
 | 
            this.multiPacketListener = multiPacketListener; 
 | 
            startListener(); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    protected ByteBuf[] addAndGet(JTMessage message, ByteBuf packetData) { 
 | 
        String clientId = message.getClientId(); 
 | 
        int messageId = message.getMessageId(); 
 | 
        int packageTotal = message.getPackageTotal(); 
 | 
        int packetNo = message.getPackageNo(); 
 | 
  
 | 
        String key = new StringBuilder(21).append(clientId).append('/').append(messageId).append('/').append(packageTotal).toString(); 
 | 
  
 | 
        MultiPacket multiPacket = multiPacketsMap.get(key); 
 | 
        if (multiPacket == null) 
 | 
            multiPacketsMap.put(key, multiPacket = new MultiPacket(message)); 
 | 
        if (packetNo == 1) 
 | 
            multiPacket.setSerialNo(message.getSerialNo()); 
 | 
  
 | 
  
 | 
        ByteBuf[] packages = multiPacket.addAndGet(packetNo, packetData); 
 | 
        log.debug("<<<<<分包消息{}", multiPacket); 
 | 
        if (packages == null) 
 | 
            return null; 
 | 
        multiPacketsMap.remove(key); 
 | 
        return packages; 
 | 
    } 
 | 
  
 | 
    private void startListener() { 
 | 
        Thread thread = new Thread(() -> { 
 | 
            long timeout = multiPacketListener.timeout; 
 | 
            for (; ; ) { 
 | 
                long nextDelay = timeout; 
 | 
                long now = System.currentTimeMillis(); 
 | 
  
 | 
                for (Map.Entry<String, MultiPacket> entry : multiPacketsMap.entrySet()) { 
 | 
                    MultiPacket packet = entry.getValue(); 
 | 
  
 | 
                    long time = timeout - (now - packet.getLastAccessedTime()); 
 | 
                    if (time <= 0) { 
 | 
                        if (!multiPacketListener.receiveTimeout(packet)) { 
 | 
                            log.warn("<<<<<分包接收超时{}", packet); 
 | 
                            multiPacketsMap.remove(entry.getKey()); 
 | 
                            packet.release(); 
 | 
                        } 
 | 
                    } else { 
 | 
                        nextDelay = Math.min(time, nextDelay); 
 | 
                    } 
 | 
                } 
 | 
                try { 
 | 
                    Thread.sleep(nextDelay); 
 | 
                } catch (InterruptedException e) { 
 | 
                    log.error("MultiPacketListener", e); 
 | 
                } 
 | 
            } 
 | 
        }); 
 | 
        thread.setName("MultiPacketListener"); 
 | 
        thread.setPriority(Thread.MIN_PRIORITY); 
 | 
        thread.setDaemon(true); 
 | 
        thread.start(); 
 | 
    } 
 | 
} 
 |