SpringBoot整合MQTT实战:基于EMQX构建高可靠物联网通信,从零到一实现设备云端双向对话

news2025/5/19 20:59:30

一、引言

随着物联网(IoT)技术的快速发展,MQTT(Message Queuing Telemetry Transport)协议因其轻量级、低功耗和高效的特点,已成为物联网设备通信的事实标准。本文将详细介绍如何使用SpringBoot框架整合MQTT协议,基于开源MQTT代理EMQX实现设备与服务器之间的双向通信。

二、技术选型与环境准备

2.1 技术栈介绍

  • SpringBoot 2.7.x:简化Spring应用初始搭建和开发过程

  • EMQX 5.0:开源的大规模分布式MQTT消息服务器

  • Eclipse Paho:流行的MQTT客户端库

  • Lombok:简化Java Bean编写

2.2 环境准备

  1. 安装EMQX服务器(可使用Docker快速部署):

    docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.14

  2. 确保Java开发环境(JDK 11+)和Maven已安装

三、SpringBoot项目集成MQTT

3.1 创建SpringBoot项目并添加依赖

pom.xml中添加必要的依赖:

<dependencies>
    <!-- SpringBoot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- MQTT Paho Client -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <!-- JSON处理 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

3.2 配置MQTT连接参数

application.yml中添加配置:

mqtt:
  broker-url: tcp://localhost:1883
  username: emqx
  password: public
  client-id: springboot-server
  default-topic: device/status
  timeout: 30
  keepalive: 60
  qos: 1
  clean-session: true

创建配置类MqttProperties.java

@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    private String brokerUrl;
    private String username;
    private String password;
    private String clientId;
    private String defaultTopic;
    private int timeout;
    private int keepalive;
    private int qos;
    private boolean cleanSession;
}

3.3 实现MQTT客户端配置

创建MqttConfiguration.java

@Configuration
@RequiredArgsConstructor
public class MqttConfiguration {
    
    private final MqttProperties mqttProperties;
    
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setConnectionTimeout(mqttProperties.getTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepalive());
        options.setCleanSession(mqttProperties.isCleanSession());
        options.setAutomaticReconnect(true);
        return options;
    }
    
    @Bean
    public IMqttClient mqttClient() throws MqttException {
        IMqttClient client = new MqttClient(
            mqttProperties.getBrokerUrl(), 
            mqttProperties.getClientId(), 
            new MemoryPersistence()
        );
        client.connect(mqttConnectOptions());
        return client;
    }
}

3.4 实现MQTT消息发布服务

创建MqttPublisher.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttPublisher {
    
    private final IMqttClient mqttClient;
    private final MqttProperties mqttProperties;
    
    public void publish(String topic, String payload) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(mqttProperties.getQos());
        message.setRetained(true);
        
        mqttClient.publish(topic, message);
        
        log.info("MQTT message published to topic: {}, payload: {}", topic, payload);
    }
    
    public void publish(String payload) throws MqttException {
        publish(mqttProperties.getDefaultTopic(), payload);
    }
}

3.5 实现MQTT消息订阅服务

创建MqttSubscriber.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSubscriber {
    
    private final IMqttClient mqttClient;
    private final MqttProperties mqttProperties;
    
    @PostConstruct
    public void init() throws MqttException {
        subscribe(mqttProperties.getDefaultTopic());
    }
    
    public void subscribe(String topic) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        
        mqttClient.subscribe(topic, mqttProperties.getQos(), this::handleMessage);
        log.info("Subscribed to MQTT topic: {}", topic);
    }
    
    private void handleMessage(String topic, MqttMessage message) {
        String payload = new String(message.getPayload());
        log.info("Received MQTT message from topic: {}, payload: {}", topic, payload);
        
        // 这里可以添加业务逻辑处理接收到的消息
        processMessage(topic, payload);
    }
    
    private void processMessage(String topic, String payload) {
        // 示例:解析JSON格式的消息
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode jsonNode = mapper.readTree(payload);
            
            // 根据不同的topic和payload内容进行业务处理
            if (topic.startsWith("device/status")) {
                handleDeviceStatus(jsonNode);
            } else if (topic.startsWith("device/control")) {
                handleDeviceControl(jsonNode);
            }
        } catch (JsonProcessingException e) {
            log.error("Failed to parse MQTT message payload: {}", payload, e);
        }
    }
    
    private void handleDeviceStatus(JsonNode jsonNode) {
        // 处理设备状态上报
        String deviceId = jsonNode.get("deviceId").asText();
        String status = jsonNode.get("status").asText();
        log.info("Device {} status updated to: {}", deviceId, status);
    }
    
    private void handleDeviceControl(JsonNode jsonNode) {
        // 处理设备控制指令响应
        String deviceId = jsonNode.get("deviceId").asText();
        String command = jsonNode.get("command").asText();
        String result = jsonNode.get("result").asText();
        log.info("Device {} executed command {} with result: {}", deviceId, command, result);
    }
}

