728x90
반응형

Dockerfile로 필요한 confluent 패키지를 다운로드 받고, add_connector.sh 와 connect-distributed.properties 파일을 사용한다.

 

- add_connector.sh 에선 Kafka 연결을 확인하고 connect-mqtt-source.json파일대로 curl을 실행한다.

- connect-distributed.properties 파일로 connect 기본 설정을 한다.

 

1. Kafka, Kafka-Connect Docker Compose 생성

  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_CFG_LISTENERS: CONTROLLER://:9093,CLIENT://:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,CLIENT:PLAINTEXT
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://kafka:9092
      KAFKA_CFG_DELETE_TOPIC_ENABLE: true
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_NODE_ID: 1
      KRAFT_MODE: true
      KAFKA_ENABLE_KRAFT: yes
      TZ: Asia/Seoul
    volumes:
      - ${KAFKA_DATA_DIR}:/bitnami/kafka/data

  kafka-connect:
    build: ./docker/kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    volumes:
      - ${KAFKA_CONNECT_DATA_DIR}:/var/lib/kafka-connect

 

2. Kafka Connect Dockerfile 생성

프로젝트/docker/kafka-connect

FROM bitnami/kafka:3.4

ARG MODULE=kafka-connect
ENV MODULE=${MODULE}

USER root

WORKDIR /connectors

RUN apt-get update && apt-get upgrade -y && apt-get install unzip -y

RUN curl -O https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
RUN tar -zxvf confluent-hub-client-latest.tar.gz
RUN rm -f confluent-hub-client-latest.tar.gz
RUN mkdir component && mkdir config && cd config && touch worker.properties
RUN cd /connectors/bin && ./confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:1.7.1 \
    --component-dir /connectors/component \
    --worker-configs /connectors/config/worker.properties
RUN cp -p /connectors/component/confluentinc-kafka-connect-mqtt/lib/* /opt/bitnami/kafka/libs

WORKDIR /tmp
COPY . .
CMD /tmp/add_connector.sh & /opt/bitnami/kafka/bin/connect-distributed.sh /tmp/connect-distributed.properties

 

3. connect-distributed.properties 생성

bootstrap.servers=kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
converter.encoding=UTF-8
offset.flush.interval.ms=10000
topic.prefix=test
tasks.max=5
plugin.path=/connectors

 

4. connect-mqtt-source.json 생성

{
  "name": "mqtt-source-1",
  "config": {
    "bootstrap.servers": "kafka:9092",
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://host.docker.internal:9000",
    "mqtt.topics": "data",
    "kafka.topic": "mqtt.data",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": 1
  }
}

 

5. add_connector.sh 생성

# Wait for Kafka Connect to start
while true
do
  status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
  if [ "$status" -eq 200 ]; then
    echo "Kafka Connect has started."
    break
  else
    echo "Waiting for Kafka Connect to start..."
    sleep 3
  fi
done


echo "Start to add kafka-connect"

# ADD Mqtt Source Connectors
curl -d @/tmp/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

echo "Complete to add kafka-connect"

 

 

6. Kafka consumer (Spring Boot) 사용법은 아래를 참고한다.

Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Consumer :: 티포의개발일지 (tistory.com)

 

Spring boot | Spring Apache Kafka 사용법 ( with Docker Container ) | Consumer

이번엔 Consumer 서버를 만들어보고 Producer 서버에서 생성한 토픽을 구독하여 읽어보기로 하자. 1. application.properties server.port=8081 spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.auto-offset-

typo.tistory.com

 

7. docker compose 실행 후 mqtt에서 보낸 데이터를 확인한다.

 

728x90
반응형
728x90
반응형

1. Mosquitto Docker compose 설정

  mosquitto:
    container_name: lpms-mosquitto
    restart: always
    image: eclipse-mosquitto
    ports:
      - "9000:1883"
      - "9001:9001"
    volumes:
      - ${MOSQUITTO_DIR}/config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ${MOSQUITTO_DIR}/data:/mosquitto/data
      - ${MOSQUITTO_DIR}/log:/mosquitto/log

- mosquitto.conf

allow_anonymous true
connection_messages true
log_type all
listener 1883

 

2. Spring boot build.gradle 추가

    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-mqtt'

 

3. MqttProperties 클래스 생성

@ConfigurationProperties(prefix = "mqtt")
@Component
@Data
@Validated
public class MqttProperties {

    private String name;
    private String password;
    private String url;
    private Integer qos;
    private String topic;

}

 

4. MqttConfig 클래스 생성

본 프로젝트에서는 outBound만 사용 할 예정이다.

@Configuration
@RequiredArgsConstructor
public class MqttConfig {

    private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
    private final MqttProperties properties;

    /**
     * DefaultMqttPahoClientFactory를 통해 MQTT 클라이언트를 등록
     */
    @Bean
    public DefaultMqttPahoClientFactory defaultMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setServerURIs(new String[]{properties.getUrl()});
        options.setUserName(properties.getName());
        options.setPassword(properties.getPassword().toCharArray());
        clientFactory.setConnectionOptions(options);
        return clientFactory;
    }

    /**
     * MQTT 클라이언트를 통해 메시지를 구독하기 위하여 MqttPahoMessageDrivenChannelAdapter를 통해 메시지 수신을 위한 채널을 구성
     */
