netty源码分析之异步编程

netty的异步编程模型

异步编程的目标是:提交一个任务给线程池,在任务执行期间,提交者可以执行其他的逻辑,当提交的任务执行完成,通知提交者来获取执行结果

netty 的异步任务基于 Future/Promise 异步模型实现。

模型定义了几个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Netty's future abstraction. It extends {@link java.util.concurrent.Future} and adds
 * outcome inspection, listener registration and several blocking wait methods.
 */
public interface Future<V> extends java.util.concurrent.Future<V> {
// --- Outcome inspection ---
boolean isSuccess();
boolean isCancellable();
// Presumably the failure cause of the operation -- confirm against Netty's Javadoc.
Throwable cause();
// --- Listener management: every method below returns a Future<V> ---
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// --- Blocking waits: sync()/await() declare InterruptedException, the *Uninterruptibly variants do not ---
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
// --- Timed waits: these return a boolean instead of the future ---
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// Presumably returns the current result without blocking -- confirm against Netty's Javadoc.
V getNow();
// Redeclares java.util.concurrent.Future#cancel(boolean).
boolean cancel(boolean mayInterruptIfRunning);
}
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * A {@code Future<Void>} that is tied to a {@link Channel}. The listener and wait methods
 * are redeclared so that they return {@code ChannelFuture} instead of {@code Future<Void>}.
 */
public interface ChannelFuture extends Future<Void> {
Channel channel(); // implementations need to hold a reference to the channel
boolean isVoid(); // whether this implementation is a Future<Void>
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
boolean isSuccess(); // true if and only if the I/O operation succeeded
// NOTE(review): the earlier note on this method said "true iff the I/O operation was
// cancelled via cancel()", which describes isCancelled(). Netty's Future Javadoc defines
// isCancellable() as "the operation can be cancelled via cancel(boolean)" -- confirm.
boolean isCancellable();
ChannelFuture sync() throws InterruptedException; // waits until this future is done and rethrows the failure cause if it failed
ChannelFuture await() throws InterruptedException;// waits for this future to complete
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * A writable {@link Future}: adds methods that mark the outcome (success, failure,
 * uncancellable) and redeclares the listener and wait methods to return {@code Promise<V>}.
 */
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);// marks this future as a success and notifies all listeners
boolean trySuccess(V result);// marks this future as a success and notifies all listeners; returns true if and only if that succeeded
// Presumably the failure counterparts of setSuccess/trySuccess, taking the cause -- confirm.
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable(); // marks this future as uncancellable; returns false if the future has already been cancelled
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Promise<V> sync() throws InterruptedException;
Promise<V> await() throws InterruptedException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Combines {@link ChannelFuture} and {@code Promise<Void>}. Most methods are overrides whose
 * only change is narrowing the return type to {@code ChannelPromise}; the additions are the
 * no-argument {@link #setSuccess()} / {@link #trySuccess()} and {@link #unvoid()}.
 */
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override
Channel channel();
@Override
ChannelPromise setSuccess(Void result);
// No-argument variants of setSuccess(Void) / trySuccess(Void) (the result type is Void).
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
ChannelPromise unvoid();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//ChannelFutureListener
/**
 * A {@link GenericFutureListener} specialised for {@link ChannelFuture}, bundled with three
 * ready-made listener constants.
 */
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {

    /** Closes the future's channel once the operation completes, whatever the outcome. */
    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };

    /** Closes the future's channel only when the operation did not succeed. */
    ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                return;
            }
            future.channel().close();
        }
    };

    /** Fires the failure cause through the channel's pipeline when the operation did not succeed. */
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                return;
            }
            future.channel().pipeline().fireExceptionCaught(future.cause());
        }
    };
}
/**
 * Callback interface for future completion; {@code F} is the concrete future type handed
 * to the listener.
 */
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
// Receives the future this listener was registered on; implementations may throw any Exception.
void operationComplete(F future) throws Exception;
}

