定时任务系统学习笔记


游戏许多业务会用到一些需要能修改配置的定时任务,例如凌晨重置、boss定时刷新、副本/战场/竞技场定时开启、拍卖行/交易的心跳等,这个时候需要通用的定时任务系统。

配置资源

定时业务配置类

@Resource
public class ScheduledResource implement IAfterResourceLoad {
    @ResourceId
    private ScheduledType id;

    // 时间cron
    private String cron;
    private String trigger;
    private CronSequenceGenerator CronSequenceGenerator;

    @Override
    public void afterLoad() {
        trigger = new CronTrigger(cron);
        cromSequenceGenerator = new CromSequenceGenerator(cron, TimeZone.getDefault());
    }

    // 获取当前时间之后的下一次执行时间
    public long getNextTime(long lastRunTime) {
        retrun cromSequenceGenerator.next(new Date(lastRunTime)).getTime();
    }
}

定时任务类型

public enum ScheduleType {
    // 凌晨刷新
    midnight(RunScope.all),
    // 公会申请过期检查
    GANG_APPLICATION_CHECK(RunScope.game),
    // 沙巴克分组
    SHABAKE_SPLIT_GROUP(RunScope.center),
    // 灭神塔普通模式结束时间
    GOD_KILL_TOWER_GANG_START(RunScope.game),

    // ......接下来省略...

}

代码运行范围,用于区分游戏服代码与跨服代码


public enum RunScope {

    // 只在游戏服运行
    game {
        @Override
        public boolean canRun() {
            return ServerType.isGameServer();
        }
    }

    // 只在中心服运行
    center {
        @Override
        public boolean canRun() {
            return ServerType.isCenterServer();
        }
    }

    // 当前服务器类型是否允许运行
    public abstract boolean canRun();

    // 游戏服与中心服都运行
    all {
        @Override
        public boolean canRun() {
            return !canRun();
        }
    }

    // 当前服务器类类型是否不允许运行
    public boolean canNotRun() {
        return !canRun();
    }
}

在添加一个新类型的定时任务类型时,需要分别在ScheduledResource.xlsx和ScheduleType里同时加入。


定时任务管理的抽象

// 服务器刷新时,这里定义的定时任务,在服务器重启后,如果错过了刷新点,会进行一次补偿

public abstract class BaseFixScheduleManager<Process extends Comparble<Process>, Run> {

    @Autowird
    private ScheduledManager scheduledManager;

    // 已注册的定时业务
    private Map<ScheduledType TreeMap<Process, Lsit<Run>>> fixSchedleMap = new HashMap<>();

    // 开始定时
    public void start() {
        if(ServerType.isTransferServer()) {
            // 战斗服不运行全局定时器
            return;
        }
        for(ScheduledType scheduledType : fixScheduleMap.keySet()) {
            if(scheduledType.getRunScope().canNotRun()) {
                // 与服务器类型匹配的才运行
                continue;
            }
            // 定时刷新
            scheduledManager.addScheduled(scheduled, () -> scheduledRefresh(scheduledType));
        }
    }

    // 时间到了,开始刷新
    protected abstract void scheduledRefresh(ScheduledType scheduleType);

    // 定时任务启动前
    public synchronized void registerBefore(ScheduledType type, Run runnable) {
        register(ScheduledProcess.before, type, runnable);
    }


    // 定时任务启动中
    public synchronized void registerOn(ScheduledType type, Run runnable) {
        register(ScheduledProcess.on, type, runnable);
    }

    // 注册监听服务器数据刷新
   public synchronized void register(Proccess process, ScheduledType type, Run runnable) {
       TreeMap<Process, List<Run>> map = fixScheduleMap.computeIfAbsent(type, k -> new TreeMap<>());
       List<Run> list = map.computeIfAbsent(process, k -> ArrayLsit<>());
       list.add(runnable);
   }

   // 获取所有已注册的定时器类型
   public Collection<ScheduleType>  getAllTypes() {
       return fixScheduleMap.keySet();
   }

   // 尝试批量刷新,保证执行顺序
   protected void tryBatchRefresh(ScheduledData scheduledData, Collection<ScheduledType> scheduledTypes, Consumer<ScheduleddData> saver, BiConsumer<Run, Long> run) {
       List<WaitSchedule> list = new ArrayList<>(5);
       for(ScheduledType scheduledType : scheduledTypes) {
           long nextShceduledTime = nextSheduledTime(scheduleData, scheduledType, saver);
           if(nextScheduledTime <= System.currentTimeMillis()) {
               list.add(WaitSchedule.valueOf(scheduledType,nextScheduledTime));
           }
           if(list.isEmpty()) {
               return;
           }
       }
           // 按照时间排序,保证定时器执行的顺序
        Collections.sort(list);
        list.forEach(waitSchedule -> runScheduled(scheduledData, waitSchedule.getScheduledType(),saver,run));
   }

   // 尝试刷新
   protected void tryRefresh(ScheduledData shceduledData, ScheduledType scheduledType) {
       if(nextScheduledTime(scheduledData, scheduledType, saver) <= System.currentTimeMillis()) {
           runShceduled(scheduleData, scheduledType, savaer, run);
       }
   }

