通讯组件学习

网络连接是游戏服务器的基础中的基础。此游戏服务器使用Jprotobuf事先编译好protobuf文件。通过注解标志各种协议包,接受客户端发送的协议时,自动将其转化为Wsesion对象,玩家角色登陆后再进一步转化为Player对象。

用于标记的枚举

模块声明

@Target(ElementTpye.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SocketClass {

}

模块声明

@Target(ElementTpye.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SocketMethod {
    // 自定义协议号,用于事件体系
    int coustomPacketId() default 0;

}

通讯包声明

@Target(ElementTpye.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Lazy
public @interface SocketMethod {
    // 协议号
    int packetId();
}

Socket服务器

Socket服务器Wserver在启动时加载注解标记的通讯包,

@Component
public class Wserver implements ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(Wserver.class);

    @Autowired
    private CustomHandlerManager customHandlerManager;
    @Autowired
    private SessionHandler sessionHandler;
    @Autowired
    private FlowFirewall flowFirewall;
    @Autowired
    private IpFirewall ipFirewall;
    @Autowired
    private SocketPacketHandler socketPacketHandler;

    // 自定义的连接日志处理接口
    public static LoggingHandler logginHandler;

    private int[] ports;
    private ApplicationCintext applicationContex;
    private int maxlength;

    // 开启
    public void open() {
        ipFirewal.open();
    }

    // 网络端口是否打开
    public boolean ioOpen() {
        return ipFirewall.isOpened();
    }

    // 关闭
    public void block() {
        ipFirewall.block();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
        this.applicationContext = applicationContext;
    }

    // 加载配置
    public void loadProperties(Resource rsource) {
        Properties prop = new Properties();
        prop.load(resource.getInputStream());
        this.prots = ConfigUtils.getPorts(prop, ServerConfigConstant.SERVER_PORT);

        String maxLengthProp = prop.getProperty(ServerConfigConstant.PACKET_MAXLEGHTH);
        if(maxLengthProp == null) {
            // 默认1m
            maxLength = 1024*1024;
        } else  {
            maxLength = INteger.valueOf(mxaLengthProp) * 1024;
        }
    }

    public void bind() throws InterruptedException, IOException{
        bind(new DummyFirewallManger());
    }

    // 绑定端口
    public void bind(FirewallManager firewallManger) thrwos InterruptedException, IOException{
        socketPacketHandler.setFirewallManager(firewallManager);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        if(loggingHandler != null) {
            serverBootstrap.handler(loggingHandler);
        }
        serverBootstrap.group(bossGrop, workerGroup)
            .channel(NioSreverSocketchannel.class)
            .option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.SO_BACKLOG,1024)
            .childOption(ChannelOption.TCP_NODELAY, true).
            .childOption(ChannelOption.TCP_NODELAY)
            .childOption(ChannelOption.SO_RCVBUF,1024*32)
            .childOption(ChannelOption.SO_SNDBUF,1024*32)
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,new WriteBufferWaterMark(0,128*1024))
            .childHandler(nwe ChannelInitializer<SocketChannel> -> {
                @Override
                public void initChannel(SocketChannel sc) {
                    ChannelPipeline pipeline = sc.pipeline();
                    pipeline.addLast("encoder", new WpacketEncoder());
                    pipeline.addLast("ipFirewall", ipFirewall);
                    pipeline.addLast("flowFirewal", flowFirewall);
                    pipeline.addLast("decoder", new WpacketDecoder(maxLength)).
                    pipeline.addLast("session", sessionHandler);
                    pipeline.addLast("socketPackHandler",new SocketChooserHandler());
                    for (ChannelHandlerAdapeter cha : customHandlerManager.getHandlers()) {
                        sc.pipeline().addLast(cha);
                    }
                }
            }) ;
        for(int port : ports) {
            ChannelFuture cf = serverBootstrap.bind(port);
            if (StringUitls.isNotEmpty(host)) {
                cf = serverBootstrap.bind(host, port);
            } else {
                cf = serverBootStrap.bind(port);
            }
            cf.sync();
            channelFutures.add(cf);
        }
    }

    class SocketChoserHandler extends ByteToMessageDecoder {
        private final int CHECK_LEN = 5;
        private final String WSOCKET_PREFIX = "GET";

        @Override
        protected void decode(ChannelHandlerContex ctx, ByteBuf in, List<Object> out) throws Ecception{
            String protocol = getProtocol(in);
            if(protocol == null) {
                return;
            }
            ChannelPipeline pipeline = ctx.pipeline();
            if(protocol.startWith(WSOCKET_PREFIX)) {
                addWebSocketHandler(pipeline);
            } else {
                addSocketHandler(pipeline);
            }
            pipeline.remove(SocketChooserHandler.class);

            WpacketDecoder wpacketDecoder = new WpacketDecoder(maxLength);
            pipeline.addLast("decoder", wpacketDecoder);
            ctx.channel().attr(WpacketDecoder.DECODER).set(wpacketDecoder);
            pipeline.addLast("socketPacketHandler", socketPacketHandler);
            for(ChanelHandlerAdpater cha : customHandlerManager.getHandlers()) {
                pipeline.addLast(cha);
            }
        }
    }
    private List<ChannelFuture> channelFutures = new ArrayList<>();

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    public void shutdownGracefully() {
        try {
            for(ChannelFuture cf : channelFutures) {
                if(cf.channel() != null) {
                    try {

                    } catch(Exception w) {
                        logger.error("通讯server channel 关闭异常",e);
                    }
                }
            }
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // get/set.....

}

用户自定义handler管理器

@Component

public final class CustomHandlerManager {
    private List<ChannelHandlerAdapter> handlers = new ArrayList<>();

    @PostConstruct
    public void init() {
        doInit();
    }

    private void doInit() {
        // TODO
    }

    public List<ChannelHanderAdapter> getHandeers() {
        return handlers;
    }

    public void setHandlers(ChannelHandlerAdapter handlers) {
        this.handlers = handlers;
    }
}

分发器

消息处理器handler

@Sharable
@Component
public class SocketPacketHandler extends ChannelInboundHandlerAdpter implements ApplicationContextAware,
    ApplicationContextAware  {
    private static final boolean OPEN_PROTOBUF_COMPILE = Boolean.valueOf(System.getProperty("openProtobufComplile", "false"));

    /**
    *   类class与packetId的快速映射
    */
    private Map<Class<?>, Integer> calssToPacketId = new ConcurrentHashMap<>();

    /**
    *   packetId与类class的快速映射
    */

    private Map<Integer, Codec> paccketIdToCodec = new ConcurrentHashMap<>();

    private Set<Integer> runInNIOThreadPacketIds = new HashSet<>();

    @Autowired
    private SessionManager sessionManager;

    @Autowierd
    private IEventBusManager  eventManager;

    private FirewallManager firewallManager;
    private ExceptionHandler exceptionHandler;
    private IPacketStatistics packetStatistics;

    public static SocketPacketHandler getInstance() {
        return self;
    }
    private static SocketPacketHandler self;

    @PostConstruct
    private void init() {
        SocketPacketHandler.self = this;
        if(OPEN_PROTOBUF_COMPILE) {
            System.err.println("open protobuf dynamic complie!");
        } else {
            ProtobufProxy.closeCompile();
            System.err.println("close protobuf dynamic complie!");
        }
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        String[] classNames = applicationContext.getBeanNamesForAnnotation(SocketPacket.class);
        for(String className : classNames) {
            Class<?> packetClass = applicationContex.getType(className);
            SocketPacket socketPacket = packetClass.getAnnotion(SocketPacket.class);
            if(packetIdToCodec.containkey(socketPacket.packetId())) {
                throw new RuntimeException(String.format("socketPacket.packetId() [%s] 重复使用!", socketPacket.packetId()));
            }
            Codec codec = ProtobufProxy.create(packetClass);
            packetIdToCodec.put(socketPacket.packetId(), codec);
            classToPacketId.put(packetClass, socketPacket.packetId());
            packetIdToClass.put(socketPacket.packetId(), packetClass);
        }
        stopWatch.stop();
        loogger.debug("load protoProxy {} use time {}s", classNames.length, TimeUnit.MILLSECONDS.toSeconds(stopWatch.getTime()));
        // 默认协议拦截器
        if(packetInterceptor == null) {
            packetIneterceptor = new IPacketInterceptor() {
                @Override
                public boolean isRealPacketMethod() {
                    return true;
                }
                @Override
                public boolran intercept(Wsession wsession, WrequestPacket packet) {
                    return false;
                }
                @Override
                public boolean intercept(Wsession wsession, WresponsePacket packet) {
                    return false;
                }
            };
        } else {
            packetInterceptor.afterInit(this);
        }
    }


    @Override
    public Object postProcessAfterInirialization() throws BeansException{
        Class<?> clazz = bean.getClass();
        if(clazz.isAnnotationPersent(SocektClass.class)) {
            for(Method method : clazz.getMethods()) {
                 SocektMethod methodAnnotation = method.getAnnotation(SocketMethod.class);
                 if(methodAnnotion == null) {
                     continue;
                 }
                 // 参数和返回值验证
                 Class<?>[] clzs = method.getParameterTypes();
                 if(clas.length != 2) {
                     throw new IllegalArgumentException(bean.getClass().getName() + 
                     "."  +method.getName() + "只能拥有两个参数");
                 }
                 // 接收参数验证
                 Calss<?> packetClass = method.getPatameterTypes()[1];
                 int packetId = methodAnnotation.consutomPacketId();
                 if(pcaketId == 0) {
                    SocketPacket socketPacekt = packetCalss.getAnnotation(SocketPacket.class);
                    if(socketPacket == null) {
                        // 该对象没有class注册
                        throw new IllegalArgumentException(String.format("class[%s] 没有包含SocketPacket注解!", packetClass));
                    }
                    packetId = socketPacket.packetId();
                 }
                 if(!"void".equals(method.getReturnType().getName())) {
                     if(method.getReturnType().getAnnotation(SocketPacket.class) == null) {
                        throw new IllegalArgumentException(
                            bean.getClass().getName() + "."+method.getName() + "返回值必须包含SocketPacket注解"
                        );
                     }
                 }
                 if(packetInterceptor != null && !packetInterceptor.isRacketMethod(packetId, method)) {
                     // 检测是否需要注册该协议处理方法
                     continue;
                 }
                 // 是否有其他执行对象已经注册了此消息
                 SocektHandkerDefinition shd = handlerDefinitions.get(packetId);
                 if(shd != null) {
                     throw new IllegalArgumentException(String.format("class[%s]和class[%s]重复使用,一个packetId只能用在一个方法上!", shd.getBean().getClass, packetClass, packetId));
                 }
                 boolean runInNioThread = method.getAnnotation(IRunInNioThread.class) != null;
                 handlerDefinitions.put(packetId, SocketHandlerDefinition.valueOf(bean, method, runInNioThread));
            }
           return bean;
        }
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String name) throws BeansException{
        return bean;
    }

        public interface ExceptionHandler {
        // 协议处理异常处理
        void handlerPacketException(Wsession session, Throwable e);

        // 连接异常处理
        void handlerConException(ChannelHandlerContext ctx, Throwable cause);
    }

    private ExceptionHandler exceptionHandler;


    @Override
    public void channelRead(ChannelHandlerContext ctx, Obejct msg) throws Exception {
        // 将包转化为 WrequestPacket 类型 , 编码器已经编码好了
        final WrequestPacket packet = (WrequestPacket) msg;
        // 根据channel获得一个session
        final Wsession session = sessionManager.getSession(ctx.channel().id());

        if(firewallManager != null && !firewallManger.packerFilter(session, packet)) {
            logger.error(String.format("session[%s] packetId[%s]发送非法的信息,可能客户端没有登陆就放在发送信息!"));
            return;
        }
        if(packetInterceptor.intercept(session, packet)) {
            return;
        }
        if(packetStatistics != null) {
            packetStatistics.receive(packet);
        }
        final Object message;
        try {
            message = packetInterceptor.customDecodePacket(packet.getPacketId(), packet.getData());
        } catch(IOException e) {
            logger.error("decode error", e);
            return;
        }
        excMaessage(session, packet.getPacketId(), message);
    }

    /**
    *   解析消息包
    */
    public Object decodePacket(int packetId, byte[] data) {
        Code codec = SocketPacketHanndler.getInstance().getCodec(packetId);
        if(codec == null) {
            logger.error("not found codec with packetId {}", packetId);
            return null;
        }
        try {

        } catch(IOException e) {
            throw new RuntimeException(String.format("decode packet[%d] error", packetId), e);
        }
    }


    // 执行协议
    public void excMessage(Wsession session, int packetId, Obejct message) {
        if(message == null) {
            return;
        }
        final SocketHandlerDefinition shd = handlerDefinitions.get(packetId);
        if(shd == null) {
            logger.error(String.format("没有找到处理[%s]的SocketHandlerDefinition", packetId));
        }
        session.onReceivePacket(message);
        if(shd.isRunInNioThread()) {
            // 特殊业务,在NIO线程执行
            excMessage(session, shd, message);
            return;
        }
        IdentityEventExcutorGrop.addTask(new AbstractDispatcherCodeRUnnable() {
            @Override
            public void doRun() {
                excMessage(session, shd, message);
            }

            @Override
            public long timeoutNanoTime() {
                // 3毫秒
                return 3 * 1000 * 1000;
            }

            @Override
            public String name() {
                return "wSocket_" + packetId;
            }
            @Override
            public int gerDispatcherHashCode() {
                return session.selectDispatcherHashCode();
            }
        });
    }

    public vooid excMessage(Wession session, SocketHandlerDefinition shd, Object message) {
        try {
            Object returnMessage = shd.invoke(session, message);
            if(returnMessage == null) {
                return;
            }
            session.sendPacket(returnMessage);
        } catch(Exception e) {
            if(exceptionHandler != null) {
                exceptionHandler.handlerPacketException(sessoin, e);
            } else {
                logger.error("SocketHandlerDefinition任务执行失败", e);
            }
        }
    }

    private IEventCallback buildEnvetCallback(Wession session) {
        return new IEventCallback() {
            @Override
            public void callback(Object returnMsg) {
                if(returnMsg == null) {
                    return;
                }
                session.sendPacket(returnMsg);
            }

            @Override
            public void exception(Throwable throwable) {
                if(exceptionHandelr != null) {
                    exceptionHandler.handlePacketException(session, throwable);
                } else {
                    logger.error("SocketHandlerDefinition任务执行失败!",throwable);
                }
            }
        };
    }

    // 所有的业务消息必须走这里过去
    public ChannelFuture sendPacket(Wsession session, Channel channel, Object message, boolean flush) {
        try {
            WresponsePacket wp = encodePacket(message);
            if(packetStatistics != null) {
                packetStatistcs.send(wp);
            }
            if(flush) {
                return channel.writeAndFlush(wp);
            }
            ChannelFuture future = channel.write(wp);
            if(session.getFlushTimer().compareAndSet(false, true)) {
                delayFlushTimer.newTimeout(timeout -> {
                    session.getFlushTimer().compareAndSet(true, false);
                    channel.flush();
                    }, 100, TimeUnit.MILLISECONDS);
            }
            return future;
        } catch(Throwable e) {
            String errorMessage = (message == null ? "" : message.getClass().getName()) + " encode编码失败!";
            logger.error(errorMessage, e);
        }
        return null;
    }

    private HashedWheelTimer deleyFlushTimer = new HashedWheelTimeer(100, TimeUnit.MILLISECONDS);

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(exceptionHandler == null) {
            cause.printStackTrace();
            logger.error("通讯包异常!", cause);
        } else {
            exceptionHandler.handleConException(xtx, cause);
        }
        ctx.close();
    }

    // get/set


    public WresponsePacket encodePacket(Object message) {
        if(message instanceof WresponsePacket) {
            return (WreponsePacket) message;
        }
        WresponsePacket wresponsePacket =  WresponsePacket.valueOf();
        fillPacket(wresponsePacket, message);
        return wresponsePacket;
    }

    // 创建一个网络协议包
    public void fillPacket(WresponsePacket wresponsePacket, Object message) {
        Class<?> messageClass = message.getClass();
        Integer packetId = classToToPacketId.get(messageClass);
        if(packetId == null) {
            throw new NullPointerException("nor found packetId with" + messageClass);
        }
        byte[]  retureMessageBytes = new byte[0];
        try {
            wresponsePacket.setPacketId(packetId);
            returnMessageBytes = getCodec(packetId).encode(message);
            wresponsePacket.setData(returnMessageBytes);
            if(logger.isDebugEnabled()) {
                wresponsePacket.setDebugPacket(message);
            }
        } catch (IOExcetion e) {
            throw new RuntimeException(String.format("encode packet [%d] error", wresponsePacket.getPacketId()), e);
        }
    }


}

消息处理器描述

public class SocketHandlerDefinition {
    private static final Obeject NOT_EXC = new Object();

    private Obeject bean;
    private Map<Class, Method> methodMap;
    private boolean runInNioThread;

    public static SocketHandlerDefition valueOf(Object bean, Method method, boolean runInNioThread) {
        SocketHandlerDefition shd = new SocketHandlerDefinition();
        shd.bean = bean;
        shd.method =  method;
        shd.sessionPatameter = Wsession.class.isAssignableFrom(method.getParameterTypes()[0]);
        shd.runInNioThread = runInNioThread;
        return shd;
    }

    // 调用方法
    public Object invoke(Wsession session, Obejct packet) {
        if(sessionParameter) {
            return ReflectionUtils.invokeMethod(method, bean, session, packet);
        } else {
            Object player = session.getPlayer();
            if(palayer == null) {
                LOGGER.warn("{} player not login, cannot exc socekt packet {}", session, packet);
                return null;
            } else {
                return ReflectionUtils.invokeMethod(method, bean, player, packet);
            }
        }
    }


    // get/set
}

编码器

解码器

 /**
 *  |size-int-4|packetId-short-2|data|
 */
 public class WpacketDecoder extends ByteToMessageDecoder {

     public static final AttaributeKey<WpacketDecoder> DECODER = AttributeKey.valueOf("WpacketDecode");

    /**
    *   1m
    */
    private static int MAX_SIZE = 1*1024*1024;
    private static int MIN_SIZE = 2;

    /**
    *   length+packetId+6
    */
    private static final int NO_AUTH_MIN_READABLE = 4 + 2;
    /*
    *  length+packetId = 10
    */
    private static final int AUTH_MIN_READABLE = 4 + 2 + 4;
    /**
    *   是否有设置加密解锁
    */ 
    private boolean auth;
    /**
    *   是否开启解密
    */
    private boolean openAuth;
    /**
    *  消息加密索引
    */
    private int index;

    private int staerIndex;

    public WpacketDecoder(int maxLength) {
        if(maxLength <=  MIN_SZIE) {
            logger.error("maxLength error ! length[%s] MIN_SIZE[%s]", maxLength, MIN_SIZE);
        }
        MAX_SIZE = maxLength;
    }

    @Override
    public void channelRegistered() throws Exception{
        super.chanelRegistered(ctx);
        ctx.channel().attr(DECODER).set(this);
    }

    @Ovreride
    protected void decode(ChannelHandkerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.refCnt() == 0 ) {
            // 协议已被提前释放
            return;
        }
        int size =  in.markReaderIndex();
        if(readableBytes < (auth ? AUTH_MIN_READABLE : NO_AUTH_MIN_READABLE)) {
            return;
        }
        in.markReaderIndex();
        int size = in.readInt();
        if(sise >= MAX_SIZE) {
            in.clrear();
            ctx.close();
            NetIpCollectUtls.incErrorPacket(ctx);
            logger.warn("{} error size {}", ctx, size);
            retrun;
        }
        if(in.readableBytes() < size) {
            in.resetReaderIndex();
            return;
        }
        short packetId = in.readShort();
        byte[] data;
        if(auth) {
            int reckey = in.readInt();
            data = new byte[size -6];
            in.readytes(data);
            if(openAuth) {
                int token = claDataToken(data);
                int selfKey = (size ^ packetId ^ index) + token;
                if(recKey != selfKey) {
                    in.clear();
                    ctx.close();
                    logger.warn("{} auth key error! packetId[{}], self[{}], rec[{}], index[{}]",
                    xtx, packetId, selfKey, recKey, index);
                    return;
                }
            }
            index += packetId;
            if(index > INDEX_MAX) {
                this.index = startIndex;
            }
        } else {
                data = new byte[size -2];
                in.readBytes(data);
        }
        WrequestPacket wp = WrequestPacket.valueOf(packetId, data);
        out.add(wp);
    }

    // 开启验证
    public void setAuth(int startIndex, boolean open) {
        this.auth = true;
        this.openAuth = open;
        this.index = startIndex;
        this.startIndex = startIndex;
    }

    // 计算数据验证值
    private int calDataToken(byte[] data) {
        int count = 0;
        for(int i=0; i< data.length; i++) {
            count += data[i] & 0xFF;
        }
        return count;
    }

 }