要创建一个ChannelPromise,需要提供一个channel和处理该channel上事件的Executor,例如注册一个channel时,创建一个DefaultChannelPromise,里面包装了即将注册的channel和即将执行注册操作的Executor:

1
2
3
/**
 * Registers {@code channel} by wrapping it, together with this instance, in a new
 * {@link DefaultChannelPromise} and delegating to the promise-based overload.
 */
public ChannelFuture register(Channel channel) {
    final DefaultChannelPromise registrationPromise = new DefaultChannelPromise(channel, this);
    return register(registrationPromise);
}
1
2
3
4
5
/**
 * Hands the registration over to the {@code unsafe()} of the promise's channel and
 * returns the same promise, typed as {@link ChannelFuture}.
 */
public ChannelFuture register(final ChannelPromise promise) {
    // Validate the argument first (helper is labelled with the parameter name).
    ObjectUtil.checkNotNull(promise, "promise");

    promise.channel()
            .unsafe()
            .register(this, promise);

    return promise;
}

完成注册操作之后返回这个promise。注意返回值类型是ChannelFuture,即DefaultChannelPromise向上转型成了ChannelFuture。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Runs the registration steps in order and completes {@code promise}: with success right
 * after the handler-added callbacks, or with the thrown {@link Throwable} on failure.
 */
private void register0(ChannelPromise promise) {
    try {
        // Stop unless the promise could be marked uncancellable and ensureOpen(promise) passes.
        if (!(promise.setUncancellable() && ensureOpen(promise))) {
            return;
        }

        // Capture the flag before it is cleared below.
        final boolean isFirstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();
        // The promise is completed before channelRegistered is fired.
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();

        if (!isActive()) {
            return;
        }
        if (isFirstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    } catch (Throwable t) {
        // Any failure: force-close, mark the close future as closed, then fail the promise.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

主要看这两个方法:safeSetSuccess 和 safeSetFailure

1
2
3
4
5
6
7
8
9
10
11
/**
 * Tries to mark {@code promise} as succeeded and logs a warning when that is not possible.
 * Instances of {@link VoidChannelPromise} are left untouched.
 */
protected final void safeSetSuccess(ChannelPromise promise) {
    if (promise instanceof VoidChannelPromise) {
        return;
    }
    if (promise.trySuccess()) {
        return;
    }
    logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}

/**
 * Tries to mark {@code promise} as failed with {@code cause} and logs a warning when that
 * is not possible. Instances of {@link VoidChannelPromise} are left untouched.
 */
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
    if (promise instanceof VoidChannelPromise) {
        return;
    }
    if (promise.tryFailure(cause)) {
        return;
    }
    logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
}

trySuccess会通知所有的监听器,跟一下调用链:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Abridged call chain from trySuccess down to listener notification.
// Lines consisting of "..." mark code that was elided from this excerpt.
public boolean trySuccess(V result) {
return setSuccess0(result);
}
// A null result is replaced by the SUCCESS sentinel before being stored.
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
// Stores the value (elided) and triggers listener notification.
private boolean setValue0(Object objResult) {
...
notifyListeners();
...
}
// Two notification paths are visible: a direct notifyListenersNow() call, and a Runnable
// handed to safeExecute(executor, ...) that makes the same call. The condition choosing
// between them is elided -- presumably whether the caller is already on the executor's
// thread; confirm against the DefaultPromise source.
private void notifyListeners() {
...
notifyListenersNow();
...

safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
// NOTE(review): this brace has no visible opening; it closes either an elided block or the
// enclosing class -- cannot tell from this excerpt.
}

再往下就是调用具体的监听器方法operationComplete(future)

此处需要注意的是,DefaultPromise中保存监听器的字段是private Object listeners,它是一个Object引用:当我们只添加了一个监听器的时候,该引用是GenericFutureListener类型的;当多于一个监听器被添加到这个promise中时,该引用就是DefaultFutureListeners类型了,此类型维护了一个监听器数组。这样处理是出于性能考虑,因为大多数时候我们只会添加一个监听器。看一下addListener0函数就一目了然了:

1
2
3
4
5
6
7
8
9
/**
 * Stores {@code listener} in the {@code listeners} field, which takes one of three shapes:
 * {@code null} (none yet), a single listener, or a {@link DefaultFutureListeners} holder
 * once a second listener is added.
 */
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    // Already holding several listeners: append to the existing holder.
    if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
        return;
    }
    // Nothing registered yet: keep the bare listener, no holder object needed.
    if (listeners == null) {
        listeners = listener;
        return;
    }
    // Exactly one listener so far: promote to a holder containing the old and the new one.
    listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}