   // 计算一下次的运算时间
   private long nextScheduledTime(ScheduledData scheduledData, ScheduledType scheduledType, Consumer<ScheduledData> saver) {
       Long lastScheduledTime = scheduledData.getLastScheduledTime(scheduledType, lastScheduledTime);
       if(lastScheduled == null) {
           // 这里记录当前时间,是跳过玩家数据第一次初始化不需要刷新
           lastScheduledTime = System.currentTimeMills();
           scheduledData.recordScheduled(scheduledType, lastScheduledTime);
           saver.accept(scheduledData);
       }
       return scheduledManager.getNextTime(scheduleType, lastScheduledTime);
   }

   // 执行定时任务
   private void runScheduled(ScheduledData scheduledData,
                            ScheduledType scheduledType,
                            Consumer<ScheduledData> saver,
                            BiConsumer<Run, Long> run) {
        long lastScheduledTime = scheduledData.getLastScheduledTime(scheduledType);
        // 记录刷新时间
        scheduledData.recordScheduled(scheduledType, System.currentTimeMills());
        // 刷新
        TreeMap<Process, List<Run>> map = fixScheduleMap.get(sheduledType);
        for(List<Run> list: map.values()) {
            try {
                run.accept(scheduledRun, lastScheduledTime);
            } catch (Exception e) {

            }
        }
        saver.accept(scheduledData);
   }

}

定时任务属性记录

public class ScheduledData {

    // 定时任务刷新记录
    private ConcurrentHashMap<ScheduledType, Long> scheduledRecord;

    public static ScheduledData valueOf() {
        ScheduleData vo = new ScheduleData();
        vo.scheduledRecord = new ConcurrentHashMap<>(0);
        return vo;
    }

    // 获取最后一次刷新的时间
    public Long getLastScheduledTime(ScheduledType scheduleType) {
        return scheduledRecord.get(scheduledType);
    }

    // 记录定时任务定时任务
    public void recordScheduled(ScheduledType scheduledType, Long time) {
        scheduledRecord.put(scheduledType, time);
    }
}

定时业务员管理,定时业务初始化的逻辑


@Component
public class ScheduleManager extends InstantiationAwareBeanPostProcessorAdpater {

    private static ScheduledManager instance;

    public ScheduledManager() { instance = this; }

    public static ScheduledManger self() {
        return instance;
    }
    @Static
    private Strorage<ScheduledType, ScheduleddResource> strorage;

    @Autowired
    private Scheduler scheduler;

    // 记录所有定时业务
    private Map<ScheduledType, RunMethod> existTasks = new ConcurrentHashMap<>();

    // 定时业务是否已经启动
    private boolean init = false;

    @RessourceReload(ScheduledResource.class)
    public void onLoad() {
        init();
    }

    // 初始所有定时业务
    public synchronized void init() {
        existTasks.forEach(this::schedule);
        init = true;
    }

     @Override
     public boolean postProcessAfeterInstantiation(Object bean , String beanName) throws BeansException {
          // 通过反射工具找到找到`ScheduledByResource`注解,
        ReflectionUtils.doWithMethods(AopUtils.getTargetClass(bean), method -> {
             ScheduledByResource annotation = AnnotationUtils.getAnnotation(method, ScheduledByResource.class);
             if(annotation == null) {
                 return;
             }
             ScheduledType scheduledType = annotation.value();
             final MethodInvokingRunnable runnable = new MethodInvokingRunnable();
             runnable.setTargetObject(bean);
             runnable.setTargetMethod(method.getName());
             try {
                 runnable.prepare();
             } catch(Exception e) {
                 throw new IllegalSateException("无法创建定时任务,类型:"+ annotation.value(),e);
             }
             addScheduled(scheduledType, runnable);
        });
        return super.postProcessAfterInstantiation(bean, beanName);
    }

    // 添加定时任务,如果服务器已启动则启动定时业务
    // 如果服务器没有启动则等待服务器启动完成以后,再启动定时业务
    public sychronizedvoid addScheduled(ScheduledType scheduledType, Runnable runnable) {
        if(SreverType.isTransferServer()) {
            // 战斗服不启动全服定时器
            return;
        }
        RunMethod runMethod = existTasks.get(scheduledType);
        if(runMethod == null) {
            runMethod = new RunMethod();
            existTasks.put(scheduledType, runMethod);
        }
        runMenthod.addRunnable(runnable);
        existTasks.put(scheduledType, runMethod);
        if(init) {
            // 服务器启动后,定时业务已经初始结束,则直接启动新的定时业务
            schedule(scheduledType, runMethod);
        }
    }

    // 启动定时业务
    private void schedule(ScheduledType scheduledType, RunMethod runMethod) {
        if(runMethod.future != null) {
            // 取消原来的定时业务
            runMethod.future.cancel(false);
        }
        ScheduledResource resource = storage.get(scheduledType, true);
        runMethod.future = scheduler.schedule(new ScheduledTask() {
            @Override
            public String getName() {
                return scheduleType.name();
            }
            @Override
            public void run() {
                runMethod.run();
            }
        }, resource.getTrigger());
        LOGGER.debug("启动定时业务{}",scheduledType);
    }

