DevOps/Kafka

Kafka와 Spring Boot 연동

너지살 2023. 9. 30. 01:46

메시지큐 Kafka

 

개요

이번에는 Kafka와 Spring Boot를 연동해보려 합니다.

Spring Boot에 메시지를 전송하는 Producer, 메시지를 받는 Consumer의 코드를 추가하여 Kafka를 통해 메시지를 주고 받게 해볼 겁니다.

이 글은 Kafka Consumer 구현, Kafka Producer 구현, 테스트 순으로 정리했습니다.

 

 

 

시작하기 전

Spring Boot에 Producer, Consumer 코드를 추가하기 전에 Kafka에 Topic을 하나 생성해야 합니다. 

또한 테스트를 하려면 ZooKeeper와 Kafka를 실행시켜야 합니다. 

ZooKeeper와 Kafka를 설치하고 실행하는 글은 이전에 정리한 글을 참조하시면 도움이 될 것 입니다.

https://cladren123.tistory.com/190

 

 

 

Kafka Consumer

Consumer는 Kafka의 토픽을 구독하여 토픽에 메시지를 가져오는 역할을 합니다.

 

 

의존성 추가

implementation 'org.springframework.kafka:spring-kafka'

 

 

Kafka Consumer Config 생성

 

KafkaConsumerConfig 파일을 만들어 KafkaConsumer에 대한 설정을 저장합니다.

이 파일에는 ConsumerFactory(), KafkaListenerContainerFactory() 라는 2가지 Bean을 동륵합니다.

 

ConsumerFactory() 는 Kafka Topic의 구독 위치와 직렬화 등에 대한 설정을 합니다. 

KafkaListenerContainerFactory() 는 ConsumerFactory()의 설정 정보를 받아와 Kafka Listener Container를 만듭니다.

이 Kafka Listener Container는 토픽에 변경 사항을 감지하여 정보를 받아오는 리스너 생성에 쓰입니다.

서비스단에 메소드에 @KafkaListenr를 설정하면 Kafka Listener Container에서 Listener를 생성하여  카프카 토픽에 변경 사항을 감지하여 해당 메소드를 실행시킵니다.

package com.dotd.product.practice.consumer;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // 접속하고자 하는 곳에 정보
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        // Map을 이용해 Key, Value 형태로 Kafka Consumer에 필요한 설정을 저장합니다
        Map<String, Object> properties = new HashMap<>();

        // Kafka Broker 위치 저장합니다.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // Kafka Consumer 그룹의 ID 지정, 같은 그룹 ID를 가진 컨슈머들은 메시지를 공유하여 처리합니다.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");

        // Kafka 메시지의 키, 값을 어떻게 역직렬화 할지 설정합니다.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 위의 설정한 properties 값을 바탕으로 ConsumerFactory를 생성합니다.
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    // 위에서 정의한 consumerFactory를 사용해 Kafka Listener Containers를 만들어 줍니다.
    // 이 설정을 사용하여 해당 토픽의 메시지를 가져옵니다.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();

        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }

}

 

 

 

Kafka Consumer 생성

@KafkaListener(topics = "") 의 어노테이션을 통해 카프카 브로커에 토픽을 구독합니다.

이 KafkaListener는 위의 KafkaConsumerConfig에서 생성된 것 입니다. 

구독한 토픽에 메시지가 들어오면 해당 메시지를 가지고 와서 메소드의 로직을 수행합니다. 

이 때, 서버끼리 통신할 때는 메시지를 직렬화 합니다.(저는 JSON 형태로 직렬화 했습니다.)

그러므로 메시지를 읽을 때는 역직렬화 과정을 수행합니다. 이 작업을 ObjectMapper가 수행합니다.

package com.dotd.product.practice.consumer;


import com.dotd.product.entity.Product;
import com.dotd.product.repository.ProductRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Service
@Slf4j
@RequiredArgsConstructor // 생성자 자동 생성
public class KafkaConsumer {

    private final ProductRepository productRepository;

