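The Dockerfile downloads the required Confluent packages, and the container then uses the add_connector.sh and connect-distributed.properties files.
- add_connector.sh waits until the Kafka Connect REST API responds, then registers the connector defined in connect-mqtt-source.json with curl.
- connect-distributed.properties holds the base configuration for the Connect worker.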
1. Create the Docker Compose file for Kafka and Kafka Connect
# Service entries; in a current Compose file these sit under the top-level "services:" key.
kafka:
  image: bitnami/kafka:3.4
  container_name: kafka
  ports:
    - "9092:9092"
  environment:
    # yes/true values are quoted so YAML keeps them as strings
    ALLOW_PLAINTEXT_LISTENER: "yes"
    KAFKA_BROKER_ID: 1
    KAFKA_CFG_PROCESS_ROLES: broker,controller
    KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
    KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
    KAFKA_CFG_LISTENERS: CONTROLLER://:9093,CLIENT://:9092
    KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,CLIENT:PLAINTEXT
    KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
    KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://kafka:9092
    KAFKA_CFG_DELETE_TOPIC_ENABLE: "true"
    KAFKA_CFG_BROKER_ID: 1
    KAFKA_CFG_NODE_ID: 1
    KRAFT_MODE: "true"
    KAFKA_ENABLE_KRAFT: "yes"
    TZ: Asia/Seoul
  volumes:
    - ${KAFKA_DATA_DIR}:/bitnami/kafka/data
kafka-connect:
  build: ./docker/kafka-connect
  container_name: kafka-connect
  ports:
    - "8083:8083"
  depends_on:
    - kafka
  volumes:
    - ${KAFKA_CONNECT_DATA_DIR}:/var/lib/kafka-connect
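The compose file reads ${KAFKA_DATA_DIR} and ${KAFKA_CONNECT_DATA_DIR} from the environment, but the post does not show where they are set. One option is a .env file next to the compose file, which Docker Compose loads automatically. The following is a minimal sketch of that idea: the function name write_env_file and the ./data/... paths are hypothetical placeholders, not values from the original setup.

```shell
#!/bin/sh
# Sketch: write a .env file defining the two variables the compose file references.
# The ./data/... paths are placeholders (assumptions); point them at real host directories.
write_env_file() {
  target_dir="${1:-.}"
  mkdir -p "$target_dir"
  cat > "$target_dir/.env" <<'EOF'
KAFKA_DATA_DIR=./data/kafka
KAFKA_CONNECT_DATA_DIR=./data/kafka-connect
EOF
}
```

Calling write_env_file . in the directory that holds the compose file and then running docker compose config should print the volume entries with the variables resolved, which is a quick way to confirm the values are picked up.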
2. Create the Kafka Connect Dockerfile
Path: project/docker/kafka-connect
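FROM bitnami/kafka:3.4
ARG MODULE=kafka-connect
ENV MODULE=${MODULE}
# root is needed for apt-get and for writing to /opt/bitnami/kafka/libs
USER root
WORKDIR /connectors
RUN apt-get update && apt-get upgrade -y && apt-get install unzip -y
# Download and unpack the Confluent Hub client into /connectors
RUN curl -O https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
RUN tar -zxvf confluent-hub-client-latest.tar.gz
RUN rm -f confluent-hub-client-latest.tar.gz
# confluent-hub needs a component directory and an existing worker config file
RUN mkdir component && mkdir config && cd config && touch worker.properties
# Install the MQTT connector
RUN cd /connectors/bin && ./confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:1.7.1 \
    --component-dir /connectors/component \
    --worker-configs /connectors/config/worker.properties
# Copy the connector jars onto the Kafka classpath
RUN cp -p /connectors/component/confluentinc-kafka-connect-mqtt/lib/* /opt/bitnami/kafka/libs
WORKDIR /tmp
# Copies add_connector.sh, connect-distributed.properties, and connect-mqtt-source.json into /tmp
COPY . .
# Register the connector in the background while the Connect worker runs in the foreground
CMD /tmp/add_connector.sh & /opt/bitnami/kafka/bin/connect-distributed.sh /tmp/connect-distributed.properties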
3. Create connect-distributed.properties
bootstrap.servers=kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
converter.encoding=UTF-8
offset.flush.interval.ms=10000
topic.prefix=test
tasks.max=5
plugin.path=/connectors
4. Create connect-mqtt-source.json
{
  "name": "mqtt-source-1",
  "config": {
    "bootstrap.servers": "kafka:9092",
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://host.docker.internal:9000",
    "mqtt.topics": "data",
    "kafka.topic": "mqtt.data",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": 1
  }
}
5. Create add_connector.sh
#!/bin/bash
# Wait for Kafka Connect to start: poll the REST API until it answers with HTTP 200
while true
do
  status=$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)
  if [ "$status" -eq 200 ]; then
    echo "Kafka Connect has started."
    break
  else
    echo "Waiting for Kafka Connect to start..."
    sleep 3
  fi
done
echo "Start to add kafka-connect"
# Add the MQTT source connector by posting its JSON definition to the REST API
curl -d @/tmp/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
echo "Complete to add kafka-connect"
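The loop above polls forever, so a worker that never comes up leaves the script running in the background indefinitely. A hedged variant caps the number of attempts. This is only a sketch: wait_for_connect, probe_connect, the CONNECT_URL variable, and the default values are illustrative names that do not appear in the original script.

```shell
#!/bin/sh
# Sketch: poll the Connect REST API with an attempt limit instead of looping forever.

# Print the HTTP status of the Connect REST endpoint ("000" if the connection fails).
probe_connect() {
  curl -s -o /dev/null -w '%{http_code}' "${CONNECT_URL:-http://localhost:8083}/connectors"
}

# wait_for_connect MAX_ATTEMPTS SLEEP_SECONDS [PROBE_COMMAND]
# Returns 0 once the probe prints 200, or 1 after MAX_ATTEMPTS unsuccessful probes.
wait_for_connect() {
  max_attempts="${1:-40}"
  sleep_seconds="${2:-3}"
  probe="${3:-probe_connect}"
  attempt=1
  while [ "$attempt" -le "$max_attempts" ]; do
    # "|| true" keeps a failing probe from aborting scripts that run under "set -e"
    status=$("$probe" || true)
    if [ "$status" = "200" ]; then
      echo "Kafka Connect has started (attempt $attempt)."
      return 0
    fi
    echo "Waiting for Kafka Connect to start... ($attempt/$max_attempts)"
    sleep "$sleep_seconds"
    attempt=$((attempt + 1))
  done
  echo "Kafka Connect did not become ready." >&2
  return 1
}
```

In add_connector.sh this could replace the while loop, for example as wait_for_connect 40 3 || exit 1 before the curl POST, so the connector is only registered once the worker is reachable. The probe is passed in as a parameter mainly so the function can be exercised without a running worker.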
6. For how to use the Kafka consumer (Spring Boot), see the post below.
Spring boot | How to use Spring Apache Kafka ( with Docker Container ) | Consumer :: 티포의개발일지 (tistory.com)
7. Run docker compose, then check that the data sent over MQTT arrives.
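One way to check the pipeline end to end is to ask Connect for the connector state, publish a test message, and read it back from the mqtt.data topic. The sketch below takes the container name kafka and the connector name mqtt-source-1 from the files above. It also assumes that the MQTT broker listens on host port 9000 and that the Mosquitto client mosquitto_pub is installed on the host. The helper names and the test payload are hypothetical, and the sed-based parsing is a rough stand-in for a proper JSON tool such as jq.

```shell
#!/bin/sh
# Sketch: helpers for verifying the MQTT -> Kafka pipeline. Nothing runs until a function is called.

# Read a Connect status response on stdin and print the connector-level state (e.g. RUNNING).
connector_state() {
  tr -d '\n' | sed -n 's/.*"connector"[[:space:]]*:[[:space:]]*{[^}]*"state"[[:space:]]*:[[:space:]]*"\([A-Z_]*\)".*/\1/p'
}

# Fetch the state of the connector registered by add_connector.sh.
check_connector() {
  curl -s http://localhost:8083/connectors/mqtt-source-1/status | connector_state
}

# Publish one test message to the MQTT topic the connector subscribes to (payload is made up).
publish_test_message() {
  mosquitto_pub -h localhost -p 9000 -t data -m '{"temperature": 21.5}'
}

# Read one record from the target Kafka topic, giving up if nothing arrives within 30 seconds.
consume_one_record() {
  docker exec kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 --topic mqtt.data \
    --from-beginning --max-messages 1 --timeout-ms 30000
}
```

A typical sequence would be check_connector, then publish_test_message, then consume_one_record. If the first call prints anything other than RUNNING, the full response from the /connectors/mqtt-source-1/status endpoint is the place to look for task errors.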