netty源码分析之线程模型

一个NioEventLoop对应于Reactor模型中的一个从Reactor线程,它持有一个Thread引用,可以简单将NioEventLoop理解为一个用于处理channel事件的线程。

一个channel上的事件只能被同一个线程处理,NioEventLoop线程对channel事件的处理是一个串行化无锁执行过程,netty在初始化channel的时候在pipeline中添加了一系列用户指定的Handler(通过childHanler()方法),这些Handler的处理需要遵循一个固定的顺序,netty底层使用同一个线程按照这个顺序串行执行,避免了多线程处理同一个channel需要使用锁同步产生的开销,这叫串行化无锁编程

一个NioEventLoop可以处理多个channel的就绪事件,即同一个nio线程可以处理多条连接的请求,这叫多路复用

1、Channel指定evenloop

Channel的EventLoop是在注册的时候指定的,netty服务器启动时,首先会注册一个ServerChannel,该注册工作由Boss线程池完成,通常Boss线程池中只有一个线程。

1
2
3
4
5
6
// AbstractBootstrap
final ChannelFuture initAndRegister() {
...
ChannelFuture regFuture = config().group().register(channel);
...
}

group().register(channel)在boss线程池中注册Channel,boss线程池调用next()方法获取一个EventLoop对象来注册channel,看下面代码:

1
2
3
4
// MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

next方法获取到一个EventLoop对象之后,再调用SingleThreadEventLoop类的注册方法:

1
2
3
4
5
6
// SingleThreadEventLoop
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

最终会调用channel的unsafe对象完成注册,并在此时将this EventLoop 作为参数传进去

1
2
3
4
5
6
// AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
...
}

注册的时候完成了eventLoop的指定。

总结一下:

以上分析的是ServerChannel注册时指定EventLoop的过程,这个过程只会在服务启动的时候进行,而在服务运行期间,会不断的有新连接接入,每个新连接都需要注册到一个selector上,这是nio编程的规则。

所以服务启动程序创建的childGroup中的每个线程都持有一个selector的,新连接的注册过程,就是从childGroup中选取一个EventLoop对象,然后将新连接channel注册到该线程对象的selector上,再指定新连接channel的EventLoop为当前选取的的实例。

2、reactor线程启动

https://www.jianshu.com/p/0d0eece6d467

netty对于线程的创建采取的懒加载模式,第一次提交任务的时候才会创建线程。

NioServerSocketChannel 第一次提交任务,就是在注册的时候

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop; // 刚指定了channel的eventLoop
if (eventLoop.inEventLoop()) { // 还是主线程在执行,返回false
register0(promise);
} else {
try {//来到这里提交注册任务
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {}
}
}

eventLoop.execute()可以向NioEventLoop中提交一个任务,这个方法继承自父类SingleThreadEventExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SingleThreadEventExecutor 
private void execute(Runnable task, boolean immediate) {
// 判断当前线程是不是this对象的线程
// 还是主线程在执行,返回false
boolean inEventLoop = inEventLoop();
//将task加入EventLoop持有的任务队列
addTask(task);
if (!inEventLoop) {
//如果执行此代码的线程不是eventloop线程,创建新线程并启动
startThread();
...
}
...
}

跟进startThread()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// SingleThreadEventExecutor 
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread(); // >>
...
}

