🥞 BE
home

Ch1. 카프카 기본 개념과 구조

Date
2025/01/01
Category
Data Engineering
Tag
Apache Kafka
Detail
실전 카프카 Study
목차

0. 카프카란?

메시지 큐

메시지 지향 미들웨어(Message Oriented Middleware: MOM)은 비동기 메시지를 사용하는 다른 응용프로그램 사이의 데이터 송수신을 의미한다. 그리고 이러한 MOM을 구현한 시스템을 메시지 큐(Message Queue: MQ)라 한다.

배경

LinkedIn에서 여러 구직 및 채용 정보들을 한 곳에서 처리할 수 있는 플랫폼으로 개발이 시작되었다.

카프카의 특징

1.
높은 처리량과 낮은 지연시간
대용량 실시간 로그처리에 특화되어 설계된 메시징 시스템 → TPS가 우수하다.
2.
고가용성, 내구성
메시지를 메모리에 저장하는 기존 메시징 시스템과 달리, 메시지를 파일에 저장한다. 때문에 카프카를 재시작해도 메시지 유실 우려가 감소된다.
3.
개발 편의성
기본 메시징 시스템(rabbitMQ, ActiveMQ)에서는 broker가 consumer에게 메시지를 push해주는 방식인데, 카프카는 consumer가 broker에게서 메시지를 직접 가져가는 pull 방식으로 동작하기 때문에 consumer는 자신의 처리 능력만큼의 메시지만 가져와 최적의 성능을 낼 수 있다. 또한 이렇게 분리된 시스템을 통해 필요한 기능만 개발할 수 있다.
→ Q. 엥 보통 디스크보단 메모리가 성능이 우수하지 않나? 근데 왜 카프카에서 성능 관련 얘기가 나오지?
A. 맞음. but, 일반적인 임의 접근(Random Access)와 달리 카프카는 데이터를 디스크에 순차적으로 기록함. 순차 접근은 임의 접근에 비해 수천배 더 빠를 수 있으며, 심지어 메모리에 임의 접근하는 것보다도 빠를 수 있음. 이외에도 데이터를 OS의 페이지에 캐싱해서 활용하기, 메시지를 묶어서 한 번에 배치 처리하기, 제로 카피 등의 다양한 핵심 기능을 통해 성능을 끌어올림.

1. 카프카 구성 요소 및 구조

카프카는 크게 Kafka Cluster, Producer, Consumer로 분리할 수 있다.

클러스터(Cluster)

여러 대의 서버(브로커)로 구성된 카프카 시스템
대량의 데이터를 처리하고, 여러 Consumer, Producer에게 메시지 서비스를 제공
데이터를 여러 브로커에 분산시켜 저장

브로커(Broker)

실행된 카프카 서버를 의미: 브로커 안에 데이터를 담는 토픽이 들어간다고 생각하면 됨
서버 내부에 메시지를 저장하고 처리, 전송하는 역할을 수행
고가용성과 확장성을 위해, 보통 3대 이상의 브로커로 카프카 클러스터를 구성하는 것을 권장
N개의 브로커 중 1대(Leader)는 컨트롤러(Controller) 기능을 수행함
각 브로커에게 담당 파티션을 할당함
브로커가 정상적으로 동작하는지 모니터링
팔로워 브로커는 리더의 데이터를 복제함

토픽(Topic)

데이터가 저장되는 논리적인 카테고리 또는 피드
다양한 종류의 이벤트나 메시지들을 서로 다른 토픽으로 분류 → 카프카에서 메시지를 구분하는 단위
Producer가 데이터를 보내는 대상이며, Consumer가 데이터를 읽는 출처.

파티션(Partition)

