网页端HTML使用MQTTJs订阅RabbitMQ数据

  最近在做一个公司的日志组件时有一个问题难住了我。今天问题终于解决了。由于在解决问题中,在网上也查了很多资料都没有一个完整的实例可以参考。所以本着无私分享的目的记录一下完整的解决过程和实例。

  需求:做一个统一日志系统可以查看日志列表和一个可以订阅最新日志的页面。通过提供一个封装好日志记录方法的sdk文件将日志统一收集。

  通过上面的需求进行我们使用RabbitMQ+Mongodb来实现系统。

  使用C#封装一个SDK大家都会这里就不说了。C#连接RabbitMQ示例代码也是一堆堆的也没什么好说的。下面重点说一下网页端如何使用JS去订阅RabbitMQ收到的最新日志信息。

  后端都是使用RabbitMQ的AMQP协议,而前端要求在网页HTML上显示数据。我们选择了使用MQTT协议从RabbitMQ中订阅数据。

  具体步骤:

1、先准备好相关JS库。MQTT有一个叫browserMqtt.js看名字就知道是为浏览器提供的JS库。还有一个封装了操作MQ的JS库 mqfactory.js。最后还要一个jquery.js文件。这样工具就准备好了。JS文件下载

2、HTML端代码。

<script type="text/javascript" src="~/js/MqJs/jquery.js"></script>
<script type="text/javascript" src="~/js/MqJs/browserMqtt.min.js"></script>
<script type="text/javascript" src="~/js/MqJs/mqfactory.js"></script>
<body>
    <div>
        <lable>Host: </lable><input id="txtHost" placeholder="192.168.1.88" value="10.1.0.7" /><br />
        <lable>Port: </lable><input id="txtPort" placeholder="15675" value="15675" /><br />
        <label>UserName: </label><input id="txtUserName" placeholder="username" value="admin" /><br />
        <label>Password: </label><input id="txtPassword" placeholder="password" value="admin" /><br />
        <label>Protocol: </label><input id="txtProtocol" placeholder="ws" value="ws" /><br />
        <input id="btnConnect" type="button" value="Connect RabbitMQ" />
    </div>
    <div>
        <input id="btnSubscribe" type="button" value="Subscribe" />
        <input id="btnPublish" type="button" value="Publish" /><br />
        <input id="btnSSHuanjing" type="button" value="Subscribe Huanjing" />
        <input id="hdnIsSubscribed" type="hidden" value="" />
        <input id="btnPubHuanjing" type="button" value="Publish Huanjing"><br />
        路由:<input id="btnRoutingKey" type="text" value="Dcon/Logs/Client"><br />
        <input id="txtMessage" type="text" placeholder="Please enter message" />
    </div>
    <div>
        <label>log:</label><br />
        <ul id="lstLog"></ul>
        <input id="btnClearLog" type="button" value="Clear Log" />
    </div>
