EMQX构建简易的云服务

基本思路:

  1. 使用EMQX作为Mqtt broker
  2. mqtt-receive-server服务,用于接收设备上报的数据
  3. mqtt-sender-service服务,用于下发数据给设备
  4. KafKa实现数据解耦,mqtt-receive-server服务接收的数据简单处理下直接扔到Kafka中
  5. 云服务各业务系统从KafKa中消费数据,各业务需要下发数据的话,调用mqtt-sender-service接口下发数据给设备

基本流程

在这里插入图片描述

DashBoard 定义认证用户

在这里插入图片描述

定义Mqtt协议主题

// 设备激活
public final static String ACTIVATE = "mqtt/0/1";
// 设备重置
public final static String RESET = "mqtt/0/0";
// 上线
public final static String ONLINE = "mqtt/1/1";
// 下线
public final static String OFFLINE = "mqtt/1/0";
// 上行-设备上报数据到平台
public final static String REPORT = "mqtt/2/1";
// 下行-平台下发数据给设备
public final static String ISSUED = "%s/2/0";

设备认证流程

首先在云平台创建产品,生成PK/PS,用于Mqtt Broker的连接认证
将PK/PS烧录到设备中
设备开机启动,首次连接平台携带PK/PS/DK,mqtt连接成功后,云服务端会下发DS给到设备,并标识设备已激活
设备再次连接云服务,mqtt连接成功后,会校验DK/DS是否合法,不合法将设备踢下线。
设备订阅${clientId}/2/0主题

@PostConstruct
 public void init() throws MqttException {
     client.setCallback(new MqttCallbackHandler());
     client.subscribe(String.format(MqttTopicConstant.ISSUED, client.getClientId()));
 }

mqtt-receive-server服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_receive_server
订阅ACTIVATE 、RESET 、ONLINE 、OFFLINE 、REPORT 等主题
将接收的数据简单处理,转发到KafKa

mqtt:
    broker-url: tcp://42.194.132.44:1883
    client-id: mqtt_receive_server
    username: mqtt_server
    password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
@PostConstruct
public void init() throws MqttException {
    client.setCallback(new MqttCallbackHandler(kafkaService));
    subscribe(MqttTopicConstant.ACTIVATE);
    subscribe(MqttTopicConstant.RESET);
    subscribe(MqttTopicConstant.ONLINE);
    subscribe(MqttTopicConstant.OFFLINE);
    subscribe(MqttTopicConstant.REPORT);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    String data = new String(message.getPayload());
    log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);
    UpData upData = JSONObject.parseObject(data, UpData.class);
    UpKafKaData upKafKaData = new UpKafKaData(topic, data);
    log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
    kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
}

mqtt-sender-service服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_sender_server
不订阅主题,只下发数据,下发数据主题为${clientId}/2/0
提供API给给业务子系统使用,用于下发数据给设备

mqtt:
    broker-url: tcp://42.194.132.44:1883
    client-id: mqtt_sender_server
    username: mqtt_server
    password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
package com.angel.ocean.listener;

import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.domain.client.ActivateData;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;

@Slf4j
@Component
public class UpDataConsumerListener {

    @Resource
    private MqttService mqttService;

    /**
     * 批量消费
     */
    @KafkaListener(topics = UP_DATA_TOPIC, containerFactory = "batchFactory")
    public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            log.info("UpDataConsumerListener.batchListen(), records.size: {}", records.size());
            for (ConsumerRecord<String, String> record : records) {
                UpKafKaData data = JSONObject.parseObject(record.value(), UpKafKaData.class);
                log.info("{}", record.value());
                handler(data.getTopic(), data.getData());
            }
        } catch (Exception e) {
            log.error("UpDataConsumerListener.batchListen() Exception:{}", e.getMessage(), e);
        } finally {
            // 手动确认
            ack.acknowledge();
        }
    }

    private void handler(String topic, String data) {
        switch (topic) {
            case MqttTopicConstant.ACTIVATE:
                activateHandler(data);
                break;
            case MqttTopicConstant.RESET:
                otherHandler(data);
                break;
            case MqttTopicConstant.OFFLINE:
                otherHandler(data);
                break;
            case MqttTopicConstant.ONLINE:
                otherHandler(data);
                break;
            case MqttTopicConstant.REPORT:
                otherHandler(data);
                break;
            default:
                otherHandler(data);
        }
    }

    private void activateHandler(String data) {
        ActivateData activateData = JSONObject.parseObject(data, ActivateData.class);
        String clientId = activateData.getClientId();
        mqttService.publish(String.format(MqttTopicConstant.ISSUED, clientId), "200");
    }

    private void otherHandler(String data) {
        log.info("{}", data);
    }

}
package com.angel.ocean.controller;

import com.angel.ocean.common.ApiResult;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@Slf4j
@RestController
@RequestMapping("/mqtt/server")
public class MqttController {

