Apache Kafka with Spring Boot Microservice – JavaDream


Hi all, in this article we will see how to integrate Apache Kafka with Spring Boot. As we all know, Kafka is an open-source distributed messaging platform that is scalable, fault-tolerant and offers high performance.

Why We Use Apache Kafka with Spring Boot

Generally, we use Apache Kafka with Spring Boot for asynchronous communication, for example when you want to email a purchase bill to a customer or pass some data to another microservice. In this article we build a simple producer-consumer example using Kafka and Spring Boot.

To integrate Apache Kafka with Spring Boot, we first have to install it. Kafka is open source, so you can download it easily; here I am installing it on Ubuntu.

Below are the steps to install Apache Kafka on an Ubuntu machine.

  1. Go to the Apache Kafka website.
  2. Choose any version from Binary Downloads and download the .tgz file.
  3. Extract the .tgz file.
  4. Always remember: you have to start ZooKeeper before starting your Kafka server.
  5. Go to the bin folder and start ZooKeeper.
  6. Stay in the bin folder and run your Kafka server.
Follow the above steps:

1- Go to the Apache Kafka website and download the binary version.

(Screenshot: Apache Kafka binary downloads page)

2- Extract the .tgz file using the command below.

tar -xvf kafka_2.12-2.5.0.tgz

3- Now go to your Apache Kafka bin folder and run the command below to start ZooKeeper.

./zookeeper-server-start.sh ../config/zookeeper.properties

4- Now run the command below to start your Kafka server.

./kafka-server-start.sh ../config/server.properties
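By default the Kafka broker auto-creates topics on first use, so this step is optional, but you can also create the demo topic (used later in this article) explicitly with the kafka-topics.sh script that ships in the same bin folder:

./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic demo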

Your Apache Kafka server is now running. Next we have to create a Spring Boot project and integrate it with this Kafka server. In this example we build a simple producer-consumer setup, meaning a sender and a client: the sender simply publishes a message and the client consumes it. To create the Spring Boot application, follow the steps below:

  1. Create a Spring Boot project and add the required dependencies to the pom.xml file.
  2. Define the Kafka-related properties in your application.yml or application.properties file.
  3. Create a Producer class that writes messages to a Kafka topic.
  4. Create a Consumer class that reads messages from the Kafka topic.
  5. Create a Controller class and expose an endpoint to send a message using Postman or your frontend application.

Follow the above steps:

1- Create a Spring Boot project and add the required dependencies to the pom.xml file. You can also copy these dependencies from the Maven Repository.

<dependency>
 <groupId>org.springframework.boot</groupId> 			 
 <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
</dependency>
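Note that no version is specified for spring-kafka: its version is managed by the Spring Boot dependency BOM, provided the project inherits from the Spring Boot parent POM. A minimal sketch of that parent declaration (the version shown is only an illustrative assumption; use whatever version your project was generated with):

<parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.3.1.RELEASE</version>
 <relativePath/>
</parent>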

2- Define the Kafka-related properties in your application.yml or application.properties file.

As you know, you can create either an application.yml or an application.properties file. Some developers prefer application.properties and others prefer application.yml, so I am sharing both files; use whichever one you like.

application.properties

server.port=8000
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest

application.yaml

server:
  port: 8000
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

In the file above we simply set the server port to 8000, which means the Spring Boot application runs on port 8000, and we configure both the Kafka producer and the consumer. In the producer section, bootstrap-servers points to the Kafka broker; since I installed Kafka on my local machine, I use localhost:9092. The producer section has two more keys, key-serializer and value-serializer. Kafka sends every message as a key-value pair and serializes both parts, and because we are sending a plain String message here, we use StringSerializer.

Now let's discuss the consumer section. Here bootstrap-servers is the same as for the producer: it points to the Kafka broker. In Kafka we have to define a topic to send and receive messages; the sender writes messages to this topic and the consumer reads them from it. Since many consumers may read from the same topic, we define a group-id and assign it to the consumer. key-deserializer and value-deserializer are used to deserialize the messages sent by the producer.
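Optionally, instead of relying on the broker to auto-create the demo topic on first use, you can declare the topic as a Spring bean so it is created with the partition count and replication factor you want when the application starts. A minimal sketch, assuming Spring Kafka 2.3+ (which provides TopicBuilder); the class name KafkaTopicConfig is just an illustrative choice:

package com.vasu.SpringBootKafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

	// Spring Boot's auto-configured KafkaAdmin creates this topic on startup if it does not exist.
	@Bean
	public NewTopic demoTopic() {
		return TopicBuilder.name("demo")
				.partitions(1)
				.replicas(1)
				.build();
	}
}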

3- Create a Producer class that writes messages to the Kafka topic.

package com.vasu.SpringBootKafka.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

	private static final Logger logger = LoggerFactory.getLogger(Producer.class);

	private static final String TOPIC = "demo";

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	public void sendMessage(String message) {
		// Publish the message to the "demo" topic
		logger.info("sending message: {}", message);
		kafkaTemplate.send(TOPIC, message);
	}
}

In the class above we define a topic named demo and autowire the KafkaTemplate. The class simply writes the message to the demo topic using the KafkaTemplate.
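Note that kafkaTemplate.send() is asynchronous: the method returns before the broker has acknowledged the record. If you want to log whether the write actually succeeded, you can attach a callback to the returned future. A minimal sketch of such a method for the Producer class above, assuming Spring Kafka 2.x where send() returns a ListenableFuture (newer 3.x versions return a CompletableFuture instead):

// Additional imports needed for this sketch:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFuture;
// import org.springframework.util.concurrent.ListenableFutureCallback;

public void sendMessageWithCallback(String message) {
	ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC, message);
	future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
		@Override
		public void onSuccess(SendResult<String, Object> result) {
			// RecordMetadata tells us which partition and offset the broker stored the record at
			logger.info("sent '{}' to partition {} at offset {}", message,
					result.getRecordMetadata().partition(),
					result.getRecordMetadata().offset());
		}

		@Override
		public void onFailure(Throwable ex) {
			logger.error("failed to send '{}'", message, ex);
		}
	});
}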

4- Now create a Consumer class that reads messages from the Kafka topic.

package com.vasu.SpringBootKafka.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

	// Invoked for every record published to the "demo" topic, as part of consumer group "group_id"
	@KafkaListener(topics = "demo", groupId = "group_id")
	public void consume(String message) {
		logger.info("consumed message = {}", message);
	}
}

In the class above we simply consume messages from the demo topic and print them to the console. The @KafkaListener annotation tells Spring Kafka to read messages from the given topic.
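If you also want to know where a record came from, Spring Kafka can inject the record metadata into the listener method through @Header parameters. A minimal sketch of an alternative listener signature, assuming Spring Kafka 2.x (where the partition header constant is named RECEIVED_PARTITION_ID; newer versions rename it to RECEIVED_PARTITION):

// Additional imports needed for this sketch:
// import org.springframework.kafka.support.KafkaHeaders;
// import org.springframework.messaging.handler.annotation.Header;
// import org.springframework.messaging.handler.annotation.Payload;

@KafkaListener(topics = "demo", groupId = "group_id")
public void consume(@Payload String message,
		@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
		@Header(KafkaHeaders.OFFSET) long offset) {
	// Logs the payload together with the partition and offset it was read from
	logger.info("consumed '{}' from partition {} at offset {}", message, partition, offset);
}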

5- Now create a Controller class and expose an endpoint to send a message using Postman or your frontend application.

package com.vasu.SpringBootKafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.vasu.SpringBootKafka.service.Producer;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

	private final Producer producer;

	@Autowired
	KafkaController(Producer producer) {
		this.producer = producer;
	}

	@PostMapping(value = "/publish")
	public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
		this.producer.sendMessage(message);
	}
}

Here we simply define a controller class and expose an endpoint. Now run your Spring Boot application, open Postman, send a POST request with a message parameter, and check your Eclipse console: you will see the message.
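If you prefer the command line to Postman, the same request can be sent with curl (assuming the application is running locally on port 8000 as configured above):

curl -X POST "http://localhost:8000/kafka/publish?message=HelloKafka"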

(Screenshots: the POST request in Postman and the consumed message in the Eclipse console)

You can also see all the messages on this topic in your Kafka server console. Just go to your Kafka bin folder and run the command below.

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

GitHub URL – https://github.com/vasurajput/Spring-Boot-Web/tree/master/SpringBootKafka

You may also like.

How to cache data in springboot

Swagger in Springboot application with Example.
