C#MQTT编程06--MQTT服务器和客户端(winform版)

1、前言

介绍完基础理论部分,下面在Windows平台上搭建一个简单的MQTT应用,进行简单的应用,整体架构如下图所示;

消息模型:

运用MQTT协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景,如下图所示

前面介绍过,MQTT可以运行在几乎所有的平台,windows,linux什么的都可以,各种语言都有实现MQTT的组件,.net也好,Java也好,都有封装好的mqtt服务器和客户端组件或插件来实现,本系列是在.net平台下实现mqtt的服务器和客户端。

常见的MQTT服务器包括Eclipse Mosquitto、EMQ X、HiveMQ、RabbitMQ、MQTTNET等,本系列文章都是基于.net平台的mqtt服务通信,开发环境vs2022,.net framework4.8。

2、服务器搭建

1、创建项目方案

 

 2、添加库引用

 

 

3、UI布局

 

 

4、控件代码

“启动”按钮

停止按钮

 窗体加载

全部完整代码 

注意这里面用到了多线程及任务task,委托action,异步async及await的技术,在mqtt中必须使用这些技术,否则界面会卡死。

using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet.Protocol;

namespace MQTTNETServerForms
{
    public partial class Form1 : Form
    {

        private IMqttServer server;//mqtt服务器对象
        List<TopicItem> Topics = new List<TopicItem>();

        public Form1()
        {
            InitializeComponent();
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            //创建服务器对象
            server = new MqttFactory().CreateMqttServer();
            server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(Server_ApplicationMessageReceived));//绑定消息接收事件
            server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(new Action<MqttServerClientConnectedEventArgs>(Server_ClientConnected));//绑定客户端连接事件
            server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(new Action<MqttServerClientDisconnectedEventArgs>(Server_ClientDisconnected));//绑定客户端断开事件
            server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(new Action<MqttServerClientSubscribedTopicEventArgs>(Server_ClientSubscribedTopic));//绑定客户端订阅主题事件
            server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(new Action<MqttServerClientUnsubscribedTopicEventArgs>(Server_ClientUnsubscribedTopic));//绑定客户端退订主题事件
            server.StartedHandler = new MqttServerStartedHandlerDelegate(new Action<EventArgs>(Server_Started));//绑定服务端启动事件
            server.StoppedHandler = new MqttServerStoppedHandlerDelegate(new Action<EventArgs>(Server_Stopped));//绑定服务端停止事件
        }

