728x90
반응형

1. Mosquitto Docker compose 설정

  mosquitto:
    container_name: lpms-mosquitto
    restart: always
    image: eclipse-mosquitto
    ports:
      - "9000:1883"
      - "9001:9001"
    volumes:
      - ${MOSQUITTO_DIR}/config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ${MOSQUITTO_DIR}/data:/mosquitto/data
      - ${MOSQUITTO_DIR}/log:/mosquitto/log

- mosquitto.conf

allow_anonymous true
connection_messages true
log_type all
listener 1883

 

2. Spring boot build.gradle 추가

    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-mqtt'

 

3. MqttProperties 클래스 생성

@ConfigurationProperties(prefix = "mqtt")
@Component
@Data
@Validated
public class MqttProperties {

    private String name;
    private String password;
    private String url;
    private Integer qos;
    private String topic;

}

 

4. MqttConfig 클래스 생성

본 프로젝트에서는 outBound만 사용 할 예정이다.

@Configuration
@RequiredArgsConstructor
public class MqttConfig {

    private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
    private final MqttProperties properties;

    /**
     * DefaultMqttPahoClientFactory를 통해 MQTT 클라이언트를 등록
     */
    @Bean
    public DefaultMqttPahoClientFactory defaultMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setServerURIs(new String[]{properties.getUrl()});
        options.setUserName(properties.getName());
        options.setPassword(properties.getPassword().toCharArray());
        clientFactory.setConnectionOptions(options);
        return clientFactory;
    }

    /**
     * MQTT 클라이언트를 통해 메시지를 구독하기 위하여 MqttPahoMessageDrivenChannelAdapter를 통해 메시지 수신을 위한 채널을 구성
     */
//    @Bean
//    public MessageChannel mqttInputChannel() {
//        return new DirectChannel();
//    }
//
//    @Bean
//    public MessageProducer inboundChannel() {
//        MqttPahoMessageDrivenChannelAdapter adapter =
//            new MqttPahoMessageDrivenChannelAdapter(
//                properties.getUrl(),
//                MQTT_CLIENT_ID,
//                properties.getTopic());
//        adapter.setCompletionTimeout(5000);
//        adapter.setConverter(new DefaultPahoMessageConverter());
//        adapter.setQos(1);
//        adapter.setOutputChannel(mqttInputChannel());
//        return adapter;
//    }
//
//    @Bean
//    @ServiceActivator(inputChannel = "mqttInputChannel")
//    public MessageHandler inboundMessageHandler() {
//        return message -> {
//            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
//            System.out.println("Topic:" + topic);
//            System.out.println("Payload" + message.getPayload());
//        };
//    }

    /**
     * Message outbound를 위한 채널 구성
     */

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(DefaultMqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler(MQTT_CLIENT_ID, clientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    }

}

 

5. MqttService 클래스 생성

@Service
@RequiredArgsConstructor
public class MqttService {

    private final MyGateway myGateway;

    public void send() {
        myGateway.sendToMqtt("12345", "/a/b/q");
    }
}

 

6. 도커 컨테이너 만들고, Spring 서버 킨 다음 필요한 곳에서 사용

728x90
반응형

+ Recent posts