728x90
반응형
1. application.properties
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
logging.level.org.apache.kafka=ERROR
spring.kafka.bootstrap-servers=localhost:9092
위와 같이 application.properties를 설정해준다. 자세한 설정은 아래를 참고하자.
Kafka Publisher Configuration
- bootstrap.servers
- 연결할 서버 정보. e.g. : host1:port1,host2:port2와 같이 여러개를 나열할 수 있음
- 초기 커넥션 연결시에 사용하기 때문에, 모든 서버 리스트를 포함할 필요는 없음. (실제 메시지 전송시에는 새로운 커넥션을 맺은 다음에 전송하기 때문)
- key.serializer, value.serializer
- 메시지를 serialize 할 때 사용할 클래스를 지정
- ByteArraySerializer, StringSerializer 등등 Serializer를 implements한 클래스들이 있음
- partitioner.class
- 어떤 파티션에 메시지를 전송할지 결정하는 클래스임
- 기본값은 DefaultPartitioner이며 메시지 키의 해시값을 기반으로 전송할 파티션을 결정함
- acks
- 프로듀서가 전송한 메시지를 카프카가 잘 받은 걸로 처리할 기준을 말함
- 0, 1, all 값으로 세팅할 수 있으며 각각 메시지 손실률과 전송 속도에 대해 차이가 있음
- 설정값 비교
설정값 손실률 속도 설명 acks = 0 높음 빠름 프로듀서는 서버의 확인을 기다리지 않고
메시지 전송이 끝나면 성공으로 간주합니다.acks = 1 보통 보통 카프카의 leader가 메시지를 잘 받았는지만 확인합니다. acks = all 낮음 느림 카프카의 leader와 follower까지 모두 받았는지를 확인합니다. - 기본값은 acks=1
- buffer.memory
- 프로듀서가 서버로 전송 대기중인 레코드를 버퍼링하는데 사용할 수 있는 메모리 양
- 레코드가 서버에 전달될 수 있는 것보다더 빨리 전송되면 max.block.ms동안 레코드를 보내지 않음
- 기본값은 33554432, 약 33MB임
- retries
- 프로듀서가 에러가 났을때 다시 시도할 횟수를 말함
- 0보다 큰 숫자로 설정하면 그 숫자만큼 오류 발생시에 재시도 함
- max.request.size
- 요청의 최대 바이트 크기를 말합니다. 대용량 요청을 보내지 않도록 제한할 수 있음
- 카프카 서버에도 별도로 설정할 수 있으므로 서로 값이 다를 수 있음
- connections.max.idle.ms
- 지정한 시간 이후에는 idle 상태의 연결을 닫음
- max.block.ms
- 버퍼가 가득 찼거나 메타데이터를 사용할 수 없을 때 차단할 시간을 정할 수 있음
- request.timeout.ms
- 클라이언트가 요청 응답을 기다리는 최대 시간을 정할 수 있음
- 정해진 시간 전에 응답을 받지 못하면 다시 요청을 보내거나 재시도 횟수를 넘어서면 요청이 실패
- retry.backoff.ms
- 실패한 요청에 대해 프로듀서가 재시도하기 전에 대기할 시간
- producer.type
- 메시지를 동기(sync), 비동기(async)로 보낼지 선택할 수 있음
- 비동기를 사용하는 경우 메시지를 일정 시간동안 쌓은 후 전송하므로 처리 효율을 향상시킬 수 있음
- 참고 Spring Boot에서 Apache Kafka 사용 ... 1/2 :: Modern Architecture Stories (tistory.com)
2. Controller
간단한 Controller를 만들자.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/publish")
public class controller {
private final KafkaTemplate<String, String> kafkaTemplate;
@GetMapping
public String publish() {
kafkaTemplate.send("one-topic", "abc");
return "success";
}
}
3. postman으로 요청을 보내본다.
4. kafka 컨테이너에 접속해서 리스트를 보자.
정상적으로 토픽이 생성된 것을 볼 수 있다.
5. 토픽 내용을 확인한다.
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic one-topic --from-beginning
6. 토픽을 상세히 본다.
# kafka-topics.sh --bootstrap-server localhost:9092 --topic one-topic --describe
7. Custom KafkaTemplete
다른 템플릿을 사용하고 싶으면 다음과 같이 Config를 추가하여 의존성을 주입해주면 된다.
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
private final KafkaProperties kafkaProperties;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
728x90
반응형
'Back-End > Spring Boot' 카테고리의 다른 글
Spring Boot | Thread ID 찾아서 interrupt 하기 (0) | 2023.03.14 |
---|---|
Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Consumer (0) | 2023.03.02 |
Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Kafka 설치 (0) | 2023.03.02 |
Spring boot | Jacoco로 테스트 커버리지 확인하기 (0) | 2023.02.23 |
Spring Boot | MockMvc with Spring Security and RestDocs (0) | 2023.02.21 |