开启物联网的魔法之门 - 深入探索发布/订阅模式

news2025/9/19 3:30:10

文章目录

  • MQTT 发布/订阅模式
  • MQTT 发布/订阅中的消息路由
  • MQTT 与 HTTP 请求响应
  • MQTT 与消息队列
  • Paho Java 使用示例
  • 结语

在这里插入图片描述

MQTT 发布/订阅模式

发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。

MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃

MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。

  • 发布者(Publisher)
    负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。
  • 订阅者(Subscriber)
    订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过共享订阅的方式在多个订阅者之间实现订阅的负载均衡。
  • 代理(Broker)
    负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。
  • 主题(Topic)
    主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。

MQTT 还支持订阅者使用主题通配符一次订阅多个主题。

在这里插入图片描述

MQTT 发布/订阅中的消息路由

在 MQTT 发布/订阅模式中,一个客户端既可以是发布者,也可以是订阅者,也可以同时具备这两个身份。 当客户端发布一条消息时,它会被发送到代理,然后代理将消息路由到该主题的所有订阅者。 当客户端订阅一个主题时,它会收到代理转发到该主题的所有消息。

一般来说,大多数发布/订阅系统主要通过以下两种方式过滤并路由消息。

  • 根据主题
    订阅者向代理订阅自己感兴趣的主题,发布者发布的所有消息中都会包含自己的主题,代理根据消息的主题判断需要将消息转发给哪些订阅者。

  • 根据消息内容
    订阅者定义其感兴趣的消息的条件,只有当消息的属性或内容满足订阅者定义的条件时,消息才会被投递到该订阅者。

MQTT 协议是基于主题进行消息路由的,在这个基础上,EMQX 从 3.1 版本开始通过基于 SQL 的规则引擎提供了额外的按消息内容进行路由的能力。

MQTT 与 HTTP 请求响应

HTTP 是万维网数据通信的基础,其简单易用无客户端依赖,被广泛应用于各个行业。在物联网领域,HTTP 也可以用于连接物联网设备和 Web 服务器,实现设备的远程监控和控制。

虽然使用简单、开发周期短,但是基于请求响应的 HTTP 在物联网领域的应用却有一定的局限性。首先,协议层面 HTTP 报文相较与 MQTT 需要占用更多的网络开销;其次,HTTP 是一种无状态协议,这意味着服务器在处理请求时不会记录客户端的状态,也无法实现从连接异常断开中恢复;最后,请求响应模式需要通过轮询才能获取数据更新,而 MQTT 通过订阅即可获取实时数据更新。

发布订阅模式的松耦合特性,也给 MQTT 带来了一些副作用。由于发布者并不知晓订阅者的状态,因此发布者也无法得知订阅者是否收到了消息,或者是否正确处理了消息。为此,MQTT 5.0 增加了请求响应特性,以实现订阅者收到消息后向某个主题发送应答,发布者收到应答后再进行后续操作。

MQTT 与消息队列

尽管 MQTT 与消息队列的很多行为和特性非常接近,比如都采用发布/订阅模式,但是他们面向的场景却有着显著的不同。消息队列主要用于服务端应用之间的消息存储与转发,这类场景往往数据量大但客户端数量少。MQTT 是一种消息传输协议,主要用于物联网设备之间的消息传递,这类场景的特点是海量的设备接入、管理与消息传输。

在一些实际的应用场景中,MQTT 与消息队列往往会被结合起来使用,以使 MQTT 服务器能专注于处理设备的连接与设备间的消息路由。比如先由 MQTT 服务器接收物联网设备上报的数据,然后再通过消息队列将这些数据转发到不同的业务系统进行处理。

不同于消息队列,MQTT 主题不需要提前创建。MQTT 客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。

Paho Java 使用示例

通过包管理工具 Maven 可以方便地安装 Paho Java 客户端库,截止目前最新版本安装如下:

<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>

Java 体系中 Paho Java 是比较稳定、广泛应用的 MQTT 客户端库,本示例包含 Java 语言的 Paho Java 连接 EMQX Broker,并进行消息收发完整代码:

public class App {
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("emqx_test");
            connOpts.setPassword("emqx_test_password".toCharArray());
            // 保留会话
            connOpts.setCleanSession(true);

            // 设置回调
            client.setCallback(new PushCallback());

            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 订阅
            client.subscribe(subTopic);

            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

回调消息处理类 OnMessageCallback.java

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

结语

MQTT 的发布/订阅机制可以很轻易地满足我们一对一、一对多、多对一的通信需要。这也在很大程度上拓宽了 MQTT 在 IoT 领域之外的应用,像网络直播互动、手机消息推送等行业场景,都非常适合使用 MQTT。

链接来源:https://www.emqx.com/zh/blog/mqtt-5-introduction-to-publish-subscribe-model

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1355442.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jetson Orin Nano_初识,关于板载资源

1、开发板上有什么 英伟达Jetson Orin Nano&#xff0c;内存8GB&#xff0c;算力40TOPS&#xff08;CPU&#xff09;固态硬盘128GB&#xff08;系统镜像以及文件存储&#xff09;千兆以太网口、无线网卡&#xff08;用来上网&#xff09;4个USB&#xff08;用来接鼠标键盘&…

es6中import * as导入方式

es6中import * as导入方式 一、问题和解决方法二、简介import * as三、ES6 模块化语法导入导出1.导入2.导出 一、问题和解决方法 问题报错: export ‘default’ (imported as ‘XLSX’) was not found in ‘xlsx’ (possible exports: CFB, SSF, parse_xlscfb, parse_zip, read…

遥测终端机:数据世界的千里眼与顺风耳

在当今这个信息爆炸的时代&#xff0c;数据的重要性日益凸显。如何高效、准确地收集、传输和处理这些数据&#xff0c;成为了众多企业和研究机构关注的焦点。而遥测终端机&#xff0c;正是这样一种解决这一问题的强大工具。 遥测终端机&#xff0c;顾名思义&#xff0c;是一种…

java SSM水质历史数据可视化设计myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM水质历史数据可视化设计是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主…

Mybatis源码基本原理--XML版

文章目录 mybatis是什么架构设计首先建立起Mapper的代理工程和代理映射器的注册和使用XML文件解析数据源解析、创建和使用SQL执行器&#xff08;Executor&#xff09;的定义与实现SQL解析参数处理器&#xff1a;策略模式实现封装处理结果注解 mybatis 是什么 MyBatis 是一款优…

中国5米分辨率坡度数据

中国5米分辨率坡度数据 坡度是地表单元陡缓的程度&#xff0c;通常把坡面的垂直高度和水平距离的比值称为坡度。坡度的表示方法有百分比法、度数法、密位法和分数法四种&#xff0c;其中以百分比法和度数法较为常用。 中国5米分辨率坡度数据集&#xff0c;利用5米分辨率DEM数据…

多肉植物,预计到2025我国市场规模将达到140亿元人民币

多肉植物是一种新兴的盆栽植物&#xff0c;由于造型各异、易于养殖、低维护难度等优点&#xff0c;在全球市场和中国市场受到了越来越多消费者的追捧。全球市场分析 从全球市场来看&#xff0c;多肉植物市场规模正在逐步扩大。各种形态各异的多肉植物受到消费者的喜爱&#xff…

trino 433 开启 HTTPS

什么要开启https 因为开始password验证要求必须得https。 摘要 trino节点之间可以不用开启SSL&#xff0c;对外访问开启SSL。如果自备证书可以直接配置到trino的config文件&#xff0c;如果没有证书可以使用mkcert生成自签证书&#xff08;客户端需要信任证书&#xff0c;尤…

【Leetcode 2487】从链表中移除节点 —— 单调栈

2487. 从链表中移除节点 给你一个链表的头节点head。 移除每个右侧有一个更大数值的节点。 返回修改后链表的头节点head。 示例 1&#xff1a; 输入&#xff1a;head [5,2,13,3,8] 输出&#xff1a;[13,8] 解释&#xff1a;需要移除的节点是 5 &#xff0c;2 和 3 。 节点 1…

aliexpress商品API(item_get-获得aliexpress商品详情):进行批量操作

使用AliExpress的店铺或分类API&#xff1a;这些API可以为你提供某个店铺或分类下的所有商品列表&#xff0c;然后你可以根据这个列表逐个查询商品详情。分批查询&#xff1a;你可以将商品ID分成多个批次&#xff0c;每次只查询一部分商品详情&#xff0c;这样既可以减少每次请…

如何搭建中后台管理系统

vue3 TS vite 搭建中后台管理系统 前言1、搭建步骤及方法2、集成多种插件功能&#xff0c;实现中后台按需使用3、新手学TS如何快速进入状态、定义TS类型4、layout搭建四款常见风格6、大屏搭建效果5、vue3Ts运营管理系统总结&#xff1a; 前言 要成功&#xff0c;先发疯&…

制造企业如何打破“信息孤岛”,跑赢从制造到“智造”的破局之路?

随着工业4.0时代到来&#xff0c;制造业乘上了智能制造发展的快车&#xff0c;但“乘客”却偏少。普华永道发布的《2022年数字化工厂转型调研报告》中指出&#xff0c;来自23个国家和地区的700多家受访企业中&#xff0c;只有10%的企业已经完成数字化转型计划或处于转型最后阶段…

快速打通 Vue 3(二):响应式对象基础

很激动进入了 Vue 3 的学习&#xff0c;作为一个已经上线了三年多的框架&#xff0c;很多项目都开始使用 Vue 3 来编写了 这一组文章主要聚焦于 Vue 3 的新技术和新特性 如果想要学习基础的 Vue 语法可以看我专栏中的其他博客 Vue&#xff08;一&#xff09;&#xff1a;Vue 入…

Prometheus插件安装(NodeExporter)二进制安装包安装

一&#xff0c;下载安装包并解压 **下载地址&#xff1a;**https://github.com/prometheus/node_exporter/releases 同样物理机上下载&#xff0c;然后上传到服务器&#xff0c;本次安装使用的版本为&#xff1a;node_exporter-1.5.0.linux-amd64 1&#xff0c;根据服务器情况…

2024上海城博会|上海国际城市与建筑博览会-官 网

2024上海城博会|上海国际城市与建筑博览会 时间&#xff1a;2024年10月30日-11月1日 地点&#xff1a;上海世博展览馆 主办单位&#xff1a;联合国人居署 上海市住房和城乡建设管理委员会 协办单位&#xff1a;上海世界城市日事务协调中心 展会介绍 上海国际城市与建筑博览…

AI:110-基于深度学习的药物分子结构生成与预测

🚀点击这里跳转到本专栏,可查阅专栏顶置最新的指南宝典~ 🎉🎊🎉 你的技术旅程将在这里启航! 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的关键代码,详细讲解供…

如何写出一份优秀的简历?(求职必知)

你需要知道的事 简历是对自己职场的总结和概括&#xff0c;是通往下一段职业经历的敲门砖和 垫脚石。 因此&#xff0c;一份好的简历应该突出应聘者的优势&#xff0c;并引起企业方的好奇心。 知己知彼&#xff0c;百战百胜&#xff0c;求职者只有了解自己&#xff0c;以及了解…

vue-mixins混入处理

定义 mixins&#xff08;混入&#xff09;&#xff1a;一种分发 Vue 组件中可复用功能的非常灵活的方式&#xff0c;mixins 是一个 js 对象&#xff0c;它可以包含我们组件script中的任意功能选项&#xff0c;如&#xff1a;data、components、methods、created、computed 等等…

Avalonia学习(十七)-CEF

今天开始继续Avalonia练习。 本节&#xff1a;CefNet 1.引入 CefNet.Avalonia.Eleven 2.项目引入 Program中加入 using Avalonia; using Avalonia.ReactiveUI; using Avalonia.Threading; using CefNet; using System; using System.IO; using System.Linq; using System…