netty的异步响应处理的是多线程之间的任务,最终也没有超出虚拟机的层面。还有一种异步编程是节点与节点之间的异步响应,比如向消息中间件服务器发送异步消息。

rocketMq的异步消息发送

rocketmq 底层使用netty实现客户端与服务器之间的通信。使用rocketmq原生的api异步发送一条消息时,消息msg发送出去之后立即返回;当服务器响应到达时,mq客户端会通过异步回调最终调用到SendCallback中的方法:消息发送成功调用onSuccess(),否则调用onException()。

1
2
3
4
5
6
7
8
9
10
// Callback invoked with the outcome of the asynchronous send.
SendCallback printingCallback = new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // Print the send result followed by a line separator.
        System.out.printf("%s%n", sendResult);
    }

    @Override
    public void onException(Throwable throwable) {
        throwable.printStackTrace();
    }
};
producer.send(msg, printingCallback);

接下来我将跟一下源码,分析一下这里是怎么实现客户端与服务器之间的异步通信的。

1
2
3
4
5
6
//DefaultMQProducer
/**
 * Rewrites the message topic via {@code withNamespace} and forwards the message and the
 * callback to {@code defaultMQProducerImpl}.
 */
public void send(Message msg, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
    final String namespacedTopic = withNamespace(msg.getTopic());
    msg.setTopic(namespacedTopic);
    this.defaultMQProducerImpl.send(msg, sendCallback);
}
1
2
3
4
5
//DefaultMQProducerImpl
/**
 * Delegates to the timeout-taking overload, using the send timeout configured on
 * {@code defaultMQProducer}.
 */
public void send(Message msg, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
    final long timeout = this.defaultMQProducer.getSendMsgTimeout();
    send(msg, sendCallback, timeout);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//DefaultMQProducerImpl
/**
 * Submits an asynchronous send of {@code msg} to queue {@code mq} on the async sender executor.
 *
 * <p>The submitted task validates state and message, checks that the message topic equals the
 * queue topic, and then sends with whatever part of {@code timeout} is left after the time spent
 * waiting in the executor. Every exception raised inside the task is reported through
 * {@code sendCallback.onException}; only a rejected submission is thrown to the caller.
 *
 * @throws MQClientException if the executor rejects the task
 */
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();

    final Runnable asyncSendTask = new Runnable() {
        @Override
        public void run() {
            try {
                makeSureStateOK();
                Validators.checkMessage(msg, defaultMQProducer);

                final boolean topicMatches = msg.getTopic().equals(mq.getTopic());
                if (!topicMatches) {
                    throw new MQClientException("message's topic not equal mq's topic", null);
                }

                // Time already consumed between the call to send() and this task starting.
                final long elapsed = System.currentTimeMillis() - beginStartTime;
                if (timeout <= elapsed) {
                    sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
                    return;
                }

                try {
                    sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
                            timeout - elapsed);
                } catch (MQBrokerException e) {
                    throw new MQClientException("unknown exception", e);
                }
            } catch (Exception e) {
                sendCallback.onException(e);
            }
        }
    };

    try {
        executor.submit(asyncSendTask);
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }
}