java-Reactor编程入门

https://developer.ibm.com/zh/articles/j-cn-with-reactor-response-encode/

1. 理论基础

反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化时,传统的编程范式需要对 a+b 进行重新计算来得到 c 的值。如果使用反应式编程,当 a 或者 b 的值发生变化时,c 的值会自动更新。反应式编程最早由 .NET 平台上的 Reactive Extensions (Rx) 库来实现。后来迁移到 Java 平台之后就产生了著名的 RxJava 库,并产生了很多其他编程语言上的对应实现。在这些实现的基础上产生了后来的反应式流(Reactive Streams)规范。该规范定义了反应式流的相关接口,并将集成到 Java 9 中。

在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次由调用者通过 next()方法来获取序列中的下一个值。使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。

反应式流中第一个重要概念是负压(backpressure)。在基本的消息推送模式中,当消息发布者产生数据的速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的奔溃,所产生的级联效应甚至可能造成整个系统的瘫痪。负压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过 request()方法来声明其一次所能处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次 request()方法调用。这实际上变成了推拉结合的模式。

前面提到的 RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。RxJava 2 在 RxJava 的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。

在 Java 程序中使用 Reactor 库非常的简单,只需要通过 Maven 或 Gradle 来添加对 io.projectreactor:reactor-core 的依赖即可,目前的版本是 3.0.5.RELEASE。

2. Flux 和 Mono

Flux 和 Mono 是 Reactor 中的两个基本概念z。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

2.1 Mono创建

2.1.1 静态创建方法
  • empty():创建一个不包含任何元素,只发布结束消息的序列。
  • just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
  • justOrEmpty():从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
  • error(Throwable error):创建一个只包含错误消息的序列。
  • never():创建一个不包含任何消息通知的序列。
  • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
  • delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
2.1.2 动态创建方法

通过 create()方法来使用 MonoSink 来创建 Mono。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void monoCreateTest_1(){
Mono.create(sink -> {
SimpleRequest msg = new SimpleRequest();
msg.setRequestMessage("hello zzk");
String requestMessage = msg.getRequestMessage();
SimpleResponse response =
SimpleResponse.newBuilder().setResponseMessage(requestMessage).build();
sink.success(response);
})
.subscribe(System.out::println);
}
@Test
public void monoCreateTest_2(){
CityService cityService = new CityService();
Mono.create(cityMonoSink -> {
int id = 1;
cityMonoSink.success(cityService.findCityById(id)); // 此处可做增删改查
}).subscribe(System.out::println);
}

3. 操作符

Reactor可以在反应式流上通过 声明 的方式添加多种不同的操作符。

3.1 buffer 和 bufferTimeout

作用:将当前流中的元素收集到集合中,并将集合对象作为流中的新元素

在进行收集时可以指定不同的条件:

  • 所包含的元素的最大数量;
  • 收集的时间间隔。(事件间隔使用Duration对象表示)

方法buffer()仅使用一个条件,而bufferTimeout可以使用两个条件

除了元素数量和时间间隔之外,还可以通过bufferUntil和bufferWhile操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的predicate对象。

bufferUtils会一只手机知道predicate返回为true,使得predicate返回true的那个元素可以选择添加到当前集合或者下一个集合中

bufferWhile则至于当Predicate返回为true时才会收集,一旦返回false,会立即开始下一次收集。

测试:

1
2
3
4
5
6
7
public static void bufferTest(){
Flux.range(1,10).bufferWhile(i->i>5).subscribe(System.out::println);

Flux.range(1,10).bufferUntil(integer -> integer % 2 ==0).subscribe(System.out::println);

Flux.range(1,10).buffer().subscribe(System.out::println);
}

理解:按一定的条件,将流中元素分类,聚合成新的集合元素

3.2 filter

作用:对流中包含的元素进行过滤,只满足Predicate指定条件的元素。

测试:

1
2
3
public static void filterTest(){
Flux.range(1,10).filter(integer -> integer%2==0).subscribe(System.out::println);
}

3.3 window

作用:类似于buffer,不同的是window操作符吧当前流中的元素收集到另外的Flux序列中,因此返回值是

Flux

测试:

1
2
3
public static void windowTest(){
Flux.range(1,100).window(20).subscribe(System.out::println);
}

在代码中,输出结果分别是 5 个和 2 个 UnicastProcessor 字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,而 UnicastProcessor 类的 toString 方法输出的就是 UnicastProcessor 字符。

3.4 zipWith

作用:把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。

  • 在合并时,可以不作任何处理,由此得到的是一个元素类型为Tuple2的流

  • 也可以通过一个BiFuction函数对合并的元素进行处理,的到的流元素为该函数的返回值

测试:

1
2
3
4
public static void zipWithTest(){
Flux.just("a","b").zipWith(Flux.just("c","d")).subscribe(System.out::println);
Flux.just("a","b").zipWith(Flux.just("c","d"),(s1,s2)->s1+s2).subscribe(System.out::println);
}

3.5 take

作用:用于从当提前流中提取元素,提取方式有多种:

  • take(long n),take(Duration timespan):按照指定的数量或时间间隔来提取。
  • takeLast(long n):提取流中的最后 N 个元素。
  • takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。
  • takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。
  • takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。

