【EMQX】通过EMQX webhook实现转发消息到Python web服务器

EMQX webhook消息转发Web服务器

    • 一、前言
    • 二、实现
      • 1、EMQX服务器搭建
        • EMQX下载、安装、启动
      • 2、本地Web服务搭建
        • 创建Flask项目
        • 代码
      • 3、EMQX中创建webhook数据桥接
      • 4、EMQX中创建数据转发规则
    • 三、效果

一、前言

需求:获取设备通过mqtt协议发送过来的数据并将数据保存到外部服务中,期间需要使用EMQX代理服务器将消息转发到自有的Web服务中


实现:通过EMQX中的webhook实现消息转发

官方:https://www.emqx.io/docs/zh/v5.0/data-integration/data-bridge-webhook.html
在这里插入图片描述

二、实现

1、EMQX服务器搭建

EMQX下载、安装、启动
  • 到EMQX官网进行下载:https://www.emqx.io/zh/downloads?os=Windows
    EMQX下载安装启动
  • 安装运行完成后,可直接访问 EMQX Dashboard 管理控制台
    • http://localhost:18083/localhost根据实际IP地址修改)
      EMQX服务器控制台
  • 默认用户名及密码
    • admin / public
  • 停止EMQX服务
    • 打开cmd,进入到emqx所在文件夹中的bin目录
    • 输入一下指令
    ./emqx/bin/emqx stop
    

2、本地Web服务搭建

本次Web服务器使用Python Flask进行搭建

创建Flask项目

项目结构

  • 相关功能的代码实现直接在app.py中完成
代码

本次将 mqtt客户端接收EMQX转发的消息数据 都写在该Flask项目中

import json

from flask import request, jsonify, Flask, Blueprint, render_template, session, current_app
from flask_mqtt import Mqtt
from werkzeug.local import LocalProxy

app = Flask(__name__)

# 代理地址(根据实际使用的IP地址进行修改,需要和EMQX处于同一地址)
app.config['MQTT_BROKER_URL'] = '127.0.0.1'
# 端口
app.config['MQTT_BROKER_PORT'] = 1883
# 当需要验证用户名和密码时,设置该项(根据实际情况设定)
# app.config['MQTT_USERNAME'] = 'user'
# 当需要验证用户名和密码时,设置该项(根据实际情况设定)
# app.config['MQTT_PASSWORD'] = '123456'
# 设置心跳时间,单位为秒
app.config['MQTT_KEEPALIVE'] = 60
# 如果服务器支持 TLS,则设置为 True
app.config['MQTT_TLS_ENABLED'] = False
# 主题(根据实际情况设定)
topic = 't/1'
# 实例化
mqtt_client = Mqtt(app)


@app.route('/')
def index():
    # 初始路由
    return render_template('index.html')


@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
    """连接回调函数"""
    if rc == 0:
        print('Connected successfully')
        # 订阅主题
        mqtt_client.subscribe(topic)
    else:
        # 连接失败
        print('Bad connection. Code:', rc)


@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
    """ 消息回调函数 """
    # 定义接受到的消息
    data = dict(
        # 主题
        topic=message.topic,
        # 内容
        payload=message.payload.decode()
    )
    print(data)
    # 打印输出接收到的消息
    print('Received message on topic: {topic} with payload: {payload}'.format(**data))


@app.route('/publish', methods=['POST'])
def publish_message():
    """
    消息发布接口(实际应用中,该接口可能需要处理一些复杂业务逻辑)
    """
    # 获取前端页面提交的数据,并格式化
    request_data = request.get_json()
    # print("接收到的数据", request_data)
    # 发布消息
    publish_result = mqtt_client.publish(request_data['topic'], request_data['payload'])
    # 返回JSON数据
    return jsonify({'code': publish_result[0]})


@app.route('/emqx', methods=['POST'])
def test_emqx_conn():
    """
    测试 搭建简易EMQX HTTP服务(用于接收EMQX转发过来的消息)
    在后面的 webhook数据桥接 创建中,URL填写为:http://127.0.0.1:5000/emqx
    """
    # 响应
    reply = {"result": "ok", "message": "success"}
    print("got post request: ", request.get_data())
    return json.dumps(reply), 200


