C# MQTTNET 服务端+客户端 实现 源码示例

目录

1.演示效果

2.源码下载

3.服务端介绍

4.客户端介绍


1.演示效果

2.源码下载

下载地址:https://download.csdn.net/download/rotion135/89385802

3.服务端介绍

服务端用的控制台程序进行设计,实际使用可以套一层Windows服务的皮,进行服务部署。

调试用控制台显示收发的消息,便于直观

首先安装的MQTTNET 版本是4.3.6.1152 :

自定义了服务的客户端列表数据模型:

    public class MqttClientInfo
    {
        /// <summary>
        /// ID
        /// </summary>
        public string ClientId { get; set; }
        /// <summary>
        /// 客户端名称
        /// </summary>
        public string ClientName { get; set; }
        /// <summary>
        /// 订阅列表
        /// </summary>
        public List<MqttSubscription> Subscriptions { get; set; } = new List<MqttSubscription>();
    }

    public class MqttSubscription
    {
        /// <summary>
        /// 所属客户端
        /// </summary>
        public MqttClientInfo Parent { get; set; }
        /// <summary>
        /// 订阅消息
        /// </summary>
        public string Topic { get; set; }
    }

再对服务端的代码进行封装,添加响应的事件,做一些消息显示到控制台

服务端的代码就这么简单

    public class LSMQTTServer
    {
        MqttServer mqttServer;
        List<MqttClientInfo> MqttClients = new List<MqttClientInfo>();

        /// <summary>
        /// 初始化Mqtt服务并启动服务
        /// </summary>
        /// <param name="ip"></param>
        /// <param name="port"></param>
        public virtual void InitMqttServer(string ip, int port)
        {
            var mqttServerOptions =
                    new MqttServerOptionsBuilder()
                    .WithDefaultEndpoint()
                    .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))//set the ip of the server
                    .WithDefaultEndpointPort(port)//set the port of the server                    
                    .Build();
            mqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create MQTT service object
            mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;
            mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
            mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
            mqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync;
            mqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync;
            mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
            mqttServer.ClientAcknowledgedPublishPacketAsync += MqttServer_ClientAcknowledgedPublishPacketAsync;
            mqttServer.InterceptingClientEnqueueAsync += MqttServer_InterceptingClientEnqueueAsync;
            mqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync;

            mqttServer.StartAsync();
        }       

        private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            try
            {
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ApplicationMessageNotConsumedAsync", ex);
            }
            return Task.CompletedTask;
        }

        private Task MqttServer_InterceptingClientEnqueueAsync(InterceptingClientApplicationMessageEnqueueEventArgs arg)
        {
            try
            {
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_InterceptingClientEnqueueAsync", ex);
            }
            return Task.CompletedTask;
        }

        private Task MqttServer_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
        {
            try
            {

            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientAcknowledgedPublishPacketAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 消息接收
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            try
            {
                var client = arg.ClientId;
                var topic = arg.ApplicationMessage.Topic;
                var content = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
                GlobalEvents.OnMessage($"接收到消息:Client[{client}] Topic[{topic}] Message[{content}]");
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_InterceptingPublishAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 关闭Mqtt服务
        /// </summary>
        public async virtual Task StopMqttServer()
        {
            if (mqttServer != null)
            {
                if (mqttServer.IsStarted)
                {
                    await mqttServer.StopAsync();
                    mqttServer.Dispose();
                }
            }
        }

        /// <summary>
        /// 对客户端的连接进行验证
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            try
            {
                //验证ClientId
                if (string.IsNullOrWhiteSpace(arg.ClientId))
                {
                    arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    return Task.CompletedTask;
                }

                //验证用户名和密码
                bool acceptflag = !(string.IsNullOrWhiteSpace(arg.UserName) || string.IsNullOrWhiteSpace(arg.Password));

                if (!acceptflag)
                {
                    //验证失败
                    arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    return Task.CompletedTask;
                }
                arg.ReasonCode = MqttConnectReasonCode.Success;
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ValidatingConnectionAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端连接成功
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            try
            {
                MqttClients.Add(new MqttClientInfo() { ClientId = arg.ClientId, ClientName = arg.UserName });
                GlobalEvents.OnMessage($"客户端上线- ID:【{arg.ClientId}】 Name:【{arg.UserName}】");
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientConnectedAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端断开连接
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        public virtual Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            try
            {
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    MqttClients.Remove(mqttUser);
                    GlobalEvents.OnMessage($"客户端离线- ID:【{mqttUser.ClientId}】");
                }

            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientDisconnectedAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端发布订阅
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            try
            {
                if (arg == null)
                    return Task.CompletedTask;
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    mqttUser.Subscriptions.Add(new MqttSubscription() { Parent = mqttUser, Topic = arg.TopicFilter.Topic });
                    GlobalEvents.OnMessage($"客户端发布订阅- Topic:【{arg.TopicFilter.Topic}】");
                }
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientSubscribedTopicAsync", ex);
            }
            return Task.CompletedTask;
        }


        /// <summary>
        /// 客户端取消订阅
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            try
            {
                if (arg == null)
                    return Task.CompletedTask;
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    MqttSubscription? mqttSubedTopic = mqttUser.Subscriptions.FirstOrDefault(t => t.Topic == arg.TopicFilter);
                    if (mqttSubedTopic != null)
                    {
                        mqttUser.Subscriptions.Remove(mqttSubedTopic);
                        GlobalEvents.OnMessage($"客户端取消订阅- Topic:【{mqttSubedTopic.Topic}】");
                    }
                }
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientUnsubscribedTopicAsync", ex);
            }
            return Task.CompletedTask;
        }


    }

