游戏线程模型学习

任务调度器

public class dentityEventExectorGroup {

    private static fanal String EXECUTOR_NAME_PREFIX = "Identity-dispatcher";

    static private EventExecutor[] childrens;
    static private Profile threadProfile = new Profile();
    static private Profile taskProfile = new Prfile;

    // 初始化, nThreads 线程数量
    synchronized public static void init() {
        if(childrens == null) {
            childrens = new EventExecutor[nThreads];
            ThreadFactory threadFactory = new DefaultThreadFactory(EXECUTOR_NAME_PREFIX);
            for(int i=0; i< nThreaeds; i++) {
                EventExecutor eventExecutors = new EventExecutor(i+1, null, threadDactory, true);
                childrebs[i] = eventEventors;
            }
        }
    }

    public static void shutdown() {
        for(EventExcutor c : childrens) {
            c.shutdownGracefully();
        }
    }

    private static EventExcutor takeExecutor(int dispatcherHashCode) {
        return childrens[Math.abs(dispatcherHashCode % childrens.length)];
    }

    // 添加同步任务
    public static Future<?> addTask(AbstractDispatcherHashCodeRunnable dispatcherHashCodRunnable) {
        checkName(dispatcherHashCodRunnable.name());
        EventExecutor eventExecutor = takeExecutor(dispatcherHashCodRunnable.getDispatcherHashCode());
        dispathcherHashCodeRunnable.submit(eventExecutor.getIndex(),false);
        return eventExecutor.addTask(dispatcherHashCodeRunnable);
    }

    // 添加同步任务
    public static Future<?> addTask(int dispatcherCode,String name, Runnable runnable) {
        checkName(name);
        return addTask(new AbstractDispatcherHashCodeRunnable() {
            @Override
            public String name() {
                return name;
            }
            @Ovveride
            public int getDispatcherHashCode() {
                return dispatcherCode();
            }
            @Override
            public void doRun() {
                runnable.ru();
            }
        });
    }

    // 添加延迟任务
    public static ScheduleFuture<?> addScheduleTask(AbstractDispatcherHashCodeRunnable dispatcherHashCodRunnable, long delay, TimeUtil unit) {
         checkName(dispatcherHashCodRunnable.name());
        EventExecutor eventExecutor = takeExecutor(dispatcherHashCodRunnable.getDispatcherHashCode());
        dispathcherHashCodeRunnable.submit(eventExecutor.getIndex(),false);
        return eventExecutor.addScheduleTask(dispatcherHashCodeRunnable);
    }

    public static ScheduledFuture<?> addScheduleTask(int dispatcherHashCode, String name,  String name, long delay, TimeUtil unit, Runnable runnable) {
        checkName(name);
        return addScheduleTask(new AbstractDispatcherHashCodeRunnable() {
            @Override
            public int getDispatcherHashCode() {
                return dispatcherHashCode;
            }
            @Override
            public String name() {
                return name;
            }
            @Override
            public void doRun() {
                runnable.run();
            }
        }, delay, unit);
    }

    // 添加定时器任务
    // 该任务按照周期执行
    public static addScheduleAtFixedRate(AbstractDispatcherHashCodeRunnable dispatcherHashCodeRunnable,
    long initialDeley, long period, TimeUnit unit) {
        checkName(dispatcherHashCodeRunnable.name());
        EventExecutor eventExecutor = taskExecutor(dispatcherHashCodeRunnable.getDisatcherHashCode());
        // 延迟任务等待耗时统计无意义
        dispatcherHashCodeRunnable.submit(eventExecutor.getIndex(), true);
        return eventExecutor.addScheduleAtFixedRate(dispatcherHashCodeRunnable, initialDelay, period, unit);
    }

