消息通讯-MQTT WebHookSpringBoot案例

一、EMQX WebHook介绍

1、EMQX WebHook 是由 emqx_web_hook (opens new window)插件提供的将EMQX中的钩子事件通知到某个Web服务的功能。
2、WebHook 的内部实现是基于钩子,借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。

3、它通过在钩子上的挂载回调函数,获取到 EMQX 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。

以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:

    Client      |    EMQX     |  emqx_web_hook |   HTTP       +------------+
  =============>| - - - - - - -> - - - - - - - ->===========>  | Web Server |
                |    Broker    |                |  Request     +------------+

注意:WebHook 对于事件的处理是单向的,它仅支持将 EMQX 中的事件推送给 Web 服务,并不关心 Web 服务的返回。

二、EMQX WebHook配置项

Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf,配置项的详细说明可以查看官方文档配置

在默认etc/plugins/emqx_web_hook.conf中,官方为我们提供了如下的配置触发规则:

##====================================================================
## WebHook
##====================================================================

web.hook.api.url = http://127.0.0.1:80

## Format:
##   web.hook.rule.<HookName>.<No> = <Spec>
#web.hook.rule.client.connect.1       = {"action": "on_client_connect"}
#web.hook.rule.client.connack.1       = {"action": "on_client_connack"}
#web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
#web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
#web.hook.rule.client.subscribe.1     = {"action": "on_client_subscribe"}
#web.hook.rule.client.unsubscribe.1   = {"action": "on_client_unsubscribe"}
#web.hook.rule.session.subscribed.1   = {"action": "on_session_subscribed"}
#web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
#web.hook.rule.session.terminated.1   = {"action": "on_session_terminated"}
#web.hook.rule.message.publish.1      = {"action": "on_message_publish"}
#web.hook.rule.message.delivered.1    = {"action": "on_message_delivered"}
#web.hook.rule.message.acked.1        = {"action": "on_message_acked"}

其中webhook支持的事件如下:

三、如何使用EMQX WebHook配置

手动修改Webhook的配置文件位于:etc/plugins/emqx_web_hook.conf.
修改配置非常简单,打开指定文件后,官方已经写好这些了,如上面所示,
只需要修改web.hook.url,和在你需要的钩子事件前的#号去掉即可。

emqx 启用插件 webhook插件(emqx有web控制台,在插件里启用就可以)

刷新后emqx_web_hook插件运行中即启动成功

然后就是自行搭建一个web接口服务,当客户端有指定事件后会主动推送,
接口地址即为Webhook的配置文件中配置的web.hook.api.url

WebHook配置事件推送参数详解

1、client.connect(处理连接报文)

 

2、client.connack(下发连接应答)

3、client.connected(成功接入)

4、client.disconnected(连接断开)

5、client.subscribe(订阅主题)

6、client.unsubscribe(取消订阅)

session.subscribed(会话订阅主题)
同 client.subscribe,action 为 session_subscribed

session.unsubscribed(会话取消订阅)
同 client.unsubscribe,action 为 session_unsubscribe

1、message.publish(消息发布)
2、message.delivered(消息投递)

3、message.acked(消息回执)

四、SpringBoot集成Webhook实现客户端断连监控

通过使用Emqx的Webhook插件实现监控所有客户端的连接状态,并做出客户端上线和下线后的逻辑处理

1. 实现前提

修改 etc/plugins/emqx_web_hook.conf 文件,设置事件转发的url和地址和触发规则
 

# 事件需要转发的目的服务器地址
web.hook.api.url = http://127.0.0.1:8000/api/mqtt/webhook

# 打开下面两个触发规则
web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
2. 代码实现接口

通过EMQX 的webhook将客户端的连接断开等事件通知到我们自建的服务上,通过事件类型获取客户端的连接状态,然后将客户端的连接状态进行存储,并且提供HTTP API供后台系统查询所有客户端的状态。

package com.team.modules.mqtt.controller;

import com.google.gson.Gson;
import com.team.modules.mqtt.client.MyMQTTClient;
import com.team.modules.mqtt.service.SenderService;
import com.team.modules.system.domain.vo.ResultVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

@RestController
@Slf4j
@RequestMapping("/api/mqtt")
public class SenderController {

    @Autowired
    private MyMQTTClient myMQTTClient;

