网页端HTML使用MQTTJs订阅RabbitMQ数据

news2025/5/25 12:00:47

  最近在做一个公司的日志组件时有一个问题难住了我。今天问题终于解决了。由于在解决问题中,在网上也查了很多资料都没有一个完整的实例可以参考。所以本着无私分享的目的记录一下完整的解决过程和实例。

  需求:做一个统一日志系统可以查看日志列表和一个可以订阅最新日志的页面。通过提供一个封装好日志记录方法的sdk文件将日志统一收集。

  通过上面的需求进行我们使用RabbitMQ+Mongodb来实现系统。

  使用C#封装一个SDK大家都会这里就不说了。C#连接RabbitMQ示例代码也是一堆堆的也没什么好说的。下面重点说一下网页端如何使用JS去订阅RabbitMQ收到的最新日志信息。

  后端都是使用RabbitMQ的AMQP协议,而前端要求在网页HTML上显示数据。我们选择了使用MQTT协议从RabbitMQ中订阅数据。

  具体步骤:

1、先准备好相关JS库。MQTT有一个叫browserMqtt.js看名字就知道是为浏览器提供的JS库。还有一个封装了操作MQ的JS库 mqfactory.js。最后还要一个jquery.js文件。这样工具就准备好了。JS文件下载

2、HTML端代码。

<script type="text/javascript" src="~/js/MqJs/jquery.js"></script>
<script type="text/javascript" src="~/js/MqJs/browserMqtt.min.js"></script>
<script type="text/javascript" src="~/js/MqJs/mqfactory.js"></script>
<body>
    <div>
        <lable>Host: </lable><input id="txtHost" placeholder="192.168.1.88" value="10.1.0.7" /><br />
        <lable>Port: </lable><input id="txtPort" placeholder="15675" value="15675" /><br />
        <label>UserName: </label><input id="txtUserName" placeholder="username" value="admin" /><br />
        <label>Password: </label><input id="txtPassword" placeholder="password" value="admin" /><br />
        <label>Protocol: </label><input id="txtProtocol" placeholder="ws" value="ws" /><br />
        <input id="btnConnect" type="button" value="Connect RabbitMQ" />
    </div>
    <div>
        <input id="btnSubscribe" type="button" value="Subscribe" />
        <input id="btnPublish" type="button" value="Publish" /><br />
        <input id="btnSSHuanjing" type="button" value="Subscribe Huanjing" />
        <input id="hdnIsSubscribed" type="hidden" value="" />
        <input id="btnPubHuanjing" type="button" value="Publish Huanjing"><br />
        路由:<input id="btnRoutingKey" type="text" value="Dcon/Logs/Client"><br />
        <input id="txtMessage" type="text" placeholder="Please enter message" />
    </div>
    <div>
        <label>log:</label><br />
        <ul id="lstLog"></ul>
        <input id="btnClearLog" type="button" value="Clear Log" />
    </div>
