Migrating Kafka Producers/Consumers to Use CompletableFuture in Spring-Kafka
When upgrading from older versions of Spring to Spring-Kafka 3.1 or later, developers must adapt their code to accommodate changes in asynchronous handling. One significant shift is the migration from ListenableFuture to CompletableFuture: KafkaTemplate.send() now returns a CompletableFuture. In this post, we'll explore how to replace ListenableFuture with CompletableFuture in a Kafka producer and consumer scenario, preserving the existing behaviour while leveraging the new API's capabilities.
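Throughout this post the snippets refer to kafkaTemplate, message, and log without showing their declarations. A minimal sketch of the surrounding component is shown below; the class and method names (KafkaProducerService, sendMessage) are illustrative assumptions, not part of Spring-Kafka:

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public String sendMessage(Message<String> message) {
        // Placeholder body: each approach below provides an alternative implementation.
        CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
        return future.join().getRecordMetadata().toString();
    }
}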
The Original Scenario with ListenableFuture
Previously, ListenableFuture was used to handle asynchronous operations in Kafka messaging, allowing callbacks to be added directly to handle success and failure scenarios:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<>() {

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    }
});
SendResult<String, Object> sendResult = future.get(); // Blocks until the send completes
return sendResult.getRecordMetadata().partition() + "-" + sendResult.getRecordMetadata().offset();
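Note that get() blocks the calling thread until the send completes and throws checked exceptions (InterruptedException, ExecutionException). The CompletableFuture examples below use join() instead, which blocks in the same way but wraps failures in an unchecked CompletionException.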
Transitioning to CompletableFuture
CompletableFuture does not support the addCallback method directly, but provides a more versatile API for chaining asynchronous operations and handling results. Here's how you can adapt the existing Kafka operations:
Approach 1: Using thenApply and exceptionally
Transform the success path using thenApply and handle exceptions with exceptionally:
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
return future.thenApply(result -> {
    log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
    return result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset();
}).exceptionally(ex -> {
    log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    return "Error-Result";
}).join(); // Synchronously wait and retrieve the result
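Because exceptionally is chained after thenApply, its fallback must produce a value of the same type as the mapped result (a String here), so the chain always completes with something join() can return. If you don't need a synchronous return value, you can drop join() and hand the resulting CompletableFuture<String> back to the caller instead.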
Approach 2: Using handleAsync
Combine both success and failure handling in one method using handleAsync:
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
return future.handleAsync((result, ex) -> {
    if (ex != null) {
        log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
        return "Error-Result";
    } else {
        log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
        return result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset();
    }
}).join(); // Synchronously wait and retrieve the result
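By default, handleAsync runs the callback on the common ForkJoinPool rather than on the thread that completed the send; use handle if you want to stay on the completing thread, or pass an Executor explicitly. A brief sketch of the latter, where callbackExecutor is an assumed Executor of your own (not something Spring-Kafka provides):

// Assumption: callbackExecutor is an Executor you create yourself,
// e.g. Executors.newSingleThreadExecutor(), and shut down appropriately.
return future.handleAsync((result, ex) -> {
    if (ex != null) {
        return "Error-Result";
    }
    return result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset();
}, callbackExecutor).join();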
Approach 3: Using thenAcceptAsync and exceptionallyAsync
For developers using Java 12 or later, separate success and error handling clearly using thenAcceptAsync and exceptionallyAsync:
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
future.thenAcceptAsync(result ->
    log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset())
).exceptionallyAsync(ex -> {
    log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    return null; // Continue the logging chain despite the error
});
SendResult<String, Object> sendResult = future.join(); // Blocks on the original future; rethrows if the send failed
return sendResult.getRecordMetadata().partition() + "-" + sendResult.getRecordMetadata().offset();
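Keep in mind that exceptionallyAsync only handles errors on the derived logging chain; the original future is untouched, so the final join() still throws a CompletionException if the send fails. The logging callbacks also run asynchronously on the common ForkJoinPool, so they may execute after the method has already returned.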
Migrating from ListenableFuture to CompletableFuture in Spring-Kafka involves understanding the new API's method of chaining asynchronous operations and handling results. Each approach offers different advantages, from clear separation of logic to compact error handling. Choose the method that best fits your project's needs and Java version constraints, ensuring a smooth transition in your Kafka integration within Spring applications.
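Finally, if a blocking return value isn't required at all, a common pattern is to attach the logging with whenComplete and hand the future back to the caller. A minimal sketch, reusing the same assumed kafkaTemplate, message, and log fields as above:

public CompletableFuture<SendResult<String, Object>> sendMessageAsync(Message<String> message) {
    return kafkaTemplate.send(message).whenComplete((result, ex) -> {
        if (ex != null) {
            log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
        } else {
            log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
        }
    });
}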