编码器

public class WpacketEncoder extends MeessageToByteEncoder<WresponsePacket> {
    @Override
    protected void encode(ChannelHandlerCoentext ctx, WresponsePacket msg, ByteBuf out) throws Exception {
        msg.write(out);
    }
}

核心

会话管理

@Componet
public class SessionManager {
    private static SessionManager instance;

    public SessionManager() {
        instance = this;
    }

    public static SessionManager self() {
        return instance;
    }

    // 所有会话
    private ConcurrentHashMap<ChannelId, Wsession> allSessions = new ConcurrrntHashMap<>();

    public void add(Wession session) {
        if(!allSessions.containKey(session.getChannel().id())) {
            addSession.put(session.getChannel().id(), session));
        } else {
            // 不应该进入到这里
            logger.error(String.format("channelId[%s], ip[%s]重复注册 sessionManager", session.getChannel().id().asShortText(), session.getChannel().remoteAddress()));
        }
    }

    public int ipSessionCount(String ip) {
        int count = 0;
        for(Wession session : allSessions.values()) {
            if(session.getChannel().remoteAddress().toString().contains(ip)) {
                count++;
            }
        }
        return count;
    }

    public Wsession getSession(ChannelId channelId) {
        return allSesions.get(channelId);
    }

    public Wession remove(ChannelId channelId) {
        return allSessions.get(channeId);
    }

    public Wsession remove(ChannelId id) {
        Wsession sesion = allSessions.remove(id);
        if (session != null) {
            session.notifyClose();
        }
        return session;
    }

    public ConcurrentHashMap<ChannelId, Wession> getAllSessions() {
        return allSessions;
    }

}

