Kafka
카프카 설정
쫑글이
2024. 9. 26. 10:39
일반적으론 YAML 파일 대신 @Configuration 클래스로 설정하지만 학습하는 단계이므로 간단하게 설정
kafka:
bootstrap-servers: localhost:9092 # 클러스터에 있는 브로커 서버 주소 (초기 연결용)
consumer:
group-id: ${spring.application.name}-group # 그룹 아이디 (@KafkaListener에 설정 권장)
auto-offset-reset: earliest # earliest: 처음부터 읽음, latest: 최근부터 읽음 (일반적으로 latest 사용)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 메시지 키 역직렬화
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # 메시지 값 역직렬화
properties:
spring.json.trusted.packages: '*' # 역직렬화에 사용할 패키지 목록 (모든 패키지 허용)
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 메시지 키 직렬화
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 메시지 값 직렬화
1. 부트스트랩 서버 (bootstrap-servers)
- 설명: 클러스터에 있는 여러 브로커 중 하나 이상의 주소를 설정하여 클라이언트가 Kafka 클러스터에 처음 연결할 때 사용됩니다.
- 역할: 클라이언트는 이 주소를 통해 다른 브로커들의 메타데이터를 얻고 클러스터와 통신을 시작합니다.
2. 컨슈머 그룹 아이디 (group-id)
- 설명: 컨슈머 그룹을 식별하는 아이디로, 동일한 그룹에 속한 여러 컨슈머가 토픽의 메시지를 나누어 소비합니다.
- 특징:
- 그룹 아이디를 기반으로 오프셋을 추적하여 재시작 시 마지막 읽은 위치에서 이어서 메시지를 소비합니다.
- 동일 그룹 내에서 메시지 중복 소비는 발생하지 않으며, 각 컨슈머는 다른 파티션의 메시지를 소비합니다.
- 권장: @KafkaListener에서 groupId를 직접 설정하는 것이 일반적입니다.
3. auto-offset-reset
- 설명: 새로운 컨슈머 그룹이 메시지를 처음 읽을 때, 이전 오프셋을 찾을 수 없는 경우 어떤 시점부터 메시지를 읽을지 설정합니다.
- 옵션:
- earliest: 가장 처음부터 메시지를 읽음
- latest: 가장 최근 메시지부터 읽음 (기본적으로 많이 사용)
- 용도: 특정 시점부터 메시지를 읽고 싶다면 earliest를 사용하고, 일반적인 경우에는 latest를 사용합니다.
4. 직렬화 및 역직렬화 (Serialization & Deserialization)
- 설명: 메시지를 보내고 받을 때 데이터를 변환하는 설정입니다.
- 옵션:
- 컨슈머:
- key-deserializer: 메시지의 키를 역직렬화
- value-deserializer: 메시지의 값을 역직렬화
- 프로듀서:
- key-serializer: 메시지의 키를 직렬화
- value-serializer: 메시지의 값을 직렬화
- 컨슈머:
5. ack 설정
- 설명: 프로듀서가 전송한 메시지에 대해 브로커로부터 받는 응답 방식입니다.
- 옵션:
- acks=0:
- 특징: 프로듀서는 브로커로부터 응답을 기다리지 않음
- 장점: 성능이 매우 높음
- 단점: 메시지 손실 가능성이 높음
- acks=1 (기본값):
- 특징: 리더 브로커가 메시지를 받으면 프로듀서에게 응답을 보냄
- 장점: 적당한 성능과 안정성의 균형
- 단점: 리더 브로커가 메시지를 잃으면 데이터 손실 가능성
- acks=all 또는 acks=-1:
- 특징: 리더와 모든 팔로워 브로커에 메시지가 복제된 후 프로듀서에게 응답을 보냄
- 장점: 데이터 손실 가능성이 거의 없음
- 단점: 성능이 가장 낮음
- acks=0:
참고)
'bootstrap 서버 주소'라고 부르는 이유는 클러스터와 연결하기 위한 초기 진입점(bootstrap point)으로서 사용되기 때문입니다. 클라이언트(컨슈머 또는 프로듀서)가 Kafka 클러스터에 처음 연결할 때, 클러스터 내 모든 브로커의 정보를 알 필요는 없습니다. 대신, 하나 또는 몇 개의 브로커 주소만 알면 나머지 브로커에 대한 정보를 자동으로 조회할 수 있게 됩니다.
Spring Kafka 설정을 Java @Configuration 파일로 변환한 예시
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConfig {
// Consumer 설정
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // bootstrap 서버 주소
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id"); // 컨슈머 그룹 아이디 설정
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // auto-offset-reset 설정
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 키 역직렬화
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // 값 역직렬화
props.put("spring.json.trusted.packages", "*"); // 역직렬화 가능한 패키지
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// Producer 설정
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // bootstrap 서버 주소
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 키 직렬화
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 값 직렬화
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// 추가로 토픽을 생성하는 설정 (필요 시 사용)
@Bean
public NewTopic topic1() {
return new NewTopic("topic-name", 1, (short) 1);
}
}