四、实现双向通信

4.1 服务器向设备发送控制指令

创建REST API接口用于发送控制指令:

@RestController
@RequestMapping("/api/device")
@RequiredArgsConstructor
@Slf4j
public class DeviceController {
    
    private final MqttPublisher mqttPublisher;
    
    @PostMapping("/control")
    public ResponseEntity<String> sendControlCommand(@RequestBody DeviceCommand command) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            String payload = mapper.writeValueAsString(command);
            
            String topic = "device/control/" + command.getDeviceId();
            mqttPublisher.publish(topic, payload);
            
            return ResponseEntity.ok("Control command sent successfully");
        } catch (Exception e) {
            log.error("Failed to send control command", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Failed to send control command: " + e.getMessage());
        }
    }
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class DeviceCommand {
        private String deviceId;
        private String command;
        private Map<String, Object> params;
    }
}

4.2 设备模拟客户端

为了测试双向通信,我们可以创建一个简单的设备模拟客户端:

@Component
@Slf4j
public class DeviceSimulator {
    
    private final MqttPublisher mqttPublisher;
    private final MqttProperties mqttProperties;
    private IMqttClient deviceClient;
    
    public DeviceSimulator(MqttPublisher mqttPublisher, MqttProperties mqttProperties) {
        this.mqttPublisher = mqttPublisher;
        this.mqttProperties = mqttProperties;
        initDeviceClient();
    }
    
    private void initDeviceClient() {
        try {
            String deviceId = "device-" + UUID.randomUUID().toString().substring(0, 8);
            deviceClient = new MqttClient(
                mqttProperties.getBrokerUrl(), 
                deviceId, 
                new MemoryPersistence()
            );
            
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setAutomaticReconnect(true);
            
            deviceClient.connect(options);
            
            // 订阅控制主题
            String controlTopic = "device/control/" + deviceId;
            deviceClient.subscribe(controlTopic, (topic, message) -> {
                String payload = new String(message.getPayload());
                log.info("Device received control command: {}", payload);
                
                // 模拟设备执行命令并返回响应
                executeCommand(payload, deviceId);
            });
            
            // 模拟设备定期上报状态
            simulatePeriodicStatusReport(deviceId);
            
        } catch (MqttException e) {
            log.error("Failed to initialize device simulator", e);
        }
    }
    
    private void executeCommand(String payload, String deviceId) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode jsonNode = mapper.readTree(payload);
            
            String command = jsonNode.get("command").asText();
            
            // 模拟命令执行
            Thread.sleep(1000); // 模拟执行耗时
            
            // 构造响应
            ObjectNode response = mapper.createObjectNode();
            response.put("deviceId", deviceId);
            response.put("command", command);
            response.put("result", "success");
            response.put("timestamp", System.currentTimeMillis());
            
            // 发布响应
            String responseTopic = "device/control/response/" + deviceId;
            mqttPublisher.publish(responseTopic, response.toString());
            
        } catch (Exception e) {
            log.error("Failed to execute command", e);
        }
    }
    
    private void simulatePeriodicStatusReport(String deviceId) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                ObjectMapper mapper = new ObjectMapper();
                ObjectNode status = mapper.createObjectNode();
                status.put("deviceId", deviceId);
                status.put("status", "online");
                status.put("cpuUsage", Math.random() * 100);
                status.put("memoryUsage", 30 + Math.random() * 50);
                status.put("timestamp", System.currentTimeMillis());
                
                String topic = "device/status/" + deviceId;
                mqttPublisher.publish(topic, status.toString());
                
            } catch (Exception e) {
                log.error("Failed to send status report", e);
            }
        }, 0, 10, TimeUnit.SECONDS);
    }
}

五、测试与验证

5.1 测试设备状态上报

  1. 启动SpringBoot应用

  2. 观察日志输出,应该能看到设备模拟客户端定期上报状态信息

5.2 测试服务器控制指令

使用Postman或curl发送控制指令:

curl -X POST http://localhost:8080/api/device/control \
-H "Content-Type: application/json" \
-d '{
    "deviceId": "device-123456",
    "command": "restart",
    "params": {
        "delay": 5
    }
}'

5.3 验证双向通信

  1. 服务器发送控制指令到特定设备

  2. 设备接收指令并执行

  3. 设备发送执行结果回服务器

  4. 服务器接收并处理设备响应

六、高级功能扩展

6.1 消息持久化与QoS级别

  • QoS 0:最多一次,消息可能丢失

  • QoS 1:至少一次,消息不会丢失但可能重复

  • QoS 2:恰好一次,消息不丢失且不重复

