通讯组件学习

网络连接是游戏服务器的基础中的基础。此游戏服务器使用JProtobuf事先编译好protobuf文件。通过注解标记各种协议包,接收客户端发送的协议时,自动将连接转化为Wsession对象,玩家角色登陆后再进一步转化为Player对象。

用于标记的注解

模块声明

/**
 * Type-level marker annotation, retained at runtime so it can be discovered
 * reflectively (e.g. via {@code Class#isAnnotationPresent}).
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SocketClass {

}

方法声明

/**
 * Method-level marker annotation, retained at runtime, for methods that handle
 * an incoming packet.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SocketMethod {
    /**
     * Custom packet id, used by the event system. Defaults to {@code 0}.
     *
     * <p>The element name keeps its original spelling so existing usages of the
     * annotation stay source-compatible.
     */
    int coustomPacketId() default 0;

}

通讯包声明

@Target(ElementTpye.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Lazy
public @interface SocketMethod {
    // 协议号
    int packetId();
}

Socket服务器

Socket服务器Wserver在启动时加载配置,构建连接处理管线并绑定配置的端口。

@Component
public class Wserver implements ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(Wserver.class);

    @Autowired
    private CustomHandlerManager customHandlerManager;
    @Autowired
    private SessionHandler sessionHandler;
    @Autowired
    private FlowFirewall flowFirewall;
    @Autowired
    private IpFirewall ipFirewall;
    @Autowired
    private SocketPacketHandler socketPacketHandler;

    // 自定义的连接日志处理接口
    public static LoggingHandler logginHandler;

    private int[] ports;
    private ApplicationCintext applicationContex;
    private int maxlength;

    // 开启
    public void open() {
        ipFirewal.open();
    }

    // 网络端口是否打开
    public boolean ioOpen() {
        return ipFirewall.isOpened();
    }

    // 关闭
    public void block() {
        ipFirewall.block();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
        this.applicationContext = applicationContext;
    }

    // 加载配置
    public void loadProperties(Resource rsource) {
        Properties prop = new Properties();
        prop.load(resource.getInputStream());
        this.prots = ConfigUtils.getPorts(prop, ServerConfigConstant.SERVER_PORT);

        String maxLengthProp = prop.getProperty(ServerConfigConstant.PACKET_MAXLEGHTH);
        if(maxLengthProp == null) {
            // 默认1m
            maxLength = 1024*1024;
        } else  {
            maxLength = INteger.valueOf(mxaLengthProp) * 1024;
        }
    }

    public void bind() throws InterruptedException, IOException{
        bind(new DummyFirewallManger());
    }

    // 绑定端口
    public void bind(FirewallManager firewallManger) thrwos InterruptedException, IOException{
        socketPacketHandler.setFirewallManager(firewallManager);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        if(loggingHandler != null) {
            serverBootstrap.handler(loggingHandler);
        }
        serverBootstrap.group(bossGrop, workerGroup)
            .channel(NioSreverSocketchannel.class)
            .option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.SO_BACKLOG,1024)
            .childOption(ChannelOption.TCP_NODELAY, true).
            .childOption(ChannelOption.TCP_NODELAY)
            .childOption(ChannelOption.SO_RCVBUF,1024*32)
            .childOption(ChannelOption.SO_SNDBUF,1024*32)
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,new WriteBufferWaterMark(0,128*1024))
            .childHandler(nwe ChannelInitializer<SocketChannel> -> {
                @Override
                public void initChannel(SocketChannel sc) {
                    ChannelPipeline pipeline = sc.pipeline();
                    pipeline.addLast("encoder", new WpacketEncoder());
                    pipeline.addLast("ipFirewall", ipFirewall);
                    pipeline.addLast("flowFirewal", flowFirewall);
                    pipeline.addLast("decoder", new WpacketDecoder(maxLength)).
                    pipeline.addLast("session", sessionHandler);
                    pipeline.addLast("socketPackHandler",new SocketChooserHandler());
                    for (ChannelHandlerAdapeter cha : customHandlerManager.getHandlers()) {
                        sc.pipeline().addLast(cha);
                    }
                }
            }) ;
        for(int port : ports) {
            ChannelFuture cf = serverBootstrap.bind(port);
            if (StringUitls.isNotEmpty(host)) {
                cf = serverBootstrap.bind(host, port);
            } else {
                cf = serverBootStrap.bind(port);
            }
            cf.sync();
            channelFutures.add(cf);
        }
    }

    class SocketChoserHandler extends ByteToMessageDecoder {
        private final int CHECK_LEN = 5;
        private final String WSOCKET_PREFIX = "GET";

        @Override
        protected void decode(ChannelHandlerContex ctx, ByteBuf in, List<Object> out) throws Ecception{
            String protocol = getProtocol(in);
            if(protocol == null) {
                return;
            }
            ChannelPipeline pipeline = ctx.pipeline();
            if(protocol.startWith(WSOCKET_PREFIX)) {
                addWebSocketHandler(pipeline);
            } else {
                addSocketHandler(pipeline);
            }
            pipeline.remove(SocketChooserHandler.class);

            WpacketDecoder wpacketDecoder = new WpacketDecoder(maxLength);
            pipeline.addLast("decoder", wpacketDecoder);
            ctx.channel().attr(WpacketDecoder.DECODER).set(wpacketDecoder);
            pipeline.addLast("socketPacketHandler", socketPacketHandler);
            for(ChanelHandlerAdpater cha : customHandlerManager.getHandlers()) {
                pipeline.addLast(cha);
            }
        }
    }
    private List<ChannelFuture> channelFutures = new ArrayList<>();

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    public void shutdownGracefully() {
        try {
            for(ChannelFuture cf : channelFutures) {
                if(cf.channel() != null) {
                    try {

                    } catch(Exception w) {
                        logger.error("通讯server channel 关闭异常",e);
                    }
                }
            }
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // get/set.....

}

用户自定义handler管理器

/**
 * Holds the user-defined channel handlers that are appended to each connection's pipeline.
 */
@Component
public final class CustomHandlerManager {
    private List<ChannelHandlerAdapter> handlers = new ArrayList<>();

    /** Runs one-time initialisation after the bean has been constructed. */
    @PostConstruct
    public void init() {
        doInit();
    }

    private void doInit() {
        // TODO
    }

    /** Returns the registered handlers (the live list, not a copy). */
    public List<ChannelHandlerAdapter> getHandlers() {
        return handlers;
    }

    /**
     * Replaces the registered handlers.
     *
     * @param handlers new handler list
     */
    public void setHandlers(List<ChannelHandlerAdapter> handlers) {
        this.handlers = handlers;
    }
}

分发器

消息处理器handler

/**
 * Inbound packet dispatcher.
 *
 * <p>At start-up it builds a codec for every bean annotated with {@code SocketPacket}
 * and, as a bean post-processor, registers every {@code SocketMethod} method found on
 * {@code SocketClass} beans. At runtime it decodes incoming {@code WrequestPacket}s,
 * routes them to the registered handler method and encodes outgoing messages.
 */
@Sharable
@Component
public class SocketPacketHandler extends ChannelInboundHandlerAdapter
        implements ApplicationContextAware, BeanPostProcessor {
    private static final Logger logger = LoggerFactory.getLogger(SocketPacketHandler.class);

    private static final boolean OPEN_PROTOBUF_COMPILE =
            Boolean.parseBoolean(System.getProperty("openProtobufComplile", "false"));

    private static SocketPacketHandler self;

    /** Fast lookup: packet class to packetId. */
    private Map<Class<?>, Integer> classToPacketId = new ConcurrentHashMap<>();

    /** Fast lookup: packetId to packet class. */
    private Map<Integer, Class<?>> packetIdToClass = new ConcurrentHashMap<>();

    /** Fast lookup: packetId to codec. */
    private Map<Integer, Codec> packetIdToCodec = new ConcurrentHashMap<>();

    /** Registered handler method per packetId. */
    private Map<Integer, SocketHandlerDefinition> handlerDefinitions = new ConcurrentHashMap<>();

    private Set<Integer> runInNIOThreadPacketIds = new HashSet<>();

    /** Timer used to batch flushes of non-urgent writes. */
    private HashedWheelTimer delayFlushTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS);

    @Autowired
    private SessionManager sessionManager;

    @Autowired
    private IEventBusManager eventManager;

    private ApplicationContext applicationContext;
    private FirewallManager firewallManager;
    private ExceptionHandler exceptionHandler;
    private IPacketStatistics packetStatistics;
    private IPacketInterceptor packetInterceptor;

    /** Callbacks for failures raised while handling packets or connections. */
    public interface ExceptionHandler {
        /** Called when a packet handler method throws. */
        void handlerPacketException(Wsession session, Throwable e);

        /** Called when the connection itself reports an error. */
        void handlerConException(ChannelHandlerContext ctx, Throwable cause);
    }

    /** Returns the instance published by {@link #init()}; {@code null} before that. */
    public static SocketPacketHandler getInstance() {
        return self;
    }

    /**
     * Publishes this instance, builds a codec for every {@code SocketPacket} bean and
     * installs a pass-through interceptor when none has been configured.
     *
     * @throws RuntimeException if two packet classes declare the same packetId
     */
    @PostConstruct
    private void init() {
        SocketPacketHandler.self = this;
        if (OPEN_PROTOBUF_COMPILE) {
            System.err.println("open protobuf dynamic complie!");
        } else {
            ProtobufProxy.closeCompile();
            System.err.println("close protobuf dynamic complie!");
        }
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        String[] classNames = applicationContext.getBeanNamesForAnnotation(SocketPacket.class);
        for (String className : classNames) {
            Class<?> packetClass = applicationContext.getType(className);
            SocketPacket socketPacket = packetClass.getAnnotation(SocketPacket.class);
            if (packetIdToCodec.containsKey(socketPacket.packetId())) {
                throw new RuntimeException(String.format("socketPacket.packetId() [%s] 重复使用!", socketPacket.packetId()));
            }
            Codec codec = ProtobufProxy.create(packetClass);
            packetIdToCodec.put(socketPacket.packetId(), codec);
            classToPacketId.put(packetClass, socketPacket.packetId());
            packetIdToClass.put(socketPacket.packetId(), packetClass);
        }
        stopWatch.stop();
        logger.debug("load protoProxy {} use time {}s", classNames.length, TimeUnit.MILLISECONDS.toSeconds(stopWatch.getTime()));
        // Default packet interceptor: accepts every handler method, intercepts nothing.
        if (packetInterceptor == null) {
            packetInterceptor = new IPacketInterceptor() {
                // NOTE(review): the notes spell this method two ways and with two arities;
                // the (packetId, method) form matches the call site below. Confirm against
                // the IPacketInterceptor interface.
                @Override
                public boolean isRealPacketMethod(int packetId, Method method) {
                    return true;
                }

                @Override
                public boolean intercept(Wsession wsession, WrequestPacket packet) {
                    return false;
                }

                @Override
                public boolean intercept(Wsession wsession, WresponsePacket packet) {
                    return false;
                }
            };
        } else {
            packetInterceptor.afterInit(this);
        }
    }

    /**
     * Registers every {@code SocketMethod}-annotated method of a {@code SocketClass} bean.
     *
     * <p>A handler method must take exactly two parameters, the second being the packet.
     * A non-void return type must itself carry {@code SocketPacket}.
     *
     * @return the bean, unchanged
     * @throws IllegalArgumentException if a handler method is malformed or its packetId
     *         is already bound to another method
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> clazz = bean.getClass();
        if (!clazz.isAnnotationPresent(SocketClass.class)) {
            return bean;
        }
        for (Method method : clazz.getMethods()) {
            SocketMethod methodAnnotation = method.getAnnotation(SocketMethod.class);
            if (methodAnnotation == null) {
                continue;
            }
            // Validate parameters and return type.
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length != 2) {
                throw new IllegalArgumentException(bean.getClass().getName()
                        + "." + method.getName() + "只能拥有两个参数");
            }
            // The second parameter is the received packet.
            Class<?> packetClass = parameterTypes[1];
            int packetId = methodAnnotation.coustomPacketId();
            if (packetId == 0) {
                SocketPacket socketPacket = packetClass.getAnnotation(SocketPacket.class);
                if (socketPacket == null) {
                    // The parameter type is not a registered packet class.
                    throw new IllegalArgumentException(String.format("class[%s] 没有包含SocketPacket注解!", packetClass));
                }
                packetId = socketPacket.packetId();
            }
            if (!"void".equals(method.getReturnType().getName())) {
                if (method.getReturnType().getAnnotation(SocketPacket.class) == null) {
                    throw new IllegalArgumentException(
                            bean.getClass().getName() + "." + method.getName() + "返回值必须包含SocketPacket注解"
                    );
                }
            }
            if (packetInterceptor != null && !packetInterceptor.isRealPacketMethod(packetId, method)) {
                // The interceptor decided this handler method must not be registered.
                continue;
            }
            // Reject a packetId that another handler has already claimed.
            SocketHandlerDefinition shd = handlerDefinitions.get(packetId);
            if (shd != null) {
                throw new IllegalArgumentException(String.format("class[%s]和class[%s]重复使用,一个packetId只能用在一个方法上!", shd.getBean().getClass(), packetClass, packetId));
            }
            boolean runInNioThread = method.getAnnotation(IRunInNioThread.class) != null;
            handlerDefinitions.put(packetId, SocketHandlerDefinition.valueOf(bean, method, runInNioThread));
        }
        return bean;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String name) throws BeansException {
        return bean;
    }

    /**
     * Handles one decoded {@code WrequestPacket}: firewall check, interception,
     * statistics, payload decoding and dispatch.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The decoder upstream has already produced a WrequestPacket.
        final WrequestPacket packet = (WrequestPacket) msg;
        // Look up the session bound to this channel.
        final Wsession session = sessionManager.getSession(ctx.channel().id());

        if (firewallManager != null && !firewallManager.packerFilter(session, packet)) {
            // Supply both format arguments; without them String.format throws.
            logger.error(String.format("session[%s] packetId[%s]发送非法的信息,可能客户端没有登陆就放在发送信息!",
                    session, packet.getPacketId()));
            return;
        }
        if (packetInterceptor.intercept(session, packet)) {
            return;
        }
        if (packetStatistics != null) {
            packetStatistics.receive(packet);
        }
        final Object message;
        try {
            message = packetInterceptor.customDecodePacket(packet.getPacketId(), packet.getData());
        } catch (IOException e) {
            logger.error("decode error", e);
            return;
        }
        excMessage(session, packet.getPacketId(), message);
    }

    /**
     * Decodes a packet payload with the codec registered for {@code packetId}.
     *
     * @return the decoded message, or {@code null} when no codec is registered
     * @throws RuntimeException wrapping the {@link IOException} if decoding fails
     */
    public Object decodePacket(int packetId, byte[] data) {
        Codec codec = SocketPacketHandler.getInstance().getCodec(packetId);
        if (codec == null) {
            logger.error("not found codec with packetId {}", packetId);
            return null;
        }
        try {
            return codec.decode(data);
        } catch (IOException e) {
            throw new RuntimeException(String.format("decode packet[%d] error", packetId), e);
        }
    }

    /**
     * Dispatches a decoded message to its handler, either directly on the calling
     * thread or through {@code IdentityEventExcutorGrop}.
     *
     * <p>A {@code null} message or an unknown packetId is ignored (the latter is logged).
     */
    public void excMessage(Wsession session, int packetId, Object message) {
        if (message == null) {
            return;
        }
        final SocketHandlerDefinition shd = handlerDefinitions.get(packetId);
        if (shd == null) {
            logger.error(String.format("没有找到处理[%s]的SocketHandlerDefinition", packetId));
            // Nothing can handle this packet; continuing would dereference null.
            return;
        }
        session.onReceivePacket(message);
        if (shd.isRunInNioThread()) {
            // Special business logic that must run on the NIO thread.
            excMessage(session, shd, message);
            return;
        }
        IdentityEventExcutorGrop.addTask(new AbstractDispatcherCodeRUnnable() {
            @Override
            public void doRun() {
                excMessage(session, shd, message);
            }

            @Override
            public long timeoutNanoTime() {
                // 3 milliseconds.
                return 3 * 1000 * 1000;
            }

            @Override
            public String name() {
                return "wSocket_" + packetId;
            }

            @Override
            public int getDispatcherHashCode() {
                return session.selectDispatcherHashCode();
            }
        });
    }

    /**
     * Invokes the handler and sends back its non-null return value. Any exception
     * goes to the configured {@link ExceptionHandler}, or is logged when none is set.
     */
    public void excMessage(Wsession session, SocketHandlerDefinition shd, Object message) {
        try {
            Object returnMessage = shd.invoke(session, message);
            if (returnMessage == null) {
                return;
            }
            session.sendPacket(returnMessage);
        } catch (Exception e) {
            if (exceptionHandler != null) {
                exceptionHandler.handlerPacketException(session, e);
            } else {
                logger.error("SocketHandlerDefinition任务执行失败", e);
            }
        }
    }

    /** Builds a callback that sends a non-null result to the session and reports failures. */
    private IEventCallback buildEventCallback(Wsession session) {
        return new IEventCallback() {
            @Override
            public void callback(Object returnMsg) {
                if (returnMsg == null) {
                    return;
                }
                session.sendPacket(returnMsg);
            }

            @Override
            public void exception(Throwable throwable) {
                if (exceptionHandler != null) {
                    exceptionHandler.handlerPacketException(session, throwable);
                } else {
                    logger.error("SocketHandlerDefinition任务执行失败!", throwable);
                }
            }
        };
    }

    /**
     * Encodes and writes a business message; every outgoing message goes through here.
     *
     * <p>With {@code flush == false} the write is queued and a single flush is scheduled
     * 100 ms later; the session's flush flag ensures only one flush is pending at a time.
     *
     * @return the write future, or {@code null} when encoding or writing failed
     */
    public ChannelFuture sendPacket(Wsession session, Channel channel, Object message, boolean flush) {
        try {
            WresponsePacket wp = encodePacket(message);
            if (packetStatistics != null) {
                packetStatistics.send(wp);
            }
            if (flush) {
                return channel.writeAndFlush(wp);
            }
            ChannelFuture future = channel.write(wp);
            if (session.getFlushTimer().compareAndSet(false, true)) {
                delayFlushTimer.newTimeout(timeout -> {
                    session.getFlushTimer().compareAndSet(true, false);
                    channel.flush();
                }, 100, TimeUnit.MILLISECONDS);
            }
            return future;
        } catch (Throwable e) {
            String errorMessage = (message == null ? "" : message.getClass().getName()) + " encode编码失败!";
            logger.error(errorMessage, e);
        }
        return null;
    }

    /** Reports a connection-level error, then closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (exceptionHandler == null) {
            logger.error("通讯包异常!", cause);
        } else {
            exceptionHandler.handlerConException(ctx, cause);
        }
        ctx.close();
    }

    // getters/setters omitted

    /**
     * Wraps a message in a {@code WresponsePacket}; a message that already is one is
     * returned as-is.
     */
    public WresponsePacket encodePacket(Object message) {
        if (message instanceof WresponsePacket) {
            return (WresponsePacket) message;
        }
        WresponsePacket wresponsePacket = WresponsePacket.valueOf();
        fillPacket(wresponsePacket, message);
        return wresponsePacket;
    }

    /**
     * Fills a response packet with the packetId and encoded bytes of {@code message}.
     *
     * @throws NullPointerException if the message class has no registered packetId
     * @throws RuntimeException wrapping the {@link IOException} if encoding fails
     */
    public void fillPacket(WresponsePacket wresponsePacket, Object message) {
        Class<?> messageClass = message.getClass();
        Integer packetId = classToPacketId.get(messageClass);
        if (packetId == null) {
            throw new NullPointerException("nor found packetId with" + messageClass);
        }
        try {
            wresponsePacket.setPacketId(packetId);
            byte[] returnMessageBytes = getCodec(packetId).encode(message);
            wresponsePacket.setData(returnMessageBytes);
            if (logger.isDebugEnabled()) {
                // Keep the original object so debug listeners can print it.
                wresponsePacket.setDebugPacket(message);
            }
        } catch (IOException e) {
            throw new RuntimeException(String.format("encode packet [%d] error", wresponsePacket.getPacketId()), e);
        }
    }


}

