「Spring Reactive Stack」Reactor异常处理

不管是在响应式编程还是普通的程序设计中,异常处理都是一个非常重要的方面。

对于Flux或者Mono来说,所有的异常都是一个终止的操作,即使你使用了异常处理,原生成序列也不会继续。
但是如果你对异常进行了处理,那么它会将oneError信号转换成为新的序列的开始,并将替换掉之前上游产生的序列。

先看一个Flux产生异常的例子

1
2
3
4
5
6
@Test
void test1() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.subscribe(System.out::println);
}

会得到一个异常ErrorCallbackNotImplemented

1
2
3
4
100 / 10 = 1
100 / 5 = 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero

1. onError方法

ReactorsubscribeonError方法(subscribe第二个参数),就是try catch的一个具体应用:

1
2
3
4
5
6
7
@Test
void test2() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.subscribe(System.out::println,
error -> System.err.println("Error: " + error));
}

运行结果:

1
2
3
100 / 10 = 1
100 / 5 = 2
Error: java.lang.ArithmeticException: / by zero

2. onErrorReturn方法

onErrorReturn可以在遇到异常的时候fallback到一个静态的默认值:

1
2
3
4
5
6
7
@Test
void test3() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.onErrorReturn("Divided by zero :(")
.subscribe(System.out::println);
}

运行结果:

1
2
3
100 / 10 = 1
100 / 5 = 2
Divided by zero :(

onErrorReturn还支持一个Predicate参数,用来判断要falback的异常是否满足条件。

1
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) 

3. onErrorResume方法

onErrorResume可以在捕获异常之后调用其他的方法。

1
2
3
4
5
6
7
@Test
void test4() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.onErrorResume(e -> System.out::println)
.subscribe(System.out::println);
}

运行结果:

1
2
3
100 / 10 = 1
100 / 5 = 2
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber@23469199

4. retry方法

retry的作用就是当遇到异常的时候,重启一个新的序列

1
2
3
4
5
6
7
8
@Test
void test8() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.retry(1)
.elapsed()
.subscribe(System.out::println);
}

运行结果:

1
2
3
4
5
6
[4,100 / 10 = 1]
[0,100 / 5 = 2]
[9,100 / 10 = 1]
[0,100 / 5 = 2]
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero

elapsed是用来展示产生的value时间之间的duration。从结果我们可以看到,retry之前是不会产生异常信息的。

5. doOnError方法

doOnError只记录异常信息,不破坏原来的React结构。

1
2
3
4
5
6
7
@Test
void test5() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.doOnError(error -> System.out.println("we got the error: "+ error))
.subscribe(System.out::println);
}

运行结果:

1
2
3
4
5
100 / 10 = 1
100 / 5 = 2
we got the error: java.lang.ArithmeticException: / by zero
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero

doOn系列方法是publisher的同步钩子方法,在subscriber触发一系列事件的时候触发

6. doFinally方法

doFinally可以像传统的同步代码那样使用finally去做一些事情,比如关闭http连接,清理资源等。

1
2
3
4
5
6
7
@Test
void test6() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.doFinally(error -> System.out.println("Finally,I will make sure to do something:"+error))
.subscribe(System.out::println);
}

运行结果:

1
2
3
4
5
6
100 / 10 = 1
100 / 5 = 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
at ...
Finally,我会确保做一些事情:onError

第二种收尾操作的方法是using,我们先看一个using的定义:

1
2
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)

可以看到using支持三个参数,resourceSupplier是一个生成器,用来在subscribe的时候生成要发送的resource对象。
sourceSupplier是一个生成Publisher的工厂,接收resourceSupplier传过来的resource,然后生成Publisher对象。
resourceCleanup用来对resource进行收尾操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
void test7() {
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose)
.subscribe(System.out::println);
}