private void doStartThread() {
...
// 创建新线程
executor.execute(new Runnable() {
@Override
public void run() {
// 设置eventLoop持有线程为执行这段代码的线程
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();
...
}
}
}

再调用了doStartThread方法,这个方法是启动线程的核心逻辑了,在执行doStartThread的时候,会调用eventLoop内部的一个用于新建线程的执行器execute()方法,注意与上面的区别,此执行器默认为ThreadPerTaskExecutor类型,创建新线程后将NioEventLoop的主体逻辑run()提交进去,并启动。

ThreadPerTaskExecutor 在每次执行execute() 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体,创建线程源码如下:

1
2
3
4
5
6
7
8
9
10
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

总结一下:

  1. 我们在new一个NioEventLoopGroup的时候,它会持有一个NioEventLoop数组,数组中每个NioEventLoop元素对象会持有一个Thread引用,就是reactor线程本体了。
  2. netty对于线程的初始化采取的懒加载模式,当我们没有用到某个NioEventLoop时,它的线程是不会被创建出来的。
  3. 当我们通过group().next()获取到一个NioEventLoop,并向其提交任务,这时就会触发线程的创建-任务提交-启动。创建是通过 ThreadPerTaskExecutorDefaultThreadFactory 两个类执行的,新线程执行的任务是 NioEventLoop 的主体逻辑run方法。

3、reactor线程执行

https://www.jianshu.com/p/467a9b41833e

回到主线逻辑中,创建的线程中执行了SingleThreadEventExecutor.this.run();即reactor线程的主体逻辑,贴一下主要代码片段

1
2
3
4
5
6
7
8
9
10
11
protected void run() {
for (;;) {
...
strategy = select(curDeadlineNanos);
...
processSelectedKeys();
...
ranTasks = runAllTasks(0);
...
}
}

此方法内部是一个无限循环,一旦启动将一直运行。

从以上代码中可以看出,线程一直在循环做三件事情

  1. 执行select
  2. 处理就绪的channel
  3. 执行任务队列中的任务

此处的源代码很长,下面我将分三个步骤分析run函数的主要流程

3.1 select阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
// hasTask 时continue
case SelectStrategy.CONTINUE:
continue;
// 由于NIO不支持忙碌等待,因此要选择跳过
case SelectStrategy.BUSY_WAIT:
// 当没有可调度任务时 strategy = SelectStrategy.SELECT
case SelectStrategy.SELECT:
// 获取现在到下一个计划任务调度执行之间的时间,没有定时任务返回-1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
//NONE 是Integer.maxValue
curDeadlineNanos = NONE;
}
// 设定原子量
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
...
}
// fall through
default:
}
} catch (IOException e) {
...
}
// select计数器+1
selectCnt++;
...
}catch(Exception e){ }
}
}

首先看strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());这句,netty会根据任务队列的情况执行相应的select操作:

1
2
3
4
// DefaultSelectStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

hasTasks()返回true表明当前的任务队列中有需要执行的任务,会执行selectSupplier.get(),那么selectSupplier是什么呢?定位到该成员变量的初始化代码:

1
2
3
4
5
6
7
8
9
10
// NioEventLoop
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
return selector.selectNow();
}

看到这里我就可以得出结论了,strategy的初始化逻辑是:当任务队列hasTasks时selectNow,返回就绪数量,否则返回SelectStrategy.SELECT

strategy=SelectStrategy.SELECT时,获取现在到下一个计划任务调度执行之间的时间curDeadlineNanos,在这段时间内执行strategy=select(curDeadlineNanos)

strategy为正数,说明select到了就绪事件,此时将继续往下执行,进入处理阶段。

思考:SelectStrategy.CONTINUE是哪种情况时设置的?

3.2 处理阶段

ioRatio 时间控制