    @Resource
    private MqttClient server;

    @Resource
    private MqttService mqttService;

    /**
     * 数据下发接口
     * @param clientId
     * @param data
     * @return
     */
    @RequestMapping("/sender")
    public ApiResult<?> publish(String clientId, String data) {

        String topic = String.format(MqttTopicConstant.ISSUED, clientId);

        mqttService.publish(topic, data);

        if(server.isConnected()) {
            MqttMessage message = new MqttMessage(data.getBytes());
            message.setQos(0);
            try {
                server.publish(topic, message);
                log.info("Message published, topic:{}, data:{}", topic, data);
            } catch (MqttException e) {
                log.error("Message publish failed, topic:{}", topic, e);
                return ApiResult.error();
            }
            return ApiResult.success();
        }

        log.info("Message publish failed, not online.");

        return ApiResult.error();
    }
}

代码验证

场景:设备上报消息,云服务端回复消息给设备; 云服务主动下发数据给设备。

模拟设备上报消息, 接收云平台回复

发了两次:
在这里插入图片描述mqtt-client 本地客户端日志:

在这里插入图片描述
mqtt-receive-server云服务日志:

在这里插入图片描述
mqtt-sender-server云服务日志:

在这里插入图片描述

模拟云平台主动下发数据

在这里插入图片描述mqtt-sender-server云服务主动下发的日志:

在这里插入图片描述mqtt-client数据接收日志:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/940725.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于MindSpore NLP的PEFT微调

创建notebook 登录控制台 创建notebook 如果出现提示按如下操作 回到列表页面创建notebook参数如下&#xff1a; 配置mindnlp环境 打开GitHub - mindspore-lab/mindnlp: Easy-to-use and high-performance NLP and LLM framework based on MindSpore, compatible with model…

半连接转内连接 | OceanBase SQL 查询改写

查询优化器是关系型数据库系统的核心模块&#xff0c;是数据库内核开发的重点和难点&#xff0c;也是衡量整个数据库系统成熟度的“试金石”。为了帮助大家更好地理解 OceanBase 查询优化器&#xff0c;我们撰写了查询改写系列文章&#xff0c;带大家更好地掌握查询改写的精髓&…

imx6ull qt多页面控制系统(正点原子imx系列驱动开发)

开题答辩完了也考完了四六级&#xff0c;赶紧来更新一下一个月前留下的坑吧 QAQ首先&#xff0c;因为毕业设计需要用到这些知识所以就从网络上找了一个智能车机系统&#xff0c;借鉴了一下大佬的项目思路&#xff0c;缝缝补补一个月终于完成了这一内容。 在这里先感谢从两位大佬…

Intel-ECI之Codesys PLC + Ethercat 远端IO + Codesys IDE编程

目录 一、 准备工作 二、安装Codesys 软件 PLC 三、 使用Codesys IDE 编程测试 CODESYS* 是领先的独立于制造商的 IEC 61131-3 自动化软件&#xff0c;适用于工程控制系统。它用于 Intel Edge Controls for Industrial&#xff08;Intel ECI 或 ECI&#xff09;&#xff0c;…

vscode的keil assistant 中搜索不到全局变量

搜不到 但是在包含的文件中输入 ../../../,就是全局搜索的结果 我的文件结构是&#xff1a;\Desktop\LVGL文件系统移植&#xff08;lvgl8&#xff0e;&#xff13;&#xff09;\Projects\MDK-ARM 盲猜是keil assistant 当前文件夹打开的时候是进入到了MDK-ARM文件夹层次&…

Unity A*算法实现+演示

注意&#xff1a; 本文是对基于下方文章链接的理论&#xff0c;并最终代码实现&#xff0c;感谢作者大大的描述&#xff0c;非常详细&#xff0c;流程稍微做了些改动&#xff0c;文末有工程网盘链接&#xff0c;感兴趣的可以下载。 A*算法详解(个人认为最详细,最通俗易懂的一…

格式工厂,各类文件格式转换

今天给大家推荐一个老牌的软件格式工厂。这个软件早就能支持转换视频、音频、图片、文档等市面上主流格式的软件了&#xff0c;现在也很能打。 格式工厂 各类文件格式转换 软件无需安装&#xff0c;打开这个图标就能直接使用。 屏幕录像功能还是非常强大的&#xff0c;可以全屏…

Java web的发展历史

目录 前言&#xff1a; 一.Model I和Model II 1.Model I开发模式 ​编辑 2.Model II开发模式 二. MVC模式 前言&#xff1a; 该篇文章主要介绍了Java web的发展历史&#xff0c;以及MVC相关内容 一.Model I和Model II 1.Model I开发模式 Model1的开发模式是&#xff…

Pyqt6在lineEdit中输入文件名称并创建或删除JSON文件

