通讯组件学习
网络连接是游戏服务器的基础中的基础。此游戏服务器使用Jprotobuf
事先编译好protobuf文件。通过注解标志各种协议包,接受客户端发送的协议时,自动将其转化为Wsesion
对象,玩家角色登陆后再进一步转化为Player
对象。
用于标记的枚举
模块声明
@Target(ElementTpye.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SocketClass {
}
模块声明
@Target(ElementTpye.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SocketMethod {
// 自定义协议号,用于事件体系
int coustomPacketId() default 0;
}
通讯包声明
@Target(ElementTpye.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Lazy
public @interface SocketMethod {
// 协议号
int packetId();
}
Socket服务器
Socket服务器Wserver
在启动时加载注解标记的通讯包,
@Component
public class Wserver implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(Wserver.class);
@Autowired
private CustomHandlerManager customHandlerManager;
@Autowired
private SessionHandler sessionHandler;
@Autowired
private FlowFirewall flowFirewall;
@Autowired
private IpFirewall ipFirewall;
@Autowired
private SocketPacketHandler socketPacketHandler;
// 自定义的连接日志处理接口
public static LoggingHandler logginHandler;
private int[] ports;
private ApplicationCintext applicationContex;
private int maxlength;
// 开启
public void open() {
ipFirewal.open();
}
// 网络端口是否打开
public boolean ioOpen() {
return ipFirewall.isOpened();
}
// 关闭
public void block() {
ipFirewall.block();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
this.applicationContext = applicationContext;
}
// 加载配置
public void loadProperties(Resource rsource) {
Properties prop = new Properties();
prop.load(resource.getInputStream());
this.prots = ConfigUtils.getPorts(prop, ServerConfigConstant.SERVER_PORT);
String maxLengthProp = prop.getProperty(ServerConfigConstant.PACKET_MAXLEGHTH);
if(maxLengthProp == null) {
// 默认1m
maxLength = 1024*1024;
} else {
maxLength = INteger.valueOf(mxaLengthProp) * 1024;
}
}
public void bind() throws InterruptedException, IOException{
bind(new DummyFirewallManger());
}
// 绑定端口
public void bind(FirewallManager firewallManger) thrwos InterruptedException, IOException{
socketPacketHandler.setFirewallManager(firewallManager);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup bossGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
if(loggingHandler != null) {
serverBootstrap.handler(loggingHandler);
}
serverBootstrap.group(bossGrop, workerGroup)
.channel(NioSreverSocketchannel.class)
.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_BACKLOG,1024)
.childOption(ChannelOption.TCP_NODELAY, true).
.childOption(ChannelOption.TCP_NODELAY)
.childOption(ChannelOption.SO_RCVBUF,1024*32)
.childOption(ChannelOption.SO_SNDBUF,1024*32)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,new WriteBufferWaterMark(0,128*1024))
.childHandler(nwe ChannelInitializer<SocketChannel> -> {
@Override
public void initChannel(SocketChannel sc) {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("encoder", new WpacketEncoder());
pipeline.addLast("ipFirewall", ipFirewall);
pipeline.addLast("flowFirewal", flowFirewall);
pipeline.addLast("decoder", new WpacketDecoder(maxLength)).
pipeline.addLast("session", sessionHandler);
pipeline.addLast("socketPackHandler",new SocketChooserHandler());
for (ChannelHandlerAdapeter cha : customHandlerManager.getHandlers()) {
sc.pipeline().addLast(cha);
}
}
}) ;
for(int port : ports) {
ChannelFuture cf = serverBootstrap.bind(port);
if (StringUitls.isNotEmpty(host)) {
cf = serverBootstrap.bind(host, port);
} else {
cf = serverBootStrap.bind(port);
}
cf.sync();
channelFutures.add(cf);
}
}
class SocketChoserHandler extends ByteToMessageDecoder {
private final int CHECK_LEN = 5;
private final String WSOCKET_PREFIX = "GET";
@Override
protected void decode(ChannelHandlerContex ctx, ByteBuf in, List<Object> out) throws Ecception{
String protocol = getProtocol(in);
if(protocol == null) {
return;
}
ChannelPipeline pipeline = ctx.pipeline();
if(protocol.startWith(WSOCKET_PREFIX)) {
addWebSocketHandler(pipeline);
} else {
addSocketHandler(pipeline);
}
pipeline.remove(SocketChooserHandler.class);
WpacketDecoder wpacketDecoder = new WpacketDecoder(maxLength);
pipeline.addLast("decoder", wpacketDecoder);
ctx.channel().attr(WpacketDecoder.DECODER).set(wpacketDecoder);
pipeline.addLast("socketPacketHandler", socketPacketHandler);
for(ChanelHandlerAdpater cha : customHandlerManager.getHandlers()) {
pipeline.addLast(cha);
}
}
}
private List<ChannelFuture> channelFutures = new ArrayList<>();
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public void shutdownGracefully() {
try {
for(ChannelFuture cf : channelFutures) {
if(cf.channel() != null) {
try {
} catch(Exception w) {
logger.error("通讯server channel 关闭异常",e);
}
}
}
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// get/set.....
}
用户自定义handler管理器
@Component
public final class CustomHandlerManager {
private List<ChannelHandlerAdapter> handlers = new ArrayList<>();
@PostConstruct
public void init() {
doInit();
}
private void doInit() {
// TODO
}
public List<ChannelHanderAdapter> getHandeers() {
return handlers;
}
public void setHandlers(ChannelHandlerAdapter handlers) {
this.handlers = handlers;
}
}
分发器
消息处理器handler
@Sharable
@Component
public class SocketPacketHandler extends ChannelInboundHandlerAdpter implements ApplicationContextAware,
ApplicationContextAware {
private static final boolean OPEN_PROTOBUF_COMPILE = Boolean.valueOf(System.getProperty("openProtobufComplile", "false"));
/**
* 类class与packetId的快速映射
*/
private Map<Class<?>, Integer> calssToPacketId = new ConcurrentHashMap<>();
/**
* packetId与类class的快速映射
*/
private Map<Integer, Codec> paccketIdToCodec = new ConcurrentHashMap<>();
private Set<Integer> runInNIOThreadPacketIds = new HashSet<>();
@Autowired
private SessionManager sessionManager;
@Autowierd
private IEventBusManager eventManager;
private FirewallManager firewallManager;
private ExceptionHandler exceptionHandler;
private IPacketStatistics packetStatistics;
public static SocketPacketHandler getInstance() {
return self;
}
private static SocketPacketHandler self;
@PostConstruct
private void init() {
SocketPacketHandler.self = this;
if(OPEN_PROTOBUF_COMPILE) {
System.err.println("open protobuf dynamic complie!");
} else {
ProtobufProxy.closeCompile();
System.err.println("close protobuf dynamic complie!");
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String[] classNames = applicationContext.getBeanNamesForAnnotation(SocketPacket.class);
for(String className : classNames) {
Class<?> packetClass = applicationContex.getType(className);
SocketPacket socketPacket = packetClass.getAnnotion(SocketPacket.class);
if(packetIdToCodec.containkey(socketPacket.packetId())) {
throw new RuntimeException(String.format("socketPacket.packetId() [%s] 重复使用!", socketPacket.packetId()));
}
Codec codec = ProtobufProxy.create(packetClass);
packetIdToCodec.put(socketPacket.packetId(), codec);
classToPacketId.put(packetClass, socketPacket.packetId());
packetIdToClass.put(socketPacket.packetId(), packetClass);
}
stopWatch.stop();
loogger.debug("load protoProxy {} use time {}s", classNames.length, TimeUnit.MILLSECONDS.toSeconds(stopWatch.getTime()));
// 默认协议拦截器
if(packetInterceptor == null) {
packetIneterceptor = new IPacketInterceptor() {
@Override
public boolean isRealPacketMethod() {
return true;
}
@Override
public boolran intercept(Wsession wsession, WrequestPacket packet) {
return false;
}
@Override
public boolean intercept(Wsession wsession, WresponsePacket packet) {
return false;
}
};
} else {
packetInterceptor.afterInit(this);
}
}
@Override
public Object postProcessAfterInirialization() throws BeansException{
Class<?> clazz = bean.getClass();
if(clazz.isAnnotationPersent(SocektClass.class)) {
for(Method method : clazz.getMethods()) {
SocektMethod methodAnnotation = method.getAnnotation(SocketMethod.class);
if(methodAnnotion == null) {
continue;
}
// 参数和返回值验证
Class<?>[] clzs = method.getParameterTypes();
if(clas.length != 2) {
throw new IllegalArgumentException(bean.getClass().getName() +
"." +method.getName() + "只能拥有两个参数");
}
// 接收参数验证
Calss<?> packetClass = method.getPatameterTypes()[1];
int packetId = methodAnnotation.consutomPacketId();
if(pcaketId == 0) {
SocketPacket socketPacekt = packetCalss.getAnnotation(SocketPacket.class);
if(socketPacket == null) {
// 该对象没有class注册
throw new IllegalArgumentException(String.format("class[%s] 没有包含SocketPacket注解!", packetClass));
}
packetId = socketPacket.packetId();
}
if(!"void".equals(method.getReturnType().getName())) {
if(method.getReturnType().getAnnotation(SocketPacket.class) == null) {
throw new IllegalArgumentException(
bean.getClass().getName() + "."+method.getName() + "返回值必须包含SocketPacket注解"
);
}
}
if(packetInterceptor != null && !packetInterceptor.isRacketMethod(packetId, method)) {
// 检测是否需要注册该协议处理方法
continue;
}
// 是否有其他执行对象已经注册了此消息
SocektHandkerDefinition shd = handlerDefinitions.get(packetId);
if(shd != null) {
throw new IllegalArgumentException(String.format("class[%s]和class[%s]重复使用,一个packetId只能用在一个方法上!", shd.getBean().getClass, packetClass, packetId));
}
boolean runInNioThread = method.getAnnotation(IRunInNioThread.class) != null;
handlerDefinitions.put(packetId, SocketHandlerDefinition.valueOf(bean, method, runInNioThread));
}
return bean;
}
}
@Override
public Object postProcessBeforeInitialization(Object bean, String name) throws BeansException{
return bean;
}
public interface ExceptionHandler {
// 协议处理异常处理
void handlerPacketException(Wsession session, Throwable e);
// 连接异常处理
void handlerConException(ChannelHandlerContext ctx, Throwable cause);
}
private ExceptionHandler exceptionHandler;
@Override
public void channelRead(ChannelHandlerContext ctx, Obejct msg) throws Exception {
// 将包转化为 WrequestPacket 类型 , 编码器已经编码好了
final WrequestPacket packet = (WrequestPacket) msg;
// 根据channel获得一个session
final Wsession session = sessionManager.getSession(ctx.channel().id());
if(firewallManager != null && !firewallManger.packerFilter(session, packet)) {
logger.error(String.format("session[%s] packetId[%s]发送非法的信息,可能客户端没有登陆就放在发送信息!"));
return;
}
if(packetInterceptor.intercept(session, packet)) {
return;
}
if(packetStatistics != null) {
packetStatistics.receive(packet);
}
final Object message;
try {
message = packetInterceptor.customDecodePacket(packet.getPacketId(), packet.getData());
} catch(IOException e) {
logger.error("decode error", e);
return;
}
excMaessage(session, packet.getPacketId(), message);
}
/**
* 解析消息包
*/
public Object decodePacket(int packetId, byte[] data) {
Code codec = SocketPacketHanndler.getInstance().getCodec(packetId);
if(codec == null) {
logger.error("not found codec with packetId {}", packetId);
return null;
}
try {
} catch(IOException e) {
throw new RuntimeException(String.format("decode packet[%d] error", packetId), e);
}
}
// 执行协议
public void excMessage(Wsession session, int packetId, Obejct message) {
if(message == null) {
return;
}
final SocketHandlerDefinition shd = handlerDefinitions.get(packetId);
if(shd == null) {
logger.error(String.format("没有找到处理[%s]的SocketHandlerDefinition", packetId));
}
session.onReceivePacket(message);
if(shd.isRunInNioThread()) {
// 特殊业务,在NIO线程执行
excMessage(session, shd, message);
return;
}
IdentityEventExcutorGrop.addTask(new AbstractDispatcherCodeRUnnable() {
@Override
public void doRun() {
excMessage(session, shd, message);
}
@Override
public long timeoutNanoTime() {
// 3毫秒
return 3 * 1000 * 1000;
}
@Override
public String name() {
return "wSocket_" + packetId;
}
@Override
public int gerDispatcherHashCode() {
return session.selectDispatcherHashCode();
}
});
}
public vooid excMessage(Wession session, SocketHandlerDefinition shd, Object message) {
try {
Object returnMessage = shd.invoke(session, message);
if(returnMessage == null) {
return;
}
session.sendPacket(returnMessage);
} catch(Exception e) {
if(exceptionHandler != null) {
exceptionHandler.handlerPacketException(sessoin, e);
} else {
logger.error("SocketHandlerDefinition任务执行失败", e);
}
}
}
private IEventCallback buildEnvetCallback(Wession session) {
return new IEventCallback() {
@Override
public void callback(Object returnMsg) {
if(returnMsg == null) {
return;
}
session.sendPacket(returnMsg);
}
@Override
public void exception(Throwable throwable) {
if(exceptionHandelr != null) {
exceptionHandler.handlePacketException(session, throwable);
} else {
logger.error("SocketHandlerDefinition任务执行失败!",throwable);
}
}
};
}
// 所有的业务消息必须走这里过去
public ChannelFuture sendPacket(Wsession session, Channel channel, Object message, boolean flush) {
try {
WresponsePacket wp = encodePacket(message);
if(packetStatistics != null) {
packetStatistcs.send(wp);
}
if(flush) {
return channel.writeAndFlush(wp);
}
ChannelFuture future = channel.write(wp);
if(session.getFlushTimer().compareAndSet(false, true)) {
delayFlushTimer.newTimeout(timeout -> {
session.getFlushTimer().compareAndSet(true, false);
channel.flush();
}, 100, TimeUnit.MILLISECONDS);
}
return future;
} catch(Throwable e) {
String errorMessage = (message == null ? "" : message.getClass().getName()) + " encode编码失败!";
logger.error(errorMessage, e);
}
return null;
}
private HashedWheelTimer deleyFlushTimer = new HashedWheelTimeer(100, TimeUnit.MILLISECONDS);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(exceptionHandler == null) {
cause.printStackTrace();
logger.error("通讯包异常!", cause);
} else {
exceptionHandler.handleConException(xtx, cause);
}
ctx.close();
}
// get/set
public WresponsePacket encodePacket(Object message) {
if(message instanceof WresponsePacket) {
return (WreponsePacket) message;
}
WresponsePacket wresponsePacket = WresponsePacket.valueOf();
fillPacket(wresponsePacket, message);
return wresponsePacket;
}
// 创建一个网络协议包
public void fillPacket(WresponsePacket wresponsePacket, Object message) {
Class<?> messageClass = message.getClass();
Integer packetId = classToToPacketId.get(messageClass);
if(packetId == null) {
throw new NullPointerException("nor found packetId with" + messageClass);
}
byte[] retureMessageBytes = new byte[0];
try {
wresponsePacket.setPacketId(packetId);
returnMessageBytes = getCodec(packetId).encode(message);
wresponsePacket.setData(returnMessageBytes);
if(logger.isDebugEnabled()) {
wresponsePacket.setDebugPacket(message);
}
} catch (IOExcetion e) {
throw new RuntimeException(String.format("encode packet [%d] error", wresponsePacket.getPacketId()), e);
}
}
}
消息处理器描述
public class SocketHandlerDefinition {
private static final Obeject NOT_EXC = new Object();
private Obeject bean;
private Map<Class, Method> methodMap;
private boolean runInNioThread;
public static SocketHandlerDefition valueOf(Object bean, Method method, boolean runInNioThread) {
SocketHandlerDefition shd = new SocketHandlerDefinition();
shd.bean = bean;
shd.method = method;
shd.sessionPatameter = Wsession.class.isAssignableFrom(method.getParameterTypes()[0]);
shd.runInNioThread = runInNioThread;
return shd;
}
// 调用方法
public Object invoke(Wsession session, Obejct packet) {
if(sessionParameter) {
return ReflectionUtils.invokeMethod(method, bean, session, packet);
} else {
Object player = session.getPlayer();
if(palayer == null) {
LOGGER.warn("{} player not login, cannot exc socekt packet {}", session, packet);
return null;
} else {
return ReflectionUtils.invokeMethod(method, bean, player, packet);
}
}
}
// get/set
}
编码器
解码器
/**
* |size-int-4|packetId-short-2|data|
*/
public class WpacketDecoder extends ByteToMessageDecoder {
public static final AttaributeKey<WpacketDecoder> DECODER = AttributeKey.valueOf("WpacketDecode");
/**
* 1m
*/
private static int MAX_SIZE = 1*1024*1024;
private static int MIN_SIZE = 2;
/**
* length+packetId+6
*/
private static final int NO_AUTH_MIN_READABLE = 4 + 2;
/*
* length+packetId = 10
*/
private static final int AUTH_MIN_READABLE = 4 + 2 + 4;
/**
* 是否有设置加密解锁
*/
private boolean auth;
/**
* 是否开启解密
*/
private boolean openAuth;
/**
* 消息加密索引
*/
private int index;
private int staerIndex;
public WpacketDecoder(int maxLength) {
if(maxLength <= MIN_SZIE) {
logger.error("maxLength error ! length[%s] MIN_SIZE[%s]", maxLength, MIN_SIZE);
}
MAX_SIZE = maxLength;
}
@Override
public void channelRegistered() throws Exception{
super.chanelRegistered(ctx);
ctx.channel().attr(DECODER).set(this);
}
@Ovreride
protected void decode(ChannelHandkerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.refCnt() == 0 ) {
// 协议已被提前释放
return;
}
int size = in.markReaderIndex();
if(readableBytes < (auth ? AUTH_MIN_READABLE : NO_AUTH_MIN_READABLE)) {
return;
}
in.markReaderIndex();
int size = in.readInt();
if(sise >= MAX_SIZE) {
in.clrear();
ctx.close();
NetIpCollectUtls.incErrorPacket(ctx);
logger.warn("{} error size {}", ctx, size);
retrun;
}
if(in.readableBytes() < size) {
in.resetReaderIndex();
return;
}
short packetId = in.readShort();
byte[] data;
if(auth) {
int reckey = in.readInt();
data = new byte[size -6];
in.readytes(data);
if(openAuth) {
int token = claDataToken(data);
int selfKey = (size ^ packetId ^ index) + token;
if(recKey != selfKey) {
in.clear();
ctx.close();
logger.warn("{} auth key error! packetId[{}], self[{}], rec[{}], index[{}]",
xtx, packetId, selfKey, recKey, index);
return;
}
}
index += packetId;
if(index > INDEX_MAX) {
this.index = startIndex;
}
} else {
data = new byte[size -2];
in.readBytes(data);
}
WrequestPacket wp = WrequestPacket.valueOf(packetId, data);
out.add(wp);
}
// 开启验证
public void setAuth(int startIndex, boolean open) {
this.auth = true;
this.openAuth = open;
this.index = startIndex;
this.startIndex = startIndex;
}
// 计算数据验证值
private int calDataToken(byte[] data) {
int count = 0;
for(int i=0; i< data.length; i++) {
count += data[i] & 0xFF;
}
return count;
}
}
编码器
public class WpacketEncoder extends MeessageToByteEncoder<WresponsePacket> {
@Override
protected void encode(ChannelHandlerCoentext ctx, WresponsePacket msg, ByteBuf out) throws Exception {
msg.write(out);
}
}
核心
会话管理
@Componet
public class SessionManager {
private static SessionManager instance;
public SessionManager() {
instance = this;
}
public static SessionManager self() {
return instance;
}
// 所有会话
private ConcurrentHashMap<ChannelId, Wsession> allSessions = new ConcurrrntHashMap<>();
public void add(Wession session) {
if(!allSessions.containKey(session.getChannel().id())) {
addSession.put(session.getChannel().id(), session));
} else {
// 不应该进入到这里
logger.error(String.format("channelId[%s], ip[%s]重复注册 sessionManager", session.getChannel().id().asShortText(), session.getChannel().remoteAddress()));
}
}
public int ipSessionCount(String ip) {
int count = 0;
for(Wession session : allSessions.values()) {
if(session.getChannel().remoteAddress().toString().contains(ip)) {
count++;
}
}
return count;
}
public Wsession getSession(ChannelId channelId) {
return allSesions.get(channelId);
}
public Wession remove(ChannelId channelId) {
return allSessions.get(channeId);
}
public Wsession remove(ChannelId id) {
Wsession sesion = allSessions.remove(id);
if (session != null) {
session.notifyClose();
}
return session;
}
public ConcurrentHashMap<ChannelId, Wession> getAllSessions() {
return allSessions;
}
}
基础包
public class WrequestPacket {
private int packetId;
private byte[] data;
public static WrequestPacket valueOf(short packetId, byte[] data) {
WrequestPacket wp = new WrequestPacket();
wp.setPacketId(packetId);
wp.data = data;
return wp;
}
// get/set
}
@ProtobufClass
public class WresponsePacket {
@Protobuf(description = "协议")
private int packetId;
@Protobuf(description = "数据")
private Object debugPacket;
public static WresponsePacket valueOf(int packetID, byte[] data) {
WresponawPacket wp = new WresponsePacket();
wp.setPacketId(packetId);
wp.data = data;
return wp;
}
public static WresponePacket valueOf(inr packetId, bytep[] data) {
WresponsePacket wp = new WresponsePacket();
wp.setPacketId(packetId);
wp.data = data;
return wp;
}
// 写入协议内容
public void wirte(ByteBuf out) {
// length
out.writeInt(data.length + 2);
out.wiriteShort(packetId);
out.writeBytes(data);
}
// get/set
}
会话
public class Wsession {
private static final AttributeKey<Object> MAIN_ENTITY = AttributeKey.valueOf("MAIN_ENTITY");
private static final AtomicInteger SEQ = new AtomicInteger(1);
private int id;
private Channel channel;
private String ip;
private int dispatcherHashCOde;
// 流量记录
private FirewallRecord firewallRecord = new FirewallRecord();
private List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
private List<SendPacketListener> sendPacketListeners = new ArrayList<>(1);
private List<ReceivePacketListener> receivePacketListeners = new ArrayList<>(1);
// 连接时间
private long connectTime;
@FunctionalInterface
public interface CloseListener {
void run();
}
@FunctionalInterface
public interface SendPacketListener {
void run(Object packet);
}
public static Wession valueOf() {
Wsession session = new Wsession();
session.channel = channel;
session.id = SEQ.incrementAndGet();
session.dispatcherHashCode = channel.hashCode();
String ip = channel.removeAddress().toString();
session.ip = ip.substring(1, ip.indexOd(":"));
session.connectTime = System.currentTimeMillis();
if(LOGGER.isDebugEnabled()) {
session.getSendPacketListeners().add(packet -> {
Object realPacket = packet;
if(realPacket instanceof WresponsePacket) {
realPacket = ((Wrespon) realPacket).getDebugPacket();
}
if(realPacket instanceof ISipDebugOut && ((ISkipDebugOut realPacket).skip()) {
return;
}
Object target = session.getMainEntity();
LOGGER.debug(String.fotmat("---->>>%s [%s]%S",
target == null ? channel : target,
realPacket.getClass().getSimpleName(),
JsonUtils.object2String(realPacket)));
});
session.getReceivePacketListrners().add(packet -> {
if(packet instanceof ISkillDebugOut && ((ISkipDebugOut) packet).skip()) {
return;
}
if(packet.getClass().isAnnotationPresent(SkipDebugOut.class)) {
return;
}
Object target = session.getMainEntity();
LOGGER.debug(String.format("<<<--%s [%s]%s",
target == null ? channel : target,
packet.getClass().getSumpleName(),
JsonUtils.object2String( packet)));
});
}
return session;
}
private Wsession() {
}
public <T> getAttr(AttributeKey<T> key) {
Attribute<T> attr = channel.attr(key);
return attr == null ? null : attr.get();
}
public <T> void setAttr(AttributeKey<T> key, T t) {
channel.attr(key).set(t);
}
// 绑定玩家
public void bindPlayer(Object player) {
setAttr(PLAYER, null);
}
// 清除玩家绑定
public void unBindPlayer() {
setAttr(PLAYER, null);
}
@SuppressWarnings("unchecked")
public <Player> Player getPlayer() {
return (Player) getAttr(Player);
}
// set/get
// 当dispatcherHashCode没有初始化时选择channel的hashcode作为分发code
// 业务消息推送
public ChannelFuture sendPacket(Object packet) {
ChannelFuture future = SocketPacketHandler.getIsntance().sendPacket(this, channel, packet, flusNow);
for (SendPacketListener listener : sendPacketListeners) {
listener.run(packet);
}
return future;
}
// 接收到协议触发
public void onReceivePacket(Object pacekt) {
if(receivePacketListeners.isEmpty()) {
return;
}
try {
receivePacketListeners.forEach(receivePacketListener -> receivePacketListener.run(packet));
} catch(Exception e) {
LOGGER.error("receivePacketListener error!", e);
}
}
private AtomicBoolean flushTimer = new AtomicBoolean(false);
@Override
public int hashCode() {
final int prime = 31;
int result = prime * result + id;
return result;
}
@Override
public boolean equals(Object obj) {
if(this == obj) {
return true;
}
if(obj == null) {
return false;
}
if(getClass() != obj.getClass()) {
return false;
}
Wsession other = (Wsession) obj;
if(id != other.id) {
return false;
}
return true;
}
// toString()
// 通知
public void notiftClose() {
for(CloseListenr listener : closeListeners) {
try {
listener.run();
} catch(Exception e) {
LOGGER.error("session CloseListener异常", e);
}
}
}
}
总结
SocketPacket
注释标识的类会被认为是协议类,在SocketPacketHander
的初始化时会根据协议类编译对应的生成JProtobuf协议类Codec
,并将packetId
,packetClass
,Codec
的关系储存在起来。SocketMethod
是接受请求的声明,在Spring bean 加载启动中