
Spring WebFlux是Spring Framework 5.0中引入的以Reactor为基础的响应式编程Web框架。
WebFlux 的异步处理是基于Reactor实现的,是将输入流适配成Mono或Flux进行统一处理。
1. 响应式流(Reactive Streams)
Reactor 是一个响应式流,它有对应的发布者(Publisher),用两个类来表示:
Flux(返回0-n个元素)
Mono(返回0或1个元素)
Reactor 的订阅者(Subscriber)则是由Spring框架去完成。
响应式流(Reactive Streams) 其实就是一个规范,其特点:
- 无阻塞;
- 一个数据流;
- 可以异步执行;
- 能够处理背压;
背压(Backpressure) 可以简单理解为 消费决定生产,生产者可以根据消费压力进行动态调节生产速率的机制。
2. 发布者(Publisher)
由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果了。必须返回一个类似Java中的Future的概念,在有结果可用时通知消费者进行消费响应。
Reactive Stream规范中这种被定义为Publisher ,Publisher是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber的需求推送元素。一个Publisher可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。
可以通过下图Excel来理解,1-9行可以看作发布者Publisher提供的元素序列,10-13行的结果计算看作订阅者Subscriber。

响应式的一个重要特点:当没有订阅时发布者(Publisher)什么也不做。
而Flux和Mono都是Publisher在Reactor3实现。Publisher提供了subscribe方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher不会做任何事情,他根据消费情况进行响应。Publisher可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono和Flux。
在WebFlux中,你的方法只需返回Mono或Flux即可。你的代码基本也只和Mono或Flux打交道。而WebFlux则会实现Subscriber ,onNext时将业务开发人员编写的Mono或Flux转换为HTTP Response返回给客户端。
3. Mono和Flux的抽象模型
Mono和Flux都是Publisher(发布者)的实现模型。
3.1 Flux
Flux是一个发出(emit)0-N个元素组成的异步序列的Publisher<T>,可以被onComplete信号或者onError信号所终止。
在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError。
下面这张图表示了Flux的抽象模型:

3.2 Mono
Mono是一个发出(emit)0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。
下面这张图表示了Mono的抽象模型(整体和Flux差不多,只不过这里只会发出0-1个元素):

4. Mono API
Mono和Flux都是实现org.reactivestreams.Publisher接口的抽象类。
Mono代表0-1个元素的发布者(Publisher)。
Mono里面有很多API:
- **just()**:可以指定序列中包含的全部元素。创建出来的
Mono序列在发布这些元素之后会自动结束。
- **empty()**:创建一个不包含任何元素,只发布结束消息的序列。
- **justOrEmpty(Optional<? extends T> data)**:从一个
Optional对象或可能为null的对象中创建Mono。只有Optional对象中包含值或对象不为null时,Mono序列才产生对应的元素。
- **error(Throwable error)**:创建一个只包含错误消息的序列。
- **never()**:创建一个不包含任何消息通知的序列。
- **delay(Duration duration)和delayMillis(long duration)**:创建一个
Mono序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
- **fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和fromSupplier()**:分别从
Callable、CompletionStage、CompletableFuture、Runnable和Supplier中创建Mono。
- **ignoreElements(Publisher source)**:创建一个
Mono序列,忽略作为源的Publisher中的所有元素,只产生结束消息。
- **create()**:通过
create()方法来使用MonoSink来创建Mono。
API使用案例如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Slf4j @SpringBootTest public class MonoTest { @Test public void mono() { Mono.just("my name is charles").subscribe(log::info); Mono.empty().subscribe(); Mono.justOrEmpty(null).subscribe(System.out::println); Mono.justOrEmpty("测试justOrEmpty").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("测试justOrEmpty")).subscribe(System.out::println); Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println); Mono.never().subscribe(System.out::println); Mono.delay(Duration.ofMillis(2)).map(String::valueOf).subscribe(log::info); Mono.fromRunnable(() -> { System.out.println("thread run"); throw new RuntimeException("thread run error"); }).subscribe(System.out::println, System.err::println); Mono.fromCallable(() -> "callback function").subscribe(log::info); Mono.fromFuture(CompletableFuture.completedFuture("from future")).subscribe(log::info); Mono<Void> runnableMono = Mono.fromRunnable(() -> log.warn(Thread.currentThread().getName())); runnableMono.subscribe(); Mono.fromSupplier(() -> new Date().toString()).subscribe(log::info); Mono.from(Flux.just("from", "flux")).subscribe(log::info); Mono.create(sink -> sink.success("测试create")).subscribe(System.out::println); } }
|
运行结果:

5. Flux API
Mono和Flux都是实现org.reactivestreams.Publisher接口的抽象类。
Flux表示连续序列,和Mono的创建方法有些不同,Mono是Flux的简化版,Flux可以用来表示流。
Flux API:
- **just()**:可以指定序列中包含的全部元素。
- **range()**:可以用来创建连续数值。
- **empty()**:创建一个不包含任何元素。
- **error(Throwable error)**:创建一个只包含错误消息的序列。
- **fromIterable()**:通过迭代器创建如list,set
- **fromStream()**:通过流创建
- **fromArray(T[])**:通过列表创建 如 String[], Integer[]
- **merge()**:通过将两个flux合并得到新的flux
- **interval()**:每隔一段时间生成一个数字,从1开始递增
API使用案例如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Slf4j @SpringBootTest public class FluxTest { @Test public void flux () throws InterruptedException { Flux<Integer> intFlux = Flux.just(1, 2, 3, 4, 5); Flux<Integer> rangeFlux = Flux.range(6, 4); Flux<Integer> intMerge = Flux.merge(intFlux, rangeFlux); intMerge.subscribe(System.out::print); System.out.println(); Flux.fromArray(new Integer[]{1,3,5,7,9}).subscribe(System.out::print); System.out.println(); Flux<String> strFluxFromStream = Flux.fromStream(Stream.of(" just", " test", " reactor", " Flux", " and", " Mono")); Flux<String> strFluxFromList = Flux.fromIterable(Arrays.asList(" just", " test", " reactor", " Flux", " and", " Mono")); Flux<String> strMerge = Flux.merge(strFluxFromStream, strFluxFromList); strMerge.subscribe(System.out::print); System.out.println(); Flux.interval(Duration.ofMillis(100)).map(String::valueOf) .subscribe(System.out::print); Thread.sleep(2000); } }
|
运行结果:
1 2 3 4
| 123456789 13579 just test reactor Flux and Mono just test reactor Flux and Mono 012345678910111213141516171819
|
6. subscribe方法
subscribe()方法表示对数据流的订阅动作,subscribe()方法有多个重载的方法,最多可以传入四个参数;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Test public void subscribe () throws InterruptedException { Mono.just(1).subscribe(System.out::println); Flux.just('a', 'b').subscribe(System.out::println); Flux.just('i', 'j').map(chr -> { if ('j'== chr) throw new RuntimeException("test 2 parameters"); else return String.valueOf(chr); }) .subscribe(System.out::println, err -> log.error(err.getMessage())); Flux.just("你", "我", "他", "它", "ta") .subscribe(System.out::print, System.err::println, () -> System.out.println("complete for 3")); Flux.interval(Duration.ofMillis(100)) .map(i -> { if (i == 3) throw new RuntimeException("fake a mistake"); else return String.valueOf(i); }) .subscribe(info -> log.info("info: {}", info), err -> log.error("error: {}", err.getMessage()), () -> log.info("Done"), sub -> sub.request(10)); Thread.sleep(2000); }
|
运行结果:

7. 使用StepVerifier测试响应式异步代码
通过expectNext执行类似断言的功能,如果断言不符合实际情况,就会报错。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Test public void StepVerifier () { Flux flux = Flux.just(1, 2, 3, 4, 5, 6); StepVerifier.create(flux) .expectNext(1, 2, 3, 4, 5, 6) .expectComplete() .verify(); Mono<String> mono = Mono.just("charles").log(); StepVerifier.create(mono) .expectNext("char") .verifyComplete(); }
|
运行结果:
1 2
| java.lang.AssertionError: expectation "expectNext(char)" failed (expected value: char; actual value: charles) ...
|
参考连接:https://mp.weixin.qq.com/s/O1VGS7d1TLQhgrCaQ-UQCw
原文链接: http://chaooo.github.io/2021/02/25/spring-reactive-webflux.html
版权声明: 转载请注明出处.