一个NioEventLoop对应于Reactor模型中的一个从Reactor线程,它持有一个Thread引用,可以简单将NioEventLoop理解为一个用于处理channel事件的线程。
一个channel上的事件只能被同一个线程处理,NioEventLoop线程对channel事件的处理是一个串行化无锁执行过程,netty在初始化channel的时候在pipeline中添加了一系列用户指定的Handler(通过childHanler()
方法),这些Handler的处理需要遵循一个固定的顺序,netty底层使用同一个线程按照这个顺序串行执行,避免了多线程处理同一个channel需要使用锁同步产生的开销,这叫串行化无锁编程
一个NioEventLoop可以处理多个channel的就绪事件,即同一个nio线程可以处理多条连接的请求,这叫多路复用
1、Channel指定evenloop
Channel的EventLoop
是在注册的时候指定的,netty服务器启动时,首先会注册一个ServerChannel,该注册工作由Boss线程池完成,通常Boss线程池中只有一个线程。
1 | // AbstractBootstrap |
group().register(channel)
在boss线程池中注册Channel,boss线程池调用next()
方法获取一个EventLoop对象来注册channel,看下面代码:
1 | // MultithreadEventLoopGroup |
next方法获取到一个EventLoop
对象之后,再调用SingleThreadEventLoop
类的注册方法:
1 | // SingleThreadEventLoop |
最终会调用channel的unsafe对象完成注册,并在此时将this EventLoop 作为参数传进去
1 | // AbstractChannel.AbstractUnsafe |
注册的时候完成了eventLoop的指定。
总结一下:
以上分析的是ServerChannel注册时指定EventLoop的过程,这个过程只会在服务启动的时候进行,而在服务运行期间,会不断的有新连接接入,每个新连接都需要注册到一个selector上,这是nio编程的规则。
所以服务启动程序创建的childGroup中的每个线程都持有一个selector的,新连接的注册过程,就是从childGroup中选取一个EventLoop对象,然后将新连接channel注册到该线程对象的selector上,再指定新连接channel的EventLoop为当前选取的的实例。
2、reactor线程启动
https://www.jianshu.com/p/0d0eece6d467
netty对于线程的创建采取的懒加载模式,第一次提交任务的时候才会创建线程。
NioServerSocketChannel
第一次提交任务,就是在注册的时候
1 | //AbstractChannel.AbstractUnsafe |
eventLoop.execute()
可以向NioEventLoop
中提交一个任务,这个方法继承自父类SingleThreadEventExecutor
1 | // SingleThreadEventExecutor |
跟进startThread()
方法
1 | // SingleThreadEventExecutor |
再调用了doStartThread
方法,这个方法是启动线程的核心逻辑了,在执行doStartThread
的时候,会调用eventLoop内部的一个用于新建线程的执行器execute()
方法,注意与上面的区别,此执行器默认为ThreadPerTaskExecutor
类型,创建新线程后将NioEventLoop
的主体逻辑run()
提交进去,并启动。
ThreadPerTaskExecutor
在每次执行execute()
方法的时候都会通过DefaultThreadFactory
创建一个FastThreadLocalThread
线程,而这个线程就是netty中的reactor线程实体,创建线程源码如下:
1 | public final class ThreadPerTaskExecutor implements Executor { |
总结一下:
- 我们在new一个NioEventLoopGroup的时候,它会持有一个NioEventLoop数组,数组中每个NioEventLoop元素对象会持有一个Thread引用,就是reactor线程本体了。
- netty对于线程的初始化采取的懒加载模式,当我们没有用到某个NioEventLoop时,它的线程是不会被创建出来的。
- 当我们通过
group().next()
获取到一个NioEventLoop,并向其提交任务,这时就会触发线程的创建-任务提交-启动。创建是通过ThreadPerTaskExecutor
和DefaultThreadFactory
两个类执行的,新线程执行的任务是NioEventLoop
的主体逻辑run方法。
3、reactor线程执行
https://www.jianshu.com/p/467a9b41833e
回到主线逻辑中,创建的线程中执行了SingleThreadEventExecutor.this.run();
即reactor线程的主体逻辑,贴一下主要代码片段
1 | protected void run() { |
此方法内部是一个无限循环,一旦启动将一直运行。
从以上代码中可以看出,线程一直在循环做三件事情
- 执行select
- 处理就绪的channel
- 执行任务队列中的任务
此处的源代码很长,下面我将分三个步骤分析run函数的主要流程
3.1 select阶段
1 | // |
首先看strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
这句,netty会根据任务队列的情况执行相应的select操作:
1 | // DefaultSelectStrategy |
当hasTasks()
返回true表明当前的任务队列中有需要执行的任务,会执行selectSupplier.get()
,那么selectSupplier
是什么呢?定位到该成员变量的初始化代码:
1 | // NioEventLoop |
看到这里我就可以得出结论了,strategy的初始化逻辑是:当任务队列hasTasks时selectNow,返回就绪数量,否则返回SelectStrategy.SELECT。
当strategy=SelectStrategy.SELECT
时,获取现在到下一个计划任务调度执行之间的时间curDeadlineNanos
,在这段时间内执行strategy=select(curDeadlineNanos)
当strategy
为正数,说明select到了就绪事件,此时将继续往下执行,进入处理阶段。
思考:SelectStrategy.CONTINUE
是哪种情况时设置的?
3.2 处理阶段
ioRatio 时间控制
netty通过一个int型参数ioRatio来控制io与cpu的耗时比例,ioRatio默认的初始值为50,即io和cpu计算各占50%
1 | cancelledKeys = 0; // |
- 如果ioRatio等于100,那么优先处理io事件,处理完就绪通道,再执行队列任务,一次全部执行完。
- 如果ioRatio不等于100,
- 若有就绪通道,那么在处理通道io事件时,会记录用时,完了之后再使用同样的时间去执行队列任务
- 若没有就绪通道,则执行最小数量(不一定是1个,后面分析)的队列任务
就绪通道处理不仅仅是对执行时间进行了细粒度的优化,在处理过程中也是尽可能的考虑到了执行效率,继续往下分析处理,也就是processSelectedKeys()
方法,贴上代码:
1 | //NioEventLoop |
selectedKeys优化
我们对优化过的selectedKeys的处理稍微展开一下,看看netty是如何优化的,我们查看 selectedKeys
被引用过的地方,有如下代码
1 | private SelectorTuple openSelector() { |
首先,selectedKeys是一个 SelectedSelectionKeySet
类对象,在NioEventLoop
的 openSelector
方法中创建,之后就通过反射将selectedKeys与 sun.nio.ch.SelectorImpl
中的两个field绑定
sun.nio.ch.SelectorImpl
中我们可以看到,这两个field其实是两个HashSet
1 | abstract class SelectorImpl extends AbstractSelector |
selector在调用select()
族方法的时候,如果有IO事件发生,就会往里面的两个field中塞相应的selectionKey
(具体怎么塞有待研究),即相当于往一个hashSet中add元素,既然netty通过反射将jdk中的两个field替换掉,那我们就应该意识到是不是netty自定义的SelectedSelectionKeySet
在add
方法做了某些优化呢?
带着这个疑问,我们进入到 SelectedSelectionKeySet
类中探个究竟
1 | final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { |
该类其实很简单,继承了 AbstractSet
,说明该类可以当作一个set来用,但是底层使用一个数据来存放selectedKey,在add
方法中经历下面三个步骤
将SelectionKey塞到该数组的逻辑尾部
更新该数组的逻辑长度+1
如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容
我们可以看到,待程序跑过一段时间,等数组的长度足够长,每次在轮询到nio事件的时候,netty只需要O(1)的时间复杂度就能将 SelectionKey
塞到 set中去,而jdk底层使用的hashSet需要O(lgn)的时间复杂度
执行就绪事件
关于netty对SelectionKeySet
的优化我们暂时就跟这么多,下面我们继续跟netty对IO事件的处理,转到processSelectedKeysOptimized
方法:
1 | private void processSelectedKeysOptimized() { |
该方法有两个关键内容
processSelectedKey()
方法,就绪事件处理的核心逻辑needsToSelectAgain
参数,判断是否该再来次轮询
processSelectedKey:逻辑很简单,获取就绪通道的监听事件,根据不同的事件分别进行处理
1 | private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
needsToSelectAgain及重轮询
1 | // needsToSelectAgain 默认为false 在本类的cancel方法中,当连接取消次数达到256 被设置为true |
回忆上面run函数的代码,这个参数在每一次轮询结束后都会被置为false,那么上面时候会置为true,从而执行在一次轮询呢?
通过检索,找到这样一个方法:
1 | // NioEventLoop |
还需要继续找出调用cancel函数的地方,使用IDEA的find usages
1 | //AbstractNioChannel |
不难看出,在channel从selector上移除的时候,调用cancel函数将key取消,并且当被去掉的key到达 CLEANUP_INTERVAL
的时候,设置needsToSelectAgain为true,CLEANUP_INTERVAL
默认值为256
也就是说,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,我们还是跳回到上面这段代码
每满256次,就会进入到if的代码块,首先,将selectedKeys的内部数组全部清空,方便被jvm垃圾回收,然后重新调用selectAgain
重新填装一下 selectionKey
1 | private void selectAgain() { |
netty这么做的目的应该是每隔256次channel断线,重新清理一下selectionKey,保证现存的SelectionKey及时有效
4 执行队列任务
https://www.jianshu.com/p/58fad8e42379
4.1 netty任务
netty中的task的常见使用场景:
- 非当前reactor线程调用channel的各种方法
已经分析过的服务启动过程中,主通道的注册、新连接接入时子通道的注册就是提交任务完成的,因为彼时运行的线程是main线程和主线程。
此外,这种情况在push系统中比较常见,一般在业务线程里面,根据用户的标识,找到对应的channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景
1 | // non reactor thread |
关于channel.write()类方法的调用链,后面会单独拉出一篇文章来深入剖析,这里,我们只需要知道,最终write方法串至以下方法
1 | private void write(Object msg, boolean flush, ChannelPromise promise) { |
外部线程在调用write
的时候,executor.inEventLoop()
会返回false,直接进入到else分支,将write封装成一个WriteTask
(这里仅仅是write而没有flush,因此flush
参数为false), 然后调用 safeExecute
方法
1 | private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { |
接下来的调用链就进入到第一种场景了,但是和第一种场景有个明显的区别就是,第一种场景的调用链的发起线程是reactor线程,第二种场景的调用链的发起线程是用户线程,用户线程可能会有很多个,显然多个线程并发写taskQueue
可能出现线程同步问题,此时阻塞队列的作用展现出来了。
- 用户自定义普通任务
推荐使用的方法。
1 | ctx.channel().eventLoop().execute(new Runnable() { |
如果在业务处理的代码中,即业务处理Handler中,没有使用这种提交任务的方式,那么业务处理逻辑就会被当成IO阶段的一部分直接执行,占用IO时间。如果连接数大,会影响系统的吞吐量,推荐使用任务提交的方式处理业务请求。
我们跟进execute
方法,看重点
1 | // NioEventLoop |
execute
方法调用 addTask
方法
1 | // NioEventLoop |
然后调用offerTask
方法,如果offer失败,那就调用reject
方法,通过默认的 RejectedExecutionHandler
直接抛出异常
1 | // NioEventLoop |
跟到offerTask
方法,基本上task就落地了,netty内部使用一个taskQueue
将task保存起来,那么这个taskQueue
又是何方神圣?
我们查看 taskQueue
定义的地方和被初始化的地方
1 | // NioEventLoop |
我们发现 taskQueue
在NioEventLoop中默认是阻塞队列,老版本(4.1.6)中使用mpsc队列,即多生产者单消费者队列,netty使用mpsc,方便的将外部线程的task聚集,在reactor线程内部用单线程来串行执行,此处为什么要做此改变?
在本节讨论的任务场景中,所有代码的执行都是在reactor线程中的,所以,所有调用 inEventLoop()
的地方都返回true,既然都是在reactor线程中执行,那么其实这里的阻塞队列其实没有发挥真正的作用。
- 用户自定义定时任务
1 | ctx.channel().eventLoop().schedule(new Runnable() { |
第三种场景就是定时任务逻辑了,用的最多的便是如上方法:在一定时间之后执行任务
我们跟进schedule
方法
1 | public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { |
通过 ScheduledFutureTask
, 将用户自定义任务再次包装成一个netty内部的任务
1 | <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { |
到了这里,我们有点似曾相识,在非定时任务的处理中,netty通过一个阻塞队列将任务落地,这里,是否也有一个类似的队列来承载这类定时任务呢?带着这个疑问,我们继续向前
1 | // |
果不其然,scheduledTaskQueue()
方法,会返回一个优先级队列,然后调用 add
方法将定时任务加入到队列中去,但是,这里为什么要使用优先级队列,而不需要考虑多线程的并发?
因为我们现在讨论的场景,调用链的发起方是reactor线程,不会存在多线程并发这些问题
但是,万一有的用户在reactor之外执行定时任务呢?虽然这类场景很少见,但是netty作为一个无比健壮的高性能io框架,必须要考虑到这种情况。
对此,新老版本的netty的处理方式是不同的:
在Netty-4.1.6.Final
中,如果是在外部线程调用schedule,netty将添加定时任务的逻辑封装成一个普通的task,这个task的任务是添加[添加定时任务]的任务,而不是添加定时任务,其实也就是第二种场景,这样,对 PriorityQueue
的访问就变成单线程,即只有reactor线程
1 | <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { |
而在4.1.50.Final
版本中,并没有将外部线程提交的定时任务添加到定时任务队列中,而是直接添加到了普通队列中,并添加了一些线程控制
1 | private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { |
跟进execute(task
)方法
1 | public void execute(Runnable task) { |
task是ScheduledFutureTask
类型,task instanceof LazyRunnable
为false,wakesUpForTask(task)
返回true,所以这里会调用execute(task,true)
1 | private void execute(Runnable task, boolean immediate) { |
传入的参数immediate
表示是否立即唤醒select()
来执行任务,所以此时会执行wakeup(inEventLoop)
方法,
1 | // NioEventLoop |
那么再看lazyExecute(task)
1 | public void lazyExecute(Runnable task) { |
指定参数immediate
为false,表示任务添加之后不会立即唤醒线程。所以在任务提交完成之后,会再一次进行判断
deadlineNanos是否 小于 当前距离线程下一次被唤醒的nanos,如果是则执行唤醒线程的任务(那个空任务)
总结:
下面继续分析:
为什么定时任务要保存在优先级队列中,我们可以先不看源码,来思考一下优先级对列的特性
优先级队列按一定的顺序来排列内部元素,内部元素必须是可以比较的,联系到这里每个元素都是定时任务,那就说明定时任务是可以比较的,那么到底有哪些地方可以比较?
每个任务都有一个下一次执行的截止时间,截止时间是可以比较的,截止时间相同的情况下,任务添加的顺序也是可以比较的,就像这样,阅读源码的过程中,一定要多和自己对话,多问几个为什么
带着猜想,我们研究与一下ScheduledFutureTask
,抽取出关键部分
1 | final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> { |
这里,我们一眼就找到了compareTo
方法,cmd+u
跳转到实现的接口,发现就是Comparable
接口
1 | public int compareTo(Delayed o) { |
进入到方法体内部,我们发现,两个定时任务的比较,确实是先比较任务的截止时间,截止时间相同的情况下,再比较id,即任务添加的顺序,如果id再相同的话,就抛Error
这样,在执行定时任务的时候,就能保证最近截止时间的任务先执行
下面,我们再来看下netty是如何来保证各种定时任务的执行的,netty里面的定时任务分以下三种
1.若干时间后执行一次
2.每隔一段时间执行一次
3.每次执行结束,隔一定时间再执行一次
netty使用一个 periodNanos
来区分这三种情况,正如netty的注释那样
1 | /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ |
了解这些背景之后,我们来看下netty是如何来处理这三种不同类型的定时任务的
1 | public void run() { |
if (periodNanos == 0)
对应 若干时间后执行一次
的定时任务类型,执行完了该任务就结束了。
否则,进入到else代码块,先执行任务,然后再区分是哪种类型的任务,periodNanos
大于0,表示是以固定频率执行某个任务,和任务的持续时间无关,然后,设置该任务的下一次截止时间为本次的截止时间加上间隔时间periodNanos
,否则,就是每次任务执行完毕之后,间隔多长时间之后再次执行,截止时间为当前时间加上间隔时间,-p
就表示加上一个正的间隔时间,最后,将当前任务对象再次加入到队列,实现任务的定时执行
netty内部的任务添加机制了解地差不多之后,我们就可以查看reactor第三部曲是如何来调度这些任务的
4.2 reactor线程task的调度
首先,我们将目光转向最外层的外观代码
1 | runAllTasks(long timeoutNanos); |
顾名思义,这行代码表示了尽量在一定的时间内,将所有的任务都取出来run一遍。timeoutNanos
表示该方法最多执行这么长时间,netty为什么要这么做?我们可以想一想,reactor线程如果在此停留的时间过长,那么将积攒许多的IO事件无法处理(见reactor线程的前面两个步骤),最终导致大量客户端请求阻塞,因此,默认情况下,netty将控制内部队列的执行时间
好,我们继续跟进
1 | protected boolean runAllTasks(long timeoutNanos) { |
这段代码便是reactor执行task的所有逻辑,可以拆解成下面几个步骤
- 从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)
- 计算本次任务循环的截止时间
- 执行任务
- 收尾
从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)
首先调用 fetchFromScheduledTaskQueue()
方法,将到期的定时任务转移到mpsc queue里面
1 | private boolean fetchFromScheduledTaskQueue() { |
可以看到,netty在把任务从scheduledTaskQueue
转移到taskQueue
的时候还是非常小心的,当taskQueue
无法offer的时候,需要把从scheduledTaskQueue
里面取出来的任务重新添加回去
从scheduledTaskQueue
从拉取一个定时任务的逻辑如下,传入的参数nanoTime
为当前时间(其实是当前纳秒减去ScheduledFutureTask
类被加载的纳秒个数),这里是一处优化,为了解决第一次添加定时任务时,ScheduledFutureTask
加载耗时导致定时任务定时缩短的问题。
1 | protected final Runnable pollScheduledTask(long nanoTime) { |
可以看到,每次 pollScheduledTask
的时候,只有在当前任务的截止时间已经到了,才会取出来
计算本次任务循环的截止时间
1 | Runnable task = pollTask(); |
这一步将取出第一个任务,用reactor线程传入的超时时间 timeoutNanos
来计算出当前任务循环的deadline,并且使用了runTasks
,lastExecutionTime
来时刻记录任务的状态
循环执行任务
1 | //执行任务 |
这一步便是netty里面执行所有任务的核心代码了。
首先调用safeExecute
来确保任务安全执行,忽略任何异常
然后将已运行任务 runTasks
加一,每隔0x3F
任务,即每执行完64个任务之后,判断当前时间是否超过本次reactor任务循环的截止时间了,如果超过,那就break掉,如果没有超过,那就继续执行。可以看到,netty对性能的优化考虑地相当的周到,假设netty任务队列里面如果有海量小任务,如果每次都要执行完任务都要判断一下是否到截止时间,那么效率是比较低下的
收尾
1 | afterRunningAllTasks(); |
收尾工作很简单,调用一下 afterRunningAllTasks
方法
1 | //SingleThreadEventLoop |
NioEventLoop
可以通过父类SingleTheadEventLoop
的executeAfterEventLoopIteration
方法向tailTasks
中添加收尾任务,比如,你想统计一下一次执行一次任务循环花了多长时间就可以调用此方法
1 | public final void executeAfterEventLoopIteration(Runnable task) { |
5 解决JDK空轮询bug
https://www.jianshu.com/p/3ec120ca46b2
空轮询bug:
正常情况下,selector.select()
操作是阻塞的,只有被监听的fd有读写操作时,才被唤醒
但是,在这个bug中,没有任何fd有读写请求,但是select()
操作依旧被唤醒
很显然,这种情况下,selectedKeys()
返回的是个空数组
然后按照逻辑执行到while(true)
处,循环执行,导致死循环。
netty 会在每次进行 selector.select(timeoutMillis)
之前记录一下开始时间currentTimeNanos
,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒
这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
改成
time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
或许更好理解一些
如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector