Project Reactor [Part 3] Error Handling

This is the third article in a series on Project Reactor. In the previous article we discussed the different operators in Project Reactor. In this article, I'll show you how to handle errors in Reactor. We'll do this through examples.

In reactive programming, errors are also considered terminal events. When an error occurs, it is delivered to the onError method of the Subscriber. Before we look at how to handle errors, keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
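As a rough illustration of the signal order described above, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) instead of Reactor itself. The names TerminalErrorSketch, FailingPublisher and run are hypothetical and exist only for this sketch, and the publisher ignores backpressure for brevity, so treat it as a picture of the contract rather than a real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class TerminalErrorSketch {

    /** Hypothetical publisher: emits 1..5 synchronously, then signals an error. */
    static class FailingPublisher implements Flow.Publisher<Integer> {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Simplification: the requested amount is ignored.
                    if (done) {
                        return;
                    }
                    done = true;
                    for (int i = 1; i <= 5; i++) {
                        subscriber.onNext(i);
                    }
                    // Terminal signal: nothing may be sent after this,
                    // neither onNext nor onComplete.
                    subscriber.onError(new RuntimeException("Test"));
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes once and records every signal the subscriber receives. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        new FailingPublisher().subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext " + item);
            }

            @Override
            public void onError(Throwable error) {
                events.add("onError " + error.getMessage());
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // Prints five onNext lines followed by "onError Test"; no "onComplete".
        run().forEach(System.out::println);
    }
}
```

The recorded list ends with the onError entry, which is the same shape the Reactor examples in this article rely on.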

The code for this post is available on my GitHub account.

onError

In the code block below, the subscriber prints the values 1 through 5, and then the onError callback runs. Note that the number 6 is never printed, because an error is a terminal event. The line "subscriber Completed" is not printed either, because no onComplete event is delivered when the sequence ends with an error.

@Test
public void testErrorFlowFlux() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6)); // never emitted: the error ends the sequence

    fluxFromJust.subscribe(
            it -> System.out.println("Number is " + it),      // onNext
            e -> e.printStackTrace(),                         // onError
            () -> System.out.println("subscriber Completed")  // onComplete
    );

    // To unit test this code
    StepVerifier
            .create(fluxFromJust)
            .expectNext(1, 2, 3, 4, 5)
            .expectError(RuntimeException.class)
            .verify(); // without verify() the StepVerifier never subscribes
}

onErrorResume

If you want to report the exception and then switch to a fallback sequence, you can use onErrorResume. It receives the error and returns the publisher that replaces the failed sequence.

@Test
public void testOnErrorResume() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .onErrorResume(e -> {
                log.info("**************");
                System.out.println("Exception occurred " + e.getMessage());
                // Return some fallback values
                return Flux.just(7, 8);
            });

    StepVerifier
            .create(fluxFromJust)
            .expectNext(1, 2, 3, 4, 5)
            .expectNext(7, 8) // fallback values replace the failed sequence
            .verifyComplete();
}

onErrorReturn

onErrorReturn can be used if you just want to emit a single fallback value when an error occurs. The sequence then completes normally.

@Test
public void testOnErrorReturn() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .onErrorReturn(99); // emitted in place of the error

    StepVerifier
            .create(fluxFromJust)
            .expectNext(1, 2, 3, 4, 5)
            .expectNext(99)
            .verifyComplete();
}

onErrorContinue

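If you want to ignore an error produced by an operator, you can use onErrorContinue.
It drops the element that caused the error and continues the sequence with the remaining elements. Note that it only affects upstream operators that support this mode, such as map.
In the example below, mapping the number 3 throws an exception; onErrorContinue hands the error and the offending element to the callback and the sequence carries on.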

@Test
public void testOnErrorContinue() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .map(i -> mapSomeValue(i))
            .onErrorContinue((e, i) -> {
                // e is the exception, i is the element that caused it
                System.out.println("Error for item " + i);
            });

    StepVerifier
            .create(fluxFromJust)
            .expectNext(1, 2, 4, 5) // 3 is dropped
            .verifyComplete();
}

private int mapSomeValue(Integer i) {
    if (i == 3) {
        throw new RuntimeException("Exception From Map");
    }
    return i;
}

onErrorMap

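Use onErrorMap to translate an exception into a different one, for example a custom exception type. The sequence still terminates with an error, only the error type changes.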

@Test
public void testOnErrorMap() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .map(i -> i * 2)
            .onErrorMap(e -> new CustomeException(e)); // wrap the original error

    StepVerifier
            .create(fluxFromJust)
            .expectNext(2, 4, 6, 8, 10)
            .expectError(CustomeException.class)
            .verify(); // without verify() the StepVerifier never subscribes
}

retry

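You can retry the sequence when an error occurs. Keep in mind that retry re-subscribes to the source, so the sequence starts again from the first element. With retry(2), the source is subscribed to at most three times: the original attempt plus two retries.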

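@Test
public void testOnRetry() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3)
            // Flux.just never fails on its own, so append an error to trigger the retries
            .concatWith(Flux.error(new RuntimeException("Test")))
            .map(i -> i * 2)
            .onErrorMap(e -> new CustomeException(e))
            .retry(2);

    StepVerifier
            .create(fluxFromJust)
            .expectNext(2, 4, 6) // original attempt
            .expectNext(2, 4, 6) // first retry
            .expectNext(2, 4, 6) // second retry
            .expectError(CustomeException.class)
            .verify();
}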
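
The restart-from-the-first-element behavior described above can be pictured with a plain-Java analogy. This is only a sketch under stated assumptions, not how Reactor implements retry: RetrySketch, emitThenFail and runWithRetry are hypothetical names, and a simple loop stands in for re-subscribing to a cold source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetrySketch {

    /** Hypothetical cold source: every call replays 2, 4, 6 and then fails. */
    static void emitThenFail(Consumer<Integer> onNext) {
        for (int i = 1; i <= 3; i++) {
            onNext.accept(i * 2);
        }
        throw new RuntimeException("Test");
    }

    /**
     * Runs the source once, then up to maxRetries more times.
     * Each attempt starts from the first element again; the last
     * error is rethrown once the retries are exhausted.
     */
    static void runWithRetry(int maxRetries, List<Integer> received) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                emitThenFail(received::add);
                return; // the source completed without an error
            } catch (RuntimeException e) {
                last = e; // "re-subscribe" on the next loop iteration
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        try {
            runWithRetry(2, received);
        } catch (RuntimeException e) {
            System.out.println("Failed after retries: " + e.getMessage());
        }
        System.out.println(received); // [2, 4, 6, 2, 4, 6, 2, 4, 6]
    }
}
```

The elements already delivered in earlier attempts are not taken back, which is why the subscriber in the test above sees 2, 4, 6 three times before the error.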

onErrorStop

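onErrorStop restores the default behavior, in which an error terminates the sequence. It is mainly useful when an onErrorContinue has been applied further downstream and you want the operators above onErrorStop to treat errors as terminal again.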

@Test
public void testOnErrorStop() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .map(i -> doubleValue(i))
            .onErrorStop();

    StepVerifier
            .create(fluxFromJust)
            .expectNext(2, 4, 6)
            .verifyError(); // the error terminates the sequence; 6 is never doubled
}

private Integer doubleValue(Integer i) {
    System.out.println("Doing multiplication");
    return i * 2;
}

doOnError

Use doOnError if you want to run a side effect when an error occurs. The error itself is not handled; it still propagates downstream.

@Test
public void testDoOnError() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .doOnError(e -> System.out.println("Run some side effect!!"));

    StepVerifier
            .create(fluxFromJust)
            .expectNext(1, 2, 3, 4, 5)
            .expectError()
            .verify(); // blocks until the error arrives, so no sleep is needed
}

doFinally

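doFinally is similar to the finally block of a try/catch statement: it runs when the sequence terminates for any reason (completion, error, or cancellation) and receives the SignalType that ended it.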

@Test
public void testDoFinally() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .doFinally(signalType -> {
                if (signalType == SignalType.ON_ERROR) {
                    System.out.println("Completed with Error ");
                }
                if (signalType == SignalType.ON_COMPLETE) {
                    System.out.println("Completed without Error ");
                }
            });

    StepVerifier
            .create(fluxFromJust)
            .expectNext(1, 2, 3, 4, 5)
            .expectError()
            .verify(); // blocks until the error arrives, so no sleep is needed
}