</body>
<script type="text/javascript">
    $(function () {
        var mqclient;
        //var routingKey = 'Dcon.Logs.ServerWebShow';
        var message;

        $('#btnSubscribe').attr('disabled', 'disabled');
        $('#btnPublish').attr('disabled', 'disabled');
        $('#btnSSHuanjing').attr('disabled', 'disabled');
        $('#btnPubHuanjing').attr('disabled', 'disabled');

        $('#btnConnect').click(function () {
            var mqttOpts = {
                host: (() => $('#txtHost').val())(),
                port: (() => $('#txtPort').val())(),
                username: (() => $('#txtUserName').val())(),
                password: (() => $('#txtPassword').val())(),
                //transformWsUrl方法用于在浏览器中使用MQTT的场景,默认情况下,MQTT自动生成的url为ws://ip:port形式,
                //然而服务器要求的格式是ws://ip:port/ws,所以MQTT提供了此接口用于在生成url时自定义url格式
                transformWsUrl: (url, opts, client) => { return opts.protocol && opts.protocol == 'ws' ? url + 'ws' : url; },
                clientId: (() => { return 'mqttjs_' + Math.random().toString(16).substr(2, 8); })()
            };
            var biz = {
                huanjing: function (handler, isOn) {
                    if (isOn !== false) {
                        this.ss(this.topics.huanjing, handler);
                    } else {
                        this.sus(this.topics.huanjing, handler);
                    }
                },
                topics: {
                    huanjing: '/hyj/huanjing/monitor'
                }
            };
            //系统初始化时注入连接选项
            mqfactory.inject(mqttOpts, biz);
            //创建mqclient单例 
            mqclient = mqfactory.create();
            //注册mqclient的连接成功事件
            mqclient.on('connect', mqconnected);
        });

        $('#btnSubscribe').click(function () {
            if ($(this).val() == 'Subscribe') {
                //订阅成功后,仅注册一次事件(要考虑每次注册事件时,事件处理器调用的次数,如果仅用一次,就用once方法)
                //routingKey = $("#btnRoutingKey").val();
                mqclient.once('onss', mqSubscribeSuccess);
                //简单订阅
                mqclient.ss($("#btnRoutingKey").val());
            } else {
                mqclient.once('onsus', mqUnsubscribeSuccess)
                mqclient.sus($("#btnRoutingKey").val());
            }
        });

        $('#btnPublish').click(function () {
            var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();
            if (message === msg) {
                msg = guid();
            }
            message = msg;
            $('#txtMessage').val(message);
            //发送消息
            mqclient.pub($("#btnRoutingKey").val(), message);
            $('#lstLog').append('<li>Send Message: ' + message + '</li>');
        });

        $('#btnSSHuanjing').click(function () {
            if ($(this).val() == 'Subscribe Huanjing') {
                mqclient.once('onss', mqHJSubscribeSuccess);
                mqclient.huanjing(onHuanjingMessageArrived);
            } else {
                mqclient.once('onsus', mqHJUnsubscribeSuccess);
                mqclient.huanjing(onHuanjingMessageArrived, false);
            }
        });

        $('#btnPubHuanjing').click(function () {
            var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();
            if (message === msg) {
                msg = guid();
            }
            message = msg;
            $('#txtMessage').val(message);
            //发送消息
            mqclient.pub(mqclient.topics.huanjing, message);
            $('#lstLog').append('<li>Send Huanjing Message: ' + message + '</li>');
        });

        $('#btnClearLog').click(function () {
            $('#lstLog').empty();
        });

        function mqconnected() {
            //alert("mqconnected");
            $('#btnSubscribe').removeAttr('disabled');
            $('#btnPublish').removeAttr('disabled');
            $('#btnSSHuanjing').removeAttr('disabled');
            $('#btnPubHuanjing').removeAttr('disabled');
            $('#lstLog').append('<li>mqclient connected</li>');
        }

        function mqSubscribeSuccess() {
            //订阅成功,就注册接受消息的方法,此处要接收多次,因此使用了on
            mqclient.on($("#btnRoutingKey").val(), onMessageArrived);
            $('#btnSubscribe').val('Unsubscribe');
            $('#lstLog').append('<li>Subscribe successful.' + $("#btnRoutingKey").val()+'</li>');
        }

        function mqUnsubscribeSuccess() {
            //注销订阅,所以将事件处理器解除绑定
            mqclient.off($("#btnRoutingKey").val(), onMessageArrived);
            $('#btnSubscribe').val('Subscribe');
            $('#lstLog').append('<li>Unsubscribe successful</li>');
        }

        function mqHJSubscribeSuccess() {
            $('#btnSSHuanjing').val('Unsubscribe Huanjing');
            $('#lstLog').append('<li>Hanjing Subscribe successful</li>');
        }

        function mqHJUnsubscribeSuccess() {
            $('#btnSSHuanjing').val('Subscribe Huanjing');
            $('#lstLog').append('<li>Huanjing Unsubscribe successful</li>');
        }

        function onMessageArrived(message) {
            $('#lstLog').append('<li>Receive message: ' + new Date().toString() + '    ' + message.toString() + '</li>');
        }

        function onHuanjingMessageArrived(message) {
            $('#lstLog').append('<li>Receive Huanjing message: ' + new Date().toString() + '    ' + message.toString() + '</li>');
        }

        function guid() {
            function s4() {
                return Math.floor((1 + Math.random()) * 0x10000)
                    .toString(16)
                    .substring(1);
            }
            return s4() + s4() + '-' + s4() + '-' + s4() + '-' +
                s4() + '-' + s4() + s4() + s4();
        }
    });
