Back-End/Spring Boot
Spring Boot | MQTT ( Mosquitto ) with Kafka | MQTT 사용하기
개발자티포
2024. 1. 10. 15:23
728x90
반응형
1. Mosquitto Docker compose 설정
mosquitto:
container_name: lpms-mosquitto
restart: always
image: eclipse-mosquitto
ports:
- "9000:1883"
- "9001:9001"
volumes:
- ${MOSQUITTO_DIR}/config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ${MOSQUITTO_DIR}/data:/mosquitto/data
- ${MOSQUITTO_DIR}/log:/mosquitto/log
- mosquitto.conf
allow_anonymous true
connection_messages true
log_type all
listener 1883
2. Spring boot build.gradle 추가
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-mqtt'
3. MqttProperties 클래스 생성
@ConfigurationProperties(prefix = "mqtt")
@Component
@Data
@Validated
public class MqttProperties {
private String name;
private String password;
private String url;
private Integer qos;
private String topic;
}
4. MqttConfig 클래스 생성
본 프로젝트에서는 outBound만 사용 할 예정이다.
@Configuration
@RequiredArgsConstructor
public class MqttConfig {
private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
private final MqttProperties properties;
/**
* DefaultMqttPahoClientFactory를 통해 MQTT 클라이언트를 등록
*/
@Bean
public DefaultMqttPahoClientFactory defaultMqttPahoClientFactory() {
DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setServerURIs(new String[]{properties.getUrl()});
options.setUserName(properties.getName());
options.setPassword(properties.getPassword().toCharArray());
clientFactory.setConnectionOptions(options);
return clientFactory;
}
/**
* MQTT 클라이언트를 통해 메시지를 구독하기 위하여 MqttPahoMessageDrivenChannelAdapter를 통해 메시지 수신을 위한 채널을 구성
*/
// @Bean
// public MessageChannel mqttInputChannel() {
// return new DirectChannel();
// }
//
// @Bean
// public MessageProducer inboundChannel() {
// MqttPahoMessageDrivenChannelAdapter adapter =
// new MqttPahoMessageDrivenChannelAdapter(
// properties.getUrl(),
// MQTT_CLIENT_ID,
// properties.getTopic());
// adapter.setCompletionTimeout(5000);
// adapter.setConverter(new DefaultPahoMessageConverter());
// adapter.setQos(1);
// adapter.setOutputChannel(mqttInputChannel());
// return adapter;
// }
//
// @Bean
// @ServiceActivator(inputChannel = "mqttInputChannel")
// public MessageHandler inboundMessageHandler() {
// return message -> {
// String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
// System.out.println("Topic:" + topic);
// System.out.println("Payload" + message.getPayload());
// };
// }
/**
* Message outbound를 위한 채널 구성
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(DefaultMqttPahoClientFactory clientFactory) {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(MQTT_CLIENT_ID, clientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(1);
return messageHandler;
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
}
5. MqttService 클래스 생성
@Service
@RequiredArgsConstructor
public class MqttService {
private final MyGateway myGateway;
public void send() {
myGateway.sendToMqtt("12345", "/a/b/q");
}
}
6. 도커 컨테이너 만들고, Spring 서버 킨 다음 필요한 곳에서 사용
728x90
반응형