netty源码分析之executionMask

老版本的netty,AbstractChannelHandlerContext 有两个bool属性InBound 和outBound,InBound=true表示该节点是inBound,outBound=true表示该节点是outBound,当然也可能同时为true。在我阅读的源代码版本(4.1.50)中已经删除了这两个属性,取而代之的是属性executionMask,通过使用该属性可以在事件传播时,快速的判断出该节点中的Handler在inBound方向和outBound方向有没有重载某事件处理逻辑。

之前分析过Pipeline的源码,在向Pipeline添加节点的时候,会创建DefaultChannelHandlerContext对象,调用其父类构造函数时

1
2
3
4
5
6
7
8
9
10
11
import static io.netty.channel.ChannelHandlerMask.mask;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor; // null
this.executionMask = mask(handlerClass); // 生成一个掩码 可以快读判断这个Handler重载了哪些方法
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor; // ture
}

会用mask方法给其成员executionMask赋值,mask是ChannelHandlerMask类的静态方法,本文主要分析这类的源码,该类的源码并不复杂,之所以要拿出来分析,完全是因为这种设计思想还是值得借鉴的。

首先看这个类定义了许多的int型静态变量,作为标记,每一个事件处理方法对应了17位中的一位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final int MASK_EXCEPTION_CAUGHT = 1;

static final int MASK_CHANNEL_REGISTERED = 1 << 1; //2
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; //4
static final int MASK_CHANNEL_ACTIVE = 1 << 3; //8
static final int MASK_CHANNEL_INACTIVE = 1 << 4; //16
static final int MASK_CHANNEL_READ = 1 << 5; //32
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6; //64
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7; //128
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8; //256

static final int MASK_BIND = 1 << 9; //512
static final int MASK_CONNECT = 1 << 10; //1024
static final int MASK_DISCONNECT = 1 << 11; //2048
static final int MASK_CLOSE = 1 << 12; //4096
static final int MASK_DEREGISTER = 1 << 13; //8192
static final int MASK_READ = 1 << 14; //16384
static final int MASK_WRITE = 1 << 15; //32768
static final int MASK_FLUSH = 1 << 16; //65536

根据以上的定义

1
2
3
static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

计算出纯纯的In事件方法掩码为510(1 1111 1110)

加上异常处理方法:

1
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;

所有的In事件方法掩码为511 (1 1111 1111)

同理计算纯纯的out事件方法掩码为:130559 (1 1111 1111 0000 0000)

1
2
static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

加上异常处理方法为: 130560 (1 1111 1111 0000 0001)

1
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

重要的线程私有对象:

1
2
3
4
5
6
7
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
@Override
protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
}
};

很显然这个对象是用来存储ChannelHandler和其掩码之间的映射,注意线程私有属性。

mask方法

1
2
3
4
5
6
7
8
9
static int mask(Class<? extends ChannelHandler> clazz) {
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}

首先尝试从map中获取掩码,求而不得再调用mask0方法计算掩码,存入map并返回

mask0方法

假设某Handler实现了两个方法MASK_CHANNEL_ACTIVEMASK_CHANNEL_REGISTERED

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT; // mask=1
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND; //mask= 0 0000 0001 | 1 1111 1111 = 1 1111 1111
// isSkippable的逻辑就是 如果我在Handler中重写了channelRegistered方法 则返回false
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
// 由于重载了channelRegistered,这里不执行 后面同理
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
// mask = 1 1111 1111 & 1 1111 1011 = 1 1111 1011
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {、
// 跳过
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
// mask = 1 1111 1011 & 1 1110 1111 = 1 1110 1011
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
// mask = 1 1110 1011 & 1 1101 1111 = 1 1100 1011
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
// mask = 1 1100 1011 & 1 1011 1111 = 1 1000 1011
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
// mask = 1 1000 1011 & 1 0111 1111 = 1 0000 1011
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
// mask = 1 0000 1011 & 0 1111 1111 = 0 0000 1011
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
// 最终得到的 mask 是重写的所有方法对应位为1的一个int值
}

if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;

if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}

if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}

return mask;
}

在pipeline节点中,有两个方法,用于查找重载了指定方法的下一个或前一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}

/**
* mask 是需要匹配的方法的mask
*/
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}

传入的参数mask是指定方法的掩码,例如ChannelHandlerMask.MASK_CHANNEL_ACTIVE

重要的逻辑还是在skipContext()方法中

1
2
3
4
5
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

mask = 0 0000 1000

onlyMask = 1 1111 1110

mask | onlyMask = 1 1111 1110

此处假设handler重载了channelActive方法

executionMask = 0 0000 1011

1
2
3
0 0000 1011  &  
1 1111 1110 =
0 0000 1010

第一个判断0 0000 1010 == 0 为 false

第二次判断主要看后面

executionMask & mask

1
2
3
0 0000 1011 &
0 0000 1000 =
0 0000 1000

0 0000 1000 == 0 也为 false

此时skipContext方法可以返回 false

findContextInbound结束循环。