MQTT服务搭建及python使用示例

1、MQTT协议

1.1、MQTT介绍

        MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定的环境下使用。MQTT协议使用TCP/IP协议栈进行通讯,支持多种编程语言和平台,并且能够提供可靠的消息传递机制。

        在MQTT中,设备可以发布消息到特定的主题(topic),同时其他设备可以订阅这些主题以接收相应的消息。这种发布/订阅模式使得设备之间的通讯更加灵活和高效。MQTT还支持三种服务质量等级:至多一次(at most once)、至少一次(at least once)和恰好一次(exactly once),以满足不同场景下对消息传递可靠性的需求。

        优点:代码量少,开销低,带宽占用小,即时通讯协议。

1.2、MQTT概念

        订阅(Subscribtion):订阅包含主题筛选器( Topic Filter )和最大服务质量( QoS )。订阅会与一个会话( Session )关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

        会话(Session):每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

        主题名(Topic Name):连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。需要注意的是, MQTT 中消息主题按照层级命名,使用 ‘/’ 进行分割。此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:

        单层通配符 + :单层通配符只能匹配一层的主题,例如: China/Beijing/+ ,可以匹配的只有 Beijing 这个主 题下面一层的主题,例如 Xicheng, DongCheng, Xuanwu 等等。

        多层通配符 # :顾名思义,多层通配符就是可以匹配多个层级的主题,例如: China/# ,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong ,等等。

        主题筛选器(Topic Filter):一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

        消息订阅:消息订阅者所具体接收的内容。

1.3、MQTT中的角色

        Publisher 和 Subscriber 为客户端,Broker 为服务器端,消息主题为消息类型,Broker 根据 Topic 过滤消息,并将消息向客户端推送。

MQTT 中用 QoS 表示服务质量, MQTT 协议中有三种服务质量 (QoS) :

        1 ) QoS =0 ,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。

        2 ) QoS =1, 至少一次,保证包会到达目的地,但是可能出现重包。

        3 ) QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。

客户端:

        Publisher 和 Subscriber 都属于客户端。

        发布应用消息给其它相关的客户端。

        订阅以请求接受相关的应用消息。

        取消订阅以移除接受应用消息的请求。

        从服务端断开连接

服务器:

        服务器端即所谓的 MQTT Broker 服务器。

        接受来自客户端的网络连接。

        接受客户端发布的应用消息。

        处理客户端的订阅和取消订阅请求。

        转发应用消息给符合条件的已订阅客户端。

MQTT 提供的公共服务器端( Broker )有:

test.mosquitto.org

broker.hivemq.com

iot.eclipse.org

2、基于公共服务示例

        在次,选择使用EMQX提供的免费MQTT公共服务器,但同样可以选择其他任何MQTT broker。The Free Global Public MQTT Broker | Try Now | EMQ (emqx.com)

Broker: broker.emqx.io 
TCP Port: 1883 
Websocket Port: 8083

python库:pip install paho-mqtt=="1.6.1"

消息发布代码,pub.py

import time
import random
from paho.mqtt import client as mqtt_client

broker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

运行打印结果:

消息订阅代码,sub.py:

import random
from paho.mqtt import client as mqtt_client

broker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

运行打印结果:

        可以看到,发布和订阅成功。注意:每个客户端的ID唯一,不能重复!

3、基于Apollo服务示例

3.1、安装Apollo服务

        服务器端搭建:

Index of /dist/activemq/activemq-apollo/1.7.1

解压并打开目标如下:

        Apollo中间件其实是免安装的,我们只需要下载apache-apollo-1.7.1-windows-distro.zip,然后解压到某个文件夹就可以了。在这里我解压到D:\dist\apache-apollo-1.7.1。在apache-apollo-1.7.1/bin目录下打开终端执行命令:apollo create myapollo

        若出现如下错误 Loading configuration file ‘D:\phpStudy\apache-apollo-1.7.1\bin\mybroker\etc\apollo.xml’.Startup failed: java.lang.NoClassDefFoundError: javax/xml/bind/ValidationEventHandler,处理方式:换上jdk1.8版本即可。

        然后在生成的myapollo目录的bin目录下执行:pollo-broker.cmd run,结果如下:

        服务搭建成功,进入后台管理,打开网页,输入ip + : 61680 进入后台管理 ,默认用户名admin 密码 password,例如:http://127.0.0.1:61680

3.2、示例代码

本地服务基础信息:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

python库:pip install paho-mqtt=="1.6.1"

发布主题,publish.py

import sys
import time
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))


def on_subscribe(client, userdata, mid, granted_qos):
    print("消息发送成功")


client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)

