Message Queuing with Apache Kafka and Spring Cloud

In this article I will show how to configure a Spring Boot application to publish and read messages from Apache Kafka for the Message Queuing pattern. I will show the configuration both with the plain Spring Kafka dependency and with the Spring Cloud Stream dependency.

Content:

  • Message Queuing
  • Publisher
  • Consumer
  • Spring Kafka Streams

Watch this video for more details.

All the code is available in this repository.

Message Queuing

I will use Apache Kafka to communicate between two of my microservices: service-user and service-mails. I want to send an email every time a user account is created. But what is Apache Kafka meant for? Apache Kafka, just like RabbitMQ, AWS SQS or GCP Pub/Sub, is a message queuing service, a message broker: it handles queues of messages.

Message Queuing with Apache Kafka

Those queues have two ends: one for the publishers and one for the subscribers. Instead of communicating via HTTP requests, I can have a publisher adding a message to the queue, and let the subscribers read the message from the same queue. The difference from HTTP requests is that this is not real-time communication. I may have my publishers adding messages every minute but my subscribers reading the messages only once per day. This decouples the availability of the two parts. I may have problems with the publisher, but I can still consume the existing messages. Or I may have problems with my consumers, but I can still add more messages to the queue. They will be handled later, when the service becomes available again.

Publisher

I will start with the publisher: service-user, which will publish messages to Apache Kafka. Let’s start with the Maven dependency.

      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
          <version>2.8.2</version>
      </dependency>

Now add the configuration to connect my microservice to Apache Kafka.

@Configuration
public class KafkaConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

And add the Kafka address property to my application yaml file.

kafka.bootstrapAddress: localhost:9092

I will now create a method to publish a message to the queue.

@RequiredArgsConstructor
@Service
public class MailService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendUserWelcomeMail(UserDto userDto) {
        kafkaTemplate.send("mails", "user.creation: " + userDto.getLogin());
    }
}

I’ve specified the topic where I want to publish my message (mails) and the message to publish (user.creation). That’s all for the publisher. Let’s go now to the consumer.
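The payload itself is just a convention I chose: an action and its content separated by a colon. A minimal sketch of that convention as a pure helper, so publisher and consumer agree on the format (the class and method names are hypothetical, not part of the service above):

```java
public class MailPayloads {

    // Hypothetical helper: keeps the "action: content" payload
    // convention in one place instead of inlining the string.
    public static String userCreation(String login) {
        return "user.creation: " + login;
    }

    public static void main(String[] args) {
        System.out.println(userCreation("alice")); // prints "user.creation: alice"
    }
}
```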

Consumer

For the consumer, service-mails, I will use the same Maven dependency.

      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
          <version>2.8.2</version>
      </dependency>

As before, I need to configure the access to Apache Kafka.

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

And as before, add the bootstrap address in the application yaml.

kafka.bootstrapAddress: localhost:9092

And finally, a listener to listen to the available messages in the queue.

@Slf4j
@Service
public class KafkaListenerService {

    @KafkaListener(topics = "mails")
    public void listenMessages(String message) {
        // Split only on the first colon, and trim the leading space from the content.
        var msgParts = message.split(":", 2);
        log.info("Will perform a {} with content {}", msgParts[0], msgParts[1].trim());
    }
}

With the annotation @KafkaListener, I specify the topic I want to subscribe to. I get the message directly as an input parameter of my method. And the acknowledgement is done automatically if my method exits without throwing an exception.
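Since the payload is a single "action: content" string, the split deserves a note: a limit of 2 splits only on the first colon, so the content may itself contain colons, and the leading space after the separator must be trimmed. A quick standalone check of that logic:

```java
public class PayloadParsing {

    // Parse an "action: content" payload: split only on the first colon,
    // then trim the leading space from the content part.
    public static String[] parse(String message) {
        String[] parts = message.split(":", 2);
        return new String[] { parts[0], parts[1].trim() };
    }

    public static void main(String[] args) {
        String[] parsed = parse("user.creation: alice");
        System.out.println(parsed[0]); // prints "user.creation"
        System.out.println(parsed[1]); // prints "alice"
    }
}
```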

Spring Kafka Streams

As I said, there is another library to connect my Spring Boot application to Apache Kafka, one which lets me consume the messages with streams. Message queuing is a pattern found mainly in microservices architectures: I may want my microservices to communicate through a queue of messages instead of HTTP requests. A queue of messages behaves much like a stream of data, like in Java 8: I don’t know how many elements there are, I just apply some functional operations to my incoming data. That is what makes streams interesting here: the messages are unbounded.
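As an illustration of this analogy, here is the same kind of processing over a plain Java 8 stream, with a finite batch of payloads standing in for the unbounded queue:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamAnalogy {

    // The same functional operations I would apply to incoming messages:
    // keep the creation events and extract the login from each payload.
    public static List<String> creationLogins(List<String> messages) {
        return messages.stream()
                .filter(m -> m.startsWith("user.creation"))
                .map(m -> m.split(":", 2)[1].trim())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList(
                "user.creation: alice",
                "user.deletion: bob",
                "user.creation: carol");

        System.out.println(creationLogins(messages)); // prints [alice, carol]
    }
}
```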

Nevertheless, when I work with microservices, I tend to have multiple instances of the same service running at the same time, multiple replicas. How do I know which will consume my message? Here come consumer groups. I can configure a group of consumers which consumes a single queue, and Kafka will deliver every message only once to the group, no matter how many instances it contains. On the other hand, if I want my messages to be consumed by multiple services, I can have several groups: a single message will then be delivered to each group. This is useful if I want to apply multiple actions to the same message.
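In the Spring Kafka consumer configuration earlier, the group was set through GROUP_ID_CONFIG. With Spring Cloud Stream, a consumer group is usually declared as a binding property instead. A sketch, assuming the standard binding key format (the binding and group names here are illustrative; note that the Kafka Streams binder manages groups through its application id rather than this property):

```yaml
spring:
  cloud:
    stream:
      bindings:
        listenMessages-in-0:
          destination: mails
          group: mail-senders   # instances sharing this group split the messages
```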

As before, I will start with the Maven dependency.

      <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
      </dependency>

I no longer need the KafkaConfig class where I configured the connection. Everything will be done in the configuration file, the application yaml.

spring:
  kafka:
    bootstrap-servers: localhost:9092

That’s all. My MailService with its KafkaTemplate remains as it was: Spring Boot auto-configures the template and infers the serialization from the types used. Let’s go now to the consumer microservice, where I also need to update the Maven dependency.

      <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
      </dependency>

And as before, I no longer need the KafkaConfig class where I configured the connection. Everything will be done in the application yaml file.

spring:
  cloud:
    stream:
      bindings:
        listenMessages-in-0:
          destination: mails
      kafka:
        streams:
          binder:
            brokers: localhost:9092

With binder.brokers, I have the connection configuration. And with bindings, I have the listener configuration. All I need is to create a bean named listenMessages: in stands for the input channel, the zero is the index of the binding, and destination is the topic it will be connected to. Let’s create this bean. As I said, I will now use a functional-style consumer, like the streams introduced in Java 8.

    @Bean
    public Consumer<KStream<Object, String>> listenMessages() {
        return input -> input.foreach((key, value) -> {
            var msgParts = value.split(":", 2);
            log.info("Will perform a {} with content {}", msgParts[0], msgParts[1].trim());
        });
    }
I will consume a KStream, which is a Kafka abstraction of a stream of key-value records. I won’t go into the details of Kafka Streams in this article. And that’s all: less configuration, and consuming the queue with a stream.
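One more note: with a single functional bean in the context, Spring Cloud Stream binds it automatically; with several such beans, it usually needs the spring.cloud.function.definition property to know which one to bind:

```yaml
spring:
  cloud:
    function:
      definition: listenMessages
```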

Conclusion

  • I can use the Spring Kafka dependency:
    • I’ve created a bean to connect to Apache Kafka.
    • I’ve used the KafkaTemplate to publish messages to a queue.
    • In the consumer, I’ve used a similar bean to connect to Kafka.
    • And the annotation @KafkaListener to listen to the messages from the queue.
  • And with the stream dependency:
    • I’ve added the binder Kafka streams dependency.
    • Configured my connection in the application yaml.
    • The publisher remains as before, using the KafkaTemplate.
    • And the consumer now uses a Consumer of KStream objects.

With the stream dependency, everything is in the configuration. I just need to create consumers and reference them in the application yaml. I think it’s easier.

References

Repository

