Write Clean Asynchronous Code With CompletableFuture Java-8

Java 8 introduced a lot of features. With the addition of CompletableFuture, writing clean and readable asynchronous code has become much easier. CompletableFuture has more than 50 methods, covering the creation, chaining, combining and error handling of asynchronous tasks.

The code for this post is available for download here.

CompletableFuture

CompletableFuture implements both the Future and CompletionStage interfaces and adds a lot of modern features. It supports lambdas, relies on callbacks instead of blocking calls, and promotes an asynchronous, reactive programming model. CompletableFuture allows us to write non-blocking code by running a task on a thread other than the main application thread and reacting to its completion or failure through callbacks. CompletableFuture is inspired by ListenableFuture in Guava and is similar to a Promise in JavaScript.

Why CompletableFuture Instead of Future?

Callable and Future were introduced in Java 5. A Future is a placeholder for a result that is not available yet. We obtain one by submitting a Runnable or Callable to an ExecutorService. There are two methods to get the actual value from a Future:
get(): when this method is called, the calling thread waits for the result indefinitely.
get(long timeout, TimeUnit unit): when this method is called, the calling thread waits for the result only for the specified time and gets a TimeoutException if the result is not ready by then.
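
To make the two get variants concrete, here is a minimal sketch of a plain Future obtained from an ExecutorService. The class name FutureGetSketch and the slowTask helper are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureGetSketch {

    // Hypothetical slow task, used only for illustration.
    static String slowTask() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return "Some Value";
    }

    // Returns the task result, or "timed out" if it is not ready within the limit.
    static String getWithTimeout(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = FutureGetSketch::slowTask;
            Future<String> future = executor.submit(task);
            try {
                // Blocks the calling thread, but only up to the given timeout.
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // give up on the task
                return "timed out";
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout(2000)); // enough time for the task
        System.out.println(getWithTimeout(50));   // too short for the task
    }
}
```

Either way the calling thread sits in get until the result or the timeout arrives, which is exactly the limitation discussed next.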

There are multiple problems with Future:
Blocking - The get method is blocking: the caller has to wait until the computation is done. Future has no method that notifies us on completion and no way to attach a callback function.
Chaining & Composition - Many times we want to chain multiple futures to complete a long computation, merging results and sending them on to another task. It is hard to implement such chaining with Future.
Exception Handling - Future does not provide any construct for exception handling.
All these issues are addressed by CompletableFuture.
Let's try the different methods provided by CompletableFuture.

Create a Simple CompletableFuture

The simplest way to create a CompletableFuture is the CompletableFuture.completedFuture method, which returns a new, already completed CompletableFuture. Creating an already completed CompletableFuture is useful in many cases, for example in tests or when a value is already available.
Create Completed CompletableFuture

    @Test
    void simpleCompletedCompletableFuture() {
        // completedFuture returns a future that already holds its value
        CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Some Value");
        assertTrue(completableFuture.isDone());
        try {
            // get() returns immediately because the future is already complete
            assertEquals("Some Value", completableFuture.get());
        } catch (ExecutionException | InterruptedException e) {
            fail("No Exception expected");
        }
    }

Note: if we call the get method on a CompletableFuture that is never completed, the call will block forever. We can use the CompletableFuture.complete() method to complete a Future manually.
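
As a small sketch of manual completion, the example below creates an incomplete CompletableFuture and completes it from another thread. The class and method names are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ManualCompleteSketch {

    // Completes an initially incomplete future from a second thread.
    static String completeFromAnotherThread() {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread producer = new Thread(() -> future.complete("Some Value"));
        producer.start();
        // join() blocks until the producer thread calls complete()
        return future.join();
    }

    // complete() returns false and changes nothing if the future is already completed.
    static boolean secondCompleteIsIgnored() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("first");
        boolean changed = future.complete("second");
        return !changed && "first".equals(future.join());
    }

    public static void main(String[] args) {
        System.out.println(completeFromAnotherThread());
        System.out.println(secondCompleteIsIgnored());
    }
}
```

Only the first call to complete() wins, so a late producer cannot overwrite a value that consumers may already have seen.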

Simple Asynchronous computation using runAsync

If we want to run a task in the background that does not return any value, we can use CompletableFuture.runAsync(). It takes a Runnable and returns a CompletableFuture<Void>.

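Simple Asynchronous computation using runAsync

    // someStateVaribale is a field of the test class (not shown); assumed to be an AtomicInteger
    public void process() {
        System.out.println(Thread.currentThread() + " Process");
        someStateVaribale.set(100);
    }

    @Test
    void completableFutureRunAsync() {
        // Runs process() on a background thread; there is no result, hence Void
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process());
        runAsync.join(); // wait for the background task to finish
        assertEquals(100, someStateVaribale.get());
    }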

Simple Asynchronous computation using supplyAsync

If we want to run a task in the background that returns a value, we can use CompletableFuture.supplyAsync(). It takes a Supplier<T> and returns a CompletableFuture<T>.

Simple Asynchronous computation using supplyAsync

    @Test
    void completableFutureSupplyAsync() {
        // processSomeData runs on a background thread and supplies the result
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::processSomeData);
        try {
            assertEquals("Some Value", supplyAsync.get()); // blocking
        } catch (ExecutionException | InterruptedException e) {
            fail("No Exception expected");
        }
    }

CompletableFuture with Custom Executor

You might be wondering which thread executes the supplyAsync and runAsync tasks, and who creates these threads. Similar to parallel streams, CompletableFuture executes these tasks on a thread obtained from the global ForkJoinPool.commonPool().
We can always provide our own Executor to CompletableFuture. The async methods in the CompletableFuture API (runAsync, supplyAsync and the callbacks ending in Async) come in two variants, with and without an Executor argument.

CompletableFuture with Custom Executor

    @Test
    void completableFutureSupplyAsyncWithExecutor() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            // The task runs on a thread from newFixedThreadPool instead of the common pool
            CompletableFuture<String> supplyAsync =
                    CompletableFuture.supplyAsync(this::processSomeData, newFixedThreadPool);
            assertEquals("Some Value", supplyAsync.get());
        } catch (ExecutionException | InterruptedException e) {
            fail("No Exception expected");
        } finally {
            newFixedThreadPool.shutdown(); // release the pool threads
        }
    }

CompletableFuture Callbacks and Chaining

We know that CompletableFuture.get() is blocking, and we want to avoid this. Instead, we should be notified when the Future completes.
CompletableFuture provides thenApply(), thenAccept() and thenRun() to attach callbacks.

thenAccept()
If we want to run some code that consumes the value from the Future and returns nothing, we can use thenAccept().
thenApply()
If we want to run some code that uses the value from the Future and returns a new value, we can use thenApply().
thenRun()
If we want to run some code after the Future completes, without using its value or returning one, we can use thenRun().

CompletableFuture thenAccept thenApply thenRun

    // process, notify and notifyMe are helper methods of the test class (not shown here);
    // supplyAsync needs a process variant that returns a value.
    @Test
    public void completableFutureThenAccept() {
        CompletableFuture.supplyAsync(this::process)
                .thenAccept(this::notify) // non-blocking: notify is called automatically after process completes
                .join();
        assertEquals(100, someStateVaribale.get());
    }

    @Test
    public void completableFutureThenApply() {
        Integer notificationId = CompletableFuture.supplyAsync(this::process)
                .thenApply(this::notify) // non-blocking: returns a value for the next stage
                .join();
        assertEquals(Integer.valueOf(1), notificationId);
    }

    @Test
    public void completableFutureThenRun() {
        CompletableFuture.supplyAsync(this::process)
                .thenRun(this::notifyMe) // runs after completion; receives no value and returns none
                .join();
        assertEquals(100, someStateVaribale.get());
    }

Chaining Callbacks
If we have a large asynchronous computation, we can keep passing values from one callback to the next.

Chaining Callbacks

    @Test
    public void completableFutureThenApplyAccept() {
        CompletableFuture.supplyAsync(this::findAccountNumber)
                .thenApply(this::calculateBalance)
                .thenApply(this::notifyBalance)
                .thenAccept((i) -> notifyByEmail())
                .join();
    }

Async variants of thenApply(), thenAccept() and thenRun()
Note: in all the previous examples, each callback runs on the thread that completed the previous stage (or on the calling thread if that stage was already complete). If we want a callback to run on a separate thread, we can use the async variants of these methods.
Async Variants

    @Test
    public void completableFutureApplyAsync() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Integer> completableFuture = CompletableFuture
                    .supplyAsync(this::findAccountNumber, newFixedThreadPool) // runs on a thread from newFixedThreadPool
                    .thenApplyAsync(this::calculateBalance, newSingleThreadScheduledExecutor) // runs on the scheduled executor's thread
                    .thenApplyAsync(this::notifyBalance); // runs on a thread from the common pool
            Integer balance = completableFuture.join();
            assertEquals(Integer.valueOf(100), balance);
        } finally {
            newFixedThreadPool.shutdown();
            newSingleThreadScheduledExecutor.shutdown();
        }
    }
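
To see which thread actually runs a callback, here is a minimal sketch that returns the thread name from inside the callback. The class name, the method names and the thread name "custom-pool-thread" are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackThreadSketch {

    // A non-async callback attached to an already completed future
    // runs immediately on the thread that attaches it.
    static boolean syncCallbackRunsOnCaller() {
        String caller = Thread.currentThread().getName();
        String callbackThread = CompletableFuture.completedFuture(10)
                .thenApply(n -> Thread.currentThread().getName())
                .join();
        return caller.equals(callbackThread);
    }

    // An async callback with an explicit Executor runs on that executor's thread.
    static String asyncCallbackThread() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-pool-thread"));
        try {
            return CompletableFuture.completedFuture(10)
                    .thenApplyAsync(n -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(syncCallbackRunsOnCaller());
        System.out.println(asyncCallbackThread());
    }
}
```

Naming the pool threads, as the thread factory does here, also makes log output from real callbacks much easier to follow.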

CompletableFuture thenCompose and thenCombine

thenCompose
Let’s say we want to first find an account number, then calculate the balance for that account, and then send a notification.
All these tasks depend on each other, and each method returns a CompletableFuture, so we need to use the thenCompose method.
This is similar to flatMap in streams.

thenCompose

    public CompletableFuture<Integer> findAccountNumber() {
        sleep(1);
        System.out.println(Thread.currentThread() + " findAccountNumber");
        return CompletableFuture.completedFuture(10);
    }

    public CompletableFuture<Integer> calculateBalance(int accountNumber) {
        System.out.println(Thread.currentThread() + " calculateBalance");
        sleep(1);
        return CompletableFuture.completedFuture(accountNumber * accountNumber);
    }

    public CompletableFuture<Integer> notifyBalance(Integer balance) {
        System.out.println(Thread.currentThread() + " Sending Notification");
        sleep(1);
        return CompletableFuture.completedFuture(balance);
    }

    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    @Test
    public void completableFutureThenCompose() {
        // Each step returns a CompletableFuture; thenCompose flattens the result
        Integer join = findAccountNumber()
                .thenComposeAsync(this::calculateBalance)
                .thenCompose(this::notifyBalance)
                .join();
        assertEquals(Integer.valueOf(100), join);
    }
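
The flatMap comparison can be sketched by putting thenApply and thenCompose side by side. The class below is a hypothetical, simplified stand-in for the methods above, written with static methods so it runs on its own.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApplySketch {

    // Simplified stand-ins for the methods in the post (assumed for illustration).
    static CompletableFuture<Integer> findAccountNumber() {
        return CompletableFuture.supplyAsync(() -> 10);
    }

    static CompletableFuture<Integer> calculateBalance(int accountNumber) {
        return CompletableFuture.supplyAsync(() -> accountNumber * accountNumber);
    }

    // thenApply wraps the returned future, giving a nested future (like map).
    static int withThenApply() {
        CompletableFuture<CompletableFuture<Integer>> nested =
                findAccountNumber().thenApply(ComposeVsApplySketch::calculateBalance);
        return nested.join().join(); // two joins needed to reach the value
    }

    // thenCompose flattens the nesting, giving a single future (like flatMap).
    static int withThenCompose() {
        CompletableFuture<Integer> flat =
                findAccountNumber().thenCompose(ComposeVsApplySketch::calculateBalance);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(withThenApply());
        System.out.println(withThenCompose());
    }
}
```

Both variants produce the same value; the difference is only in the type we have to carry through the rest of the chain.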

thenCombine
As the name suggests, thenCombine is used to merge the results of two independent CompletableFutures. Assume that for a person we get the first name and the last name by calling two independent methods. To get the full name, we merge the results of both methods with thenCombine.

thenCombine

    public CompletableFuture<String> findFirstName() {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1);
            return "Niraj";
        });
    }

    public CompletableFuture<String> findLastName() {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1);
            return "Sonawane";
        });
    }

    @Test
    public void completableFutureThenCombine() {
        // Both lookups run independently; the function runs once both are done
        CompletableFuture<String> thenCombine =
                findFirstName().thenCombine(findLastName(), (firstName, lastName) -> firstName + lastName);
        String fullName = thenCombine.join();
        assertEquals("NirajSonawane", fullName);
    }

CompletableFuture allOf

In many scenarios we want to run multiple tasks in parallel and do some processing after all of them are complete.

Assume we want to find the firstName of five different users and combine the results. The static CompletableFuture.allOf method lets us wait for the completion of all the Futures.
The allOf method has the limitation that it returns a CompletableFuture<Void> rather than the combined results of all Futures. We have to combine the results from the Futures manually.

allOf

    public CompletableFuture<String> findSomeValue() {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1);
            return "Niraj";
        });
    }

    @Test
    public void completableFutureAllof() {
        List<CompletableFuture<String>> list = new ArrayList<>();
        IntStream.range(0, 5).forEach(num -> list.add(findSomeValue()));

        // allOf completes when every future in the array has completed
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
        // join() does not block here because all futures are already complete
        CompletableFuture<List<String>> allFutureList = allFuture.thenApply(val ->
                list.stream().map(CompletableFuture::join).collect(Collectors.toList()));

        CompletableFuture<String> futureHavingAllValues = allFutureList.thenApply(values -> {
            System.out.println("I am here");
            return values.stream().collect(Collectors.joining());
        });
        String concatenateString = futureHavingAllValues.join();
        assertEquals("NirajNirajNirajNirajNiraj", concatenateString);
    }

CompletableFuture Exception Handling

Handling exceptions in multithreaded Java code was always a pain. Luckily, CompletableFuture has a nice way of handling exceptions.

Exception Handling

    CompletableFuture<Integer> thenApply = CompletableFuture
            .supplyAsync(this::findAccountNumber)
            .thenApply(this::calculateBalance)
            .thenApply(this::notifyBalance);

In the above code, if findAccountNumber throws an exception, the callbacks calculateBalance and notifyBalance will not be called, and the Future will be completed with that exception. Similarly, if calculateBalance throws an exception, the chain breaks at that point and notifyBalance is skipped.
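
The following sketch illustrates this behaviour with a first stage that always fails. The class name, the static helper and the callbackRan flag are hypothetical and exist only to make the effect observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class BrokenChainSketch {

    // Records whether the downstream callback was ever invoked.
    static final AtomicBoolean callbackRan = new AtomicBoolean(false);

    // Hypothetical lookup that always fails.
    static Integer findAccountNumber() {
        throw new IllegalStateException("account lookup failed");
    }

    static String runChain() {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(BrokenChainSketch::findAccountNumber)
                .thenApply(n -> {
                    callbackRan.set(true); // never reached when the first stage fails
                    return n * n;
                });
        try {
            future.join();
            return "no exception";
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChain());
        System.out.println("callback ran: " + callbackRan.get());
    }
}
```

Note that join() reports the failure as an unchecked CompletionException, while get() would report it as a checked ExecutionException.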

Handle Exceptions Using exceptionally

The exceptionally callback is called if a preceding stage fails with an exception. It returns a new CompletableFuture: if the original future completes exceptionally, the new one completes with the value returned by the given function, which receives the exception as its argument; if the original future completes normally, the new one completes normally with the same value.

exceptionally

    @Test
    public void completableFutureExceptionally() {
        CompletableFuture<Integer> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
                .thenApply(this::calculateBalance)
                .thenApply(this::notifyBalance)
                .exceptionally(ex -> {
                    // Called only if one of the earlier stages failed
                    System.out.println("Got Some Exception " + ex.getMessage());
                    System.out.println("Returning some default value");
                    return 0;
                });
        Integer join = thenApply.join();
        assertEquals(Integer.valueOf(0), join);
    }

Handle Exceptions Using the handle Method

The handle method is more flexible than exceptionally, because it receives both the result and the exception and runs whether or not the chain failed.

handle

    @Test
    public void completableFutureHandle() {
        CompletableFuture<Integer> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
                .thenApply(this::calculateBalance)
                .thenApply(this::notifyBalance)
                .handle((ok, ex) -> {
                    System.out.println("Code that we want to run in finally");
                    // ex is null on success; checking it is safer than checking ok, which may itself be null
                    if (ex != null) {
                        System.out.println("Got Exception " + ex.getMessage());
                        return -1;
                    }
                    System.out.println("No Exception !!");
                    return ok;
                });
    }

Handle Exceptions Using the whenComplete Method

whenComplete

    @Test
    public void completableFutureWhenComplete() {
        CompletableFuture.supplyAsync(this::findAccountNumber)
                .thenApply(this::calculateBalance)
                .thenApply(this::notifyBalance)
                // i is the result, t is the exception (null on success)
                .whenComplete((i, t) -> System.out.println("finally action"));
    }
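
Unlike handle, whenComplete cannot replace the outcome: the returned future keeps the original result or the original failure. The sketch below, with hypothetical class and method names, checks both cases.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class WhenCompleteSketch {

    // Counts how many times the "finally action" ran.
    static final AtomicInteger actionCalls = new AtomicInteger();

    // On success the original value passes through unchanged.
    static int keepsResult() {
        return CompletableFuture.supplyAsync(() -> 100)
                .whenComplete((value, error) -> actionCalls.incrementAndGet())
                .join();
    }

    // On failure the action still runs, but the failure is not swallowed.
    static boolean keepsFailure() {
        CompletableFuture<Integer> future = CompletableFuture
                .<Integer>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .whenComplete((value, error) -> actionCalls.incrementAndGet());
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(keepsResult());
        System.out.println(keepsFailure());
        System.out.println(actionCalls.get());
    }
}
```

This makes whenComplete a good fit for logging or cleanup, while handle or exceptionally is the better choice when we need a fallback value.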

Timeouts: Java 9 Improvement

While working on asynchronous code, we need to handle timeouts; we cannot wait forever for a task to finish. Unfortunately, Java 8 offers nothing for timeouts on the CompletableFuture chain itself beyond the blocking get(long timeout, TimeUnit unit).
Java 9 added the orTimeout and completeOnTimeout methods to handle this.

With orTimeout, if the task does not complete in the given time, the Future is completed exceptionally with a TimeoutException.

orTimeout

    @Test
    public void completableFutureOrTimeout() {
        CompletableFuture.supplyAsync(this::findAccountNumber)
                .orTimeout(1, TimeUnit.MINUTES); // fails with TimeoutException if not done within one minute
    }
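
Here is a minimal sketch of both Java 9 methods side by side. It requires Java 9 or later; the class name, the slowLookup helper and the "default" fallback value are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {

    // Hypothetical slow lookup that takes about one second.
    static String slowLookup() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "real value";
    }

    // completeOnTimeout supplies a fallback value instead of failing.
    static String withFallback() {
        return CompletableFuture.supplyAsync(TimeoutSketch::slowLookup)
                .completeOnTimeout("default", 100, TimeUnit.MILLISECONDS)
                .join();
    }

    // orTimeout completes the future exceptionally with a TimeoutException.
    static boolean orTimeoutFails() {
        try {
            CompletableFuture.supplyAsync(TimeoutSketch::slowLookup)
                    .orTimeout(100, TimeUnit.MILLISECONDS)
                    .join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withFallback());
        System.out.println(orTimeoutFails());
    }
}
```

Neither method interrupts the underlying task; the slow lookup keeps running in the background and its late result is simply ignored.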
