netty源码分析之定时任务的的优化

1、时间计算上的优化

ScheduledFutureTask使用一个成员变量deadlineNanos记录定时任务延迟执行的时间,所以延时时间是从创建任务开始计算的。

考虑以下两种情况:

  1. 第一次创建ScheduledFutureTask时,需要先将ScheduledFutureTask类加载到虚拟机中
  2. 所有定时任务执行完成之后,JVM内存中不再有ScheduledFutureTask对象引用,下一次full gc时,ScheduledFutureTask类的元信息也会被释放掉,只有再创建定时任务需要执行类加载流程

以上情况,导致在创建定时任务之前,会执行类加载,类加载阶段的耗时,会延长定时任务的期望执行等待时间。

为了解决以上问题,netty是这样做的,当ScheduledFutureTask类加载时,会用一个静态常量记录加载完成的时间,称为T0:

1
private static final long START_TIME = System.nanoTime();

提供一个静态方法

1
2
3
4
// 获取当前时间 和 ScheduledFutureTask 加载时间的 差值
static long nanoTime() {
return System.nanoTime() - START_TIME;
}

所以,以nanoTime()返回的时间戳作为定时任务计时起点,能够有效的排除不确定的类加载导致的定时延长问题。

2、外部提交定时任务方式优化

在分析定时任务源码的时候,发现一个有意思的地方,外部线程向reactor线程提交一个定时任务。

虽然这类场景很少见,但是netty作为一个无比健壮的高性能io框架,必须要考虑到这种情况。

对此,新老版本的netty的处理方式是不同的:

  • Netty-4.1.6.Final中,如果是在外部线程调用schedule,netty将添加定时任务的逻辑封装成一个普通的task,这个task的任务是添加[添加定时任务]的任务,而不是添加定时任务,其实也就是第二种场景,这样,对 PriorityQueue的访问就变成单线程,即只有reactor线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
// 进入到场景二,进一步封装任务
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
  • 而在4.1.50.Final版本中,并没有将外部线程提交的定时任务添加到定时任务队列中,而是直接添加到了普通队列中,并添加了一些线程控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduleFromEventLoop(task); // 如果在线程内,添加到定时队列
} else { // 否则
final long deadlineNanos = task.deadlineNanos();
// 在提交之前进行一次判断 deadlineNanos < nextWakeUpNanos
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(task);
} else {
lazyExecute(task);
// 提交完成之后再一次进行判断, deadlineNanos < nextWakeUpNanos
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK); // 执行唤醒线程的任务
}
}
}
return task;
}

两者的不同之处在对else分支的处理上

第一种方式,提交一个 提交定时任务任务,这种方式貌似很有设计感,却经不起推敲。

因为提交定时任务的任务并不是立即执行的,这段代码可能本身就在任务队列中一个任务的执行流程中。

并且,netty每次执行任务 默认的最小数量是64个,见之前的分析文章。

所以,通过这种方式提交的定时任务,定时时间多多少少会被延长,也就是说所有通过这种方式提交的定时任务都不会在期望的时间内执行,而是会比期望的执行点延后执行。

所以在新版本中,为了修复以上的bug,在提交任务的时候会先比较 定时任务的剩余时间 和 下一次唤醒select()剩余时间:

  • 对于定时时间很短的任务,在提交前 deadlineNanos < nextWakeUpNanos 那么直接将任务添加到任务队列中,并立即唤醒select,使reactor线程执行队列任务

    1
    2
    3
    4
    5
    6
    @Override
    protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
    selector.wakeup();
    }
    }
  • 对于提交前 deadlineNanos > nextWakeUpNanos 的任务,依然会提交到任务队列,但不会执行唤醒select操作。

  • 对于第二种情况,提交完成之后,会再一次判断,因为此时nextWakeUpNanos可能会发生改变(nextWakeUpNanos是reactor线程的变量,并且此代码是在外部线程运行,此时reactor线程可能正要开始新一轮的轮询,nextWakeUpNanos会重新设置,因此需要再次判断),如果此时deadlineNanos < nextWakeUpNanos成立,则提交一个空任务WAKEUP_TASK给任务队列,该任务的存在会让reactor线程在下一轮轮询无法正常开始,因为轮询之前需要判断

    1
    2
    3
    if (!hasTasks()) { // 没有待执行的任务,才能开始轮询
    strategy = select(curDeadlineNanos);
    }

之前的困惑:为什么提交到taskQueue还是能够定时执行?

因为提交的是一个ScheduledFutrueTask,这种task执行时,会根据deadLineNanos进行判断,如果没到时间直接不做运行直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void run() {
assert executor().inEventLoop();
try {
if (delayNanos() > 0L) { //delayNanos() 返回当前任务剩余等待时间
// 大于0表示还没到期
if (isCancelled()) {
scheduledExecutor().scheduledTaskQueue().removeTyped(this);
} else {
scheduledExecutor().scheduleFromEventLoop(this);
}
return; // 不做运行直接返回
}
if (periodNanos == 0) { //periodNanos=0 表示只执行一次
if (setUncancellableInternal()) {
V result = runTask();
setSuccessInternal(result);
}
} else {
// check if is done as it may was cancelled
if (!isCancelled()) {
runTask();
if (!executor().isShutdown()) {
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) { // 添加回定时队列
scheduledExecutor().scheduledTaskQueue().add(this);
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}

periodNanos:

  • 0 表示不重复执行
  • 正数 表示每隔 periodNanos 执行1一次
  • 负数 表示执行完之后间隔 periodNanos 再执行