
개요
이번에는 Kafka Connect을 이용하여 Spring Boot에서 데이터를 보내면 MySQL에 저장하는 로직을 구현해보려 합니다.
멀티 서버 환경에서 Kafka Connect을 이용하여 순차적으로 처리하여 동시성 문제를 해결할 수 있고 비동기적으로 처리하여 성능 향상을 기대할 수 있습니다.
Spring Boot - Producer
Spring Boot에서 MySQL에 DB에 저장하기 위해서는 정해진 패턴대로 메시지를 전송해야 합니다.
정해진 패턴이란 Schema 와 payload로 구성되어 있습니다.
Schema는 Table의 구조를 나타내고 payload는 실제 데이터를 나타냅니다.
스키마를 참고하여 payload의 데이터를 Table에 넣는 식 입니다.
이전 글에서 본 Producer 구조와 동일합니다.
구조는 Schema 안에 Field 리스트가 있고 그 아래에 Payload가 있습니다.

이 형태를 Spring Boot에서 만들어 토픽으로 전송할 겁니다.
저는 테스트용으로 NameTest 라는 Class를 새로 만들었습니다.
NameTest Class
package com.dotd.product.kafkaConnectTest;
import jdk.jfr.Name;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Table(name = "nametest")
@EntityListeners(AuditingEntityListener.class)
public class NameTest {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String name;
@CreatedDate
@Column(name = "created_at", nullable = false, columnDefinition="TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
private LocalDateTime createdAt;
}
저는 이 NameTest를 데이터를 Kafka Topic에 전달하여 MySQL에 저장해보려 합니다.
그러기 위해 Scheman를 구성하기 위해 Schema, Field, Payload 파일을 생성합니다.
Schema
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
Field
import lombok.AllArgsConstructor;
import lombok.Data;
// 데이터를 저장하기 위해서 어떤 필드가 있는지 저장
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
Payload
import lombok.Builder;
import lombok.Data;
import javax.persistence.criteria.CriteriaBuilder;
import java.time.LocalDateTime;
@Data
@Builder
public class NameTestPayload {
private Integer id;
private String name;
private String created_at;
}
Schema는 Field, Payload로 이루어져 있습니다. 이 3개의 파일을 이용해 데이터 양식을 만듭니다.
Service 단에서 이 내용을 채워서 Kafka Topic에 전달합니다.
NameTestProducer
import com.dotd.product.practice.dto.Field;
import com.dotd.product.practice.dto.KafkaProductDto;
import com.dotd.product.practice.dto.Schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@Service
@Slf4j
@RequiredArgsConstructor
public class NameTestProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("int32", true, "id"),
new Field("string", true, "name"),
new Field("string", true, "created_at")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("nametest")
.build();
public void send(NameTestDto dto) {
String topic = "nametest";
NameTest nameTest = NameTest.builder()
// .id(dto.getId())
.name(dto.getName())
.createdAt(LocalDateTime.now())
.build();
NameTestPayload nameTestPayload = NameTestPayload.builder()
// .id(nameTest.getId())
.name(nameTest.getName())
.created_at(nameTest.getCreatedAt().toString())
.build();
KafkaNameTestDto kafkaNameTestDto = new KafkaNameTestDto(schema, nameTestPayload);
System.out.println(kafkaNameTestDto);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaNameTestDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("카프카 프로듀서 메시지 전송 " + kafkaNameTestDto);
}
}
NameTestProducer는 데이터 양식을 만들고 send 메소드를 통해 Kafka Topic에 데이터를 전달하는 Service 로직 코드 입니다. 이 send 메소드를 실행하는 컨트롤러를 생성합니다.
NameTestController
import com.dotd.product.dto.ProductRegistDto;
import com.dotd.product.dto.ProductResponseDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@RequiredArgsConstructor
@RequestMapping("/nametest")
@Slf4j
public class NameTestController {
private final NameTestProducer nameTestProducer;
@PostMapping("/send")
public ResponseEntity<?> send(@RequestBody NameTestDto dto) {
nameTestProducer.send(dto);
return ResponseEntity.ok("ok");
}
}
여기까지 오시면 Spring Boot를 Producer로서 Kafka Topic에 메세지를 보낼 준비를 마친 겁니다.
이제 Kafka Sink Connector를 등록하여 Topic에 저장된 메세지를 MySQL에 전달하도록 하겠습니다.
Kafka Sink Connector 등록 ( nametest-sink-connect )
Kafka Sink Connector를 등록하기 위해서는 ZooKeeper, Kafka, Kafka Connect 가 먼저 실행되어야 합니다. 이전 글을 참조하시면 도움이 될 겁니다.
Kafka Connect은 RestAPI를 이용하여 Connector를 등록할 수 있습니다.
저는 PostMan으로 이용해 등록하겠습니다.
주소
curl -X POST 127.0.0.1:8083/connectors
데이터
{
"name":"nametest-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/dotd",
"connection.user":"root",
"connection.password":"password",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"nametest"
}
}
PostMan 화면