    // 添加定时器任务
    // 该任务按照周期执行
    public static ScheduledFuture<?> addScheduleAtFixedRate(int dispatcherHashCode, String name,  String name, long initialDelay, long  period, TimeUtil unit, Runnable runnable ) {
         return addScheduleAtFixedRate(new AbstractDispatcherHashCodeRunnable() {
            @Override
            public int getDispatcherHashCode() {
                return dispatcherHashCode;
            }
            @Override
            public String name() {
                return name;
            }
            @Override
            public void doRun() {
                runnable.run();
            }
        }, initialDelay,period, unit);
    }

    // 添加定时器任务
    // 该任务按延迟时间执行
    public static addScheduleAtFixedRate(AbstractDispatcherHashCodeRunnable dispatcherHashCodeRunnable,
    long initialDeley, long period, TimeUnit unit) {
        checkName(dispatcherHashCodeRunnable.name());
        EventExecutor eventExecutor = taskExecutor(dispatcherHashCodeRunnable.getDisatcherHashCode());
        // 延迟任务等待耗时统计无意义
        dispatcherHashCodeRunnable.submit(eventExecutor.getIndex(), true);
        return eventExecutor.addScheduleAtFixedDelay(dispatcherHashCodeRunnable, initialDelay, period, unit);
    }

    // 添加定时器任务
    // 该任务按照延迟时间执行
    public static ScheduledFuture<?> addScheduleAtFixedRate(int dispatcherHashCode, String name,  String name, long initialDelay, long  period, TimeUtil unit, Runnable runnable ) {
         return addScheduleAtFixedDelay(new AbstractDispatcherHashCodeRunnable() {
            @Override
            public int getDispatcherHashCode() {
                return dispatcherHashCode;
            }
            @Override
            public String name() {
                return name;
            }
            @Override
            public void doRun() {
                runnable.run();
            }
        }, initialDelay,period, unit);
    }

    // 获取线程池线程数量

    public static getThreadNum() {
        return childrens.length;
    }

