Quick Start Kafka Producer & Consumer(Springboot) with local Kafka instance

Pradeek Mohandas
4 min readAug 2, 2020

This guide is for an absolute beginner to just get started. Once you have read a little about Kafka and just want to see it up and running quickly.

Prerequisite

  1. Basic Understanding of Kafka.
  2. DockerDesktop(Mac or Windows)/ docker on VM installed.
  3. Basic understanding of spring boot and annotations.

Setup Kafka with Docker

After you’ve installed Docker, you will need to pull down the Landoop Kafka (fast-data-dev) image from Docker Hub. It comes with UI and some existing data to play around with.

docker pull landoop/fast-data-dev

run it in detached mode

docker run -d --name landoopkafka -p 2181:2181 -p 3030:3030 -p 8081:8081 -p 8082:8082 -p 8083:8083 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev

check logs to see if everything is up and access the UI
lenses UI

It comes with a command-line producer and consumer.
First, enter into a container.
docker exec -it landoopkafka bash

Producer :
kafka-console-producer --topic first_topic --broker-list 127.0.0.1:9092

Consumer:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

SpringBoot App

Kafka can be configured through configuration class or just adding properties. We will configure by adding minimal properties to get everything running. In the example, we will be producing a JSON and consuming the same.

Add dependency

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

Application.properties the bare minimum.

#server address as set while running container
spring.kafka.bootstrap-servers=127.0.0.1:9092
auto.create.topics.enable=true
#custom field - used to set topic - can be named anything
spring.kafka.incoming.topic=student_topic
#package path of model being used * adds all as trusted source
spring.kafka.consumer.properties.spring.json.trusted.packages=*
#for serizalization and dezrialization, by default it is string
spring.kafka.producer.value-serializer=
org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=
org.springframework.kafka.support.serializer.JsonDeserializer

Producer

Consumer

Two important annotations are @KakfaListner class which will consume the Kafka topics and @Kafkahandler function which will add the implementation of consumption.

I have implemented producer and consumer, taking input via a rest call.
Full code at Github-https://github.com/pradeekpmohandas/ConsumerProducer-kafka.git

Basic error handling

If the incoming JSON is in the wrong format, code will go into error loop. This need to be handled.

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition student_new_topic1–0 at offset 5. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

Simple workaround.

#remove this property 
spring.kafka.consumer.value-deserializer=
org.springframework.kafka.support.serializer.JsonDeserializer

Here instead of auto mapping of an object we take any input as String and map it later. Thus handling the exception at mapping staging in our code.
Other ways to handle this is to provide an error handler. https://medium.com/@codegard/kafka-consumer-deserialization-error-handling-in-spring-boot-applications-132afd92a8f5

Kafka tool

Currently, we are running lenses with Kafka that’s why we have a UI in a browser. When it’s not there Kafka tool is an alternative for viewing topics and data. Download from https://www.kafkatool.com/.

After the beginner stage

We have used Kafka consumer API and Kafka producer API which is fairly simple.

  • Kafka Connect Source API: a framework built on top of the Producer API, to connect external systems such as databases, key-value stores, search indexes, and file systems.
  • Kafka Connect Sink API: a framework build on top of the Consumer API, to connect external systems.
  • Kafka Streams API: In-stream processing world, reading data from Kafka in real-time and after processing it, writing it back to Kafka. Writing on top of consumer and producer API. (Java and Scala)
  • KSQL DB: Event streaming database for Kafka, to process your data in Kafka. Built on top of Kafka’s Streams API.

More in depth detail of each with differences https://medium.com/@stephane.maarek/the-kafka-api-battle-producer-vs-consumer-vs-kafka-connect-vs-kafka-streams-vs-ksql-ef584274c1e

--

--