</script>

3.后端代码:

3.1客户端sdk代码

/// <summary>
        /// 写日志
        /// </summary>
        /// <param name="model"></param>
        public static void Write(LogModel model)
        {
            //判断写入的日志级别
            if (model != null && model.LogLevel >= LogLevel)
            {
                try
                {
                    var mqMsg = new MqMessage()
                    {
                        MessageBody = JSON.Serialize(model),
                        MessageRouter = SystemConst.RoutingKeyTopic.LogTopic_Producer
                    };
                    //MQHelper.Instance.ProducerMessage_Fanout(mqMsg);
                    MQHelper.Instance.ProducerMessage_Topic(mqMsg);
                }
                catch (Exception ex)
                {
                    var errorLog = string.Format("Ip:{0},LogHelper.Write方法异常,{1}", IpHelper.LocalHostIp, ex.Message);
                    //MQHelper.Instance.ProducerMessage_Fanout(new MqMessage() { MessageBody = errorLog });
                    MQHelper.Instance.ProducerMessage_Topic(new MqMessage() { MessageBody = errorLog });
                }
            }
        }

3.2后端MQ代码:

#region 主题 交换机
        /// <summary>
        /// 生产者 客户端调用
        /// </summary>
        /// <param name="msg"></param>
        public void ProducerMessage_Topic(MqMessage msg)
        {
            try
            {
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        var body = Encoding.UTF8.GetBytes(msg.MessageBody);
                        channel.BasicPublish(exchange: SystemConst.MqName_LogMq_TopicDefault,
                                             routingKey: msg.MessageRouter,
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent {0}", msg.MessageBody);
                    }
                }
            }
            catch (Exception ex)
            {
                var exMsg = ex.Message;
            }
        }

        /// <summary>
        /// 消费者 服务器接收并写入数据库
        /// 消费方法无法通过参数传入
        /// EventHandler<BasicDeliverEventArgs> received
        /// </summary>
       public void ConsumeMessage_Topic(params string[] routingKeys)
        {
            if (routingKeys == null || routingKeys.Length == 0)
            {
                throw new Exception("请指定接收路由");
            }
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var queueName = channel.QueueDeclare().QueueName;//获得已经生成的随机队列名
                    //对列与交换机绑定
                    foreach (var rKey in routingKeys)
                    {
                        channel.QueueBind(queue: queueName,
                                      exchange: SystemConst.MqName_LogMq_TopicDefault,
                                      routingKey: rKey);
                    }

                    var consumer = new EventingBasicConsumer(channel);
                    //绑定消费方法
                    consumer.Received += consomer_Received_Topic;
                    //绑定消费者
                    channel.BasicConsume(queue: queueName,
                                         autoAck: true,
                                         consumer: consumer);
                    Console.WriteLine("日志订阅服务启动成功.");
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }

        /// <summary>
        /// 接收通知服务异步的推送
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void consomer_Received_Topic(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] {0}", message);
//这里可以增加写入数据库的代码
        }
        #endregion

3.3路由

/// <summary>
        /// 主题路由
        /// </summary>
        public class RoutingKeyTopic
        {
            /// <summary>
            /// 生产者
            /// </summary>
            public const string LogTopic_Producer = "Dcon.Logs.Client";

            /// <summary>
            /// 消息者_日志服务_保存日志
            /// </summary>
            public const string LogTopic_Consume_Server_SaveDB = "Dcon.Logs.*";

            /// <summary>
            /// 消息者_日志服务_Web显示日志
            /// </summary>
            public const string LogTopic_Consume_Server_WebShow = "Dcon.Logs#";//".Logs.Client";

            /// <summary>
            /// 消息者_日志服务_Web显示日志
            /// </summary>
            public const string LogTopic_Consume_Server_WebShow_T = "*.Logs.Client";//".Logs.Client";

            /// <summary>
            /// 消息者_日志服务_ # 接收所有
            /// </summary>
            public const string LogTopic_Consume_Server_All = "#";//".Logs.Client";
        }
    }

