Building Kafka Producer-Consumer Using Spring Boot and Docker

User Icon By Azam Akram,   Calendar Icon October 21, 2024
spring-boot-kafka-producer-consumer-with-docker

In this blog, we’ll walk through the process of building Kafka Producer and Consumer microservices using Spring Boot, integrated with Docker. Microservices architecture is a popular approach for building scalable and maintainable applications. By breaking down applications into smaller, independent services, developers can enhance flexibility and streamline deployment cycles. We will implement Kafka producer and consumer applications in Spring Boot and demonstrate how to use Kafka and Zookeeper in Docker containers with Docker Compose to create a seamless microservices environment.

Spring Boot is a powerful framework for building Java-based applications with minimal configuration. It simplifies the development process by offering pre-configured components and a wide range of integration capabilities.

Kafka is a distributed streaming platform used to build real-time data pipelines and streaming applications. It allows producers to send messages to topics, which are then consumed by various consumers, making it ideal for event-driven architectures.

Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and managing group services in distributed systems. In the Kafka ecosystem, Zookeeper helps manage and coordinate Kafka brokers.

Docker is a platform that allows developers to automate the deployment of applications inside lightweight, portable containers. With Docker Compose, you can manage multiple services, like Kafka and Zookeeper, in isolated containers, making it easy to build and maintain microservices architectures.

In this guide, we will:

  1. Build a Spring Boot Kafka producer.
  2. Build a Spring Boot Kafka consumer.
  3. Set up Kafka and Zookeeper using Docker.
  4. Run the application end-to-end.

Prerequisites

  • If you're looking to learn how to set up a Spring Boot web project in VS Code, this blog is a good guide.
  • Java 23 (JDK 23)
  • Gradle (version 8.10.2)
  • Docker
  • Docker Compose
  • Basic knowledge of Spring Boot and Kafka

Project Overview

We will create two Spring Boot applications:

  1. Kafka Producer: Sends messages to a Kafka topic.
  2. Kafka Consumer: Listens to the topic and consumes messages.
springboot-kafka-docker/
├── Kafka-producer/
├── Kafka-consumer/
└── docker-compose.yml

Kafka and Zookeeper will be run inside Docker containers using docker-compose.

Download full code from this github repository.

Event Model

Let’s start by defining our message model, which will be used by the producer and consumer applications.

package springboot.kafka.docker.kafka_producer.model;

import lombok.*;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

@Getter
@Setter
@ToString
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    @NotNull
    private String uuid;

    @NotBlank
    private String from;

    @NotBlank
    private String to;
}

Kafka Producer

We'll start with the Kafka Producer, which send messages to a Kafka topic.

Configuration

Below is Spring Boot Kafka Producer configuration, which defines Kafka settings such as the topic name and Kafka broker address.

application:
  topic:
    message-topic: demo-kafka-topic

spring:
  profiles:
    active: dev
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

KafkaProducer.java

The following Spring Boot component is responsible for creating and sending messages to the Kafka topic.

package springboot.kafka.docker.kafka_producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import springboot.kafka.docker.kafka_producer.model.Message;

import java.util.UUID;

@Slf4j
@Component
public class KafkaProducer {
    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${application.topic.message-topic}")
    private String topic;

    public void sendMessage() {
        try {
            Message message = createMessage();
            String messageJson = objectMapper.writeValueAsString(message);
            kafkaTemplate.send(topic, messageJson);
            log.info("Message sent to topic {}: {}", topic, createMessage());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }        
    }

    private Message createMessage() {
        return Message.builder()
                .uuid(UUID.randomUUID().toString())
                .from("Sender")
                .to("Receiver")
                .build();
    }
}
  • kafkaTemplate: A helper class used to send messages to a topic.
  • @Value("${application.topic.message-topic}"): Retrieves the Kafka topic name from application.yaml.
  • sendMessage(): Sends a sample JSON message to the Kafka topic.

Producer Application

The main application class will start the Spring Boot application and send a message when the application runs.

package springboot.kafka.docker.kafka_producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaProducerApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(KafkaProducerApplication.class, args);
	}

	@Autowired
	private KafkaProducer producer;

	@Override
	public void run(String... args) throws Exception {
		producer.sendMessage();
	}

}

