
A Dockerfile downloads the required Confluent packages, and two supporting files are used: add_connector.sh and connect-distributed.properties.

 

- add_connector.sh waits until Kafka Connect is reachable, then registers the connector by POSTing connect-mqtt-source.json with curl.

- connect-distributed.properties holds the base configuration for the Connect worker.

 

1. Create the Kafka and Kafka Connect services in Docker Compose

services:
  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # Compose requires string values; unquoted yes/true parse as YAML booleans and are rejected
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_BROKER_ID: "1"
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_CFG_LISTENERS: CONTROLLER://:9093,CLIENT://:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,CLIENT:PLAINTEXT
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://kafka:9092
      KAFKA_CFG_DELETE_TOPIC_ENABLE: "true"
      KAFKA_CFG_BROKER_ID: "1"
      KAFKA_CFG_NODE_ID: "1"
      KRAFT_MODE: "true"
      KAFKA_ENABLE_KRAFT: "yes"
      TZ: Asia/Seoul
    volumes:
      - ${KAFKA_DATA_DIR}:/bitnami/kafka/data

  kafka-connect:
    build: ./docker/kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    volumes:
      - ${KAFKA_CONNECT_DATA_DIR}:/var/lib/kafka-connect
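
The two volume mounts reference ${KAFKA_DATA_DIR} and ${KAFKA_CONNECT_DATA_DIR}, which Compose resolves from the environment. A sketch of supplying them through a .env file next to docker-compose.yml and bringing the stack up (the paths are placeholders, not from the original post):

```shell
# .env is read automatically by docker compose; adjust the paths to your setup
cat > .env <<'EOF'
KAFKA_DATA_DIR=./data/kafka
KAFKA_CONNECT_DATA_DIR=./data/kafka-connect
EOF

docker compose up -d --build   # --build picks up the kafka-connect Dockerfile
docker compose ps              # both containers should show as running
```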

 

2. Create the Kafka Connect Dockerfile

Place the Dockerfile under <project root>/docker/kafka-connect.

FROM bitnami/kafka:3.4

ARG MODULE=kafka-connect
ENV MODULE=${MODULE}

USER root

WORKDIR /connectors

RUN apt-get update && apt-get upgrade -y && apt-get install unzip -y

RUN curl -O https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
RUN tar -zxvf confluent-hub-client-latest.tar.gz
RUN rm -f confluent-hub-client-latest.tar.gz
RUN mkdir component && mkdir config && cd config && touch worker.properties
RUN cd /connectors/bin && ./confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:1.7.1 \
    --component-dir /connectors/component \
    --worker-configs /connectors/config/worker.properties
RUN cp -p /connectors/component/confluentinc-kafka-connect-mqtt/lib/* /opt/bitnami/kafka/libs

WORKDIR /tmp
COPY . .
# COPY keeps host file permissions; make sure the startup script is executable
RUN chmod +x /tmp/add_connector.sh
CMD /tmp/add_connector.sh & /opt/bitnami/kafka/bin/connect-distributed.sh /tmp/connect-distributed.properties

 

3. Create connect-distributed.properties

bootstrap.servers=kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
converter.encoding=UTF-8
offset.flush.interval.ms=10000
topic.prefix=test
tasks.max=5
plugin.path=/connectors
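
Once the stack is up, the worker's REST API (port 8083, mapped in the compose file) can confirm the worker started and picked up the MQTT plugin from plugin.path. A quick check from the host, assuming jq is installed:

```shell
# Worker version and Kafka cluster id
curl -s http://localhost:8083/ | jq .

# Installed connector plugins; the MQTT source class should be listed
curl -s http://localhost:8083/connector-plugins | jq .
```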

 

4. Create connect-mqtt-source.json

{
  "name": "mqtt-source-1",
  "config": {
    "bootstrap.servers": "kafka:9092",
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://host.docker.internal:9000",
    "mqtt.topics": "data",
    "kafka.topic": "mqtt.data",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": 1
  }
}
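
Before registering, the connector config can be checked against Connect's standard validation endpoint. A sketch assuming the worker is reachable on localhost:8083 and jq is available; note the endpoint takes only the inner "config" object, not the full wrapper:

```shell
# Extract the "config" object and validate it; error_count should be 0
jq .config connect-mqtt-source.json | \
  curl -s -X PUT -H "Content-Type: application/json" -d @- \
    http://localhost:8083/connector-plugins/MqttSourceConnector/config/validate | \
  jq .error_count
```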

 

5. Create add_connector.sh

#!/bin/bash

# Wait for Kafka Connect to start
while true
do
  status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
  if [ "$status" -eq 200 ]; then
    echo "Kafka Connect has started."
    break
  else
    echo "Waiting for Kafka Connect to start..."
    sleep 3
  fi
done


echo "Start to add kafka-connect"

# ADD Mqtt Source Connectors
curl -d @/tmp/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

echo "Complete to add kafka-connect"
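
After the script has run, the worker's REST API can confirm the registration:

```shell
# List registered connectors; mqtt-source-1 should appear
curl -s http://localhost:8083/connectors

# Connector and task state should both be RUNNING (jq assumed installed)
curl -s http://localhost:8083/connectors/mqtt-source-1/status | jq .
```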

 

 

6. For how to consume the topic from a Kafka consumer (Spring Boot), see the post below.

Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Consumer :: 티포의개발일지 (tistory.com)

 


 

7. Run docker compose and verify that data published over MQTT arrives in Kafka.
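
A minimal end-to-end check, assuming an MQTT broker is listening on port 9000 on the host (as configured in connect-mqtt-source.json) and mosquitto-clients is installed:

```shell
# Publish a test message to the MQTT topic the connector subscribes to
mosquitto_pub -h localhost -p 9000 -t data -m '{"value": 1}'

# Read it back from the Kafka topic the connector produces to
docker exec kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic mqtt.data --from-beginning
```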

 
