netty源码分析之新连接接入

重读总结:

  • 客户端的通道是由服务器创建的,当主线程监听到Accept事件,会进入事件处理流程,创建客户端通道
  • 子通道的生命周期 创建——初始化——注册
    • 创建,主线程执行处理流程时创建子通道NioSocketChannel对象
    • 初始化,在主通道PipeLine中有一个ServerBootstrapAcceptor处理器,专门对新的子通道执行初始化和注册操作
    • 注册,注册完成之后子通道就处于激活状态了,会调用channelActive方法,向下调用doBeginRead完成监听事件的更改
  • 所有的通道创建后都默认为autoRead,在通道激活之后,就会进入doBeginRead()流程,该流程就是更改通道的监听事件

我们知道服务端启动后,会有一条boss线程在运行着,负责接受客户端的新连接。boss线程具体运行的逻辑在NioEventLoop的run()方法中,这里不做具体体分析,只截取本文关心的代码片段:

1
2
3
4
5
6
7
8
9
//NioEventLoop
protected void run() {
...
strategy = select(curDeadlineNanos);
...
if (strategy > 0) {
processSelectedKeys();
}
}

典型的jdk nio 的代码逻辑,先select,再处理selectedKey,跟进处理代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//NioEventLoop
private void processSelectedKeys() {
if (selectedKeys != null) {
// 优化过的处理 一般会进这里
processSelectedKeysOptimized();
} else {
// 正常处理
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 由于主线程上只注册了一条主通道,所以
// 从selectedKey中取出附件只能是NioServerSocketChannel对象
if (a instanceof AbstractNioChannel) {
// 继续跟进
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
...
try{
...
//注意:所有通道注册的时候都没有第一时间指定监听事件,而ops==0 时,这里默认是监听读就绪事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {}
}

看到最终调用了unsafe.read()方法,注意这里处理的是 NioServerSocketChannel 上的读。

这里我们要清楚,netty能读到的数据分为两种类型,对于NioServerSocketChannel来说,它只负责接收新连接,所以读到的是建立连接的请求,netty将这类数据称为 message 。对于已经接入的客户端连接,读到的是业务请求,netty将这类数据成为 byte

由于这里处理的是新连接接入,read()方法将进入 AbstractNioMessageChannel.NioMessageUnsafe 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//NioMessageUnsafe
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 不断的读取消息,可以猜到读取的是一个个NioSocketChannel对象
int localRead = doReadMessages(readBuf);
...
} while (allocHandle.continueReading());
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 这里向NioServerSocketChannel 的pipeline中传进ChannelRead事件,参数是读取到的NioSocketChannel
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
...
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}

重点在doReadMessages() 方法,该方法将调用到底层的jdk的accept()得到的 SocketChannel ,并将其包装成netty的 NioSocketChannel 并add 到 buf 中,而上层的read()方法可以直接访问 buf。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//NioServerSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception {
// accept得到SocketChannel对象
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 根据SocketChannel对象 创建NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
...
}

return 0;
}

doReadMessages() 调用完之后,我们再回到read()方法,可以看到将读取到每个 NioSocketChannel 作为参数调用了 NioServerSocketChannelpipeline.channalRead(buf.get(i)) 方法,这个调用会产生什么反应呢?