        /// <summary>
        /// 绑定消息接收事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            string msg = e.ApplicationMessage.ConvertPayloadToString();
            WriteLog(">>> 收到消息:" + msg + ",QoS =" + e.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic);
        }

        /// <summary>
        /// 绑定客户端连接事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ClientConnected(MqttServerClientConnectedEventArgs e)
        {
            Task.Run(new Action(() =>
            {
                lbClients.BeginInvoke(new Action(() =>
                {
                    lbClients.Items.Add(e.ClientId);
                }));
            }));
            WriteLog(">>> 客户端" + e.ClientId + "连接");
        }

        /// <summary>
        /// 绑定客户端断开事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ClientDisconnected(MqttServerClientDisconnectedEventArgs e)
        {
            Task.Run(new Action(() =>
            {
                lbClients.BeginInvoke(new Action(() =>
                {
                    lbClients.Items.Remove(e.ClientId);
                }));
            }));
            WriteLog(">>> 客户端" + e.ClientId + "断开");
        }

        /// <summary>
        /// 绑定客户端订阅主题事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            Task.Run(new Action(() =>
            {
                var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter.Topic);
                if (topic == null)
                {
                    topic = new TopicItem { Topic = e.TopicFilter.Topic, Count = 0 };
                    Topics.Add(topic);
                }
                if (!topic.Clients.Exists(c => c == e.ClientId))
                {
                    topic.Clients.Add(e.ClientId);
                    topic.Count++;
                }

                lvTopic.Invoke(new Action(() =>
                {
                    this.lvTopic.Items.Clear();
                }));

                foreach (var item in this.Topics)
                {

                    lvTopic.Invoke(new Action(() =>
                    {
                        this.lvTopic.Items.Add($"{item.Topic}:{item.Count}");
                    }));
                }
            }));
            WriteLog(">>> 客户端" + e.ClientId + "订阅主题" + e.TopicFilter.Topic);
        }

        /// <summary>
        /// 绑定客户端退订主题事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            Task.Run(new Action(() =>
            {
                var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter);
                if (topic != null)
                {
                    topic.Count--;
                    topic.Clients.Remove(e.ClientId);
                }

                this.lvTopic.Items.Clear();
                foreach (var item in this.Topics)
                {
                    this.lvTopic.Items.Add($"{item.Topic}:{item.Count}");
                }
            }));

            WriteLog(">>> 客户端" + e.ClientId + "退订主题" + e.TopicFilter);
        }

        /// <summary>
        /// 绑定服务端启动事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_Started(EventArgs e)
        {
            WriteLog(">>> 服务端已启动!");
        }

        /// <summary>
        /// 绑定服务端停止事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_Stopped(EventArgs e)
        {
            WriteLog(">>> 服务端已停止!");
        }

        /// <summary>
        /// 显示日志
        /// </summary>
        /// <param name="message"></param>
        public void WriteLog(string message)
        {
            if (txtMsg.InvokeRequired)
            {
                txtMsg.Invoke(new Action(() =>
                {
                    txtMsg.Text = "";
                    txtMsg.Text = (message + "\r");
                }));
            }
            else
            {
                txtMsg.Text = "";
                txtMsg.Text = (message + "\r");
            }
        }

        /// <summary>
        /// 启动
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        [Obsolete]
        private async void btnStart_Click(object sender, EventArgs e)
        {
            var optionBuilder = new MqttServerOptionsBuilder()
          .WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(this.txtIP.Text))
          .WithDefaultEndpointPort(int.Parse(this.txtPort.Text))
          .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000))
          .WithConnectionValidator(t =>
          {
              string un = "", pwd = "";
              un = this.txtUname.Text;
              pwd = this.txtUpwd.Text;
              if (t.Username != un || t.Password != pwd)
              {
                  t.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
              }
              else
              {
                  t.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
              }
          });
            var option = optionBuilder.Build();
            //启动
            await server.StartAsync(option);
            WriteLog(">>> 服务器启动成功");
        }

        /// <summary>
        /// 停止 
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnStop_Click(object sender, EventArgs e)
        {
            if (server != null)
            {
                server.StopAsync();
            }
        }
    }
}

 注意这个端口,帐号,密码可以自己决定,Ip地址也是。

 注意这里面的代码,服务器上必须注册绑定实现的几个事件:消息接收事件 ,客户端连接事件,客户端断开事件,客户端订阅主题事件,客户端退订主题事件,服务端启动事件,服务端停止事件

 server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(Server_ApplicationMessageReceived));//绑定消息接收事件
 server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(new Action<MqttServerClientConnectedEventArgs>(Server_ClientConnected));//绑定客户端连接事件
 server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(new Action<MqttServerClientDisconnectedEventArgs>(Server_ClientDisconnected));//绑定客户端断开事件
 server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(new Action<MqttServerClientSubscribedTopicEventArgs>(Server_ClientSubscribedTopic));//绑定客户端订阅主题事件
 server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(new Action<MqttServerClientUnsubscribedTopicEventArgs>(Server_ClientUnsubscribedTopic));//绑定客户端退订主题事件
 server.StartedHandler = new MqttServerStartedHandlerDelegate(new Action<EventArgs>(Server_Started));//绑定服务端启动事件
 server.StoppedHandler = new MqttServerStoppedHandlerDelegate(new Action<EventArgs>(Server_Stopped));//绑定服务端停止事件

启动测试服务器

 启动成功。

3、客户端创建

 1、添加项目

2、添加库引用 

注意这里添加的与服务器不一样,别混错了

 

3、UI布局

 布局使用的是常规的label,textbox,button 

4、控件代码

连接代码

订阅代码

 

发布代码

 完整代码

