728x90
반응형

Dockerfile로 필요한 confluent 패키지를 다운로드 받고, add_connector.sh 와 connect-distributed.properties 파일을 사용한다.

 

- add_connector.sh 에선 Kafka 연결을 확인하고 connect-mqtt-source.json파일대로 curl을 실행한다.

- connect-distributed.properties 파일로 connect 기본 설정을 한다.

 

1. Kafka, Kafka-Connect Docker Compose 생성

  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_CFG_LISTENERS: CONTROLLER://:9093,CLIENT://:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,CLIENT:PLAINTEXT
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://kafka:9092
      KAFKA_CFG_DELETE_TOPIC_ENABLE: true
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_NODE_ID: 1
      KRAFT_MODE: true
      KAFKA_ENABLE_KRAFT: yes
      TZ: Asia/Seoul
    volumes:
      - ${KAFKA_DATA_DIR}:/bitnami/kafka/data

  kafka-connect:
    build: ./docker/kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    volumes:
      - ${KAFKA_CONNECT_DATA_DIR}:/var/lib/kafka-connect

 

2. Kafka Connect Dockerfile 생성

프로젝트/docker/kafka-connect

FROM bitnami/kafka:3.4

ARG MODULE=kafka-connect
ENV MODULE=${MODULE}

USER root

WORKDIR /connectors

RUN apt-get update && apt-get upgrade -y && apt-get install unzip -y

RUN curl -O https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
RUN tar -zxvf confluent-hub-client-latest.tar.gz
RUN rm -f confluent-hub-client-latest.tar.gz
RUN mkdir component && mkdir config && cd config && touch worker.properties
RUN cd /connectors/bin && ./confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:1.7.1 \
    --component-dir /connectors/component \
    --worker-configs /connectors/config/worker.properties
RUN cp -p /connectors/component/confluentinc-kafka-connect-mqtt/lib/* /opt/bitnami/kafka/libs

WORKDIR /tmp
COPY . .
CMD /tmp/add_connector.sh & /opt/bitnami/kafka/bin/connect-distributed.sh /tmp/connect-distributed.properties

 

3. connect-distributed.properties 생성

bootstrap.servers=kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
converter.encoding=UTF-8
offset.flush.interval.ms=10000
topic.prefix=test
tasks.max=5
plugin.path=/connectors

 

4. connect-mqtt-source.json 생성

{
  "name": "mqtt-source-1",
  "config": {
    "bootstrap.servers": "kafka:9092",
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://host.docker.internal:9000",
    "mqtt.topics": "data",
    "kafka.topic": "mqtt.data",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": 1
  }
}

 

5. add_connector.sh 생성

# Wait for Kafka Connect to start
while true
do
  status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
  if [ "$status" -eq 200 ]; then
    echo "Kafka Connect has started."
    break
  else
    echo "Waiting for Kafka Connect to start..."
    sleep 3
  fi
done


echo "Start to add kafka-connect"

# ADD Mqtt Source Connectors
curl -d @/tmp/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

echo "Complete to add kafka-connect"

 

 

6. Kafka consumer (Spring Boot) 사용법은 아래를 참고한다.

Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Consumer :: 티포의개발일지 (tistory.com)

 

Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Consumer

이번엔 Consumer 서버를 만들어보고 Producer 서버에서 생성한 토픽을 구독하여 읽어보기로 하자. 1. application.properties server.port=8081 spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.auto-offset-

typo.tistory.com

 

7. docker compose 실행 후 mqtt에서 보낸 데이터를 확인한다.

 

728x90
반응형
728x90
반응형

1. Mosquitto Docker compose 설정

  mosquitto:
    container_name: lpms-mosquitto
    restart: always
    image: eclipse-mosquitto
    ports:
      - "9000:1883"
      - "9001:9001"
    volumes:
      - ${MOSQUITTO_DIR}/config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ${MOSQUITTO_DIR}/data:/mosquitto/data
      - ${MOSQUITTO_DIR}/log:/mosquitto/log

- mosquitto.conf

allow_anonymous true
connection_messages true
log_type all
listener 1883

 

2. Spring boot build.gradle 추가

    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-mqtt'

 

3. MqttProperties 클래스 생성

@ConfigurationProperties(prefix = "mqtt")
@Component
@Data
@Validated
public class MqttProperties {

    private String name;
    private String password;
    private String url;
    private Integer qos;
    private String topic;

}

 

4. MqttConfig 클래스 생성

본 프로젝트에서는 outBound만 사용 할 예정이다.

@Configuration
@RequiredArgsConstructor
public class MqttConfig {

    private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
    private final MqttProperties properties;

    /**
     * DefaultMqttPahoClientFactory를 통해 MQTT 클라이언트를 등록
     */
    @Bean
    public DefaultMqttPahoClientFactory defaultMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setServerURIs(new String[]{properties.getUrl()});
        options.setUserName(properties.getName());
        options.setPassword(properties.getPassword().toCharArray());
        clientFactory.setConnectionOptions(options);
        return clientFactory;
    }

    /**
     * MQTT 클라이언트를 통해 메시지를 구독하기 위하여 MqttPahoMessageDrivenChannelAdapter를 통해 메시지 수신을 위한 채널을 구성
     */
