auto-offset-reset: 가장 이른 것은 소비자가 가장 이른 이벤트부터 읽는다는 것을 의미
key-deserializer 및 value-deserializer는 메시지를 보내기 위해 Kafka 생산자가 보낸 키와 값을 역직렬화하는 역할
2. Main
Main에서 @KafkaListener를 사용하여 아래처럼 구현한다.
@SpringBootApplication
@Slf4j
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("one-topic")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myGroup", topics = "one-topic")
public void listen(String in) {
log.info("message={}",in);
}
}
3. postmain으로 producer 서버에서 요청을 보낸 후 consumer 서버에서 확인한다.
- 예제
@Service
@Slf4j
public class service {
/**
* 일반 리스너
*/
@KafkaListener(topics = "test", groupId = "test-group-00")
public void recordListener(ConsumerRecord<String, String> record) {
log.info(record.toString());
// 기본적인 리스너선언 방식으로, poll()이 호출되어 가져온 레코드들을 차례대로 개별 레코드의 메시지 값을 파라미터로 받게 된다.
// 파라미터로 컨슈머 레코드를 받기 때문에 메시지 키, 메시지 값에 대한 처리를 이 메서드 안에서 수행하면 된다.
}
@KafkaListener(topics = "test", groupId = "test-group-01")
public void singleTopicListener(String messageValue) {
log.info(messageValue);
// 메시지 값을 파라미터로 받는 리스너
}
@KafkaListener(topics = "test", groupId = "test-group-02", properties = {"max.poll.interval.ms:60000", "auto.offset.reset:earliest"})
public void singleTopicWithPropertiesListener(String messageValue) {
log.info(messageValue);
// 별도의 프로퍼티 옵션값을 선언해주고 싶을 때 사용한다.
}
@KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3")
public void concurrentTopicListener(String messageValue) {
log.info(messageValue);
// 2개 이상의 카프카 컨슈머 스레드를 실행하고 싶을 때 concurrency 옵션을 활용할 수 있다.
// concurrency값 만큼 컨슈머 스레드를 생성하여 병렬처리 한다.
}
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "test01", partitions = {"0", "1"}),
@TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")),
})
public void listenSpecificPartition(ConsumerRecord<String, String> record) {
log.info(record.toString());
// 특정 토픽의 특정 파티션만 구독하고 때 `topicPartitions` 파라미터를 사용한다.
// `PartitionOffset` 어노테치션을 활용하면 특정 파티션의 특정 오프셋까지 지정할 수 있다.
// 이 경우에는 그룹 아이디에 관계없이 항상 설정한 오프셋의 데이터부터 가져온다.
}
/**
* 배치 리스너
*/
@KafkaListener(topics = "test", groupId = "test-group-00")
public void batchListener(ConsumerRecords<String, String> records) {
records.forEach(record -> log.info(record.toString()));
// 컨슈머 레코드의 묶음(ConsumerRecords)을 파라미터로 받는다.
// 카프카 클라이언트 라이브러리에서 poll() 메서드로 리턴받은 ConsumerRecords를 리턴받아 사용하는 방식과 같다.
}
@KafkaListener(topics = "test", groupId = "test-group-01")
public void singleTopicListener(List<String> list) {
list.forEach(recordValue -> log.info(recordValue));
// 메시지 값을 List형태로 받는다.
}
@KafkaListener(topics = "test", groupId = "test-group-02", concurrency = "3")
public void concurrentTopicListener(ConsumerRecords<String, String> records) {
records.forEach(record -> log.info(record.toString()));
// 2개 이상의 컨슈머 스레드로 배치 리스너를 운영할 경우에 concurrency 옵션을 함께 선언하여 사용하면 된다.
}
}
- Custom Container Factory
@Configuration
public class ListenerContainerConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(Consumerconfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
props.put(Consumerconfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(Consumerconfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 리스너 컨테이너를 만들기 위해 사용
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
// 리밸런스 리스너를 선언하기 위해 setConsumerRebalanceListener 메서드를 호출한다.
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// 커밋이 되기 전 리밸런스가 발생했을 때 호출되는 메서드
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// 커밋이 일어난 이후 리밸런스가 발생했을 때 호출되는 메서드
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 리밸런싱이 끝나서 파티션 소유권이 할당되고 나면 호출되는 메서드
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
}
});
factory.setBatchListener(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.setConsumerFactory(cf);
return factory;
}
}
version: '2' //docker-compose 버전 지정
services: //docker-compose의 경우 docker 컨테이너로 수행될 서비스들은 services 하위에 기술
zookeeper: //서비스 이름. service 하위에 작성하면 해당 이름으로 동작
image: wurstmeister/zookeeper //도커 이미지
container_name: zookeeper
ports: //외부포트:컨테이너내부포트
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports: //외부포트:컨테이너내부포트
- "9092:9092"
environment://kafka 브로터를 위한 환경 변수 지정
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 //kafka가 zookeeper에 커넥션하기 위한 대상을 지정
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Spring Boot를 이용할 땐 RestDocs를 가져오면 알아서 build gradle에 추가해준다.
아닐 경우엔 구글에 검색하면 최신 버전에 맞게끔 설정하는 내용이 많을테니 검색해보자.
아래는 추가되어야 할 것들 ( 버전에 따라 다를 수도 있습니다. )
plugins {
id "org.asciidoctor.jvm.convert" version "3.3.2"//RestDoc
}
ext {
set('snippetsDir', file("build/generated-snippets"))
}
configurations {
asciidoctorExt
}
asciidoctor {//RestDoc
inputs.dir snippetsDir
configurations 'asciidoctorExt'
dependsOn test
}
dependencies {
testImplementation 'org.springframework.restdocs:spring-restdocs-mockmvc'
asciidoctorExt 'org.springframework.restdocs:spring-restdocs-asciidoctor' //RestDoc
}
ext {
snippetsDir = file('build/generated-snippets')//RestDoc
}
test {
outputs.dir snippetsDir
useJUnitPlatform()
}
// asccidoctor 작업 이후 생성된 HTML 파일을 static/docs 로 copy
task copyDocument(type: Copy) {
dependsOn asciidoctor
from file("build/docs/asciidoc")
into file("src/main/resources/static/docs")
}
// 참고사항 //
// 공식 문서에서는 위의 ascidoctor.doFirst부터 아래 내용은 없고 이와 같은 내용만 있습니다.
// 이렇게 하면 jar로 만들어 질때 옮겨지는 것으로 IDE로 돌릴 때는 build 폴더에서만 확인이 가능합니다.
// 위 방법을 사용하면 IDE에서도 static으로 옮겨진 것을 확인할 수 있습니다.
// 위에 방법을 사용하든 아래 방법을 사용하든 편한 선택지를 사용하시면 됩니다.
//bootJar {//RestDoc
// dependsOn asciidoctor
// from("${asciidoctor.outputDir}/html5") {
// into 'static/docs'
// }
//}
tasks.named('test') {
outputs.dir snippetsDir
useJUnitPlatform()
}
tasks.named('asciidoctor') {
inputs.dir snippetsDir
dependsOn test
}
Request와 Response를 명세해주기 위한 메서드를 사용해준다. 필요에 따라서 작성해주면 된다.
.andDo( // rest docs 문서 작성 시작
document("member-get", // 문서 조각 디렉토리 명
pathParameters( // path 파라미터 정보 입력
parameterWithName("id").description("Member ID")
),
responseFields( // response 필드 정보 입력
fieldWithPath("id").description("ID"),
fieldWithPath("name").description("name"),
fieldWithPath("email").description("email")
)
)
)
pathParameter 나 requestFields는 다음과 같이 할 수 있다.
- Path Parameters
spring에서는 Path value를 사용할 때 MockMvcRequestBuilders 보다 RestDocumentationRequestBuilders를 선호한다.
= REST Docs
backtony.github.io(부제)
:doctype: book
:icons: font
:source-highlighter: highlightjs
:toc: left
:toclevels: 2
:sectlinks:
[[User-API]]
== User API
[[User-단일-조회]]
=== User 단일 조회
operation::user-get[]
문서를 html로 보는 방법은 몇 가지가 있는데, 위에서 설치했던 plugin으로 볼 수 있다
html버튼을 누르면 브라우저가 뜨고 파일이 생성되는 것을 볼 수 있다.
23.06.28 추가 내용
6. 리팩토링
테스트 코드의 andDo(document()) 부분의 중복을 삭제하는 리팩토링을 진행해보자.
먼저 디렉토리 이름을 따로 지정해주지 않아도 되도록 아래 Configuration을 선언해준다.
utils/RestDocsConfig.java
@TestConfiguration
public class RestDocsConfig {
@Bean
public RestDocumentationResultHandler write() {
return MockMvcRestDocumentation.document(
"{class-name}/{method-name}",
Preprocessors.preprocessRequest(Preprocessors.prettyPrint()),
Preprocessors.preprocessResponse(Preprocessors.prettyPrint())
);
}
public static final Attribute field(
final String key,
final String value) {
return new Attribute(key, value);
}
}
@ToString
@Getter
@NoArgsConstructor
@Builder
public class ApiResponseDto<T> {
private T data;
private ApiResponseDto(T data){
this.data=data;
}
public static <T> ApiResponseDto<T> of(T data) {
return new ApiResponseDto<>(data);
}
}
utils/consts/EnumDocs
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EnumDocs {
// 문서화하고 싶은 모든 enum값을 명시
Map<String,String> Sex;
Map<String,String> memberStatus;
}
utils/consts/CommonDocController
@ToString
@Getter
@NoArgsConstructor
@Builder
public class ApiResponseDto<T> {
private T data;
private ApiResponseDto(T data){
this.data=data;
}
public static <T> ApiResponseDto<T> of(T data) {
return new ApiResponseDto<>(data);
}
}
utils/consts/CommonDocControllerTest
// restdocs의 get 이 아님을 주의!!
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
class CommonDocControllerTest extends RestDocsTestSupport {
@Test
public void enums() throws Exception {
// 요청
ResultActions result = this.mockMvc.perform(
get("/test/enums")
.contentType(MediaType.APPLICATION_JSON)
);
// 결과값
MvcResult mvcResult = result.andReturn();
// 데이터 파싱
EnumDocs enumDocs = getData(mvcResult);
// 문서화 진행
result.andExpect(status().isOk())
.andDo(restDocs.document(
customResponseFields("custom-response", beneathPath("data.memberStatus").withSubsectionId("memberStatus"), // (1)
attributes(key("title").value("memberStatus")),
enumConvertFieldDescriptor((enumDocs.getMemberStatus()))
),
customResponseFields("custom-response", beneathPath("data.sex").withSubsectionId("sex"),
attributes(key("title").value("sex")),
enumConvertFieldDescriptor((enumDocs.getSex()))
)
));
}
// 커스텀 템플릿 사용을 위한 함수
public static CustomResponseFieldsSnippet customResponseFields
(String type,
PayloadSubsectionExtractor<?> subsectionExtractor,
Map<String, Object> attributes, FieldDescriptor... descriptors) {
return new CustomResponseFieldsSnippet(type, subsectionExtractor, Arrays.asList(descriptors), attributes
, true);
}
// Map으로 넘어온 enumValue를 fieldWithPath로 변경하여 리턴
private static FieldDescriptor[] enumConvertFieldDescriptor(Map<String, String> enumValues) {
return enumValues.entrySet().stream()
.map(x -> fieldWithPath(x.getKey()).description(x.getValue()))
.toArray(FieldDescriptor[]::new);
}
// mvc result 데이터 파싱
private EnumDocs getData(MvcResult result) throws IOException {
ApiResponseDto<EnumDocs> apiResponseDto = objectMapper
.readValue(result.getResponse().getContentAsByteArray(),
new TypeReference<ApiResponseDto<EnumDocs>>() {}
);
return apiResponseDto.getData();
}
}