</body>
<script type="text/javascript">
    $(function () {
        var mqclient;
        //var routingKey = 'Dcon.Logs.ServerWebShow';
        var message;

        $('#btnSubscribe').attr('disabled', 'disabled');
        $('#btnPublish').attr('disabled', 'disabled');
        $('#btnSSHuanjing').attr('disabled', 'disabled');
        $('#btnPubHuanjing').attr('disabled', 'disabled');

        $('#btnConnect').click(function () {
            var mqttOpts = {
                host: (() => $('#txtHost').val())(),
                port: (() => $('#txtPort').val())(),
                username: (() => $('#txtUserName').val())(),
                password: (() => $('#txtPassword').val())(),
                //transformWsUrl方法用于在浏览器中使用MQTT的场景,默认情况下,MQTT自动生成的url为ws://ip:port形式,
                //然而服务器要求的格式是ws://ip:port/ws,所以MQTT提供了此接口用于在生成url时自定义url格式
                transformWsUrl: (url, opts, client) => { return opts.protocol && opts.protocol == 'ws' ? url + 'ws' : url; },
                clientId: (() => { return 'mqttjs_' + Math.random().toString(16).substr(2, 8); })()
            };
            var biz = {
                huanjing: function (handler, isOn) {
                    if (isOn !== false) {
                        this.ss(this.topics.huanjing, handler);
                    } else {
                        this.sus(this.topics.huanjing, handler);
                    }
                },
                topics: {
                    huanjing: '/hyj/huanjing/monitor'
                }
            };
            //系统初始化时注入连接选项
            mqfactory.inject(mqttOpts, biz);
            //创建mqclient单例 
            mqclient = mqfactory.create();
            //注册mqclient的连接成功事件
            mqclient.on('connect', mqconnected);
        });

        $('#btnSubscribe').click(function () {
            if ($(this).val() == 'Subscribe') {
                //订阅成功后,仅注册一次事件(要考虑每次注册事件时,事件处理器调用的次数,如果仅用一次,就用once方法)
                //routingKey = $("#btnRoutingKey").val();
                mqclient.once('onss', mqSubscribeSuccess);
                //简单订阅
                mqclient.ss($("#btnRoutingKey").val());
            } else {
                mqclient.once('onsus', mqUnsubscribeSuccess)
                mqclient.sus($("#btnRoutingKey").val());
            }
        });

        $('#btnPublish').click(function () {
            var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();
            if (message === msg) {
                msg = guid();
            }
            message = msg;
            $('#txtMessage').val(message);
            //发送消息
            mqclient.pub($("#btnRoutingKey").val(), message);
            $('#lstLog').append('<li>Send Message: ' + message + '</li>');
        });

        $('#btnSSHuanjing').click(function () {
            if ($(this).val() == 'Subscribe Huanjing') {
                mqclient.once('onss', mqHJSubscribeSuccess);
                mqclient.huanjing(onHuanjingMessageArrived);
            } else {
                mqclient.once('onsus', mqHJUnsubscribeSuccess);
                mqclient.huanjing(onHuanjingMessageArrived, false);
            }
        });

        $('#btnPubHuanjing').click(function () {
            var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();
            if (message === msg) {
                msg = guid();
            }
            message = msg;
            $('#txtMessage').val(message);
            //发送消息
            mqclient.pub(mqclient.topics.huanjing, message);
            $('#lstLog').append('<li>Send Huanjing Message: ' + message + '</li>');
        });

        $('#btnClearLog').click(function () {
            $('#lstLog').empty();
        });

        function mqconnected() {
            //alert("mqconnected");
            $('#btnSubscribe').removeAttr('disabled');
            $('#btnPublish').removeAttr('disabled');
            $('#btnSSHuanjing').removeAttr('disabled');
            $('#btnPubHuanjing').removeAttr('disabled');
            $('#lstLog').append('<li>mqclient connected</li>');
        }

        function mqSubscribeSuccess() {
            //订阅成功,就注册接受消息的方法,此处要接收多次,因此使用了on
            mqclient.on($("#btnRoutingKey").val(), onMessageArrived);
            $('#btnSubscribe').val('Unsubscribe');
            $('#lstLog').append('<li>Subscribe successful.' + $("#btnRoutingKey").val()+'</li>');
        }

        function mqUnsubscribeSuccess() {
            //注销订阅,所以将事件处理器解除绑定
            mqclient.off($("#btnRoutingKey").val(), onMessageArrived);
            $('#btnSubscribe').val('Subscribe');
            $('#lstLog').append('<li>Unsubscribe successful</li>');
        }

        function mqHJSubscribeSuccess() {
            $('#btnSSHuanjing').val('Unsubscribe Huanjing');
            $('#lstLog').append('<li>Hanjing Subscribe successful</li>');
        }

        function mqHJUnsubscribeSuccess() {
            $('#btnSSHuanjing').val('Subscribe Huanjing');
            $('#lstLog').append('<li>Huanjing Unsubscribe successful</li>');
        }

        function onMessageArrived(message) {
            $('#lstLog').append('<li>Receive message: ' + new Date().toString() + '    ' + message.toString() + '</li>');
        }

        function onHuanjingMessageArrived(message) {
            $('#lstLog').append('<li>Receive Huanjing message: ' + new Date().toString() + '    ' + message.toString() + '</li>');
        }

        function guid() {
            function s4() {
                return Math.floor((1 + Math.random()) * 0x10000)
                    .toString(16)
                    .substring(1);
            }
            return s4() + s4() + '-' + s4() + '-' + s4() + '-' +
                s4() + '-' + s4() + s4() + s4();
        }
    });
