Project Reactor [Part 1] - Playing With Flux & Mono

unsplash-logoDaniil Silantev

What is Reactor

Project Reactor implements the reactive programming model.It implements the Reactive Streams Specification a standard for building reactive applications.
reactor integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs — Flux (for [N] elements) and Mono (for [0|1] elements).

Key Components of Reactive Manifesto and Reactive Streams Specification

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. Reactive Specification are based on Reactive Manifesto

Reactive Specifications defines below key Contracts
Publisher: Representing sources of data also called as Observables.
Subscriber: Listening to the Publisher. Subscriber subscribes to publisher
Subscription: Publisher will create a subscription for every Subscriber which will try to subscribe to it.
Processor: A processor can be used as a publisher as well as subscriber. Processors are used for data transformation. these are set of methods for modifying and composing the data.

Objective Of this post

Objective of these series of post is to show, How to use Flux & Mono. How to subscribe to them for consuming data. Different operation that we can perform on data while consuming data. I will be using Spring Webflux which internally used Project Reactor

Let get started, We need spring-boot-starter-webflux to get started.Project rector also provides very handy library reactor-test for unit testing.

The code for this post is available on my Github account here

Pom File
1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scop>
</dependency>
Creating Flux and Mono

Flux A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes(successfully or with an error).

Few Example of creating Flux
1
2
3
4
5
6
7
8
9

Flux<Integer> fluxFromJust = Flux.just(1, 2, 3);//Create a Flux that emits the provided elements and then completes.
Flux<Integer> integerFlux = Flux.fromIterable(Arrays.asList(1, 2, 3));//Create a Flux that emits the items contained in the provided Iterable.
Flux.fromStream(Arrays.asList(1,2,3).stream());//Create a Flux that emits the items contained in the provided Stream.
Integer[] num2 = {1, 2, 3, 4, 5};
Flux<Integer> integerFluxFromArray = Flux.fromArray(num2);//Create a Flux that emits the items contained in the provided array.
Flux.generate(<Consumer>) ;//Programmatically create a Flux by generating signals one-by-one via a consumer callback.
Flux.Creare(<Consumer>); //Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API.

Mono A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

Few Examples of creating Mono
1
2
3
4
5
6
7
8
Mono<Integer> just = Mono.just(1);//
Mono<Object> empty = Mono.empty();//
Mono.create(); //Create a Mono that completes without emitting any item.
Mono.from(<Publisher>)//Expose the specified Publisher with the Mono API, and ensure it will emit 0 or 1 item.

//there are multiple options available for creating mono from Callable,CompletionStage,CompletableFuture etc
Mono.fromCallable()
}

Flux & Mono are lazy

Lazy in context of reactive programming means, No Matter how many operation you do on the stream, They won’t be executed until you consume it. Flux & Mono start emitting values only when subscriber is attached.

Creating Subscriber and consuming values

Subscriber has multiple overloaded methods, let’s check few of them.

Subscriber with onNext

Subscriber with consumer (onNext)
1
2
Flux<Integer> fluxFromJust = Flux.just(1, 2, 3);
fluxFromJust.subscribe(i->System.out.println(i));//It will print number 1,2,3

Subscriber with onNext & onError. OnError will be get call in case of error.
Flux has one handy concatWith which we can use to concat error

Subscriber with consumer and error handler (onError)
1
2
3
4
5
6
7
Flux<Integer> fluxFromJust = 
Flux.just(1, 2, 3)
.concatWith(Flux.error( new RuntimeException("Test Exception"));
fluxFromJust.subscribe(
i->System.out.println(i),// onNext
e->System.out.println("In Error Block " + e.getMessage()) //onError
);

Subscriber with onNext, onError and onComplete. On Successful completion onComplete method get’s called
Subscriber with consumer, error handler and onComplete
1
2
3
4
5
6
7
8
Flux<Integer> fluxFromJust = 
Flux.just(1, 2, 3)
.concatWith(Flux.error( new RuntimeException("Test Exception"));
fluxFromJust.subscribe(
i->System.out.println(i),//Will Print 1,2,3 //onNext
e->System.out.println("In Error Block " + e.getMessage())//OnError
()-> System.out.println("Process Completed") //OnComplete
);

To Log the activities on Flux / Mono. We can use log method and it will start logging all events.

Log Events
1
2
3
Flux<Integer> fluxFromJust = 
Flux.just(1, 2, 3)
.log()

Lets write some Unit test using reactor-test

StepVerifier can be use like below to validate the expectations

StepVerifier
1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testCreateFluxAndSubscribe() {

Flux<Integer> fluxFromJust = Flux.just(1, 2, 3).log();
StepVerifier
.create(fluxFromJust)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.verifyComplete();
}


IF You want to verify only count then we can use expectNextCount
StepVerifier
1
2
3
4
5
6
7
8
9
@Test
public void testCreateFluxAndSubscribeVerifyCount() {

Flux<Integer> fluxFromJust = Flux.just(1, 2, 3).log();
StepVerifier
.create(fluxFromJust)
.expectNextCount(3)
.verifyComplete();
}

To Assert on Error
StepVerifier
1
2
3
4
5
6
7
8
9
@Test
public void testCreateFluxAndSubscribeVerifyError() {

Flux<Integer> fluxFromJust = Flux.just(1).concatWith(Flux.error(new RuntimeException("Test")));
StepVerifier
.create(fluxFromJust)
.expectNextCount(1)
.verifyError(RuntimeException.class);
}

It the next article, I’ll show you how to process and transform data in Mono and Flux.
The code for this post is available on my Github account here

Share Comments