使用 EMQX 开源版的 Webhook 机制处理消息并存储数据

1、前言

EMQX 是一款强大的开源 MQTT 消息代理,它支持大量的连接和高吞吐量,适用于各种物联网应用。Webhook 是 EMQX 提供的扩展功能之一,用于将消息推送到外部的 HTTP 服务。在本文中,我们将介绍如何使用 EMQX 开源版的 Webhook 机制,并展示如何处理收到的 Webhook 请求,将其中的数据存储到数据库中。

2、Webhook 简介

Webhook 是一种常见的 HTTP 回调机制,用于将事件或数据推送到外部服务器。当 MQTT 客户端发布消息时,EMQX 可以通过 Webhook 将该消息发送给指定的 HTTP 端点,方便我们在接收到消息后进一步处理数据。

3、搭建 Webhook 服务

接下来,我们编写一个简单的 SpringBoot 2.7服务,用于接收 EMQX 的 Webhook 请求并将其中的数据存储到数据库中。

3.1、项目依赖

pom.xml 中添加以下依赖:

    <dependencies>
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.5</version>
        </dependency>

        <!-- Jackson Databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.5}</version>
        </dependency>

        <!-- Jackson Annotations -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.13.5</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.6</version>
        </dependency>
    </dependencies>  

3.2、实现 Webhook 控制器

3.2.1、Controller
package ....这里填写你自己的

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.Map;

@RestController
@RequestMapping("/emqx/test")
@AllArgsConstructor
@Slf4j
public class WebhookController {

    private final EmqxTestService emqxTestService;
    private final ObjectMapper objectMapper = new ObjectMapper();


    @PostMapping("/webhook")
    public String webhook(@RequestBody String payload) {
        try {
            // 解析主 JSON 字符串为 Map
            Map<String, Object> payloadMap = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {});

            // 从主 Map 中提取 clientid 和 topic
            String clientId = (String) payloadMap.get("clientid");
            String topic = (String) payloadMap.get("topic");

            log.info("Received clientid: {}", clientId);
            log.info("Received topic: {}", topic);

            // 提取 payload 字段的 JSON 字符串
            String payloadString = (String) payloadMap.get("payload");

            // 解析 payload 字段的 JSON 字符串为 Map
            Map<String, Object> payloadDataMap = objectMapper.readValue(payloadString, new TypeReference<Map<String, Object>>() {});

            // 从 payload 数据中提取 msg 参数
            String msg = (String) payloadDataMap.get("msg");

            log.info("Received msg: {}", msg);

            // 创建 EmqxTest 实例并设置字段
            EmqxTest testData = new EmqxTest();
            testData.setData(payload);
            testData.setClientId(clientId);
            testData.setTopic(topic);

            // 保存数据
            emqxTestService.insertData(testData);

        } catch (IOException e) {
            log.error("解析JSON有效负载失败", e);
            return "Error parsing payload";
        }

        return "Received";
    }
}
 3.2.2、Service
package ....这里填写你自己的

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ldb.tool.entity.EmqxTest;
import com.ldb.tool.mapper.EmqxTestMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

@Service
@AllArgsConstructor
@Slf4j
public class EmqxTestService {

    private final EmqxTestMapper emqxTestMapper;

    public EmqxTest insertData(EmqxTest testData) {
        EmqxTest emqxTest = new EmqxTest();
        // 你可以手动设置其他需要的字段,如 clientId, topic, data 等
        emqxTest.setClientId(testData.getClientId());
        emqxTest.setTopic(testData.getTopic());
        emqxTest.setData(testData.getData());
        emqxTest.setCreateTime(new Date()); // 如果你有自动填充策略,可以忽略这行

        this.emqxTestMapper.insert(emqxTest);

        return emqxTest;
    }
}
3.2.3、Mapper
package ...这里填写你自己的;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ldb.tool.entity.EmqxTest;

public interface EmqxTestMapper extends BaseMapper<EmqxTest> {
}
 3.2.4、Entity
package ...这里填写你自己的;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@TableName("emqx_test")
@Data
public class EmqxTest implements Serializable {
    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;
    private String clientId;
    private String topic;
    private String data;
    private Date createTime;
    private Date updateTime;
}

4、配置 EMQX Webhook

4.1、运行

我们这里使用docker来运行EMQX。

通过 Docker 运行 EMQX | EMQX文档

4.1.1、获取镜像
docker pull emqx/emqx:5.8.0
4.1.2、启动容器