    // 等待线程池前面提交的任务全部执行完成
    public static void blockWaitRunOver() {
        int threadNum = getThreadNum();
        AtomicInteger ready = new AtomicInteger(threadNum);
        // 注意,这里是要原子变量的任务塞满任务队列
        for(int i =0; i< threadNum; i++>) {
            addTask(i, "checkReady", ready::decrementAndGet);
        }
        // 直到所有加入任务队列的原子减一任务执行完毕,ready的值变为0,才会结束while循环,结束函数结束阻塞
        while(ready.get() > 0) {
                try{
                    TimeUnit.MillLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
    }

    private static void checkName(String name) {
        if(StringUtils.isEmpty(name)) {
            throw new NullPointerException("name is null!");
        }
    }
} 

事件执行器

class EventExecutor extends SingleTreadEventExecutor {

    private int index;

    protected EventExecutor(int index, EventExecutorGroup parent, ThreadFacotry threadFactory, boolean addTaskWakeUp{   super(parent, threadFactory, addTaskWakesUp);
        this.index = index;
    }

    @Override
    protected void run() {
        do {
            Runnable task = takeTask();
            if(task != null) {
                task.run();
                updateLastExecutionTime();
            }
        } while (!confirmShutdown());
    }

    public ScheduleFuture<?> addScheduleTask(AbstractDispatcherHashCodeRunnable task, long delay, TimeUnit unit) {
        return schedule(task, deley, unit);
    }

    // ... 接下来一堆定时器方法


    public Future<?> addTask() {
        return submit(task);
    }

    public int getIndex() {
        return index;
    }
}

任务分发标记器

public abstract class AbstractDispatcherHashCodeRunnable implements Runnable {

    private Consumer<AbstractDispatcherHashCodeRunnable> runBefore;
    private Consumer<AbstractDispatcherHashCodeRunnable> runAfter;

    // 是否为定时任务
    private boolean scheduleTask;
    private int threadIndex;
    private long submitTime;

    // 执行的任务
    abstract public void doRun();

    // 用于分发的编号
    public abstract int getDispatcherHashCode();

    // 任务类型,同一种类型任务添加一种任务即可
    public abstract String name();

    // 该任务纳秒时间
    protected long timeoutNanoTime() {
        return TimeUnit.MILLISECONDS.toNanos(1);
    }

    public void submit(int threadIndex, boolean scheduleTask) {
        this.threadIndex = threadIndex;
        this.scheduleTask = scheduleTask;
        this.submitTime = System.nanoTime();
        IdentityEventExecutorGroup.recordSubmit(name());
    }

    @Override
    final public void run() {
        long start = System.nanoTime();
        try {
            if(runBefore != null) {
                runBefore.accept(this);
            }
            doRun();
            if(runAfter != null) {
                runAfter.accept(this);
            }
        } catch(Throwable e) {
            IdentityEventExecutorGroup.recordException(name());
        } finally {
            long now = System.nanoTime();
            long useNanoTime = now - start;
            long waitNanoTime = start - submitTime;
            IdentityEventExcutorGroup.recordExc(name(), threadIndex, scheduleTask, useNanoTime, waitNanoTime);
            this.submitTime = now;
        }
    }

    // 任务前后函数的get/set方法 ...

}

io线程池

Io线程池是一种特殊的存在,用来处理有IO操作的业务。

public class IoThreadPool {

    // 统计任务名前缀
    private static final String IO_WORK_PREFIX = "IO-WORK-";
    // 线程名前缀
    private static final String THREAD_NAME_PREFIX = "io-thread-pool";
    // 线程池线程保持存货x分钟
    private staitc final int KEEP_ALIVE_MIN = 1;
    // 队伍中允许的最大容量
    private static final QUEUE_CAPACITY = 5000;

    private static final LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockDeque<>();

    private static ExecutorService executorService;

    // 队列满再提交失败次数
    private static AtomicInteger submitFailCounter = new AtomicInteger(0);
    // 队列长读最大值
    private static AtomicInteger queueMax = new AtomicInteger(0);
    // 拒绝服务数量峰值
    private static int maxSubmitFail;
    private staitc Profile profile = new Profile();

    // 初始化id线程池
    public static void init(int nThread) {
        executorService = new ThreadPoolExecutor(
            nThread / 2 ,
            nThread,
            KEEP_ALIVE_MIN,
            TimeUnit.MINUTES,
            workQueue,
            new DefaultThreadFactory(THREAD_NAME_PREFIX),
            (r, executor) -> {
                // 业务被拒绝,队列已满,线程已全部被占用,直接在当前线程执行业务代码
                r.run();
                // 提交失败次数+1
                submitFailCounter.incrementAndGet();
            }
        );
    }

    // 添加io业务
    public static void execute(String name, Runnable runnable) {
        if(StringUtils.isEmpty(name)) {
            throw new NullPointerException("name is null");
        }
        executorService.submit(IoWork.valueOf(name, runnable));
        queueMax.accumulateAndGet(workQueue.size(), Math::max);
    }

    // 关闭io线程池,最多等待15s
    public statac void shutdown() throws InterruptedException {
        executorService.awaitTermination(15, TimeUnit.SECONDS);
    }

    // 获取提交失败峰值
    public static int getMaxSubmitFail() {
        return mexSubmitFail;
    }
    // 获取队列最大长度
    public static int getQueueMax() {
        return queueMax.get();
    }

    // 性能统计信息
    private static class IoWork implements Runnable {
        String name;
        Runnable runnable;
        ProInfo proInfo;
        long submitTime;

        privte static IoWork valueOf(String name, Runnable runnable) {
            IoWork vo = new IoWork();
            vo.name = IO_WORK_PREFIX + name;
            vo.runnable = runnable;
            vo.proInfo = profile.getOrCreateProInfo(name);
            vo.proInfo.recordSubmit();
            vo.subimitTime = System.nanoTime();
            return vo;
        }

        @Override
        public voisd run() {
            long start = System.nanoTime();
            try {
                runnable.run();
            } catch(Exception e) {
                proInfo.recordException();
            }
            proInfo.recordExc(System.nanoTime() - start, 0);
        }
    }
}