//    @Bean
//    public MessageChannel mqttInputChannel() {
//        return new DirectChannel();
//    }
//
//    @Bean
//    public MessageProducer inboundChannel() {
//        MqttPahoMessageDrivenChannelAdapter adapter =
//            new MqttPahoMessageDrivenChannelAdapter(
//                properties.getUrl(),
//                MQTT_CLIENT_ID,
//                properties.getTopic());
//        adapter.setCompletionTimeout(5000);
//        adapter.setConverter(new DefaultPahoMessageConverter());
//        adapter.setQos(1);
//        adapter.setOutputChannel(mqttInputChannel());
//        return adapter;
//    }
//
//    @Bean
//    @ServiceActivator(inputChannel = "mqttInputChannel")
//    public MessageHandler inboundMessageHandler() {
//        return message -> {
//            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
//            System.out.println("Topic:" + topic);
//            System.out.println("Payload" + message.getPayload());
//        };
//    }

    /**
     * Message outbound를 위한 채널 구성
     */

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(DefaultMqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler(MQTT_CLIENT_ID, clientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    }

}

 

5. MqttService 클래스 생성

@Service
@RequiredArgsConstructor
public class MqttService {

    private final MyGateway myGateway;

    public void send() {
        myGateway.sendToMqtt("12345", "/a/b/q");
    }
}

 

6. 도커 컨테이너 만들고, Spring 서버 킨 다음 필요한 곳에서 사용

728x90
반응형
728x90
반응형

1. 프로세스는 다음과 같다.

  1. Git Repository에 push 또는 merge
  2. Github Actions에서 이를 감지하고 빌드 및 테스트 실행
  3. 테스트까지 실행될 경우 Docker 이미지로 빌드
  4. 빌드된 이미지를 Dockerhub에 업로드
  5. EC2 Instance에 ssh로 접속 후 이미지를 pull

2. 파이프라인 구축

1. Github에 Repository를 만들고 프로젝트를 업로드한다.

2. Dockerfile을 만들어준다.

3. Github에 들어가 Actions에서 Java with Gradle configure버튼을 클릭한다.

4. 원하는 yml 파일 이름을 작성하고 아래 코드를 붙여넣는다.

 

# Workflow 이름
name: Spring Boot & Gradle CI/CD

# 어떤 이벤트가 발생하면 workflow 실행할 지 명시
on:
  # main 브랜치에 push나 pull request 발생 시
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]
    
permissions:
  contents: write
  
