728x90
반응형
이번엔 Consumer 서버를 만들어보고 Producer 서버에서 생성한 토픽을 구독하여 읽어보기로 하자.
1. application.properties
server.port=8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeSerializer
logging.level.org.apache.kafka=ERROR
spring.kafka.bootstrap-servers=localhost:9092
- auto-offset-reset: 가장 이른 것은 소비자가 가장 이른 이벤트부터 읽는다는 것을 의미
- key-deserializer 및 value-deserializer는 메시지를 보내기 위해 Kafka 생산자가 보낸 키와 값을 역직렬화하는 역할
2. Main
Main에서 @KafkaListener를 사용하여 아래처럼 구현한다.
@SpringBootApplication
@Slf4j
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("one-topic")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myGroup", topics = "one-topic")
public void listen(String in) {
log.info("message={}",in);
}
}
3. postmain으로 producer 서버에서 요청을 보낸 후 consumer 서버에서 확인한다.
- 예제
@Service
@Slf4j
public class service {
/**
* 일반 리스너
*/
@KafkaListener(topics = "test", groupId = "test-group-00")
public void recordListener(ConsumerRecord<String, String> record) {
log.info(record.toString());
// 기본적인 리스너선언 방식으로, poll()이 호출되어 가져온 레코드들을 차례대로 개별 레코드의 메시지 값을 파라미터로 받게 된다.
// 파라미터로 컨슈머 레코드를 받기 때문에 메시지 키, 메시지 값에 대한 처리를 이 메서드 안에서 수행하면 된다.
}
@KafkaListener(topics = "test", groupId = "test-group-01")
public void singleTopicListener(String messageValue) {
log.info(messageValue);
// 메시지 값을 파라미터로 받는 리스너
}
@KafkaListener(topics = "test", groupId = "test-group-02", properties = {"max.poll.interval.ms:60000", "auto.offset.reset:earliest"})
public void singleTopicWithPropertiesListener(String messageValue) {
log.info(messageValue);
// 별도의 프로퍼티 옵션값을 선언해주고 싶을 때 사용한다.
}
@KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3")
public void concurrentTopicListener(String messageValue) {
log.info(messageValue);
// 2개 이상의 카프카 컨슈머 스레드를 실행하고 싶을 때 concurrency 옵션을 활용할 수 있다.
// concurrency값 만큼 컨슈머 스레드를 생성하여 병렬처리 한다.
}
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "test01", partitions = {"0", "1"}),
@TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")),
})
public void listenSpecificPartition(ConsumerRecord<String, String> record) {
log.info(record.toString());
// 특정 토픽의 특정 파티션만 구독하고 때 `topicPartitions` 파라미터를 사용한다.
// `PartitionOffset` 어노테치션을 활용하면 특정 파티션의 특정 오프셋까지 지정할 수 있다.
// 이 경우에는 그룹 아이디에 관계없이 항상 설정한 오프셋의 데이터부터 가져온다.
}
/**
* 배치 리스너
*/
@KafkaListener(topics = "test", groupId = "test-group-00")
public void batchListener(ConsumerRecords<String, String> records) {
records.forEach(record -> log.info(record.toString()));
// 컨슈머 레코드의 묶음(ConsumerRecords)을 파라미터로 받는다.
// 카프카 클라이언트 라이브러리에서 poll() 메서드로 리턴받은 ConsumerRecords를 리턴받아 사용하는 방식과 같다.
}
@KafkaListener(topics = "test", groupId = "test-group-01")
public void singleTopicListener(List<String> list) {
list.forEach(recordValue -> log.info(recordValue));
// 메시지 값을 List형태로 받는다.
}
@KafkaListener(topics = "test", groupId = "test-group-02", concurrency = "3")
public void concurrentTopicListener(ConsumerRecords<String, String> records) {
records.forEach(record -> log.info(record.toString()));
// 2개 이상의 컨슈머 스레드로 배치 리스너를 운영할 경우에 concurrency 옵션을 함께 선언하여 사용하면 된다.
}
}
- Custom Container Factory
@Configuration
public class ListenerContainerConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(Consumerconfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
props.put(Consumerconfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(Consumerconfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 리스너 컨테이너를 만들기 위해 사용
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
// 리밸런스 리스너를 선언하기 위해 setConsumerRebalanceListener 메서드를 호출한다.
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// 커밋이 되기 전 리밸런스가 발생했을 때 호출되는 메서드
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// 커밋이 일어난 이후 리밸런스가 발생했을 때 호출되는 메서드
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 리밸런싱이 끝나서 파티션 소유권이 할당되고 나면 호출되는 메서드
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
}
});
factory.setBatchListener(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.setConsumerFactory(cf);
return factory;
}
}
customContainerFactory를 사용해준다.
@KafkaListener(topics = "test", groupId = "test-group", containerFactory = "customContainerFactory")
public void customListener(String data) {
log.info(data);
// customContainerFactory 옵션을 커스텀 컨테이너 팩토리로 설정하여 사용한다.
}
728x90
반응형
'Back-End > Spring Boot' 카테고리의 다른 글
Spring Boot | Multi Scheduling in Multi Thread (0) | 2023.03.21 |
---|---|
Spring Boot | Thread ID 찾아서 interrupt 하기 (0) | 2023.03.14 |
Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Producer (0) | 2023.03.02 |
Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Kafka 설치 (0) | 2023.03.02 |
Spring boot | Jacoco로 테스트 커버리지 확인하기 (0) | 2023.02.23 |