</script>

3.后端代码:

3.1客户端sdk代码

/// <summary>
        /// 写日志
        /// </summary>
        /// <param name="model"></param>
        public static void Write(LogModel model)
        {
            //判断写入的日志级别
            if (model != null && model.LogLevel >= LogLevel)
            {
                try
                {
                    var mqMsg = new MqMessage()
                    {
                        MessageBody = JSON.Serialize(model),
                        MessageRouter = SystemConst.RoutingKeyTopic.LogTopic_Producer
                    };
                    //MQHelper.Instance.ProducerMessage_Fanout(mqMsg);
                    MQHelper.Instance.ProducerMessage_Topic(mqMsg);
                }
                catch (Exception ex)
                {
                    var errorLog = string.Format("Ip:{0},LogHelper.Write方法异常,{1}", IpHelper.LocalHostIp, ex.Message);
                    //MQHelper.Instance.ProducerMessage_Fanout(new MqMessage() { MessageBody = errorLog });
                    MQHelper.Instance.ProducerMessage_Topic(new MqMessage() { MessageBody = errorLog });
                }
            }
        }

3.2后端MQ代码:

#region 主题 交换机
        /// <summary>
        /// 生产者 客户端调用
        /// </summary>
        /// <param name="msg"></param>
        public void ProducerMessage_Topic(MqMessage msg)
        {
            try
            {
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        var body = Encoding.UTF8.GetBytes(msg.MessageBody);
                        channel.BasicPublish(exchange: SystemConst.MqName_LogMq_TopicDefault,
                                             routingKey: msg.MessageRouter,
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent {0}", msg.MessageBody);
                    }
                }
            }
            catch (Exception ex)
            {
                var exMsg = ex.Message;
            }
        }

        /// <summary>
        /// 消费者 服务器接收并写入数据库
        /// 消费方法无法通过参数传入
        /// EventHandler<BasicDeliverEventArgs> received
        /// </summary>
       public void ConsumeMessage_Topic(params string[] routingKeys)
        {
            if (routingKeys == null || routingKeys.Length == 0)
            {
                throw new Exception("请指定接收路由");
            }
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var queueName = channel.QueueDeclare().QueueName;//获得已经生成的随机队列名
                    //对列与交换机绑定
                    foreach (var rKey in routingKeys)
                    {
                        channel.QueueBind(queue: queueName,
                                      exchange: SystemConst.MqName_LogMq_TopicDefault,
                                      routingKey: rKey);
                    }

                    var consumer = new EventingBasicConsumer(channel);
                    //绑定消费方法
                    consumer.Received += consomer_Received_Topic;
                    //绑定消费者
                    channel.BasicConsume(queue: queueName,
                                         autoAck: true,
                                         consumer: consumer);
                    Console.WriteLine("日志订阅服务启动成功.");
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }

        /// <summary>
        /// 接收通知服务异步的推送
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void consomer_Received_Topic(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] {0}", message);
//这里可以增加写入数据库的代码
        }
        #endregion

3.3路由

/// <summary>
        /// 主题路由
        /// </summary>
        public class RoutingKeyTopic
        {
            /// <summary>
            /// 生产者
            /// </summary>
            public const string LogTopic_Producer = "Dcon.Logs.Client";

            /// <summary>
            /// 消息者_日志服务_保存日志
            /// </summary>
            public const string LogTopic_Consume_Server_SaveDB = "Dcon.Logs.*";

            /// <summary>
            /// 消息者_日志服务_Web显示日志
            /// </summary>
            public const string LogTopic_Consume_Server_WebShow = "Dcon.Logs#";//".Logs.Client";

            /// <summary>
            /// 消息者_日志服务_Web显示日志
            /// </summary>
            public const string LogTopic_Consume_Server_WebShow_T = "*.Logs.Client";//".Logs.Client";

            /// <summary>
            /// 消息者_日志服务_ # 接收所有
            /// </summary>
            public const string LogTopic_Consume_Server_All = "#";//".Logs.Client";
        }
    }