1、创建JSON文件 代码 import osdef addModulekeyWordFile(self):if "" ! self.lineEdit_module.text():moduleFile self.lineEdit_module.text() .jsonelse:self.toolLogPrinting(请输入模块名称)returnfilePath modulekeyWordFileDir moduleFileif os.path.e…

练习题 最小栈

最小栈 最小栈 class MinStack {private Stack<Integer> stack;private Stack<Integer> minstack;public MinStack() {stacknew Stack<>();minstacknew Stack<>();}public void push(int val) {stack.push(val);if(minstack.empty()){minstack.push(…

概率论得学习和整理32: 用EXCEL描述正态分布,用δ求累计概率,以及已知概率求X的区间

目录 1 正态分布相关 2 正态分布的函数和曲线 2.1 正态分布的函数值&#xff0c;用norm.dist() 函数求 2.2 正态分布的pdf 和 cdf 2.3 正态分布的图形随着u 和 δ^2的变化 3 正态分布最重要的3δ原则 3.0 注意&#xff0c;这里说的概率一定是累计概率CDF&#xff0c;而…

食家巷大烤馍:岁月沉淀下的麦香传奇

在繁华都市的街角巷尾&#xff0c;隐藏着许多不为人知的美食宝藏&#xff0c;食家巷大烤馍便是其中之一。它宛如一位低调的美食大师&#xff0c;默默散发着独特的魅力&#xff0c;用最质朴的味道&#xff0c;征服着每一个过往食客的味蕾。 初见食家巷大烤馍&#xff0c;你会被…

wxWidgets使用wxStyledTextCtrl(Scintilla编辑器)的正确姿势

开发CuteMySQL/CuteSqlite开源客户端的时候&#xff0c;需要使用Scintilla编辑器&#xff0c;来高亮显示SQL语句&#xff0c;作为C/C领域最成熟稳定又小巧的开源编辑器&#xff0c;Scintilla提供了强大的功能&#xff0c;wxWidgets对Scintilla进行包装后的是控件类&#xff1a;…

【基础还得练】数值分析中的样条插值

什么是三次样条&#xff08;Cubic Spline&#xff09;&#xff1f; 三次样条&#xff08;Cubic Spline&#xff09;是一种常用于数据插值和曲线拟合的数学方法&#xff0c;它利用多个三次多项式函数来平滑连接数据点&#xff0c;使得拟合曲线不仅通过所有数据点&#xff0c;同时…

AMS1117芯片驱动电路·降压芯片的驱动电路详解

目录 AMS1117常见封装 AMS1117不同系列 AMS1117驱动电路 参考数据手册 编写不易&#xff0c;仅供学习&#xff0c;请勿搬运&#xff0c;感谢理解 相同LDO芯片驱动专栏文章 LM7805系列降压芯片驱动电路降压芯片驱动电路详解-CSDN博客 ME6211C系列降压芯片驱动电路降压芯片…

[项目代码] YOLOv8 遥感航拍飞机和船舶识别 [目标检测]

项目代码下载链接 &#xff1c;项目代码&#xff1e;YOLO 遥感航拍飞机和船舶识别&#xff1c;目标检测&#xff1e;https://download.csdn.net/download/qq_53332949/90163939YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为…

《Qt Creator 4.11.1 教程》

《Qt Creator 4.11.1 教程》 一、Qt Creator 4.11.1 概述&#xff08;一&#xff09;简介&#xff08;二&#xff09;界面构成 二、常用设置指南&#xff08;一&#xff09;环境设置&#xff08;二&#xff09;文本编辑器设置&#xff08;三&#xff09;构建和运行设置 三、构建…

探索未知,乐享惊喜 —— 盲盒APP开发,开启您的个性化惊喜之旅!

在这个瞬息万变的数字时代&#xff0c;我们总在寻找那些能触动心灵、带来无限可能的小确幸。为了满足您对未知的好奇与对惊喜的渴望&#xff0c;我们匠心打造了一款全新的盲盒APP&#xff0c;旨在为您的生活增添一抹不同寻常的色彩&#xff0c;让每一次打开都是一次全新的探索与…

前端和后端解决跨域问题的方法

目前很多java web开发都是采用前后端分离框架进行开发&#xff0c;相比于单体项目容易产生跨域问题。 一、跨域问题CORS 1.什么是跨域问题&#xff1f; 后端接收到请求并返回结果了&#xff0c;浏览器把这个响应拦截了。 2.跨域问题是怎么产生的&#xff1f; 浏览器基于同源…

c#上班,上学,交通方式接口

using System;namespace INTERFACE {abstract class Person{public string Name { get; set; }public int Age { get; set; }public virtual void ShowInfo(){Console.WriteLine($"Name: {Name}, Age: {Age}");}}// 接口 IWorkinterface IWork{void GotoCompany();}/…