using MQTTnet.Client.Options;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet;

namespace MQTTNETClientForms
{
    public partial class Form1 : Form
    {
        private IManagedMqttClient mqttClient;//客户端mqtt对象

        public Form1()
        {
            InitializeComponent();
        }

        /// <summary>
        /// 连接
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnConn_Click(object sender, EventArgs e)
        {
            var mqttClientOptions = new MqttClientOptionsBuilder()
              .WithClientId(this.txtId.Text)
              .WithTcpServer(this.txtIP.Text, int.Parse(this.txtPort.Text))
              .WithCredentials(this.txtName.Text, this.txtUpwd.Text);

            var options = new ManagedMqttClientOptionsBuilder()
                        .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                        .WithClientOptions(mqttClientOptions.Build())
                        .Build();
            //开启
            await mqttClient.StartAsync(options);
        }

        /// <summary>
        /// 断开
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnClose_Click(object sender, EventArgs e)
        {
            if (mqttClient != null)
            {
                if (mqttClient.IsStarted)
                {
                    await mqttClient.StopAsync();
                }
                mqttClient.Dispose();
            }
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            var factory = new MqttFactory();
            mqttClient = factory.CreateManagedMqttClient();//创建客户端对象

            //绑定断开事件
            mqttClient.UseDisconnectedHandler(async ee =>
            {
                WriteLog("与服务器之间的连接断开了,正在尝试重新连接");
                // 等待 5s 时间
                await Task.Delay(TimeSpan.FromSeconds(5));
                try
                {
                    mqttClient.UseConnectedHandler(tt =>
                    {
                        WriteLog(">>> 连接到服务成功");
                    });
                }
                catch (Exception ex)
                {
                    WriteLog($"重新连接服务器失败:{ex}");
                }
            });

            //绑定接收事件
            mqttClient.UseApplicationMessageReceivedHandler(aa =>
            {
                try
                {
                    string msg = aa.ApplicationMessage.ConvertPayloadToString();
                    WriteLog(">>> 消息:" + msg + ",QoS =" + aa.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + aa.ClientId + ",主题:" + aa.ApplicationMessage.Topic);
                }
                catch (Exception ex)
                {
                    WriteLog($"+ 消息 = " + ex.Message);
                }
            });

            //绑定连接事件
            mqttClient.UseConnectedHandler(ee =>
            {
                WriteLog(">>> 连接到服务成功");
            });
        }

        /// <summary>
        /// 显示日志
        /// </summary>
        /// <param name="message"></param>
        private void WriteLog(string message)
        {
            if (txtMsg.InvokeRequired)
            {
                txtMsg.Invoke(new Action(() =>
                {
                    txtMsg.Text = (message);
                }));
            }
            else
            {
                txtMsg.Text = (message);
            }
        }

        /// <summary>
        /// 订阅
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>

        [Obsolete]
        private async void btnSub_Click(object sender, EventArgs e)
        {
            if (string.IsNullOrWhiteSpace(this.txtTopic.Text))
            {
                WriteLog(">>> 请输入主题");
                return;
            }
            //在 MQTT 中有三种 QoS 级别: 
            //At most once(0) 最多一次
            //At least once(1) 至少一次
            //Exactly once(2) 恰好一次
            //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.tbTopic.Text).WithAtMostOnceQoS().Build());//最多一次, QoS 级别0
            await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.txtTopic.Text).WithAtLeastOnceQoS().Build());//恰好一次, QoS 级别1 
            WriteLog($">>> 成功订阅");
        }

        /// <summary>
        /// 发布
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnPub_Click(object sender, EventArgs e)
        {
            if (string.IsNullOrWhiteSpace(this.txtTopik.Text))
            {
                WriteLog(">>> 请输入主题");
                return;
            }
            var result = await mqttClient.PublishAsync(
                this.txtTopik.Text,
                this.txtContent.Text,
                MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);//恰好一次, QoS 级别1   
            WriteLog($">>> 主题:{this.txtTopik.Text},消息:{this.txtContent.Text},结果: {result.ReasonCode}");
        }
    }
}

