netty源码分析之服务端启动

重读总结:

  • 想要更好的理解服务启动流程,就要明白netty的线程模型——reactor模型,主线程负责接受新连接,只监听主线程selector上注册的ACCEPT事件
  • 主通的生命周期为 创建——初始化——注册——绑定
    • 创建,就是通过反射调用 默认的构造函数,创建一个NioServerSocketChannel对象
    • 初始化,将启动程序中对主通道的设置配置到该对象中,最主要的是在主通道PipeLine中添加ServerBootstrapAcceptor处理器,用于处理新子通道的注册
    • 注册,将主通道注册到主线程的Selector上,此时未设置监听事件,且注册完成之后,主通道还是处于未激活状态
    • 绑定,将主通道绑定到一个端口上,绑定成功将激活通道,触发主通道PipeLine的channelActice事件,这是InBound事件,首先调用HeadContextchannelActice方法,在该方法中开始执行read流程。由于read是一个个OutBound流程,最终还是会调用到HeadContext.read()方法。HeadContext.read又会调用unsafe层的read方法,在该方法中对主通道的监听事件进行了初始化。
  • 需要注意的是,主线程上只注册了主通道一条通道。

首先贴一段简单的服务器启动代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args){
NioEventLoopGroup parent = new NioEventLoopGroup(1);
NioEventLoopGroup children = new NioEventLoopGroup();
ServerBootstrap bs = new ServerBootstrap();
bs.group(parent,children)
.channel(NioServerSocketChannel.class)
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new Spliter());
}
});
bs.bind(8080);
}

代码逻辑:

1、创建了一个线程池 parent,其中只有一个线程,主要负责接受新连接

2、创建另一个线程池 children,线程个数为默认值,核心数的2倍,主要负责处理客户端channel上的各种事件

3、创建服务器启动引导对象,将两个线程池通过 group 方法设置进去

4、使用 channel 方法指定服务器使用的i/o模型为nio

5、使用 handler 方法给服务器的 NioServerSocketChannel 的pipeline添加节点。

6、使用childHandler 方法指定新连接接入过程中客户端 NioSocketChannel 初始化方法,主要给这些channe l的 PipeLine 添加节点

7、通过 bind(8080) 方法启动服务绑定到8080端口

对于Reactor线程模型一直有一点疑惑,worker线程池的工作方式是一条channel分配一条线程执行所有的业务逻辑,但是boss线程池面对是仅有一条的NioServerSocketChannel,为什么还需要线程池来处理呢?

实际上bossGroup中有多个NioEventLoop线程,每个NioEventLoop绑定一个端口,也就是说,如果程序只需要监听1个端口的话,bossGroup里面只需要有一个NioEventLoop线程就行了。

本文的主要讲述的是服务端的启动流程,所以以bind方法为入口,源码节选关键代码块。

bind方法定义在 ServerBootstrap 类的父类 AbstractBootstrap中:

1
2
3
4
5
//AbstractBootstrap
public ChannelFuture bind() {
...
return doBind(localAddress);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private ChannelFuture doBind(final SocketAddress localAddress) {
// 核心 初始化通道 注册通道
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
...

if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 核心 绑定监听端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
...
}
return promise;
}

doBind方法中主要有两个核心方法 initAndRegister()doBind0(),前者主要负责创建、初始化、注册NioServerSocketChannel,后者负责将创建的通绑定到指定端口并启动服务。

下面我们逐一来分析,首先是initAndRegister(),看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建
channel = channelFactory.newChannel();
// 初始化
init(channel);
} catch (Throwable t) {
...
}
// 注册
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
...
}
}

逻辑清晰,先创建,再初始化,再注册。

创建:见文章开头服务器启动代码,调用ServerBootstrap的channel()方法设置io模式的时候:

1
2
3
4
5
6
7
8
9
10
11
//AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
...
this.channelFactory = channelFactory;
return self();
}

已经指定了 channelFactory 为 ReflectiveChannelFactory , 所以创建语句channelFactory.newChannel()会调用到ReflectiveChannelFactory的newChannel()方法,源码就不贴了,就是反射调用指定类型的默认构造函数创建一个Channel对象,io模型为NIO时,创建的是NioServerSocketChannel对象。

初始化init(channel)调用到ServerBootstrapinit()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//ServerBootstrap
void init(Channel channel) {
// 设置options
setChannelOptions(channel, newOptionsArray(), logger);
// 设置 attrs
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
// 设置新接入channel的options和attrs
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
// 向NioServerSocketChannel中添加用户自定义Handler,最后添加用于处理新连接的ServerBootstrapAcceptor
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler); // 添加ServerHandler
}
// 向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor,
// 从名字上就可以看出来,这是一个接入器,专门接受新请求
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

ServerBootstrapAcceptor是一个重点类,这是一个InBoundHandler,当客户端连接请求到来,服务器channel会将新的客户端channel丢给这个handler进行处理,而他主要的执行逻辑就是,将这个新的客户端channel注册到childGroup中的一个线程中,看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这里拿到的服务端为新接入的客户端创建的 NioSocketChannel对象
final Channel child = (Channel) msg;
//向NioSocketChannel的PipeLine中 add ChannelInitializer,初始化客户端channel的执行流程pipleLine
child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 异步执行客户端通道注册,监听事件尚未指定,未指定默认监听读就绪
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 强制关闭
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

