Python模拟MQTT v3.1.1服务器

示例代码

import logging
import asyncio
from hbmqtt.broker import Broker

# 设置日志级别为DEBUG
logging.basicConfig(level=logging.DEBUG)

# 创建MQTT服务器
broker = Broker()

# 启动MQTT服务器
async def start_broker():
    await broker.start()

# 停止MQTT服务器
async def stop_broker():
    await broker.stop()

# 主函数
async def main():
    # 启动MQTT服务器
    await start_broker()

    try:
        # 保持主程序运行
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        # 捕捉键盘中断信号
        pass

    # 停止MQTT服务器
    await stop_broker()

# 运行主程序
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

运行结果

"D:\Program Files (x86)\Python
310\python.exe" E:/文档/Projects/Pycharm/forklift/MQTT_SERVER.py
Traceback (most recent call last):
  File "E:\文档\Projects\Pycharm\forklift\MQTT_SERVER.py", line 10, in <module>
    from hbmqtt.broker import Broker
  File "D:\Program Files (x86)\Python310\lib\site-packages\hbmqtt\broker.py", line 15, in <module>
    from hbmqtt.session import Session
  File "D:\Program Files (x86)\Python310\lib\site-packages\hbmqtt\session.py", line 8, in <module>
    from hbmqtt.mqtt.publish import PublishPacket
  File "D:\Program Files (x86)\Python310\lib\site-packages\hbmqtt\mqtt\__init__.py", line 5, in <module>
    from hbmqtt.mqtt.packet import (
  File "D:\Program Files (x86)\Python310\lib\site-packages\hbmqtt\mqtt\packet.py", line 8, in <module>
    from hbmqtt.adapters import ReaderAdapter, WriterAdapter
  File "D:\Program Files (x86)\Python310\lib\site-packages\hbmqtt\adapters.py", line 6, in <module>
    from websockets.protocol import WebSocketCommonProtocol
ImportError: cannot import name 'WebSocketCommonProtocol' from 'websockets.protocol' (D:\Program Files (x86)\Python310\lib\site-packages\websockets\protocol.py)

Process finished with exit code 1

cannot import name ‘WebSocketCommonProtocol’ from ‘websockets.protocol’

解决办法:
降低websockets版本

pip install websockets==8.1

在这里插入图片描述
新报错:

"D:\Program Files (x86)\Python385\python3.exe" E:/文档/Projects/Pycharm/forklift/MQTT_SERVER.py
Traceback (most recent call last):
  File "D:\Program Files (x86)\Python385\lib\site-packages\hbmqtt\broker.py", line 185, in _build_listeners_config
    listeners_config = broker_config['listeners']
KeyError: 'listeners'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:/文档/Projects/Pycharm/forklift/MQTT_SERVER.py", line 17, in <module>
    broker = Broker()
  File "D:\Program Files (x86)\Python385\lib\site-packages\hbmqtt\broker.py", line 157, in __init__
    self._build_listeners_config(self.config)
  File "D:\Program Files (x86)\Python385\lib\site-packages\hbmqtt\broker.py", line 192, in _build_listeners_config
    raise BrokerException("Listener config not found invalid: %s" % ke)
hbmqtt.broker.BrokerException: Listener config not found invalid: 'listeners'

Process finished with exit code 1

解决办法
根据错误信息,看起来问题出在您的MQTT服务器配置中缺少了"listeners"配置项。您可以在实例化Broker对象时,提供一个有效的配置,包括"listeners"配置项。

config = {
    "listeners": {
        "default": {
            "type": "tcp",
            "bind": "localhost:1883",  # 监听本地1883端口
            "max_connections": 10  # 最大连接数
        }
    },
    "sys_interval": 10,
    "topic-check": {
        "enabled": False
    }
}

broker = Broker(config)

在这里插入图片描述参考:《mqtt 客户端和服务端搭建及基本使用详解》

下载客户端
在这里插入图片描述
打开客户端,新建连接
在这里插入图片描述
Github教程

创建教程

本地MQTT服务器

import os
import subprocess

# 启动Mosquitto MQTT服务器
def start_mosquitto_server():
    command = "mosquitto"
    process = subprocess.Popen(command, shell=True)
    process.wait()

# 停止Mosquitto MQTT服务器
def stop_mosquitto_server():
    command = "pkill mosquitto"
    os.system(command)

# 主函数
def main():
    try:
        # 启动MQTT服务器
        start_mosquitto_server()

        # 保持主程序运行
        while True:
            pass
    except KeyboardInterrupt:
        # 捕捉键盘中断信号
        pass

        # 停止MQTT服务器
        stop_mosquitto_server()

# 运行主程序
if __name__ == "__main__":
    main()

在这里插入图片描述
系统找不到mosquitto命令,这可能是因为没有安装或正确配置Mosquitto MQTT服务器软件。

要解决这个问题,需要确保已经正确安装了Mosquitto MQTT服务器软件,并将其路径配置到系统的环境变量中。

安装Mosquitto MQTT服务器软件:

从Mosquitto官方网站(https://mosquitto.org/)下载并安装适合你操作系统的Mosquitto MQTT服务器软件。

配置系统环境变量:
将Mosquitto MQTT服务器软件的安装路径添加到系统的环境变量中,这样系统就能够找到mosquitto命令。

按照以下步骤进行配置:

打开系统的“高级系统设置”
在“系统变量”部分,找到名为“Path”的变量,选中它并点击“编辑”按钮
在编辑窗口中,点击“新建”按钮,并将Mosquitto MQTT服务器软件的安装路径添加进去
确认所有对话框并保存更改

重新运行代码:在完成以上步骤后,关闭并重新打开命令行窗口,然后再次运行你的代码,系统应该能够找到并启动mosquitto命令。
在这里插入图片描述
在这里插入图片描述在这里插入图片描述
在这里插入图片描述
重启电脑后,依然读取不到mosquitto:

"D:\Program Files (x86)\Python310\python310.exe" E:/文档/Projects/Pycharm/forklift/MQTT_SERVER_0629_01.py
'mosquitto' 不是内部或外部命令,也不是可运行的程序
或批处理文件。

在这里插入图片描述
在任务栏看到了该服务,说明服务已经启动,通过客户端模拟,成功发送了数据
在这里插入图片描述
在这里插入图片描述

import paho.mqtt.client as mqtt

# 定义回调函数,用于处理接收到的消息
def on_message(client, userdata, msg):
    print("Received message:", msg.topic, msg.payload.decode())

# 创建MQTT客户端实例
client = mqtt.Client()

# 配置MQTT服务器地址和端口
broker_address = "mqtt://localhost:1883"
client.connect(broker_address)

# 设置订阅回调函数
client.on_message = on_message

# 订阅主题
topic = "0629tt"
client.subscribe(topic)

try:
    # 启动MQTT客户端循环
    client.loop_forever()
except KeyboardInterrupt:
    # 捕捉键盘中断信号
    pass

# 停止MQTT客户端循环和断开连接
client.loop_stop()
client.disconnect()

在这里插入图片描述

pip install paho-mqtt

在这里插入图片描述

"D:\Program Files (x86)\Python310\python310.exe" E:/文档/Projects/Pycharm/forklift/MQTT_SERVER_0629_01.py
Traceback (most recent call last):
  File "E:\文档\Projects\Pycharm\forklift\MQTT_SERVER_0629_01.py", line 19, in <module>
    client.connect(broker_address)
  File "D:\Program Files (x86)\Python310\lib\site-packages\paho\mqtt\client.py", line 914, in connect
    return self.reconnect()
  File "D:\Program Files (x86)\Python310\lib\site-packages\paho\mqtt\client.py", line 1044, in reconnect
    sock = self._create_socket_connection()
  File "D:\Program Files (x86)\Python310\lib\site-packages\paho\mqtt\client.py", line 3685, in _create_socket_connection
    return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source)
  File "D:\Program Files (x86)\Python310\lib\socket.py", line 824, in create_connection
    for res in getaddrinfo(host, port, 0, SOCK_STREAM):
  File "D:\Program Files (x86)\Python310\lib\socket.py", line 955, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno 11001] getaddrinfo failed

