In this blog, we’ll walk through the process of building Kafka Producer and Consumer microservices using Spring Boot, integrated with Docker. Microservices architecture is a popular approach for building scalable and maintainable applications. By breaking down applications into smaller, independent services, developers can enhance flexibility and streamline deployment cycles. We will implement Kafka producer and consumer applications in Spring Boot and demonstrate how to use Kafka and Zookeeper in Docker containers with Docker Compose to create a seamless microservices environment.
Spring Boot is a powerful framework for building Java-based applications with minimal configuration. It simplifies the development process by offering pre-configured components and a wide range of integration capabilities.
Kafka is a distributed streaming platform used to build real-time data pipelines and streaming applications. It allows producers to send messages to topics, which are then consumed by various consumers, making it ideal for event-driven architectures.
Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and managing group services in distributed systems. In the Kafka ecosystem, Zookeeper helps manage and coordinate Kafka brokers.
Docker is a platform that allows developers to automate the deployment of applications inside lightweight, portable containers. With Docker Compose, you can manage multiple services, like Kafka and Zookeeper, in isolated containers, making it easy to build and maintain microservices architectures.
In this guide, we will:
- Build a Spring Boot Kafka producer.
- Build a Spring Boot Kafka consumer.
- Set up Kafka and Zookeeper using Docker.
- Run the application end-to-end.
Prerequisites
- If you're looking to learn how to set up a Spring Boot web project in VS Code, this blog is a good guide.
- Java 23 (JDK 23)
- Gradle (version 8.10.2)
- Docker
- Docker Compose
- Basic knowledge of Spring Boot and Kafka
Project Overview
We will create two Spring Boot applications:
- Kafka Producer: Sends messages to a Kafka topic.
- Kafka Consumer: Listens to the topic and consumes messages.
springboot-kafka-docker
|
|_ Kafka-producer
|_ Kafka-consumer
|_ docker-compose
Kafka and Zookeeper will be run inside Docker containers using docker-compose.
Download full code from this github repository.
Event Model
Let’s start by defining our message model, which will be used by the producer and consumer applications.
package springboot.kafka.docker.kafka_producer.model;
import lombok.*;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@Getter
@Setter
@ToString
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Message {
@NotNull
private String uuid;
@NotBlank
private String from;
@NotBlank
private String to;
}
Kafka Producer
We'll start with the Kafka Producer, which send messages to a Kafka topic.
Configuration
Below is Spring Boot Kafka Producer configuration, which defines Kafka settings such as the topic name and Kafka broker address.
application:
topic:
message-topic: demo-kafka-topic
spring:
profiles:
active: dev
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
KafkaProducer.java
The following Spring Boot component is responsible for creating and sending messages to the Kafka topic.
package springboot.kafka.docker.kafka_producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import springboot.kafka.docker.kafka_producer.model.Message;
import java.util.UUID;
@Slf4j
@Component
public class KafkaProducer {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${application.topic.message-topic}")
private String topic;
public void sendMessage() {
try {
Message message = createMessage();
String messageJson = objectMapper.writeValueAsString(message);
kafkaTemplate.send(topic, messageJson);
log.info("Message sent to topic {}: {}", topic, createMessage());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
private Message createMessage() {
return Message.builder()
.uuid(UUID.randomUUID().toString())
.from("Sender")
.to("Receiver")
.build();
}
}
kafkaTemplate
: A helper class used to send messages to a topic.@Value("${application.topic.message-topic}")
: Retrieves the Kafka topic name fromapplication.yaml
.sendMessage()
: Sends a sample JSON message to the Kafka topic.
Producer Application
The main application class will start the Spring Boot application and send a message when the application runs.
package springboot.kafka.docker.kafka_producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaProducerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}
@Autowired
private KafkaProducer producer;
@Override
public void run(String... args) throws Exception {
producer.sendMessage();
}
}
Gradle Build
We build kafka producer application using Gradle build tool. Here is build.gradle file which Gradle uses to build the application,
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.4'
id 'io.spring.dependency-management' version '1.1.6'
}
group = 'springboot.kafka.docker'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(23)
}
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '3.3.4'
implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.2.4'
implementation group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.34'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
tasks.named('test') {
useJUnitPlatform()
}
Kafka Consumer
Configuration
application.yaml:
The application.yaml
file defines configuration settings for the Spring Boot application, such as the Kafka server address, topic names, and serialization classes.
server:
port: 5555
application:
topic:
message-topic: demo-kafka-topic
spring:
profiles:
active: dev
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: demo-kafka-consumer
enable-auto-commit: true
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
bootstrap-servers
: This points to the Kafka server. TheKAFKA_BOOTSTRAP_SERVERS
environment variable is used to make it configurable.consumer.group-id
: Defines the group that consumes messages from the Kafka topic.key-serializer
andvalue-serializer
: Specify how to serialize the keys and values for Kafka messages.
KafkaConsumer.java
This class listens to Kafka messages on a specific topic. When a message is received, it deserializes the JSON data into a Message
object.
package springboot.kafka.docker.kafka_consumer;
import springboot.kafka.docker.kafka_consumer.model.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "${application.topic.message-topic}", groupId = "${spring.kafka.consumer.group-id}")
public void onMessage(final ConsumerRecord<String, String> consumerRecord) throws IOException {
Message message = new ObjectMapper().readValue(consumerRecord.value(), Message.class);
log.info("Received Messasge: : {}", message.toString());
}
}
@KafkaListener
: Annotates a method to listen for Kafka messages on the specified topic and consumer group.ObjectMapper
: Converts the received JSON string into a Java object.
Consumer Application
package springboot.kafka.docker.kafka_consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
}
Gradle Build
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.4'
id 'io.spring.dependency-management' version '1.1.6'
}
group = 'springboot.kafka.docker'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(23)
}
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '3.3.4'
implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.2.4'
implementation group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.34'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
tasks.named('test') {
useJUnitPlatform()
}
Building and Running the Application
Docker Compose for Kafka Setup
The docker-compose.yml
file orchestrates running Kafka and Zookeeper containers for the application. Zookeeper plays a crucial role in Kafka’s architecture by acting as a centralized service for maintaining configuration, and ensuring proper synchronization between them. It helps Kafka brokers register themselves on startup and keeps track of the status of each broker in the cluster.
services:
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_BROKER_ID: 1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- zookeeper: Manages Kafka brokers. It runs on port 2181.
- kafka: The Kafka broker service. The
KAFKA_ADVERTISED_LISTENERS
points to the Kafka service running within the Docker container.
To start the Kafka and Zookeeper services:
docker-compose up -d
This will spin up the Zookeeper and Kafka containers.Verify if both images are up and running,
docker ps
Build & Run Producer and Consumer
Move to kafka_producer
directory, and build and run
./gradlew clean build
./gradlew bootRun
Repeat above steps for kafka consumer.
Conclusion
This blog walked you through the process of building a Spring Boot Kafka Producer-Consumer application using Docker. We explored how to set up Kafka and Zookeeper in Docker containers using Docker Compose, simplifying the process of managing these distributed services. By integrating Spring Boot with Kafka, we demonstrated how to efficiently handle real-time data streams in a microservices architecture. This setup not only makes your application scalable and flexible but also easier to maintain and deploy.