i = 0
while True:
    try:
        # 发布MQTT信息
        sensor_data = "test" + str(i)
        client.publish(topic="public", payload=sensor_data, qos=0)
        time.sleep(5)
        i += 1
    except KeyboardInterrupt:
        print("EXIT")
        client.disconnect()
        sys.exit(0)

订阅主题,subscribe.py

import time
import paho.mqtt.client as mqtt


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接成功")
        print("Connected with result code " + str(rc))


def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)

# client.subscribe("public")
client.subscribe([("public", 0), ("test", 2)])  # 订阅
client.loop_forever()

执行脚本后,正确大小消息。浏览器可见连接记录。

4、EMQX客户端工具

下载地址:MQTTX 下载选择适合您的平台,立即开始使用 MQTTX。icon-default.png?t=N7T8https://mqttx.app/zh/downloads

4.1、公共服务测试消息接收

创建连接:

Host:为代码中定义好的 broker.emqx.io 
Port:为代码中定义好的 1883 
用户名、密码根据需要添加

添加订阅:

主题为:/flask/mqtt

在MQTTX中发布消息:

测试成功。

4.2、自建服务测试消息接收

MQTT版本选择3.1.1,以下参数,与服务保持一致:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

其它测试步骤,一样。(创建主题,测试)

5、EMQX代理服务器

        windows下搭建MQTT代理服务,与Apollo服务功能一样,更方便好用,下载地址:https://www.emqx.io/zh/downloads

        下载后解压,进入目录bin文件下,执行:emqx start 

        打开浏览器输入 http://localhost:18083 进入EMQ的web控制台,输入用户名:admin 密码:public 进行登录。登录进入后,界面如下:

至此,代理服务器已经创建完成!客户端就可以连接代理服务器进行消息的分发和订阅!

附录:

java1.8.1下载,地址:Java Downloads | Oracleicon-default.png?t=N7T8https://www.oracle.com/java/technologies/downloads/#java8-windows

参考:

1、嵌入式QT- QT使用MQTT

嵌入式QT- QT使用MQTT_qt mqtt-CSDN博客

2、Python实现通信协议(mqtt)5星,mqtt flask

【小沐学Python】Python实现通信协议(mqtt)_python mqtt-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/608468.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

利用智能私信软件,快速拓展潜在客户群体

在数字化营销的浪潮中,企业如何快速而有效地触及并吸引潜在客户,已成为一个不可忽视的挑战。随着人工智能技术的不断进步,智能私信软件作为一种新型工具,正逐渐改变着企业的市场拓展方式。本文将探讨如何通过这类软件,…

复现NerfingMVS(更新中)

按以下代码一步步操作 conda create -n NerfingMVS python3.7 conda activate NerfingMVS conda install pytorch1.7.1 torchvision0.8.2 torchaudio0.7.2 -c pytorch pip install -r requirements.txthttps://colmap.github.io/install.html Linux 中 建议的依赖&#xff1…

AI边缘计算盒子优势有哪些?如何实现低延迟处理?

AI边缘计算盒子作为一种集成人工智能技术的边缘计算设备,其优势主要体现在以下几个方面,万物纵横为您详细介绍: 1. 低延迟处理 AI边缘计算盒子靠近数据产生源头,能够即时处理数据,大幅减少数据传输至云端的时间&#…

深入解析算法效率核心:时间与空间复杂度概览及优化策略

算法复杂度,即时间复杂度与空间复杂度,衡量算法运行时资源消耗。时间复杂度反映执行时间随数据规模增长的关系,空间复杂度表明额外内存需求。优化策略,如选择合适数据结构、算法改进、循环展开等,对于提升程序效率、减…

从抖音阳哥的经验看,选品师项目是否值得你投入?

在抖音这个短视频平台上,无数的创作者分享着他们的知识和经验,其中阳哥作为一个备受关注的博主,他的每一次分享都能引起广大网友的热烈讨论。最近,阳哥分享了一个名为“选品师”的项目,让许多对电商和抖音运营感兴趣的…

RK3576 Android平台SD启动

RK3576 Android平台SD启动 要求 1,Android14及以上版本;2,SD卡制作工具:瑞芯微创建升级磁盘工具V1.78及以上版本;3,SD卡容量8G-32G之间; 软件修改 1,Android部分 修改PRODUCT_BO…

Android 的 Timer 和 TimerTask

Timer 简介(来自Gemini) Timer 是 Java 中用于创建定时任务的类。它位于 java.util 包中。可以使用 Timer 来安排一次性或定期执行的任务。 每个 Timer 对象都对应一个后台线程。此线程负责从任务队列中检索任务并按计划执行它们。 使用 Timer 要使用 Timer,首先…

3D 渲染至少需要多少显存?显存真得越大越好吗?