注意点:

1、MQTT的路由是以 / 来分割的。在RabbitMQ中会被转义成 . 如示例中的路由Dcon/Logs/Client会被转换成 Dcon.Logs.Client

2、网页端接收时的路由要和发送端的路由一至。也就是说 后端用 Dcon.Logs.Client 来推数据前端就要使用 Dcon/Logs/Client来接收数据。

3、MQTT路由不支持通配符.

4、由于MQTT的JS库没有提供Topic交换机与路由绑定功能。所以前端接收时 不能设置订阅主题交换机名称。如果要和amqp交互只能使用amqp的默认主题交换机名称 amq.topic

运行效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/542324.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据——关键生产要素

数据作为数字经济时代的关键生产要素&#xff0c;逐步融入生产生活各方面&#xff0c;深刻影响并重构着经济社会运行和社会治理&#xff0c;已成为影响未来发展的关键战略性资源。近年来&#xff0c;我国高度重视发展数字经济、数据要素及其市场化配置改革&#xff0c;发布了一…

Go——网络编程

一. 互联网协议介绍 网络基础——网络传输基本流程_网络传输过程-CSDN博客 应用层HTTP协议-CSDN博客 传输层UDP/TCP协议_udp报文提供的确认号用于接收方跟发送方确认-CSDN博客 网络层IP协议-CSDN博客 链路层以太网详解_以太网数据链路层-CSDN博客 二. Socket编程 Socket是…

vite+react+ts+scss 创建项目

npm create vitelatest输入项目名称选择react选择typescript swc WC 通过利用 Rust 编写的编译器&#xff0c;使用了更先进的优化技术&#xff0c;使得它在处理 TypeScript 代码时能够更快地进行转换和编译。特别是在大型项目中&#xff0c;SWC 相对于传统的 TypeScript 编译器…

Hive的分区与排序

一、Hive分区 1.引入&#xff1a; 在大数据中&#xff0c;最常见的一种思想就是分治&#xff0c;我们可以把大的文件切割划分成一个个的小的文件&#xff0c;这样每次操作一个个小的文件就会很容易了&#xff0c;同样的道理&#xff0c;在hive当中也是支持这种思想的&#xff…

error: src refspec master does not match any

文章目录 1 问题复现2 问题解决 1 问题复现 在把文件推送到远程仓库时&#xff0c;出现了如下错误。 错误原因&#xff1a;没有“master”分支。 2 问题解决 1&#xff0c;查看现有分支&#xff1b; (base) macmacbook DesignPatterns % git branch * main2&#xff0c;创…

Unity上接入手柄,手柄控制游戏物体移动

1、unity软件上安装system input 组件。菜单栏【window】-【Packag Manager】打开如下界面,查找Input System,并且安装。 2、安装成功后插入手柄到windows上,打开菜单栏上【window】--【Analysis】--【Input Debuger】 进入Input Debug界面,可以看到手柄设备能被Unity识别。…

刷代码随想录有感(31):删除字符串中所有相邻重复项

题干&#xff1a; 代码&#xff1a; class Solution { public:stack<char> st;string res "";string removeDuplicates(string s) {for(char i : s){if(st.empty() || st.top() ! i){st.push(i);}else{st.pop();}}while(!st.empty()){res st.top();st.pop()…

使用云服务器搭建CentOS操作系统

云服务器搭建CentOS操作系统 前言一、购买云服务器腾讯云阿里云华为云 二、使用 XShell 远程登陆到 Linux关于 Linux 桌面下载 XShell安装XShell查看 Linux 主机 ip使用 XShell 登陆主机 三、无法使用密码登陆的解决办法 前言 CentOS是一种基于Red Hat Enterprise Linux&#…

:app debug:armeabi-v7a failed to configure C/C++

报错信息 由于刚换电脑不久&#xff0c;新建native c工程时&#xff0c;出现报错如下&#xff1a; :app debug:armeabi-v7a failed to configure C/C null java.lang.NullPointerExceptionat com.android.build.gradle.tasks.CmakeQueryMetadataGenerator.getProcessBuilder(…

