游戏线程模型学习
任务调度器
public class dentityEventExectorGroup {
private static fanal String EXECUTOR_NAME_PREFIX = "Identity-dispatcher";
static private EventExecutor[] childrens;
static private Profile threadProfile = new Profile();
static private Profile taskProfile = new Prfile;
// 初始化, nThreads 线程数量
synchronized public static void init() {
if(childrens == null) {
childrens = new EventExecutor[nThreads];
ThreadFactory threadFactory = new DefaultThreadFactory(EXECUTOR_NAME_PREFIX);
for(int i=0; i< nThreaeds; i++) {
EventExecutor eventExecutors = new EventExecutor(i+1, null, threadDactory, true);
childrebs[i] = eventEventors;
}
}
}
public static void shutdown() {
for(EventExcutor c : childrens) {
c.shutdownGracefully();
}
}
private static EventExcutor takeExecutor(int dispatcherHashCode) {
return childrens[Math.abs(dispatcherHashCode % childrens.length)];
}
// 添加同步任务
public static Future<?> addTask(AbstractDispatcherHashCodeRunnable dispatcherHashCodRunnable) {
checkName(dispatcherHashCodRunnable.name());
EventExecutor eventExecutor = takeExecutor(dispatcherHashCodRunnable.getDispatcherHashCode());
dispathcherHashCodeRunnable.submit(eventExecutor.getIndex(),false);
return eventExecutor.addTask(dispatcherHashCodeRunnable);
}
// 添加同步任务
public static Future<?> addTask(int dispatcherCode,String name, Runnable runnable) {
checkName(name);
return addTask(new AbstractDispatcherHashCodeRunnable() {
@Override
public String name() {
return name;
}
@Ovveride
public int getDispatcherHashCode() {
return dispatcherCode();
}
@Override
public void doRun() {
runnable.ru();
}
});
}
// 添加延迟任务
public static ScheduleFuture<?> addScheduleTask(AbstractDispatcherHashCodeRunnable dispatcherHashCodRunnable, long delay, TimeUtil unit) {
checkName(dispatcherHashCodRunnable.name());
EventExecutor eventExecutor = takeExecutor(dispatcherHashCodRunnable.getDispatcherHashCode());
dispathcherHashCodeRunnable.submit(eventExecutor.getIndex(),false);
return eventExecutor.addScheduleTask(dispatcherHashCodeRunnable);
}
public static ScheduledFuture<?> addScheduleTask(int dispatcherHashCode, String name, String name, long delay, TimeUtil unit, Runnable runnable) {
checkName(name);
return addScheduleTask(new AbstractDispatcherHashCodeRunnable() {
@Override
public int getDispatcherHashCode() {
return dispatcherHashCode;
}
@Override
public String name() {
return name;
}
@Override
public void doRun() {
runnable.run();
}
}, delay, unit);
}
// 添加定时器任务
// 该任务按照周期执行
public static addScheduleAtFixedRate(AbstractDispatcherHashCodeRunnable dispatcherHashCodeRunnable,
long initialDeley, long period, TimeUnit unit) {
checkName(dispatcherHashCodeRunnable.name());
EventExecutor eventExecutor = taskExecutor(dispatcherHashCodeRunnable.getDisatcherHashCode());
// 延迟任务等待耗时统计无意义
dispatcherHashCodeRunnable.submit(eventExecutor.getIndex(), true);
return eventExecutor.addScheduleAtFixedRate(dispatcherHashCodeRunnable, initialDelay, period, unit);
}
// 添加定时器任务
// 该任务按照周期执行
public static ScheduledFuture<?> addScheduleAtFixedRate(int dispatcherHashCode, String name, String name, long initialDelay, long period, TimeUtil unit, Runnable runnable ) {
return addScheduleAtFixedRate(new AbstractDispatcherHashCodeRunnable() {
@Override
public int getDispatcherHashCode() {
return dispatcherHashCode;
}
@Override
public String name() {
return name;
}
@Override
public void doRun() {
runnable.run();
}
}, initialDelay,period, unit);
}
// 添加定时器任务
// 该任务按延迟时间执行
public static addScheduleAtFixedRate(AbstractDispatcherHashCodeRunnable dispatcherHashCodeRunnable,
long initialDeley, long period, TimeUnit unit) {
checkName(dispatcherHashCodeRunnable.name());
EventExecutor eventExecutor = taskExecutor(dispatcherHashCodeRunnable.getDisatcherHashCode());
// 延迟任务等待耗时统计无意义
dispatcherHashCodeRunnable.submit(eventExecutor.getIndex(), true);
return eventExecutor.addScheduleAtFixedDelay(dispatcherHashCodeRunnable, initialDelay, period, unit);
}
// 添加定时器任务
// 该任务按照延迟时间执行
public static ScheduledFuture<?> addScheduleAtFixedRate(int dispatcherHashCode, String name, String name, long initialDelay, long period, TimeUtil unit, Runnable runnable ) {
return addScheduleAtFixedDelay(new AbstractDispatcherHashCodeRunnable() {
@Override
public int getDispatcherHashCode() {
return dispatcherHashCode;
}
@Override
public String name() {
return name;
}
@Override
public void doRun() {
runnable.run();
}
}, initialDelay,period, unit);
}
// 获取线程池线程数量
public static getThreadNum() {
return childrens.length;
}
// 等待线程池前面提交的任务全部执行完成
public static void blockWaitRunOver() {
int threadNum = getThreadNum();
AtomicInteger ready = new AtomicInteger(threadNum);
// 注意,这里是要原子变量的任务塞满任务队列
for(int i =0; i< threadNum; i++>) {
addTask(i, "checkReady", ready::decrementAndGet);
}
// 直到所有加入任务队列的原子减一任务执行完毕,ready的值变为0,才会结束while循环,结束函数结束阻塞
while(ready.get() > 0) {
try{
TimeUnit.MillLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void checkName(String name) {
if(StringUtils.isEmpty(name)) {
throw new NullPointerException("name is null!");
}
}
}
事件执行器
class EventExecutor extends SingleTreadEventExecutor {
private int index;
protected EventExecutor(int index, EventExecutorGroup parent, ThreadFacotry threadFactory, boolean addTaskWakeUp{ super(parent, threadFactory, addTaskWakesUp);
this.index = index;
}
@Override
protected void run() {
do {
Runnable task = takeTask();
if(task != null) {
task.run();
updateLastExecutionTime();
}
} while (!confirmShutdown());
}
public ScheduleFuture<?> addScheduleTask(AbstractDispatcherHashCodeRunnable task, long delay, TimeUnit unit) {
return schedule(task, deley, unit);
}
// ... 接下来一堆定时器方法
public Future<?> addTask() {
return submit(task);
}
public int getIndex() {
return index;
}
}
任务分发标记器
/**
 * A {@link Runnable} that carries the hash code used to pick its executor, a name used
 * as the statistics key, and timing bookkeeping reported to
 * {@code IdentityEventExecutorGroup}.
 */
public abstract class AbstractDispatcherHashCodeRunnable implements Runnable {
    /** Optional hook invoked before {@link #doRun()}. */
    private Consumer<AbstractDispatcherHashCodeRunnable> runBefore;
    /** Optional hook invoked after {@link #doRun()} returns normally. */
    private Consumer<AbstractDispatcherHashCodeRunnable> runAfter;
    /** Whether this task was submitted as a scheduled (timer) task. */
    private boolean scheduleTask;
    /** Index of the executor the task was submitted to. */
    private int threadIndex;
    /** {@link System#nanoTime()} at submission, or at the end of the previous run. */
    private long submitTime;

    /** The work to execute. */
    public abstract void doRun();

    /** Code used to dispatch the task to an executor. */
    public abstract int getDispatcherHashCode();

    /** Task type name; tasks of the same kind should share one name. */
    public abstract String name();

    /** Timeout for this task in nanoseconds; defaults to one millisecond. */
    protected long timeoutNanoTime() {
        return TimeUnit.MILLISECONDS.toNanos(1);
    }

    /**
     * Records submission metadata and reports the submission to the group.
     *
     * @param threadIndex  index of the executor that will run the task
     * @param scheduleTask whether the task is a scheduled (timer) task
     */
    public void submit(int threadIndex, boolean scheduleTask) {
        this.threadIndex = threadIndex;
        this.scheduleTask = scheduleTask;
        this.submitTime = System.nanoTime();
        IdentityEventExecutorGroup.recordSubmit(name());
    }

    /**
     * Runs the hooks and {@link #doRun()}, then reports execution and wait time.
     * Any {@link Throwable} is counted via {@code recordException} and not rethrown.
     */
    @Override
    public final void run() {
        long start = System.nanoTime();
        try {
            if (runBefore != null) {
                runBefore.accept(this);
            }
            doRun();
            if (runAfter != null) {
                runAfter.accept(this);
            }
        } catch (Throwable e) {
            // Only the failure count is recorded; the throwable itself is dropped.
            IdentityEventExecutorGroup.recordException(name());
        } finally {
            long now = System.nanoTime();
            long useNanoTime = now - start;
            long waitNanoTime = start - submitTime;
            IdentityEventExecutorGroup.recordExc(name(), threadIndex, scheduleTask, useNanoTime, waitNanoTime);
            // Restart the wait clock so a later run of this same instance measures from here.
            this.submitTime = now;
        }
    }

    // getters/setters for the before/after hooks ...
}
IO 线程池
IO 线程池是一种特殊的线程池,专门用来处理包含 IO 操作的业务。
public class IoThreadPool {
// 统计任务名前缀
private static final String IO_WORK_PREFIX = "IO-WORK-";
// 线程名前缀
private static final String THREAD_NAME_PREFIX = "io-thread-pool";
// 线程池线程保持存货x分钟
private staitc final int KEEP_ALIVE_MIN = 1;
// 队伍中允许的最大容量
private static final QUEUE_CAPACITY = 5000;
private static final LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockDeque<>();
private static ExecutorService executorService;
// 队列满再提交失败次数
private static AtomicInteger submitFailCounter = new AtomicInteger(0);
// 队列长读最大值
private static AtomicInteger queueMax = new AtomicInteger(0);
// 拒绝服务数量峰值
private static int maxSubmitFail;
private staitc Profile profile = new Profile();
// 初始化id线程池
public static void init(int nThread) {
executorService = new ThreadPoolExecutor(
nThread / 2 ,
nThread,
KEEP_ALIVE_MIN,
TimeUnit.MINUTES,
workQueue,
new DefaultThreadFactory(THREAD_NAME_PREFIX),
(r, executor) -> {
// 业务被拒绝,队列已满,线程已全部被占用,直接在当前线程执行业务代码
r.run();
// 提交失败次数+1
submitFailCounter.incrementAndGet();
}
);
}
// 添加io业务
public static void execute(String name, Runnable runnable) {
if(StringUtils.isEmpty(name)) {
throw new NullPointerException("name is null");
}
executorService.submit(IoWork.valueOf(name, runnable));
queueMax.accumulateAndGet(workQueue.size(), Math::max);
}
// 关闭io线程池,最多等待15s
public statac void shutdown() throws InterruptedException {
executorService.awaitTermination(15, TimeUnit.SECONDS);
}
// 获取提交失败峰值
public static int getMaxSubmitFail() {
return mexSubmitFail;
}
// 获取队列最大长度
public static int getQueueMax() {
return queueMax.get();
}
// 性能统计信息
private static class IoWork implements Runnable {
String name;
Runnable runnable;
ProInfo proInfo;
long submitTime;
privte static IoWork valueOf(String name, Runnable runnable) {
IoWork vo = new IoWork();
vo.name = IO_WORK_PREFIX + name;
vo.runnable = runnable;
vo.proInfo = profile.getOrCreateProInfo(name);
vo.proInfo.recordSubmit();
vo.subimitTime = System.nanoTime();
return vo;
}
@Override
public voisd run() {
long start = System.nanoTime();
try {
runnable.run();
} catch(Exception e) {
proInfo.recordException();
}
proInfo.recordExc(System.nanoTime() - start, 0);
}
}
}