Spring Boot 메세지 전달 결과 확인
Kafka Sink Connector를 등록했으면 Spring Boot를 실행하여 send 메소드를 실행해 데이터를 Kafka Topic에 전달합니다. 전달된 데이터는 등록된 Kafka Sink Connector인 nametest-sink-connector가 작동하여 MySQL에 nametest 테이블에 전달하여 저장 작업이 이루어집니다. MySQL의 테이블은 Kafka Topic 이름과 동일합니다.
PostMan으로 send 메소드 실행

MySQL 데이터 저장

에러 사항
저는 LocalDateTime에서 에러가 참 많이 발생했습니다. 그 이유는 LocalDateTime의 데이터 변환이 이루어지지 않았기 떄문입니다. 실제로 LocalDateTime의 Schema를 보면 String, Integer는 형식이 다릅니다. 이 문제를 LocalDateTime을 String으로 변환하여 해결했습니다.
또한 Kafka Topic에 에러가 발생하면 FAILED 상태로 변해 작동이 멈춥니다. Connector 상세보기 명령어를 통해 자세한 log 내역을 확인할 수 있습니다. 문제를 해결한 다음 재시작하면 문제가 해결될 것 입니다.
양식)
curl -X POST http://<Kafka-Connect-Host>:8083/connectors/<Connector-Name>/tasks/<Task-ID>/restart
예시)
127.0.0.1:8083/connectors/nametest-sink-connect/resume
마무리
멀티 서버에서 쓰기 작업에서 동시성 문제를 해결하기 위해 카프카를 공부했습니다.
순차 처리를 통해 동시성 문제를 해결했지만 공부해야 할 게 많이 남아있다는 것을 깨달았습니다.
열심히 공부하여 새로운 내용으로 다시 돌아오겠습니다!
감사합니다.

개요
이번에는 Kafka Connect을 이용하여 Spring Boot에서 데이터를 보내면 MySQL에 저장하는 로직을 구현해보려 합니다.
멀티 서버 환경에서 Kafka Connect을 이용하여 순차적으로 처리하여 동시성 문제를 해결할 수 있고 비동기적으로 처리하여 성능 향상을 기대할 수 있습니다.
Spring Boot - Producer
Spring Boot에서 MySQL에 DB에 저장하기 위해서는 정해진 패턴대로 메시지를 전송해야 합니다.
정해진 패턴이란 Schema 와 payload로 구성되어 있습니다.
Schema는 Table의 구조를 나타내고 payload는 실제 데이터를 나타냅니다.
스키마를 참고하여 payload의 데이터를 Table에 넣는 식 입니다.
이전 글에서 본 Producer 구조와 동일합니다.
구조는 Schema 안에 Field 리스트가 있고 그 아래에 Payload가 있습니다.