Process finished with exit code 1

解决办法

# 配置MQTT服务器地址和端口
broker_address = "mqtt://localhost:1883"
client.connect(broker_address)

修改为:

# 配置MQTT服务器地址和端口
broker_address = "mqtt://127.0.0.1:1883"
client.connect(broker_address)

依然无法访问
在这里插入图片描述
Try Except 打印出错误
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
参考文章

C:\Windows\System32\drivers\etc

在这里插入图片描述
在使用 paho-mqtt 库时,broker_address 的格式应该是 host:port,而不是使用 mqtt:// 协议前缀。
修改代码中的 broker_address 如下:

broker_address = 'localhost'  # MQTT 服务器的主机名或 IP 地址
port = 1883  # MQTT 服务器的端口号
try:
    client.connect(broker_address, port=port)
except Exception as e:
    print("connect error:", e)

在这里插入图片描述
源码:

import paho.mqtt.client as mqtt

# 定义回调函数,用于处理接收到的消息
def on_message(client, userdata, msg):
    print("Received message:", msg.topic, msg.payload.decode())

# 创建MQTT客户端实例
client = mqtt.Client()

# 配置MQTT服务器地址和端口
broker_address = 'localhost'  # MQTT 服务器的主机名或 IP 地址
port = 1883  # MQTT 服务器的端口号
try:
    client.connect(broker_address, port=port)