docker run -d --name emqx \
  -p 1883:1883 -p 8083:8083 \
  -p 8084:8084 -p 8883:8883 \
  -p 18083:18083 \
  -v $PWD/data:/opt/emqx/data \
  -v $PWD/log:/opt/emqx/log \
  emqx/emqx:5.8.0

4.2、配置EMQX-Webhook

4.2.1、创建Webhook

访问EMQX可视化后台(http://localhost:18083/)=>集成=>Webhook=>创建Webhook

在填写设置的时候,需要注意的是我们本地docke访问宿主机,在容器内部URL:127.0.0.1,指向的是容器本身,你可以获取宿主机IP作为URL,比如192.168.30.44。

我们通过URL选项的测试按钮可以点击测试是否正常请求。

5、测试 Webhook

在保证我们的Java-Webhook、EMQX服务运行的情况下,我们可以通过MQTTX(简介 - MQTTX 文档)软件去模拟一台直连的MQTT设备发起一个主题,因为我们在创建Webhook的时候触发者是消息发布。

5.1、MQTTX发送主题

首先我们需要新建一个MQTT连接,配置如下所以,未设置认证的话不需要用户名密码。

右下角,我们填写主题(Topic)的消息路由为listen/me,消息内容为{"msg": "send messgae","status":1},点击小飞机按钮发送。

5.2、查看Webhook触发情况

在EMQX后台,集成=>Webhook,查看送达情况。

在查看我们的Java服务的日志打印,也收到了。

查看sql表,也已经正常保存。

6、结论

Webhook 是一种强大的机制,MQTT 消息发布事件触发后,通过 HTTP 推送到 Spring Boot 服务,对接收到的数据进行解析和存储。这种机制能够让我们轻松地将消息从 EMQX 转发到其他服务,从而实现复杂的业务逻辑处理。


7、参考资料

  • EMQX官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/872852.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

整型数组按个位值排序

题目描述 给定一个非空数组(列表)&#xff0c;其元素数据类型为整型&#xff0c;请按照数组元素十进制最低位从小到大进行排序&#xff0c;十进制最低位相同的元司 相对位置保持不变。 当数组元素为负值时&#xff0c;十进制最低位等同于去除符号位后对应十进制值最低位。 输…

QT核心内容(9.6)

1> 手写unique_ptr智能指针 代码&#xff1a; #include <iostream> #include <cassert>using namespace std; template<typename T> class my_unique_ptr { private:T* ptr;// 禁止拷贝构造函数和拷贝赋值操作符my_unique_ptr(const my_unique_ptr&a…

手机扬声器音量总是不够大?试试“扬声器助推器”吧

手机的扬声器音量总是不够大&#xff0c;尤其是在嘈杂的环境中&#xff0c;音乐和视频的声音总是不太清晰。直到我发现了这款“扬声器助推器”&#xff0c;我的手机音质瞬间提升了好几个档次。 软件简介&#xff1a; “扬声器助推器”利用先进的音频处理技术&#xff0c;能够…

vivado error:Combinatorial Loop Alert:1 LUT cells form a combinatorial loop

VIVADO ERROR :Combinatorial Loop Alert&#xff1a;1 LUT cells form a combinatorial loop vivao生成bit流时发生报错&#xff0c;如下图所示定位原因解决 vivao生成bit流时发生报错&#xff0c;如下图所示 定位原因 在三段式状态机中&#xff0c;组合逻辑代码if else 语句…

STM32:TIM定时中断配置的最全库函数讲解笔记

声明&#xff1a;本博客为哔哩哔哩up主江协科技 “STM32入门教程”的听课笔记&#xff0c;仅供学习、参考使用&#xff0c;不得用作其他用途&#xff0c;违者必究。如有版权问题&#xff0c;请联系作者修改。 目录 一、综述 二、TIM库 初始化 2.1、TIM_DeInit 恢复缺省值 …

IPv6 Sec机制的深度解析与优势探讨

IPv6的sec机制&#xff0c;主要指的是IPv6协议中内置的安全机制&#xff0c;特别是通过IP Sec协议集来实现的。IPv6在设计之初就考虑到了安全性问题&#xff0c;并内置了对IP Sec的支持&#xff0c;这使得IPv6网络在安全性能上相比IPv4有了显著的提升。 IP Sec协议集主要由认证…

Android Studio打开Modem模块出现:The project ‘***‘ is not a Gradle-based project

花了挺长时间处理该问题&#xff0c;特记录如下&#xff1a;1.背景&#xff1a; 在Android studio 下导入一个新增的modem模块&#xff0c;如MPSS.DE.3.1.1\modem_proc\AAA, 目的是看代码方便一些&#xff0c;可以自由搜索各种关键字。但导入该项目时出现了如下错误&#xff1a…

好用的AI编程助手[豆包]

欢迎来到 Marscode 的世界&#xff01;这里将为你揭秘 Marscode&#xff0c;它的独特之处、应用领域等相关精彩内容等你来探索。 一、打开VS Code 二、选择 Extensions,搜索marscode 三、点击安装 四、点击使用 五、输入需要编写的代码 六、根据自己的需求修改代码 MarsCode 注…

RabbitMQ 应用

文章目录 前言1. Simple 简单模式2. Work Queue 工作队列模式3. Pubulish/Subscribe 发布/订阅模式Exchange 的类型 4. Routing 路由模式5. Topics 通配符模式6. RPC RPC通信7. Publisher Confirms 发布确认1. 单独确认2. 批量确认3. 异步确认 前言 前面我们学习了 RabbitMQ 的…

学习笔记--MybatisPlus

官网&#xff1a;MyBatis-Plus &#x1f680; 为简化开发而生 快速入门 入门案例 引入MybatisPlus的起步依赖 定义Mapper 问题&#xff1a; MybatisPlus中Invalid bound statement (not found): com.itheima.mp.mapper.UserMapper.insert 一定要指定实体类&#xff01;&am…

GDB watch starti i files

watch break starti 在程序的最初开始运行的位置处断下来 ​​ i files 查看程序及加载的 so 的 sections ​​

遍历有向网格链路实现

在实际的业务中&#xff0c;我们可能遇到复杂规则&#xff08;多个或与条件组合&#xff09;&#xff0c;复杂链路等类似场景问题&#xff0c;如&#xff1a;规则引擎相关业务&#xff0c;生产任务排期等。 复杂链路示意图如下&#xff1a; 复杂网路链路场景描述 有一个或多…

机器学习如何用于音频分析?

机器学习如何用于音频分析&#xff1f; 一、说明 近十年来&#xff0c;机器学习越来越受欢迎。事实上&#xff0c;它被用于医疗保健、农业和制造业等众多行业。随着技术和计算能力的进步&#xff0c;机器学习有很多潜在的应用正在被创造出来。由于数据以多种格式大量可用&…

EasyExcel实现复杂Excel的导入

最近项目中遇到一个复杂的Excel的导入&#xff0c;并且数据量较大。因为数据不规则&#xff0c;所以只能使用POI进行自定义读取&#xff0c;但是发现数据量大之后&#xff0c;读取数据非常耗时。后面换成EasyExcel&#xff0c;性能起飞。 1. Excel样板 如上图&#xff0c;需要…

USB - 笔记

1.USB接口区分 2 充电宝 图中提到的各种充电协议都是用于快速充电技术的标准,适用于不同品

Chrome 浏览器插件获取网页 window 对象(方案三)

前言 最近有个需求&#xff0c;是在浏览器插件中获取 window 对象下的某个数据&#xff0c;当时觉得很简单&#xff0c;和 document 一样&#xff0c;直接通过嵌入 content_scripts 直接获取&#xff0c;然后使用 sendMessage 发送数据到插件就行了&#xff0c;结果发现不是这…

JAMA network open|自动化定量评估胃肠道肿瘤中三级淋巴结构的机器学习模型|文献精析·24-09-07

小罗碎碎念 这篇文章报道了一种基于机器学习模型的自动化方法&#xff0c;用于在常规组织病理学图像中检测和分类胃肠道癌症中的三级淋巴结构&#xff0c;并验证了其与患者生存预后的关联。 在这项多中心诊断/预后研究中&#xff0c;开发了一种基于机器学习的计算工具&#xff…

【pyhton】python如何实现将word等文档中的文字转换成语音

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

数据结构基本知识

一、什么是数据结构 1.1、组织存储数据 ---------》内存&#xff08;存储&#xff09; 1.2、研究目的 如何存储数据&#xff08;变量&#xff0c;数组....)程序数据结构算法 1.3、常见保存数据的方法 数组&#xff1a;保存自己的数据指针&#xff1a;是间接访问已经存在的…

移远通信高端5G智能模组SG560D-NA率先通过PTCRB认证

近日&#xff0c;移远通信宣布&#xff0c;其基于高通QCM6490平台打造的高端5G智能模组SG560D-NA顺利通过PTCRB认证。 在此之前&#xff0c;该模组还获得了美国FCC和加拿大IC认证&#xff0c;这意味着&#xff0c;其已完全满足北美地区的相关标准和规定&#xff0c;能够支持相关…