if __name__ == '__main__':
    app.debug = True
    app.run()

3、EMQX中创建webhook数据桥接

  • EMQX控制台中找到webhook,点击创建
    在这里插入图片描述
  • 输入数据桥接名称(要求是大小写英文字母和数字的组合)
  • 请求方法选择 POST,
  • URL 为 http://127.0.0.1:5000/emqx(根据实际使用填写)
  • 其他使用默认值
  • 点击最下方保存按钮完成规则创建。
    在这里插入图片描述

4、EMQX中创建数据转发规则

  • 创建好webhook后,会自动根据创建的webhook桥接生成一个规则
  • 直接点击生成的规则中的 设置
    在这里插入图片描述
  • SQL编辑器根据个人实际业务进行修改,修改完成后直接点击更新
    在这里插入图片描述

三、效果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/248268.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

cgal教程 3D Alpha Wrapping

文章目录 3D Alpha Wrapping (3D alpha 包裹)1 介绍2 方法2.1 算法2.2 保证 3 接口4 选择参数4.1 alpha4.2 Offset4.3 关于“双面”包裹的注意事项 5 性能6 例子 3D Alpha Wrapping (3D alpha 包裹) 原文地址: https://doc.cgal.org/latest/Alpha_wrap_3/index.html#Chapter_3D…

数智赋能进行时 百望云荣获第四届长三角“金融科技领军企业”奖

近日,由上海金融业联合会、上海市银行同业公会、华东师范大学指导,《金融电子化》杂志社有限责任公司、华东师范大学长三角金融科技研究院等单位联合主办,上海市互联网金融行业协会等单位协办的“2023长三角金融科技节——长三角经济圈金融科…

微服务实战系列之ZooKeeper(上)

前言 历经1个多月的创作和总结,纵观博主微服务系列博文,大致脉络覆盖了以下几个方面: 数据方面(缓存&安全) 比如Redis、MemCache、Ehcache、J2cache(两级缓存框架)、RSA加密、Sign签名…传…

力扣22. 括号生成(java 回溯法)

Problem: 22. 括号生成 文章目录 题目描述思路解题方法复杂度Code 题目描述 思路 我们首先要知道,若想生成正确的括号我们需要让右括号去适配左括号,在此基础上我们利用回溯去解决此题目 1.题目给定n个括号,即当回溯决策路径长度等于 2 n 2n…

学习笔记9——JUC三种量级的锁机制

学习笔记系列开头惯例发布一些寻亲消息 链接:https://baobeihuijia.com/bbhj/contents/3/197325.html 多线程访问共享资源冲突 临界区:一段代码块存在对共享资源的多线程读写操作,称这段代码块为临界区 竞态条件:多个线程在临界…

Anaconda文件目录(打开默认路径)更改

Anaconda 文件默认目录更改 每次打开 Anaconda 都在C盘怎么办,如何改为D盘或是其他盘符位置? 可以进行下述操作。 1. 单次修改路径 单次修改路径:在 exe 文件(Anaconda Prompt (Anaconda_py))中写入下面代码: jupyter notebook …

微信小程序ec-canvas(echarts)显示地图【以甘肃省为例】

文章目录 一、效果图二、实现1、下载echarts插件2、定制图形,生成 echarts.min.js 文件3、小程序中使用(1)下载甘肃地图(2)使用 参考文档《微信小程序使用echarts显示全国地图》《如何在微信小程序开发中使用echarts以…

详解Keras3.0 KerasCV API: StableDiffusion image-generation model

Stable Diffusion 图像生成模型,可用于根据简短的文本描述(称为“提示”)生成图片 keras_cv.models.StableDiffusion(img_height512, img_width512, jit_compileTrue) 参数说明 img_height:int,要生成的图像的高度…

安路IP核应用举例(OSC、UART)

1.OSC(内部振荡器) 按照Project->New Project顺序新建工程后,后按照Tools->IP Generator顺序,创建IP核,如下图: 安路FPGA的内置OSC振荡模块频率可选30MHz、60MHz。 可选Verilog或VHDL语言。 如图,生成的.v文件只…