netty通过一个int型参数ioRatio来控制io与cpu的耗时比例,ioRatio默认的初始值为50,即io和cpu计算各占50%

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
cancelledKeys = 0;  //
needsToSelectAgain = false;
// io时间所占比例,初始值为50
final int ioRatio = this.ioRatio;
boolean ranTasks; // 记录runAllTasks返回值,表示runAllTasks执行时间有没有超出参数指定的时间,是返回true
if (ioRatio == 100) {
try {
if (strategy > 0) {
// 就绪数量大于0,处理selectedKeys,这里的处理属于io处理
processSelectedKeys();
}
} finally {
// io时间占比达到100% 将一直执行runAllTasks 这里的处理属于cpu处理
// runAllTasks没有指定时间参数,当且仅当最后一个任务被执行之后 返回 true
ranTasks = runAllTasks();
}
} else if (strategy > 0) { // 说明有通道就绪
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
// 由于ioRatio = 50 所以运行runAllTasks时间与上面处理io时间一致
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
  • 如果ioRatio等于100,那么优先处理io事件,处理完就绪通道,再执行队列任务,一次全部执行完。
  • 如果ioRatio不等于100,
    • 若有就绪通道,那么在处理通道io事件时,会记录用时,完了之后再使用同样的时间去执行队列任务
    • 若没有就绪通道,则执行最小数量(不一定是1个,后面分析)的队列任务

就绪通道处理不仅仅是对执行时间进行了细粒度的优化,在处理过程中也是尽可能的考虑到了执行效率,继续往下分析处理,也就是processSelectedKeys()方法,贴上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
//NioEventLoop
private void processSelectedKeys() {
//这里做的优化,主要是使用netty 自定义的selectedKeys对象 通过反射的方式替换掉jdk原生selector中的 selectedKeySet字段属性
// 自定义的keyset 使用数组存放selectedKeys,原生的使用HashSet,主要优化的是检索性能
// 替换过程 见本类的openSelector方法
if (selectedKeys != null) {
// 优化过的处理 一般会进这里
processSelectedKeysOptimized();
} else {
// 正常处理
processSelectedKeysPlain(selector.selectedKeys());
}
}
selectedKeys优化

我们对优化过的selectedKeys的处理稍微展开一下,看看netty是如何优化的,我们查看 selectedKeys 被引用过的地方,有如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private SelectorTuple openSelector() {
...
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
...
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
}
...
}
});
...
selectedKeys = selectedKeySet;
...
}

首先,selectedKeys是一个 SelectedSelectionKeySet 类对象,在NioEventLoopopenSelector 方法中创建,之后就通过反射将selectedKeys与 sun.nio.ch.SelectorImpl 中的两个field绑定

sun.nio.ch.SelectorImpl 中我们可以看到,这两个field其实是两个HashSet

1
2
3
4
5
6
7
8
9
10
11
12
13
abstract class SelectorImpl extends AbstractSelector
{
// Public views of the key sets
private final Set<SelectionKey> publicKeys; // Immutable
private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}

selector在调用select()族方法的时候,如果有IO事件发生,就会往里面的两个field中塞相应的selectionKey(具体怎么塞有待研究),即相当于往一个hashSet中add元素,既然netty通过反射将jdk中的两个field替换掉,那我们就应该意识到是不是netty自定义的SelectedSelectionKeySetadd方法做了某些优化呢?

带着这个疑问,我们进入到 SelectedSelectionKeySet 类中探个究竟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
...
}

该类其实很简单,继承了 AbstractSet,说明该类可以当作一个set来用,但是底层使用一个数据来存放selectedKey,在add方法中经历下面三个步骤

  1. 将SelectionKey塞到该数组的逻辑尾部

  2. 更新该数组的逻辑长度+1

  3. 如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容

我们可以看到,待程序跑过一段时间,等数组的长度足够长,每次在轮询到nio事件的时候,netty只需要O(1)的时间复杂度就能将 SelectionKey 塞到 set中去,而jdk底层使用的hashSet需要O(lgn)的时间复杂度

执行就绪事件

关于netty对SelectionKeySet的优化我们暂时就跟这么多,下面我们继续跟netty对IO事件的处理,转到processSelectedKeysOptimized方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 手动置空 gc
selectedKeys.keys[i] = null;
// SelectionKey中附件了channel的引用
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task); // >>
}
// needsToSelectAgain 默认为false 在本类的cancel方法中,当连接取消次数达到256 被设置为true
// cancel方法 最终是在 AbstractNioChannel 类的 doDeregister 方法中被调用,调用一次取消连接数+1
if (needsToSelectAgain) {
// reset将selectedKeys中未处理的所有key手动置空
selectedKeys.reset(i + 1);
//重新select
selectAgain();
i = -1;
}
}
}

该方法有两个关键内容

  • processSelectedKey() 方法,就绪事件处理的核心逻辑
  • needsToSelectAgain 参数,判断是否该再来次轮询