Gradle Build

We build kafka producer application using Gradle build tool. Here is build.gradle file which Gradle uses to build the application,

plugins {
	id 'java'
	id 'org.springframework.boot' version '3.3.4'
	id 'io.spring.dependency-management' version '1.1.6'
}

group = 'springboot.kafka.docker'
version = '0.0.1-SNAPSHOT'

java {
	toolchain {
		languageVersion = JavaLanguageVersion.of(23)
	}
}

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '3.3.4'
	implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.2.4'
	implementation group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
	compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.34'

	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

tasks.named('test') {
	useJUnitPlatform()
}

Kafka Consumer

Configuration

application.yaml:

The application.yaml file defines configuration settings for the Spring Boot application, such as the Kafka server address, topic names, and serialization classes.

server:
  port: 5555
application:
  topic:
    message-topic: demo-kafka-topic
spring:
  profiles:
    active: dev
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: demo-kafka-consumer
      enable-auto-commit: true
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • bootstrap-servers: This points to the Kafka server. The KAFKA_BOOTSTRAP_SERVERS environment variable is used to make it configurable.
  • consumer.group-id: Defines the group that consumes messages from the Kafka topic.
  • key-serializer and value-serializer: Specify how to serialize the keys and values for Kafka messages.

KafkaConsumer.java

This class listens to Kafka messages on a specific topic. When a message is received, it deserializes the JSON data into a Message object.

package springboot.kafka.docker.kafka_consumer;

import springboot.kafka.docker.kafka_consumer.model.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "${application.topic.message-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void onMessage(final ConsumerRecord<String, String> consumerRecord) throws IOException {
        Message message = new ObjectMapper().readValue(consumerRecord.value(), Message.class);
        log.info("Received Messasge: : {}", message.toString());
    }
}
  • @KafkaListener: Annotates a method to listen for Kafka messages on the specified topic and consumer group.
  • ObjectMapper: Converts the received JSON string into a Java object.

Consumer Application

package springboot.kafka.docker.kafka_consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {
	public static void main(String[] args) {
		SpringApplication.run(KafkaConsumerApplication.class, args);
	}
}

Gradle Build

plugins {
	id 'java'
	id 'org.springframework.boot' version '3.3.4'
	id 'io.spring.dependency-management' version '1.1.6'
}

group = 'springboot.kafka.docker'
version = '0.0.1-SNAPSHOT'

java {
	toolchain {
		languageVersion = JavaLanguageVersion.of(23)
	}
}

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '3.3.4'
	implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.2.4'
	implementation group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
	compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.34'

	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

tasks.named('test') {
	useJUnitPlatform()
}

Building and Running the Application

Docker Compose for Kafka Setup

The docker-compose.yml file orchestrates running Kafka and Zookeeper containers for the application. Zookeeper plays a crucial role in Kafka’s architecture by acting as a centralized service for maintaining configuration, and ensuring proper synchronization between them. It helps Kafka brokers register themselves on startup and keeps track of the status of each broker in the cluster.

services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_BROKER_ID: 1
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  • zookeeper: Manages Kafka brokers. It runs on port 2181.
  • kafka: The Kafka broker service. The KAFKA_ADVERTISED_LISTENERS points to the Kafka service running within the Docker container.

To start the Kafka and Zookeeper services:

docker-compose up -d

This will spin up the Zookeeper and Kafka containers.Verify if both images are up and running,

docker ps

Build & Run Producer and Consumer

Move to kafka_producer directory, and build and run

./gradlew clean build
./gradlew bootRun

Repeat above steps for kafka consumer.

Kafka Producer sending message over kafka topic
Kafka Consumer receiving message from Kafka topic

Conclusion

This blog walked you through the process of building a Spring Boot Kafka Producer-Consumer application using Docker. We explored how to set up Kafka and Zookeeper in Docker containers using Docker Compose, simplifying the process of managing these distributed services. By integrating Spring Boot with Kafka, we demonstrated how to efficiently handle real-time data streams in a microservices architecture. This setup not only makes your application scalable and flexible but also easier to maintain and deploy.