4、运行测试

生成编译解决方案,成功后开始测试

1、启动服务器

2、启动客户端

找到生成的客户端debug目录下的.exe文件

3、测试连接 

连接成功,服务器看到客户端上线了

4、测试订阅

再运行一个客户端,连接服务器

5、测试发布

 c1向cced主题发布一个消息,结果是c1,c2都收到了消息

同样,c2发布一个消息,c1,c2都收到了消息

 6、测试下线

c1关闭,服务器马上知道了

 5、小结

基于mqttnet的组件搭建的mqtt服务器和客户端通信成功,发布和订阅都ko,ko,ko。

讲解不易,分析不易,原创不易,整理不易,伙伴们动动你的金手指,你的支持是我最大的动力。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/324886.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于SSM的网上招聘系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

云服务器CVM_云主机_云计算服务器_弹性云服务器-腾讯云

腾讯云服务器CVM提供安全可靠的弹性计算服务&#xff0c;腾讯云明星级云服务器&#xff0c;弹性计算实时扩展或缩减计算资源&#xff0c;支持包年包月、按量计费和竞价实例计费模式&#xff0c;CVM提供多种CPU、内存、硬盘和带宽可以灵活调整的实例规格&#xff0c;提供9个9的数…

【QML COOK】- 009-组件(Components)

组件对于QML来说就如同C的类一样。可以用同一个组件创建多个对象。 组件有两种定义方式&#xff1a; 用独立的.qml文件定义组件在.qml文件中用Component对象定义组件 1. 创建项目&#xff0c;新建文件IndependentComponent.qml import QtQuickRectangle {id : rootText {id…

Sqoop安全性:确保安全的数据传输

确保数据传输的安全性在大数据处理中至关重要。Sqoop作为一个用于数据传输的工具&#xff0c;也提供了多种安全性措施&#xff0c;以确保数据在传输过程中的机密性和完整性。本文将深入探讨Sqoop的安全性特性&#xff0c;提供详细的示例代码和全面的内容&#xff0c;以帮助大家…

Flink-SQL——时态表(Temporal Table)

时态表(Temporal Table) 文章目录 时态表(Temporal Table)数据库时态表的实现逻辑时态表的实现原理时态表的查询实现时态表的意义 Flink中的时态表设计初衷产品价格的例子——时态表汇率的例子——普通表 声明版本表声明版本视图声明普通表 一个完整的例子测试数据代码实现测试…

使用flutter开发一个渐变色按钮

因为项目需要&#xff0c;需要使用flutter开发一个渐变色的按钮&#xff0c;flutter自带的按钮样式不太好调整&#xff0c;所以需要自定义实现&#xff0c;实现的思路就是使用GestureDetector嵌套Container&#xff0c;Container里面嵌套text实现。 实现的效果&#xff1a; 实…

【Nuxt3】nuxt3目录文件详情描述:.nuxt、.output、assets、public、utils(一)

简言 nuxt3的中文网站 上次简单介绍了nuxt3创建项目的方法和目录文件大概用处。 这次详细说下.nuxt、.output、assets、public、utils五个文件夹的用处。 正文 .nuxt Nuxt在开发中使用.nuxt/目录来生成你的Vue应用程序。 为了避免将开发构建的输出推送到你的代码仓库中&…

C语言:自定义类型——结构体

一、什么叫做结构体 C语⾔已经提供了内置类型&#xff0c;如&#xff1a;char、short、int、long、float、double等&#xff0c;但是只有这些内置类型还是不够的&#xff0c;假设我想描述学⽣&#xff0c;描述⼀本书&#xff0c;这时单⼀的内置类型是不⾏的。描述⼀个学⽣需要 …

每日一练:LeeCode-144、145、94.二叉树的前中后序遍历【二叉树】

本文是力扣LeeCode-144、145、94.二叉树的前中后序遍历 学习与理解过程&#xff0c;本文仅做学习之用&#xff0c;对本题感兴趣的小伙伴可以出门左拐LeeCode前序遍历、中序遍历、后序遍历。 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序遍历。 给定一个二叉树的根…