//    @Bean
//    public MessageChannel mqttInputChannel() {
//        return new DirectChannel();
//    }
//
//    @Bean
//    public MessageProducer inboundChannel() {
//        MqttPahoMessageDrivenChannelAdapter adapter =
//            new MqttPahoMessageDrivenChannelAdapter(
//                properties.getUrl(),
//                MQTT_CLIENT_ID,
//                properties.getTopic());
//        adapter.setCompletionTimeout(5000);
//        adapter.setConverter(new DefaultPahoMessageConverter());
//        adapter.setQos(1);
//        adapter.setOutputChannel(mqttInputChannel());
//        return adapter;
//    }
//
//    @Bean
//    @ServiceActivator(inputChannel = "mqttInputChannel")
//    public MessageHandler inboundMessageHandler() {
//        return message -> {
//            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
//            System.out.println("Topic:" + topic);
//            System.out.println("Payload" + message.getPayload());
//        };
//    }

    /**
     * Message outbound를 위한 채널 구성
     */

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(DefaultMqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler(MQTT_CLIENT_ID, clientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    }

}

 

5. MqttService 클래스 생성

@Service
@RequiredArgsConstructor
public class MqttService {

    private final MyGateway myGateway;

    public void send() {
        myGateway.sendToMqtt("12345", "/a/b/q");
    }
}

 

6. 도커 컨테이너 만들고, Spring 서버 킨 다음 필요한 곳에서 사용

728x90
반응형
728x90
반응형

Spring Security를 사용할 때 UserDetail 객체를 사용하는데, 예를들어

@AllArgsConstructor
public class UserDetailsImpl implements UserDetails {

@Getter
private Long id;

위와 같은 애들을 Request로 받아올 수 있다.

 

아래와 같이 사용하면 된다.

 

@PostMapping("/logout")
public CommonResponse<Boolean> logout(
@AuthenticationPrincipal UserDetailsImpl userDetails,
HttpServletResponse response
) {
728x90
반응형
728x90
반응형

기본적으로 LocalDateTime에는 timezone이 없기 때문에 따로 세팅을 해주어야한다.

먼저 now를 생성할 때 다음과 같이 UTC 시간을 준다.

LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);

 

그 다음 timezone을 세팅할 때 다음과 같이 해준다.

now..atZone(TimeZone.getDefault().toZoneId())
                                .format(DateTimeFormatter.RFC_1123_DATE_TIME)

 

어떤 형식으로 보여줄 지는 다음 사이트를 참고하면 된다.

DateTimeFormatter (Java Platform SE 8 ) (oracle.com)

 

DateTimeFormatter (Java Platform SE 8 )

Parses the text using this formatter, without resolving the result, intended for advanced use cases. Parsing is implemented as a two-phase operation. First, the text is parsed using the layout defined by the formatter, producing a Map of field to value, a

docs.oracle.com

 

728x90
반응형
728x90
반응형

Microsoft Teams로 알림메세지를 보낼 경우 다음과 같이 구현할 수 있다.

 

1. TeamsWebhookService

@Service
@Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class TeamsWebhookService {

    public void send(TeamsWebhookMessageDto dto) {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setConnectTimeout(10000);
        factory.setReadTimeout(10000);
        HttpHeaders httpHeaders = new HttpHeaders();
        RestTemplate restTemplate = new RestTemplate(factory);
        httpHeaders.setContentType(new MediaType("application", "json", StandardCharsets.UTF_8));

        HttpEntity<TeamsWebhookMessageDto> request = new HttpEntity<>(dto, httpHeaders);

        try {
            restTemplate.postForLocation(new URI(dto.getUrl()), request);
        } catch (URISyntaxException e) {
            log.error(e.getMessage());
            throw new RuntimeException(e);
        }
    }

    public List<Map<String, Object>> makeAttachments(
        String title
    ) {
        List<Map<String, Object>> attachments = List.of(
            Map.of(
                "contentType", "application/vnd.microsoft.card.adaptive",
                "content", new Content(
                    List.of(
                        Map.of(
                            "type", "Container",
                            "items", List.of(
                                Map.of(
                                    "type", "TextBlock",
                                    "text", title,
                                    "weight","bolder",
                                    "size","medium"
                                )
                            )
                        ),
                        Map.of(
                            "type", "Container",
                            "items", List.of(
                                Map.of(
                                    "type", "FactSet",
                                    "facts", List.of(
                                        Map.of(
                                            "title","title: ",
                                            "value","value"
                                        )
                                    )
                                )
                            )
                        )
                    )
                )
            )
        );
        return attachments;
    }
}

 

2. MessageDto

@Data
public class TeamsWebhookMessageDto {