processSelectedKey:逻辑很简单,获取就绪通道的监听事件,根据不同的事件分别进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
...
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 表示监听了连接事件
int ops = k.interestOps();
// 连接就绪是所有就绪选择中最简单的,对它的处理也很简单。
// 当客户端调用connect()并注册OP_CONNECT事件后,连接操作就会就绪。
// 因此该事件只可能出现在客户端程序
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0)
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 注意:所有通道注册的时候都没有第一时间指定监听事件,而ops==0 时,这里默认是监听读就绪事件
// NioServerSocketChannel 中创建的是NioMessageUnSafe实例 ,处理OP_ACCEPT
// NioSocketChannel 中创建的是NioByteUnSafe实例 , 处理 OP_READ
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
needsToSelectAgain及重轮询
1
2
3
4
5
6
7
8
9
// needsToSelectAgain 默认为false 在本类的cancel方法中,当连接取消次数达到256 被设置为true
// cancel方法 最终是在 AbstractNioChannel 类的 doDeregister 方法中被调用,调用一次取消连接数+1
if (needsToSelectAgain) {
// reset将selectedKeys中未处理的所有key手动置空
selectedKeys.reset(i + 1);
//重新select
selectAgain();
i = -1;
}

回忆上面run函数的代码,这个参数在每一次轮询结束后都会被置为false,那么上面时候会置为true,从而执行在一次轮询呢?

通过检索,找到这样一个方法:

1
2
3
4
5
6
7
8
9
// NioEventLoop
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}

还需要继续找出调用cancel函数的地方,使用IDEA的find usages

1
2
3
4
//AbstractNioChannel
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}

不难看出,在channel从selector上移除的时候,调用cancel函数将key取消,并且当被去掉的key到达 CLEANUP_INTERVAL 的时候,设置needsToSelectAgain为true,CLEANUP_INTERVAL默认值为256

也就是说,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,我们还是跳回到上面这段代码

每满256次,就会进入到if的代码块,首先,将selectedKeys的内部数组全部清空,方便被jvm垃圾回收,然后重新调用selectAgain重新填装一下 selectionKey

1
2
3
4
5
6
7
8
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}

netty这么做的目的应该是每隔256次channel断线,重新清理一下selectionKey,保证现存的SelectionKey及时有效

4 执行队列任务

https://www.jianshu.com/p/58fad8e42379

4.1 netty任务

netty中的task的常见使用场景:

  • 非当前reactor线程调用channel的各种方法

已经分析过的服务启动过程中,主通道的注册、新连接接入时子通道的注册就是提交任务完成的,因为彼时运行的线程是main线程和主线程。

此外,这种情况在push系统中比较常见,一般在业务线程里面,根据用户的标识,找到对应的channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景

1
2
// non reactor thread
channel.write(...)

关于channel.write()类方法的调用链,后面会单独拉出一篇文章来深入剖析,这里,我们只需要知道,最终write方法串至以下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void write(Object msg, boolean flush, ChannelPromise promise) {
// ...
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}

外部线程在调用write的时候,executor.inEventLoop()会返回false,直接进入到else分支,将write封装成一个WriteTask(这里仅仅是write而没有flush,因此flush参数为false), 然后调用 safeExecute方法

1
2
3
4
5
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
// ...
executor.execute(runnable);
// ...
}

接下来的调用链就进入到第一种场景了,但是和第一种场景有个明显的区别就是,第一种场景的调用链的发起线程是reactor线程,第二种场景的调用链的发起线程是用户线程,用户线程可能会有很多个,显然多个线程并发写taskQueue可能出现线程同步问题,此时阻塞队列的作用展现出来了。

  • 用户自定义普通任务

推荐使用的方法。

1
2
3
4
5
6
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
//...
}
});

如果在业务处理的代码中,即业务处理Handler中,没有使用这种提交任务的方式,那么业务处理逻辑就会被当成IO阶段的一部分直接执行,占用IO时间。如果连接数大,会影响系统的吞吐量,推荐使用任务提交的方式处理业务请求。

我们跟进execute方法,看重点

1
2
3
4
5
6
@Override// NioEventLoop
public void execute(Runnable task) {
//...
addTask(task);
//...
}

execute方法调用 addTask方法

1
2
3
4
5
6
7
// NioEventLoop
protected void addTask(Runnable task) {
// ...
if (!offerTask(task)) {
reject(task);
}
}