    @PostMapping("/webhook")
    public void webhook(@RequestBody() Map<String, Object> message) {
        // 监听连接的上线和下线
        System.out.println("收到的消息:" + message);

        String action = (String) message.get("action");
        String clientid = (String) message.get("clientid");
        if (action.equals("client_connected")) {
            log.info("client:{} 上线", clientid);
        }
        if (action.equals("client_disconnected")) {
            log.info("client:{} 下线", clientid);
        }
    }
}
3. 监听结果

启动服务后,模拟客户端连接emqx,查看服务记录的日志:

五、总结

由此看出客户端建立连接和断开连接后,Emqx服务都会给配置的接口web.hook.api.url = http://127.0.0.1:8000/api/mqtt/webhook发送一条数据,该数据字段可以参考上述的:WebHook配置事件推送参数详解。使用此方法可以将mqtt应用的更加灵活。其他的监听事件配置也类似,遇到合适场景可以自行发挥。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/156048.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【开源】基于Vue和SpringBoot的固始鹅块销售系统

项目编号&#xff1a; S 060 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S060&#xff0c;文末获取源码。} 项目编号&#xff1a;S060&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 鹅块类型模块2.3 固…

绕过类安全问题分析方法

什么是绕过 逻辑漏洞是指程序设计中逻辑不严密&#xff0c;使攻击者能篡改、绕过或中断程序&#xff0c;令其偏离开发人员预期的执行。 常见表现形式 1、接口&#xff08;功能类&#xff09;绕过&#xff1a;即接口或功能中通过某参数&#xff0c;绕过程序校验 2、流程类绕…

[模版总结] - 树的基本算法2 - BST

BST定义 BST - Binary Search Tree, 即二叉搜索树(有序二叉树) 特性 中序遍历有序查找/插入/删除某个数值可以通过 即树的高度&#xff0c;最优,最坏 . 有多种改进BST可以动态维持插入删除后树结构能尽可能保持平衡 BST基本操作 查询 - 二分查找 搜索数值 - 二分法 class…

有Mac或无Mac电脑通用的获取安卓公钥的方案

从2023年9月开始&#xff0c;所有上架应用市场的app都需要进行APP备案。 其中后端服务器在阿里云的可以在阿里云备案&#xff0c;后端服务器在腾讯云的可以在腾讯云备案。但无论你是在什么云厂商里做备案&#xff0c;无一例外的是&#xff0c;无论是上架安卓应用还是上架IOS应…

Python大数据学习问题整理汇总

day01 分区表与分桶表的区别 在这里插入代码片day02 数据仓分层/与本质 数据仓库(OLAP)的本质叫联机分析处理, 一般针对某些主题的历史数据进行分析 主要面向分析,支持管理决策。源数据层&#xff08;ODS&#xff09;&#xff1a; 此层数据无任何更改&#xff0c;直接沿用外围…

【2021集创赛】基于arm Cortex-M3处理器与深度学习加速器的实时人脸口罩检测 SoC

团队介绍 参赛单位&#xff1a;深圳大学 队伍名称&#xff1a;光之巨人队 指导老师&#xff1a;钟世达、袁涛 参赛队员&#xff1a;冯昊港、潘家豪、慕镐泽 图1 团队风采 1. 项目简介 新冠疫情席卷全球&#xff0c;有效佩戴口罩可以极大程度地减小病毒感染的风险。本项目开发…

JWT登录认证(3拦截器)

Jwt登录认证&#xff08;拦截器&#xff09;&#xff1a; 使用拦截器统一验证令牌 登录和注册接口需要放行 interceptors.LoginInterceptor&#xff1a;&#xff08;注册一个拦截器&#xff09; package com.lin.springboot01.interceptors;import com.lin.springboot01.pojo.…

wpf devexpress添加TreeListControl到项目

此教程示范如何添加TreeListControl到项目和绑定控件自引用数据源&#xff1a; 添加数据模型 绑定tree&#xff0c;并添加如下字段到数据源对象&#xff1a; Key字段包含唯一值索引节点 Parent字段包含父索引节点 添加数据模型&#xff08;Employee和Staff类&#xff09;到…

物理驱动深度学习方法总结

一、物理驱动深度学习方法总结 现有博主更新物理驱动深度学方法总体介绍 二、 PINN介绍 PINN综述Blog介绍&#xff1a;内嵌物理知识神经网络 &#xff08;Physics Informed Neural Network&#xff0c;简称PINN&#xff09; 是一种科学机器在传统数值领域的应用方法&…

软件测试基础 —— 单元测试