了解大语言模型的参数高效微调(Parameter-Effcient Fine-Tuning)

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 大语言模型在众多应用领域实现了突破性的进步&#xff0c;显著提升了各种任务的完成度。然而&#xff0c;其庞大的规模也带来了高昂的计算成本。这些模型往往包含数十亿甚至上千亿参数&#xff0c;需要…

uniapp 卡片勾选

前言 公司的app项目使用的uniapp&#xff0c;项目里有一个可勾选的卡片功能&#xff0c;效果图如下&#xff1a; 找了一圈没找到什么太好的组件&#xff0c;于是就自己简单写了一个&#xff0c;记录一下。避免以后还会用到 代码 <template><view class"card-…

虚幻引擎启动报错记录

0x00007FFEF0C8917C (UnrealEditor-CoreUObject.dll)处(位于 UnrealEditor.exe 中)引发的异常: 0xC0000005: 写入位置 0x0000000000000030 时发生访问冲突。 解决办法&#xff1a;首先查看堆栈信息&#xff0c;我的项目启动是因为默认场景编译不过&#xff0c;进到编辑器配置文…

【Linux实践室】Linux高级用户管理实战指南:用户所属组变更操作详解

&#x1f308;个人主页&#xff1a;聆风吟_ &#x1f525;系列专栏&#xff1a;Linux实践室、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. ⛳️任务描述二. ⛳️相关知识2.1 &#x1f514;Linux查看用户所属组2.1.1 &#x1f47b;使…

商城系统必备营销功能——分销裂变

电商流量红利已经过去&#xff0c;现在的电商营销&#xff0c;重点在于私域用户&#xff0c;在于客户资源裂变。人们通过分销裂变挖掘客户资源&#xff0c;能降低获客成本&#xff0c;对于需要解决成本困扰的企业来说&#xff0c;确实是个不错的选择。今天&#xff0c;我们就来…

【MySQL】:深入解析多表查询(下)

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; MySQL从入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. 自连接1.1 自连接查询1.2 联合查询 二. 子查询2.1 概述2.2 分类2.3 标量子查…

c++11 标准模板(STL)本地化库 - 平面类别(std::codecvt) - 在字符编码间转换,包括 UTF-8、UTF-16、UTF-32 (四)

本地化库 本地环境设施包含字符分类和字符串校对、数值、货币及日期/时间格式化和分析&#xff0c;以及消息取得的国际化支持。本地环境设置控制流 I/O 、正则表达式库和 C 标准库的其他组件的行为。 平面类别 在字符编码间转换&#xff0c;包括 UTF-8、UTF-16、UTF-32 std::…

关于机器学习/深度学习的一些事-答知乎问(一)

如何将领域知识融入到深度学习模型中&#xff1f; 在早期的研究阶段&#xff0c;大多基于经验和对问题的一些理解&#xff0c; 启发式地设计网络&#xff0c; 并通过端到端学习的方式得到解决具体问题的深度模型。这类方法在很多问题如图像识别中取得了巨大的成功&#xff0c;…

Spring Boot统一功能处理(一)

本篇主要介绍Spring Boot的统一功能处理中的拦截器。 目录 一、拦截器的基本使用 二、拦截器实操 三、浅尝源码 初始化DispatcherServerlet 处理请求&#xff08;doDispatch) 四、适配器模式 一、拦截器的基本使用 在一般的学校或者社区门口&#xff0c;通常会安排几个…

前后端分离vue.js+nodejs新闻文章发布论坛网站系统44x94

Vue&#xff1a; Vue是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是&#xff0c;Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层&#xff0c;不仅易于上手&#xff0c;还便于与第三方库或既有项目整合。另一方面&#xff0c;当与现代化的工具链以及…

网络篇01 | 入门篇

网络篇01 | 入门篇 01 各层协议目录[网络篇02 | 应用层 kcp&#xff08;王者荣耀&#xff09;](https://blog.csdn.net/qiushily2030/article/details/135835946)[网络篇03 | 应用层 quic](https://blog.csdn.net/qiushily2030/article/details/136192481)网络篇04 | 应用层 mq…