Vert.x Completable Future
This project provides a way to use the Completable Future API with Vert.x.
Why can't you use Completable Future with Vert.x
Using Completable Future with Vert.x may lead to some thread issues.
When you execute an asynchronous operation with Vert.x, the Handler<AsyncResult<T>> is called on on the same thread as the method having enqueued the asynchronous operation. Thanks to this single-threaded aspect, Vert.x removes the need of synchronization and also it improves performances.
Completable Future uses a fork-join thread pool. So callbacks (dependent stages) are called in a thread of this pool, so it's not the caller thread.
This project provides the Completable Future API but enforces the Vert.x threading model:
- When using
xAsyncmethods (withoutexecutor), the callbacks are called on the Vert.x context - When using non-async, it uses the caller thread. If it's a Vert.x thread the same thread is used. If not called from a Vert.x thread, it still uses the caller thread
- When using
xAsyncmethods with anExecutorparameter, this executor is used to execute the callback (does not enforce the Vert.x thread system)
Examples
All methods taking a Vertx instance as parameter exists with a Context parameter.
Creating a VertxCompletableFuture
// From a Vert.x instance
CompletableFuture<T> future = new VertxCompletableFuture(vertx);
// From a specific context
Context context = ...
CompletableFuture<T> future = new VertxCompletableFuture(context);
// From a completable future
CompletableFuture cf = ...
CompletableFuture<T> future = VertxCompletableFuture.from(context, cf);
// From a Vert.x future
Future<T> fut = ...
CompletableFuture<T> future = VertxCompletableFuture.from(context, fut);
You can also pass a Supplier or a Runnable:
// Run in context
CompletableFuture<String> future = VertxCompletableFuture.supplyAsync(vertx, () -> return "foo";);
// Run in Vert.x worker thread
CompletableFuture<String> future = VertxCompletableFuture.supplyBlockingAsync(vertx, () -> return "foo");
CompletableFuture<Void> future = VertxCompletableFuture.runAsync(vertx, () -> System.out.println(foo"));
CompletableFuture<Void> future = VertxCompletableFuture.runBlockingAsync(vertx, () -> System.out.println(foo"));
*BlockingAsync method uses a Vert.x worker thread and do not block the Vert.x Event Loop.
Stages
Once you have the VertxCompletableFuture instance, you can use the CompletableFuture API:
VertxCompletableFuture<Integer> future = new VertxCompletableFuture<>(vertx);
future
.thenApply(this::square)
.thenAccept(System.out::print)
.thenRun(System.out::println);
// Somewhere in your code, later...
future.complete(42);
You can compose, combine, join completable futures with:
- combine
- runAfterEither
- acceptEither
- runAfterBoth
- ...
All and Any operations
The VertxCompletableFuture class offers two composition operators:
allOf- execute all the passedCompletableFuture(not necessarilyVertxCompletableFuture) and calls dependant stages when all of the futures have been completed or one has failedanyOf- execute all the passedCompletableFuture(not necessarilyVertxCompletableFuture) and calls dependant stages when one of them has been completed
Unlike CompletableFuture, the dependent stages are called in the Vert.x context.
HttpClientOptions options = new HttpClientOptions().setDefaultPort(8080).setDefaultHost("localhost");
HttpClient client1 = vertx.createHttpClient(options);
HttpClient client2 = vertx.createHttpClient(options);
VertxCompletableFuture<Integer> requestA = new VertxCompletableFuture<>(vertx);
client1.get("/A").handler(resp -> {
resp.exceptionHandler(requestA::completeExceptionally)
.bodyHandler(buffer -> {
requestA.complete(Integer.parseInt(buffer.toString()));
});
}).exceptionHandler(requestA::completeExceptionally).end();
VertxCompletableFuture<Integer> requestB = new VertxCompletableFuture<>(vertx);
client2.get("/B").handler(resp -> {
resp.exceptionHandler(requestB::completeExceptionally)
.bodyHandler(buffer -> {
requestB.complete(Integer.parseInt(buffer.toString()));
});
}).exceptionHandler(requestB::completeExceptionally).end();
VertxCompletableFuture.allOf(requestA, requestB).thenApply(v -> requestA.join() + requestB.join())
.thenAccept(i -> {
tc.assertEquals(65, i);
async.complete();
});
}
From / To Vert.x Futures
You can transform a VertxCompletableFuture to a Vert.x Future with the toFuture method.
You can also creates a new VertxCompletableFuture from a Vert.x Future using:
Future<Integer> vertxFuture = ...
VertxCompletableFuture<Integer> vcf = VertxCompletableFuture.from(vertx, vertxFuture);
vcf.thenAccept(i -> {...}).whenComplete((res, err) -> {...})