    // 获取指定定时任务下一次的执行时间点
    public long getNextTime() {
        return storage.get(scheduledType, true).getNextTime(lastRunTime);
    }
    // 检查指定时间是否在两个时间检查之间
    public boolean inTime(ScheduledType start, ScheduledType end, long lastRunTime) {
        long nextStartTime = storage.get(start, true).getNextTime(lastRunTime);
        long nextEndTime = storage.get(end,true).getNextTime(lastRunTime);
        retrun nextStartTime >= nextEndTime;
    }

    // 检查当前时间是否在两个时间点之间
    public boolean inTime(ScheduleType start, ScheduledType end) {
        return inTime(start, end, System.currentTimeMillis());
    }

    public ScheduleResource getResourceNullable(ScheduledType type) {
        return storage.get(tyoe,false)
    }

    // 执行方法的抽象
    private class RunMethod {
        ScheduledFuture future;
        private List<Runnable> runnableList = new CopyOnWriteArrayList<>();

        public void addRunnable(Runnable runnable) {
            runnableList.add(runnable);
        }

        public void run() {
            runnableList.forEach(runnable -> {
                try {
                    runnable.run();
                } catch (Exception e) {
                    LOGGER.error("定时业务执行出错",e);
                }
            });
        }
    }
}


定时模块

玩家定时任务执行接口

@FunctionalInterface
public interface ScheduledPlayerRun {
    /**
    *   定时任务执行接口
    *   @param player
    *   @param lastRunTime 上次执行时间
    */
    void run(Player player, long lastRunTime);
}

玩家刷新定时,这里定义的定时任务,玩家在线会马上刷新,玩家不在线会在玩家重新上线时进行补偿刷新

public class PlayerFixScheduleManager extends BaseFixScheduleManager<ScheduledPlayerRun> {

    @Override
    protected void scheduledRefresh(ScheduledType scheduledType) {
        PlayerManager.getInstance().walkOnlinePlayer(
            player -> IdentitityEventExecutorGroup.addTask(player.getDispatcherHashCode(),scheduledType.name(),
            () -> playerRefresh(player,scheduledType))
        );
    }


    private void playerRefresh(Player player, ScheduleType scheduledType) {
        ScheduledData scheduledData = player.getPlayerLoginInfo().getScheduledData();
        tryBatchRefresh(scheduledData,getAllType);
    }


    // 玩家上线时,检测刷新情况,没有刷新的执行刷新
    public void checkOnPlayerLogin(Player player) {
        ScheduledData scheduledData = player.getPlayerLoginInfo().getScheduledData();
        tryBathchRefresh(scheduledData,getAllTypes(),
        (sd) -> PlayerLoginManager.getSelf().update(),
        (scheduledPlayerRun,lastRunTime) -> scheduledPlayerRun.run(player,lastRunTime);
        );
    }

    public registerBefore(ScheduledType type, ScheduledRun runnable) {
        super.register(ServerScheduledProcess.before, type, runnable);
    }

    public void register(ScheduledType type, ScheduledRun runnable) {
         super.register(ServerScheduledProcess.on, type, runnable);
    }
}

玩家刷新定时,这里定义的定时任务,服务器重启以后,如果错过了刷新点,会进行一次补偿

public class ServerFixScheduleManager extends BaseFixScheduleManager<ScheduledRun> {

    @OVerride
    public void start() {
        super.start();
        // 启动时尝试一次刷新
        ScheduledData scheduledData = serverManager.getData(ServerDataKey.SCHEDULED);
        tryBatchRefresh(scheduledData, getAllTypes(),
        (sd) -> serverManager.update(ServerDataKey.SCHEDULED),
        ScheduledRun:run
        );
    }

    @Override
    protected void scheduledRefresh(ScheduledType scheduledType) {
        ScheduledData scheduledData = serverManager.getData(ServerDataKey.SCHEDULED);
        tryBatchRefresh(scheduledData, scheduledType,
        (sd) -> serverManager.update(ServerDataKey.SCHEDULED),
        ScheduledRun:run
        );
    }

    public registerBefore(ScheduledType type, ScheduledRun runnable) {
        super.register(ServerScheduledProcess.before, type, runnable);
    }

    public void register(ScheduledType type, ScheduledRun runnable) {
         super.register(ServerScheduledProcess.on, type, runnable);
    }
}

定时系统的使用

  1. ScheduleResource.xml中定义定时任务类型和Corn字符串,在ScheduleType枚举中添加定时任务类型

  2. 在需要启动定时任务的地方使用registerOn或者registerBefore方法,例如

    • PlayerFixScheduleManager.self().registerOn(ScheduledType.midnight, () -> { // 业务逻辑 })
    • ServerFixScheduleManager.self().registerBefore(ScheduledType.midnight, () -> { // 业务逻辑 })