SpringBoot整合MQTT实战:基于EMQX实现双向设备通信(附源码)

news2025/7/16 22:05:44

简言:

在万物互联的时代,MQTT协议凭借其轻量级、高效率的特性,已成为物联网通信的事实标准。本教程将带领您在Ubuntu系统上搭建EMQX 5.9.0消息服务器,并使用Spring Boot快速实现两个客户端的高效通信。通过本指南,您将掌握:

✅ 企业级MQTT消息中间件的部署

✅ Spring Boot与MQTT协议的深度集成

✅ 双向实时通信的完整实现方案

✅ 生产级应用的最佳实践建议


源码地址:https://gitcode.com/Var_ya/mqtt_viteClient

参考文档:

  1. 在 Ubuntu 上安装 EMQX:https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
  2. MQTTX 下载:https://mqttx.app/zh/downloads

一、🛠️ 搭建魔法邮局(EMQX服务器)

扩展:在安装EMQX前记得先更新先软件包

apt update

1. 安装EMQX企业版
在Ubuntu终端输入以下咒语:

# 下载魔法卷轴(安装包)
wget https://www.emqx.com/zh/downloads/enterprise/5.9.0/emqx-enterprise-5.9.0-ubuntu24.04-amd64.deb


# 解开卷轴封印
sudo dpkg -i emqx-enterprise-5.9.0-ubuntu20.04-amd64.deb	


# 启动邮局服务
sudo systemctl start emqx

2. 打开魔法管理台
浏览器访问 http://localhost:18083,默认账号admin/public,你将看到:


二、📱 准备第一个信使(MQTTX客户端)

安装MQTTX桌面版

安装地址:https://mqttx.app/zh/downloads

打开后新建连接:

  • 名称:魔法邮箱_varin.cn
  • 服务器:varin:1883


🔍 让我们用Spring Boot的魔法升级Java程序! 把魔杖(原生Java)换成自动施法的魔法书(Spring Boot)~

✨ 三、Spring Boot的核心初始化

1. 创建魔法卷轴(Spring Boot项目)

用Spring Initializr生成项目,勾选:

  • Spring Web (发送HTTP咒语)
  • Spring Integration (MQTT魔法核心)
2. 添加飞天扫帚驱动(POM依赖)
<!--        消息中间件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
<!--        流消息-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
<!--   核心依赖:     mqtt客户端-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.5</version>
        </dependency>

3.设置application.yml内容
spring:
  application:
    name: mqtt-client-api

  mqtt:
    username: varya
    password: 123456
    url: tcp://varin.cn:1883
    subClientId: sub_client_id_varya
    subTopic: mqttx_and_springboot_client/,
    pubClientId: pub_client_id_vay
server:
  port: 9999

# knife4j的增强配置,不需要增强可以不配
knife4j:
  enable: true    # 开启knife4j,无需添加@EnableKnife4j注解
  setting:
    language: zh_cn   #中文
  #  swagger-model-name: 实体列表   #默认为: Swagger Models
  basic: # 开启SwaggerBasic认证功能,默认是false
    enable: false
    username: varya
    password: varya


3.建立读取mqtt关于application.yml文件实体
package cn.varin.mqttclientapi.entity;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * mqtt配置属性实体类
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt") // 读取yml文件中的配置

public class MqttConfigProperties {
    private String username;
    private String password;
    private String url;
    private String subClientId;
    private String subTopic;
    private String pubClientId;
}

4. 参考文件目录设置

(注:该代码已上传gitcode代码仓库,欢迎阅读,下载


🧙♂️ 四、Mqtt核心基础配置(代码篇)

1. MqttConfig(mqtt配置类)
package cn.varin.mqttclientapi.config;

import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
public class MqttConfig {
    // yml获取配置内容
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
//    连接工厂建立
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory (){
        // 建立默认工程
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        // 设置连接选项内容
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setUserName(mqttConfigProperties.getUsername());
        mqttConnectOptions.setPassword(mqttConfigProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttConfigProperties.getUrl()});

        defaultMqttPahoClientFactory.setConnectionOptions(mqttConnectOptions );
        return  defaultMqttPahoClientFactory;
    }
}

