目录
- Flask + MQTT 实现消息订阅发布
- 准备
- 开始
- 1.创建Flask项目
- 2创建py文件:`mqtt_demo.py`
- 3.代码实现
- 4.项目运行
- 5.测试
- 5.1 测试消息接收
- 5.2 测试消息发布
- 6、扩展
Flask + MQTT 实现消息订阅发布
准备
- 本次项目主要使用到的库:
flask_mqtt
pip install flask_mqtt
开始
1.创建Flask项目
2创建py文件:mqtt_demo.py
- *也可以直接在项目中的 `app.py` 文件进行代码编写*
3.代码实现
from flask import Flask, request, jsonify
from flask_mqtt import Mqtt
app = Flask(__name__)
# 代理地址
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
# 端口
app.config['MQTT_BROKER_PORT'] = 1883
# 当需要验证用户名和密码时,请设置该项
app.config['MQTT_USERNAME'] = 'user'
# 当需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = '123456'
# 设置心跳时间,单位为秒
app.config['MQTT_KEEPALIVE'] = 60
# 如果服务器支持 TLS,则设置为 True
app.config['MQTT_TLS_ENABLED'] = False
# 主题
topic = '/flask/mqtt'
# 实例化
mqtt_client = Mqtt(app)
@app.route('/')
def index():
# 初始路由
return "Welcome mqtt_flask"
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
print('Connected successfully')
# 订阅主题
mqtt_client.subscribe(topic)
else:
# 连接失败
print('Bad connection. Code:', rc)
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
""" 消息回调函数 """
# 定义接受到的消息
data = dict(
# 主题
topic=message.topic,
# 内容
payload=message.payload.decode()
)
# 打印输出接收到的消息
print('Received message on topic: {topic} with payload: {payload}'.format(**data))
@app.route('/publish', methods=['POST'])
def publish_message():
""" 消息发布接口(实际应用中,该接口可能需要处理一些复杂业务逻辑) """
# 格式化数据
request_data = request.get_json()
# 发布消息
publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
return jsonify({'code': publish_result[0]})
if __name__ == '__main__':
app.run()
4.项目运行
- 运行项目前可在Pycharm中设置
host
和port
- 设置好后直接运行项目
5.测试
使用
MQTTX
进行消息测试
5.1 测试消息接收
-
创建连接
Host
:为代码中定义好的broker.emqx.io
Port
:为代码中定义好的1883
- 用户名、密码根据需要添加
-
添加订阅
- 主题为:
/flask/mqtt
- 主题为:
-
在
MQTTX
中发布消息- 主题:
/flask/mqtt
- 主题:
-
Flask控制台中接收到的消息
5.2 测试消息发布
- 订阅使用消息接收的订阅
- 主题为:
/flask/mqtt
- 主题为:
- 使用Postman调用
/publish
接口,并发送消息到/flask/mqtt
主题中
MQTTX
中接收到的消息
6、扩展
本次
Flask
项目在Win10下运行,MQTTX
消息消息发送订阅在Ubuntu下进行
- 内网穿透
- 使用花生壳进行内网映射
- 内网地址及端口为:Flask项目所在主机IP,端口为Flask运行端口
- 使用协议:HTTPS
参考:
- 链接: https://www.emqx.com/zh/blog/how-to-use-mqtt-in-flask
- 链接: https://blog.csdn.net/emqx_broker/article/details/124816126