然后在控制台运行的时候,对服务进行实例化

var ip = IPHelper.GetLocalIP();
int port = 3303;
LSMQTTServer server = new LSMQTTServer();
server.InitMqttServer(ip,port);
GlobalEvents.OnMessage($"MQTT服务启动,IP:{ip},Port{port}");

4.客户端介绍

客户端设计,用的WPF,将每个客户端连接,用自定义控件进行封装,界面添加多个控件即表示多个客户端,添加订阅后,即可以显示收到的消息,便于多个客户端之间的消息调试

首先对连接的客户端也进行了类封装,包括连接,订阅,取消订阅和消息接收等

    public class LSMQTTClient
    {
        MqttClient mqttClient;
        public delegate void DelegateOutMessage(string message);
        public event DelegateOutMessage OnOutMessage;

        public void InitMqttClient(string serverIp, int serverPort, string clientId, string userName, string password)
        {
            try
            {
                var options = new MqttClientOptionsBuilder()
                     .WithCleanSession(true)
                     .WithCredentials(userName, password)
                     .WithClientId(clientId)
                     .WithTcpServer(serverIp,serverPort)
                     .Build();

                ConnectMQTTServer(options);
            }
            catch (Exception ex)
            {
                LogOperate.Error("InitMqttClient 发生异常", ex);
            }
        }

        /// <summary>
        /// 判断是否已连接服务
        /// </summary>
        /// <returns></returns>
        public bool IsConnect()
        {
            if (mqttClient == null)
                return false;
            if(!mqttClient.IsConnected)
                return false;
            return true;
        }


        public async void ConnectMQTTServer(MqttClientOptions options)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync; ;
                mqttClient.ConnectedAsync += MqttClient_ConnectedAsync; ;
                mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync; ;
            }

            await mqttClient.ConnectAsync(options);
        }


        /// <summary>
        /// 断开服务连接
        /// </summary>
        public void DisConnectMQTTServer()
        {
            if(mqttClient!=null && mqttClient.IsConnected)
            {
                mqttClient.DisconnectAsync();
            }
        }

        /// <summary>
        /// 添加订阅
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task<BaseResult> AddSubscription(string topic)
        {
            if(!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.SubscribeAsync(topic);
            return BaseResult.Successed;
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task<BaseResult> UnSubscription(string topic)
        {
            if (!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.UnsubscribeAsync(topic);
            return BaseResult.Successed;
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="content"></param>
        /// <returns></returns>
        public async Task<BaseResult> SendMessage(string topic,string content)
        {
            if (!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.PublishStringAsync(topic, content);
            return BaseResult.Successed;
        }

        private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            OnOutMessage?.Invoke("已断开服务连接");
            return Task.CompletedTask;
        }

        private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            if (arg.ConnectResult.ResultCode == MqttClientConnectResultCode.Success)
                OnOutMessage?.Invoke("已连接到服务");
            else
                OnOutMessage?.Invoke($"连接服务失败【{arg.ConnectResult.ReasonString}】");
            return Task.CompletedTask;
        }

        private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            if (arg.ApplicationMessage.PayloadSegment.Array != null)
            {
                var content = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array);// BitConverter.ToString(arg.ApplicationMessage.PayloadSegment.Array,0, arg.ApplicationMessage.PayloadSegment.Count);
                OnOutMessage?.Invoke($"[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}]-收到消息>>{content}");
            }
            return Task.CompletedTask;
        }
    }

