1、MQTT协议
1.1、MQTT介绍
MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定的环境下使用。MQTT协议使用TCP/IP协议栈进行通讯,支持多种编程语言和平台,并且能够提供可靠的消息传递机制。
在MQTT中,设备可以发布消息到特定的主题(topic),同时其他设备可以订阅这些主题以接收相应的消息。这种发布/订阅模式使得设备之间的通讯更加灵活和高效。MQTT还支持三种服务质量等级:至多一次(at most once)、至少一次(at least once)和恰好一次(exactly once),以满足不同场景下对消息传递可靠性的需求。
优点:代码量少,开销低,带宽占用小,即时通讯协议。
1.2、MQTT概念
订阅(Subscribtion):订阅包含主题筛选器( Topic Filter )和最大服务质量( QoS )。订阅会与一个会话( Session )关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
会话(Session):每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
主题名(Topic Name):连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。需要注意的是, MQTT 中消息主题按照层级命名,使用 ‘/’ 进行分割。此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:
单层通配符 + :单层通配符只能匹配一层的主题,例如: China/Beijing/+ ,可以匹配的只有 Beijing 这个主 题下面一层的主题,例如 Xicheng, DongCheng, Xuanwu 等等。
多层通配符 # :顾名思义,多层通配符就是可以匹配多个层级的主题,例如: China/# ,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong ,等等。
主题筛选器(Topic Filter):一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
消息订阅:消息订阅者所具体接收的内容。
1.3、MQTT中的角色
Publisher 和 Subscriber 为客户端,Broker 为服务器端,消息主题为消息类型,Broker 根据 Topic 过滤消息,并将消息向客户端推送。
MQTT 中用 QoS 表示服务质量, MQTT 协议中有三种服务质量 (QoS) :
1 ) QoS =0 ,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。
2 ) QoS =1, 至少一次,保证包会到达目的地,但是可能出现重包。
3 ) QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。
客户端:
Publisher 和 Subscriber 都属于客户端。
发布应用消息给其它相关的客户端。
订阅以请求接受相关的应用消息。
取消订阅以移除接受应用消息的请求。
从服务端断开连接
服务器:
服务器端即所谓的 MQTT Broker 服务器。
接受来自客户端的网络连接。
接受客户端发布的应用消息。
处理客户端的订阅和取消订阅请求。
转发应用消息给符合条件的已订阅客户端。
MQTT 提供的公共服务器端( Broker )有:
test.mosquitto.org
broker.hivemq.com
iot.eclipse.org
2、基于公共服务示例
在次,选择使用EMQX提供的免费MQTT公共服务器,但同样可以选择其他任何MQTT broker。The Free Global Public MQTT Broker | Try Now | EMQ (emqx.com)
Broker: broker.emqx.io
TCP Port: 1883
Websocket Port: 8083
python库:pip install paho-mqtt=="1.6.1"
消息发布代码,pub.py
import time
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
运行打印结果:
消息订阅代码,sub.py:
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
运行打印结果:
可以看到,发布和订阅成功。注意:每个客户端的ID唯一,不能重复!
3、基于Apollo服务示例
3.1、安装Apollo服务
服务器端搭建:
Index of /dist/activemq/activemq-apollo/1.7.1
解压并打开目标如下:
Apollo中间件其实是免安装的,我们只需要下载apache-apollo-1.7.1-windows-distro.zip,然后解压到某个文件夹就可以了。在这里我解压到D:\dist\apache-apollo-1.7.1。在apache-apollo-1.7.1/bin目录下打开终端执行命令:apollo create myapollo
若出现如下错误 Loading configuration file ‘D:\phpStudy\apache-apollo-1.7.1\bin\mybroker\etc\apollo.xml’.Startup failed: java.lang.NoClassDefFoundError: javax/xml/bind/ValidationEventHandler,处理方式:换上jdk1.8版本即可。
然后在生成的myapollo目录的bin目录下执行:pollo-broker.cmd run,结果如下:
服务搭建成功,进入后台管理,打开网页,输入ip + : 61680 进入后台管理 ,默认用户名admin 密码 password,例如:http://127.0.0.1:61680
3.2、示例代码
本地服务基础信息:
host="127.0.0.1"
port = 61613
用户名:admin
密码:password
python库:pip install paho-mqtt=="1.6.1"
发布主题,publish.py
import sys
import time
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
def on_subscribe(client, userdata, mid, granted_qos):
print("消息发送成功")
client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="127.0.0.1", port=61613, keepalive=60) # 订阅频道
time.sleep(1)
i = 0
while True:
try:
# 发布MQTT信息
sensor_data = "test" + str(i)
client.publish(topic="public", payload=sensor_data, qos=0)
time.sleep(5)
i += 1
except KeyboardInterrupt:
print("EXIT")
client.disconnect()
sys.exit(0)
订阅主题,subscribe.py
import time
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("连接成功")
print("Connected with result code " + str(rc))
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="127.0.0.1", port=61613, keepalive=60) # 订阅频道
time.sleep(1)
# client.subscribe("public")
client.subscribe([("public", 0), ("test", 2)]) # 订阅
client.loop_forever()
执行脚本后,正确大小消息。浏览器可见连接记录。
4、EMQX客户端工具
下载地址:MQTTX 下载选择适合您的平台,立即开始使用 MQTTX。https://mqttx.app/zh/downloads。
4.1、公共服务测试消息接收
创建连接:
Host:为代码中定义好的 broker.emqx.io
Port:为代码中定义好的 1883
用户名、密码根据需要添加
添加订阅:
主题为:/flask/mqtt
在MQTTX中发布消息:
测试成功。
4.2、自建服务测试消息接收
MQTT版本选择3.1.1,以下参数,与服务保持一致:
host="127.0.0.1"
port = 61613
用户名:admin
密码:password
其它测试步骤,一样。(创建主题,测试)
5、EMQX代理服务器
windows下搭建MQTT代理服务,与Apollo服务功能一样,更方便好用,下载地址:https://www.emqx.io/zh/downloads
下载后解压,进入目录bin文件下,执行:emqx start
打开浏览器输入 http://localhost:18083 进入EMQ的web控制台,输入用户名:admin 密码:public 进行登录。登录进入后,界面如下:
至此,代理服务器已经创建完成!客户端就可以连接代理服务器进行消息的分发和订阅!
附录:
java1.8.1下载,地址:Java Downloads | Oraclehttps://www.oracle.com/java/technologies/downloads/#java8-windows
参考:
1、嵌入式QT- QT使用MQTT
嵌入式QT- QT使用MQTT_qt mqtt-CSDN博客
2、Python实现通信协议(mqtt)5星,mqtt flask
【小沐学Python】Python实现通信协议(mqtt)_python mqtt-CSDN博客