消息处理器描述

/**
 * Describes one registered packet handler: the bean, the method to invoke, and how
 * the first argument is supplied.
 */
public class SocketHandlerDefinition {
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandlerDefinition.class);
    private static final Object NOT_EXC = new Object();

    private Object bean;
    private Method method;
    private Map<Class, Method> methodMap;
    /** True when the handler's first parameter accepts a {@code Wsession}. */
    private boolean sessionParameter;
    private boolean runInNioThread;

    /**
     * Creates a definition for {@code method} on {@code bean}.
     *
     * @param bean           object the method is invoked on
     * @param method         handler method; its first parameter type decides whether
     *                       the session or the bound player is passed
     * @param runInNioThread whether the handler must run on the NIO thread
     */
    public static SocketHandlerDefinition valueOf(Object bean, Method method, boolean runInNioThread) {
        SocketHandlerDefinition shd = new SocketHandlerDefinition();
        shd.bean = bean;
        shd.method = method;
        shd.sessionParameter = Wsession.class.isAssignableFrom(method.getParameterTypes()[0]);
        shd.runInNioThread = runInNioThread;
        return shd;
    }

    /**
     * Invokes the handler with either the session or the session's bound player as
     * the first argument.
     *
     * @return the handler's return value, or {@code null} when a player-based handler
     *         is called on a session with no bound player
     */
    public Object invoke(Wsession session, Object packet) {
        if (sessionParameter) {
            return ReflectionUtils.invokeMethod(method, bean, session, packet);
        }
        Object player = session.getPlayer();
        if (player == null) {
            LOGGER.warn("{} player not login, cannot exc socekt packet {}", session, packet);
            return null;
        }
        return ReflectionUtils.invokeMethod(method, bean, player, packet);
    }


    // getters/setters omitted
}

