netty源码分析之PipeLine

重读总结:

  • 事件传播方法,以fireChannelActive()为例,这个方法是定义在ChannelInboundInvoker接口中的,AbstractChannelHandlerContextDefaultChannelPipeline两个类中都重载了这个方法,两者实现的区别是:
    • AbstractChannelHandlerContext中的重载是从当前的Pipeline节点开始传播事件,一般用于业务逻辑处理结束后继续传播事件,下一节点需要查找
    • DefaultChannelPipeline的逻辑是用于事件刚要进入pipeline的时候,下一节点直接指定,In事件指定给head,out事件指定给tail
  • Handler热插拔

Channel创建的时候会创建一个PipeLine,并且PipeLine也持有Channel对象的引用,二者是互相引用的关系。

1
2
3
4
5
6
7
8
// AbstractChannel.class
protected AbstractChannel(Channel parent) {
...
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

AbstractChannel 是Channel接口的第一个抽象实现类,其中就声明了对pipeline的引用,在看pipeLine的初始化,创建了一个DefaultChannelPipeline ,构造函数将正在构造的channel对象传进去,源码:

1
2
3
4
5
6
7
8
9
10
11
12
//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

第一步就是将传入的channel对象存下来,然后创建头结点head和尾结点tail组成一个初始的双向链表。

ok,至此我们可以了解到PipeLine 数据结构是一个双向链表,头结点是HeadContext对象,尾结点是TailContext对象,而头尾节点都是 AbstractChannelHandlerContext 的子类。

那我们要弄清楚pipeLine的工作方式,肯定要先搞搞明白组成它的节点AbstractChannelHandlerContext 到底是个啥。

PipeLine节点分析

AbstractChannelHandlerContext 是实现了ChannelHandlerContext接口的抽象类,我们根据字面意思理解,ChannelHandlerContext 就是执行 ChannelHandler的上下文,上下文应该包含该Handler的执行逻辑,并且负责这段逻辑的调用,以及调用结果的处理

而ChannelHandler我们应该很熟悉了,用来处理客户端请求或者服务器响应的一些处理器。

服务器对客户端请求的处理,一般来说都是要分步骤执行的,一个常见的例子就是

1
2
3
接受请求得到byteBuf-->解码得到request对象-->对request鉴权-->处理reques得到response

-->对response编码得到byteBuf并发送响应

对于以上流程,服务器程序中需要调用pipeline.addLast(new xxxxHandler())方法加入到pipeline中Handler有:DecodeHandler–>LoginHandler–>BussinessHandler–>EncodeHandler ,跟进addLast方法调用最终会到达

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//DefaultChannelPipeline
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1 检查是否重复添加
checkMultiplicity(handler);
//2 创建节点 DefaultChannelHandlerContext类型
newCtx = newContext(group, filterName(name, handler), handler);
//3 添加节点 双向链表操作
addLast0(newCtx);
...
}
//4 回调用户方法
callHandlerAdded0(newCtx);
return this;
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
//group为null,因此childExecutor(group)也返回null
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
1
2
3
4
5
6
7
//DefaultChannelHandlerContext
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
// 将参数回传到父类,保存Handler的引用
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
1
2
3
4
5
6
7
8
9
//AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor; // null
this.executionMask = mask(handlerClass); // 生成一个掩码 可以快读判断这个Handler重载了哪些方法
ordered = executor == null || executor instanceof OrderedEventExecutor; // ture
}

可以看到,addLast方法将用户写的Handler包装成了一个 DefaultChannelPipeline ,加入到了双向链表的tail节点之前。pipeline节点就拥有了handler执行逻辑。

接下来要弄清楚的是,ChannelHandlerContext 是怎样调用这些逻辑的,ChannelHandlerContext继承了ChannelInboundInvoker, ChannelOutboundInvoker这两个接口,里面定义的方法就是用来调用handler逻辑。

那为什么要继承两个接口呢?

Handler 有 in 和 out 之分,但是HandlerContext没有,所以为了既能调用inhandler逻辑又能调用outhandler逻辑,就继承了两个接口。

老版本的netty,AbstractChannelHandlerContext 有两个bool属性InBound 和outBound,InBound=true表示该节点是inBound,outBound=true表示该节点是outBound,当然也可能同时为true。在我阅读的源代码版本(4.1.50)中已经删除了这两个属性,取而代之的是属性executionMask,通过使用该属性可以在事件传播时,快速的判断出该节点中的Handler在inBound方向和outBound方向有没有重载某事件处理逻辑。此处将另外分析。

最后要弄明白的是,handler的执行结果应该怎么处理?

我们不要忘了,pipeLine是一个双向链表,在一个上下文节点中,我们可以很方便的获取到前一个或者后一个上下文节点,当前的节点执行完handler逻辑之后,调用ChannelInboundInvoker或者ChannelOutboundInvoker定义的方法(AbstractChannelHandlerContext对这些方法做了实现),就可以将当前的执行结果传递给下一个节点或者上一个节点。这将取决于事件的类型是inBound还是outBound。

事件传播

在此分别选取一种事件传播的源码看一下

inBound事件

看一个inBound事件,当unsafe中执行pipeline.fireChannelRead()

1
2
3
4
5
6
//DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {
// 最先调用的是head节点的channelRead方法,此处是静态方法调用
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//AbstractChannelHandlerContext

// 最最核心的就是这个静态调用,负责传播的核心方法
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

// 这是实例方法,在某节点中确定下一个执行节点,确定了之后将执行静态调用
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 又是这个静态调用
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
// 寻找下一个节点的逻辑
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
// 这里可以看出inBound事件是向后传播的
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}

