Kafka Connect : Spring Boot, MySQL 연동

2023. 10. 14. 02:08· DevOps/Kafka
목차
  1. 개요
  2. Spring Boot - Producer 
  3. Kafka Sink Connector 등록 ( nametest-sink-connect )
  4. Spring Boot 메세지 전달 결과 확인
  5. 에러 사항 
  6. 마무리

Kafka

 

 

 

개요

이번에는 Kafka Connect을 이용하여 Spring Boot에서 데이터를 보내면 MySQL에 저장하는 로직을 구현해보려 합니다. 

멀티 서버 환경에서 Kafka Connect을 이용하여 순차적으로 처리하여 동시성 문제를 해결할 수 있고 비동기적으로 처리하여 성능 향상을 기대할 수 있습니다. 

 

 

 

Spring Boot - Producer 

Spring Boot에서 MySQL에 DB에 저장하기 위해서는 정해진 패턴대로 메시지를 전송해야 합니다.

정해진 패턴이란 Schema 와 payload로 구성되어 있습니다.

Schema는 Table의 구조를 나타내고 payload는 실제 데이터를 나타냅니다.

스키마를 참고하여 payload의 데이터를 Table에 넣는 식 입니다.

 

이전 글에서 본 Producer 구조와 동일합니다. 

구조는 Schema 안에 Field 리스트가 있고 그 아래에 Payload가 있습니다.

데이터 양식 예시

 

이 형태를 Spring Boot에서 만들어 토픽으로 전송할 겁니다.

저는 테스트용으로 NameTest 라는 Class를 새로 만들었습니다. 

 

NameTest Class

더보기
package com.dotd.product.kafkaConnectTest;


import jdk.jfr.Name;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Table(name = "nametest")
@EntityListeners(AuditingEntityListener.class)
public class NameTest {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    private String name;

    @CreatedDate
    @Column(name = "created_at", nullable = false, columnDefinition="TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
    private LocalDateTime createdAt;


}

 

 

저는 이 NameTest를 데이터를 Kafka Topic에 전달하여 MySQL에 저장해보려 합니다. 

그러기 위해 Scheman를 구성하기 위해 Schema, Field, Payload 파일을 생성합니다. 

 

 

Schema 

import lombok.Builder;
import lombok.Data;

import java.util.List;

@Data
@Builder
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;

}

 

 

Field

import lombok.AllArgsConstructor;
import lombok.Data;

// 데이터를 저장하기 위해서 어떤 필드가 있는지 저장

@Data
@AllArgsConstructor
public class Field {

    private String type;
    private boolean optional;
    private String field;

}

 

 

Payload 

import lombok.Builder;
import lombok.Data;

import javax.persistence.criteria.CriteriaBuilder;
import java.time.LocalDateTime;

@Data
@Builder
public class NameTestPayload {

    private Integer id;
    private String name;
    private String created_at;

}

 

 

Schema는 Field, Payload로 이루어져 있습니다. 이 3개의 파일을 이용해 데이터 양식을 만듭니다.

Service 단에서 이 내용을 채워서 Kafka Topic에 전달합니다.

 

NameTestProducer

import com.dotd.product.practice.dto.Field;
import com.dotd.product.practice.dto.KafkaProductDto;
import com.dotd.product.practice.dto.Schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

@Service
@Slf4j
@RequiredArgsConstructor
public class NameTestProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
            new Field("int32", true, "id"),
            new Field("string", true, "name"),
            new Field("string", true, "created_at")
    );

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("nametest")
            .build();

    public void send(NameTestDto dto) {

        String topic = "nametest";

        NameTest nameTest = NameTest.builder()
                // .id(dto.getId())
                .name(dto.getName())
                .createdAt(LocalDateTime.now())
                .build();

        NameTestPayload nameTestPayload = NameTestPayload.builder()
                // .id(nameTest.getId())
                .name(nameTest.getName())
                .created_at(nameTest.getCreatedAt().toString())
                .build();


        KafkaNameTestDto kafkaNameTestDto = new KafkaNameTestDto(schema, nameTestPayload);

        System.out.println(kafkaNameTestDto);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(kafkaNameTestDto);
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }


        kafkaTemplate.send(topic, jsonInString);

        log.info("카프카 프로듀서 메시지 전송 " + kafkaNameTestDto);

    }
}