    private String url;
    private String type = "message";
    private List<Map<String, Object>> attachments;

    @Data
    public static class Content {

        @JsonProperty("$schema")
        private String schema;
        private String type;
        private String version ;
        private List<Map<String, Object>> body;

        public Content(List<Map<String, Object>> body) {
            this.schema = "http://adaptivecards.io/schemas/adaptive-card.json";
            this.type = "AdaptiveCard";
            this.version = "1.0";
            this.body = body;
        }
    }

    public TeamsWebhookMessageDto(List<Map<String, Object>> attachments) {
        this.attachments = attachments;
    }

}

 

3. send 부분

TeamsWebhookMessageDto messageDto = new TeamsWebhookMessageDto(
                    teamsWebhookService.makeAttachments("title")
                );
                messageDto.setUrl("URL 들어가는 부분");

                teamsWebhookService.send(messageDto);

 

- Adaptive Card Example code

Schema Explorer | Adaptive Cards

 

Schema Explorer | Adaptive Cards

Schema Explorer Choose element: Important note about accessibility: In version 1.3 of the schema we introduced a label property on Inputs to improve accessibility. If the Host app you are targeting supports v1.3 you should use label instead of a TextBlock

adaptivecards.io

 

728x90
반응형
728x90
반응형

자바스크립트에서 참 편했던 점이 따로 객체를 만들지 않고 중괄호나 대괄호로 바로 리스트 맵을 만들 수 있었던 점이였다.

자바에서도 똑같진 않지만 아래와 같이 편하게 초깃값이 주어진 채로 선언할 수 있다.

 

public static List<Map<String, Integer>> eventList = Arrays.asList(
        new HashMap<>() {{
            put(value.name(), i);
        }},
        new HashMap<>() {{
            put("NO_ERROR", 0x00000000);
        }}
    );
728x90
반응형
728x90
반응형

삭제 명령을 내릴 때 데이터가 삭제되는 것이 아니라 다른 액션을 주고 싶을 때 

@SQLDelete 어노테이션을 쓰면 간단하게 해결할 수 있다.

@SQLDelete(sql = "UPDATE my_table SET deleted_at = current_timestamp WHERE id = ?")
public class MyTable {

...

@Column
private LocalDateTime deletedAt;


}

 

위의 예시는 삭제된 시점을 deleted_at 컬럼으로 지정한 것이다.

 

데이터를 조회할 땐 deleted_at이 null인지 여부를 따져서 조회하면 된다.

728x90
반응형
728x90
반응형

순회를 하는 도중에 무언가 작업을 한다면 ConcurrentModificationException 이 뜰 수 있다.

다음과 같이 써보자.

 

 

 

List<String> list = new ArrayList<>();

list.add("str1");
list.add("str2");
list.add("str3");

Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
    String str = iterator.next();
    if ("str1".equals(str)) {
        iterator.remove();
    }
}

 

아래와 같이 removeIf 메소드로 간편하게 사용할 수도 있다.

 

list.removeIf(event -> !otherList.contains(event));
728x90
반응형
728x90
반응형

일반 리스트에서 내가 원하는 Key를 가진 Map List로 만들 경우 다음과 같이 Collectors를 사용해 편하게 만들 수 있다.

public Map<Integer, Animal> convertListAfterJava8(List<Animal> list) {
    Map<Integer, Animal> map = list.stream()
      .collect(Collectors.toMap(Animal::getId, Function.identity()));
    return map;
}

 

728x90
반응형
728x90
반응형

Enum 의 name 들만 따로 list로 만들고 싶을 경우 다음과 같이 만들 수 있다.

public enum State {
    NEW,
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED;

    public static String[] names() {
        // ...
    }
}

 

public static String[] getNames(Class<? extends Enum<?>> e) {
    return Arrays.toString(e.getEnumConstants()).replaceAll("^.|.$", "").split(", ");
}

 

아래와 같이 파라미터를 자유자재로 쓸 수도 있다.

public String[] getNames() {
    return Arrays.stream(MyEnum.class.getEnumConstants()).map(Enum::name)
        .toArray(String[]::new);
}
        
public String[] getNames(Class<? extends Enum<?>> e) {
    return Arrays.stream(e.getEnumConstants()).map(Enum::name)
        .toArray(String[]::new);
}

public List<String> getNames(Class<? extends Enum<?>> e) {
        return Arrays.stream(e.getEnumConstants()).map(Enum::name).toList();
    }
728x90
반응형

+ Recent posts