3. postmain으로 producer 서버에서 요청을 보낸 후 consumer 서버에서 확인한다.
- 예제
@Service@Slf4jpublicclassservice{
/**
* 일반 리스너
*/@KafkaListener(topics = "test", groupId = "test-group-00")publicvoidrecordListener(ConsumerRecord<String, String> record){
log.info(record.toString());
// 기본적인 리스너선언 방식으로, poll()이 호출되어 가져온 레코드들을 차례대로 개별 레코드의 메시지 값을 파라미터로 받게 된다.// 파라미터로 컨슈머 레코드를 받기 때문에 메시지 키, 메시지 값에 대한 처리를 이 메서드 안에서 수행하면 된다.
}
@KafkaListener(topics = "test", groupId = "test-group-01")publicvoidsingleTopicListener(String messageValue){
log.info(messageValue);
// 메시지 값을 파라미터로 받는 리스너
}
@KafkaListener(topics = "test", groupId = "test-group-02", properties = {"max.poll.interval.ms:60000", "auto.offset.reset:earliest"})publicvoidsingleTopicWithPropertiesListener(String messageValue){
log.info(messageValue);
// 별도의 프로퍼티 옵션값을 선언해주고 싶을 때 사용한다.
}
@KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3")publicvoidconcurrentTopicListener(String messageValue){
log.info(messageValue);
// 2개 이상의 카프카 컨슈머 스레드를 실행하고 싶을 때 concurrency 옵션을 활용할 수 있다.// concurrency값 만큼 컨슈머 스레드를 생성하여 병렬처리 한다.
}
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "test01", partitions = {"0", "1"}),
@TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")),
})publicvoidlistenSpecificPartition(ConsumerRecord<String, String> record){
log.info(record.toString());
// 특정 토픽의 특정 파티션만 구독하고 때 `topicPartitions` 파라미터를 사용한다.// `PartitionOffset` 어노테치션을 활용하면 특정 파티션의 특정 오프셋까지 지정할 수 있다.// 이 경우에는 그룹 아이디에 관계없이 항상 설정한 오프셋의 데이터부터 가져온다.
}
/**
* 배치 리스너
*/@KafkaListener(topics = "test", groupId = "test-group-00")publicvoidbatchListener(ConsumerRecords<String, String> records){
records.forEach(record -> log.info(record.toString()));
// 컨슈머 레코드의 묶음(ConsumerRecords)을 파라미터로 받는다.// 카프카 클라이언트 라이브러리에서 poll() 메서드로 리턴받은 ConsumerRecords를 리턴받아 사용하는 방식과 같다.
}
@KafkaListener(topics = "test", groupId = "test-group-01")publicvoidsingleTopicListener(List<String> list){
list.forEach(recordValue -> log.info(recordValue));
// 메시지 값을 List형태로 받는다.
}
@KafkaListener(topics = "test", groupId = "test-group-02", concurrency = "3")publicvoidconcurrentTopicListener(ConsumerRecords<String, String> records){
records.forEach(record -> log.info(record.toString()));
// 2개 이상의 컨슈머 스레드로 배치 리스너를 운영할 경우에 concurrency 옵션을 함께 선언하여 사용하면 된다.
}
}
- Custom Container Factory
@ConfigurationpublicclassListenerContainerConfiguration{
@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(Consumerconfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
props.put(Consumerconfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(Consumerconfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 리스너 컨테이너를 만들기 위해 사용
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
// 리밸런스 리스너를 선언하기 위해 setConsumerRebalanceListener 메서드를 호출한다.@OverridepublicvoidonPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
// 커밋이 되기 전 리밸런스가 발생했을 때 호출되는 메서드
}
@OverridepublicvoidonPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
// 커밋이 일어난 이후 리밸런스가 발생했을 때 호출되는 메서드
}
@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition> partitions){
// 리밸런싱이 끝나서 파티션 소유권이 할당되고 나면 호출되는 메서드
}
@OverridepublicvoidonPartitionsLost(Collection<TopicPartition> partitions){
}
});
factory.setBatchListener(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.setConsumerFactory(cf);
return factory;
}
}
version: '2' //docker-compose 버전 지정
services: //docker-compose의 경우 docker 컨테이너로 수행될 서비스들은 services 하위에 기술
zookeeper: //서비스 이름. service 하위에 작성하면 해당 이름으로 동작
image: wurstmeister/zookeeper //도커 이미지
container_name: zookeeper
ports: //외부포트:컨테이너내부포트
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports: //외부포트:컨테이너내부포트
- "9092:9092"
environment://kafka 브로터를 위한 환경 변수 지정
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 //kafka가 zookeeper에 커넥션하기 위한 대상을 지정
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Spring Boot를 이용할 땐 RestDocs를 가져오면 알아서 build gradle에 추가해준다.
아닐 경우엔 구글에 검색하면 최신 버전에 맞게끔 설정하는 내용이 많을테니 검색해보자.
아래는 추가되어야 할 것들 ( 버전에 따라 다를 수도 있습니다. )
plugins {
id "org.asciidoctor.jvm.convert" version "3.3.2"//RestDoc
}
ext {
set('snippetsDir', file("build/generated-snippets"))
}
configurations {
asciidoctorExt
}
asciidoctor {//RestDoc
inputs.dir snippetsDir
configurations 'asciidoctorExt'
dependsOn test
}
dependencies {
testImplementation 'org.springframework.restdocs:spring-restdocs-mockmvc'
asciidoctorExt 'org.springframework.restdocs:spring-restdocs-asciidoctor'//RestDoc
}
ext {
snippetsDir = file('build/generated-snippets')//RestDoc
}
test {
outputs.dir snippetsDir
useJUnitPlatform()
}
// asccidoctor 작업 이후 생성된 HTML 파일을 static/docs 로 copy
task copyDocument(type: Copy){
dependsOn asciidoctor
from file("build/docs/asciidoc")
into file("src/main/resources/static/docs")
}
// 참고사항 //// 공식 문서에서는 위의 ascidoctor.doFirst부터 아래 내용은 없고 이와 같은 내용만 있습니다.// 이렇게 하면 jar로 만들어 질때 옮겨지는 것으로 IDE로 돌릴 때는 build 폴더에서만 확인이 가능합니다.// 위 방법을 사용하면 IDE에서도 static으로 옮겨진 것을 확인할 수 있습니다.// 위에 방법을 사용하든 아래 방법을 사용하든 편한 선택지를 사용하시면 됩니다.//bootJar {//RestDoc// dependsOn asciidoctor// from("${asciidoctor.outputDir}/html5") {// into 'static/docs'// }//}
tasks.named('test'){
outputs.dir snippetsDir
useJUnitPlatform()
}
tasks.named('asciidoctor'){
inputs.dir snippetsDir
dependsOn test
}
Request와 Response를 명세해주기 위한 메서드를 사용해준다. 필요에 따라서 작성해주면 된다.
.andDo( // rest docs 문서 작성 시작document("member-get", // 문서 조각 디렉토리 명
pathParameters( // path 파라미터 정보 입력
parameterWithName("id").description("Member ID")
),
responseFields( // response 필드 정보 입력
fieldWithPath("id").description("ID"),
fieldWithPath("name").description("name"),
fieldWithPath("email").description("email")
)
)
)
pathParameter 나 requestFields는 다음과 같이 할 수 있다.
- Path Parameters
spring에서는 Path value를 사용할 때 MockMvcRequestBuilders 보다 RestDocumentationRequestBuilders를 선호한다.
= REST Docs
backtony.github.io(부제)
:doctype: book
:icons: font
:source-highlighter: highlightjs
:toc: left
:toclevels: 2
:sectlinks:
[[User-API]]
== User API
[[User-단일-조회]]
=== User 단일 조회
operation::user-get[]
@SpringBootTest@AutoConfigureRestDocs@AutoConfigureMockMvc@Disabled@Import(RestDocsConfig.class)@ExtendWith(RestDocumentationExtension.class)publicclassRestDocsTestSupport{
@Autowiredprotected MockMvc mockMvc;
@Autowiredprotected ObjectMapper objectMapper;
@Autowiredprotected JwtUtil jwtUtil;
@Autowiredprotected RestDocumentationResultHandler restDocs;
@BeforeEachvoidsetUp(final WebApplicationContext context,
final RestDocumentationContextProvider provider){
this.mockMvc = MockMvcBuilders.webAppContextSetup(context)
.apply(
MockMvcRestDocumentation.documentationConfiguration(provider)) // rest docs 설정 주입
.alwaysDo(MockMvcResultHandlers.print()) // andDo(print()) 코드 포함
.alwaysDo(restDocs) // pretty 패턴과 문서 디렉토리 명 정해준것 적용
.addFilters(new CharacterEncodingFilter("UTF-8", true)) // 한글 깨짐 방지
.build();
}
}
여기서 만약 Spring Security를 적용하면 다음과같이 써야한다.
@BeforeEachvoidsetUp(final WebApplicationContext context,
final RestDocumentationContextProvider provider)throws ServletException {
DelegatingFilterProxy delegateProxyFilter = new DelegatingFilterProxy();
delegateProxyFilter.init(
new MockFilterConfig(context.getServletContext(), BeanIds.SPRING_SECURITY_FILTER_CHAIN));
this.mockMvc = MockMvcBuilders.webAppContextSetup(context)
.apply(
MockMvcRestDocumentation.documentationConfiguration(provider)) // rest docs 설정 주입
.alwaysDo(MockMvcResultHandlers.print()) // andDo(print()) 코드 포함
.alwaysDo(restDocs) // pretty 패턴과 문서 디렉토리 명 정해준것 적용
.addFilters(
new CharacterEncodingFilter("UTF-8", true),
delegateProxyFilter
)
.build();
}
아래는 예시 코드이다.
classMemberControllerTestextendsRestDocsTestSupport{
@Testpublicvoidmember_page_test()throws Exception {
Member member = new Member("backtony@gmail.com", 27, MemberStatus.NORMAL);
PageImpl<Member> memberPage = new PageImpl<>(List.of(member), PageRequest.of(0, 10), 1);
given(memberRepository.findAll(ArgumentMatchers.any(Pageable.class))).willReturn(memberPage);
mockMvc.perform(
get("/api/members")
.param("size", "10")
.param("page", "0")
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andDo(
restDocs.document(
requestParameters(
parameterWithName("size").optional().description("size"), // 필수여부 false
parameterWithName("page").optional().description("page") // 필수여부 false
)
)
)
;
}
@Testpublicvoidmember_create()throws Exception {
MemberSignUpRequest dto = MemberSignUpRequest.builder()
.name("name")
.email("hhh@naver.com")
.status(MemberStatus.BAN)
.build();
mockMvc.perform(
post("/api/members")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(dto))
)
.andExpect(status().isOk())
.andDo(
restDocs.document(
requestFields(
// 앞서 작성한 RestDocsConfig의 field 메서드로 constraints를 명시
fieldWithPath("name").description("name").attributes(field("constraints", "길이 10 이하")),
fieldWithPath("email").description("email").attributes(field("constraints", "길이 30 이하")),
fieldWithPath("status").description("Code Member Status 참조")
)
)
)
;
}
}
7. Request, Response Fields 커스터마이징
/test/resources/org/springframework/restdocs/templates/asciidoctor/ 경로에 snippet 파일을 추가하면 default 표를 만들 수 있다.