NameTestProducer는 데이터 양식을 만들고 send 메소드를 통해 Kafka Topic에 데이터를 전달하는 Service 로직 코드 입니다. 이 send 메소드를 실행하는 컨트롤러를 생성합니다. 

 

 

 

NameTestController

import com.dotd.product.dto.ProductRegistDto;
import com.dotd.product.dto.ProductResponseDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
@RequiredArgsConstructor
@RequestMapping("/nametest")
@Slf4j
public class NameTestController {

    private final NameTestProducer nameTestProducer;

    @PostMapping("/send")
    public ResponseEntity<?> send(@RequestBody NameTestDto dto) {
        nameTestProducer.send(dto);
        return ResponseEntity.ok("ok");
    }


}

 

여기까지 오시면 Spring Boot를 Producer로서 Kafka Topic에 메세지를 보낼 준비를 마친 겁니다. 

이제 Kafka Sink Connector를 등록하여 Topic에 저장된 메세지를 MySQL에 전달하도록 하겠습니다. 

 

 

Kafka Sink Connector 등록 ( nametest-sink-connect )

Kafka Sink Connector를 등록하기 위해서는 ZooKeeper, Kafka, Kafka Connect 가 먼저 실행되어야 합니다. 이전 글을 참조하시면 도움이 될 겁니다. 

 

Kafka Connect은 RestAPI를 이용하여 Connector를 등록할 수 있습니다. 

저는 PostMan으로 이용해 등록하겠습니다. 

 

주소 

curl -X POST 127.0.0.1:8083/connectors

 

데이터 

{
	"name":"nametest-sink-connect",
	"config":{
	"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
	"connection.url":"jdbc:mysql://localhost:3306/dotd",
	"connection.user":"root",
	"connection.password":"password",
	"auto.create":"true",
	"auto.evolve":"true",
	"delete.enabled":"false",
	"tasks.max":"1",
	"topics":"nametest"
	}
}

 

 

PostMan 화면 

Kafka Sink Connector 등록 (PostMan)

 

 

Spring Boot 메세지 전달 결과 확인

Kafka Sink Connector를 등록했으면 Spring Boot를 실행하여 send 메소드를 실행해 데이터를 Kafka Topic에 전달합니다. 전달된 데이터는 등록된 Kafka Sink Connector인 nametest-sink-connector가 작동하여 MySQL에 nametest 테이블에 전달하여 저장 작업이 이루어집니다. MySQL의 테이블은 Kafka Topic 이름과 동일합니다. 

 

PostMan으로 send 메소드 실행 

send 메소드 실행 (PostMan)

 

 

MySQL 데이터 저장 

nametest에 데이터 저장

 

 

 

에러 사항 

저는 LocalDateTime에서 에러가 참 많이 발생했습니다. 그 이유는 LocalDateTime의 데이터 변환이 이루어지지 않았기 떄문입니다. 실제로 LocalDateTime의 Schema를 보면 String, Integer는 형식이 다릅니다. 이 문제를 LocalDateTime을 String으로 변환하여 해결했습니다. 

 

또한 Kafka Topic에 에러가 발생하면 FAILED 상태로 변해 작동이 멈춥니다. Connector 상세보기 명령어를 통해 자세한 log 내역을 확인할 수 있습니다. 문제를 해결한 다음 재시작하면 문제가 해결될 것 입니다.

양식)
curl -X POST http://<Kafka-Connect-Host>:8083/connectors/<Connector-Name>/tasks/<Task-ID>/restart

예시)
127.0.0.1:8083/connectors/nametest-sink-connect/resume

 

 

 

 

마무리