토픽의 분산 처리를 위해 사용. → 파티션의 수만큼 Consumer 연결 가능
토픽(Topic) 생성 시 파티션 개수를 지정 가능. (단, 추가만 가능함. 스케일 아웃만.)
파티션 내부에서 데이터는 큐와 같이 끝에서부터 차곡차곡 들어감 → 데이터 일관성 및 순서 보장
파티션 내부에서 각 데이터(메시지)는 offset(고유 번호)로 구분
파티션이 한 개일 때는 모든 데이터에 대해 순서가 보장됨
파티션이 한 개가 아닌 여러 개일 때 데이터 전송 방식 종류 (순서 보장 X)
key 값이 존재하는 경우: key 값의 hash 값을 이용해 할당
key 값이 존재하지 않는 경우: Round-Robin 방식으로 할당
offset이란?
Consumer에서 메시지를 어디까지 읽었는지 저장하는 값.
Consumer Group의 Consumer들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보(offset)을 기록한다. 그러므로, Consumer 장애가 발생한 후 재실행했을 때 마지막으로 읽었던 위치에서부터 다시 읽어들일 수 있다. 일종의 세이브 포인트라고 생각하면 이해가 쉬울 것
Kafka의 순서 보장
Kafka의 파티션에서 메시지 순서 보장은 메시지가 저장된 순서대로 처리된다는 것을 의미한다. 프로듀서가 메시지를 보낸 순서가 파티션 내에서의 처리 순서를 결정한다. 따라서, 메시지가 비순차적으로 보내진 경우, Kafka는 그 순서대로 메시지를 저장하고 처리한다.
→ 따라서, 프로듀서 측에서 메시지 발송 순서를 관리하는 것이 중요하다. 특히 시계열 데이터나 트랜잭션 순서가 중요한 경우, 올바른 순서로 메시지를 보내야 한다.

세그먼트(Segment)

파티션의 데이터를 실제로 저장하는 물리적인 파일. 로그 형식으로 디스크에 저장됨.
세그먼트 기반의 데이터 관리는 Kafka가 불필요한 데이터를 효율적으로 정리하고, 디스크 공간을 최적화 하는 것을 도와줌.
설정에 따라 오래된 세그먼트를 삭제하거나, 로그 중복 제거 기능이 존재

토픽 레플리케이션(Topic Replication)

여러 대의 브로커를 설정하여 동일한 데이터를 복제해서 각각 저장하는 레플레케이션(replication)을 사용할 수 있음
원본 파티션 = 리더(Leader) 파티션, 복제 파티션 = 팔로워(Follower) 파티션이라 함.
리더 파티션과 팔로워 파티션을 합쳐 ISR(In Sync Replica)라고 부름
브로커가 갑자기 사용불가하게 되더라도, 복제된 다른 팔로워 파티션을 통해 기능을 정상적으로 수행 가능
리더에 문제가 발생하면 ISR에 속한 팔로워 중 하나가 새로운 리더로 승격될 수 있음. → 데이터 무손실, 서비스 연속성 보장

프로듀서(Producer)