except Exception as e:
    print("connect error:", e)

# 设置订阅回调函数
client.on_message = on_message

# 订阅主题
topic = "0629tt"
client.subscribe(topic)

try:
    # 启动MQTT客户端循环
    client.loop_forever()
except KeyboardInterrupt:
    # 捕捉键盘中断信号
    pass

# 停止MQTT客户端循环和断开连接
client.loop_stop()
client.disconnect()

其他参考代码
打开 Mosquitto 的配置文件。在 Windows 系统上,
配置文件通常位于 Mosquitto 安装目录的 \mosquitto\mosquitto.conf 或 \mosquitto\mosquitto.conf 文件中。

找到 listener 部分的配置。如果找不到该部分,请在文件的末尾添加以下内容:

listener 1883
bind_address 192.168.183.176

参考链接:《Mosquitto 安装指南》
在这里插入图片描述

C:\Program Files\mosquitto> mosquitto.exe -c mosquitto.conf -v

在这里插入图片描述

listener 1883
allow_anonymous true

通讯测试结果:

success!!

在这里插入图片描述

订阅全部消息

可以将主题设置为通配符 #。通配符 # 表示匹配主题层级中的任意层级。

import paho.mqtt.client as mqtt

# 定义回调函数,用于处理接收到的消息
def on_message(client, userdata, msg):
    print("Received message:", msg.topic, msg.payload.decode())

# 创建MQTT客户端实例
client = mqtt.Client()

# 配置MQTT服务器地址和端口
broker_address = 'localhost'  # MQTT 服务器的主机名或 IP 地址
port = 1883  # MQTT 服务器的端口号
try:
    client.connect(broker_address, port=port)
except Exception as e:
    print("connect error:", e)

# 设置订阅回调函数
client.on_message = on_message

# 订阅所有主题
topic = "#"
client.subscribe(topic)

try:
    # 启动MQTT客户端循环
    client.loop_forever()
except KeyboardInterrupt:
    # 捕捉键盘中断信号
    pass

# 停止MQTT客户端循环和断开连接
client.loop_stop()
client.disconnect()



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/36784.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python离线安装ibm_db