멀티 서버에서 쓰기 작업에서 동시성 문제를 해결하기 위해 카프카를 공부했습니다.

순차 처리를 통해 동시성 문제를 해결했지만 공부해야 할 게 많이 남아있다는 것을 깨달았습니다.

 

열심히 공부하여 새로운 내용으로 다시 돌아오겠습니다!

감사합니다. 

  1. 개요
  2. Spring Boot - Producer 
  3. Kafka Sink Connector 등록 ( nametest-sink-connect )
  4. Spring Boot 메세지 전달 결과 확인
  5. 에러 사항 
  6. 마무리
'DevOps/Kafka' 카테고리의 다른 글
  • Kafka Connect : Source, Sink 등록
  • Kafka Connect 설치 & 실행 & MySQL 설정
  • Kafka와 Spring Boot 연동
  • Kafka 설치 & 테스트 (Windows 환경)
너지살
너지살
너지살
너지살개발자
너지살
전체
오늘
어제
  • 분류 전체보기 (375)
    • 잡식 (2)
      • 티스토리 (2)
    • 개발 일지 (0)
      • OMS 프로젝트 (4)
      • 우테코 6기 프리코스 (1)
    • Git (2)
    • JAVA (15)
      • Java 공부 (6)
      • 자료구조 (4)
      • 도움되는 메모 (4)
    • DevOps (18)
      • AWS (6)
      • Docker (2)
      • Jenkins (1)
      • Nginx (1)
      • Kafka (6)
      • RabbitMQ (2)
    • Spring, Spring Boot (16)
      • Test Code (1)
      • AOP (2)
      • Batch (3)
      • Cache - Redis (5)
      • Cloud Config - 설정 파일 관리 (3)
      • 성능 측정 (1)
      • 예외 처리 (1)
    • BackEnd (1)
      • Spring 공부 (1)
      • Thymeleaft (0)
    • DB (17)
      • JPA (2)
      • DB 공부 (3)
      • DB 포스팅 (4)
      • DB 답장 (1)
      • MySQL (2)
      • Redis (5)
      • MongoDB (0)
    • CS (8)
      • Spring (4)
      • DataBase (3)
      • Java (1)
    • Algorithm (203)
      • 알고리즘 개념 (5)
      • 정렬 알고리즘 (11)
      • 프로그래머스 문제풀이 (18)
      • 백준 문제풀이 (165)
      • 소프티어 문제풀이 (3)
      • 알고리즘 시험 정리 (1)
    • SQL (0)
      • 문법 (1)
      • 프로그래머스 문제풀이 (52)
      • 리트코드 문제풀이 (19)
    • IT (1)
      • IT 공부 (1)
    • 정리 (10)
      • 질문 정리 (10)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • 외판원 순회 문제
  • 그래프 이론
  • 분리 집합
  • 다이나믹 프로그래밍
  • JPA
  • DFS
  • Bitmast
  • 우선수위큐
  • Union-Find
  • 경로표현식
  • 그래프 탐색
  • Spring Boot Redis 연결
  • 투포인트
  • 최소 신장 트리
  • cache
  • Spring Batch
  • redis
  • dynamiceprogramming
  • 다음 순열 찾기
  • 깊이/너비 우선탐색
  • 유니온파인드
  • 부분탐색
  • 설정
  • 투 포인터
  • two pointer
  • 비트마스킹
  • Java
  • 두 포인터
  • Next permutation
  • Spring Boot
  • 알고리즘
  • Algorithm
  • 다이나믹프로그래밍
  • 자료구조
  • Sorting algorithm
  • 최소 스패닝 트리
  • 병렬 처리
  • Test code
  • dynamic programing
  • 크루스칼 알고리즘
  • 백준
  • Java 정리
  • db
  • DP
  • 질문 정리
  • 소프티어
  • docker
  • git
  • 데이터베이스
  • MST

최근 댓글

최근 글

hELLO · Designed By 정상우.v4.2.2
너지살
Kafka Connect : Spring Boot, MySQL 연동
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.