[Kafka] 스프링부트에 Kafka 적용해보기
스프링 프로젝트 생성하기
- Dependencies에 우선 Kafka, Spring Web, Mysql, Lombok, JPA(없어도 무방은 할 것 같으나 추후 추가 테스트를 위해)
를 추가했다.
server.address=localhost
server.port=8080
# API 호출시, SQL 문을 콘솔에 출력한다.
spring.jpa.show-sql=true
# DDL 정의시 데이터베이스의 고유 기능을 사용합니다.
# ex) 테이블 생성, 삭제 등
spring.jpa.generate-ddl=true
# MySQL 을 사용할 것.
spring.jpa.database=mysql
# MySQL 설정
spring.datasource.url=jdbc:mysql://localhost:3306/kafka?useSSL=false&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# MySQL 상세 지정
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
############################## Kafka ##############################
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-demo
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=1000
spring.kafka.template.default-topic=kafka-demo
spring.kafka.bootstrap-servers
카프카서버 정보, 기본적으로 9092 포트를 사용한다.
spring.kafka.consumer.group-id
컨슈머의 그룹id
spring.kafka.consumer.enable-auto-commit
데이터를 어디까지 읽었다는 offset을 주기적으로 저장할지 여부
spring.kafka.consumer.auto-offset-reset
offset에 오류가 있을 경우 어디서부터 다시 할지 여부
ealiest - 맨처음부터 다시 읽는다
latest - 이전꺼는 무시하고, 이제부터 들어오는 데이터부터 읽기 시작한다
spring.kafka.producer.key-serializer
데이터를 kafka로 전달할때 사용하는 Key Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.consumer.key-deserializer
데이터를 kafka에서 받아서 사용하는 Key Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.producer.value-serializer
데이터를 kafka로 전달할때 사용하는 Value Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.consumer.value-deserializer
데이터를 kafka에서 받아서 사용하는 Value Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능
spring.kafka.consumer.max-poll-records
consumer가 한번에 가져오는 message 갯수
spring.kafka.template.default-topic
기본 설정 topic name
Controller / Service 만들어 주기
KafkaProducer
package com.example.kafka.Service;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private static final String TOPIC = "kafka-demo";
private final KafkaTemplate<String , String> kafkaTemplate;
public void sendMessage(String message){
System.out.println(String.format("Produce message : %s", message));
kafkaTemplate.send(TOPIC, message);
}
}
- KafkaProducer는 위와 같다. TOPIC은 properties에서 설정한 토픽으로 설정해줘야 한다.
- 아 물론 당연히 저 토픽 부분이 앞으로는 request에서 header, body(data), params 등 으로 가변적인 요소가 될 것이다.
- 핵심은 producer는 this.kafkaTemplate.send(TOPIC, message); 를 통해서 TOPIC에 해당하는 message를 전달할 것이다.
KafkaConsumer
package com.example.kafka.Service;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "kafka-demo", groupId = "kafka-demo")
public void consume(String message) throws IOException {
System.out.println(String.format("Consumed message : %s",message));
}
}
- KafkaConsumer는 위와 같다. topics와 groupId는 우선 properties에서 설정한 것으로 해주자.
KafkaController
package com.example.kafka.Controller;
import com.example.kafka.Service.KafkaProducer;
import com.example.kafka.Vo.JsonResultVo;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.LinkedHashMap;
import java.util.List;
@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducer producer;
@PostMapping
public ResponseEntity<Object> sendMessage(@RequestParam("message") String message){
JsonResultVo<List> result = new JsonResultVo<>();
try {
producer.sendMessage(message);
return ResponseEntity.ok().body(new JsonResultVo<>(message));
}catch (Exception e){
result.setMsg(e.toString());
return ResponseEntity.ok().body(result);
}
}
}
서버를 Run하고 Postman으로 테스트 진행해보자.
실제 Kafka 서버에 들어가서 Queue가 저장되었는지 확인해보자.
- container로 접근해서 consumer를 실행시키는 shell을 통해서 'kafka-demo'에 해당하는 토픽을 'from-beginning'으로 찍어보니 우리가 보낸 메시지, Hello Jacob을 볼 수 있다.
- 실시간으로 직접 해보자! 생각보다 퍼포먼스가 굉장히 빠르다는 것을 바로 체감할 수 있다!!