然后调用offerTask方法,如果offer失败,那就调用reject方法,通过默认的 RejectedExecutionHandler 直接抛出异常

1
2
3
4
5
// NioEventLoop
final boolean offerTask(Runnable task) {
// ...
return taskQueue.offer(task);
}

跟到offerTask方法,基本上task就落地了,netty内部使用一个taskQueue将task保存起来,那么这个taskQueue又是何方神圣?

我们查看 taskQueue 定义的地方和被初始化的地方

1
2
3
4
5
6
7
8
9
// NioEventLoop
private final Queue<Runnable> taskQueue;

taskQueue = newTaskQueue(this.maxPendingTasks);

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}

我们发现 taskQueue在NioEventLoop中默认是阻塞队列,老版本(4.1.6)中使用mpsc队列,即多生产者单消费者队列,netty使用mpsc,方便的将外部线程的task聚集,在reactor线程内部用单线程来串行执行,此处为什么要做此改变?

在本节讨论的任务场景中,所有代码的执行都是在reactor线程中的,所以,所有调用 inEventLoop() 的地方都返回true,既然都是在reactor线程中执行,那么其实这里的阻塞队列其实没有发挥真正的作用。

  • 用户自定义定时任务
1
2
3
4
5
6
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {

}
}, 60, TimeUnit.SECONDS);

第三种场景就是定时任务逻辑了,用的最多的便是如上方法:在一定时间之后执行任务

我们跟进schedule方法

1
2
3
4
5
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//...
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

通过 ScheduledFutureTask, 将用户自定义任务再次包装成一个netty内部的任务

1
2
3
4
5
6
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
// ...
scheduledTaskQueue().add(task);
// ...
return task;
}

到了这里,我们有点似曾相识,在非定时任务的处理中,netty通过一个阻塞队列将任务落地,这里,是否也有一个类似的队列来承载这类定时任务呢?带着这个疑问,我们继续向前

1
2
3
4
5
6
7
8
9
10
//
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
}
return scheduledTaskQueue;
}

果不其然,scheduledTaskQueue() 方法,会返回一个优先级队列,然后调用 add 方法将定时任务加入到队列中去,但是,这里为什么要使用优先级队列,而不需要考虑多线程的并发?

因为我们现在讨论的场景,调用链的发起方是reactor线程,不会存在多线程并发这些问题

但是,万一有的用户在reactor之外执行定时任务呢?虽然这类场景很少见,但是netty作为一个无比健壮的高性能io框架,必须要考虑到这种情况。

对此,新老版本的netty的处理方式是不同的:

Netty-4.1.6.Final中,如果是在外部线程调用schedule,netty将添加定时任务的逻辑封装成一个普通的task,这个task的任务是添加[添加定时任务]的任务,而不是添加定时任务,其实也就是第二种场景,这样,对 PriorityQueue的访问就变成单线程,即只有reactor线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
// 进入到场景二,进一步封装任务
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}

而在4.1.50.Final版本中,并没有将外部线程提交的定时任务添加到定时任务队列中,而是直接添加到了普通队列中,并添加了一些线程控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduleFromEventLoop(task); // 如果在线程内,添加到定时队列
} else { // 否则
final long deadlineNanos = task.deadlineNanos();
// 在提交之前进行一次判断 deadlineNanos 是否 小于 当前距离线程下一次被唤醒的nanos
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(task); // >> 这里需要跟进去
} else {
lazyExecute(task);
// 提交完成之后再一次进行判断, deadlineNanos是否 小于 当前距离线程下一次被唤醒的nanos
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK); // 执行唤醒线程的任务
}
}
}

return task;
}

跟进execute(task)方法

1
2
3
4
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

task是ScheduledFutureTask类型,task instanceof LazyRunnable为false,wakesUpForTask(task)返回true,所以这里会调用execute(task,true)

1
2
3
4
5
6
7
8
9
10
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
//将task加入任务队列 taskQueue 中
addTask(task);
...
// addTaskWakesUp 一般默认为false
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