下载离线包ibm_db以及clidriver 下载imb_db 在pypi官方网站https://pypi.org/project/ibm-db/#files下载离线安装包ibm_db-3.0.2.tar.gz。下载clidriver 下载地址&#xff1a;https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/nt32_odbc_cli.…

C语言学生信息管理系统

C语言版学生信息管理系统 一&#xff0c;开发环境 操作系统&#xff1a;windows10, windows11, linux, mac等。开发工具&#xff1a;Qt, vscode, visual studio等开发语言&#xff1a;c语言 二&#xff0c;功能需求 1. 用户界面: 提供一个简洁的文本界面&#xff0c;用户可…

AI 对抗超级细菌:麦克马斯特大学利用深度学习发现新型抗生素 abaucin

内容一览&#xff1a;鲍曼不动杆菌是一种常见的医院获得性革兰氏阴性病原体&#xff0c;通常表现出多重耐药性。利用传统方法&#xff0c;发现抑制此菌的新型抗生素很困难。但利用机器学习可以快速探索化学空间&#xff0c;从而增加发现新型抗菌分子的可能性。近期&#xff0c;…

AI大数据智能视频融合平台EasyCVR新增Ehome黑白名单配置

EasyCVR视频融合平台基于云边端智能协同架构&#xff0c;具有强大的数据接入、处理及分发能力&#xff0c;平台支持海量视频汇聚管理&#xff0c;可支持多协议接入&#xff0c;包括市场主流标准协议与厂家私有协议及SDK&#xff0c;如&#xff1a;国标GB28181、RTMP、RTSP/Onvi…

2023-07-12:RocketMQ如何做到消息不丢失?

2023-07-12&#xff1a;RocketMQ如何做到消息不丢失&#xff1f; 答案2023-07-12&#xff1a; RocketMQ通过刷盘机制、消息拉取机制和ACK机制等多种方式来确保消息投递的可靠性&#xff0c;防止消息丢失。 1.刷盘机制 RocketMQ中的消息分为内存消息和磁盘消息&#xff0c;内…

【Linux】基础开发工具——gcc/g++篇

文章目录 一、预处理1.1 头文件展开1.2 条件编译 二、编译三、汇编四、链接4.1 什么是库?4.2 库的分类4.3 目标文件和库是如何链接的&#xff1f;4.3.1 动态链接4.3.2 静态链接 4.4 动静态链接的优缺点对比 五、Debug&&release 前言 &#xff1a;  在前面的文章里给大…

1、计算机网络核心

序号地址1计算机网络核心2数据库相关3Redis4Linux相关5JVM的内容6GC相关的7Java多线程与并发8Java多线程与并发-原理9Java常用类库与技巧10Java框架-Spring 文章目录 1、OSI开放式互联参考模型2、TCP/IP3、TCP报文头4、TCP的三次握手5、TCP的四次挥手6、为什么会有TIME_WAIT状态…

ARM Coresight 系列文章 7 - ARM Coresight 通过 AHB-AP 访问 cpu 内部 coresight 组件

文章目录 如下图所示&#xff0c;如果A78想去访问M33的内部 coresight 组件 ETM&#xff0c;需要要怎么做&#xff1f; 答案也正是在图中&#xff0c;首先A78 通过AXI 互联&#xff0c;接入到 APBIC 的 slave port&#xff0c;再通过APBIC 的 master 送出&#xff0c;而APBIC中…

机器学习-进化算法

进化算法 遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;crossovermutation 进化策略&#xff08;Evolutionary Strategies&#xff0c;ES&#xff09;基因编程&#xff08;Genetic Programming&#xff09;Multi-objective Evolutionary Algorithms 遗传算…

在Linux中传输文件文件夹的10个scp命令

scp 命令的基本语法 下面的命令将读作 copy source_file_name进入destination_folder在destination_host使用username account。 > scp source_file_name usernamedestination_host:destination_folder里面有很多参数scp你可以使用的命令。以下是可能在日常使用中使用的参数…

跟着Promise的节奏,让你的代码脱颖而出