基础包

public class WrequestPacket {

    private int packetId;
    private byte[] data;

    public static WrequestPacket valueOf(short packetId, byte[] data) {
        WrequestPacket wp = new WrequestPacket();
        wp.setPacketId(packetId);
        wp.data =  data;
        return wp;
    }

    // get/set
}

@ProtobufClass
public class WresponsePacket {
    @Protobuf(description = "协议")
    private int packetId;
    @Protobuf(description = "数据")

    private Object debugPacket;

    public static WresponsePacket valueOf(int packetID, byte[] data) {
        WresponawPacket wp = new WresponsePacket();
        wp.setPacketId(packetId);
        wp.data = data;
        return wp;
    }

    public static WresponePacket valueOf(inr packetId, bytep[] data) {
        WresponsePacket wp = new WresponsePacket();
        wp.setPacketId(packetId);
        wp.data = data;
        return wp;
    }

    // 写入协议内容
    public void wirte(ByteBuf out) {
         // length
         out.writeInt(data.length + 2);
         out.wiriteShort(packetId);
         out.writeBytes(data);
    }

    // get/set
}

会话

public class Wsession {
    private static final AttributeKey<Object> MAIN_ENTITY = AttributeKey.valueOf("MAIN_ENTITY");
    private static final AtomicInteger SEQ = new AtomicInteger(1);

    private int id;

    private Channel channel;

    private String ip;

    private int dispatcherHashCOde;

    // 流量记录
    private FirewallRecord firewallRecord = new FirewallRecord();

    private List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();

    private List<SendPacketListener> sendPacketListeners = new ArrayList<>(1);

    private List<ReceivePacketListener> receivePacketListeners = new ArrayList<>(1);

    // 连接时间
    private long connectTime;

    @FunctionalInterface
    public interface CloseListener {
        void run();
    }

    @FunctionalInterface
    public interface SendPacketListener {
        void run(Object packet);
    }

    public static Wession valueOf() {
        Wsession session = new Wsession();
        session.channel = channel;
        session.id = SEQ.incrementAndGet();
        session.dispatcherHashCode = channel.hashCode();
        String ip = channel.removeAddress().toString();
        session.ip = ip.substring(1, ip.indexOd(":"));
        session.connectTime = System.currentTimeMillis();

        if(LOGGER.isDebugEnabled()) {
            session.getSendPacketListeners().add(packet -> {
                Object realPacket = packet;
                if(realPacket instanceof WresponsePacket) {
                    realPacket = ((Wrespon) realPacket).getDebugPacket();
                }
                if(realPacket instanceof ISipDebugOut && ((ISkipDebugOut realPacket).skip()) {
                    return;
                }
                Object target = session.getMainEntity();
                LOGGER.debug(String.fotmat("---->>>%s [%s]%S",
                target == null ? channel : target,
                realPacket.getClass().getSimpleName(),
                JsonUtils.object2String(realPacket)));
            });
            session.getReceivePacketListrners().add(packet -> {
                if(packet instanceof ISkillDebugOut && ((ISkipDebugOut) packet).skip()) {
                    return;
                }
                if(packet.getClass().isAnnotationPresent(SkipDebugOut.class)) {
                    return;
                }
                Object target = session.getMainEntity();
                LOGGER.debug(String.format("<<<--%s [%s]%s",
                target == null ? channel : target,
                packet.getClass().getSumpleName(),
                JsonUtils.object2String( packet)));
            });
        }
        return session;
    }

    private Wsession() {

    }

    public <T> getAttr(AttributeKey<T> key) {
        Attribute<T> attr = channel.attr(key);
        return attr == null ? null : attr.get();
    }

    public <T> void setAttr(AttributeKey<T> key, T t) {
        channel.attr(key).set(t);
    }

    // 绑定玩家
    public void bindPlayer(Object player) {
        setAttr(PLAYER, null);
    }
    // 清除玩家绑定
    public void unBindPlayer() {
        setAttr(PLAYER, null);
    }

    @SuppressWarnings("unchecked")
    public <Player> Player getPlayer() {
        return (Player) getAttr(Player);
    }

    // set/get

    // 当dispatcherHashCode没有初始化时选择channel的hashcode作为分发code

    // 业务消息推送
    public ChannelFuture sendPacket(Object packet) {
        ChannelFuture future = SocketPacketHandler.getIsntance().sendPacket(this, channel, packet, flusNow);
        for (SendPacketListener listener : sendPacketListeners) {
            listener.run(packet);
        }
        return future;
    }

    // 接收到协议触发
    public void onReceivePacket(Object pacekt) {        
        if(receivePacketListeners.isEmpty()) {
            return;
        }
        try {
            receivePacketListeners.forEach(receivePacketListener -> receivePacketListener.run(packet));
        } catch(Exception e) {
            LOGGER.error("receivePacketListener error!", e);
        }
    }   

    private AtomicBoolean flushTimer = new AtomicBoolean(false);

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = prime * result + id;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if(this == obj) {
            return true;
        }
        if(obj == null) {
            return false;
        }
        if(getClass() != obj.getClass()) {
            return false;
        }
        Wsession other = (Wsession) obj;
        if(id != other.id) {
            return false;
        }
        return true;
    }

    // toString()


    // 通知

    public void notiftClose() {
        for(CloseListenr listener : closeListeners) {
            try {
                listener.run();
            } catch(Exception e) {
                LOGGER.error("session CloseListener异常", e);
            }
        }
    }


}

总结

  1. SocketPacket注释标识的类会被认为是协议类,在SocketPacketHander的初始化时会根据协议类编译对应的生成JProtobuf协议类Codec,并将packetId,packetClass,Codec的关系储存在起来。

  2. SocketMethod是接受请求的声明,在Spring bean 加载启动中

背包学习笔记

概述

从储存结构的主键上来说,背包可以可以一玩家id为主健,可以以物品道具唯一id为主健。在RPG项目和SLG项目中并没有太大不同

储存结构 PlayerPackEntity

主健: 玩家唯一id

Map<PackType, ItemStorage> 一个背包类型跟随一些道具数据储存

背包道具容器领域类 ItemStorage :

背包类型 PackType packType

格子位置 —> 背包格子道具 Int2ObjecArrayMap positionPackItemMap

道具配置ID —> 背包格子位置集合(增删,重新加载数据时,同步维护索引) Map<Integer, Set> itemKeyPositionSetMap

索引:唯一道具ID —> 格子位置标识

缓存持久化系统学习

概述

common-ramcache是一个用于缓存和持久化的组件,可以随Spring一起工作。

注解