    // 등록해놓은 카프카 토픽에 데이터가 들어오면 실행되는 로직
    @KafkaListener(topics = "example-product-topic")
    public void updatePrice(String kafkaMessage) {
        log.info("Kafka Message : -> " + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();

        // ObjectMapper를 통해 카프카로부터 받은 메시지를 Map 구조로 변환합니다.
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        log.info("결과 확인" + " " + map.get("name") + " " + map.get("price"));

    }

}

 

 

 

 

 

Kafka Producer

Producer 는 지정된 토픽에 메시지를 보내는 역할을 합니다.

 

 

의존성 추가

implementation 'org.springframework.kafka:spring-kafka'

 

 

Kafka Producer Config 생성

Kafka Producer Config 파일을 만들어 설정 정보를 저장합니다.

이 설정 파일은 ProducerFactory(), KafkaTemplate() 두 개의 빈을 등록합니다.

 

ProducerFactory() 는 Producer를 만들기 위한 설정 정보들을 저장합니다. 

Kafka Broker의 주소, 직렬화 방법 등을 설정합니다.

 

KafkaTemplate() 은 위의 ProducerFactory를 사용하여 Kafka Template를 생성합니다.

Kafka Template은 메시지를 전송하기 위한 주요 메소드들을 제공합니다. 

 

package com.dotd.product.practice.producer;



import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    // Kafka Producer에 필요한 설정들을 저장
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();

        // kafka broker 위치 지정
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // kafka 메시지의 키와 값을 어떻게 직렬화하는지 지정
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 위에서 설정한 properties를 사용해 ProducerFactory를 생성 
        return new DefaultKafkaProducerFactory<>(properties);
    }

    // KafkaTemplate은 메시지를 전송하기 위한 주요 메소드들을 제공
    // ProducerFactory를 사용하여 Kafka Template을 생성
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

 

 

KafkaProducer 생성

KafkaProducer는 메시지를 JSON으로 변환하여 원하는 Topic에 메시지를 보냅니다.

이 때 ObjectMapper를 통해 객체를 JSON 형태로 직렬화 합니다.

그 후 KafkaTemplate를 통해 원하는 토픽과 JSON된 직렬화된 객체를 넣어 카프카에 보냅니다.

package com.dotd.product.practice.producer;


import com.dotd.product.dto.ProductRegistDto;
import com.dotd.product.dto.ProductResponseDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {

    // Kafka 메시지를 전송하기 위한 템플릿 (문자열 키, 값)
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // kafka에 메시지 전송
    public void send(String topic, ProductRegistDto dto) {

        // ObjectMapper를 통해 ProductRegistDto 객체를 JSON 문자열로 변환
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(dto);
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        // JSON 문자열을 해당 topic에 전달 
        kafkaTemplate.send(topic, jsonInString);

        log.info("Kafka Producer sent data from the product micro service " + dto);

    }

}

 

 

 

테스트

저는 한 Spring Boot에 Consumer와 Producer를 만들어 테스트를 진행했습니다.

다른 Spring Boot에 Consumer, Producer 를 각각 만들어도 테스트하셔도 됩니다. 

 

 

KafkaProducerController 생성

위에서 만든 KafkaProducer의 send 메시지를 실행할 Controller 입니다.

필자는 테스트를 위해 카프카에 example-product-topic을 미리 만들었습니다. 

package com.dotd.product.practice.producer;


import com.dotd.product.dto.ProductRegistDto;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaProducerController {
    
    private final KafkaProducer kafkaProducer;
    
    // kafka 메시지를 보내는 함수 
    @PostMapping("/send")
    public ResponseEntity<?> send(@RequestBody ProductRegistDto dto) {
        kafkaProducer.send("example-product-topic", dto);
        return ResponseEntity.ok("ok");
    }
}

 

그리고 테스트용으로 사용한 ProductRegistDto 의 코드는 다음과 같습니다.

더보기
package com.dotd.product.dto;


import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProductRegistDto {

    private String sellerId;

    private Integer categoryId;

    private String code;
    private String name;
    private Integer price;
    private String description;

}

 

위의 Controller를 만들어 PostMan으로 요청을 보냈습니다.

다음은 그 결과 로그입니다.

 

Kafka의 Producer, Consumer 로그

 

 

Kafka Producer를 통해 보낸 메시지가 Kafka Consumer 가 받은 것을 확인할 수 있었습니다. 

 

 

 

 

마무리

이번에는 Spring Boot와 Kafka 를 연동하여 서버끼리 메시지큐를 이용한 통신을 구현해 보았습니다.

Kafka는 여러 개의 서비스로 나누어진 MSA 환경에서 중요한 역할을 합니다.

여러 개의 서버가 토픽을 구독할 수 있는 점에서 여러 개의 DB가 있는 MSA 환경에서 데이터의 무결성을 지켜주는 중요한 역할을 합니다.

다음에는 Kafka Connect를 사용해 보겠습니다. 감사합니다.