编解码器

解码器

 /**
 *  |size-int-4|packetId-short-2|data|
 */
 public class WpacketDecoder extends ByteToMessageDecoder {

     public static final AttaributeKey<WpacketDecoder> DECODER = AttributeKey.valueOf("WpacketDecode");

    /**
    *   1m
    */
    private static int MAX_SIZE = 1*1024*1024;
    private static int MIN_SIZE = 2;

    /**
    *   length+packetId+6
    */
    private static final int NO_AUTH_MIN_READABLE = 4 + 2;
    /*
    *  length+packetId = 10
    */
    private static final int AUTH_MIN_READABLE = 4 + 2 + 4;
    /**
    *   是否有设置加密解锁
    */ 
    private boolean auth;
    /**
    *   是否开启解密
    */
    private boolean openAuth;
    /**
    *  消息加密索引
    */
    private int index;

    private int staerIndex;

    public WpacketDecoder(int maxLength) {
        if(maxLength <=  MIN_SZIE) {
            logger.error("maxLength error ! length[%s] MIN_SIZE[%s]", maxLength, MIN_SIZE);
        }
        MAX_SIZE = maxLength;
    }

    @Override
    public void channelRegistered() throws Exception{
        super.chanelRegistered(ctx);
        ctx.channel().attr(DECODER).set(this);
    }

    @Ovreride
    protected void decode(ChannelHandkerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.refCnt() == 0 ) {
            // 协议已被提前释放
            return;
        }
        int size =  in.markReaderIndex();
        if(readableBytes < (auth ? AUTH_MIN_READABLE : NO_AUTH_MIN_READABLE)) {
            return;
        }
        in.markReaderIndex();
        int size = in.readInt();
        if(sise >= MAX_SIZE) {
            in.clrear();
            ctx.close();
            NetIpCollectUtls.incErrorPacket(ctx);
            logger.warn("{} error size {}", ctx, size);
            retrun;
        }
        if(in.readableBytes() < size) {
            in.resetReaderIndex();
            return;
        }
        short packetId = in.readShort();
        byte[] data;
        if(auth) {
            int reckey = in.readInt();
            data = new byte[size -6];
            in.readytes(data);
            if(openAuth) {
                int token = claDataToken(data);
                int selfKey = (size ^ packetId ^ index) + token;
                if(recKey != selfKey) {
                    in.clear();
                    ctx.close();
                    logger.warn("{} auth key error! packetId[{}], self[{}], rec[{}], index[{}]",
                    xtx, packetId, selfKey, recKey, index);
                    return;
                }
            }
            index += packetId;
            if(index > INDEX_MAX) {
                this.index = startIndex;
            }
        } else {
                data = new byte[size -2];
                in.readBytes(data);
        }
        WrequestPacket wp = WrequestPacket.valueOf(packetId, data);
        out.add(wp);
    }

    // 开启验证
    public void setAuth(int startIndex, boolean open) {
        this.auth = true;
        this.openAuth = open;
        this.index = startIndex;
        this.startIndex = startIndex;
    }

    // 计算数据验证值
    private int calDataToken(byte[] data) {
        int count = 0;
        for(int i=0; i< data.length; i++) {
            count += data[i] & 0xFF;
        }
        return count;
    }

 }

