개요
이번에는 Kafka Connect을 이용하여 Spring Boot에서 데이터를 보내면 MySQL에 저장하는 로직을 구현해보려 합니다.
멀티 서버 환경에서 Kafka Connect을 이용하여 순차적으로 처리하여 동시성 문제를 해결할 수 있고 비동기적으로 처리하여 성능 향상을 기대할 수 있습니다.
Spring Boot - Producer
Spring Boot에서 MySQL에 DB에 저장하기 위해서는 정해진 패턴대로 메시지를 전송해야 합니다.
정해진 패턴이란 Schema 와 payload로 구성되어 있습니다.
Schema는 Table의 구조를 나타내고 payload는 실제 데이터를 나타냅니다.
스키마를 참고하여 payload의 데이터를 Table에 넣는 식 입니다.
이전 글에서 본 Producer 구조와 동일합니다.
구조는 Schema 안에 Field 리스트가 있고 그 아래에 Payload가 있습니다.
이 형태를 Spring Boot에서 만들어 토픽으로 전송할 겁니다.
저는 테스트용으로 NameTest 라는 Class를 새로 만들었습니다.
NameTest Class
package com.dotd.product.kafkaConnectTest;
import jdk.jfr.Name;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Table(name = "nametest")
@EntityListeners(AuditingEntityListener.class)
public class NameTest {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String name;
@CreatedDate
@Column(name = "created_at", nullable = false, columnDefinition="TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
private LocalDateTime createdAt;
}
저는 이 NameTest를 데이터를 Kafka Topic에 전달하여 MySQL에 저장해보려 합니다.
그러기 위해 Scheman를 구성하기 위해 Schema, Field, Payload 파일을 생성합니다.
Schema
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
Field
import lombok.AllArgsConstructor;
import lombok.Data;
// 데이터를 저장하기 위해서 어떤 필드가 있는지 저장
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
Payload
import lombok.Builder;
import lombok.Data;
import javax.persistence.criteria.CriteriaBuilder;
import java.time.LocalDateTime;
@Data
@Builder
public class NameTestPayload {
private Integer id;
private String name;
private String created_at;
}
Schema는 Field, Payload로 이루어져 있습니다. 이 3개의 파일을 이용해 데이터 양식을 만듭니다.
Service 단에서 이 내용을 채워서 Kafka Topic에 전달합니다.
NameTestProducer
import com.dotd.product.practice.dto.Field;
import com.dotd.product.practice.dto.KafkaProductDto;
import com.dotd.product.practice.dto.Schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@Service
@Slf4j
@RequiredArgsConstructor
public class NameTestProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("int32", true, "id"),
new Field("string", true, "name"),
new Field("string", true, "created_at")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("nametest")
.build();
public void send(NameTestDto dto) {
String topic = "nametest";
NameTest nameTest = NameTest.builder()
// .id(dto.getId())
.name(dto.getName())
.createdAt(LocalDateTime.now())
.build();
NameTestPayload nameTestPayload = NameTestPayload.builder()
// .id(nameTest.getId())
.name(nameTest.getName())
.created_at(nameTest.getCreatedAt().toString())
.build();
KafkaNameTestDto kafkaNameTestDto = new KafkaNameTestDto(schema, nameTestPayload);
System.out.println(kafkaNameTestDto);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaNameTestDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("카프카 프로듀서 메시지 전송 " + kafkaNameTestDto);
}
}
NameTestProducer는 데이터 양식을 만들고 send 메소드를 통해 Kafka Topic에 데이터를 전달하는 Service 로직 코드 입니다. 이 send 메소드를 실행하는 컨트롤러를 생성합니다.
NameTestController
import com.dotd.product.dto.ProductRegistDto;
import com.dotd.product.dto.ProductResponseDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@RequiredArgsConstructor
@RequestMapping("/nametest")
@Slf4j
public class NameTestController {
private final NameTestProducer nameTestProducer;
@PostMapping("/send")
public ResponseEntity<?> send(@RequestBody NameTestDto dto) {
nameTestProducer.send(dto);
return ResponseEntity.ok("ok");
}
}
여기까지 오시면 Spring Boot를 Producer로서 Kafka Topic에 메세지를 보낼 준비를 마친 겁니다.
이제 Kafka Sink Connector를 등록하여 Topic에 저장된 메세지를 MySQL에 전달하도록 하겠습니다.
Kafka Sink Connector 등록 ( nametest-sink-connect )
Kafka Sink Connector를 등록하기 위해서는 ZooKeeper, Kafka, Kafka Connect 가 먼저 실행되어야 합니다. 이전 글을 참조하시면 도움이 될 겁니다.
Kafka Connect은 RestAPI를 이용하여 Connector를 등록할 수 있습니다.
저는 PostMan으로 이용해 등록하겠습니다.
주소
curl -X POST 127.0.0.1:8083/connectors
데이터
{
"name":"nametest-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/dotd",
"connection.user":"root",
"connection.password":"password",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"nametest"
}
}
PostMan 화면
Spring Boot 메세지 전달 결과 확인
Kafka Sink Connector를 등록했으면 Spring Boot를 실행하여 send 메소드를 실행해 데이터를 Kafka Topic에 전달합니다. 전달된 데이터는 등록된 Kafka Sink Connector인 nametest-sink-connector가 작동하여 MySQL에 nametest 테이블에 전달하여 저장 작업이 이루어집니다. MySQL의 테이블은 Kafka Topic 이름과 동일합니다.
PostMan으로 send 메소드 실행
MySQL 데이터 저장
에러 사항
저는 LocalDateTime에서 에러가 참 많이 발생했습니다. 그 이유는 LocalDateTime의 데이터 변환이 이루어지지 않았기 떄문입니다. 실제로 LocalDateTime의 Schema를 보면 String, Integer는 형식이 다릅니다. 이 문제를 LocalDateTime을 String으로 변환하여 해결했습니다.
또한 Kafka Topic에 에러가 발생하면 FAILED 상태로 변해 작동이 멈춥니다. Connector 상세보기 명령어를 통해 자세한 log 내역을 확인할 수 있습니다. 문제를 해결한 다음 재시작하면 문제가 해결될 것 입니다.
양식)
curl -X POST http://<Kafka-Connect-Host>:8083/connectors/<Connector-Name>/tasks/<Task-ID>/restart
예시)
127.0.0.1:8083/connectors/nametest-sink-connect/resume
마무리
멀티 서버에서 쓰기 작업에서 동시성 문제를 해결하기 위해 카프카를 공부했습니다.
순차 처리를 통해 동시성 문제를 해결했지만 공부해야 할 게 많이 남아있다는 것을 깨달았습니다.
열심히 공부하여 새로운 내용으로 다시 돌아오겠습니다!
감사합니다.