Project Reactor [Part 2] Exploring Operators in Flux & Mono

Photo by Daniil Silantev on Unsplash

This is the second article in a series on Project Reactor. The previous article covered the basics of Flux and Mono. In this one, I’ll show you how to use operators to modify and transform a Flux. We’ll do this through examples.
The code for this post is available on my Github account here

Filter

This is similar to the Java 8 Stream filter. It takes a predicate, and only the elements that satisfy the predicate are passed through.

Filtering
@Test
public void testFilteringFlux() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).log();
    // keep only the even numbers
    Flux<Integer> filter = fluxFromJust.filter(i -> i % 2 == 0);
    StepVerifier
            .create(filter)
            .expectNext(2, 4, 6, 8, 10)
            .verifyComplete();
}

Distinct

Filters out duplicates, so each value is emitted only once.

Distinct
@Test
public void distinct() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5, 1, 2, 3, 4, 5).log();
    Flux<Integer> distinct = fluxFromJust.distinct();
    StepVerifier
            .create(distinct)
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
}
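distinct also has an overload that takes a key selector, so duplicates can be detected on a derived key instead of on the whole value. Here is a minimal sketch of that, assuming reactor-core is on the classpath. The class name, method name and sample words are made up for illustration, and block() is used only to get a plain List to look at.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original post's repository.
public class DistinctByKeySketch {

    // Keeps only the first word seen for each initial letter.
    static List<String> firstWordPerInitial() {
        return Flux.just("apple", "avocado", "banana", "blueberry", "cherry")
                .distinct(word -> word.charAt(0)) // the key is the first character
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstWordPerInitial()); // [apple, banana, cherry]
    }
}
```

Keep in mind that distinct has to remember every key it has seen, so on a very long Flux with many different keys it can use a lot of memory.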

takeWhile

Relays values from the Flux while the predicate returns true, and completes as soon as the predicate returns false for a value.

takeWhile
@Test
public void takeWhile() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).log();
    Flux<Integer> takeWhile = fluxFromJust.takeWhile(i -> i <= 5);
    StepVerifier
            .create(takeWhile)
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
}
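With sorted input, takeWhile and filter give the same result, which hides the difference between them. The sketch below runs both on an unsorted source to show that takeWhile stops at the first element that fails the predicate, while filter keeps checking every element. It assumes reactor-core is on the classpath, and the class and method names are made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original post's repository.
public class TakeWhileVsFilterSketch {

    // Completes at 6, the first value that fails the predicate.
    static List<Integer> withTakeWhile() {
        return Flux.just(1, 2, 6, 3, 4)
                .takeWhile(i -> i <= 5)
                .collectList()
                .block();
    }

    // Tests every value, so 3 and 4 still get through.
    static List<Integer> withFilter() {
        return Flux.just(1, 2, 6, 3, 4)
                .filter(i -> i <= 5)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withTakeWhile()); // [1, 2]
        System.out.println(withFilter());    // [1, 2, 3, 4]
    }
}
```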

skipWhile

Skips elements while the predicate returns true. Once the predicate returns false for a value, that value and all the following ones are emitted.

skipWhile
@Test
public void skipWhile() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).log();
    Flux<Integer> skipWhile = fluxFromJust.skipWhile(i -> i <= 5);
    StepVerifier
            .create(skipWhile)
            .expectNext(6, 7, 8, 9, 10)
            .verifyComplete();
}
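One detail that is easy to miss: skipWhile only evaluates the predicate until it fails for the first time. After that, everything is emitted, even values that would have matched the predicate. A minimal sketch on an unsorted source, assuming reactor-core is on the classpath (the class and method names are made up for illustration):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original post's repository.
public class SkipWhileSketch {

    // 1 and 2 are skipped; 6 fails the predicate, so 6 and everything after it is emitted.
    static List<Integer> afterFirstLargeValue() {
        return Flux.just(1, 2, 6, 3, 4)
                .skipWhile(i -> i <= 5)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(afterFirstLargeValue()); // [6, 3, 4]
    }
}
```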

map

The map operation is similar to the Java 8 Stream map operation. It transforms each element by applying a function to it, which can also change the element type.

Convert String Flux To Integer Flux
@Test
public void testMapOperationFlux() {
    Flux<String> fluxFromJust = Flux.just("RandomString", "SecondString", "XCDFRG").log();
    // map each String to its length
    Flux<Integer> lengths = fluxFromJust.map(String::length);
    StepVerifier
            .create(lengths)
            .expectNext(12, 12, 6)
            .verifyComplete();
}

flatMap

flatMap transforms the elements emitted by this Flux asynchronously into Publishers, then flattens these inner publishers into a single Flux through merging, which allows them to interleave.

FlatMap
@Test
public void testFlatMapFlux() {
    Flux<Integer> fluxFromJust = Flux.just(1, 2, 3).log();
    // getSomeFlux returns a Flux of 10 consecutive integers for each element;
    // flatMap merges these inner Flux instances into a single Flux
    Flux<Integer> integerFlux = fluxFromJust.flatMap(i -> getSomeFlux(i));

    StepVerifier
            .create(integerFlux)
            .expectNextCount(30) // 3 inner Flux instances x 10 elements each
            .verifyComplete();
}

private Flux<Integer> getSomeFlux(Integer i) {
    return Flux.range(i, 10);
}
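Because flatMap merges the inner publishers, the output order depends on when each inner publisher emits, not on the order of the source elements. The sketch below makes that visible by giving smaller values a longer delay, and compares the result with concatMap, which processes the inner publishers one after another. It assumes reactor-core is on the classpath. The class, the delayed helper and the delay values are made up for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the original post's repository.
public class FlatMapOrderSketch {

    // Hypothetical helper: emits i after (4 - i) * 100 ms, so 1 is the slowest.
    static Mono<Integer> delayed(int i) {
        return Mono.just(i).delayElement(Duration.ofMillis((4 - i) * 100L));
    }

    // Inner publishers are subscribed eagerly and merged as they emit.
    static List<Integer> withFlatMap() {
        return Flux.just(1, 2, 3)
                .flatMap(FlatMapOrderSketch::delayed)
                .collectList()
                .block();
    }

    // Each inner publisher must complete before the next one is subscribed.
    static List<Integer> withConcatMap() {
        return Flux.just(1, 2, 3)
                .concatMap(FlatMapOrderSketch::delayed)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Timing dependent; with these delays the fastest value usually comes first.
        System.out.println(withFlatMap());
        // Always in source order.
        System.out.println(withConcatMap()); // [1, 2, 3]
    }
}
```

If the order of the results matters, concatMap is the safer choice. If it does not, flatMap usually finishes sooner because the inner publishers run concurrently.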

Index

Keeps information about the order in which source values were received by pairing each value with a 0-based incrementing long index.

index
@Test
public void maintainIndex() {
    Flux<Tuple2<Long, String>> index = Flux
            .just("First", "Second", "Third")
            .index();
    StepVerifier.create(index)
            .expectNext(Tuples.of(0L, "First"))
            .expectNext(Tuples.of(1L, "Second"))
            .expectNext(Tuples.of(2L, "Third"))
            .verifyComplete();
}

flatMapMany

This operator is very useful if you want to convert a Mono to a Flux. flatMapMany transforms the item emitted by this Mono into a Publisher, then forwards that Publisher’s emissions into the returned Flux.

flatMapMany
@Test
public void flatMapManyTest() {
    Mono<List<Integer>> just = Mono.just(Arrays.asList(1, 2, 3));
    Flux<Integer> integerFlux = just.flatMapMany(it -> Flux.fromIterable(it));
    StepVerifier
            .create(integerFlux)
            .expectNext(1, 2, 3)
            .verifyComplete();
}

startWith

Prepends the given values to the beginning of the Flux sequence.

startWith
@Test
public void startWith() {
    Flux<Integer> just = Flux.just(1, 2, 3);
    Flux<Integer> integerFlux = just.startWith(0);
    StepVerifier.create(integerFlux)
            .expectNext(0, 1, 2, 3)
            .verifyComplete();
}

concatWith

The concatWith method concatenates two Flux sequentially: it subscribes to the first Flux, waits for it to complete, and then subscribes to the next one.

concatWith
@Test
public void concatWith() {
    Flux<Integer> just = Flux.just(1, 2, 3);
    Flux<Integer> integerFlux = just.concatWith(Flux.just(4, 5));
    StepVerifier.create(integerFlux)
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
}

merge

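Merges data from several Publisher sequences into a single interleaved sequence. Unlike concat, the sources are subscribed to eagerly. The static Flux.merge method takes the sources as an array / vararg, while the mergeWith instance method merges one more Publisher into the current Flux.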

Merge
@Test
public void mergeWith() throws InterruptedException {
    Flux<Integer> firstFlux = Flux.just(1, 2, 3, 4, 5).delayElements(Duration.ofSeconds(1));
    Flux<Integer> secondFlux = Flux.just(10, 20, 30, 40).delayElements(Duration.ofSeconds(1));
    // Prints the numbers from firstFlux and secondFlux interleaved, in no guaranteed order
    firstFlux.mergeWith(secondFlux)
            .subscribe(System.out::println);
    // Keep the test thread alive until both delayed Flux have finished
    TimeUnit.SECONDS.sleep(11);
}
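To compare merging with concatenation without sleeping in the test thread, the two Flux can be collected into a List with block(). This is only a sketch under a few assumptions: reactor-core is on the classpath, and the class name, method names, letters and delays are made up for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original post's repository.
public class MergeVsConcatSketch {

    // Emits "a" after about 300 ms and "b" after about 600 ms.
    static Flux<String> slow() {
        return Flux.just("a", "b").delayElements(Duration.ofMillis(300));
    }

    // Emits "x" after about 100 ms and "y" after about 200 ms.
    static Flux<String> fast() {
        return Flux.just("x", "y").delayElements(Duration.ofMillis(100));
    }

    // Both sources are subscribed at once; values arrive in the order they are emitted.
    static List<String> merged() {
        return slow().mergeWith(fast()).collectList().block();
    }

    // fast() is only subscribed after slow() has completed.
    static List<String> concatenated() {
        return slow().concatWith(fast()).collectList().block();
    }

    public static void main(String[] args) {
        // Timing dependent; with these delays the fast values usually come first.
        System.out.println(merged());
        // Always the first source followed by the second one.
        System.out.println(concatenated()); // [a, b, x, y]
    }
}
```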

CollectList

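collectList collects all elements emitted by this Flux into a List that is emitted by the resulting Mono when this sequence completes. collectSortedList does the same, but sorts the elements into their natural order before emitting the List.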

CollectList
@Test
public void collectList() {
    Mono<List<Integer>> listMono = Flux
            .just(1, 2, 3)
            .collectList();
    StepVerifier.create(listMono)
            .expectNext(Arrays.asList(1, 2, 3))
            .verifyComplete();
}
CollectSortedList
@Test
public void collectSortedList() {
    Mono<List<Integer>> listMono = Flux
            .just(1, 2, 3, 9, 8)
            .collectSortedList();
    StepVerifier.create(listMono)
            .expectNext(Arrays.asList(1, 2, 3, 8, 9))
            .verifyComplete();
}
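collectSortedList also accepts a Comparator when natural ordering is not what you want. A minimal sketch that sorts in descending order, assuming reactor-core is on the classpath (the class and method names are made up for illustration):

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original post's repository.
public class CollectSortedDescendingSketch {

    // Collects all values and sorts them from largest to smallest.
    static List<Integer> descending() {
        return Flux.just(1, 2, 3, 9, 8)
                .collectSortedList(Comparator.reverseOrder())
                .block();
    }

    public static void main(String[] args) {
        System.out.println(descending()); // [9, 8, 3, 2, 1]
    }
}
```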

zip

Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This “Step-Merge” processing is especially useful in Scatter-Gather scenarios.

Zip
@Test
public void zip() {
    Flux<Integer> firstFlux = Flux.just(1, 2, 3);
    Flux<Integer> secondFlux = Flux.just(10, 20, 30, 40);
    // secondFlux has one more element, but zip stops when firstFlux completes
    Flux<Integer> zip = Flux.zip(firstFlux, secondFlux, (num1, num2) -> num1 + num2);
    StepVerifier
            .create(zip)
            .expectNext(11, 22, 33)
            .verifyComplete();
}
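When no combinator is given, Flux.zip pairs the elements into a Tuple2 instead. The sketch below zips two Flux of different types and then formats each pair, which also shows that the output ends with the shorter source. It assumes reactor-core is on the classpath. The class name, method name and sample data are made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original post's repository.
public class ZipTupleSketch {

    // Pairs each name with an age; "Cid" has no partner and is dropped.
    static List<String> nameAgePairs() {
        Flux<String> names = Flux.just("Ann", "Bob", "Cid");
        Flux<Integer> ages = Flux.just(31, 45);
        return Flux.zip(names, ages)                       // Flux<Tuple2<String, Integer>>
                .map(pair -> pair.getT1() + ":" + pair.getT2())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(nameAgePairs()); // [Ann:31, Bob:45]
    }
}
```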

buffer

Collects incoming values into List buffers of the given maximum size. Each buffer is emitted by the returned Flux as soon as it is full, and the last, possibly smaller, buffer is emitted when this Flux completes.

buffer
@Test
public void bufferTest() {
    Flux<List<Integer>> buffer = Flux
            .just(1, 2, 3, 4, 5, 6, 7)
            .buffer(2);

    StepVerifier
            .create(buffer)
            .expectNext(Arrays.asList(1, 2))
            .expectNext(Arrays.asList(3, 4))
            .expectNext(Arrays.asList(5, 6))
            .expectNext(Arrays.asList(7))
            .verifyComplete();
}
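buffer can also be called without a size, in which case everything goes into one List that is emitted when the source completes. A small sketch comparing the two variants, assuming reactor-core is on the classpath (the class and method names are made up for illustration):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original post's repository.
public class BufferVariantsSketch {

    // No size given: a single buffer holding all values.
    static List<List<Integer>> singleBuffer() {
        return Flux.just(1, 2, 3, 4, 5, 6, 7)
                .buffer()
                .collectList()
                .block();
    }

    // Max size 3: full buffers of three, then the remainder.
    static List<List<Integer>> buffersOfThree() {
        return Flux.just(1, 2, 3, 4, 5, 6, 7)
                .buffer(3)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(singleBuffer());   // [[1, 2, 3, 4, 5, 6, 7]]
        System.out.println(buffersOfThree()); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

The variant without a size keeps every element in memory until the source completes, so it only makes sense for a Flux that is known to be small and finite.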

You can check all available operators here

In the next article, I’ll show you how to handle errors while processing data in Mono and Flux.