编码器

/** Outbound encoder: lets each {@code WresponsePacket} write itself into the buffer. */
public class WpacketEncoder extends MessageToByteEncoder<WresponsePacket> {
    @Override
    protected void encode(ChannelHandlerContext ctx, WresponsePacket msg, ByteBuf out) throws Exception {
        msg.write(out);
    }
}

核心

会话管理

@Componet
public class SessionManager {
    private static SessionManager instance;

    public SessionManager() {
        instance = this;
    }

    public static SessionManager self() {
        return instance;
    }

    // 所有会话
    private ConcurrentHashMap<ChannelId, Wsession> allSessions = new ConcurrrntHashMap<>();

    public void add(Wession session) {
        if(!allSessions.containKey(session.getChannel().id())) {
            addSession.put(session.getChannel().id(), session));
        } else {
            // 不应该进入到这里
            logger.error(String.format("channelId[%s], ip[%s]重复注册 sessionManager", session.getChannel().id().asShortText(), session.getChannel().remoteAddress()));
        }
    }

    public int ipSessionCount(String ip) {
        int count = 0;
        for(Wession session : allSessions.values()) {
            if(session.getChannel().remoteAddress().toString().contains(ip)) {
                count++;
            }
        }
        return count;
    }

    public Wsession getSession(ChannelId channelId) {
        return allSesions.get(channelId);
    }

    public Wession remove(ChannelId channelId) {
        return allSessions.get(channeId);
    }

    public Wsession remove(ChannelId id) {
        Wsession sesion = allSessions.remove(id);
        if (session != null) {
            session.notifyClose();
        }
        return session;
    }

    public ConcurrentHashMap<ChannelId, Wession> getAllSessions() {
        return allSessions;
    }

}