데이터를 토픽의 특정 파티션에 생성하는 역할을 수행
메시지 전송 시 Batch 처리가 가능함
메시지의 키 값을 설정하여 특정 토픽에 보낼 수 있음
반드시 파티셔너(Partitioner)를 통해 브로커로 데이터 전송
파티셔너: 데이터를 토픽의 어떤 파티션 내에 넣을지 결정하는 역할 수행
Key 값 가진 경우: 레코드는 특정한 해쉬값으로 생성되어 그 값을 기준으로 할당 (특정 파티션)
Key 값 없는 경우: Round-Robin 방식으로 파티션에 할당
키 분산 전략
Kafka에서 키 선택은 클러스터의 전체적인 성능과 안정성에 중대한 영향을 미친다. 균형 잡힌 키 분산은 각 파티션과 브로커의 효율적인 활용을 보장하고, 불균형한 분산은 클러스터의 성능 저하와 안정성 문제를 야기할 수 있다. 따라서, Kafka를 사용할 때는 키 분산 전략을 신중하게 고려해야 한다.
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class PeterProducerCallback implements Callback { //콜백을 사용하기 위해 org.apache.kafka.clients.producer.Callback를 구현하는 클래스가 필요함. private ProducerRecord<String, String> record; public PeterProducerCallback(ProducerRecord<String, String> record) { this.record = record; } @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { e.printStackTrace(); //카프카가 오류를 리턴하면 onCompletion()은 예외를 갖게 되며, 실제 운영환경에서는 추가적인 예외처리가 필요함. } else { System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition() , metadata.offset(), record.key(), record.value()); } } }
Java
복사
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerAsync { public static void main(String[] args) { Properties props = new Properties(); //Properties 오브젝트를 시작합니다. props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //메시지 키와 벨류에 문자열을 지정하므로 내장된 StringSerializer를 지정함. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); //Properties 오브젝트를 전달해 새 프로듀서를 생성. try { for (int i = 0; i < 3; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distributed streaming platform - " + i); //ProducerRecord 오브젝트를 생성. producer.send(record, new PeterProducerCallback(record)); //프로듀서에서 레코드를 보낼 때 콜백 오브젝트를 같이 보냄. } } catch (Exception e){ e.printStackTrace(); } finally { producer.close(); // 프로듀서 종료 } } }
Java
복사
콜백 객체를 추가하여, 비동기로 프로듀서를 구현한 코드.

컨슈머(Consumer)

토픽에서 데이터를 가져오는 역할 수행 → 폴링(polling)이라고 함
한 개의 Consumer는 여러 개의 Topic을 처리할 수 있음.
토픽에서 데이터를 가져가더라도 데이터를 삭제하지 않음. (삭제는 Kafka delete policy에 의해 진행) → 한 번 저장된 데이터를 여러번 소비하는 것이 가능
Consumer는 여러 개 만들 수 있으며, 컨슈머 그룹(Consumer Group)으로 묶어 관리할 수 있음
컨슈머 그룹을 사용하는 이유 어떤 Consumer에서 오류가 발생한다면, 파티션 재조정(리밸런싱)을 통해 다른 Consumer가 해당 파티션의 sub을 맡아서 한다. 오프셋 정보를 그룹 간에 공유하고 있기 때문에 오류가 발생하기 전 마지막으로 읽었던 데이터 위치부터 시작한다.
그룹에 속하는 컨슈머가 여러 개이면 로드밸런싱을 통해 데이터 분배 가능
컨슈머 그룹들끼리는 서로 영향을 미치지 않음 (토픽 안에 있는 데이터 변경 X)
관련 추가 개념
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerAuto { public static void main(String[] args) { Properties props = new Properties(); //Properties 오브젝트를 시작. props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의. props.put("group.id", "peter-consumer01"); //컨슈머 그룹 아이디 정의. props.put("enable.auto.commit", "true"); //오토 커밋을 사용. props.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화 합니다. 가장 최근부터 메시지를 가져옴. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //Properties 오브젝트를 전달하여 새 컨슈머를 생성. consumer.subscribe(Arrays.asList("peter-basic01")); //구독할 토픽을 지정. try { while (true) { //무한 루프 시작. 메시지를 가져오기 위해 카프카에 지속적으로 poll()을 함. ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 폴링하는 것을 계속 유지하며, 타임 아웃 주기를 설정.해당 시간만큼 블럭. for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴하고, 하나의 메시지만 가져오는 것이 아니므로, 반복문 처리. System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } catch (Exception e){ e.printStackTrace(); } finally { consumer.close(); //컨슈머를 종료. } } }
Java
복사
가장 일반적으로 활용되는 오토 커밋을 구현한 코드. (enable.auto.commit)

주키퍼(Zookeeper)

지노드(znode)를 이용해 카프카의 메타 정보를 기록하며, 주키퍼는 이러한 지노드를 활용해 브로커의 노드 관리, 토픽 관리, 컨트롤러 관리 등 중요한 역할을 하고 있다.

Reference