Migrating Kafka Producers/Consumers to Use CompletableFuture in Spring-Kafka
When upgrading from older versions of Spring to Spring-Kafka 3.1 or later, developers must adapt their code to accommodate changes in asynchronous handling mechanisms. One significant shift is the migration from ListenableFuture to CompletableFuture. In this post, we’ll explore how to replace ListenableFuture with CompletableFuture in a Kafka producer and consumer scenario, ensuring backward compatibility and leveraging the new API’s capabilities.
The Original Scenario with ListenableFuture
Previously, ListenableFuture was used to handle asynchronous operations in Kafka messaging, allowing callbacks to be added directly to handle success and failure scenarios:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
}
});
return future.get().getRecordMetadata().partition() + "-" + future.get().getRecordMetadata().offset();
Transitioning to CompletableFuture
CompletableFuture does not support the addCallback method directly, but provides a more versatile API for chaining asynchronous operations and handling results. Here’s how you can adapt the existing Kafka operations:
Approach 1: Using thenApply and exceptionally
Transform the success path using thenApply and handle exceptions with exceptionally:
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
return future.thenApply(result -> {
log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
return result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset();
}).exceptionally(ex -> {
log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
return "Error-Result";
}).join(); // Synchronously wait and retrieve the result
Approach 2: Using handleAsync
Combine both success and failure handling in one method using handleAsync:
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
return future.handleAsync((result, ex) -> {
if (ex != null) {
log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
return "Error-Result";
} else {
log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
return result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset();
}
}).join(); // Synchronously wait and retrieve the result
Approach 3: Using thenAcceptAsync and exceptionallyAsync
For developers using Java 12 onwards, separate success and error handling clearly using thenAcceptAsync and exceptionallyAsync:
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
future.thenAcceptAsync(result -> {
log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
}).exceptionallyAsync(ex -> {
log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
return null; // Continue the chain despite the error
});
return future.join(); // Retrieve the combined future resultMigrating from ListenableFuture to CompletableFuture in Spring-Kafka involves understanding the new API’s method of chaining asynchronous operations and handling results. Each approach offers different advantages, from clear separation of logic to compact error handling. Choose the method that best fits your project’s needs and Java version constraints, ensuring a smooth transition in your Kafka integration within Spring applications.


0 Comments:
Post a Comment
Note: only a member of this blog may post a comment.
<< Home