测试:

1
2
3
4
5
6
7
8
9
10
public static void takeTest(){
// 获取前10个元素
Flux.range(1,1000).take(10).subscribe(System.out::println);
// 获取后10个元素
Flux.range(1,1000).takeLast(10).subscribe(System.out::println);
// 提取元素直到Predicate返回true
Flux.range(1,1000).takeUntil(integer -> integer==100).subscribe(System.out::println);
//当Predicate返回true时,提取元素
Flux.range(1,100).takeWhile(integer -> integer<10).subscribe(System.out::println);
}

3.6 reduce 和 reduceWith

作用:对流中包含的所有元素进行累积操作,得到一份包含计算结果的Mono序列。累积操作是通过一份BiFunction来表示的

在操作的时候可以指定一个初始值,如果没有初始值,序列中的第一个元素作为初始值

1
2
3
4
5
6
public static void reduceTest(){
// 累加 返回5050
Flux.range(1,100).reduce((x,y)->x+y).subscribe(System.out::println);
// 在50的基础上累加 返回5100
Flux.range(1,100).reduceWith(()->50,(x,y)->x+y).subscribe(System.out::println);
}

3.7 merge 和 mergeSequential

作用:用于把多个流合并成一个Flux序列。

  • merge 按照所有流中元素的实际生产顺序进行合并

  • mergeSequential 按照所有流被订阅的顺序,以流为单位进行合并

测试:

1
2
3
4
5
6
7
8
9
public static void mergeTest(){
Flux.merge(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)).take(10),
Flux.interval(Duration.of(2,ChronoUnit.SECONDS)).take(10))
.toStream().forEach(System.out::println);

Flux.mergeSequential(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)).take(10),
Flux.interval(Duration.of(2,ChronoUnit.SECONDS)).take(10))
.toStream().forEach(System.out::println);
}

第一句:两个序列交叉输出

第二句:先输出第一个序列,第二个序列虽然没输出但是已经发射的元素存在内存中了,序列一输出完成立即输出以发射的序列二元素,再继续按照序列二的节奏输出。

3.8 flatMap 和 flatMapSequential

作用:把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

flatMap 与flatMapSequential的区别,和上文介绍的merge操作符一致

1
2
3
4
public static void flatMapTest(){
Flux.just(3,5).flatMap(x->Flux.interval(Duration.of(x,ChronoUnit.SECONDS)).take(x)).toStream()
.forEach(System.out::println);
}

3.9 concatMap

作用:也是把流中的每个元素转换成一个流,再把所有流进行合并。

与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;

与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。

1
2
3
4
5
6
public static void concatMapTest(){
Flux.just(3, 4)
.concatMap(x -> Flux.interval(Duration.of(x,ChronoUnit.SECONDS)).take(x))
.toStream()
.forEach(System.out::println);
}

3.10 combineLatest

作用:把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

在下面代码,流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String。

1
2
3
4
5
6
7
public static void combineLatestTest(){
Flux.combineLatest(
Arrays::toString,
Flux.interval(Duration.of(4,ChronoUnit.SECONDS)).take(5),
Flux.interval(Duration.of(8,ChronoUnit.SECONDS)).take(5)
).toStream().forEach(System.out::println);
}

3.11 defer

mono defer方法创建数据源属于懒汉型,与Mono.just等创建数据源则是恶汉型,下面看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void defer(){
//声明阶段创建DeferClass对象
Mono<Date> m1 = Mono.just(new Date());
Mono<Date> m2 = Mono.defer(()->Mono.just(new Date()));
m1.subscribe(System.out::println);
m2.subscribe(System.out::println);
//延迟5秒钟
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
m1.subscribe(System.out::println);
m2.subscribe(System.out::println);
}

输出结果

Fri Feb 07 10:22:51 GMT+08:00 2020
Fri Feb 07 10:22:51 GMT+08:00 2020
Fri Feb 07 10:22:51 GMT+08:00 2020
Fri Feb 07 10:22:56 GMT+08:00 2020

我们可以看到,创建了两个数据源,一个使用Mono.just创建,一个用Mono.defer创建,然后分别通过lambda表达式订阅这两个publisher,可以看到两个输出的时间都是10:22:51,延迟5秒钟后重新订阅,Mono.just创建的数据源时间没变,但是Mono.defer创建的数据源时间相应的延迟了5秒钟,原因在于Mono.just会在声明阶段构造Date对象,只创建一次,但是Mono.defer却是在subscribe阶段才会创建对应的Date对象,每次调用subscribe方法都会创建Date对象,在webflux中

4. 消息处理

当需要处理Flux和Mono中的消息时,可以通过subscribe方法来添加相应的订阅逻辑。在调用subscribe方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。

  • 通过subscribe()方法处理正常和错误消息
1
2
3
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);

输出:

1
2
3
1
2
java.lang.IllegalStateException
  • 出现错误是返回默认值

正常的消息处理相对简单。当出现错误时,有多种不同的处理策略。第一种策略是通过 onErrorReturn()方法返回一个默认值。

1
2
3
4
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(-1)
.subscribe(System.out::println);

输出:

1
2
3
1
2
-1