// 具体负责执行handler逻辑的方法
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}

在handler channelRead()逻辑最后,都会调用一下ctx.channelRead,将事件传播下去

outBound事件

再看一个outBound事件传播的代,当我们在某handler中执行 ctx.pipeline().writeAndFlush()

1
2
3
4
//DefaultChannelPipeline
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.write(msg, promise); //pipeline直接找tail
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//AbstractChannelHandlerContext
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
...
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise); // 执行这里
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
// 寻找下一个节点
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev;// 往前找
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
// 执行handler逻辑的方法
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}

在handler的write方法中最后,都会调用一下ctx.write()将时间传播上去

inBound事件传播是从head节点开始,到tail节点结束。tail节点,作为InBoundHandler实现了所有的inBound事件处理方法,而实现的逻辑是空的,即不作任何处理,以结束inBound事件的传播。

OutBound事件传播是从tail节点开始,到head节点结束,head节点作为OutBoundHandler,实现了所有的outBound事件处理方法,将所有的outBoud事件委托给unsafe执行相应的底层逻辑。

整理一下思路:对于in 和 out 应该站在pipeLine的角度去理解,比如新连接channel注册成功之后,会调用pipeline().fireChannelActive(),向pipeline中传入事件,这种就是in事件;而writeAndFlush操作,需要跳出pipeline调用unSafe来向channel中写数据,因此是一个out事件。

至此,还有一点需要分析,那就是异常的传播。

异常传播

我们通常在业务代码中,会加入一个异常处理器,统一处理pipeline过程中的所有的异常,并且,一般该异常处理器需要加载自定义节点的最末尾

此类ExceptionHandler一般继承自 ChannelDuplexHandler,标识该节点既是一个inBound节点又是一个outBound节点,我们分别分析一下inBound事件和outBound事件过程中,ExceptionHandler是如何才处理这些异常的

inBound异常

我们以数据的读取为例,看下netty是如何传播在这个过程中发生的异常

我们前面已经知道,对于每一个节点的数据读取都会调用AbstractChannelHandlerContext.invokeChannelRead()方法

1
2
3
4
5
6
7
8
//AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}

可以看到该节点最终委托到其内部的ChannelHandler处理channelRead,而在最外层catch整个Throwable,因此,我们在如下用户代码中的异常会被捕获

1
2
3
4
5
6
7
8
9
public class BusinessHandler extends ChannelInboundHandlerAdapter {
@Override
protected void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
//...
throw new BusinessException(...);
//...
}

}

上面这段业务代码中的 BusinessException 会被 BusinessHandler所在的节点捕获,进入到 notifyHandlerException(t);往下传播,我们看下它是如何传播的

1
2
3
4
5
6
7
8
9
//AbstractChannelHandlerContext
private void notifyHandlerException(Throwable cause) {
// 略去了非关键代码,读者可自行分析
invokeExceptionCaught(cause);
}

private void invokeExceptionCaught(final Throwable cause) {
handler().exceptionCaught(this, cause);
}

可以看到,此Hander中异常优先由此Handelr中的exceptionCaught方法来处理,默认情况下,如果不覆写此Handler中的exceptionCaught方法,调用

1
2
3
4
5
//ChannelInboundHandlerAdapter
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
1
2
3
4
5
//AbstractChannelHandlerContext
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
invokeExceptionCaught(next, cause);
return this;
}

到了这里,已经很清楚了,如果我们在自定义Handler中没有处理异常,那么默认情况下该异常将一直传递下去,遍历每一个节点,直到最后一个自定义异常处理器ExceptionHandler来终结,收编异常

1
2
3
4
5
6
7
public Exceptionhandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// 处理该异常,并终止异常的传播
}
}

到了这里,你应该知道为什么异常处理器要加在pipeline的最后了吧?

outBound异常

然而对于outBound事件传播过程中所发生的异常,该Exceptionhandler照样能完美处理,为什么?

我们以前面提到的writeAndFlush方法为例,来看看outBound事件传播过程中的异常最后是如何落到Exceptionhandler中去的

前面我们知道,channel.writeAndFlush()方法最终也会调用到节点的 invokeFlush0()方法(write机制比较复杂,我们留到后面的文章中将)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//AbstractChannelHandlerContext
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}


private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}

invokeFlush0()会委托其内部的ChannelHandler的flush方法,我们一般实现的即是ChannelHandler的flush方法

1
2
3
4
5
6
7
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}

好,假设在当前节点在flush的过程中发生了异常,都会被 notifyHandlerException(t);捕获,该方法会和inBound事件传播过程中的异常传播方法一样,也是轮流找下一个异常处理器,而如果异常处理器在pipeline最后面的话,一定会被执行到,这就是为什么该异常处理器也能处理outBound异常的原因

关于为啥 ExceptionHandler 既能处理inBound,又能处理outBound类型的异常的原因,总结一点就是,在任何节点中发生的异常都会往下一个节点传递,最后终究会传递到异常处理器

Handler热插拔

netty 还有个最大的特性之一就是Handler可插拔,可以做到动态编织pipeline,比如在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器。

新连接接入时,ServerBootstrapAcceptor 会给新的NioSocketChannel 添加一个InBoundHandler叫ChannelInitialzer,添加成功之后会触发handlerAdded方法,该方法会调用重载的initialChannel方法初始化新连接的pipeline,结束后会调用pipeline.remove 删除此handler,这里也是热插拔的体现。