목차
카프카 커넥트(Kafka Connect)는 데이터베이스 같은 외부 시스템 간의 연결을 간소화하고 자동화하기 위한 프레임워크이다.
사용자들은 카프카 커넥트 프레임워크를 이용해 대용량의 데이터를 카프카의 안팎으로 손쉽게 이동시킬 수 있고, 코드를 작성하지 않고도 사용할 수 있다.
물론 프로듀서와 컨슈머를 직접 개발해서 원하는 동작을 실행하고 처리할 수 있지만, 그러한 리소스나 비용을 줄이고 카프카 커넥트를 활용해 좀 더 효율적이고 빠르게 클라이언트를 구성하고 적용할 수 있다.
1. 카프카 커넥트의 핵심 개념
1.1. 주요 특징
•
표준화된 데이터 파이프라인
Kafka Connect는 데이터를 읽어오는(source connector) 역할과 데이터를 내보내는(sink connector) 역할을 수행하는 커넥터들을 표준화된 인터페이스로 제공한다. 이를 통해 여러 시스템과의 데이터 연동 작업을 일관성 있게 관리할 수 있다.
•
유연성과 확장성
분산(distributed) 모드와 단일(standalone) 모드를 모두 지원하여, 소규모 실험부터 대규모 프로덕션 환경까지 유연하게 적용할 수 있다. 분산 모드에서는 커넥트 작업자들이 클러스터 내에서 협력하여 작업을 분산 처리하며, 장애 발생 시 자동으로 복구되는 고가용성이 보장된다.
•
설정 기반 운영
복잡한 데이터 연동 작업을 코드 작성 없이 설정 파일이나 REST API를 통해 쉽게 구성하고 관리할 수 있다. 이를 통해 개발자는 비즈니스 로직에 집중할 수 있다.
1.2. 주요 구성 요소
카프카 커넥트는 위 그림과 같이 카프카 클러스터를 먼저 구성한 후, 카프카 클러스터의 양쪽 옆에 배치할 수 있다.
카프카를 기준으로 들어오고 나가는 양방향에 커넥트가 존재하게 되는데, 동일한 두 커넥트를 서로 구분하기 위해 소스(source) 방향에 있는 커넥트를 소스 커넥트(Source Connect), 나가는 방향에 있는 커넥트를 싱크 커넥트 (Sink Connect) 라고 한다.
•
소스 커넥트 = 프로듀서
•
싱크 커넥트 = 컨슈머
위 그림은 3대의 워커(인스턴스)를 실행한 분산 모드 소스 커넥트를 나타낸 것이다. 만약 단독 모드로 실행했다면, 위 그림에서는 단 하나의 워커 1만 동작할 것이다.
•
워커(Worker)
워커는 카프카 커넥트 프로세스가 실행되는 서버 또는 인스턴스 등을 의미하며, 커넥터나 태스크들이 워커에서 실행된다.
분산 모드는 특정 워커에 장애가 발생하더라도, 해당 워커에서 동작 중인 커넥터나 태스크들이 다른 위치로 이동해 연속해서 동작할 수 있다는 장점이 있지만, 단독 모드는 그렇지 않다.
•
커넥터(Connector)
직접 데이터를 복사하지 않고, 특정 데이터 소스나 싱크에 연결하여 데이터를 읽거나 쓸 수 있도록 해주는 플러그인 역할을 한다. 예시로, DB에 변경 사항을 캡쳐하거나, 로그 파일의 데이터를 Kafka로 전송하는 커넥터가 있다. 앞서 말한 커넥트와 동일하게, 소스에서 카프카로 전송하는 소스 커넥터와 저장소로 싱크하는 싱크 커넥터가 있다.
•
태스크(Task)
커넥터가 수행하는 작업을 실제로 실행하는 단위로, 데이터를 가져오거나 내보내는 병렬 처리 작업을 담당한다. 하나의 커넥터는 여러 태스크로 구성될 수 있으며, 이를 통해 데이터 처리량을 확장할 수 있다. 태스크 역시 소스 태스크와 싱크 태스크로 나뉜다.
•
컨버터(Converter)
컨버터는 소스에서 카프카로 전송할 때의 직렬화(Serialization)와 카프카에서 싱크로 전송할 때의 역직렬화(Deserialization)를 담당한다. 소스에서 다양한 데이터 포맷의 데이터는 컨버터를 거쳐 직렬화 후 카프카의 토픽으로 저장된다. 카프카의 메시지 형태는 키(옵션)과 밸류 형태로 이뤄지므로, 컨버터도 키 컨버터와 밸류 컨버터로 나뉜다. 이러한 컨버터를 통해 카프카는 데이터 포맷을 표준화된 상태로 처리할 수 있기에, 불필요한 컨버팅 코드를 작성할 필요가 없다.
1.3. 운영 모드
•
Standalone 모드
단일 프로세스 내에서 실행되며, 소규모 테스트나 개발 환경에 적합하다. 설정과 배포가 간단하지만, 확장성과 내결함성 측면에서는 제한적.
•
Distributed 모드
여러 작업자(worker)가 분산된 환경에서 협력하여 실행된다. 프로덕션 환경에서 높은 확장성과 장애 복구 능력을 요구할 때 사용된다.
2. 카프카 커넥트의 내부 동작
JDBC 커넥터를 이용하는 경우 위 그림과 같이 JDBC 커넥터에서 요구하는 몇 가지 값들(클래스, 커넥션 정보, 테이블등)만 추가하고 REST API를 호출하면, 즉시 JDBC 커넥터가 생성된다.
다음으로 데이터 베이스에서 발생하는 데이터가 카프카의 토픽으로 전송되게 된다.
이렇게 사용자는 한 줄의 코드 작성이 없이, REST API 호출만으로 데이터베이스에서 발생하는 데이터를 카프카로 전송할 수 있다.
바로 이렇게 간단하게 파이프라인을 생성할 수 있다는 부분이 카프카 커넥트의 장점이다.
다음으로 두 번째 단계인 카프카에서 HDFS로 전송하는 과정을 살펴보자.
앞서 첫 번째 단계와 마찬가지로 카프카 → HDFS로의 데이터 전송작업 역시 카프카 커넥터를 사용하며, 이번에는 HDFS 커넥터를 이용한다.
HDFS 커넥터에서 요구하는 몇 가지 값들만 추가한 후 REST API를 호출하면 즉시 HDFS 커넥터가 생성되고, 카프카에 저장된 데이터를 HDFS로 전송하게 된다.
앞선 JDBC 커넥터와 마찬가지로 코드 작성 없이 또 하나의 파이프라인이 생성되었다.
전체적인 동작 내용을 정리하면 다음 그림과 같다.
데이터의 흐름 순서대로 살펴보면, 데이터베이스(소스 시스템) -> JDBC 소스 커넥터(카프카 커넥트) -> 카프카 -> HDFS 싱크 커넥터(카프카 커넥트) -> HDFS(싱크 시스템) 순으로 데이터가 처리된다.
또한 별도의 코드 작성 없이, 카프카 커넥트의 JDBC 커넥터와 HDFS 커넥터만 이용하여 전체적인 데이터 파이프라인을 구성할 수 있었다.
3. 오프셋 관리
싱크 커넥터와 오프셋
싱크 커넥터는 컨슈머와 유사한 기능을 하며, 카프카의 토픽에서 데이터를 읽은 뒤 싱크 시스템으로 전송하는 역할을 한다. 컨슈머는 오프셋을 메시지를 어디까지 읽어왔는지를 알 수 있는 중요한 기준으로 활용하고 있는데,
현재 버전의 카프카에서는 카프카의 내부 토픽인 __consumer_offset 토픽에 저장하고 있다.
컨슈머의 경우 해당 오프셋에 저장된 내용을 바탕으로 위치를 조정할 수 있다. 카프카 커넥트의 경우는 총 3개의 내부 토픽을 사용하게 되는데, 각각의 토픽과 용도는 다음과 같다.
•
config.storage.topic: 커넥트의 구성과 관련된 정보 저장
•
offset.storage.topic: 커넥트의 오프셋 정보 저장
•
status.storage.topic: 커넥트의 상태 정보 저장
만약 테스트나 개발 목적으로 카프카 커넥트를 단독 모드로 구성하는 경우, 오프셋 정보 등은 로컬에 위치하는 별도의 파일에 저장되며, 카프카 커넥트를 분산 모드로 구성하는 경우에만 커넥트에서 내부적으로 자체 관리하는 토픽에 저장합니다.
그 외에도 외부의 별도 토픽으로 오프셋을 관리하거나 개발 환경에서 반복적으로 테스트할 때, 커넥터 이름을 변경하지 않고도 오프셋을 초기화 하거나 오류 처리 기능으로 해결할 수 없는 문제를 발생시키는 레코드를 건너뛰는 경우 관리자가 조치하기가 다소 까다로웠다.
하지만, 카프카 3.5 버전부터는 오프셋을 관리할 수 있는 REST API 기능이 추가되어, 관리자 또는 개발자 입장에서 매우 유연하게 대응할 수 있게 되었다.
•
GET /connectors/{connector}/offsets
•
PUT /connectors/{connector}/stop
먼저 카프카 커넥트의 상태를 확인한다.
$ curl http://kcluster01.foo.bar:8083 | jq
{
"version": "3.6.1",
"commit": "5e3c2b738d253ff5",
"kafka_cluster_id": "QrqgwH_BR8-md0QbhKrxpQ"
}
PowerShell
복사
카프카 커넥트는 3.6.1이며, 카프카의 아이디가 표시되고 있다. FileStreamSinkConnector를 실행하여 sqlite-sample-Models 토픽의 레코드를 sink.txt로 저장한다.
$ curl -s -X POST \ -H "Content-Type: application/json" \ --data '{ "name": "File-Sink-Connector", "config": { "topics": "sqlite-sample-Models", "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "file": "sink.txt" } }' http://kcluster01.foo.bar:8083/connectors
PowerShell
복사
FileStreamSinkConnector 커넥터가 정상적으로 실행되었는지 상태를 확인.
$ curl -s http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/status | jq
{
"name": "File-Sink-Connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.0.4:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.0.4:8083"
}
],
"type": "sink"
}
PowerShell
복사
sqlite-sample-Models 토픽의 레코드가 모두 sink.txt 파일에 저장되었는지 파일의 내용을 확인.
$ cat sink.txt
Struct{model_id=1,model_name=The Blonde,model_base_price=23000,brand_id=1}
Struct{model_id=2,model_name=The Brunette,model_base_price=25000,brand_id=1}
Struct{model_id=3,model_name=The Red Head,model_base_price=29000,brand_id=1}
Struct{model_id=4,model_name=Hat,model_base_price=22000,brand_id=2}
Struct{model_id=5,model_name=Sweater,model_base_price=25000,brand_id=2}
Struct{model_id=6,model_name=T-Shirt,model_base_price=27000,brand_id=2}
Struct{model_id=7,model_name=Orange,model_base_price=15000,brand_id=3}
Struct{model_id=8,model_name=Blue,model_base_price=12000,brand_id=3}
Struct{model_id=9,model_name=Green,model_base_price=17000,brand_id=3}
Struct{model_id=10,model_name=LaFerrari,model_base_price=125000,brand_id=4}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}
PowerShell
복사
FileStreamSinkConnector 커넥터의 오프셋을 확인.
$ curl -s -X GET http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/offsets | jq
{
"offsets": [
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "sqlite-sample-Models"
},
"offset": {
"kafka_offset": 17
}
}
]
}
PowerShell
복사
현재 오프셋은 17로 설정된 것을 알 수 있다. 그럼 오류가 생겼다고 가정하고, 오프셋을 10으로 변경해보자. 오프셋을 변경하기 전에는 커넥터를 중지하고 오프셋을 변경한다.
$ curl -s -X PUT http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/stop
$ curl -s -X PATCH \
-H "Content-Type: application/json" \
--data '{
"offsets": [
{
"partition": {
"kafka_topic": "sqlite-sample-Models",
"kafka_partition": 0
},
"offset": {
"kafka_offset": 10
}
}
]
}' http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/offsets
{"message":"The offsets for this connector have been altered successfully"}
PowerShell
복사
응답 메시지를 통해 오프셋이 정상적으로 변경된 것을 확인할 수 있다.
다시 한번 FileStreamSinkConnector 커넥터의 변경된 오프셋을 확인.
$ curl -s -X GET http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/offsets | jq
{
"offsets": [
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "sqlite-sample-Models"
},
"offset": {
"kafka_offset": 10
}
}
]
}
PowerShell
복사
오프셋이 10으로 변경된 것을 확인했다. 이제 FileStreamSinkConnector 커넥터를 재시작한다.
$ curl -s -X PUT http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/resume
PowerShell
복사
마지막으로 파일의 내용을 확인한다.
$ cat sink.txt
Struct{model_id=1,model_name=The Blonde,model_base_price=23000,brand_id=1}
Struct{model_id=2,model_name=The Brunette,model_base_price=25000,brand_id=1}
Struct{model_id=3,model_name=The Red Head,model_base_price=29000,brand_id=1}
Struct{model_id=4,model_name=Hat,model_base_price=22000,brand_id=2}
Struct{model_id=5,model_name=Sweater,model_base_price=25000,brand_id=2}
Struct{model_id=6,model_name=T-Shirt,model_base_price=27000,brand_id=2}
Struct{model_id=7,model_name=Orange,model_base_price=15000,brand_id=3}
Struct{model_id=8,model_name=Blue,model_base_price=12000,brand_id=3}
Struct{model_id=9,model_name=Green,model_base_price=17000,brand_id=3}
Struct{model_id=10,model_name=LaFerrari,model_base_price=125000,brand_id=4}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}
PowerShell
복사
파일의 내용을 확인해보면, model_id=17 이후에, model_id=11부터 다시 추가된 것을 확인할 수 있다.
카프카 커넥트의 오프셋 관리 기능을 활용하여, 손상된 데이터를 다시 처리하거나 특정 상황에서 원하는 오프셋으로 재조정할 수 있음을 확인해보았다.
특히 REST API를 통해 손쉽게 커넥터의 상태를 점검하고, 오프셋을 관리할 수 있어 관리자 입장에서 유연하게 대응할 수 있다.