无论是电影制作、游戏开发还是建筑效果图设计,在今天的计算机图形学领域,都需要强大的图形处理能力。而显存(VRAM)作为支持这一切的重要组成部分,对于高效3D渲染同样至关重要。在本文中,我们将一起探讨显存…

Cmake-learning

可以把cmake看成一款自动生成 Makefile的工具,所以编译流程就变成了:cmake—>make–>可执行文件 在哪个目录执行cmake txt位置 就会在哪个目录生成构造文件和可执行文件 project(HELLO): 非强制,指明项目名称 add_executable(he…

【算法】滑动窗口——水果成篮

本篇博客是我对“水果成篮”这道题由暴力解法到滑动窗口思路的具体思路,有需要借鉴即可。 目录 1.题目2.暴力求解3.暴力优化3.1每次right不用回退3.2有些left长度一定不如前一个,不用走,left不回退 4.滑动窗口算法5.总结 1.题目 题目链接&am…

怎么用AI软件设计字体

一 工具准备 软件:Adobe illustrator 如下网盘自取 链接:https://pan.baidu.com/s/1hlImpN4QlsSkOLLUxINOGA 提取码:love 安装的时候看不全界面,多按几下tab键就能看到按钮。 直接找一款喜欢的字体修改,字体包如下…

物联网网关制造生产全流程揭秘!

如果您正有开发和定制物联网网关的计划,找一个专业的物联网设备厂商协助您制造生产物联网网关可以节省大量时间和成本,可以让您能专注于当前核心业务,而无需将精力过多地投入到自己不擅长的领域。 当然,了解物联网网关的测试和制…

基于JSP动漫论坛的设计与实现(二)

目录 3. 系统开发环境及技术介绍 3.1 开发环境 3.2 开发工具 3.2.1 MyEclipse8.5 3.2.2 MySql 3.3 相关技术介绍 3.3.1 JSP技术简介 3.3.2 JDBC技术技术简介 3.3.3 MVC模式与Struts框架技术 4. 总体设计 4.1 系统模块总体设计 4.1.1 普通用户模块设计 4…

数据库的使用基础-SQL语句

一、在MYSQL中&#xff0c;创建数据库&#xff0c;语法如下&#xff1a; CREATE DATABASE [IF NOT EXISTS] <数据库名> [[DEFAULT] CHARACTER SET <字符集名>] [[DEFAULT] COLLATE <校对规则名>];[ ]中的内容是可选的。语法说明如下&#xff1a; <数据库…

LeetCode738:单调递增的数字

题目描述 当且仅当每个相邻位数上的数字 x 和 y 满足 x < y 时&#xff0c;我们称这个整数是单调递增的。 给定一个整数 n &#xff0c;返回 小于或等于 n 的最大数字&#xff0c;且数字呈 单调递增 。 332 代码 class Solution { public:int monotoneIncreasingDigits(…

Imagine Flash、StyleMamba 、FlexControl、Multi-Scene T2V、TexControl

本文首发于公众号&#xff1a;机器感知 Imagine Flash、StyleMamba 、FlexControl、Multi-Scene T2V、TexControl You Only Cache Once: Decoder-Decoder Architectures for Language Models We introduce a decoder-decoder architecture, YOCO, for large language models, …

QT---day4事件

1、思维导图 2、 头文件 #ifndef MYWIDGET_H #define MYWIDGET_H #include <QWidget> #include<QIcon> //图标类 #include<QLabel> //标签类 #include<QMovie> //动图类 #include<QLineEdit> //行编辑器类 #include<QPushButton> //按钮…

AJAX知识点(前后端交互技术)

原生AJAX AJAX全称为Asynchronous JavaScript And XML,就是异步的JS和XML&#xff0c;通过AJAX可以在浏览器中向服务器发送异步请求&#xff0c;最大的优势&#xff1a;无需刷新就可获取数据。 AJAX不是新的编程语言&#xff0c;而是一种将现有的标准组合在一起使用的新方式 …

【进程等待】阻塞等待 | options非阻塞等待

目录 waitpid 阻塞等待 options&非阻塞等待 pid_t返回值 阻塞等待VS非阻塞等待 waitpid 回顾上篇&#xff1a; pid_ t waitpid(pid_t pid, int *status, int options); 返回值&#xff1a; 当正常返回的时候waitpid返回收集到的子进程的进程ID&#xff1b;如果设置了…

C++容器之vector类

目录 1.vector的介绍及使用1.1vector的介绍1.2vector的使用1.2.1 vector的定义1.2.2 vector iterator 的使用1.2.3 vector 空间增长问题1.2.4 vector 增删查改1.2.5vector 迭代器失效问题1.2.6 vector 在OJ中的使用。 2.vector深度剖析及模拟实现2.1 std::vector的核心框架接口…