根据业务需求选择合适的QoS级别:

// 在发布消息时设置QoS
message.setQos(2); // 使用最高级别的QoS

6.2 安全配置

  1. 启用TLS加密:

mqtt:
  broker-url: ssl://localhost:8883
  1. 配置EMQX的ACL规则,限制客户端权限

6.3 集群部署

对于生产环境,可以部署EMQX集群:

# 启动第一个节点
docker run -d --name emqx1 -p 1883:1883 -p 8081:8081 -e EMQX_NODE_NAME=emqx@node1.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14

# 启动第二个节点
docker run -d --name emqx2 -p 1884:1883 -p 8082:8081 -e EMQX_NODE_NAME=emqx@node2.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14

6.4 消息桥接与WebHook

通过EMQX的桥接功能,可以将消息转发到其他MQTT服务器或Kafka等消息队列。也可以通过WebHook将消息推送到HTTP服务。

七、总结

本文详细介绍了如何使用SpringBoot整合MQTT协议,基于EMQX实现设备与服务器之间的双向通信。主要内容包括:

  1. SpringBoot项目中集成MQTT客户端

  2. 实现消息发布和订阅功能

  3. 设计双向通信机制

  4. 设备模拟与测试验证

  5. 高级功能扩展建议

这种架构非常适合物联网场景,能够支持海量设备连接和实时消息通信。开发者可以根据实际业务需求,在此基础上进行扩展和优化,构建稳定可靠的物联网平台。

八、参考资料

  1. EMQX官方文档:Introduction | EMQX 5.0 Docs

  2. Eclipse Paho项目:Eclipse Paho | The Eclipse Foundation

  3. MQTT协议规范:MQTT Version 3.1.1

  4. Spring Boot官方文档:Spring Boot

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2379533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

界面控件DevExpress WinForms v24.2 - 数据处理功能增强

DevExpress WinForms拥有180组件和UI库&#xff0c;能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForms能完美构建流畅、美观且易于使用的应用程序&#xff0c;无论是Office风格的界面&#xff0c;还是分析处理大批量的业务数据&#xff0c;它都能轻松胜…

Linux的MySQL头文件和找不到头文件问题解决

头文件 #include <iostream> #include <mysql_driver.h> #include <mysql_connection.h> #include <cppconn/statement.h> #include <cppconn/resultset.h> #include <cppconn/prepared_statement.h> #include <cppconn/exception.h&g…

wps excel将表格输出pdf时所有列在一张纸上

记录&#xff1a;wps excel将表格输出pdf时所有列在一张纸上 1&#xff0c;调整缩放比例&#xff0c;或选择将所有列打印在一页 2&#xff0c;将表格的所有铺满到这套虚线

zabbix7.2最新版本 nginx自定义监控(三) 设置触发器

安装zabbix-get服务 在zabbix-server端口安装zabbix-get服务 [rootlocalhost ~]# dnf install -y zabbix-get Last metadata expiration check: 1:55:49 ago on Wed 14 May 2025 09:24:49 AM CST. Dependencies resolved. Package Architectur…

缓存的相关内容

缓存是一种介于数据永久存储介质与数据应用之间数据临时的存储介质 实用化保存可以有效地减少低俗数据读取的次数 (例如磁盘IO), 提高系统性能 缓存不仅可以用于提高永久性存储介质的数据读取效率&#xff0c;还可以提供临时的数据存储空间 spring boot中提供了缓存技术, 方便…

[ctfshow web入门] web77

信息收集 上一题的读取flag方式不能用了&#xff0c;使用后的回显是&#xff1a;could not find driver 解题 同样的查目录方法 cvar_export(scandir("glob:///*"));die();cforeach(new DirectoryIterator("glob:///*") as $a){echo($a->__toString…

C++学习-入门到精通-【7】类的深入剖析

C学习-入门到精通-【7】类的深入剖析 类的深入剖析 C学习-入门到精通-【7】类的深入剖析一、Time类的实例研究二、组成和继承三、类的作用域和类成员的访问类作用域和块作用域圆点成员选择运算符(.)和箭头成员选择运算符(->)访问函数和工具函数 四、具有默认实参的构造函数重…

主成分分析的应用之sklearn.decomposition模块的PCA函数

主成分分析的应用之sklearn.decomposition模块的PCA函数 一、模型建立整体步骤 二、数据 2297.86 589.62 474.74 164.19 290.91 626.21 295.20 199.03 2262.19 571.69 461.25 185.90 337.83 604.78 354.66 198.96 2303.29 589.99 516.21 236.55 403.92 730.05 438.41 225.80 …

1. Go 语言环境安装

