Quick Start Kafka Producer & Consumer(Springboot) with local Kafka instance

This guide is for an absolute beginner to just get started. Once you have read a little about Kafka and just want to see it up and running quickly.

Prerequisite

  1. DockerDesktop(Mac or Windows)/ docker on VM installed.
  2. Basic understanding of spring boot and annotations.

Setup Kafka with Docker

docker pull landoop/fast-data-dev

run it in detached mode

docker run -d --name landoopkafka -p 2181:2181 -p 3030:3030 -p 8081:8081 -p 8082:8082 -p 8083:8083 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev

check logs to see if everything is up and access the UI
lenses UI

It comes with a command-line producer and consumer.
First, enter into a container.
docker exec -it landoopkafka bash

Producer :
kafka-console-producer --topic first_topic --broker-list 127.0.0.1:9092

Consumer:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

SpringBoot App

Add dependency

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

Application.properties the bare minimum.

#server address as set while running container
spring.kafka.bootstrap-servers=127.0.0.1:9092
auto.create.topics.enable=true
#custom field - used to set topic - can be named anything
spring.kafka.incoming.topic=student_topic
#package path of model being used * adds all as trusted source
spring.kafka.consumer.properties.spring.json.trusted.packages=*
#for serizalization and dezrialization, by default it is string
spring.kafka.producer.value-serializer=
org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=
org.springframework.kafka.support.serializer.JsonDeserializer

Producer

Consumer

I have implemented producer and consumer, taking input via a rest call.
Full code at Github-https://github.com/pradeekpmohandas/ConsumerProducer-kafka.git

Basic error handling

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition student_new_topic1–0 at offset 5. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

Simple workaround.

#remove this property 
spring.kafka.consumer.value-deserializer=
org.springframework.kafka.support.serializer.JsonDeserializer

Here instead of auto mapping of an object we take any input as String and map it later. Thus handling the exception at mapping staging in our code.
Other ways to handle this is to provide an error handler. https://medium.com/@codegard/kafka-consumer-deserialization-error-handling-in-spring-boot-applications-132afd92a8f5

Kafka tool

Currently, we are running lenses with Kafka that’s why we have a UI in a browser. When it’s not there Kafka tool is an alternative for viewing topics and data. Download from https://www.kafkatool.com/.

After the beginner stage

  • Kafka Connect Source API: a framework built on top of the Producer API, to connect external systems such as databases, key-value stores, search indexes, and file systems.
  • Kafka Connect Sink API: a framework build on top of the Consumer API, to connect external systems.
  • Kafka Streams API: In-stream processing world, reading data from Kafka in real-time and after processing it, writing it back to Kafka. Writing on top of consumer and producer API. (Java and Scala)
  • KSQL DB: Event streaming database for Kafka, to process your data in Kafka. Built on top of Kafka’s Streams API.

More in depth detail of each with differences https://medium.com/@stephane.maarek/the-kafka-api-battle-producer-vs-consumer-vs-kafka-connect-vs-kafka-streams-vs-ksql-ef584274c1e

Insta : https://bit.ly/instaPPM Tech Savy | Engineer | psychology enthusiast