注意点:

1、MQTT的路由是以 / 来分割的。在RabbitMQ中会被转义成 . 如示例中的路由Dcon/Logs/Client会被转换成 Dcon.Logs.Client

2、网页端接收时的路由要和发送端的路由一至。也就是说 后端用 Dcon.Logs.Client 来推数据前端就要使用 Dcon/Logs/Client来接收数据。

3、MQTT路由不支持通配符.

4、由于MQTT的JS库没有提供Topic交换机与路由绑定功能。所以前端接收时 不能设置订阅主题交换机名称。如果要和amqp交互只能使用amqp的默认主题交换机名称 amq.topic

运行效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1593278.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

轻量化模块整理,即插即用

轻量化模块整理&#xff0c;即插即用&#xff08;持续更新&#xff09; 整理一些轻量化的结构&#xff0c;作为知识储备&#xff0c;可以用到后续的项目和研究中 Mobilenetv3 深度可分离卷积 MobileNetV3 是一个轻量级的深度学习模型&#xff0c;专为移动和边缘设备上的高效…

IntelliJ IDEA 2024 for Mac/Win:引领Java开发新纪元的高效集成环境

在日新月异的软件开发领域&#xff0c;一款高效、智能的集成开发环境&#xff08;IDE&#xff09;无疑是程序员们不可或缺的神兵利器。今天&#xff0c;我要为大家介绍的&#xff0c;正是这样一款集大成之作——IntelliJ IDEA 2024。无论是Mac用户还是Windows用户&#xff0c;只…

Golang | Leetcode Golang题解之第27题移除元素

题目&#xff1a; 题解&#xff1a; func removeElement(nums []int, val int) int {left, right : 0, len(nums)for left < right {if nums[left] val {nums[left] nums[right-1]right--} else {left}}return left }

8个 可以让 Python 加速的 tips

Python 是一种脚本语言&#xff0c;相比 C/C 这样的编译语言&#xff0c;在效率和性能方面存在一些不足。但是&#xff0c;有很多时候&#xff0c;Python 的效率并没有想象中的那么夸张。本文对一些 Python 代码加速运行的技巧进行整理。 0. 代码优化原则 本文会介绍不少的 P…

攻防世界12-baby_web

12-baby_web 题目说想想初始页面是哪个&#xff0c;一般都是index.php&#xff0c;然后如题分析即可。 我们在链接后面拼接上/index.php&#xff0c;返回后发现界面又回到了1.php&#xff0c;有可能是重定向。 我们点击检查-网络&#xff0c;发现没有index的请求&#xff0c;…

番外篇 | YOLOv8改进之在C2f中引入即插即用RepViTBlock模块 | CVPR2024清华RepViT

前言:Hello大家好,我是小哥谈。YOLOv8是一种基于深度学习的实时物体检测算法,其通过将物体检测任务转化为目标框回归问题,并使用卷积神经网络实现高效的特征提取和目标分类。然而,YOLOv8在处理一些复杂场景和小目标时可能存在一定的性能限制。为了克服YOLOv8的局限性,清华…

ES6: set和map数据结构以及使用场景

ES6:set和map数据结构 一、Set 数据结构&#xff1a;二、使用场景&#xff1a;使用Set 进行去重三、Map 数据结构四、使用场景&#xff1a;使用Map进行树型数据懒加载刷新五、Set和Map的区别六、Map、Set的实际使用场景 Set 和 Map 是 ES6 中引入的两种新的数据结构&#xff0c…

代码随想录算法训练营三刷day53 | 动态规划之子序列 1143.最长公共子序列 1035.不相交的线 53. 最大子序和

day53 1143.最长公共子序列1.确定dp数组&#xff08;dp table&#xff09;以及下标的含义2.确定递推公式3.dp数组如何初始化4.确定遍历顺序5.举例推导dp数组 1035.不相交的线53. 最大子序和1.确定dp数组&#xff08;dp table&#xff09;以及下标的含义2.确定递推公式3.dp数组如…

2024 十五届蓝桥杯省赛Python B组