# 위 이벤트 발생 시 실행될 작업들
jobs:
  build:
    # VM의실행 환경 지정 => 우분투 최신 버전
    runs-on: ubuntu-latest
    
    # working directory
    defaults:
    	run:
        	working-directory: ./backend

    # 실행될 jobs를 순서대로 명시
    steps:
    - name: Checkout
      uses: actions/checkout@v3

    # JDK 11 설치
    - name: Set up JDK 11
      uses: actions/setup-java@v3
      with:
        java-version: '11'
        distribution: 'temurin'

    # Gradle Build를 위한 권한 부여
    - name: Grant execute permission for gradlew
      run: chmod +x gradlew

    # Gradle Build (test 제외)
    - name: Build with Gradle
      run: ./gradlew clean build --exclude-task test
      
    # 테스트 결과를 코멘트로 달아줌
    - name: Publish test result
      uses: EnricoMi/publish-unit-test-result-action@v1
      if: always()
      with:
        files: 'build/test-results/test/*.xml'
        
	# 테스트 실패 시 해당 부분에 코멘트 달아줌.
    - name: When test fail, a comment is registered in the error code line.
      uses: mikepenz/action-junit-report@v3
      if: always()
      with:
        report_paths: 'build/test-results/test/*.xml'

    # DockerHub 로그인
    - name: DockerHub Login
      uses: docker/login-action@v2
      with:
        username: ${{ secrets.DOCKERHUB_USERNAME }}
        password: ${{ secrets.DOCKERHUB_PASSWORD }}

    # Docker 이미지 빌드
    - name: Docker Image Build
      run: docker build -t ${{ secrets.DOCKERHUB_USERNAME }}/${{ secrets.PROJECT_NAME }} .

    # DockerHub Push
    - name: DockerHub Push
      run: docker push ${{ secrets.DOCKERHUB_USERNAME }}/${{ secrets.PROJECT_NAME }}

    # EC2 인스턴스 접속 및 애플리케이션 실행
    - name: Application Run
      uses: appleboy/ssh-action@v0.1.6
      with:
        host: ${{ secrets.EC2_HOST }}
        username: ${{ secrets.EC2_USERNAME }}
        key: ${{ secrets.EC2_KEY }}

        script: |
          sudo docker kill ${{ secrets.PROJECT_NAME }}
          sudo docker rm -f ${{ secrets.PROJECT_NAME }}
          sudo docker rmi ${{ secrets.DOCKERHUB_USERNAME }}/${{ secrets.PROJECT_NAME }}
          sudo docker pull ${{ secrets.DOCKERHUB_USERNAME }}/${{ secrets.PROJECT_NAME }}

          sudo docker run -p ${{ secrets.PORT }}:${{ secrets.PORT }} \
          --name ${{ secrets.PROJECT_NAME }} \
          -e SPRING_DATASOURCE_URL=${{ secrets.DB_URL }} \
          -e SPRING_DATASOURCE_USERNAME=${{ secrets.DB_USERNAME }} \
          -e SPRING_DATASOURCE_PASSWORD=${{ secrets.DB_PASSWORD }} \
          -d ${{ secrets.DOCKERHUB_USERNAME }}/${{ secrets.PROJECT_NAME }}
  • ${{}} 변수는 환경변수이다.
  • 완성되면 .github/workflows 경로에 yml 파일이 추가된다.
  • 수정하고 싶으면 이 파일을 수정하고, 다른 workflow 파일을 작성해도 된다.
  • 만약 안된다면 Github Settings -> Developer Settings -> Personal access tokens에서 생성한 토큰에 들어가 workflow를 체크해주면 된다.

