Applying Reactive Programming to an Existing Batch Application - RxJava2

Today, I will share my experience of migrating our existing Spring Batch application to a reactive programming model using ReactiveX extensions.
The goal is to explain what ReactiveX (RxJava2) is and to demonstrate a working example.

What are ReactiveX and RxJava?

ReactiveX is an API for asynchronous programming with observable streams. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

RxJava is the Java implementation of ReactiveX. RxJava provides a Java API for asynchronous programming with observable streams.

Why Reactive programming?

For me, reactive programming is a development model focused on asynchronous data streams.
In reactive programming, data streams are the central part of your application: messages, events, errors, event chains, calls, etc. are all conveyed as data streams.
Reactive programming provides a simple way of doing asynchronous programming.

Migration Story

The batch application was a very traditional file-processing use case, and the processing included multiple complex steps.
To simulate similar processing with less complexity, I have created a small Spring Batch application that processes a CSV file containing FirstName, LastName, and Age. The processing includes the steps below.

  1. Validation - Age should not be more than 100 years.
  2. Enrichment 1 - Upper-case the first name.
  3. Enrichment 2 - Upper-case the last name.
  4. Segregation - Based on age, categorize each person as Teenager, YoungAdult, or SeniorCitizen.
  5. Database insert - Based on the age group, insert the person into the associated table.

The complete source code Spring-Batch Implementation
The complete source code RXJava2 Implementation

Let’s Start with the RxJava2 Implementation

Building blocks of RxJava

Observables - represent sources of data.
Subscribers (or Observers) - listen to the Observables.
Operators - a set of methods for modifying and composing data: operators create Observables, transform data, or operate on an Observable and return another Observable.
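To make these three building blocks concrete, here is a minimal, self-contained sketch (assuming RxJava2, `io.reactivex.rxjava2:rxjava`, is on the classpath; the numbers and transformations are just illustrative):

```java
import io.reactivex.Observable;

public class BuildingBlocksDemo {
    public static void main(String[] args) {
        // Observable: the source of data
        Observable<Integer> source = Observable.just(1, 2, 3, 4, 5);

        // Operators: transform the stream (keep even numbers, then square them)
        Observable<Integer> transformed = source
                .filter(n -> n % 2 == 0)
                .map(n -> n * n);

        // Subscriber/Observer: consumes the resulting items
        transformed.subscribe(
                n -> System.out.println("onNext: " + n),  // prints 4, then 16
                Throwable::printStackTrace,
                () -> System.out.println("onComplete"));
    }
}
```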

Step 1 : Creating an Observable on the Input File

The Observable is responsible for reading data from the file and passing it to the Subscriber. There are multiple ways to create an Observable; for reading data from a file, we will use the defer() operator.
The defer() operator does not create the Observable until an observer subscribes, and it creates a fresh Observable for each observer.

Create Observable
Observable<Person> observable = Observable.defer(() -> new CsvFileObservableSource(filePath));

public class CsvFileObservableSource implements ObservableSource<Person> {

    private static final Logger log = LoggerFactory.getLogger(CsvFileObservableSource.class);
    private final String filename;

    CsvFileObservableSource(String filename) {
        this.filename = filename;
    }

    @Override
    public void subscribe(Observer<? super Person> observer) {
        try {
            Files.lines(Paths.get(filename)).forEach(inputLine -> {
                String[] split = inputLine.split(",");
                // Send data
                observer.onNext(
                        Person.builder()
                                .firstName(split[0])
                                .lastName(split[1])
                                .age(Integer.parseInt(split[2]))
                                .build());
            });
            observer.onComplete(); // No more data to be sent
        } catch (IOException e) {
            observer.onError(e); // Some error; send the error information to the subscriber
        }
    }
}

Step 2 : Subscribe to the Observable

observable.subscribe(onNext, onError, onComplete);
Provide the onNext, onError, and onComplete consumers.
The subscriber will print the values read from the file, line by line.

observable.subscribe(
        System.out::println,
        Throwable::printStackTrace,
        () -> System.out.println("I am Done")
);

Step 3 : Add Processing Logic Using Operators

Now process the elements read from the file using operators like filter and map. This is similar to the Java 8 Stream API.

final Observable<List<Person>> observable = Observable.defer(() -> new CsvFileObservableSource(filePath))
        .filter(ValidationProcessor::process)
        .map(FirstNameProcessor::process)
        .map(LastNameProcessor::process)
        .map(AgeProcessor::process)
        .buffer(chunkSize);
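For comparison, the same filter/map shape with the Java 8 Stream API looks almost identical. This is a self-contained sketch using plain lambdas over raw CSV lines instead of the article's processor classes; the key difference is that a Stream is pull-based and single-use, while an Observable is push-based and created fresh per subscriber:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamComparison {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("john,doe,25", "jane,roe,130", "bob,poe,40");

        List<String> processed = lines.stream()
                .filter(line -> Integer.parseInt(line.split(",")[2]) <= 100) // validation
                .map(String::toUpperCase)                                    // enrichment
                .collect(Collectors.toList());

        processed.forEach(System.out::println); // JOHN,DOE,25 and BOB,POE,40
    }
}
```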

Note: the resulting Observable is of type List&lt;Person&gt; instead of Person. This is because we want to process the data in chunks rather than as individual items. This capability is provided by the buffer operator.

Buffer
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.

Step 4 : Group Person Elements and Insert into Different Tables

Now, based on age, we want to categorize each person as Teenager, YoungAdult, or SeniorCitizen.
GroupBy Operator
Divide an Observable into a set of Observables that each emit a different subset of items from the original Observable.

Observable<GroupedObservable<String, Person>> groupObservable = Observable
        .defer(() -> new CsvFileObservableSource(filePath))
        .filter(ValidationProcessor::process)
        .map(FirstNameProcessor::process)
        .map(LastNameProcessor::process)
        .map(AgeProcessor::process)
        .groupBy(Person::getAgeGroup);

groupObservable.subscribe(s -> {
    if ("TEENAGER".equals(s.getKey())) {
        s.buffer(chunkSize).subscribe(daoServiceImpl::insertTeenagerList);
    }
    if ("YOUNGADULT".equals(s.getKey())) {
        s.buffer(chunkSize).subscribe(daoServiceImpl::insertYoungadultList);
    }
    if ("SENIORCITIZENS".equals(s.getKey())) {
        s.buffer(chunkSize).subscribe(daoServiceImpl::insertSeniorcitizensList);
    }
}, Throwable::printStackTrace, () -> {
    System.out.println("File Processing Completed");
});
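The groupBy-then-buffer pattern above can be reduced to a runnable sketch with in-memory data. This is an illustration only: the age thresholds are hypothetical, since the article's AgeProcessor logic is not shown here, and plain ages stand in for Person objects:

```java
import io.reactivex.Observable;
import io.reactivex.observables.GroupedObservable;

public class GroupByDemo {
    public static void main(String[] args) {
        Observable<Integer> ages = Observable.just(15, 25, 70, 16, 30);

        // Split one stream into one sub-stream per age group (hypothetical thresholds)
        Observable<GroupedObservable<String, Integer>> groups =
                ages.groupBy(age -> age < 20 ? "TEENAGER"
                                  : age < 60 ? "YOUNGADULT"
                                  : "SENIORCITIZENS");

        // Buffer each group into chunks, as in the file-processing code above
        groups.subscribe(group ->
                group.buffer(10).subscribe(chunk ->
                        System.out.println(group.getKey() + " -> " + chunk)));
    }
}
```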

The complete source code RXJava2 Implementation

There are other important aspects, like error handling, backpressure, and multithreading, that are not in the scope of this article. I will try to cover them in upcoming articles.