美国 AGU 发布 AI 应用手册,明确 6 大指导方针

爆发性的 AI 应用:风险与机遇并存 在空间和环境科学领域,AI 工具的应用越来越广泛——诸如天气预报和气候模拟,能源及水资源管理等等。可以说,我们正在经历前所未有的 AI 应用爆发,面对其中的机遇与风险,更…

DTC 故障严重程度

文章目录 简介DTC严重性 位定义DTC 类别定义参考 简介 DTCSeverityMask(DTC严重性掩码)/ DTCSeverity(DTC严重性)包含了DTC严重性和DTC类别信息。 DTCSeverityMask(DTC严重性掩码)/DTCSeverit…

找不到mfc100u.dll,程序无法继续执行?三步即可搞定

在使用电脑过程中,我们经常会遇到一些错误提示,其中之一就是“找不到mfc100u.dll”。mfc100u.dll是Microsoft Foundation Class(MFC)库中的一个版本特定的DLL文件。MFC是微软公司为简化Windows应用程序开发而提供的一套C类库。它包…

接口测试工具Postman接口测试图文教程

一、前言 在前后端分离开发时,后端工作人员完成系统接口开发后,需要与前端人员对接,测试调试接口,验证接口的正确性可用性。而这要求前端开发进度和后端进度保持基本一致,任何一方的进度跟不上,都无法及时…

牛客网BC107矩阵转置

答案&#xff1a; #include <stdio.h> int main() {int n0, m0,i0,j0,a0,b0;int arr1[10][10]{0},arr2[10][10]{0}; //第一个数组用来储存原矩阵&#xff0c;第二个数组用来储存转置矩阵scanf("%d%d",&n,&m); if((n>1&&n<10)&&am…

【最新版】PyCharm基础调试功能详解

文章目录 一、断点1. 断点的类型a. 行断点b. 异常断点 2. 设置断点a. 设置行断点b. 设置异常断点 3. 管理断点a. 删除断点b. 将断点静音 二、调试功能0. 测试代码1. 设置断点2. 调试的多种启动方式3. 观察调试控制台a. 步过b. 步入c. 单步执行代码d. 步出e. 运行到光标处f. 重新…

vivado约束方法6

生成的时钟 定时约束向导建议在的输出上创建一个生成的时钟顺序单元&#xff0c;当它直接或通过驱动其他顺序单元的时钟引脚时一些互连逻辑。与PLL或MMCM不同&#xff0c;用户逻辑不能将主时钟&#xff0c;因此向导仅提供指定除法系数的选项&#xff0c;如中所示如下图所示&am…

protobuf基础学习

部分内容出自&#xff1a;https://blog.csdn.net/baidu_32237719/article/details/99723353 proto文件来预先定义的消息格式。数据包是按照proto文件所定义的消息格式完成二进制码流的编码和解码。proto文件&#xff0c;简单地说&#xff0c;就是一个消息的协议文件&#xff0c…

Cloudflare始终使用HTTPS且带参数跳转到www的域名

文章目录 设置教程设置图跳转实测 设置教程 关闭 SSL/TLS -> 边缘证书 的 Always Use HTTPS 规则 -> 页面规则 -> URL: http://www.example.com/* 设置成始终使用HTTPS 规则 -> 页面规则 -> URL: example.com/* 设置成 转发URL301重定向到 to https://www.ex…

sql 数据类型注入+tamper

数字型 0-9 查询语句&#xff1a; $sql"select * from sy_guestbook where id$i"; 字符型 a-z 中文 标点符号 加入了单引号 查询语句&#xff1a; $sql"select * from sy_guestbook where gTpl$g"; simple order by 16--select * from sy_guestbook w…

【Spring】Spring AOP

Spring AOP AOP概述什么是AOP Spring AOP快速入门1.引入AOP依赖2. 编写AOP程序 Spring AOP 详解Spring AOP 核心概念切点(Pointcut)连接点(Join Point)通知(Advice)切面(Aspect) 通知类型PointCut切面优先级Order切点表达式execution表达式annotation自定义注解切面类 AOP原理代…