文章目录 Promise简介Promise实例方法1. then(onFulfilled, onRejected)2. catch(onRejected)3. finally(onFinally)4. Promise.resolve(value)5. Promise.reject(reason)6. Promise.all(iterable)7. Promise.race(iterable) Promise实例方法1. prototype.then(onFulfilled, on…

基于springboot+Redis的前后端分离项目(八)-【黑马点评】

&#x1f381;&#x1f381;资源文件分享 链接&#xff1a;https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA?pwdeh11 提取码&#xff1a;eh11 好友关注&Feed流 &#xff08;一&#xff09;好友关注-关注和取消关注(二)好友关注-共同关注&#xff08;三&#xff09; 好友…

mfc120u.dll丢失修复,mfc120u.dll缺失的解决方法

MFC120u.dll缺失的原因 当系统中缺少或损坏了MFC120u.dll文件时&#xff0c;就会出现"MFC120u.dll缺失"的错误提示。造成MFC120u.dll缺失的原因可能有以下几种情况&#xff1a; 1.文件删除或损坏&#xff1a;MFC120u.dll文件可能因为误删除、病毒感染、硬盘故障等原…

Vue--》Vue3打造可扩展的项目管理系统后台的完整指南(十一)

今天开始使用 vue3 + ts 搭建一个项目管理的后台,因为文章会将项目的每一个地方代码的书写都会讲解到,所以本项目会分成好几篇文章进行讲解,我会在最后一篇文章中会将项目代码开源到我的GithHub上,大家可以自行去进行下载运行,希望本文章对有帮助的朋友们能多多关注本专栏…

Jmeter上传文件接口测试

Jmeter上传文件接口测试 接口测试&#xff0c;想必大家都做过&#xff0c;但是上传文件的接口&#xff0c;可能就不知所措。其实呢&#xff0c;还是那么回事~ 一、接口的业务 在接口文档缺失的前提下&#xff0c;那就从抓包玩起~Fiddler或者F12都可以。 本次我们接口实现的…

python验证公网ip与内网ip

什么是公网IP 公网IP&#xff08;Public IP&#xff09;是指在互联网中全球唯一标识一个设备或网络的IP地址。它是供公众访问和通信的IP地址。 公网IP是由互联网服务提供商&#xff08;ISP&#xff09;分配给用户或组织的&#xff0c;它允许设备通过互联网与其他设备进行通信…

如何高效的开展app的性能测试

目录 APP性能测试是什么 APP性能测试怎么做 性能测试场景的设计 性能指标的定义 规范化执行流程 性能数据数据收集 性能数据分析 性能问题定位 性能测试报告 APP性能测试是什么 从网上查了一下&#xff0c;貌似也没什么特别的定义&#xff0c;我这边根据自己的经验给出…

3.清除浮动

3.1 为什么需要清除浮动? 由于父级盒子在很多情况下&#xff0c;不方便给高度&#xff0c;但是子盒子浮动又不占有位置&#xff0c;最后父级盒子高度为0时&#xff0c;就会影响下面的标准流盒子。 ●由于浮动元素不再占用原文档流的位置&#xff0c;所以它会对后面的元素排…

进制转换解析

进制 进制介绍 对于整数&#xff0c;有四种表示方式&#xff1a; 二进制&#xff1a;0,1 &#xff0c;满 2 进 1.以 0b 或 0B 开头。 十进制&#xff1a;0-9 &#xff0c;满 10 进 1。 八进制&#xff1a;0-7 &#xff0c;满 8 进 1. 以数字 0 开头表示。 十六进制&#xff1…

Spring Boot 中的 Spring Cloud Feign

Spring Boot 中的 Spring Cloud Feign Spring Boot 是一个非常流行的 Java Web 开发框架&#xff0c;它提供了很多工具和组件来简化 Web 应用程序的开发。其中&#xff0c;Spring Cloud Feign 是 Spring Boot 中的一个非常重要的组件&#xff0c;它可以帮助我们实现声明式的 R…