注册config().group().register(channel)经过一系列调用,最终会调用到 AbstractChannel.AbstractUnsafe 类的 register0 ()方法进行注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//AbstractChannel.AbstractUnsafe
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
//1. 执行注册
doRegister();
neverRegistered = false;
registered = true;
//2.传入事件handlerAdded
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//3.传入事件channelRegistered
pipeline.fireChannelRegistered();
//4.注册成功判断是否激活,是则传入事件channelActice
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
...
}
}

调用jdk底层注册channel

1
2
3
4
5
6
//AbstractChannel.AbstractUnsafe
protected void doRegister() throws Exception {
...
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
...
}

可以看到,注册时并没有设置感兴趣的事件,第二个参数为0,第三个参数则是将netty封装后的NioSocketChannel 当做附件,放到了selectionKey中,之后select出的selectionKey中都将带有netty的channel对象,这种设计实现了netty channel 与 jdk channel 的映射。

注册完成之后会会进行一次判断isActive(),对于NioServerSocketChannel来说,到这里并没有激活,因为NioServerSocketChannel 的激活条件是isOpen() && javaChannel().socket().isBound(),channel需要open且已绑定端口,目前只完成了注册还未进行绑定,所以这里不能触发channelActive事件。

而对于NioSocketChannel来说,判断条件是ch.isOpen() && ch.isConnected(),open并已连接,所以NioSocketChannel 一般是在此处触发channelActive事件。

initAndRegister()执行完,来到了dBind0()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//AbstractBootstrap
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

唯一的逻辑就是提交了一个异步任务,调用channel.bind()方法。

对于channel来说bind是一个outBound事件,channel.bind()会继续调用pipeline.bind(),继续往下掉用tail.bind(),然后就是一个节点一个节点往前传,最终调用到head节点的bind方法:

1
2
3
4
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}

看到unsafe我就开心,因为马上要干活了,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//AbstractChannel.AbstractUnsafe
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...//略过一大堆判断
// 连接是否激活 绑定之前时false
boolean wasActive = isActive();
try {
// 关键点
doBind(localAddress);
} catch (Throwable t) {}
// 之前未激活 现在已激活
if (!wasActive && isActive()) {
//触发连接激活事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}

最终unsafe.doBind()调用到了NioServerSocketChannel中的doBind(),兜了一个大圈还是回到了原点。

1
2
3
4
5
6
7
8
9
//NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 这里调用jdk nio的api 进行绑定
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

这里就是底层jdk的逻辑了,这里执行完成,serverSocket就算是真正的启动了起来。

再回到unsafe.doBind(),成功后触发pipeline.fireChannelActive(),我们现在都有经验了,这种inBound事件,调用一大圈最终都是从head节点开始执行,来看head的channelActive()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
// HeadCotext
public void channelActive(ChannelHandlerContext ctx) {
// 向下传递事件
ctx.fireChannelActive();
// 如果设置为自动读,autoRead 默认为true,则调用channel的read方法
readIfIsAutoRead();
}

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}

往下看调用链:

1
2
3
4
5
// AbstractChannel
public Channel read() {
pipeline.read();
return this;
}

再跟进

1
2
3
4
5
// DefaultChannelPipeline
public final ChannelPipeline read() {
tail.read();
return this;
}

最终调用到了tail的read方法, tail 的 read 方法是继承自父类 AbstractChannelHandlerContext:

1
2
3
4
5
6
7
8
9
10
11
12
//AbstractChannelHandlerContext
public ChannelHandlerContext read() {
// read方法 将从tail开始往前检索,找到实现了read方法的OutBoundHandler,将找到head节点
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
...
}
return this;
}

逻辑也很清晰,从 tail 开始,向前调用重载了read方法的OutboundHandler,直到head节点,看一下head节点的 read() 方法:

1
2
3
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

调用到了 unsafe 的 beginRead方法,这里我们需要明白从 pipeline 中不断调用最终到达unsafe 的调用链,称为OutBound 事件传播,调用出pipeline,所以 read 是一个outbound事件。

继续跟源码:

1
2
3
4
5
6
//AbstractChannel.AbstractUnsafe
public final void beginRead() {
...
doBeginRead();
...
}

跟进:

1
2
3
4
5
6
7
8
9
10
11
12
13
// AbstractNioChannel
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 若果没有监听指定的事件 do it 这里readInterestOp = OP_ACCEPT
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

重点是最后一行,设置监听事件为OP_ACCEPT。至此 NioServerSocketChannel 注册成功并监听OP_ACCEPT事件。

readInterestOpAbstractNioChannel的成员变量,在其子类NioServerSocketChannel初始化的时候进行了指定,指定为SelectionKey.OP_ACCEPT,看代码:

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

下一篇分析客户端新连接入流程。