基础包

/** Inbound packet as produced by the decoder: a protocol number plus the raw payload. */
public class WrequestPacket {

    private int packetId;
    private byte[] data;

    /**
     * Creates a request packet.
     *
     * @param packetId protocol number read from the wire
     * @param data     raw payload bytes (stored as-is, not copied)
     */
    public static WrequestPacket valueOf(short packetId, byte[] data) {
        WrequestPacket request = new WrequestPacket();
        request.data = data;
        request.setPacketId(packetId);
        return request;
    }

    // getters/setters omitted
}

/** Outbound packet: protocol number, encoded payload and, for debugging, the source object. */
@ProtobufClass
public class WresponsePacket {
    @Protobuf(description = "协议")
    private int packetId;
    @Protobuf(description = "数据")
    private byte[] data;

    /** Original message object, kept only so debug listeners can print it. */
    private Object debugPacket;

    /** Creates an empty packet to be filled in later. */
    public static WresponsePacket valueOf() {
        return new WresponsePacket();
    }

    /**
     * Creates a packet from a protocol number and an already encoded payload.
     *
     * @param packetId protocol number
     * @param data     encoded payload (stored as-is, not copied)
     */
    public static WresponsePacket valueOf(int packetId, byte[] data) {
        WresponsePacket wp = new WresponsePacket();
        wp.packetId = packetId;
        wp.data = data;
        return wp;
    }