5. Settings -> Secrets and variables -> Actions 에서 환경변수 추가

  • DOCKERHUB_USERNAME : 본인의 Docker Hub Username
  • DOCKERHUB_PASSWORD : 본인의 Docker Hub Password
  • PROJECT_NAME : 프로젝트 이름 (ci-cd-practice) => 이 이름으로 Docker Hub에 올라가게 되고, Docker Container 이름도 이 이름으로 설정할 예정
  • PORT : Docker을 실행시킬 포트 번호 (ex: 8080)
  • DB_URL : 프로젝트에 사용될 DB의 URL (ex:  jdbc:mysql://RDS주소:DB포트/DB명)
  • DB_USERNAME : 프로젝트에 사용될 DB의 Username (ex: root)
  • DB_PASSWORD : 프로젝트에 사용될 DB의 Password
  • EC2_HOST : AWS EC2 인스턴스의 퍼블릭 IPv4 DNS (ex: ec2-52-79-213-143.ap-northeast-2.compute.amazonaws.com)
  • EC2_USERNAME : AWS EC2 인스턴스의 Username (ex: ubuntu)
  • EC2_KEY : AWS EC2 인스턴스를 생성할 때 저장된 pem 키
    • MAC을 기준으로 터미널에서 cat <pem 키 경로>를 입력하면 (드래그앤 드롭해도 됨) '-----BEGIN RSA PRIVATE KEY-----'부터 '-----END RSA PRIVATE KEY-----'까지(맨 뒤에 %빼고)를 복사해서 이 값으로 넣어주면 됨

 

6. workflow를 실행해준다.

 

3. 테스트 실패 시 Merge 막기

Repository Settings  Branches  Add rule 을 선택한다.

 

Branch name pattern을 설정하고,
Require status checks to pass before merging 설정을 통해 merge를 위해 통과해야할 Action들을 선택할 수 있다.

설정 후에 다음과 같이 merge를 못하도록 막혀져있는 것을 확인할 수 있다.
(지금은 admin 이라 강제로 merge할 수 있게 merge 버튼이 활성화돼있지만, admin이 아닌 경우 merge 버튼이 비활성화 된다)

 

참고

[CI/CD] Github Actions를 활용한 CI/CD 파이프라인 구축 (+ Docker hub) (tistory.com)

 

 

728x90
반응형
728x90
반응형

Spring Security를 사용할 때 UserDetail 객체를 사용하는데, 예를들어

@AllArgsConstructor
public class UserDetailsImpl implements UserDetails {

@Getter
private Long id;

위와 같은 애들을 Request로 받아올 수 있다.

 

아래와 같이 사용하면 된다.

 

@PostMapping("/logout")
public CommonResponse<Boolean> logout(
@AuthenticationPrincipal UserDetailsImpl userDetails,
HttpServletResponse response
) {
728x90
반응형
728x90
반응형

기본적으로 LocalDateTime에는 timezone이 없기 때문에 따로 세팅을 해주어야한다.

먼저 now를 생성할 때 다음과 같이 UTC 시간을 준다.

LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);

 

그 다음 timezone을 세팅할 때 다음과 같이 해준다.

now..atZone(TimeZone.getDefault().toZoneId())
                                .format(DateTimeFormatter.RFC_1123_DATE_TIME)

 

어떤 형식으로 보여줄 지는 다음 사이트를 참고하면 된다.

DateTimeFormatter (Java Platform SE 8 ) (oracle.com)

 

DateTimeFormatter (Java Platform SE 8 )

Parses the text using this formatter, without resolving the result, intended for advanced use cases. Parsing is implemented as a two-phase operation. First, the text is parsed using the layout defined by the formatter, producing a Map of field to value, a

docs.oracle.com

 

728x90
반응형
728x90
반응형

Microsoft Teams로 알림메세지를 보낼 경우 다음과 같이 구현할 수 있다.

 

1. TeamsWebhookService

@Service
@Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class TeamsWebhookService {

    public void send(TeamsWebhookMessageDto dto) {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setConnectTimeout(10000);
        factory.setReadTimeout(10000);
        HttpHeaders httpHeaders = new HttpHeaders();
        RestTemplate restTemplate = new RestTemplate(factory);
        httpHeaders.setContentType(new MediaType("application", "json", StandardCharsets.UTF_8));

        HttpEntity<TeamsWebhookMessageDto> request = new HttpEntity<>(dto, httpHeaders);

        try {
            restTemplate.postForLocation(new URI(dto.getUrl()), request);
        } catch (URISyntaxException e) {
            log.error(e.getMessage());
            throw new RuntimeException(e);
        }
    }

    public List<Map<String, Object>> makeAttachments(
        String title
    ) {
        List<Map<String, Object>> attachments = List.of(
            Map.of(
                "contentType", "application/vnd.microsoft.card.adaptive",
                "content", new Content(
                    List.of(
                        Map.of(
                            "type", "Container",
                            "items", List.of(
                                Map.of(
                                    "type", "TextBlock",
                                    "text", title,
                                    "weight","bolder",
                                    "size","medium"
                                )
                            )
                        ),
                        Map.of(
                            "type", "Container",
                            "items", List.of(
                                Map.of(
                                    "type", "FactSet",
                                    "facts", List.of(
                                        Map.of(
                                            "title","title: ",
                                            "value","value"
                                        )
                                    )
                                )
                            )
                        )
                    )
                )
            )
        );
        return attachments;
    }
}

 

2. MessageDto

@Data
public class TeamsWebhookMessageDto {

    private String url;
    private String type = "message";
    private List<Map<String, Object>> attachments;

    @Data
    public static class Content {

        @JsonProperty("$schema")
        private String schema;
        private String type;
        private String version ;
        private List<Map<String, Object>> body;

        public Content(List<Map<String, Object>> body) {
            this.schema = "http://adaptivecards.io/schemas/adaptive-card.json";
            this.type = "AdaptiveCard";
            this.version = "1.0";
            this.body = body;
        }
    }

    public TeamsWebhookMessageDto(List<Map<String, Object>> attachments) {
        this.attachments = attachments;
    }

}

 

3. send 부분

TeamsWebhookMessageDto messageDto = new TeamsWebhookMessageDto(
                    teamsWebhookService.makeAttachments("title")
                );
                messageDto.setUrl("URL 들어가는 부분");

                teamsWebhookService.send(messageDto);

 

- Adaptive Card Example code

Schema Explorer | Adaptive Cards

 

Schema Explorer | Adaptive Cards

Schema Explorer Choose element: Important note about accessibility: In version 1.3 of the schema we introduced a label property on Inputs to improve accessibility. If the Host app you are targeting supports v1.3 you should use label instead of a TextBlock

adaptivecards.io

 

728x90
반응형
728x90
반응형

자바스크립트에서 참 편했던 점이 따로 객체를 만들지 않고 중괄호나 대괄호로 바로 리스트 맵을 만들 수 있었던 점이였다.

자바에서도 똑같진 않지만 아래와 같이 편하게 초깃값이 주어진 채로 선언할 수 있다.

 

public static List<Map<String, Integer>> eventList = Arrays.asList(
        new HashMap<>() {{
            put(value.name(), i);
        }},
        new HashMap<>() {{
            put("NO_ERROR", 0x00000000);
        }}
    );
728x90
반응형
728x90
반응형

삭제 명령을 내릴 때 데이터가 삭제되는 것이 아니라 다른 액션을 주고 싶을 때 

@SQLDelete 어노테이션을 쓰면 간단하게 해결할 수 있다.

@SQLDelete(sql = "UPDATE my_table SET deleted_at = current_timestamp WHERE id = ?")
public class MyTable {

...

@Column
private LocalDateTime deletedAt;


}

 

위의 예시는 삭제된 시점을 deleted_at 컬럼으로 지정한 것이다.

 

데이터를 조회할 땐 deleted_at이 null인지 여부를 따져서 조회하면 된다.

728x90
반응형
728x90
반응형

순회를 하는 도중에 무언가 작업을 한다면 ConcurrentModificationException 이 뜰 수 있다.

다음과 같이 써보자.

 

 

 

List<String> list = new ArrayList<>();

list.add("str1");
list.add("str2");
list.add("str3");

Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
    String str = iterator.next();
    if ("str1".equals(str)) {
        iterator.remove();
    }
}

 

아래와 같이 removeIf 메소드로 간편하게 사용할 수도 있다.

 

list.removeIf(event -> !otherList.contains(event));
728x90
반응형
728x90
반응형

일반 리스트에서 내가 원하는 Key를 가진 Map List로 만들 경우 다음과 같이 Collectors를 사용해 편하게 만들 수 있다.

public Map<Integer, Animal> convertListAfterJava8(List<Animal> list) {
    Map<Integer, Animal> map = list.stream()
      .collect(Collectors.toMap(Animal::getId, Function.identity()));
    return map;
}

 

728x90
반응형

+ Recent posts