定时任务系统学习笔记
游戏许多业务会用到一些需要能修改配置的定时任务,例如凌晨重置、boss定时刷新、副本/战场/竞技场定时开启、拍卖行/交易的心跳等,这个时候需要通用的定时任务系统。
配置资源
定时业务配置类
@Resource
public class ScheduledResource implement IAfterResourceLoad {
@ResourceId
private ScheduledType id;
// 时间cron
private String cron;
private String trigger;
private CronSequenceGenerator CronSequenceGenerator;
@Override
public void afterLoad() {
trigger = new CronTrigger(cron);
cromSequenceGenerator = new CromSequenceGenerator(cron, TimeZone.getDefault());
}
// 获取当前时间之后的下一次执行时间
public long getNextTime(long lastRunTime) {
retrun cromSequenceGenerator.next(new Date(lastRunTime)).getTime();
}
}
定时任务类型
public enum ScheduleType {
// 凌晨刷新
midnight(RunScope.all),
// 公会申请过期检查
GANG_APPLICATION_CHECK(RunScope.game),
// 沙巴克分组
SHABAKE_SPLIT_GROUP(RunScope.center),
// 灭神塔普通模式结束时间
GOD_KILL_TOWER_GANG_START(RunScope.game),
// ......接下来省略...
}
代码运行范围,用于区分游戏服代码与跨服代码
public enum RunScope {
// 只在游戏服运行
game {
@Override
public boolean canRun() {
return ServerType.isGameServer();
}
}
// 只在中心服运行
center {
@Override
public boolean canRun() {
return ServerType.isCenterServer();
}
}
// 当前服务器类型是否允许运行
public abstract boolean canRun();
// 游戏服与中心服都运行
all {
@Override
public boolean canRun() {
return !canRun();
}
}
// 当前服务器类类型是否不允许运行
public boolean canNotRun() {
return !canRun();
}
}
在添加一个新类型的定时任务类型时,需要分别在ScheduledResource.xlsx和ScheduleType
里同时加入。
定时任务管理的抽象
// 服务器刷新时,这里定义的定时任务,在服务器重启后,如果错过了刷新点,会进行一次补偿
public abstract class BaseFixScheduleManager<Process extends Comparble<Process>, Run> {
@Autowird
private ScheduledManager scheduledManager;
// 已注册的定时业务
private Map<ScheduledType TreeMap<Process, Lsit<Run>>> fixSchedleMap = new HashMap<>();
// 开始定时
public void start() {
if(ServerType.isTransferServer()) {
// 战斗服不运行全局定时器
return;
}
for(ScheduledType scheduledType : fixScheduleMap.keySet()) {
if(scheduledType.getRunScope().canNotRun()) {
// 与服务器类型匹配的才运行
continue;
}
// 定时刷新
scheduledManager.addScheduled(scheduled, () -> scheduledRefresh(scheduledType));
}
}
// 时间到了,开始刷新
protected abstract void scheduledRefresh(ScheduledType scheduleType);
// 定时任务启动前
public synchronized void registerBefore(ScheduledType type, Run runnable) {
register(ScheduledProcess.before, type, runnable);
}
// 定时任务启动中
public synchronized void registerOn(ScheduledType type, Run runnable) {
register(ScheduledProcess.on, type, runnable);
}
// 注册监听服务器数据刷新
public synchronized void register(Proccess process, ScheduledType type, Run runnable) {
TreeMap<Process, List<Run>> map = fixScheduleMap.computeIfAbsent(type, k -> new TreeMap<>());
List<Run> list = map.computeIfAbsent(process, k -> ArrayLsit<>());
list.add(runnable);
}
// 获取所有已注册的定时器类型
public Collection<ScheduleType> getAllTypes() {
return fixScheduleMap.keySet();
}
// 尝试批量刷新,保证执行顺序
protected void tryBatchRefresh(ScheduledData scheduledData, Collection<ScheduledType> scheduledTypes, Consumer<ScheduleddData> saver, BiConsumer<Run, Long> run) {
List<WaitSchedule> list = new ArrayList<>(5);
for(ScheduledType scheduledType : scheduledTypes) {
long nextShceduledTime = nextSheduledTime(scheduleData, scheduledType, saver);
if(nextScheduledTime <= System.currentTimeMillis()) {
list.add(WaitSchedule.valueOf(scheduledType,nextScheduledTime));
}
if(list.isEmpty()) {
return;
}
}
// 按照时间排序,保证定时器执行的顺序
Collections.sort(list);
list.forEach(waitSchedule -> runScheduled(scheduledData, waitSchedule.getScheduledType(),saver,run));
}
// 尝试刷新
protected void tryRefresh(ScheduledData shceduledData, ScheduledType scheduledType) {
if(nextScheduledTime(scheduledData, scheduledType, saver) <= System.currentTimeMillis()) {
runShceduled(scheduleData, scheduledType, savaer, run);
}
}
// 计算一下次的运算时间
private long nextScheduledTime(ScheduledData scheduledData, ScheduledType scheduledType, Consumer<ScheduledData> saver) {
Long lastScheduledTime = scheduledData.getLastScheduledTime(scheduledType, lastScheduledTime);
if(lastScheduled == null) {
// 这里记录当前时间,是跳过玩家数据第一次初始化不需要刷新
lastScheduledTime = System.currentTimeMills();
scheduledData.recordScheduled(scheduledType, lastScheduledTime);
saver.accept(scheduledData);
}
return scheduledManager.getNextTime(scheduleType, lastScheduledTime);
}
// 执行定时任务
private void runScheduled(ScheduledData scheduledData,
ScheduledType scheduledType,
Consumer<ScheduledData> saver,
BiConsumer<Run, Long> run) {
long lastScheduledTime = scheduledData.getLastScheduledTime(scheduledType);
// 记录刷新时间
scheduledData.recordScheduled(scheduledType, System.currentTimeMills());
// 刷新
TreeMap<Process, List<Run>> map = fixScheduleMap.get(sheduledType);
for(List<Run> list: map.values()) {
try {
run.accept(scheduledRun, lastScheduledTime);
} catch (Exception e) {
}
}
saver.accept(scheduledData);
}
}
定时任务属性记录
public class ScheduledData {
// 定时任务刷新记录
private ConcurrentHashMap<ScheduledType, Long> scheduledRecord;
public static ScheduledData valueOf() {
ScheduleData vo = new ScheduleData();
vo.scheduledRecord = new ConcurrentHashMap<>(0);
return vo;
}
// 获取最后一次刷新的时间
public Long getLastScheduledTime(ScheduledType scheduleType) {
return scheduledRecord.get(scheduledType);
}
// 记录定时任务定时任务
public void recordScheduled(ScheduledType scheduledType, Long time) {
scheduledRecord.put(scheduledType, time);
}
}
定时业务员管理,定时业务初始化的逻辑
@Component
public class ScheduleManager extends InstantiationAwareBeanPostProcessorAdpater {
private static ScheduledManager instance;
public ScheduledManager() { instance = this; }
public static ScheduledManger self() {
return instance;
}
@Static
private Strorage<ScheduledType, ScheduleddResource> strorage;
@Autowired
private Scheduler scheduler;
// 记录所有定时业务
private Map<ScheduledType, RunMethod> existTasks = new ConcurrentHashMap<>();
// 定时业务是否已经启动
private boolean init = false;
@RessourceReload(ScheduledResource.class)
public void onLoad() {
init();
}
// 初始所有定时业务
public synchronized void init() {
existTasks.forEach(this::schedule);
init = true;
}
@Override
public boolean postProcessAfeterInstantiation(Object bean , String beanName) throws BeansException {
// 通过反射工具找到找到`ScheduledByResource`注解,
ReflectionUtils.doWithMethods(AopUtils.getTargetClass(bean), method -> {
ScheduledByResource annotation = AnnotationUtils.getAnnotation(method, ScheduledByResource.class);
if(annotation == null) {
return;
}
ScheduledType scheduledType = annotation.value();
final MethodInvokingRunnable runnable = new MethodInvokingRunnable();
runnable.setTargetObject(bean);
runnable.setTargetMethod(method.getName());
try {
runnable.prepare();
} catch(Exception e) {
throw new IllegalSateException("无法创建定时任务,类型:"+ annotation.value(),e);
}
addScheduled(scheduledType, runnable);
});
return super.postProcessAfterInstantiation(bean, beanName);
}
// 添加定时任务,如果服务器已启动则启动定时业务
// 如果服务器没有启动则等待服务器启动完成以后,再启动定时业务
public sychronizedvoid addScheduled(ScheduledType scheduledType, Runnable runnable) {
if(SreverType.isTransferServer()) {
// 战斗服不启动全服定时器
return;
}
RunMethod runMethod = existTasks.get(scheduledType);
if(runMethod == null) {
runMethod = new RunMethod();
existTasks.put(scheduledType, runMethod);
}
runMenthod.addRunnable(runnable);
existTasks.put(scheduledType, runMethod);
if(init) {
// 服务器启动后,定时业务已经初始结束,则直接启动新的定时业务
schedule(scheduledType, runMethod);
}
}
// 启动定时业务
private void schedule(ScheduledType scheduledType, RunMethod runMethod) {
if(runMethod.future != null) {
// 取消原来的定时业务
runMethod.future.cancel(false);
}
ScheduledResource resource = storage.get(scheduledType, true);
runMethod.future = scheduler.schedule(new ScheduledTask() {
@Override
public String getName() {
return scheduleType.name();
}
@Override
public void run() {
runMethod.run();
}
}, resource.getTrigger());
LOGGER.debug("启动定时业务{}",scheduledType);
}
// 获取指定定时任务下一次的执行时间点
public long getNextTime() {
return storage.get(scheduledType, true).getNextTime(lastRunTime);
}
// 检查指定时间是否在两个时间检查之间
public boolean inTime(ScheduledType start, ScheduledType end, long lastRunTime) {
long nextStartTime = storage.get(start, true).getNextTime(lastRunTime);
long nextEndTime = storage.get(end,true).getNextTime(lastRunTime);
retrun nextStartTime >= nextEndTime;
}
// 检查当前时间是否在两个时间点之间
public boolean inTime(ScheduleType start, ScheduledType end) {
return inTime(start, end, System.currentTimeMillis());
}
public ScheduleResource getResourceNullable(ScheduledType type) {
return storage.get(tyoe,false)
}
// 执行方法的抽象
private class RunMethod {
ScheduledFuture future;
private List<Runnable> runnableList = new CopyOnWriteArrayList<>();
public void addRunnable(Runnable runnable) {
runnableList.add(runnable);
}
public void run() {
runnableList.forEach(runnable -> {
try {
runnable.run();
} catch (Exception e) {
LOGGER.error("定时业务执行出错",e);
}
});
}
}
}
定时模块
玩家定时任务执行接口
@FunctionalInterface
public interface ScheduledPlayerRun {
/**
* 定时任务执行接口
* @param player
* @param lastRunTime 上次执行时间
*/
void run(Player player, long lastRunTime);
}
玩家刷新定时,这里定义的定时任务,玩家在线会马上刷新,玩家不在线会在玩家重新上线时进行补偿刷新
public class PlayerFixScheduleManager extends BaseFixScheduleManager<ScheduledPlayerRun> {
@Override
protected void scheduledRefresh(ScheduledType scheduledType) {
PlayerManager.getInstance().walkOnlinePlayer(
player -> IdentitityEventExecutorGroup.addTask(player.getDispatcherHashCode(),scheduledType.name(),
() -> playerRefresh(player,scheduledType))
);
}
private void playerRefresh(Player player, ScheduleType scheduledType) {
ScheduledData scheduledData = player.getPlayerLoginInfo().getScheduledData();
tryBatchRefresh(scheduledData,getAllType);
}
// 玩家上线时,检测刷新情况,没有刷新的执行刷新
public void checkOnPlayerLogin(Player player) {
ScheduledData scheduledData = player.getPlayerLoginInfo().getScheduledData();
tryBathchRefresh(scheduledData,getAllTypes(),
(sd) -> PlayerLoginManager.getSelf().update(),
(scheduledPlayerRun,lastRunTime) -> scheduledPlayerRun.run(player,lastRunTime);
);
}
public registerBefore(ScheduledType type, ScheduledRun runnable) {
super.register(ServerScheduledProcess.before, type, runnable);
}
public void register(ScheduledType type, ScheduledRun runnable) {
super.register(ServerScheduledProcess.on, type, runnable);
}
}
玩家刷新定时,这里定义的定时任务,服务器重启以后,如果错过了刷新点,会进行一次补偿
public class ServerFixScheduleManager extends BaseFixScheduleManager<ScheduledRun> {
@OVerride
public void start() {
super.start();
// 启动时尝试一次刷新
ScheduledData scheduledData = serverManager.getData(ServerDataKey.SCHEDULED);
tryBatchRefresh(scheduledData, getAllTypes(),
(sd) -> serverManager.update(ServerDataKey.SCHEDULED),
ScheduledRun:run
);
}
@Override
protected void scheduledRefresh(ScheduledType scheduledType) {
ScheduledData scheduledData = serverManager.getData(ServerDataKey.SCHEDULED);
tryBatchRefresh(scheduledData, scheduledType,
(sd) -> serverManager.update(ServerDataKey.SCHEDULED),
ScheduledRun:run
);
}
public registerBefore(ScheduledType type, ScheduledRun runnable) {
super.register(ServerScheduledProcess.before, type, runnable);
}
public void register(ScheduledType type, ScheduledRun runnable) {
super.register(ServerScheduledProcess.on, type, runnable);
}
}
定时系统的使用
在
ScheduleResource.xml
中定义定时任务类型和Corn字符串,在ScheduleType
枚举中添加定时任务类型在需要启动定时任务的地方使用
registerOn
或者registerBefore
方法,例如PlayerFixScheduleManager.self().registerOn(ScheduledType.midnight, () -> { // 业务逻辑 })
ServerFixScheduleManager.self().registerBefore(ScheduledType.midnight, () -> { // 业务逻辑 })