传入的参数immediate表示是否立即唤醒select()来执行任务,所以此时会执行wakeup(inEventLoop)方法,

1
2
3
4
5
6
// NioEventLoop
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup();
}
}

那么再看lazyExecute(task)

1
2
3
public void lazyExecute(Runnable task) {
execute(ObjectUtil.checkNotNull(task, "task"), false);
}

指定参数immediate为false,表示任务添加之后不会立即唤醒线程。所以在任务提交完成之后,会再一次进行判断

deadlineNanos是否 小于 当前距离线程下一次被唤醒的nanos,如果是则执行唤醒线程的任务(那个空任务)

总结:

下面继续分析:

为什么定时任务要保存在优先级队列中,我们可以先不看源码,来思考一下优先级对列的特性

优先级队列按一定的顺序来排列内部元素,内部元素必须是可以比较的,联系到这里每个元素都是定时任务,那就说明定时任务是可以比较的,那么到底有哪些地方可以比较?

每个任务都有一个下一次执行的截止时间,截止时间是可以比较的,截止时间相同的情况下,任务添加的顺序也是可以比较的,就像这样,阅读源码的过程中,一定要多和自己对话,多问几个为什么

带着猜想,我们研究与一下ScheduledFutureTask,抽取出关键部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
private static final AtomicLong nextTaskId = new AtomicLong();
private static final long START_TIME = System.nanoTime();

static long nanoTime() {
return System.nanoTime() - START_TIME;
}

private final long id = nextTaskId.getAndIncrement();
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;

@Override
public int compareTo(Delayed o) {
//...
}

// 精简过的代码
@Override
public void run() {
}

这里,我们一眼就找到了compareTo 方法,cmd+u跳转到实现的接口,发现就是Comparable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}

ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}

进入到方法体内部,我们发现,两个定时任务的比较,确实是先比较任务的截止时间,截止时间相同的情况下,再比较id,即任务添加的顺序,如果id再相同的话,就抛Error

这样,在执行定时任务的时候,就能保证最近截止时间的任务先执行

下面,我们再来看下netty是如何来保证各种定时任务的执行的,netty里面的定时任务分以下三种

1.若干时间后执行一次
2.每隔一段时间执行一次
3.每次执行结束,隔一定时间再执行一次

netty使用一个 periodNanos 来区分这三种情况,正如netty的注释那样

1
2
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;

了解这些背景之后,我们来看下netty是如何来处理这三种不同类型的定时任务的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void run() {
if (periodNanos == 0) {
V result = task.call();
setSuccessInternal(result);
} else {
task.call();
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
scheduledTaskQueue.add(this);
}
}
}

if (periodNanos == 0) 对应 若干时间后执行一次 的定时任务类型,执行完了该任务就结束了。

否则,进入到else代码块,先执行任务,然后再区分是哪种类型的任务,periodNanos大于0,表示是以固定频率执行某个任务,和任务的持续时间无关,然后,设置该任务的下一次截止时间为本次的截止时间加上间隔时间periodNanos,否则,就是每次任务执行完毕之后,间隔多长时间之后再次执行,截止时间为当前时间加上间隔时间,-p就表示加上一个正的间隔时间,最后,将当前任务对象再次加入到队列,实现任务的定时执行

netty内部的任务添加机制了解地差不多之后,我们就可以查看reactor第三部曲是如何来调度这些任务的

4.2 reactor线程task的调度

首先,我们将目光转向最外层的外观代码

1
runAllTasks(long timeoutNanos);

顾名思义,这行代码表示了尽量在一定的时间内,将所有的任务都取出来run一遍。timeoutNanos 表示该方法最多执行这么长时间,netty为什么要这么做?我们可以想一想,reactor线程如果在此停留的时间过长,那么将积攒许多的IO事件无法处理(见reactor线程的前面两个步骤),最终导致大量客户端请求阻塞,因此,默认情况下,netty将控制内部队列的执行时间

