728x90
반응형

1. application.properties

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

logging.level.org.apache.kafka=ERROR
spring.kafka.bootstrap-servers=localhost:9092

위와 같이 application.properties를 설정해준다. 자세한 설정은 아래를 참고하자.


Kafka Publisher Configuration

  • bootstrap.servers
    • 연결할 서버 정보. e.g. : host1:port1,host2:port2와 같이 여러개를 나열할 수 있음
    • 초기 커넥션 연결시에 사용하기 때문에, 모든 서버 리스트를 포함할 필요는 없음. (실제 메시지 전송시에는 새로운 커넥션을 맺은 다음에 전송하기 때문)
  • key.serializer, value.serializer
    • 메시지를 serialize 할 때 사용할 클래스를 지정
    • ByteArraySerializer, StringSerializer 등등 Serializer를 implements한 클래스들이 있음
  • partitioner.class
    • 어떤 파티션에 메시지를 전송할지 결정하는 클래스임
    • 기본값은 DefaultPartitioner이며 메시지 키의 해시값을 기반으로 전송할 파티션을 결정함
  • acks
    • 프로듀서가 전송한 메시지를 카프카가 잘 받은 걸로 처리할 기준을 말함
    • 0, 1, all 값으로 세팅할 수 있으며 각각 메시지 손실률과 전송 속도에 대해 차이가 있음
    • 설정값 비교
      설정값 손실률 속도 설명
      acks = 0 높음 빠름 프로듀서는 서버의 확인을 기다리지 않고
      메시지 전송이 끝나면 성공으로 간주합니다.
      acks = 1 보통 보통 카프카의 leader가 메시지를 잘 받았는지만 확인합니다.
      acks = all 낮음 느림 카프카의 leader와 follower까지 모두 받았는지를 확인합니다.
    • 기본값은 acks=1 
  • buffer.memory
    • 프로듀서가 서버로 전송 대기중인 레코드를 버퍼링하는데 사용할 수 있는 메모리 양
    • 레코드가 서버에 전달될 수 있는 것보다더 빨리 전송되면 max.block.ms동안 레코드를 보내지 않음
    • 기본값은 33554432, 약 33MB임
  • retries
    • 프로듀서가 에러가 났을때 다시 시도할 횟수를 말함
    • 0보다 큰 숫자로 설정하면 그 숫자만큼 오류 발생시에 재시도 함
  • max.request.size
    • 요청의 최대 바이트 크기를 말합니다. 대용량 요청을 보내지 않도록 제한할 수 있음
    • 카프카 서버에도 별도로 설정할 수 있으므로 서로 값이 다를 수 있음
  • connections.max.idle.ms
    • 지정한 시간 이후에는 idle 상태의 연결을 닫음
  • max.block.ms
    • 버퍼가 가득 찼거나 메타데이터를 사용할 수 없을 때 차단할 시간을 정할 수 있음
  • request.timeout.ms
    • 클라이언트가 요청 응답을 기다리는 최대 시간을 정할 수 있음
    • 정해진 시간 전에 응답을 받지 못하면 다시 요청을 보내거나 재시도 횟수를 넘어서면 요청이 실패
  • retry.backoff.ms
    • 실패한 요청에 대해 프로듀서가 재시도하기 전에 대기할 시간
  • producer.type
    • 메시지를 동기(sync), 비동기(async)로 보낼지 선택할 수 있음
    • 비동기를 사용하는 경우 메시지를 일정 시간동안 쌓은 후 전송하므로 처리 효율을 향상시킬 수 있음

- 참고 Spring Boot에서 Apache Kafka 사용 ... 1/2 :: Modern Architecture Stories (tistory.com)

 

2. Controller

간단한 Controller를 만들자.

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/publish")
public class controller {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping
    public String publish() {

        kafkaTemplate.send("one-topic", "abc");
        return "success";
    }

}

 

3. postman으로 요청을 보내본다.

 

4. kafka 컨테이너에 접속해서 리스트를 보자.

정상적으로 토픽이 생성된 것을 볼 수 있다.

 

5. 토픽 내용을 확인한다.

# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic one-topic --from-beginning

 

6. 토픽을 상세히 본다.

# kafka-topics.sh --bootstrap-server localhost:9092 --topic one-topic --describe

 

7. Custom KafkaTemplete

다른 템플릿을 사용하고 싶으면 다음과 같이 Config를 추가하여 의존성을 주입해주면 된다.

@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
    
    private final KafkaProperties kafkaProperties;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

 

728x90
반응형

+ Recent posts