728x90
반응형

이번엔 Consumer 서버를 만들어보고 Producer 서버에서 생성한 토픽을 구독하여 읽어보기로 하자.

 

1. application.properties

server.port=8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeSerializer

logging.level.org.apache.kafka=ERROR
spring.kafka.bootstrap-servers=localhost:9092
  • auto-offset-reset: 가장 이른 것은 소비자가 가장 이른 이벤트부터 읽는다는 것을 의미
  • key-deserializer 및 value-deserializer는 메시지를 보내기 위해 Kafka 생산자가 보낸 키와 값을 역직렬화하는 역할

2. Main

Main에서 @KafkaListener를 사용하여 아래처럼 구현한다.

@SpringBootApplication
@Slf4j
public class ConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ConsumerApplication.class, args);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("one-topic")
			.partitions(10)
			.replicas(1)
			.build();
	}

	@KafkaListener(id = "myGroup", topics = "one-topic")
	public void listen(String in) {
		log.info("message={}",in);
	}
}

 

3. postmain으로 producer 서버에서 요청을 보낸 후 consumer 서버에서 확인한다.

 

 

 

- 예제

@Service
@Slf4j
public class service {

    /**
     * 일반 리스너
     */

    @KafkaListener(topics = "test", groupId = "test-group-00")
    public void recordListener(ConsumerRecord<String, String> record) {
        log.info(record.toString());
        // 기본적인 리스너선언 방식으로, poll()이 호출되어 가져온 레코드들을 차례대로 개별 레코드의 메시지 값을 파라미터로 받게 된다.
        // 파라미터로 컨슈머 레코드를 받기 때문에 메시지 키, 메시지 값에 대한 처리를 이 메서드 안에서 수행하면 된다.
    }

    @KafkaListener(topics = "test", groupId = "test-group-01")
    public void singleTopicListener(String messageValue) {
        log.info(messageValue);
        // 메시지 값을 파라미터로 받는 리스너
    }

    @KafkaListener(topics = "test", groupId = "test-group-02", properties = {"max.poll.interval.ms:60000", "auto.offset.reset:earliest"})
    public void singleTopicWithPropertiesListener(String messageValue) {
        log.info(messageValue);
        // 별도의 프로퍼티 옵션값을 선언해주고 싶을 때 사용한다.
    }

    @KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3")
    public void concurrentTopicListener(String messageValue) {
        log.info(messageValue);
        // 2개 이상의 카프카 컨슈머 스레드를 실행하고 싶을 때 concurrency 옵션을 활용할 수 있다.
        // concurrency값 만큼 컨슈머 스레드를 생성하여 병렬처리 한다.
    }

    @KafkaListener(topicPartitions = {
        @TopicPartition(topic = "test01", partitions = {"0", "1"}),
        @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")),
    })
    public void listenSpecificPartition(ConsumerRecord<String, String> record) {
        log.info(record.toString());
        // 특정 토픽의 특정 파티션만 구독하고 때 `topicPartitions` 파라미터를 사용한다.
        // `PartitionOffset` 어노테치션을 활용하면 특정 파티션의 특정 오프셋까지 지정할 수 있다.
        // 이 경우에는 그룹 아이디에 관계없이 항상 설정한 오프셋의 데이터부터 가져온다.
    }

    /**
     * 배치 리스너
     */

    @KafkaListener(topics = "test", groupId = "test-group-00")
    public void batchListener(ConsumerRecords<String, String> records) {
        records.forEach(record -> log.info(record.toString()));
        // 컨슈머 레코드의 묶음(ConsumerRecords)을 파라미터로 받는다.
        // 카프카 클라이언트 라이브러리에서 poll() 메서드로 리턴받은 ConsumerRecords를 리턴받아 사용하는 방식과 같다.
    }

    @KafkaListener(topics = "test", groupId = "test-group-01")
    public void singleTopicListener(List<String> list) {
        list.forEach(recordValue -> log.info(recordValue));
        // 메시지 값을 List형태로 받는다.
    }

    @KafkaListener(topics = "test", groupId = "test-group-02", concurrency = "3")
    public void concurrentTopicListener(ConsumerRecords<String, String> records) {
        records.forEach(record -> log.info(record.toString()));
        // 2개 이상의 컨슈머 스레드로 배치 리스너를 운영할 경우에 concurrency 옵션을 함께 선언하여 사용하면 된다.
    }
}

 

- Custom Container Factory

@Configuration
public class ListenerContainerConfiguration {
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
        
        Map<String, Object> props = new HashMap<>();
        props.put(Consumerconfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
        props.put(Consumerconfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(Consumerconfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
        
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();  //  리스너 컨테이너를 만들기 위해 사용
        factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
            // 리밸런스 리스너를 선언하기 위해 setConsumerRebalanceListener 메서드를 호출한다.
            
            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                // 커밋이 되기 전 리밸런스가 발생했을 때 호출되는 메서드
            }
            
            @Override
            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                // 커밋이 일어난 이후 리밸런스가 발생했을 때 호출되는 메서드
            }
            
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // 리밸런싱이 끝나서 파티션 소유권이 할당되고 나면 호출되는 메서드
            }
            
            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                
            }
            
        });
        
        factory.setBatchListener(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.setConsumerFactory(cf);
        
        return factory;
    }
    
}

 

customContainerFactory를 사용해준다.

@KafkaListener(topics = "test", groupId = "test-group", containerFactory = "customContainerFactory")
public void customListener(String data) {
    log.info(data);
    // customContainerFactory 옵션을 커스텀 컨테이너 팩토리로 설정하여 사용한다.
}

 

- 참고 spring kafka 사용법 | D-log (leejaedoo.github.io)

728x90
반응형

+ Recent posts