&#x1f451; 博主简介&#xff1a;高级开发工程师 &#x1f463; 出没地点&#xff1a;北京 &#x1f48a; 人生目标&#xff1a;自由 ——————————————————————————————————————————— 版权声明&#xff1a;本文为原创文章&#xf…

IP协议深度解析:互联网世界的核心基石

作为互联网通信的基础协议&#xff0c;IP&#xff08;Internet Protocol&#xff09;承载着全球99%的网络数据流量。本文将深入剖析IP协议的核心特性、工作原理及演进历程&#xff0c;通过技术原理、协议对比和实战案例分析&#xff0c;为您揭示这个数字世界"隐形交通规则…

Oracle DBMS_STATS.GATHER_DATABASE_STATS 默认行为

Oracle DBMS_STATS.GATHER_DATABASE_STATS 默认行为 DBMS_STATS.GATHER_DATABASE_STATS的默认选项究竟是’GATHER’还是’GATHER AUTO’&#xff1f;这个问题非常重要&#xff0c;因为理解默认行为直接影响统计信息收集策略。 一 官方文档确认 根据Oracle 19c官方文档&#…

C++天空之城的树 全国信息素养大赛复赛决赛 C++小学/初中组 算法创意实践挑战赛 内部集训模拟题详细解析

C++天空之城的树 全国青少年信息素养大赛 C++复赛/决赛模拟练习题 博主推荐 所有考级比赛学习相关资料合集【推荐收藏】1、C++专栏 电子学会C++一级历年真题解析

GO语言语法---switch语句

文章目录 基本语法1. 特点1.1 不需要break1.2 表达式可以是任何类型1.3 省略比较表达式1.4 多值匹配1.5 类型switch1.6 case穿透1.7 switch后直接声明变量1.7.1 基本语法1.7.2 带比较表达式1.7.3 不带比较表达式1.7.4 结合类型判断 1.8 switch后的表达式必须与case语句中的表达…

开疆智能Profient转ModbusTCP网关连接ABB机器人MODBUS TCP通讯案例

本案例是通过开疆智能Profinet转ModbusTCP网关将西门子PLC与ABB机器人进行通讯 因西门子PLC采用Profinet协议&#xff0c;而ABB机器人采用的是ModbusTCP通讯。故采取此种方案。 配置过程&#xff1a; 1.MODBUS/TCP基于以太网&#xff0c;故ABB机器人在使用时需要有616-1PCIN…

解决qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed

可以参考&#xff1a;解决qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed-CSDN博客 讲的是程序执行目录下可能缺少了&#xff1a; libssl-1_1-x64.dll 和 libcrypto-1_1-x64.dll 库文件&#xff0c;将其复制到可执行文件exe的同级目录下即可…

Text2SQL:自助式数据报表开发---0517

Text2SQL技术 早期阶段&#xff1a;依赖于人工编写的规则模板来匹配自然语言和SQL语句之间的对应关系 机器学习阶段&#xff1a;采用序列到序列模型等机器学习方法来学习自然语言与SQL之间的关系 LLM阶段&#xff1a;借助LLM强大的语言理解和代码生成能力&#xff0c;利用提示…

使用Visual Studio将C#程序发布为.exe文件

说明 .exe 是可执行文件&#xff08;Executable File&#xff09;的扩展名。这类文件包含计算机可以直接运行的机器代码指令&#xff0c;通常由编程语言&#xff08;如 C、C、C#、Python 等&#xff09;编译或打包生成。可以用于执行自动化操作&#xff08;执行脚本或批处理操…

React Flow 边的基础知识与示例:从基本属性到代码实例详解

本文为《React Agent&#xff1a;从零开始构建 AI 智能体》专栏系列文章。 专栏地址&#xff1a;https://blog.csdn.net/suiyingy/category_12933485.html。项目地址&#xff1a;https://gitee.com/fgai/react-agent&#xff08;含完整代码示​例与实战源&#xff09;。完整介绍…

oracle 资源管理器的使用

14.8.2资源管理器的使用 资源管理器控制CPU资源使用说明&#xff1a;  第一种分配方法&#xff1a;EMPHASIS CPU 分配方法确定在资源计划中对不同使用者组中的会话的重视程度。CPU占用率的分配级别为从1 到8&#xff0c;级别1 的优先级最高。百分比指定如何将CPU 资源分配给每…

贝叶斯优化Transformer融合支持向量机多变量回归预测,附相关性气泡图、散点密度图,Matlab实现

贝叶斯优化Transformer融合支持向量机多变量回归预测&#xff0c;附相关性气泡图、散点密度图&#xff0c;Matlab实现 目录 贝叶斯优化Transformer融合支持向量机多变量回归预测&#xff0c;附相关性气泡图、散点密度图&#xff0c;Matlab实现效果一览基本介绍程序设计参考资料…