好,我们继续跟进

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected boolean runAllTasks(long timeoutNanos) {
//从scheduledTaskQueue转移定时任务到taskQueue
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
//计算本次任务循环的截止时间
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
//执行任务
for (;;) {
safeExecute(task);
runTasks ++;

// 每隔0x3F任务,即每执行完64次任务之后,判断当前时间是否超过本次reactor任务循环的截止时间了,
// 如果超过,那就break掉,如果没有超过,那就继续执行。
// 可以看到,netty对性能的优化考虑地相当的周到,假设netty任务队列里面如果有海量小任务,
// 如果每次都要执行完任务都要判断一下是否到截止时间,那么效率是比较低下的
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 收尾
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

这段代码便是reactor执行task的所有逻辑,可以拆解成下面几个步骤

  1. 从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)
  2. 计算本次任务循环的截止时间
  3. 执行任务
  4. 收尾

从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)

首先调用 fetchFromScheduledTaskQueue()方法,将到期的定时任务转移到mpsc queue里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
//为当前纳秒减去ScheduledFutureTask类被加载的纳秒个数
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
// 转移失败重新放回定时队列
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}

可以看到,netty在把任务从scheduledTaskQueue转移到taskQueue的时候还是非常小心的,当taskQueue无法offer的时候,需要把从scheduledTaskQueue里面取出来的任务重新添加回去

scheduledTaskQueue从拉取一个定时任务的逻辑如下,传入的参数nanoTime为当前时间(其实是当前纳秒减去ScheduledFutureTask类被加载的纳秒个数),这里是一处优化,为了解决第一次添加定时任务时,ScheduledFutureTask加载耗时导致定时任务定时缩短的问题。

1
2
3
4
5
6
7
8
9
10
11
12
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
//只有在当前任务的截止时间已经到了才会取出来
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
scheduledTaskQueue.remove();
scheduledTask.setConsumed();
return scheduledTask;
}

可以看到,每次 pollScheduledTask 的时候,只有在当前任务的截止时间已经到了,才会取出来

计算本次任务循环的截止时间

1
2
3
4
5
6
7
8
9
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
//计算本次任务循环的截止时间
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;

这一步将取出第一个任务,用reactor线程传入的超时时间 timeoutNanos 来计算出当前任务循环的deadline,并且使用了runTaskslastExecutionTime来时刻记录任务的状态

循环执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//执行任务
for (;;) {
safeExecute(task);
runTasks ++;
// 每隔0x3F任务,即每执行完64次任务之后,判断当前时间是否超过本次reactor任务循环的截止时间了,
// 如果超过,那就break掉,如果没有超过,那就继续执行。
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

这一步便是netty里面执行所有任务的核心代码了。
首先调用safeExecute来确保任务安全执行,忽略任何异常

然后将已运行任务 runTasks 加一,每隔0x3F任务,即每执行完64个任务之后,判断当前时间是否超过本次reactor任务循环的截止时间了,如果超过,那就break掉,如果没有超过,那就继续执行。可以看到,netty对性能的优化考虑地相当的周到,假设netty任务队列里面如果有海量小任务,如果每次都要执行完任务都要判断一下是否到截止时间,那么效率是比较低下的

收尾

1
2
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;

收尾工作很简单,调用一下 afterRunningAllTasks 方法

1
2
3
4
@Override  //SingleThreadEventLoop
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}

NioEventLoop可以通过父类SingleTheadEventLoopexecuteAfterEventLoopIteration方法向tailTasks中添加收尾任务,比如,你想统计一下一次执行一次任务循环花了多长时间就可以调用此方法

1
2
3
4
5
6
7
public final void executeAfterEventLoopIteration(Runnable task) {
// ...
if (!tailTasks.offer(task)) {
reject(task);
}
//...
}

5 解决JDK空轮询bug

https://www.jianshu.com/p/3ec120ca46b2

空轮询bug:

正常情况下,selector.select()操作是阻塞的,只有被监听的fd有读写操作时,才被唤醒

但是,在这个bug中,没有任何fd有读写请求,但是select()操作依旧被唤醒

很显然,这种情况下,selectedKeys()返回的是个空数组

然后按照逻辑执行到while(true)处,循环执行,导致死循环。

netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒

这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成

time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或许更好理解一些

如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector