Web/Kafka

[Kafka] 스프링부트에 Kafka 적용해보기

부에나온다 2022. 8. 2. 17:43

스프링 프로젝트 생성하기

      - Dependencies에 우선 Kafka, Spring Web, Mysql, Lombok, JPA(없어도 무방은 할 것 같으나 추후 추가 테스트를 위해)

         를 추가했다.

server.address=localhost
server.port=8080

# API 호출시, SQL 문을 콘솔에 출력한다.
spring.jpa.show-sql=true

# DDL 정의시 데이터베이스의 고유 기능을 사용합니다.
# ex) 테이블 생성, 삭제 등
spring.jpa.generate-ddl=true

# MySQL 을 사용할 것.
spring.jpa.database=mysql

# MySQL 설정
spring.datasource.url=jdbc:mysql://localhost:3306/kafka?useSSL=false&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# MySQL 상세 지정
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect

############################## Kafka ##############################

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-demo
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=1000
spring.kafka.template.default-topic=kafka-demo

spring.kafka.bootstrap-servers
카프카서버 정보, 기본적으로 9092 포트를 사용한다.

spring.kafka.consumer.group-id
컨슈머의 그룹id

spring.kafka.consumer.enable-auto-commit
데이터를 어디까지 읽었다는 offset을 주기적으로 저장할지 여부

spring.kafka.consumer.auto-offset-reset
offset에 오류가 있을 경우 어디서부터 다시 할지 여부
ealiest - 맨처음부터 다시 읽는다
latest - 이전꺼는 무시하고, 이제부터 들어오는 데이터부터 읽기 시작한다

spring.kafka.producer.key-serializer
데이터를 kafka로 전달할때 사용하는 Key Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능

spring.kafka.consumer.key-deserializer
데이터를 kafka에서 받아서 사용하는 Key Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능

spring.kafka.producer.value-serializer
데이터를 kafka로 전달할때 사용하는 Value Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능

spring.kafka.consumer.value-deserializer
데이터를 kafka에서 받아서 사용하는 Value Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능

spring.kafka.consumer.max-poll-records
consumer가 한번에 가져오는 message 갯수

spring.kafka.template.default-topic
기본 설정 topic name

 

Controller / Service 만들어 주기

프로젝트 구성도

KafkaProducer

package com.example.kafka.Service;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducer {
    private static final String TOPIC = "kafka-demo";
    private final KafkaTemplate<String , String> kafkaTemplate;

    public void sendMessage(String message){
        System.out.println(String.format("Produce message : %s", message));
        kafkaTemplate.send(TOPIC, message);
    }

}
  • KafkaProducer는 위와 같다. TOPIC은 properties에서 설정한 토픽으로 설정해줘야 한다.
  • 아 물론 당연히 저 토픽 부분이 앞으로는 request에서 header, body(data), params 등 으로 가변적인 요소가 될 것이다.
  • 핵심은 producer는 this.kafkaTemplate.send(TOPIC, message); 를 통해서 TOPIC에 해당하는 message를 전달할 것이다.

KafkaConsumer

package com.example.kafka.Service;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "kafka-demo", groupId = "kafka-demo")
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message : %s",message));
    }

}
  • KafkaConsumer는 위와 같다. topics와 groupId는 우선 properties에서 설정한 것으로 해주자.

KafkaController

package com.example.kafka.Controller;

import com.example.kafka.Service.KafkaProducer;
import com.example.kafka.Vo.JsonResultVo;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.LinkedHashMap;
import java.util.List;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaProducer producer;

    @PostMapping
    public ResponseEntity<Object> sendMessage(@RequestParam("message") String message){
        JsonResultVo<List> result = new JsonResultVo<>();

        try {
            producer.sendMessage(message);
            return ResponseEntity.ok().body(new JsonResultVo<>(message));
        }catch (Exception e){
            result.setMsg(e.toString());
            return ResponseEntity.ok().body(result);
        }
    }
}

 

서버를 Run하고 Postman으로 테스트 진행해보자.

서버 로그에 찍힌것을 확인할수 있다.

실제 Kafka 서버에 들어가서 Queue가 저장되었는지 확인해보자.

  • container로 접근해서 consumer를 실행시키는 shell을 통해서 'kafka-demo'에 해당하는 토픽을 'from-beginning'으로 찍어보니 우리가 보낸 메시지, Hello Jacob을 볼 수 있다.
  • 실시간으로 직접 해보자! 생각보다 퍼포먼스가 굉장히 빠르다는 것을 바로 체감할 수 있다!!