개요
이번에는 Kafka Source Connect와 Kafka Sink Connect를 등록해보려 합니다.
Kafka Source Connect는 MySQL에 변경 사항을 감지하여 Kafka Topic에 데이터를 보내는 역할을 합니다.
Kafka Sink Connect는 Topic에 저장된 메세지를 MySQL에 보내 저장하는 역할을 합니다.
이를 위해 Kafka Connect와 MySQL에 접근하기 위해 JDBC Connector와 MySQL Connector 설정되어야 합니다. 이는 전글을 참조하면 좋을 거 같습니다.
https://cladren123.tistory.com/205
Kafka Connect 설치 & 실행 & MySQL 설정
개요 이번에는 Windows에 Kafka Connect 설치해보려 합니다. Kafka Connect는 Apache Kafka 일부로 제공되는 프레임워크로 스트림 데이터를 외부 시스템과 연결하는데 사용합니다. Kafka Connect는 Kafka Connect Sourc
cladren123.tistory.com
Kafka Source Connector
Kafka Source Connect는 외부 시스템과 Kafka 사이의 스트림 데이터를 통합하기 위한 프레임워크 입니다. 즉, 외부 시스템에서 데이터를 읽어와 Kafka 토픽에 데이터를 전송하는 역할을 합니다.
Kafka Source Connector 등록
이번에는 MySQL에 테이블에 데이터 변동이 일어나면 Kafka Topic에 해당 데이터를 전달하는 Kafka Source Connector를 등록해 보겠습니다. 등록을 위해서 ZooKeeper, Kafka, Kafka Connect를 먼저 실행시킵니다.
Kafka Connect는 Rest API 통신을 지원하며 Rest API 통신을 통해 Kafka Source Connector를 등록할 수 있습니다.
Kafka Connect의 기본 포트 번호는 8083 입니다.
보낼 주소
127.0.0.1:8083/connectors
Post로 보냅니다.
보낼 데이터
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/dotd",
"connection.user":"root",
"connection.password":"password",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"products",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
각 명령어의 의미
"name" : Kafka Source Connector의 등록되는 이름입니다.
"config" : 커넥터의 구성을 정의하는 항목입니다.
"connector.class" : 사용할 커넥터의 클래스 이름입니다. 사전에 준비한 JDBC Connector를 사용합니다.
"connection.url" : JDBC URL을 통해 데이터베이스에 연결합니다. 저는 dotd 데이터베이스에 연결했습니다.
"connection.user" : DB 사용자 이름입니다.
"connection.password" : DB 비밀번호 입니다.
"mode" : 데이터를 가져오는 모드입니다. incrementing 은 지정된 증가 컬럼을 기반으로 새로운 레코드를 가져옵니다.
"incrementing.column.name" : incrementing 모드에서 사용할 컬럼 이름 입니다. 저는 id 컬럼을 사용하도록 설정했습니다. 이 컬럼 값의 이전의 마지막으로 가져온 값보다 큰 레코드만 선택한다는 뜻 입니다. id 칼럼은 자동 증가가 걸려있는 products 테이블의 primary key 입니다.
"table.whitelist" : 가져올 테이블 목록 입니다. 즉, 변경 사항을 감지할 테이블 입니다. 저는 products 테이블로 설정했습니다.
"topic.prefix" : 생성되는 Kafka 토픽의 접두사 입니다. table.whitelist에서 설정한 값과 합쳐집니다. 여기서는 my_topic_products 라는 이름으로 토픽이 생깁니다.
"task.max" : 커넥터가 생성할 최대 작업의 수 입니다. 각 작업은 독립적으로 데이터를 가져와 Kafka로 전송합니다.
이 Kafka Source Connect는 products 테이블의 데이터의 변경을 감지하여 해당 데이터를 my_topic_products 라는 Kafka Topic으로 전송하는 역할을 합니다. id 컬럼의 값이 증가하는 순서대로 레코드를 선택하여 전송합니다.
저는 PostMan을 이용하여 Kafks Source Connector를 등록했습니다.
위와 같이 응답이 오면 잘 등록된 겁니다.
Kafka Source Connector 확인
Kafka Source Connector는 토픽 목록 확인을 통해 확인할 수 있습니다.
RestAPI 통신을 통해 등록된 Connector들의 목록으로 확인할 수 있습니다.
Connector 등록 확인
GET 방식
127.0.0.1:8083/connectors
등록된 Connector들을 상세히 확인할 수 있습니다.
GET 방식
양식)
127.0.0.1:8083/connectors/커넥터 이름/status
예시)
127.0.0.1:8083/connectors/my-source-connect/status
그 외에도 RestAPI 통신을 통해 다양한 기능을 수행할 수 있습니다.
Connector 중지
PUT
양식)
127.0.0.1:8083/connectors/커넥터 이름/pause
예시)
127.0.0.1:8083/connectors/product-sink-connect/pause
Connector 재시작
PUT
양식)
127.0.0.1:8083/connectors/커넥터 이름/resume
예시)
127.0.0.1:8083/connectors/product-sink-connect/resume
Connector 삭제
DELETE
양식)
127.0.0.1:8083/connectors/커넥터 이름
예시)
127.0.0.1:8083/connectors/nametest-sink-connect
Kafka Sink Connector
Kafka Sink Connector는 Topic에 데이터를 읽어 외부 시스템으로 데이터를 전송하는 역할을 합니다.
이번에는 Kafka Topic에 데이터를 MySQL에 저장하는 Sink를 등록해보려 합니다.
Kafka Sink Connector 등록
Kafka Source Connector를 등록할 때와 마찬가지로 RestAPI 통신을 통해 등록합니다.
보낼 주소
POST
127.0.0.1:8083/connectors
보낼 데이터
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/dotd",
"connection.user":"root",
"connection.password":"password",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_products"
}
}
각 명령어 의미
Kafka Source Connector 와 겹치는 내용은 제외했습니다.
"auto.create" : true로 설정되면 해당 DB에 테이블이 존재하지 않을 경우 자동으로 해당 테이블을 생성합니다.
"auto.evolve" : true로 설정되며 Kafka의 스키마와 DB의 테이블 스키마가 일치하지 않으면 DB의 테이블 스키마를 Kafka 스키마로 자동 업데이트합니다.
"delete.enabled" : true로 설정되면 Kafka Topic에 삭제된 레코드에 해당하는 DB의 레코드도 삭제됩니다.
"topics" : 커넥터가 데이터를 읽을 Kafka의 토픽입니다.
PostMan을 통해 Kafka Sink Connector 등록
Sink Connector는 Source Connector와 마찬가지로 확인, 상세확인, 정지, 재시작, 삭제 등의 작업을 수행할 수 있습니다.
Sink Connector가 생성되면 DB에 Topic의 이름과 같은 테이블이 생성됩니다.
이제 products에 데이터를 삽입하면 my_topic_products에 데이터가 삽입됩니다.
Products에 데이터를 삽입하자 Kafka Source Connector에 의해 my_topic_products라는 토픽에 데이터가 전송되어 저장되었습니다. my_topic_products에 저장된 메세지는 Kafka Sink Connector에 의해 MySQL에 my_topic_products 라는 테이블에 데이터가 저장됩니다. 이 과정을 확인해 보았습니다.
Kafka Source Connector의 스키마 확인
my_topic_products 토픽에 Consumer를 등록하면 어떤 스키마로 데이터를 전달하는지 확인할 수 있습니다.
JSON 포맷 파일을 통해 정리하면 다음과 같습니다.
{
"schema":{
"type":"struct",
"fields":[
{"type":"int32","optional":false,"field":"id"},
{"type":"int32","optional":true,"field":"category_id"},
{"type":"string","optional":true,"field":"code"},
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"},
{"type":"string","optional":true,"field":"description"},
{"type":"int32","optional":true,"field":"like_count"},
{"type":"string","optional":true,"field":"name"},
{"type":"int32","optional":true,"field":"order_count"},
{"type":"int32","optional":true,"field":"price"},
{"type":"int32","optional":true,"field":"review_count"},
{"type":"string","optional":true,"field":"seller_id"},
{"type":"int32","optional":true,"field":"view_count"}
],
"optional":false,
"name":"products"
},
"payload":{
"id":4,
"category_id":1,
"code":"cd01",
"created_at":1694692406966,
"description":"설명",
"like_count":0,
"name":"기계식 키보드",
"order_count":0,
"price":1000,
"review_count":0,
"seller_id":"브랜드",
"view_count":0
}
}
schema로 데이터 형식을 정의하고 있습니다.
fields 배열을 통해 컬럼을 정의하고 있습니다. payload 배열에서는 field에 해당하는 데이터가 입력된 것을 확인할 수 있습니다.
마무리
Spring Boot에서 Kafka Topic에 데이터를 전달할 때도 위의 스키마와 같은 형식으로 데이터를 전송합니다.
이를 통해 멀티 서버에서 한 DB를 사용할 때 Kafka를 이용해 순차적으로 쓰기 작업을 처리하여 동시성 문제를 해결할 수 있습니다. 또한 Kafka는 비동기적으로 처리하므로 성능 향상을 기대할 수도 있습니다.
다음에는 Spring Boot와 연결하여 DB에 값을 넣어보도록 하겠습니다.
감사합니다.