    /**
     * Writes the frame {@code |length-int|packetId-short|data|}, where length counts
     * the packetId and the payload. A missing payload is written as empty.
     */
    public void write(ByteBuf out) {
        byte[] payload = data == null ? new byte[0] : data;
        // length
        out.writeInt(payload.length + 2);
        out.writeShort(packetId);
        out.writeBytes(payload);
    }

    // getters/setters omitted
}

会话

public class Wsession {
    private static final AttributeKey<Object> MAIN_ENTITY = AttributeKey.valueOf("MAIN_ENTITY");
    private static final AtomicInteger SEQ = new AtomicInteger(1);

    private int id;

    private Channel channel;

    private String ip;

    private int dispatcherHashCOde;

    // 流量记录
    private FirewallRecord firewallRecord = new FirewallRecord();

    private List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();

    private List<SendPacketListener> sendPacketListeners = new ArrayList<>(1);

    private List<ReceivePacketListener> receivePacketListeners = new ArrayList<>(1);

    // 连接时间
    private long connectTime;

    @FunctionalInterface
    public interface CloseListener {
        void run();
    }

    @FunctionalInterface
    public interface SendPacketListener {
        void run(Object packet);
    }

    public static Wession valueOf() {
        Wsession session = new Wsession();
        session.channel = channel;
        session.id = SEQ.incrementAndGet();
        session.dispatcherHashCode = channel.hashCode();
        String ip = channel.removeAddress().toString();
        session.ip = ip.substring(1, ip.indexOd(":"));
        session.connectTime = System.currentTimeMillis();

        if(LOGGER.isDebugEnabled()) {
            session.getSendPacketListeners().add(packet -> {
                Object realPacket = packet;
                if(realPacket instanceof WresponsePacket) {
                    realPacket = ((Wrespon) realPacket).getDebugPacket();
                }
                if(realPacket instanceof ISipDebugOut && ((ISkipDebugOut realPacket).skip()) {
                    return;
                }
                Object target = session.getMainEntity();
                LOGGER.debug(String.fotmat("---->>>%s [%s]%S",
                target == null ? channel : target,
                realPacket.getClass().getSimpleName(),
                JsonUtils.object2String(realPacket)));
            });
            session.getReceivePacketListrners().add(packet -> {
                if(packet instanceof ISkillDebugOut && ((ISkipDebugOut) packet).skip()) {
                    return;
                }
                if(packet.getClass().isAnnotationPresent(SkipDebugOut.class)) {
                    return;
                }
                Object target = session.getMainEntity();
                LOGGER.debug(String.format("<<<--%s [%s]%s",
                target == null ? channel : target,
                packet.getClass().getSumpleName(),
                JsonUtils.object2String( packet)));
            });
        }
        return session;
    }

    private Wsession() {

    }

    public <T> getAttr(AttributeKey<T> key) {
        Attribute<T> attr = channel.attr(key);
        return attr == null ? null : attr.get();
    }

    public <T> void setAttr(AttributeKey<T> key, T t) {
        channel.attr(key).set(t);
    }

    // 绑定玩家
    public void bindPlayer(Object player) {
        setAttr(PLAYER, null);
    }
    // 清除玩家绑定
    public void unBindPlayer() {
        setAttr(PLAYER, null);
    }

    @SuppressWarnings("unchecked")
    public <Player> Player getPlayer() {
        return (Player) getAttr(Player);
    }

    // set/get

    // 当dispatcherHashCode没有初始化时选择channel的hashcode作为分发code

    // 业务消息推送
    public ChannelFuture sendPacket(Object packet) {
        ChannelFuture future = SocketPacketHandler.getIsntance().sendPacket(this, channel, packet, flusNow);
        for (SendPacketListener listener : sendPacketListeners) {
            listener.run(packet);
        }
        return future;
    }

    // 接收到协议触发
    public void onReceivePacket(Object pacekt) {        
        if(receivePacketListeners.isEmpty()) {
            return;
        }
        try {
            receivePacketListeners.forEach(receivePacketListener -> receivePacketListener.run(packet));
        } catch(Exception e) {
            LOGGER.error("receivePacketListener error!", e);
        }
    }   

    private AtomicBoolean flushTimer = new AtomicBoolean(false);

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = prime * result + id;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if(this == obj) {
            return true;
        }
        if(obj == null) {
            return false;
        }
        if(getClass() != obj.getClass()) {
            return false;
        }
        Wsession other = (Wsession) obj;
        if(id != other.id) {
            return false;
        }
        return true;
    }

    // toString()


    // 通知

    public void notiftClose() {
        for(CloseListenr listener : closeListeners) {
            try {
                listener.run();
            } catch(Exception e) {
                LOGGER.error("session CloseListener异常", e);
            }
        }
    }


}

总结

  1. SocketPacket注解标识的类会被认为是协议类,在SocketPacketHandler初始化时会根据协议类生成对应的JProtobuf编解码器Codec,并将packetId、packetClass、Codec的关系储存起来。

  2. SocketMethod是接受请求的声明,在Spring bean 加载启动中