개요
이번에는 Kafka와 Spring Boot를 연동해보려 합니다.
Spring Boot에 메시지를 전송하는 Producer, 메시지를 받는 Consumer의 코드를 추가하여 Kafka를 통해 메시지를 주고 받게 해볼 겁니다.
이 글은 Kafka Consumer 구현, Kafka Producer 구현, 테스트 순으로 정리했습니다.
시작하기 전
Spring Boot에 Producer, Consumer 코드를 추가하기 전에 Kafka에 Topic을 하나 생성해야 합니다.
또한 테스트를 하려면 ZooKeeper와 Kafka를 실행시켜야 합니다.
ZooKeeper와 Kafka를 설치하고 실행하는 글은 이전에 정리한 글을 참조하시면 도움이 될 것 입니다.
https://cladren123.tistory.com/190
Kafka Consumer
Consumer는 Kafka의 토픽을 구독하여 토픽에 메시지를 가져오는 역할을 합니다.
의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
Kafka Consumer Config 생성
KafkaConsumerConfig 파일을 만들어 KafkaConsumer에 대한 설정을 저장합니다.
이 파일에는 ConsumerFactory(), KafkaListenerContainerFactory() 라는 2가지 Bean을 동륵합니다.
ConsumerFactory() 는 Kafka Topic의 구독 위치와 직렬화 등에 대한 설정을 합니다.
KafkaListenerContainerFactory() 는 ConsumerFactory()의 설정 정보를 받아와 Kafka Listener Container를 만듭니다.
이 Kafka Listener Container는 토픽에 변경 사항을 감지하여 정보를 받아오는 리스너 생성에 쓰입니다.
서비스단에 메소드에 @KafkaListenr를 설정하면 Kafka Listener Container에서 Listener를 생성하여 카프카 토픽에 변경 사항을 감지하여 해당 메소드를 실행시킵니다.
package com.dotd.product.practice.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
// 접속하고자 하는 곳에 정보
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// Map을 이용해 Key, Value 형태로 Kafka Consumer에 필요한 설정을 저장합니다
Map<String, Object> properties = new HashMap<>();
// Kafka Broker 위치 저장합니다.
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// Kafka Consumer 그룹의 ID 지정, 같은 그룹 ID를 가진 컨슈머들은 메시지를 공유하여 처리합니다.
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
// Kafka 메시지의 키, 값을 어떻게 역직렬화 할지 설정합니다.
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 위의 설정한 properties 값을 바탕으로 ConsumerFactory를 생성합니다.
return new DefaultKafkaConsumerFactory<>(properties);
}
// 위에서 정의한 consumerFactory를 사용해 Kafka Listener Containers를 만들어 줍니다.
// 이 설정을 사용하여 해당 토픽의 메시지를 가져옵니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
Kafka Consumer 생성
@KafkaListener(topics = "") 의 어노테이션을 통해 카프카 브로커에 토픽을 구독합니다.
이 KafkaListener는 위의 KafkaConsumerConfig에서 생성된 것 입니다.
구독한 토픽에 메시지가 들어오면 해당 메시지를 가지고 와서 메소드의 로직을 수행합니다.
이 때, 서버끼리 통신할 때는 메시지를 직렬화 합니다.(저는 JSON 형태로 직렬화 했습니다.)
그러므로 메시지를 읽을 때는 역직렬화 과정을 수행합니다. 이 작업을 ObjectMapper가 수행합니다.
package com.dotd.product.practice.consumer;
import com.dotd.product.entity.Product;
import com.dotd.product.repository.ProductRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Service
@Slf4j
@RequiredArgsConstructor // 생성자 자동 생성
public class KafkaConsumer {
private final ProductRepository productRepository;
// 등록해놓은 카프카 토픽에 데이터가 들어오면 실행되는 로직
@KafkaListener(topics = "example-product-topic")
public void updatePrice(String kafkaMessage) {
log.info("Kafka Message : -> " + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
// ObjectMapper를 통해 카프카로부터 받은 메시지를 Map 구조로 변환합니다.
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
log.info("결과 확인" + " " + map.get("name") + " " + map.get("price"));
}
}
Kafka Producer
Producer 는 지정된 토픽에 메시지를 보내는 역할을 합니다.
의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
Kafka Producer Config 생성
Kafka Producer Config 파일을 만들어 설정 정보를 저장합니다.
이 설정 파일은 ProducerFactory(), KafkaTemplate() 두 개의 빈을 등록합니다.
ProducerFactory() 는 Producer를 만들기 위한 설정 정보들을 저장합니다.
Kafka Broker의 주소, 직렬화 방법 등을 설정합니다.
KafkaTemplate() 은 위의 ProducerFactory를 사용하여 Kafka Template를 생성합니다.
Kafka Template은 메시지를 전송하기 위한 주요 메소드들을 제공합니다.
package com.dotd.product.practice.producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
// Kafka Producer에 필요한 설정들을 저장
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
// kafka broker 위치 지정
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// kafka 메시지의 키와 값을 어떻게 직렬화하는지 지정
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 위에서 설정한 properties를 사용해 ProducerFactory를 생성
return new DefaultKafkaProducerFactory<>(properties);
}
// KafkaTemplate은 메시지를 전송하기 위한 주요 메소드들을 제공
// ProducerFactory를 사용하여 Kafka Template을 생성
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducer 생성
KafkaProducer는 메시지를 JSON으로 변환하여 원하는 Topic에 메시지를 보냅니다.
이 때 ObjectMapper를 통해 객체를 JSON 형태로 직렬화 합니다.
그 후 KafkaTemplate를 통해 원하는 토픽과 JSON된 직렬화된 객체를 넣어 카프카에 보냅니다.
package com.dotd.product.practice.producer;
import com.dotd.product.dto.ProductRegistDto;
import com.dotd.product.dto.ProductResponseDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaProducer {
// Kafka 메시지를 전송하기 위한 템플릿 (문자열 키, 값)
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// kafka에 메시지 전송
public void send(String topic, ProductRegistDto dto) {
// ObjectMapper를 통해 ProductRegistDto 객체를 JSON 문자열로 변환
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(dto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
// JSON 문자열을 해당 topic에 전달
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the product micro service " + dto);
}
}
테스트
저는 한 Spring Boot에 Consumer와 Producer를 만들어 테스트를 진행했습니다.
다른 Spring Boot에 Consumer, Producer 를 각각 만들어도 테스트하셔도 됩니다.
KafkaProducerController 생성
위에서 만든 KafkaProducer의 send 메시지를 실행할 Controller 입니다.
필자는 테스트를 위해 카프카에 example-product-topic을 미리 만들었습니다.
package com.dotd.product.practice.producer;
import com.dotd.product.dto.ProductRegistDto;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaProducerController {
private final KafkaProducer kafkaProducer;
// kafka 메시지를 보내는 함수
@PostMapping("/send")
public ResponseEntity<?> send(@RequestBody ProductRegistDto dto) {
kafkaProducer.send("example-product-topic", dto);
return ResponseEntity.ok("ok");
}
}
그리고 테스트용으로 사용한 ProductRegistDto 의 코드는 다음과 같습니다.
package com.dotd.product.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProductRegistDto {
private String sellerId;
private Integer categoryId;
private String code;
private String name;
private Integer price;
private String description;
}
위의 Controller를 만들어 PostMan으로 요청을 보냈습니다.
다음은 그 결과 로그입니다.
Kafka Producer를 통해 보낸 메시지가 Kafka Consumer 가 받은 것을 확인할 수 있었습니다.
마무리
이번에는 Spring Boot와 Kafka 를 연동하여 서버끼리 메시지큐를 이용한 통신을 구현해 보았습니다.
Kafka는 여러 개의 서비스로 나누어진 MSA 환경에서 중요한 역할을 합니다.
여러 개의 서버가 토픽을 구독할 수 있는 점에서 여러 개의 DB가 있는 MSA 환경에서 데이터의 무결성을 지켜주는 중요한 역할을 합니다.
다음에는 Kafka Connect를 사용해 보겠습니다. 감사합니다.