728x90
반응형

Kafka SASL에 관한 정보는 구글링해도 잘 나온다.

하지만 세팅하는 부분은 생각보다 별로 없어서 여기 적어놔야겠다.

1. Kafka Broker Docker Compose

  kafka:
    image: bitnami/kafka:3.4
    hostname: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_CLIENT_USERS: user
      KAFKA_CLIENT_PASSWORDS: pass
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_CFG_LISTENERS: CONTROLLER://:9093,CLIENT://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://host.docker.internal:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,CLIENT:SASL_PLAINTEXT
      KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CFG_DELETE_TOPIC_ENABLE: true
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_NODE_ID: 1
      KRAFT_MODE: true
      KAFKA_ENABLE_KRAFT: yes
      KAFKA_MESSAGE_MAX_BYTES: 2000000000
      TZ: Asia/Seoul
    volumes:
      - ${KAFKA_DATA_DIR}:/bitnami/kafka/data

위와 같이 Users, Passwords를 추가해주고 ( 여러개여도 상관 없음. )

SASL에 관한 필드를 추가해준다.

 

2. Producer

  kafka:
    producer:
      bootstrap-servers: host.docker.internal:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass!";

 

3. Consumer

  kafka:
    consumer:
      bootstrap-servers: kafka:9092
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass!";
728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka | Verify Server Connection  (0) 2024.02.22
728x90
반응형

kafkaTemplate.send() 메소드에는 훅이 없다. 혹여나 Kafka 서버가 다운될 경우, kafkaTemplate.send() 메소드를 호출 자체를 막는게 좋은데, 마땅한 조건이 없을 경우 다음과 같은 함수를 만들어 검증해보자.

 

public boolean verifyConnection() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("request.timeout.ms", 3000);
        props.put("connections.max.idle.ms", 5000);

        AdminClient client = AdminClient.create(props);
        Collection<Node> nodes = client.describeCluster()
            .nodes()
            .get();
        return nodes != null && nodes.size() > 0;
    }

 

또한, try catch절로 예외처리를 할 수도 있다.

    public boolean verifyConnection(String message, MessageType type) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("request.timeout.ms", 500);
        props.put("connections.max.idle.ms", 900);

        AdminClient client = AdminClient.create(props);
        try {
            client.describeCluster()
                .nodes()
                .get();
        } catch (InterruptedException | ExecutionException e) {
            
            return false;
        }
        return true;
    }

 

이 방법이 마음에 안들면 정석은 아니지만 Sockek 객체로 해당 포트가 살아있는지 확인하는 방법도 있다.

Socket socket = new Socket();
        String[] bootstrapAddress = bootstrapServers.split(":");
        try {
            socket.connect(new InetSocketAddress(bootstrapAddress[0],
                Integer.parseInt(bootstrapAddress[1])));
            socket.close();
            return true;
        } catch (Exception e) {
            log.error("[Kafka Server is not available] message = {}", e.getMessage());
            return false;
        }
728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka | SASL(PLAINTEXT) | 세팅하기 with Docker Compose  (0) 2024.02.23

+ Recent posts