RK3399平台入门到精通系列讲解(外设篇)热成像传感器MLX90640 JNI控制程序

文章目录 JNI回调函数回调函数的实现驱动可以详看:链接 JNI 文件:native-lib.cpp

编译 FastDFS 时报错 fatal error: sf/sf_global.h: No such file or directory 解决办法

编译 FastDFS 时&#xff0c;报错如下 gcc -Wall -D_FILE_OFFSET_BITS64 -D_GNU_SOURCE -g -O1 -DDEBUG_FLAG -c -o ../common/fdfs_global.o ../common/fdfs_global.c -I../common -I/usr/local/include In file included from ../common/fdfs_global.c:21:0: ../common/fdf…

Ps:认识路径

在 Photoshop 中&#xff0c;路径 Path广泛地应用于创建精确的图像边界&#xff08;包括精准抠图&#xff09;以及复杂的图形设计之中。 路径又称为“矢量路径”&#xff0c;或者“贝塞尔曲线” Bezier Curves路径。 路径本身只是一种基于数学方程的“轮廓指示”&#xff0c;并…

曲面上偏移命令的查找

今天学习老王的SW绘图时&#xff0c;遇到一个命令找不到&#xff0c;查询了一会终于找到了这个命令&#xff0c;防止自己忘记&#xff0c;特此记录一下&#xff0c;这个命令就是“曲面上偏移”&#xff0c;网上好多的教程都是错误的&#xff0c;实际上这个命令没有在曲面里面&a…

绝地求生追封原理

绝地求生追封原理是指在网络游戏《绝地求生》中&#xff0c;玩家通过观察和分析游戏中的各种信息&#xff0c;追踪其他玩家的位置和行动&#xff0c;以便更好地进行战术和攻击。 追封原理主要通过以下几种方式实现&#xff1a; BattleEye作弊系统检测 绝地求生玩家对这个系统…

MHFormer 论文解读

目录​​​​​​​ Multi-Hypothesis Transformer 结果 Introduction & Related work 多假设 为什么作者提出这个模型&#xff1f; 3.Multi-Hypothesis Transformer 3.1 Preliminary 3.2 MultiHypothesis Generation 3.3 Temporal Embedding 3.4. SelfHypothesi…

Kubernetes (K8S) 3 小时快速上手 + 实践

1. Kubernetes 简介 k8s即Kubernetes。其为google开发来被用于容器管理的开源应用程序&#xff0c;可帮助创建和管理应用程序的容器化。用一个的例子来描述&#xff1a;"当虚拟化容器Docker有太多要管理的时候&#xff0c;手动管理就会很麻烦&#xff0c;于是我们便可以通…

网络安全的威胁PPT

建议的PPT免费模板网站&#xff1a;http://www.51pptmoban.com/ppt/ 此PPT模板下载地址&#xff1a;https://file.51pptmoban.com/d/file/2023/03/20/1ae84aa8a9b666d2103f19be20249b38.zip 内容截图&#xff1a;

2.3 数据链路层03

2.3 数据链路层03 2.3.7 以太网交换机 1、以太网交换机的基本功能 以太网交换机是基于以太网传输数据的交换机&#xff0c;以太网交换机通常都有多个接口&#xff0c;每个接口都可以直接与一台主机或另一个以太网交换机相连&#xff0c;一般都工作在全双工方式。 以太网交换…

个性化定制的知识付费小程序,为用户提供个性化的知识服务

明理信息科技知识付费saas租户平台 随着知识经济的兴起&#xff0c;越来越多的人开始重视知识付费&#xff0c;并希望通过打造自己的知识付费平台来实现自己的知识变现。本文将介绍如何打造自己的知识付费平台&#xff0c;并从定位、内容制作、渠道推广、运营维护四个方面进行…

C语言自学之运算符3

1、算术运算符 加减乘除 2、取模运算 3、递增递减运算符 4、赋值运算符 5、比较运算符 6、逻辑非运算符 7、逻辑与运算符 8、逻辑或运算符 9、运算符优先级