@Retentiion(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Cache {

    /** 缓存管理类型 */
    CacheType type() default CacheType.LRU;

    /** 缓存单位 */
    CacheUnit unit() default CacheUnit.ENTITY;

}

持久化

持久化处理器接口

public interface Presister {
    // 初始化方法
    void initialize();
    // 将指定元素插入此队列中
    void put(Element element);
    // 添加监听器
    void addListener(Class<? extends AbstractEntity> clz, Listenrt listener);
    // 获取监听器
    Listener getListener(Class<? extends AbstractEntity> clz, Listener listener);
    // 停止更新队列并等待全部入库完成
    void shutdown();

    // 获取当前的状态信息
    Map<String, String> getInfo();

}

队列更新结果监听器

public abstract class AbstractListener implements Listener {
    // 队列更新的回调
    void notify(EventType type, boolean isSuccesss, Serializable id, AbstractEntity entity, RuntimeException ex);
}

抽象监听器用于简化监听器的开发

public abstract class AbstractListener implements Listener {

    @Override
    public void notify() {
        try {
            if(isSuccess) {
                switch(type) {
                    case INSERT:
                        onSaveSuccess(id, entity);
                        break;
                    case UPDATE:
                        onUpdateSuccess(entity);
                        break;
                    case DELETE:
                        onRemoveSuccess(id);
                        break;
                    deafault: 
                        logger.error("未支持的更新事件类型[{}]", type);
                }
            } else {
                switch(type) {
                    case INSERT:
                        onSeveError(id, entity, ex);
                        break;
                    case UPDATE:
                        onUpdateError(entity, ex);
                        break;
                    case DELETE:
                        onRemoveError(id, ex);
                        break;
                }
            }
        } catch (Exception e) {
            logger.error("队列监听器[{}]处理出现异常", new Obeject[] {this.getClass().getName(), e});
        }
    }

    // 实体保存成功时的回调,需要则覆盖
    protected void onSaveSuccess(Serializable id, AbstractEntity entity) {

    }

    // 实体更新成功时的回调,需要则覆盖
    protected void onUpdateSuccess(AbstractEntity entity) {

    }
    // 实体删除成功的回调,需要则覆盖
    protected void onRemoveSuccess(Serializable id) {

    }
    // 实体保存失败时的回调,需要则覆盖
    protected void onSaveError(Serializable id, AbstractEntity entity, RuntimeException ex) {

    }
    // 实体更新失败时的回调,则需要覆盖
    protected onUpdateError(AbstractEntity entity, RuntimeException ex) {

    }
    // 实体更新失败时的回调,则需要覆盖
    protected onUpdateError(AbstractEntity entity, RuntimeException ex) {

    }
    // 实体删除失败时的回调,需要则覆盖
    protected void onRemoveError(Serializable id, RuntimeException ex) {

    }
}

实体更新事件类型定义

public enum EventType {
    // 插入
    INSERT,
    // 更新
    UPDATE,
    // 删除
    DELETE;
}

队列元素

```java
public class Element {

    // 构造插入元素
    public static Elment saveOf(AbstractEntity entity) {
        return new Element(EventType.INSERT, entity.getId(), entity, entity.getClass());
    }

   // 构造删除元素
    public static Element removeOf(Serializable id, Class<? extends AbstractEntity entityClass>) {
        return new Element(EventType.DELETE, id, null, entityClass);
    }

    // 构造更新元素
    public static Element updateOf(AbstractEntity entity) {
        return new Element(EventType.UPDATE, entity.getId(), entity);
    }

    // 事件类型(不为null)
    private EventType type;
    // 实体主键(不为null)
    private final Serializable id;
    // 实体实例(删除时会为null)
    private AbstractEntity entity;
    // 实体类型(不会为null)
    private final Class<? extends AbstractEntity> entityClass;

    // 构造方法
    private Element(EventType type, Serializable id, AbstractEntity entity, Class<? extends AbstractEntity> entityClass) {
        this.type = type;
        this.id = id;
        this.entity = entity;
        this.entityClass = entityClass;
    }

    // 获取标识
    public String getIdentity() {
        return entiyClass.getName() + ":" + id;
    }

    // 更新队列元素的状态
    // 该方法不假设更新元素是同一个元素,因此元素判断要在使用该方法前处理
    // true: 需要保留 . fale:
    public boolean update(Element element) {
        entity = elemnt.getEntity();
        switch(type) {
            // 之前的状态为SAVE
            case INSERT:
                switch(element.getType()) {
                    // 当前的状态
                    case INSERT:
                        logger.error("更新元素异常,实体[{}]原状态[{}]当前状态[{}]不进行修改",getIdentity(), type, elment.getType());
                        break;
                    case UPDATE:
                        if(logger.isDebgEnabled()) {
                            logger.error("实体[{}]原状态[{}]当前状态[{}]修正后状态[{}]是否保留队列元素[{}]",getIdentity(),EventType.INSERT, elment.getType()
                        }
                        break;
                    case DELETE:
                        if(logger.isDebugenabled()) {
                            logger.error("实体[{}]原状态[{}]当前状态[{}]不进行修改",getIdentity(), type, elment.getType());
                        }
                    return false;
                    default :
                }
                break;
                // 之前的状态为UPDATE
                case UPDATE:
                    switch(element.getType()) {
                       case INSERT:
                        logger.error("更新元素异常,实体[{}]原状态[{}]当前状态[{}]不进行修改",getIdentity(), type, elment.getType());
                        break;
                    case UPDATE:
                        if(logger.isDebgEnabled()) {
                            logger.error("实体[{}]原状态[{}]当前状态[{}]修正后状态[{}]是否保留队列元素[{}]",getIdentity(),, elment.getType());
                        }
                        break;
                    case DELETE:
                        if(logger.isDebugenabled()) {
                            logger.error("实体[{}]原状态[{}]当前状态[{}]不进行修改",getIdentity(), type, elment.getType());
                        }
                        break;
                        default : 
                    } 
                case DELETE:
                    switch(element.getType()) {
                       case INSERT:
                        logger.error("更新元素异常,实体[{}]原状态[{}]当前状态[{}]不进行修改",getIdentity(), type, elment.getType());
                        break;
                        case UPDATE:
                            if(logger.isDebgEnabled()) {
                                logger.error("实体[{}]原状态[{}]当前状态[{}]修正后状态[{}]是否保留队列元素[{}]",getIdentity(),, elment.getType());
                            }
                            break;
                        case DELETE:
                            if(logger.isDebugenabled()) {
                                logger.error("实体[{}]原状态[{}]当前状态[{}]不进行修改",getIdentity(), type, elment.getType());
                            }
                            break;
                            default :
                    }
                    break;
                default:
                }
            return true;
    }


    // Getter anf  Setter...
}

定时持久化处理器。该持久化处理器会以CRON表达指定的时间进行入库操作,未达到入库时间点的持久化操作会进行累计并去重

public class TimingPersister implements Presister {

    // 正在等待更新的信息缓存
    private ConcurrentHashMap<String, Element> elements = new ConcurrentHashMap<>();
    // 对应实体的处理监听器
    private ConcurrentHashMap<Class<? extends AbstractEntity>, Listener> listeners = new ConcurrentHashMap<>();
    // 初始化标识
    private boolean initialize;
    // 初始化
    private TimingConsumer consumer;

    // 实现接口的方法

    // 初始化方法
    @OVerride
    public synchronized void initialize(String name, Accessor, String period) {
        if(initialize) {
            throw new ConfigurationException("重复初始化异常");
        }
        Assert.notNull(accessor, "持久层数据访问器不能为 null");
        try {
            this.elements = new ConcurrentHashMap<>();
            this.consumer = new TimingConsumer(name, period, accessor, this);
            initiallize = true;
        } catch (Exception e) {
            throw new ConfigurationException("定时持久化处理器["+name+"]初始化失败"+e.getMessage());
        }
    }

    // 添加监听器
    @Override
    public void addListener(Class<? extends AbstractEntity> clz, Listenrt listener) {
        if(listener == null) {
            throw new ConfigurationException("被添加的监听器实例不能未空");
        }
        listenner.put(clz,listener);
    }
    // 获取监听器
    public Listener getListener(Class<? extends AbstractEntity> clz) {
        return listeners.get(clz);
    }
    // 指定元素插入此队列中,将等待可用的空间(如果有必要)
    @Override
    public void put(Element element) {
        if(element == null) {
            return;
        }
        if(stop) {
            FormattingTuple message = MessageFormatter.format("实体更新队列已经停止,更新元素[{}]将不被接受",element);
            logger.error(message.getMessage());
            throw new StateException(message.getMessage());
        }
    }
    String id = elment.getIdentity();
    rwLock.readLock().lock();
    Lock lock = lockIdLock(id);
    try {
        Element prev = elements.get(id);
        // 更新元素不存在的场景
        if(perv == null) {
            eleement.put(id, element);
            return;
        }
        // 更新元素已经存在的场景
        EventType prevType = prev.getType();
        if(!prev.update(elemnt)) {
            elements.remove(id);
        } else {
            // 当从REMOVE合并为UPDATE的时候要让监听器通知缓存服务将内部的临时失效主键清除
            if(prevType == EventType.DELETE && prev.getType() == EventType.UPDATE) {
                Listener listener = getListener(element.getEntityClass());
                if(listenner != null) {
                    listener.notify(EventType.DELETE, true, prev.getId(), null, null);
                }
            }
        } finally {
            releaseIdLock(id, lock);
            rwLock.readLock().unlock();
        }
    }

    @Override
    public Map<String, String> getInfo() {
        HashNap<String, String> result = new HashMap<>();
        result.put("size",Integer.toString(size()));
        result.put("state", consumer.getState().name());
        result.put("nextTime", DateUtils.formatl(concumer.getNextTime()));
        return result;
    }

    //自身的方法

    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    Conllection<Element> clearElements()  {
        rwLock.writeLock().lock();
        try {
            ArrayList<Element> result = new ArrayList<>(elments.values());
            elements.clear();
            return result;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // 获取队列中的元素数量
    public int size() {
        return elements.size();
    }

    // 停止状态
    private volatile boolean stop;

    // 停止更新队列并等待全部入库完成
    @Override
    public void shutdown() {
        stop = ture;
        consumer.stop();
        while(consumer.getState() != TimingConsumerState.STOPPED) {
            Thread.yield();
        }
    }

    public void flush() {
        consumer.flush();
    }

    public TimingConsumer getConsumer() {
        return consumer;
    }

    // 内同方法

    // 获取标识锁对象
    private Lock lockIdLock(String id) {
        // 获取当前的主键写锁
        ReentrantLock lock = locaks.get(id);
        if(lock == null) {
            lock = new ReentrantLock();
            ReentrantLock prevLock = locaks.putIfAbsent(id, lock);
            lock = prevLock != null ? prevLock : lock;
        }
        lock.lock();
        return lock;
    }

    // 释放锁标志锁
    private void releaseIdLock(String id, Lock lock) {
        lock.unlock();
        locks.remove(id);
    }

}

消费者

public class TimingConsumer implements Runnable  {
    private static Profile = new Profile();
    // 更新队列名
    private String name;
    // 入库间隔
    private int period;
    // 持久层的存储器
    private Accessor accessor;
    // 实体持久化缓存
    private TimingPersister owner;
    // 当前锁对象
    private final Object = new Object();

    // 状态
    private AtomicReference<TimingConsumerState> state = new AtomicReference<>(TimingConsumerStatee.WAITING);

    private volatile boolean stoped;
    // 下次执行的时间
    private Date nextTime;
    // 错误计数器
    private AtomicInteger error = new AtomicInteger();
    // 消费定时任务
    private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

    static {
        ThreadGrop group = new ThreadGroup("定时任务");
        NamedThreadFactory threadFactory = new NamedThredFactory(group,"定时储存任务");
        sheduledThreadPoolExcutor = new ScheduledThreadPoolExecutor(50,threadFactory);
    }

    public static void shutdownExecutor {
        if(scheduledThreadPoolExecutor != null) {
            scheduledThreadPoolExecutor.shutdown();
        }
    }

    public TimingConsuer() {
        this.name = name;
        this.period = Integer.valueOf(period);
        this.accessor = accessor;
        this.owner = owner;
        scheduledThreadPoolExecutor.scheduleFixedRate(this, 10L, this.period, TimeUnit.SECONDS);
    }

    @Override
    public void run() {
        if(stoped) {
            return;
        }
        try {
            synchronized(lock) {
                Collection<Element> elements = null;
                state.compareAndSet(TImgConsumerState.WAITING, TimingConsumerState.RUNNING);
                Date satart = new Date();
                if(logger.isDebugEnabled()) {
                    logger.debug("定时入库[{}]开始[{}]执行", name, DateUtils.formatl(start));
                }
                if(elements.isEmpty()) {
                    state.compareAndSet(TimingConsumerState.RUNNING, TingingConsumerState.WAITING);
                    return;
                }
                presist(elements);
                if(logger.isDebugEnabled()) {
                    logger.debug("定时入库[{}]入库[{}]条数据耗时[{}]", name, elements.size(),
                    System.currentTimeNillis() - start.getTime());
                }
                statat.compareAndSet(TimingConsumerState.RUNNING, TimingConsumerState.WAITING);
            }
        } catch (Throwable e) {
            logger.error("Timing执行异常", e);
            state.compareAndSet(TimingConsumerState.RUNNING, TimingCOnsumerState.WAITING);
        }
    }

    // 性能统计信息
    public static Profile collectTaskProfile(boolean rest) {
        Profile profile = TimingConsumer.profile;
        TimingConsumer.profile = new Profile();
        return profile;
    }

    // 记录失败数据
    private void recordFail(List<AbstractEntity> failList) {
        if(failList.isEmpty()) {
            return;
        }
        for(AbstractEntity abstractEntity : failList) {
            logger.error("{} entity {} update fail!", abstractEntity.getClass().getSimpleName(),
            abstractEntity.getId());
            logger.error(JsonUtils.object2String(abstractEntity));
        }
    }

    public TimingConsumerState getState() {
        return state.get();
    }

    public void stop() {
        if(logger.isDebugEnabled()) {
            logger.debug("定时入库[{}]收到停止通知",name);
        }

        synchronized(lock) {
            stop = true;
            Collection<Element> elements  = owner.clearElements();
            persist(elements);
            for(;;) {
                if() {
                   if() {
                       if(state.commpareAndSet(TimingConsumerState.WAITING, TimingConsumerState.STOPPED)) {
                           return;
                       }
                       try{
                           Thread.sleep(1000);
                       } catch(InterrupteException e) {
                           logger.error("停止队列被中断", e);
                       }
                   }
                }
            }
        }
    }

    // 立刻提交储存
    public void flush() {
        run();
    }

    public Date getNextTime() {
        return nextTime;
    }

    public int getError() {
        return error.get();
    }

    private void persist(Collection<Element> elements) {
        List<AbstractEntity> saveList = new ArrayList<>(500);
        List<AbstractEntity> updateList = new ArrayList<>(500);
        List<AbstractEntity> deleteList = new LinkedList<>();

        for(Element element : elements) {
            AbstractEntity entity = element.getEntity();
            try {
                profile.getOrCreateProInfo(elment.getEntityClass().getSimpleName()).recordSubmit();
                Class clz = element.getEntityClass();
                switch(element.getType()) {
                    case INSERT:
                        saveList.add(entity);
                        break;
                    case DELETE:
                        accesor.remove(clz, element.getId());
                        break;
                    casse UPDATE:
                        updateList.add(entity);
                        break;
                    default;
                }
                Listener listener = owener.getListener(clz);
                if(listener != null) {
                    listener.notify(element.getType(), true, element.getId(), entity, e);
                }
            } catch (RuntimeException e) {
                error.getAndIncrement();
                if(elment == null) {
                    logger.error("获取更新队列元素时线程被非法打断",e);
                } else {
                    logger.error("更新队列处理出现未知异常", e);
                }
            }
        }
    }


    List<AbstractEntity> fialList = accessor.batchSave(saveList);
    recordFail(failList);

    failList = accessor.batchUpdate(updateList);
    recordFail(failList);

    failList = accessor.batchDelete(deleteList);
    recordFail(failList);

}

队列消费者

public class QueueConsumer implements Runnable {
    // 更新队列名
    private final String name;
    // 更新队列
    private final BlockingQueue<Elemnt> queue;
    // 持久层的储存器
    private final Accessor accessor;
    // 所有者
    private final QueuePersister owner;
    // 当前消费者线程自身
    private final Thread me;
    // 错误计数器
    private final AtomicInteger error = new AtomicInteger();

    public QueueConsumer(String name, Acceossor accessor, BlockingQueue<Element> queue, QueuePersister owner) {
        this.name = name;
        this.accessor = accessor;
        this.queue = queue;
        this.owner = owner;
        this.me = new Thread(this, "持久化["+ name+ "队列]");
        me.setDaemon(true);
        me.start();
    }

    @Override
    public void run() {
        while(true) {
            Element element = null;
            Class clz = null;
            try {
                element = queue.take();
                clz = elment.getEntityClass();
                switch(element.getType()) {
                    case INSERT :
                        accessor.save(clz, element.getEntity());
                        break;
                    case DELETE :
                        accessor.remove(clz, elment.getId());
                        brea;
                    case UPDATE :
                        // 解除抑制
                        owner.removeUpdating(element.getIdentity());
                        accessor.update(clz, element.getEntity());
                        break;
                    deafault :
                    logger.error("未支持的更新队列元素类型[{}",element);
                    break;
                }
                Listener listener = owner.getListener(clz);
                if(listener != null) {
                    listener.notify(element.getType(), true, element.getId(), element.getEntity(), null;
                }
            } catch (RuntimeException e) {
                error.incrementAndGet();
                if(logger.isWareEnabled()) {
                    logger.warn("实体更新队列[{}]处理元素[{}]时出现异常: {}",new Obeject[] {name, element, e.getMessage()});
                }
                Listener listener = owner.getListener(clz);
                if(listener != null) {
                    listener.notify(element.getType(), false, element.getId(), element.getId(), element.getEntity(), e);
                }
            } catch (Throwable e) {
                error.incrementAndGet();
                if(element == null) {
                    logger.error("获取更新队列元素时线程被非法打断", e);
                } else {
                    logger.error("更新队列处理出现未知异常", e);
                }
            }

        }
    }

    public int getError() {
        return error.get();
    }
}

服务逻辑

实体增强服务接口

public interface EnhanceEnhanceService<PK extends Comparable<PK> & Serializable, T extends AbstractEntity<PK>> extends 
EnhanceService<PK,T> {

    // 检查指定的唯一属性域值是否存在
    boolean hasUniqueValue(String name, Object value);

    // 替换指定实体标识对应的唯一属性缓存值
    void replaceUniqueValue(PK id, String name, Object value);

}

@Transactional(rollbackFor = Exception.class)
public class EntityCacheServiceImpl<PK ezxtends Comparable<PK> & Serialzable, T extends AbstractEntity<PK>> implements 
EntityCacheService<PK, T>, CacheFinder<PK, T>, EntityEnhanceService<PK, T> {

    // 初始化标识
    private boolean initialize;
    // 实体类型
    private Class<T> entityClz;
    // 缓存实体配置信息
    private CachedEntityConfig config;
    // 存储器
    private Accessor accessor;
    // 查询器
    private Querier querier;
    // 持久化缓存器
    private Persister persister;
    // 实体缓存
    private Cache<PK,T> cache;
    // 主键缓存
    private ConcurrentHashSet<PK> biyPrimarKeys;
    // 初始化方法
    public synchronized void initialize(CacheEntityConfig config, Persister persister, Accessor accesoor, Querier querier) {
        if(initialize) {
            throw new StateException("重复初始化异常");
        }
        // 初始化属性域
        initFields(config, persister, accessor, querier);
        // 缓存初始化处理
        initCaches(config, querier);
        // 缓存主键
        cacheBitPremaryKey(querier);
        this.initialize = ture;
    }

    // 实体操作读写锁
    private ConcurrentHashMap<PK, ReentrantLock> locks = new ConcurrentHashMap<>();
    // 正在被移出的实体的主键集合
    private ConcurrentHashSet<PK> removing = new ConcurrentHashSet<>();
    // 唯一键缓存(key: 唯一键名, value:{key:唯一键值, value:PK})
    private HashMap<String, HashBiMap> uniques;

    @Override
    public T getMaybeNull(PK id) {
        return cache.getIfPresent(id);
    }
    @Overide
    public T load(PK id) {
        unInitializeThrowException();
        if(isMaybeNotExist(id)) {
            // 不存在的主键
            return null;
        }
        // 想判断主键是否有效
        if(!removing.contains(id)) {
            // 尝试从缓存中获取
            T current = (T) cache.getIfPresent(id);
            if(current != null) {
                return current;
            }
        }
        // 抑制并发错误
        T currnent = null;
        // 获取主键锁,抑制并发操作
        Lock lock = lockPkLock(id);
        try {
            // 先判断主键是否有效
            if(removing.contains(id)) {
                return null;
            }
            // 尝试从缓存中获取
            current = cache.getIfPresent(id);
            if(current != null) {
                // 尝试反序列化
                current.unSeralize();
                if(config.hasUniqueField())
                    // 添加唯一属性缓存信息
                    Map<String, Object> uniqueValues = config.getUniqueValues(current);
                    for(Entry<String, Obeject> entry : uniqueValues.entrySet()) { 
                        addUniqueValue(entry.getKey(), entry.getValue(), id);
                    }
            }
            cache.put(id, current);
        } finally {
            // 释放主键锁
            releasePkLock(id, lock);
        }
        return current;
    }

    // 实体增强相关的方法

    @Override
    public CacheFinder<PK, T> getFinder() {
        unInitializeThrowException();
        return this;
    }

    //....

    @Override
    public Set<T> add() {
        unInitializeThrowException();
        HashSet<T> result = new HashSet<>(cache.asMap().values());
        return result;
    }

    // 管理相关的方法实现


    @Override
    public CacheEntryConfig getEntityConfig() {
        unInitializeThrowException();
        return config;
    }

    @Override
    public Persister getPersister() {
        return persister;
    }

    @Override
    public void shutdown() {
        cache.asMap().values().foreach(t -> {
            if(t.getSumitMark().compareAndSet(true, false)) {
                // 序列化
                try {

                } catch (Exception e) {
                    logger.error("entity serialize error!", e);
                    return;
                }
                // 储存
                writeBack();
            }
        });
    }

    // 内部方法

    // 添加指定唯一属性值与主键的对应关系(属性名的写锁保护)
    private void initFields(final CachedEntityConfig, final Persister persiter, Accessor accessor, Querier querier) {
        Cached cached = config.getCached();
        // 初始化属性域
        this.config = config;
        this.accessor = accessor;
        this.querier = querier;
        this.enrityClz = querier;
        this.entityClz = (Class<T>) config.getClz();
        this.persister = persister;
        this.persister.addListener(entityClz, new AbstractListener {
            @Override
            protected void onRemoveSuccess(Serializable id) {
                removing.remove(id);
            }
            @Override
            protected void onRemoveErroe() {
                removing.remove(id);
            }
            @Override
            protected void onSaveError() {
                if(ex instanceof ConcurrentModificationException) {
                    persister.put(Element.saveOf(entity));
                }
            }
            @Override
            protected void onUpdateError(AbstractEntity entity, RuntimeException ex) {
                if(ex instanceof ConcurrentModificationException) {

                }
            }
        });

        // 初始化唯一键信息
        if(config.hasUniqueFiled()) {
            this.uniques = config.buildUniqueCache();
        }
        // 初始化实体缓存空间
        switch(cached.type()) {
            case LRU:
                if(config.hasUniqueField()) {
                    this.cache = CacheBuilder.newBuilder().removalListener((RemovalListener<PK, T>) notification -> {
                        for(Entry<String, HashBiMap> entry : uniques.entrySet()) {
                            WriteLock lock = config.getUniqueWriteLock(entry.getKey());
                            lock.lock();
                            try {
                                entry.getValue().inverse().remove(notification.getValue().getId());
                            } finally {
                                lock.unlock();
                            }
                        }
                    }).weakValues().build();
                } else {
                    this.cache = CacheBuilder.newBuidler().weakValues().build();
                }
                break;
            case MANUAL:
                this.cache = CacheBuilder.newBuilder().maximumSize(Integer.MAX_VALUE).build();
                break;
            default:
                throw new ConfigurationException("未支持的缓存管理类型["+ cached.type()+ "]");
        }
    }

    // 缓存主键
    private void cacheBitPrimaryKey(Querier querier) {
        int count = querier.count(entityClz);
        if(count > 100) {
            // 数;据量打,不缓存主键了
            return;
        }
        // 初始化主键索引,避免新开服大量无效的select操作
        List<T> all = querier.all(entityClz);
        bitPrimaryKeys = new ConcirrentHashSet<>();
        for(T t : all) {
            bitPrimaryKeys.add(t.getId());
        }
    }

    // 检测主键缓存存在的情况,是否存在主键
    private boolean isMaybeNotExist(PK pk) {git
        if(bitPrimarykeys == null) {
            // 没有做缓存操作
            return false;
        }
        return !bitPrimaryKeys.contains(pk);
    }

    // 尝试缓存主键,如果没有做过缓存,不用缓存例如,如果已经做过缓存,则缓存新创建的主键
    private void tryCacheBitPrimaryKey(Pk pk) {
        if(bitprimaryKeys == null) {
            // 没有缓存操作
            return;
        }
        bitPrimaryKeys.add(pk);
    }

    // 初始化缓存数据
    private void initCache(CacheEntityConfig config, Querier querier) {
        InitialConfig initial = config.getInitialConfig();
        if(initial == null) {
            return;
        }        
        // 获取要初始化的实体列表
        List<T> entities = null;
        switch(initial.type()) {
            case ALL:
                entities = querier.all(entityClz);
                break;
            case QUERY:
                entities = querier.list(entityClz, initial.query());
                break;
            default:
                throw new ConfigurationException("无法按配置["+ initial + "]初始化实体["+this.entityClz.getName()+"]的缓存");
        }
        // 初始化缓存数据
        for(T entity : entities) {
            // 尝试初始化
            entity,unSerialize();
            PK id = entity.getId();
            cache.put(id, entity);
            if(config.hasUniqueField()) {
                Map<String, Obeject> uniqueValues = config.getUniqueValues(entity);
                // 更新唯一键索引
                for(Entry<String, Object> entry : uniqueValues.entrySet()) { 
                    HashBiMap unique = uniques.get(entry.getKey();
                    unique.put(entry.getValue(), id);
                }
            }
        }
    }

    @Override
    public long getAllSzie {
        return this.cache.size()
    }
    @Override
    public T create(PK id, EntityBuider<PK, T> builder) {
        unInitializeThrowException();
        // 先判断主键是否有效
        if(!removing.contains(id)) {
            // 尝试从缓存中获取
            T current = (T) cache.getIfPresent(id);
            if(current != null) {
                return current;
            }
        }
        // 抑制并发操作
        T current = null;
        // 获取主键锁,抑制并发操作
        Lock lock = lockPkLock(id);
        try {
            // 尝试从缓存中获取
            current = (T) cache.getIfPresent(id);
            if(currnt != null) {
                return current;
            }
            boolean flag = removing.contains(id);
            // 创建实例
            current = builder.newInstance(id);
            if(current == null) {
                throe new InvaildEntityException("创建返回null");
            }
            if(current.getId() == null) {
                 throe new InvaildEntityException("创建的主键为null");
            }
            if(config.hasUniqueFiled()) {
                // 检查唯一属性性值是否合法
                Map<String,, Object> uniqueValues = config.getUniqueValues(current);
                // 迭代每一个唯一属性域
                for(Entry<String, Object> entry : uniqueValues.entrySet()) {
                    // 唯一属性域名
                    String uniqueName = entry。getKey();
                    // 唯一属性域值
                    Obejct uniqueValue = entry.getValue();
                    WriteLock uniqueLock = config.getUniqueWriteLock(uniqueName);
                    uniqueLcok.lock();
                    try {
                        // 检查缓存数据
                        HashBiMap unique = uniques.get(uniqueName);
                        // 更新缓存数据
                        PK prev = (PK) unique.put(uniqueValue, id);
                        if(prev != null) {
                            logger.error("实体主键异常");
                        }
                    } finally {
                        uniqueLock,unlock();
                    }
                }
            }
            if(flag) {
                removing.remove(id);
            }

            // 创建ok了序列化一次
            current.trySerialize();
            //尝试缓存主键
            tryCacheBitPrimaryKey(id);
            // 异步持久化
            persister.put(Element.saveOf(current));
            cache.put(id , current);
            return current;
        } finally {
            // 释放主键锁
            releasePkLock(id, lock);
        }

    }


}

对象锁对象

/**排序顺序说明:

1. 非实体在前,实体{@link AbstractEntity}在后
2. 不同类型的锁对象,按类型{@link Class}的hashCode值的大小进行排序
3. 不同类型的锁对象,当不幸遇到hashCode值相同的情况,用完整类名做字符串排序
4. 类型相同时,更具</code>排序依据</code>进行排序
5. <code>排序依据</code>对于非实体而言,为<code>System.identityHashCode(instance)</code>
6. <code>排序依据</code>对于实体而言,为{@link AbsteactEntity#getIdentity}
*
**/
public class ObjectLock extends ReentrantLock implements Comparable<ObjectLock> {

    private static final Class<AbstractEntity> IENTIT_CLASS = AbstractEntity.class;

    // 锁定对象的类型
    private final Class clz;
    // 锁的排序依据
    private final Comparable value;
    // 该对象锁所锁定的是否实体

    // 构造指定对象的对象锁
    public ObejctLock(Object object) {
        this(object, false);
    }
    // 构造指定对象的对象锁
    public ObejectLock(Obejct object, boolean fair) {
        super(fair);
        clz = obeject.getClass();
        if(object instanceof AbstractEntity) {
            value = (Comparable) ((AbstractEntity) object).getId();
        } else {
            value = new Integer(System.identityHashCode(object));
        }
        if(IENTITY_CLASS.isAssignableFrom(clz)) {
            entity = true;
        } else {
            entity = false;
        }
    }

    /** 检查当前锁是否无法和另一锁分出先后次序
        @param other 与当前锁比较的另一锁
    */
    public boolean isTie(Obeject other) {
        if(this.clz != other.clz) {
            return false;
        }
        if(this.value.compareTo(other.value) == 0) {
            return true;
        }
        return false;
    }

    // 锁定对象的类型
    public Class getClz() {
        return clz;
    }
    // 获取排序依据
    public Comparable getValue() {
        return value;
    }

    // 检查对象锁所锁定的是否实体
    public boolean isEntity() {
        return entity;
    }

    // 检查对象锁所在的是否实体
    public boolean isEntity() {
        return entity;
    }

    // 检查该对象锁所锁定的是否实体
    public boolean isEntity() {
        return entity;
    }

    @Override
    public int compareTo(ObejectLock o) {
        // 实体和非实体间的排序
        if(this.isEntity() && !o.isEntity()) {
            return 1;
        } else if(!this.isEntity() &&o.isEntity()) {
            return -1;
        }
        if(this.clz != o.clz) {
            // 类型不同的排序
            if(this.clz.hashCode() < o.clz.hashCode()) {
                return -1;
            } else if (this.hashCode() > o.clz.hashCode()) {
                return 1;
            }
            return this.clz.getName().compareTo(o.clz.getName());
        } else {
            // 类型相同的处理
            return this.value.compareTo(o.value);
        }
    }

}

对象映射

查询接口

/**
*  物理存储层的数据查询器接口
*   用于给不同的ORM底层实现
*/
public interface Querier {
    // 表数据条数
    <T> int count(Class<T clz);
    // 获取全部实体
    <T> List<T> all(Class<T> clz);
    // 获取指定命名查询的查询结果(返回实体列表)
    <T> List<T> list(Class<T> clz, String queryname, Object... params);
    // 获取指定命名的查询结果(返回投影内容列表)
    <E> List<E> list(Class entityClz, Class<E> retClz, String queryname, Object... params);
    // 获取指定命名查询的查询结果(返回单一的实体对象实例)
    <T> T unique(Class<T> clz, String quertname, Obeject... params);
    // 获取指定命名查询的查询结果(返回单一的投影内容)
    <E> E unique(Class entityClz, Class<E> retClz, String queryname, Obejct... params);
    // 分页查询(返回实体列表)
    <T> List<T> paging(Class<T> clz, String queryname, Paging paging, Object... params);
    // 分页查询(返回投影内容列表)
    <T> List<E> paging(Class entityClz, Class<E> retClz, String queryname, Paging paging, Object... params);
    // 删除数据,慎用
    <T> int delete<Class<T> clz, final String queryname, final Object... params>);

}

数据访问器接口

/**
物理存储层数据访问器接口
用于给不同的ORM底层实现
*/

public interface Accessor {
    // 从存储层加载指定的实体对象实例
    <PK extends Serializable, T extends AbstractEntity> T load(Class<T> clz, PK id);

    // 持久化指定的实体实例,并返回实体的主键值对象
    <Pk extends Serializable, T extends AbstractEntity> void save(Class<T> clz, T entity);

    // 从存储层移除指定实体
    <PK extends Serializable, T extends AbstractEntity> void remove(Class<T> clz, PK id);
    // 更新储存层的实体数据(不允许更新实体的主键值)
    <PK extends Serializable, T extends AbstractEntity> void update(Class<T> clz, T entity);
    // 批量储存
    <PK extends Serializable, T extends AbstractEntity> List<T> batchSave(final List<T> entitys);
    // 批量更新
    <PK extends Serializable, T extends AbstractEntity>  List<T>  batchUpdate(final List<T> entitys);
    // 批量删除 
    // @return 失败数据集合
    <Pk extends Serializable, T extends AbstractEntity> List<T> batchDelete(final List<T> entitys);
}

Accessor的Hibernate实现

public class HibernateBAccessor extends HibernateDaoSupport implements Accessor {
    private IHibernateBeforeLoad hibeenateBeforeLoad;

    @Override
    public <PK extends Serializable, T extends AbstractEntity> T load(Class<T clz, PK id) {
        if(hibeenateBeforeLoad != null) {
            hibeenateBeforeLoad.beforeLoad(clz, id);
        }
        return getHibernateTemplate().execute(session -> {
            session.beginTransation();
            T t = session.get(clz, id);
            session.getTransaction().commit();
            return t;
        });
    }

    @Override
    public <PK extends Serialization, T extends AbstractEntity>  void save(Class<T> clz, T entity) {
        batchExc(Collections.singletonList(entity), Session::save);
    }
    @Override
    public <Pk extends Serializable, T extends AbstractEntity> void remove(Class<T> clz, T entity){
        T entity = load(clz, id);
        if(entity == null) {
            return;
        }
        batchExc(Collections.singletonList(entity), Session::delete);
    }
        @Override
    public <PK extends Serialization, T extends AbstractEntity>  void save(Class<T> clz, T entity) {
        batchExc(Collections.singletonList(entity), Session::update);
    }
    @Override
    public <PK extends Serializable, T extends AbstractEntity> List<T> batchSave(final List<T> entitys) {
        batchExc(ntitys, Session::save);
    }
    @Override
    public <PK extends Serializable, T extends AbstractEntity> List<T> batchUpdate(final List<T> entitys) {
        batchExc(entitys, Session::update);
    }
    @Override
    public <PK extends Serializable, T extends AbstractEntity> List<T> batchDelete(final List<T> entitys) {
        batchExc(entitys, Session::delete);
    }

    privte <PK extends Serializable, T extends AbstractEntity>  List<T> batchExc(List<T> entitys, BitConsumer<Session, T> consumer) {
        if(entitys.isEmpty()) {
            return Collections.emptyList();
        }
        List<T> failList = new ArrayList<>(0);
        int size = entitys.size();
        int szie = entitys.size();
        int index =0;
        int oneBatch = 100;
        while(index < size) {
            List<t> oneList = entitys.subList(index, Math.min(size, index+oneBatch));
            try {
                // 先尝试批量
                getHibernateTemplate().execute(session -> {
                    session.beginTransaction();
                    for(T t : oneList) {
                        consumer.accept(session, t);
                    }
                    session.getTransaction().commit();
                    return oneList.size();
                    });
                } catch(Exception e) {
                    loggrt.error("批量处理出现未知异常", e);
                    // 批量执行出错,单个提交
                    for(T t : oneList) {
                        getHibernateTemplate().execute(session -> {
                            try {
                                session.beginTransaction();
                                consumer.accept(session, t);
                                session.beginTransation().commit();
                            } catch(Exception e1) {
                                // 记录失败单位
                                faolList.add(t);
                                logger.error(String.format("单个处理出现未知异常%s, id: %s"),t, t.getId()), e1);
                            }
                            return 1;
                        });
                    }
                } finally {
                    index += oneList.size();
                }
            return failList;
        }
    }

    public IHibrenateBeforeLoad getHibernateBeforeLoad() {
        return hibernateBeforeLoad;
    }

    public void setHiberbateBeforeLoad(IHibrenateBeforeLoad hibernateBeforeLoad) {
        this.hibernateBeforeLoad = hibernateBeforeLoad;
    }
}

Querier的Hibernate实现

public class HiberbateQuerier extends HibernateDaoSupport implements Querier {
    @Override
    public <T> int count(Class<T> clz) {
        return getHibernateTemplate().execute(session -> {
            session.beginTransaction();
            int size = ((Long)) session.createCriteria(clz).setProjection(Projections.rowCount()).list().get(0).intValue();
            session.flush();
            session.clear();
            session.getTransaction().commit();
            return size;
        });
    }


    @Override
    public <T> List<T> all(Class<T> clz)  {
        return getHibernateTemplate().execute(session -> {
            session.beginTransaction();
            List<T> list = session.createCriteria(clz).list();
            session.flush();
            session.clear();
            session.getTransation().commit();
            return size;
        });
    }

    @Override
    public <T> List<T> list(Class<T> clz, String queryname, Object... params) {
        return getHibernateTemplate().execute(session -> {
            session.beginTransaction();
            Query query = session.getNameQuery(queryname);
            for(int i=0; i<parmas.length; i++) {
                query.setParameter(String.valueOf(id), params[i]);
            }
            List<T> list = query.list();
            session.flush();
            session.clear();
            session.getTransaction().commit();
            return list;
        });
    }

    @Override
    public <E> List<E> list(Class entityClz, Class<E> retClz, String queryname, Obeject... params) { 
        return list(retClz, queryname, params);
    }

    @

    @Override
    public <T> T unique(Class<T> clz, final String queryname, final Object... params) {

    }

    @Override
    public <E> List<E paging(Class<T> clz, final String queryname, final Paging paging, final Object... params) {
       return getHibernateTemplate().execute(session -> {
        session.beginTransation();
        Query query = session.getNameQuery(queryname);
        for(int i=0; i<params.length; i++>) {
            query.setParameter(String.valueOf(i), params[i]);
        }
        query.setFirstResult(paging.getFirst());
        query.setMaxResults(paging.getSize());
        List<T> list = query.list();
        session.flush();
        session.clear();
        session.getTransaction().commit();
        return list;
        });
    }

    @Overide
    public <E> List<E> paging(Class entityClz, Class<E> retClz, final String queryname, final Paging paging, final Object... params) {
        return paging(retClz, queryname, paging, params);
    }

    @Override
    public <T> int delete(Class<T> clz, final String querybame, final Object... params) {
        return getHiberbateTemplate().execute(session -> {
            session.beginTransaction();
            Query query = session.getNameQuery(queryname);
            for(int i=0;i< params.lenght; i++>) {
                query.setParameter(String.valueOf(i), params[i]);
            }
            int result = query.executeUpdate();
            session.flush();
            session.clear();
            session.getTransation().commit();
            return result;
        });
    }
 }

缓存服务管理器

public class  ServiceManager implements ServiceManagerMBean {

    // 存储器
    private final Accessor accessor;
    // 查询器
    private final Querier querier;
    // 持久化缓存配置信息
    private final Map<String, PersisterConfig> persisterConfigs;
    // 实体对应的配置信息
    private final Map<Class<? extends AbstractEntity>, CacheEntityConfig> entityConfigs = new HashMap<>();
    // 队列配置名对应的队列实例
    private final Map<String, Persister> peristers = new HashMap<>();
    // 实体对应的缓存服务对象
    private final Map<Class<? extends AbstractEntity>, EntityCacheSerice> entityService = new HashMap<>();
    // 实体对应的缓存服务对象
    private final Map<Class<? extends AbstractEntity>, RegionCacheService> regionSerices = new HashMap<>();

    // 初始化方法
    public ServiceManger(Set<Class<AbstractEntity>> entityClasses, Accessor accessor, Querier querier, Map<<String, Integer> constants,
    Map<String, PersisterConfig> persisterConfigs) {
        Assert.notNull(accessor, "存储器不能为空");
        Assert.notNull(querier, "查询器不能未空");
        Assert.notNull(entityClasses, "实体类配置集合不能为空");
        this.accessor = accessor;
        this.querier = querier;
        this.persisterConfis = persisterConfigs;
        for(Class<? extends AbstractEntity> clz : entityClasses) {
            if(!CacheEntityConfig.isVaild(clz, constants)) {
                throw new ConfigurationException("无效的缓存实体["+ clz.getName() + "]配置");
            }
            // 获取实体缓存配置信息
            CachedEntityConfig config = CachedEntityConfig.valueOf(clz, constants);
            entityConfigs.put(clz, config);
        }
        // 注册MBean
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObejctName("com.my9yu.common:type=CacheServiceMBean");
            mbs.registerMbeab(this,name);
        } catch(Exception e) {
            logger.error("注册[common-ramcache]的JMX管理接口失败", e);
        }
    }

    // 获取指定实体的缓存服务对象
    public void EntityCacheService(Class<? extends AbstractEntity> clz) {
        CachedEntityConfig config = entityonfigs.get(clz);
        if(config == null) {
            throw new StateException("类["+ clz.getName() + "]不是有效的缓存实体");
        }
        if(!config.cacheUnitIs(CacheUnit.ENTITY)) {
            throw new StatException("实体["+ clz.getName() + "]的缓存单位不是["+ CacheUnit.ENTITY +"]");
        }
        EntityCacheService result = entityService.get(clz);
        if(result != null) {
            return result;
        }
        return createEntityService(clz);
    }

    // 停止全部实体更新队列
    public void shutdown() {
        if(logger.isDebugEnabled()) {
            logger.debug("开始停止实体更新队列");
        }
        for(EntityCacheService entityCacheService : entityService.value()) {
            entityCacheService.shutdown();
        }
        for(Persister queue : persisters.values()) {
            queue.shoutdown();
        }
        // 关闭线程
        TimingConsumer.shutdownExecutor();
    }

    // JMX的管理方法实现....

    // 创建缓存服务对象
    private synchronized EntityCacheService createEntityService(Class<? extends AbstractEntity clz>) {
        if(entityServices.containsKey(clz)) {
            return entityServices.get(clz);
        }
        CacheEntityConfg config = entityConfigs.get(clz);
        String name = clz.getSimpleName();
        Persister queue = getPersister(name);
        // 创建实体缓存服务对象
        EntityCacheService config = entityConfigs.get(clz);
        String name  = clz.getSimpleName();
        Persister queue = getPersister(name);

        // 创建实体缓存服务对象
        EntityCacheServiceImpl result = new EntityCacheServiceImpl();
        result.initialize(config, queue, accessor, querier);
        entityService.put(config.getClz(), result);
        return result;
    }

    // 获取持久化处理器实例
    private Persister getPersister(String name) {
        Persister result = persisters.get(name);
        if(result != null) {
            return result;
        }
        result = new TimingPersister();
        result.initialize(name, accessor, 10);
        persister.put(name, result);
        return result;
    }
}

架构

public class RamCacheParser extends AbstractBeanDefinitionParser {

    // 默认资源匹配符
    protected static final String DEFAULT_RESOURCE_PATTERN = "**/*.class";
    // 资源搜索分析器,由它来负责检索EAO接口
    private REsourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
    // 类的元数据读取器,由它负责读取类上的注释信息
    private MetadataReaderFactory metadateReaderFactory = new CacheingMetadataReaderFactory(this.resourcePatter);

    @SuppressWarnings({"unchecked","rawtypes"})  
    @Override
    protected AbstractBeanDefubution parseInternal(Elment element, ParseContext parseContext) {
        // 注册注入处理器
        registerInjectProcessor(parseContext);

        // 注册锁拦截切面
        if(Boolean.valueOf(element.getAttribute(AttributeNames.LOCK_ASPECT))) {
            registerLockAspect(parseContext);
        }
        // 创建工厂类定义
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(ServiceFactory.class);

        // 设置存储器
        builder.addPropertyReference(ElementNames.ACCESSOR, getAccessorBeanName(element));
        // 设置查询器
        builder.addPropertyReference(ElementName.QUERIER, getQuerierBeanName(element));

        // 设置实体集合
        Set<Class<? extends AbstractEntity>> classess = new HashSet<>();
        NodeList child = DomUtils.getChildElementByTagName(element, ElementNames.ENTITY).getChildNodes();
        for(int i=0; i<child.getLength(); i++) {
            Node node = child.item(i);
            if(node.getNodeType() != Node.ELEMENT_NODE) {
                continue;
            }
            String name = node.getLocalName();
            if(name.equals(ElemntNames.PACKAGE)) {
                // 自动包扫描处理
                String packageName = ((Element)node).getAttribute(AttributeNames.NAME);
                String [] names = getResources(packageName);
                for(String resource : names) {
                    Class<? extends AbstractEntity> clz = null;
                    try {
                        clz = (Class<? extends AbstractEntity>) Class.forName(className);
                    } catch (ClassNotFoundException e){
                        FormattingTupe message = MessagFormatter.format("无法获取的资源类[{]}]", className);
                        logger.error(message.getMessage());
                        throw new ConfigurationException(message.getMessage(), e);
                    }
                    classes.add(clz);
                }
            }
        }
        builder.addPropertyValue(ServiceManagerFactory.ENTITY_CLASSES_NAME, classes);
        return builder.getBeanDefinition();
    }


    private void parseConstants2Bean(BeanDefinitionBuilder builder, Elment element, ParseContext parseContext) {
        String ref = element.getAttribute(AttributeNames.REF);
        // 引用设置
        if(StringUtils.isNotBlank(ref)) {
            builder.addPropertyReference(ElementNames.CONSTANTS, ref);
            return;
        }
        // 指定设置
        ManageMap<String, Integer> constants = new ManagedMap<>();
        for(Element e : DomUtils.getChildElementByTagName(element, ElementNames.CONSTANT)) {
            String name = e.getAttribute(AttributeNames.NAME);
            Integer value = Integer.paresInt(e.getAttributeNames.NAME);
            Integer value = Integer.parseInt(e.getAttribute(AttributeNames.SIZE));
            constans.put(name, vaalue);
        }  
        builder.addPropertyValue(ElementNames.CONSTANS, constants);
    }

    // 注册锁切面
    private void registerLockAspect(ParseContext parseContext) {
        BeanDefinitionRegistry registry = parserContext.getRegistry();
        String name= StringUtils.uncapitalize(LockAspect.class.getSimpleName);
        BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(LockAspect.class);
        registry.registerBeanDeFinition(name, factory );
    }

    // 注册注入处理器,为后面通过`@Inject`注解注入CacheService做准备
    private void registerInjectProcessor() {
        BeanDefinitionRegistry register = parseContext.getRegistry();
        String name = StringUtils.uncapitalize(InjectProcessor.class.getSumpleName());
        registry.registerBeanDefinition(name, factory.getBeanDefinition());
    }

    // 获取查询器配置德Bean引用
    private String getQueierBeanName(Elmement element) {
        // ....
    }

    // 获取存储器配置的Bean引用
    private String getAccessorBeanName(Element element) {
        elment = ParserHelper.getUniqueChildElementByTagName(elment, ElementNames.ACCESSOR);
        // 引用处理
        if(elment.hasAttribute(AttributeNames.REF)) {
            return element.getAttribute(AttributeNames.REF);
        }
        throw new ConfigurationException("储存器配置声明缺失");
    }

    // 获取指定报下的静态资源对象
    private String[] getResource(String packageName) {
        try {
            // 搜索资源
            String packageSearchPath = ResourcePaatternResolver.CLASSPATH_ALL_URL_PREFIX
                + resolveBasePackage(packageName) + "/" + DEFULT_RESOURCE_PATTERN;
            Resource[] resources = this.resouorcePatternResolver.getResources(packageSearchPath);
            // 提取资源
            Set<String> result = new HashSet<>();
            String name = Cached.class.getName();
            for(Resource resource : resources) {
                if(!resource.isReadable()) {
                    continue;
                }
                // 判断是否静态资源
                MetadataReader metaReader = this.metadataReaderFactory.getMetadataReader(resource);
                AnnotationMetadata ammoMeta = metaReader.getAnnotationMetadata();
                if(!annoMeta.hasAnnotation(name)) {
                    continue;
                }
                ClassMetadata clzMeta = metaReader.getClassMetadata();
                result.add(clzMeta.getClassName());
            }
            return result.toArray(new String[0]);
        } catch(IOExeption e) {
            throw new ConfigurationException(message, e);
        }
    }

    protected String resolveBasePackage(String basePackage) {
        return ClassUtils.converClassNameToResourcePath(SystemPropertyUtils.resolvePlaceholder(basePackage));
    }
}

缓存服务注入处理器,负责完成{@link Inject}声明的资源注入工作

public class InjectProcessor extends InstantionAwareBeanPostProcessorAdapter implements Ordered () {
    @Autowired
    private ServiceManager manager;

    @Override
    public boolean postProcessAfterInstantiation(final Obeject bean, final String beanName) throws BeanException{ 
        ReflectionUtils.doWithFileds(bean.getClass(), new FIeldCallback() {
            Inject anno = filed.getAnnotation(Inject.class);
            if(anno == null) {
                return;
            }
            if(field.getType().equals(EntityCacheService.class)) {
                // 注入实体单位缓存服务
                injectEntityCacheService(bean, beanName.field);
            } else if(field.getType().equals(RegionCacheSercie)) {
                // 注入区域单位缓存服务
                injectRegionCacheService(bean, beanName, field);
            } else {
                FormattingTuple message = MessageFormatter.format("Bean[]的注入属性[]类型声明错误", beanName, field.getName());
                logger.error(message.getMessage());
                throw new ConfigurationException(message.getMessage());
            }
        });
        return super.postProcessAfterInstantiation(bean, beanName);
    }

    // 注入实体单位缓存服务
    private void injectEntityCacheService(Obejct bean, String beanName, Field field) {
        Class<? extends AbstractEntity> clz = null;
        EntityCacheService service = null;
        try {
            Type type = field.getGenericType();
            Type[] types = ((ParameterizedType) type).getActualTypeArguments();
            clz = (Class<? extends AbstractEntity>) types[1];
            service = manager.getEntityService(clz);
        } catch(Exception e) {
            FormattingTuple message = MessageFormatter.format("Bean[{}]的注入属性[{}]声明错误",beanName, field.getName());
            throw new CongigurationException(message.getMessage(), e);
        }
        if(service == null) {
            FormatitingTupe message = MessageFormatter.format("实体[{}]缓存对象不存在", clz.getName());
            throw new ConfigurationException(message.getMessage());
        }
        inject(beanm field, service);
    }

    // 注入属性值
    private void inject(Obejct bean, Field field, Obejct value) {
        ReflectionUtils.makeAccessible(field);
        try {
            filed.set(bean, value);
        }  catch(Exception e) {
            FormattingTuple  message = MessageFormatter.format("属性[{}]注入失败", field);
            throw new Configuartion
        }
    }

}

缓存服务器哦管理器工厂

public class ServiceManagerFacotory implements FactoryBean<ServiceManager> {

    public static final String ENTITY_CLASSES_NAME = "entityClasses";
    public static final String PERSISTER_CONFIG_NAME = "persusterConfig";

    private Accessor accessor;
    private Querier querier;
    private Set<Class<AbstractEntity>> entityClasses;
    private Map<String, PersisterConfig> persisterConfig;
    private Map<String, Integer> constants;

    private ServiceManaget cacheServiceManager;

    @Overide
    public ServiceManager getObeject() throws Exception {
        cacheServiceManager = new SeriviceManager(entityClassess, accessor, querier, 
        constants, persisterConfig);
        return cacheServiceManager;
    }

    @PreDestroy
    public void shutdown() {
        if(cacheServiceManger == null) {
            return;
        }
        cacheServiceManager.shutdown();
    }

    // Sertter Methods ....

    @Override
    public Class<?> getObejectType() {
        return SeriviceManager.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

}

总结

缓存持久化系统基于Spring和Hibernate。

  1. 通过配置文件注入实现了HibernateDaoSupportAccessor(负责数据更新)和Querier(负责数据查询)的实现类。
  2. Spring启动加载bean时,读取有Cache注解标识的路径,转化其为Class<? extends AbstractEntity>类对象,将它们加入ServiceManagerFactoryBeanDefinitionBuilder中。

             // 创建工厂类定义
         BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(ServiceFactory.class);
    
         // 设置存储器
         builder.addPropertyReference(ElementNames.ACCESSOR, getAccessorBeanName(element));
         // 设置查询器
         builder.addPropertyReference(ElementName.QUERIER, getQuerierBeanName(element));
         builder.addPropertyValue(ServiceManagerFactory.ENTITY_CLASSES_NAME, classes);
    
    
  3. ServicManager管理着所有的ServicManager实现类实例。ServiceManagerFactory会完成对ServicManager的初始化(装配AccessorQuerier`、Class集合、常量、配置)。

  4. InjectProcessor会负责完成@Inject声明的CacheSerivice资源的注入工作。