****

🧙♂️ 五、Mqtt入站信息配置(代码篇)


1. MqttConfig(mqtt配置类)
package cn.varin.mqttclientapi.config;

import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import cn.varin.mqttclientapi.handler.MqttMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 *
 * 配饰入站消息配置
 */

@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    // 建立入站通道
    @Bean
    public  MessageChannel messageInboundChannel(){
        return  new DirectChannel();
    }
    // 配置入站适配器
    @Bean
    public MessageProducer messageProducer(){
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttConfigProperties.getUrl(),
                mqttConfigProperties.getSubClientId(),
                mqttPahoClientFactory,
                mqttConfigProperties.getSubTopic().split(",")
        );
        mqttPahoMessageDrivenChannelAdapter.setQos(2);
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        // 设置通道
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel( messageInboundChannel());

        return mqttPahoMessageDrivenChannelAdapter;
    }
    // 设置接收消息处理器

//    @Bean
//    @ServiceActivator(inputChannel = "messageInboundChannel")
//    public MessageHandler messageHandler (){
//        return new MqttMessageHandle();
//    }


}

2. 建立入站信息处理器(MqttMessageHandle)
package cn.varin.mqttclientapi.handler;

import cn.varin.mqttclientapi.entity.MqttMessageResponseBody;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.swagger.v3.core.util.Json;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 *
 * 接收消息处理器
 *
 */

@Component

public class MqttMessageHandle implements MessageHandler {


   @ServiceActivator(inputChannel = "messageInboundChannel") // 用于指定通道
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
       System.out.println("=================");

       MessageHeaders headers = message.getHeaders();
       String mqtt_receivedTopic = headers.get("mqtt_receivedTopic").toString();
       System.out.println(mqtt_receivedTopic);
       System.out.println("=================");
    }
}

🧙♂️ 六、Mqtt出站信息配置(代码篇)

1. MqttOutboundConfig(mqtt出站信息配置类)
package cn.varin.mqttclientapi.config;

import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttOutboundConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    // 建立出站通道
    @Bean
    public MessageChannel messageOutboundChannel(){
        return new DirectChannel();
    }
    // 建立发送消息配置

    @ServiceActivator(inputChannel = "messageOutboundChannel")
    @Bean
    public MessageHandler messageOutboundHandle(){
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        mqttConfigProperties.getUrl(),
        mqttConfigProperties.getPubClientId(),
        mqttPahoClientFactory
        );
        messageHandler.setDefaultQos(2);
        messageHandler.setDefaultTopic("default");
        messageHandler.setAsync(true);
        return messageHandler;

    }
}

2. 建立发送消息网关(MqttGetway)
package cn.varin.mqttclientapi.getway;


import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "messageOutboundChannel")
public interface MqttGetway {

    void send(@Header(value = MqttHeaders.TOPIC) String topic, String payload);
    void send(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) Integer qos, String payload);
}

2. 建立mqtt发送消息服务
package cn.varin.mqttclientapi.service;

public interface MqttMessageSenderService {

    void send(String topic, String payload);
    void send(String topic, Integer qos, String payload);
}


package cn.varin.mqttclientapi.service.impl;

import cn.varin.mqttclientapi.getway.MqttGetway;
import cn.varin.mqttclientapi.service.MqttMessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqttMessageSenderServiceImpl  implements MqttMessageSenderService {
    @Autowired
    private MqttGetway mqttGetway;
    @Override
    public void send(String topic, String payload) {
        mqttGetway.send(topic,payload);
    }

    @Override
    public void send(String topic, Integer qos, String payload) {
        mqttGetway.send(topic,qos,payload);
    }
}

🧙♂️ 七、Mqtt消息发送Controller(代码篇)

package cn.varin.mqttclientapi.controller;


import cn.varin.mqttclientapi.entity.MqttRequestBody;
import cn.varin.mqttclientapi.handler.UnifiedResponseHandler;
import cn.varin.mqttclientapi.service.MqttMessageSenderService;
import cn.varin.mqttclientapi.service.impl.MqttMessageSenderServiceImpl;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@Tag(name = "MQTT服务接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttMessageSenderServiceImpl mqttMessageSenderService;

    @Operation(summary = "发送消息,")
    @PostMapping("/send,有qos")
    public UnifiedResponseHandler.Result send(@RequestBody MqttRequestBody mqttRequestBody){
            System.out.println(mqttRequestBody.toString());
        mqttMessageSenderService.send(mqttRequestBody.getMqtt_topic(),mqttRequestBody.getQos(),mqttRequestBody.getPayload());
        return new UnifiedResponseHandler.Result(200,"success",null);
    }
    @Operation(summary = "发送消息,无qos")
    @PostMapping("/send")
    public UnifiedResponseHandler.Result send2(@RequestBody MqttRequestBody mqttRequestBody){
        System.out.println(mqttRequestBody.toString());
        mqttMessageSenderService.send(mqttRequestBody.getMqtt_topic(),mqttRequestBody.getPayload());
        return new UnifiedResponseHandler.Result(200,"success",null);
    }

}

🧙♂️ 八、MqttTest测试文件(代码篇)

  1. 参考目录

  1. 建立test启动类
package cn.varin.mqttclientapi;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqttClientApiApplicationTests {

    @Test
    void contextLoads() {
    }

}

  1. 测试代码
package cn.varin.mqttclientapi.test;


import cn.varin.mqttclientapi.service.impl.MqttMessageSenderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(value = "MqttClientApiApplicationTests.class")
public class MqttMesageSenderTest {

    @Autowired
    private MqttMessageSenderServiceImpl mqttMessageSenderService;
    @Test
    public void MqttMessageSendTest(){
        // 实际业务
        mqttMessageSenderService.send("java_test/","testaaa");
    }
}

🌌🚀 九、通信魔法测试大赏

场景1:使用Test测试类测试
  1. 点击启动按钮(画红线的绿色按钮)

  1. 显示测试结果

场景2:HTTP请求测试(使用idea自带的http接口测试插件)
  1. 在MQTTX发送:
POST http://localhost:9999/mqtt/send
Content-Type: application/json

{
  "mqtt_topic":"java_test/",
  "qos":2,
  "payload":"h33333ello"
}


(点击红线上的绿色按钮)

  1. 测试结果:

  1. Spring Boot控制台会:



升级完毕! 现在你的MQTT程序拥有了Spring Boot的自动施法能力,就像拥有了老魔杖+隐形斗篷+复活石的组合!快去征服分布式魔法世界吧~ 🎩

常见问题排查

现象检查方向解决手段
连接失败防火墙设置/端口开放netstat -tulnp
消息丢失QoS级别配置确认使用QoS1/2
高延迟网络带宽/负载均衡EMQX集群横向扩展

通过本方案,您已经构建了一个基于Spring Boot的企业级MQTT通信系统。这种架构可广泛应用于物联网设备管理、实时数据采集、远程控制等场景,为智能硬件与云端系统搭建了可靠的消息桥梁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2375137.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从零开始掌握FreeRTOS(2)链表之节点的定义

目录 节点 节点定义 节点实现 根节点 根节点定义 精简节点定义 根节点实现 在上篇文章,我们完成了 FreeRTOS 的移植。在创建任务之前,我们需要先了解FreeRTOS的运转机制。 FreeRTOS是一个多任务系统,由操作系统来管理执行每个任务。这些任务全都挂载到一个双向循…

【数据结构】——双向链表

一、链表的分类 我们前面学习了单链表&#xff0c;其是我们链表中的其中一种&#xff0c;我们前面的单链表其实全称是单向无头不循环链表&#xff0c;我们的链表从三个维度进行分类&#xff0c;一共分为八种。 1、单向和双向 可以看到第一个链表&#xff0c;其只能找到其后一个…

mybatis中${}和#{}的区别

先测试&#xff0c;再说结论 userService.selectStudentByClssIds(10000, "wzh or 11");List<StudentEntity> selectStudentByClssIds(Param("stuId") int stuId, Param("field") String field);<select id"selectStudentByClssI…

抗量子计算攻击的数据安全体系构建:从理论突破到工程实践

在“端 - 边 - 云”三级智能协同理论中&#xff0c;端 - 边、边 - 云之间要进行数据传输&#xff0c;网络的安全尤为重要&#xff0c;为了实现系统总体的安全可控&#xff0c;将构建安全网络。 可先了解我的前文&#xff1a;“端 - 边 - 云”三级智能协同平台的理论建构与技术实…

uniapp|实现手机通讯录、首字母快捷导航功能、多端兼容(H5、微信小程序、APP)

基于uniapp实现带首字母快捷导航的通讯录功能,通过拼音转换库实现汉字姓名首字母提取与分类,结合uniapp的scroll-view组件与pageScrollTo API完成滚动定位交互,并引入uni-indexed-list插件优化索引栏性能。 目录 核心功能实现动态索引栏生成​联系人列表渲染​滚动定位联动性…

【Linux】基础IO(二)

&#x1f4dd;前言&#xff1a; 上篇文章我们对Linux的基础IO有了一定的了解&#xff0c;这篇文章我们来讲讲IO更底层的东西&#xff1a; 重定向及其原理感受file_operation文件缓冲区 &#x1f3ac;个人简介&#xff1a;努力学习ing &#x1f4cb;个人专栏&#xff1a;Linux…

【生存技能】ubuntu 24.04 如何pip install

目录 原因解决方案说明关于忽略系统路径 在接手一个新项目需要安装python库时弹出了以下提示: 原因 这个报错是因为在ubuntu中尝试直接使用 pip 安装 Python 包到系统环境中&#xff0c;ubuntu 系统 出于稳定性考虑禁止了这种操作 这里的kali是因为这台机器的用户起名叫kali…

SHAP分析!Transformer-GRU组合模型SHAP分析,模型可解释不在发愁!

SHAP分析&#xff01;Transformer-GRU组合模型SHAP分析&#xff0c;模型可解释不在发愁&#xff01; 目录 SHAP分析&#xff01;Transformer-GRU组合模型SHAP分析&#xff0c;模型可解释不在发愁&#xff01;效果一览基本介绍程序设计参考资料 效果一览 基本介绍 基于SHAP分析…

知名人工智能AI培训公开课内训课程培训师培训老师专家咨询顾问唐兴通AI在金融零售制造业医药服务业创新实践应用

AI赋能未来工作&#xff1a;引爆效率与价值创造的实战营 AI驱动的工作革命&#xff1a;从效率提升到价值共创 培训时长&#xff1a; 本课程不仅是AI工具的操作指南&#xff0c;更是面向未来的工作方式升级罗盘。旨在帮助学员系统掌握AI&#xff08;特别是生成式AI/大语言模型…

Qt Creator 配置 Android 编译环境

Qt Creator 配置 Android 编译环境 环境配置流程下载JDK修改Qt Creator默认android配置文件修改sdk_definitions.json配置修改的内容 Qt Creator配置 异常处理删除提示占用编译报错连接安卓机调试APP闪退 环境 Qt Creator 版本 qtcreator-16.0.1Win10 嗯, Qt这个开发环境有点难…

智能手表蓝牙 GATT 通讯协议文档