Hello&#xff01;大家好&#xff0c;我是BugBear&#xff0c;一个专注于分享软件测试干货的测试开发。 对于软件测试&#xff0c;我们先按照开发阶段来进行划分&#xff0c;将软件测试分为单元测试、集成测试、系统测试、验收测试&#xff0c;下面我们来聊聊单元测试。 1、什…

JVM-HotSpot虚拟机对象探秘

目录 一、对象的实例化 &#xff08;一&#xff09;创建对象的方式 &#xff08;二&#xff09;创建对象的步骤 二、对象的内存布局 &#xff08;一&#xff09;对象头 &#xff08;二&#xff09;实例数据 &#xff08;三&#xff09;对齐填充 三、 对象的访问定位 &…

小迈迈驰组态软件支持国产龙芯2K1000等处理器

自2019年起&#xff0c;南京迈思德电气自动化有限公司组织研发团队&#xff0c;进行跨平台组态软件的研发&#xff0c;适配国产处理器&#xff0c;目前已经完成单机版产品的研发&#xff0c;并在基于龙芯2K1000处理器[Loongnix操作系统]的加固人机界面产品上应用。 Loongnix是龙…

可逆矩阵的性质

如果矩阵A可逆&#xff0c;那么它的逆矩阵也可逆&#xff0c;并且如果矩阵A可逆&#xff0c;假设是一个不为0的数&#xff0c;那么也可逆&#xff0c;并且如果矩阵A和都可逆&#xff0c;而且它们的阶数也相同&#xff0c;那么它们的乘积也是可逆的&#xff0c;并且如果矩阵A可逆…

docker数据卷详细讲解及数据卷常用命令

docker数据卷详细讲解及数据卷常用命令 Docker 数据卷是一种将宿主机的目录或文件直接映射到容器中的特殊目录&#xff0c;用于实现数据的持久化和共享。Docker 数据卷有以下特点&#xff1a; 数据卷可以在一个或多个容器之间共享和重用&#xff0c;不受容器的生命周期影响。…

<Linux>(极简关键、省时省力)《Linux操作系统原理分析之Linux 进程管理 2》(6)

《Linux操作系统原理分析之Linux 进程管理 2》&#xff08;6&#xff09; 4 Linux 进程管理4.2 Linux 进程的状态和标识4.2.1 Linux 进程的状态及转换4.2.2 Linux 进程的标识4.2.3 进程标识哈希表 4 Linux 进程管理 4.2 Linux 进程的状态和标识 4.2.1 Linux 进程的状态及转换…

提高Producer的发送速度

发送一条消息出去要经过三步&#xff0c;一是客户端发送请求到服务器&#xff0c;二是服务器处理该请求&#xff0c;三是服务器向客户端返回应答&#xff0c;一次消息的发送耗时是上述三个步骤的总和。在一些对速度要求高&#xff0c;但是可靠性要求不高的场景下&#xff0c;比…

2023年中国机动车拍卖网络化趋势加速,网络拍卖专场数量大幅上升至47489场[图]

2022年&#xff0c;由于机动车拍卖网络化趋势继续加速&#xff0c;网络拍卖专场数量大幅上升&#xff0c;全国机动车专场拍卖会高达59450场&#xff0c;较上年攀升125.31%。在389家拍卖企业中&#xff0c;举办场次超过100场的企业有27家&#xff0c;合计54850场&#xff0c;占比…

11.4MyBatis(基础)

一.搭环境 1.创建完SSM项目,添加MySQL和MyBatis后,项目启动一定会报错,这是正常情况. 2.配置文件 properties: server.port9090 spring.datasource.urljdbc:mysql://127.0.0.1:3306/test1?characterEncodingutf8&useSSLfalse spring.datasource.usernameroot spring.d…

快速入门:构建您的第一个 .NET Aspire 应用程序

##前言 云原生应用程序通常需要连接到各种服务&#xff0c;例如数据库、存储和缓存解决方案、消息传递提供商或其他 Web 服务。.NET Aspire 旨在简化这些类型服务之间的连接和配置。在本快速入门中&#xff0c;您将了解如何创建 .NET Aspire Starter 应用程序模板解决方案。 …

贪吃蛇、俄罗斯方块

贪吃蛇 一、创建新项目 创建一个新的项目&#xff0c;并命名。 创建一个名为images的文件夹用来存放游戏相关图片。 然后再在项目的src文件下创建一个com.xxx.view的包用来存放所有的图形界面类&#xff0c; 创建一个com.xxx.controller的包用来存放启动的入口类(控制类) 二…