再设计自定义控件,将连接属性单独实例化

自定义控件包括XAML和ViewModel的设计,详细的可以下载源码进行查看,此处不展示太多了,代码量也确实有一些些,无非就是 连接的各个参数,如IP、端口,客户端ID,用户名、密码等等

然后再主界面设计两个按钮,添加自定义控件和清理自定义控件

界面设计的东西不介绍太多了,因为客户端可以有很多种设计的方式,但通讯那一块就已经在上边展示的代码里边了;

到此服务端+客户端就已经实现了,是不是没有想象中那么复杂。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/674317.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

015、列表_应用场景

1.消息队列 如图所示,Redis的lpush+brpop命令组合即可实现阻塞队列,生产者客户端使用lrpush从列表左侧插入元素,多个消费者客户端使用brpop命令阻塞式的“抢”列表尾部的元素,多个客户端保证了消费的负载均衡和高可用性。 2.文章列表 每个用户有属于自己的文章列表,现…

CSAPP Lab07——Malloc Lab完成思路

等不到天黑 烟火不会太完美 回忆烧成灰 还是等不到结尾 ——她说 完整代码见&#xff1a;CSAPP/malloclab-handout at main SnowLegend-star/CSAPP (github.com) Malloc Lab 按照惯例&#xff0c;我先是上来就把mm.c编译了一番&#xff0c;结果产生如下报错。搜索过后看样子应…

Matlab进阶绘图第58期—带填充纹理的横向堆叠图

带填充纹理的横向堆叠图是通过在原始横向堆叠图的基础上添加不同的纹理得到的&#xff0c;可以很好地解决由于颜色区分不够而导致的对象识别困难问题。 由于Matlab中未收录提供填充纹理选项&#xff0c;因此需要大家自行设法解决。 本文使用hatchfill2工具&#xff08;Kesh I…

一些智能音箱类的软硬件方案

主要参考资料 Rabbit R1: https://www.rabbit.tech/rabbit-r1 mediatek-helio-p35: https://www.mediatek.com/products/smartphones-2/mediatek-helio-p35 NSdisplay: https://www.nsdisplay.com/ai-holobox-mini/ai-holobox-mini.html RK3566: https://www.rock-chips.com/a/…

I2C总线上拉电阻计算

I2C 总线上拉电阻计算 I2C接口的上拉电阻计算是一个常见问题。本文介绍如何使用简单的方程式进行计算。 1 介绍 I2C通信标准是当今电子系统中应用最广泛的芯片间通信标准。它是一种漏极开路/集电极开路通信标准&#xff0c;这意味着可以连接具有不同电源轨的集成电路 &#…

java jar包后台运行方式

在实际工作中&#xff0c;java开发的spring boot等通过jar包部署需要一直运行的程序部署到服务器上时&#xff0c;都希望后台运行&#xff0c;方便管理程序服务、防止被误操作关闭&#xff0c;本文结合自己工作经验讲解jar包后台运行的两种方式&#xff0c;分别是按操作系统支持…

LeetCode 两两交换链表中的节点

原题链接24. 两两交换链表中的节点 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff0c;请看图片的过程模拟&#xff0c;这里添加了一个哨兵节点0&#xff0c;目的是为了方便操作&#xff0c;得到指向1节点的指针。 class Solution {public:ListNode* swapPairs(ListNod…

17、matlab实现均值滤波、中值滤波、Butterworth滤波和线性相位FIR滤波

1、创建信号 1&#xff09;创建正余弦信号、噪声信号和混合信号 原始正余弦信号公式&#xff1a;Signal1 sin(2*pi*20* t) sin(2*pi*40* t) sin(2*pi*60* t) 高斯分布的白噪声&#xff1a;NoiseGauss [randn(1,2000)] 均匀分布的白噪声&#xff1a;[rand(1,2000)] 正余弦…

AD软件底层丝印反转

快捷键VB&#xff0c;翻转后底部视图所有显示就正常了&#xff0c;当底层确认之后再按VB就回到正常状态。 否则你就看到一个镜像的丝印。 快捷键VB后 注意&#xff0c;经过VB反转BOTTOM后TOP层的丝印变镜像翻转了。 设计完毕后调整过来即可。

物质的量质量,它们可不是一个概念