服务端启动分析时提到,初始化 NioServerSocketChannel 阶段向 主通道的PipeLine 中添加了一个 ServerBootstrapAcceptor ,后文称接收器,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//ServerBootstrap
void init(Channel channel) {
...
ChannelPipeline p = channel.pipeline(); // 这里是NioServerSocketChannel的pipeline
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
...
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

初始化结束后, NioServerSocketChannel 的 pipeline 有以下几个节点:

1
head-->ServerBootstrapAcceptor-->tail

再看这个接收器ServerBootstrapAcceptor,它是 NioServerSocketChannel 的一个 InBound 事件处理器,read()方法调用pipeline.channalRead(buf.get(i))时,会首先进入head执行channelRead 方法,head只是简单的向下传播此事件,然后进入 ServerBootstrapAcceptorchannelRead 方法,我们来看具体做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//1、向NioSocketChannel的PipeLine中 add ChannelInitializer
child.pipeline().addLast(childHandler);
//2. 设置options和attrs
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//3. 异步执行通道注册,监听事件尚未指定,未指定默认监听读就绪
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

首先对读取到的 NioSocketChannel 执行了初始化,该方法主要对 NioSocketChannel 做了三件事:

  1. 添加用户代码中指定的 InBound 事件处理器 ChannelInitializer 用于初始化NioSocketChannel,该handle向NioSocketChannel 中 add 一系列事件处理器 ,ChannelInitializer执行完成之后将会被remove掉。
  2. 设置用户代码中指定的 NioSocketChannel 的 options 和 attrs
  3. 使用 childGroup 线程池执行 NioSocketChannel 注册任务

childGroup 会使用 chooser 选择分配一条线程(EventLoop)给 NioSocketChannel,之后所有该channel的任务都将由这条线程执行。

注册过程与NioServerSocketChannel的注册流程一致,最终会调用到 AbstractChannel.AbstractUnsafe 类的 register0 ()方法进行注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
//1. 执行注册
doRegister();
neverRegistered = false;
registered = true;
//2.传入事件handlerAdded
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//3.传入事件channelRegistered
pipeline.fireChannelRegistered();
//4.注册成功则传入事件channelActice
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
...
}
}

调用jdk底层注册channel

1
2
3
4
5
protected void doRegister() throws Exception {
...
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
...
}

再提一次,注册时并没有设置感兴趣的事件,第二个参数为0,第三个参数则是将netty封装后的NioSocketChannel 当做附件,放到了selectionKey中,之后select出的selectionKey中都将带有netty的channel对象,这种设计实现了netty channel 与 jdk channel 的映射。

继续往下看,注册完之后,会调用一系列的pipeline方法,handlerAddedchannelRegisteredchannelActive,此时应该已经很清楚,从 unsafe 中调用 pipeline 的方法,调用进入 pipeline传入的事件我们称之为 inbound 事件

其中 channelActive 为流程中的重要一环,pipeLine调用后首先会调用head节点的channelActive方法,我们看一下head的channelActive源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// HeadCotext
public void channelActive(ChannelHandlerContext ctx) {
// 向下传递事件
ctx.fireChannelActive();
// 如果设置为自动读,autoRead 默认为true,则调用channel的read方法
readIfIsAutoRead();
}

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}

往下看调用链

1
2
3
4
5
// AbstractChannel
public Channel read() {
pipeline.read();
return this;
}

再跟进

1
2
3
4
5
// DefaultChannelPipeline
public final ChannelPipeline read() {
tail.read();
return this;
}

最终调用到了tail的read方法, tail 的 read 方法是继承自父类 AbstractChannelHandlerContext:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//AbstractChannelHandlerContext
public ChannelHandlerContext read() {
// read方法 将从tail开始往前检索,找到实现了read方法的OutBoundHandler,将找到head节点
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
executor.execute(tasks.invokeReadTask);
}
return this;
}

逻辑也很清晰,从 tail 开始,向前调用重载了read方法的OutboundHandler,直到head节点,看一下head节点的 read() 方法:

1
2
3
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

调用到了 unsafe 的 beginRead方法,这里我们需要明白从 pipeline 中不断调用最终到达unsafe 的调用链,称为OutBound 事件传播,调用出pipeline,所以 read 是一个outbound事件。

继续跟源码:

1
2
3
4
5
6
@Override
public final void beginRead() {
...
doBeginRead();
...
}

跟进:

1
2
3
4
5
6
7
8
9
10
11
12
13
// AbstractNioChannel
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// interestOps & OP_READ 若果没有监听读就绪事件 do it
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

重点是最后一行,设置监听事件为读就绪。

一个连接从接入,到注册,到设置监听读就绪,之后,客户端与服务器便能正常的通信了。