以下是一份适用于智能手表的 蓝牙 GATT 通讯协议文档&#xff0c;适用于 BLE 5.0 及以上标准&#xff0c;兼容 iOS / Android 平台&#xff1a; 智能手表蓝牙 GATT 通讯协议文档 文档版本&#xff1a;V1.0 编写日期&#xff1a;2025年xx月xx日 产品型号&#xff1a;Aurora Wat…

RT-THREAD RTC组件中Alarm功能驱动完善

使用Rt-Thread的目的为了更快的搭载工程&#xff0c;使用Rt-Thread丰富的组件和第三方包资源&#xff0c;解耦硬件&#xff0c;在更换芯片时可以移植应用层代码。你是要RTT的目的什么呢&#xff1f; 文章项目背景 以STM32L475RCT6为例 RTC使用的为LSE外部低速32 .756k Hz 的…

用ffmpeg压缩视频参数建议

注意:代码中的斜杠\可以删除 一、基础压缩命令&#xff08;画质优先) libx265​​推荐配置 ffmpeg -i input.mp4 -c:v libx265 -crf 25 -preset medium -c:a aac -b:a 128k output.mp4-crf&#xff1a;建议25-28&#xff08;值越小画质越高&#xff09; -preset&#xff1a;平…

输入顶点坐标输出立方体长宽高的神经网络 Snipaste贴图软件安装

写一个神经网络&#xff0c;我输入立方体投影线段的三视图坐标&#xff0c;输出分类和长宽高 放这了明天接着搞 -------------------------------------------- 开搞 然而我的数据是这样的 winget install Snipaste f1启动&#xff0c;双击贴图隐藏 用右边4个数据做输入…

用python清除PDF文件中的水印(Adobe Acrobat 无法删除)

学校老师发的资料&#xff0c;有时候会带水印&#xff0c;有点强迫症的都想给它去掉。用Adobe Acrobat试了下&#xff0c;检测不到水印&#xff0c;无法删除&#xff01;分析发现原来这类PDF文件是用word编辑的&#xff0c;其中的水印是加在了页眉中&#xff01; 自己动手想办法…

Dagster Pipes系列-1:调用外部Python脚本

本文是"Dagster Pipes教程"的第一部分&#xff0c;介绍如何通过Dagster资产调用外部Python脚本并集成到数据管道中。首先&#xff0c;创建Dagster资产subprocess_asset&#xff0c;利用PipesSubprocessClient资源执行外部脚本external_code.py&#xff0c;实现跨进程…

python shutil 指定文件夹打包文件为 zip 压缩包

python shutil 指定文件夹打包文件为 zip 压缩包&#xff0c;具体代码如下&#xff1a; import shutil# 指定要打包的文件夹路径 src_doc ./test# 指定输出的压缩包文件名&#xff08;不包含扩展名&#xff09; output_filename testfromat_ zip# 打包并压缩文件夹为 ZIP …

Webug4.0通关笔记25- 第30关SSRF

目录 一、SSRF简介 1.SSRF原理 2.渗透方法 二、第30关SSRF渗透实战 1.打开靶场 2.渗透实战 &#xff08;1&#xff09;Windows靶场修复 &#xff08;2&#xff09;Docker靶场修复 &#xff08;3&#xff09;获取敏感文件信息 &#xff08;4&#xff09;内网端口与服务…

OpenCV 中用于背景分割的一个类cv::bgsegm::BackgroundSubtractorLSBP

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::bgsegm::BackgroundSubtractorLSBP 是 OpenCV 中用于背景分割的一个类&#xff0c;它基于局部样本二进制模式&#xff08;Local Sample Bina…

MacOS 上构建 gem5

MacOS 中只存在 python3&#xff0c;但是scons 只认 python&#xff0c;不在 系统中创建 软连接&#xff0c;一个是因为比较难操作&#xff1b;另一个是尽量不要更改系统。所以独立构件python 和scons&#xff1a; 1&#xff0c;安装python 下载源代码&#xff1a; Python S…