以下仅是我的答案&#xff0c;仅供参考&#xff0c;欢迎讨论。 A&#xff1a;穿越时空之门 二进制、四进制转换。答案&#xff1a;63。 B&#xff1a;数字串个数 排除0&#xff0c;总的方案数9^10000,减去不存在3和不存在7的2*8^10000&#xff0c;再加上同时不存在3和7的7^…

Coding and Paper Letter(八十九)

CPL之第八十九期。 1 Coding: 1.openai通用代理转换是一个用于将其他厂商服务转为openai 标准接口相应的工具. 通过该工具, 可以将其他厂商的服务转为openai 标准接口. 讯飞星火,通义千问,gemini,openai,copilot,double&#xff0c;kimi&#xff0c;智谱清言 使用spring2webf…

Quantum Temple借助Sui通过NFT推动再生旅游

从金融到艺术&#xff0c;从游戏到无线网络&#xff0c;各行各业都涌现出大量初创公司&#xff0c;利用区块链技术颠覆现状。说到旅游业&#xff0c;让人联想到拥挤的机场、快节奏的旅游和豪华游轮&#xff0c;可能看起来对区块链创新持守旧态度。一家初创公司认为现在是时候改…

FreeRTOS创建第一个程序

使用freeRTOS创建任务时使用如下函数 函数的参数 创建一个FreeRTOS任务点亮led灯实现led灯500毫秒翻转一次 具体的代码实现 #include "stm32f10x.h" // Device header #include "Delay.h" #include "freeRTOS.h" #include &quo…

使用 Python 开发一个 Python 解释器

计算机只能理解机器码。归根结底&#xff0c;编程语言只是一串文字&#xff0c;目的是为了让人类更容易编写他们想让计算机做的事情。真正的魔法是由编译器和解释器完成&#xff0c;它们弥合了两者之间的差距。解释器逐行读取代码并将其转换为机器码。 在本文中&#xff0c;我…

C#简单工厂模式的实现

using System.Diagnostics.Metrics; using System.Runtime.InteropServices; using static 手写工厂模式.Program;namespace 手写工厂模式 {internal class Program{public interface eats {void eat();}//定义了一个接口public class rice : eats{public void eat() {Console.…

代码随想录算法训练营DAY25|C++回溯算法Part.2|216. 组合总和III、17.电话号码的字母组合

文章目录 216. 组合总和III题意理解树形结构伪代码实现剪枝操作CPP代码实现 17.电话号码的字母组合解题思路树形结构伪代码实现隐藏回溯CPP代码 216. 组合总和III 力扣题目链接 文章讲解&#xff1a;216. 组合总和III 视频讲解&#xff1a;和组合问题有啥区别&#xff1f;回溯算…

Oracle获取对象的DDL创建语句

1.命令行方式&#xff08;如&#xff1a;sqlplus&#xff09; ## 用户 select dbms_metadata.get_ddl(USER,TEST) from dual;## 表 select dbms_metadata.get_ddl(TABLE,TEST,T1) from dual;## 表空间 select dbms_metadata.get_ddl(TABLESPACE,TBS_NAME) from dual;## 索引 s…

【MySQL】:深入解析多表查询(下)

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; MySQL从入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. 自连接1.1 自连接查询1.2 联合查询 二. 子查询2.1 概述2.2 分类2.3 标量子查…

Linux数据库自动备份 - 定时任务发到百度云盘、坚果云、邮箱附件

前言 1. 坚果云的webdav云盘最好&#xff01; &#xff08;免费账号每月1G上传流量&#xff09; 2. 不建议数据库备份文件发送到SMTP邮箱&#xff0c;因为对方服务器非常容易当做垃圾邮件处理&#xff0c;而且发信的SMTP账号会被封禁&#xff08;实测163发到QQ邮箱被封&…

区块链媒体推广的8个成功案例解析-华媒舍

区块链领域作为一个新兴行业&#xff0c;媒体推广对于项目的成功发展起着至关重要的作用。本文将从八个成功案例中来分析区块链媒体推广的重要性和成功策略。 1. 媒体报道对于区块链项目的重要影响 媒体报道是提升区块链项目知名度和用户认可度的重要手段。对于区块链项目来说…