Sunday, 21 April 2024

Migrating Kafka Producers/Consumers to Use CompletableFuture in Spring-Kafka

 

When upgrading from Spring for Apache Kafka 2.x to 3.0 or later, developers must adapt their code to a change in asynchronous handling: the KafkaTemplate send methods now return CompletableFuture instead of ListenableFuture. In this post, we’ll explore how to replace ListenableFuture with CompletableFuture in code that sends messages through KafkaTemplate, preserving the original success and failure handling while taking advantage of the new API’s capabilities.

The Original Scenario with ListenableFuture

Previously, ListenableFuture was used to handle asynchronous operations in Kafka messaging, allowing callbacks to be added directly to handle success and failure scenarios:

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<>() {
    @Override
    public void onSuccess(SendResult<String, Object> result) {
        log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    }
});
return future.get().getRecordMetadata().partition() + "-" + future.get().getRecordMetadata().offset();

Transitioning to CompletableFuture

CompletableFuture has no addCallback method. Instead, it provides a more versatile API for chaining asynchronous operations and handling results: callbacks are attached as dependent stages with methods such as thenApply, handle, exceptionally, and whenComplete. Here’s how you can adapt the existing Kafka operations:

Approach 1: Using thenApply and exceptionally

Transform the success path using thenApply and handle exceptions with exceptionally:

CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);

return future.thenApply(result -> {
    log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
    return result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset();
}).exceptionally(ex -> {
    log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    return "Error-Result";
}).join(); // Synchronously wait and retrieve the result
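
One detail of this approach is easy to miss: when a failure passes through thenApply before reaching exceptionally, the callback typically receives a CompletionException whose cause is the original exception, so ex.getMessage() may not be the message you expect. The following is a minimal sketch of that behaviour using only the JDK, with no Kafka involved. FakeMetadata and describe are hypothetical names invented for illustration, standing in for the record metadata and the sending method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ThenApplySketch {

    // Hypothetical stand-in for the partition/offset metadata of a send result.
    record FakeMetadata(int partition, long offset) {}

    // Same shape as Approach 1: map the success path, then recover from failure.
    static String describe(CompletableFuture<FakeMetadata> future) {
        return future
                .thenApply(meta -> meta.partition() + "-" + meta.offset())
                .exceptionally(ex -> {
                    // Unwrap the CompletionException added by the thenApply stage.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    return "Error-Result: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(
                CompletableFuture.completedFuture(new FakeMetadata(2, 41L))));

        CompletableFuture<FakeMetadata> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(describe(failed));
    }
}
```

If the error message matters in your logs, unwrapping the cause as above (or logging the exception object itself) keeps the original failure visible.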

Approach 2: Using handleAsync

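Combine both success and failure handling in a single callback using handleAsync. Unlike handle, the Async variant runs the callback as a separate task, by default on the common ForkJoinPool, rather than on the thread that completes the future: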

CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);

return future.handleAsync((result, ex) -> {
    if (ex != null) {
        log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
        return "Error-Result";
    } else {
        log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
        return result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset();
    }
}).join(); // Synchronously wait and retrieve the result
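
If you would rather not run callbacks on the shared common pool, handleAsync also has an overload that takes an Executor. The sketch below shows that overload with a plain JDK future in place of the Kafka send. The FakeMetadata record, the describe method and the "kafka-callbacks" thread name are all assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HandleAsyncSketch {

    // Hypothetical stand-in for the partition/offset metadata of a send result.
    record FakeMetadata(int partition, long offset) {}

    // Same shape as Approach 2, but the callback runs on a caller-supplied executor.
    static String describe(CompletableFuture<FakeMetadata> future, ExecutorService callbackPool) {
        return future.handleAsync((meta, ex) -> {
            if (ex != null) {
                return "Error-Result";
            }
            return meta.partition() + "-" + meta.offset()
                    + " on " + Thread.currentThread().getName();
        }, callbackPool).join();
    }

    public static void main(String[] args) {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "kafka-callbacks"));
        try {
            System.out.println(describe(
                    CompletableFuture.completedFuture(new FakeMetadata(0, 7L)), pool));

            CompletableFuture<FakeMetadata> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("timeout"));
            System.out.println(describe(failed, pool));
        } finally {
            pool.shutdown(); // let the JVM exit once the callbacks are done
        }
    }
}
```

A dedicated executor keeps slow logging or post-processing from competing with other work that uses the common pool, at the cost of one more resource to manage and shut down.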

Approach 3: Using thenAcceptAsync and exceptionallyAsync

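For developers on Java 12 or later (exceptionallyAsync was added in Java 12; thenAcceptAsync has been available since Java 8), success and error handling can be kept clearly separate using thenAcceptAsync and exceptionallyAsync. Note that this variant only logs in the callbacks and returns the SendResult itself rather than a formatted String: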

CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);

future.thenAcceptAsync(result -> {
    log.info("Sent message=[{}] with offset=[{}]", payload, result.getRecordMetadata().offset());
}).exceptionallyAsync(ex -> {
    log.info("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    return null; // Continue the chain despite the error
});
return future.join(); // Wait on the original future: returns the SendResult, or throws CompletionException if the send failed
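
Be aware that the callbacks in this approach are attached to a new, derived future, while join() is called on the original one. Recovering in exceptionallyAsync therefore does not stop the original future from throwing. The sketch below illustrates the difference with JDK classes only; it uses the non-async thenAccept and exceptionally for brevity, and DerivedFutureSketch and outcome are hypothetical names made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class DerivedFutureSketch {

    // Attaches callbacks to a derived stage, then joins both futures.
    static String outcome(CompletableFuture<String> source) {
        CompletableFuture<Void> derived = source
                .thenAccept(value -> { /* success logging would go here */ })
                .exceptionally(ex -> null); // recovers the derived stage only

        derived.join(); // completes normally even if the source failed

        try {
            return "source ok: " + source.join();
        } catch (CompletionException e) {
            // The source future is unaffected by the recovery above.
            return "source failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(CompletableFuture.completedFuture("0-7")));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(outcome(failed));
    }
}
```

If the caller should never see the exception, join the derived future (or use Approach 1 or 2) instead of the original one.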

Migrating from ListenableFuture to CompletableFuture in Spring-Kafka involves understanding the new API’s method of chaining asynchronous operations and handling results. Each approach offers different advantages, from clear separation of logic to compact error handling. Choose the method that best fits your project’s needs and Java version constraints, ensuring a smooth transition in your Kafka integration within Spring applications.