物质的量&质量&#xff0c;它们可不是一个概念。 物质的量&质量 乍一听物质的量&#xff0c;还以为是和质量有什么关系&#xff0c;是不是&#xff1f;其实物质的量和质量没什么直接的联系。 物质的量是国际单位制中7个基本物理量之一&#xff0c;其符号为n&#xf…

【JAVA SE】抽象类和接口

✨✨欢迎大家来到Celia的博客✨✨ &#x1f389;&#x1f389;创作不易&#xff0c;请点赞关注&#xff0c;多多支持哦&#x1f389;&#x1f389; 所属专栏&#xff1a;JAVA 个人主页&#xff1a;Celias blog~ 目录 引言 一、抽象类 1.1 抽象类的定义 1.2 抽象方法 1.3 抽象…

QT等比例缩放图片

1、这里使用QLabe控件显示图片&#xff0c;如下&#xff1a; label->setPixmap(QPixmap::fromImage(image_2)); 以上是直接原始大小来显示QImage image_2图片。 label->setAlignment(Qt::AlignCenter); 以上代码&#xff0c;是将显示的图片居中展示&#xff0c;默认…

使用git将本地文件上传到仓库+git常用指令

个人主页&#xff1a;Yang-ai-cao 一个小小搬砖人~~~~~ 目录 个人主页&#xff1a;Yang-ai-cao 一个小小搬砖人~~~~~ 配置 基本操作 分支操作 远程仓库 标签 撤销操作 日常操作示例 进阶操作 &#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#…

【python】OpenCV—Merge Image

文章目录 np.hstack / np.vstackSlicecv2.addWeighted自定义渐变式叠加cv2.bitwise_not / cv2.bitwise_and / cv2.add np.hstack / np.vstack 利用 numpy 的 hstack 和 vstack&#xff0c;对图片进行拼接 import cv2 import numpy as nph, w 256,256 img1 cv2.resize(cv2.i…

企业内业务系统与Activiti流程引擎的结合(十一)

摘要:前文分享了企业内部系统集成Activiti的架构和API设计,这里再介绍下 bpmn 流程图的绘制以及与 流程图与bpm后台服务代码的结合点。 一、画流程图 以使用 eclipse 画流程图为例 1. 将 Activiti BPMN 插件安装到 eclipse 插件安装成本后的效果:新建向导中出现 Activiti…

干货分享 | TSMaster 中 Hex 文件编辑器使用详细教程

TSMaster 软件的 Hex 文件编辑器提供了文件处理的功能&#xff0c;这一特性让使用 TSMaster 软件的用户可以更便捷地对 Hex、bin、mot、s19 和 tsbinary 类型的文件进行处理。 本文重点讲述 TSMaster 中 Hex 文件编辑器的使用方法&#xff0c;该编辑器能实现将现有的 Hex、bin、…

Python 的 os 和 shutil 模块

大家好&#xff0c;在日常的编程工作中&#xff0c;处理文件和目录是一个非常常见的任务。无论是创建、复制、移动还是删除文件&#xff0c;这些操作都需要我们与文件系统进行交互。在 Python 中&#xff0c;有两个强大的模块可以帮助我们轻松地进行文件和目录操作&#xff0c;…

怎么判断护眼台灯真的有用吗?揭秘护眼台灯怎么选!

中国目前面临着严重的近视问题&#xff0c;各学段学生的近视率普遍偏高&#xff0c;且高度近视的占比也不容忽视。这不仅对学生的身体健康构成威胁&#xff0c;也对国家的经济社会可持续发展和国家安全构成潜在风险。随着时代的进步和教育的普及&#xff0c;儿童近视的问题已经…

私有仓库搭建

目前市面上比较常见的私有仓库搭建方法为&#xff1a; 通过 Sinopia 或 verdaccio 搭建&#xff08;Sinopia 已经停止维护&#xff0c;verdaccio 是 Fork 自 Sinopia&#xff0c;基本上大同小异&#xff09;&#xff0c;其优点是搭建简单&#xff0c;不需要其他服务。通过 cnp…

ESP32-C3模组上跑通OTA升级(11)

接前一篇文章&#xff1a;ESP32-C3模组上跑通OTA升级&#xff08;10&#xff09; 本文内容参考&#xff1a; 《ESP3-C3 物联网工程开发实战》 乐鑫科技 特此致谢&#xff01; 上一回说到解决了证书验证的问题&#xff0c;但紧接着又出现了一个新的问题&#xff1a;版本相同无…