이 형태를 Spring Boot에서 만들어 토픽으로 전송할 겁니다.
저는 테스트용으로 NameTest 라는 Class를 새로 만들었습니다.
NameTest Class
package com.dotd.product.kafkaConnectTest;
import jdk.jfr.Name;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Table(name = "nametest")
@EntityListeners(AuditingEntityListener.class)
public class NameTest {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String name;
@CreatedDate
@Column(name = "created_at", nullable = false, columnDefinition="TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
private LocalDateTime createdAt;
}
저는 이 NameTest를 데이터를 Kafka Topic에 전달하여 MySQL에 저장해보려 합니다.
그러기 위해 Scheman를 구성하기 위해 Schema, Field, Payload 파일을 생성합니다.
Schema
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
Field
import lombok.AllArgsConstructor;
import lombok.Data;
// 데이터를 저장하기 위해서 어떤 필드가 있는지 저장
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
Payload
import lombok.Builder;
import lombok.Data;
import javax.persistence.criteria.CriteriaBuilder;
import java.time.LocalDateTime;
@Data
@Builder
public class NameTestPayload {
private Integer id;
private String name;
private String created_at;
}
Schema는 Field, Payload로 이루어져 있습니다. 이 3개의 파일을 이용해 데이터 양식을 만듭니다.
Service 단에서 이 내용을 채워서 Kafka Topic에 전달합니다.
NameTestProducer
import com.dotd.product.practice.dto.Field;
import com.dotd.product.practice.dto.KafkaProductDto;
import com.dotd.product.practice.dto.Schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@Service
@Slf4j
@RequiredArgsConstructor
public class NameTestProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("int32", true, "id"),
new Field("string", true, "name"),
new Field("string", true, "created_at")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("nametest")
.build();
public void send(NameTestDto dto) {
String topic = "nametest";
NameTest nameTest = NameTest.builder()
// .id(dto.getId())
.name(dto.getName())
.createdAt(LocalDateTime.now())
.build();
NameTestPayload nameTestPayload = NameTestPayload.builder()
// .id(nameTest.getId())
.name(nameTest.getName())
.created_at(nameTest.getCreatedAt().toString())
.build();
KafkaNameTestDto kafkaNameTestDto = new KafkaNameTestDto(schema, nameTestPayload);
System.out.println(kafkaNameTestDto);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaNameTestDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("카프카 프로듀서 메시지 전송 " + kafkaNameTestDto);
}
}
NameTestProducer는 데이터 양식을 만들고 send 메소드를 통해 Kafka Topic에 데이터를 전달하는 Service 로직 코드 입니다. 이 send 메소드를 실행하는 컨트롤러를 생성합니다.
NameTestController
import com.dotd.product.dto.ProductRegistDto;
import com.dotd.product.dto.ProductResponseDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@RequiredArgsConstructor
@RequestMapping("/nametest")
@Slf4j
public class NameTestController {
private final NameTestProducer nameTestProducer;
@PostMapping("/send")
public ResponseEntity<?> send(@RequestBody NameTestDto dto) {
nameTestProducer.send(dto);
return ResponseEntity.ok("ok");
}
}
여기까지 오시면 Spring Boot를 Producer로서 Kafka Topic에 메세지를 보낼 준비를 마친 겁니다.
이제 Kafka Sink Connector를 등록하여 Topic에 저장된 메세지를 MySQL에 전달하도록 하겠습니다.
Kafka Sink Connector 등록 ( nametest-sink-connect )
Kafka Sink Connector를 등록하기 위해서는 ZooKeeper, Kafka, Kafka Connect 가 먼저 실행되어야 합니다. 이전 글을 참조하시면 도움이 될 겁니다.
Kafka Connect은 RestAPI를 이용하여 Connector를 등록할 수 있습니다.
저는 PostMan으로 이용해 등록하겠습니다.
주소
curl -X POST 127.0.0.1:8083/connectors
데이터
{
"name":"nametest-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/dotd",
"connection.user":"root",
"connection.password":"password",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"nametest"
}
}
PostMan 화면

Spring Boot 메세지 전달 결과 확인
Kafka Sink Connector를 등록했으면 Spring Boot를 실행하여 send 메소드를 실행해 데이터를 Kafka Topic에 전달합니다. 전달된 데이터는 등록된 Kafka Sink Connector인 nametest-sink-connector가 작동하여 MySQL에 nametest 테이블에 전달하여 저장 작업이 이루어집니다. MySQL의 테이블은 Kafka Topic 이름과 동일합니다.
PostMan으로 send 메소드 실행

MySQL 데이터 저장

에러 사항
저는 LocalDateTime에서 에러가 참 많이 발생했습니다. 그 이유는 LocalDateTime의 데이터 변환이 이루어지지 않았기 떄문입니다. 실제로 LocalDateTime의 Schema를 보면 String, Integer는 형식이 다릅니다. 이 문제를 LocalDateTime을 String으로 변환하여 해결했습니다.
또한 Kafka Topic에 에러가 발생하면 FAILED 상태로 변해 작동이 멈춥니다. Connector 상세보기 명령어를 통해 자세한 log 내역을 확인할 수 있습니다. 문제를 해결한 다음 재시작하면 문제가 해결될 것 입니다.
양식)
curl -X POST http://<Kafka-Connect-Host>:8083/connectors/<Connector-Name>/tasks/<Task-ID>/restart
예시)
127.0.0.1:8083/connectors/nametest-sink-connect/resume
마무리
멀티 서버에서 쓰기 작업에서 동시성 문제를 해결하기 위해 카프카를 공부했습니다.
순차 처리를 통해 동시성 문제를 해결했지만 공부해야 할 게 많이 남아있다는 것을 깨달았습니다.
열심히 공부하여 새로운 내용으로 다시 돌아오겠습니다!
감사합니다.