MQTT协议下温度数据上报观测云最佳实践

MQTT 介绍

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种轻量级的、基于发布/订阅模式的消息传输协议,专为低带宽、高延迟或不可靠的网络环境设计,广泛应用于物联网(IoT)、移动应用和分布式系统中,用于实现设备之间的高效通信。它通过减少数据传输量和简化通信流程,确保消息的可靠传输,同时支持多种服务质量(QoS)等级,以满足不同的业务需求。

观测云

观测云是一款专为 IT 工程师打造的全链路可观测产品,它集成了基础设施监控、应用程序性能监控和日志管理,为整个技术栈提供实时可观察性。这款产品能够帮助工程师全面了解端到端的用户体验追踪,了解应用内函数的每一次调用,以及全面监控云时代的基础设施。此外,观测云还具备快速发现系统安全风险的能力,为数字化时代提供安全保障。

Func 平台

观测云 Func 平台(DataFlux Func)是一个基于 Python 的函数计算与数据处理开发平台,旨在帮助用户快速搭建、管理和执行数据处理任务。它主要由 Server(提供 Web UI 和 API 接口)和 Worker(提供 Python 脚本执行环境)两部分组成。该平台支持多源数据对接,内置丰富的数据源连接器,可快速实现数据汇聚与处理。此外,Func 平台还具备定时任务管理、API 接口发布等功能,支持同步、异步和定时调用,极大简化了开发流程。通过与观测云的深度集成,用户可以利用其强大的可观测性能力,实现数据的实时处理、分析和可视化。

通过各类传感器接收设备数据,进行处理并通过 MQTT 协议上报,通过 Func 消费来自 MQTT 协议的数据并上报至观测云平台进行存储、展示、分析、预警等操作。

实战

  • 场景:采集 Linux 主机温度,上报至观测云。
  • 准备工作:EMQX,接收 MQTT 协议的数据并供 Client 端消费数据

采集温度

Sensors(传感器)采集温度是通过物理或化学原理将温度变化转换为电信号的过程。常见的温度传感器包括热敏电阻(NTC/PTC)、热电偶、热敏二极管和数字温度传感器(如DS18B20)。这些传感器通过感知环境温度的变化,输出与温度成正比或符合特定函数关系的电压、电流或数字信号。这些信号经过放大、滤波和模数转换后,可被微控制器或数据采集系统读取,从而实现对温度的实时监测和记录。

在 Shell 中执行 sensors 可以获取到当前 CPU、硬盘等温度信息。

通过脚本方式获取温度信息:

import psutil

def get_system_temperatures():
    temps = psutil.sensors_temperatures()if not temps:return "No temperature sensors found."
    
    result = ""for chip, sensors in temps.items():
        result += f"{chip}:\n"for sensor in sensors:
            result += f"  {sensor.label or 'Sensor'}: {sensor.current}°C (high={sensor.high}, critical={sensor.critical})\n"return result

# 调用函数并打印结果
system_temps = get_system_temperatures()print(system_temps)

说明:

  • psutil.sensors_temperatures() 返回系统中所有温度传感器的信息。
  • 每个传感器有 current(当前温度)、high(高温警戒值)和 critical(临界温度)等属性。

上报到 EMQX

import psutil
import paho.mqtt.client as mqtt
import time
import json

def get_system_temperatures():
    temps = psutil.sensors_temperatures()
    if not temps:
        return "No temperature sensors found."
    
    result = {}
    for chip, sensors in temps.items():
        result[chip] = []
        for sensor in sensors:
            if sensor.label is not None and sensor.label != "":
                result[chip].append({
                    'host': "liurui",
                    'label': sensor.label,
                    'current': sensor.current,
                    'high': sensor.high,
                    'critical': sensor.critical
                })
    return result

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

def publish_temperatures(temps, broker, port, topic):
    client = mqtt.Client()
    client.on_connect = on_connect
    client.connect(broker, port, 60)
    client.loop_start()

    # 打包所有传感器数据成一个JSON对象
    payload = json.dumps(temps)
    client.publish(topic, payload)
    print(f"Published: {payload}")

    client.loop_stop()

if __name__ == "__main__":
    broker = "1.1.1.1"  # 替换为你的EMQX broker地址
    port = 1883  # 默认MQTT端口
    topic = "temperature"  # 替换为你想要发布的主题

    while True:
        system_temps = get_system_temperatures()
        if isinstance(system_temps, str):
            print(system_temps)
        else:
            publish_temperatures(system_temps, broker, port, topic)
        time.sleep(5)  # 每5秒采集并上报一次

申请观测云 API Key

登陆观测云控制台,点击菜单「管理」-「API Key 管理」,新建 API Key。

保存 Key ID 和 Key,后续 Func 平台需要用到。

Func 消费 MQTT 数据

1、新建脚本集

2、新建脚本

脚本内容如下:

import json

guance = DFF.CONN('GuanceAPI')

@DFF.API('Message Handler')
def message_handler(topic, message):
    print(f"Received message: {message} on topic {topic}")
    if topic == "temperature":
        parse_and_print_temperatures(message)

def parse_and_print_temperatures(temps_json):
    # 解析 JSON 字符串为 Python 字典
    temps_dict = json.loads(temps_json)
    result = []
    # 遍历每个芯片
    for chip, sensors in temps_dict.items():
        print(f"Chip: {chip}")
        # 遍历每个传感器
        for sensor in sensors:
            result.append({
                    'measurement': 'temperature',
                    'tags': {
                        'host': sensor['host'],
                        'chip': chip,
                        'label': sensor['label']
                    },
                    'fields': {
                        'current': sensor['current'],
                        'high': sensor['high'],
                        'critical': sensor['critical']
                    }
                })

    uploadDataKit(result)

def uploadDataKit(data):
        # 获取 DataKit 操作对象
    status_code, result = guance.dataway.write_by_category_many(category='metric', data=data)
    print(f"上报结果:{status_code}")
3、发布脚本

点击发布按钮进行发布。

4、创建观测云连接器
  • 类型:观测云
  • ID: GuanceAPI

注意:ID 需要与脚本 guance = DFF.CONN('GuanceAPI') 的 ID 一致,其他字段按照实际情况填写。

5、创建 MQTT 连接器
  • 类型为 MQTT Broker (v5.0)
  • 填写 ID、主机、端口
  • 选择主题以及对应主题消费的脚本
  • 点击测试连通性,确保 MQTT 可以正常链接
  • 点击保存即可

效果展示

数据上报至观测云后,可以通过仪表板使用以下 DQL 语句可以查看温度的趋势图。

M::`temperature`:(last(`current`)) BY `chip`, `label`

总结

通过观测云 Func 平台接收来自 MQTT 协议的指标、日志、链路等数据,并通过观测云提供的 API 进行封装,上报至观测云平台进行统一的管理、可视化分析、告警等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/983318.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

立即释放 Mac 空间!Duplicate File Finder 8 重复文件高速清理工具

Duplicate File Finder 专业的 Mac 重复文件清理工具。查找并删除重复的文件、文件夹,甚至相似的照片。 不要让无用的文件占用磁盘上的宝贵空间。 整理你的 Mac。用最好的重复文件查找器来管理你的文件集合。 扫描任何磁盘或文件夹 主文件夹、照片/音乐库、外部磁…

一文对比RAGFLOW和Open WebUI【使用场景参考】

一、RAGFLOW与Open WebUI RAGFLOW是一款基于深度文档理解构建的开源 RAG(Retrieval-Augmented Generation)引擎。RAGFlow 可以为各种规模的企业及个人提供一套精简的 RAG 工作流程,结合大语言模型(LLM)针对用户各类不…

SpringBoot整合Caffeine本地缓存

本文章摘自 Java技术小馆https://www.yuque.com/jtostring/am5oq3/ac34uu2liy50t042?singleDoc#LyUGd 在现代的微服务架构中,缓存已经成为提升系统性能、降低数据库压力和提高响应速度的关键技术之一。对于Java开发者而言,Spring Boot作为一种开发框架…

腾讯 TDF 即将开源 Kuikly 跨端框架,Kotlin 支持全平台

今天,在腾讯的 Shiply 平台看 Flutter 动态化自研框架 Conch 时,在侧边栏看到了有「跨端开发框架」的介绍,点开发现有两个产品: Hippy:面向前端技术栈的跨端开发框架,Web原生开发体验,支持 Rea…

性能案例经验总结

数据库案例总结案例一:索引创建不合适导致性能问题背景接口getResourceByRoleID在单交易测试时,发现接口响应时间过长,DB 消耗资源比较严重。 关键字db2advis 、执行计划、runstat 、权限接口 问题分析首先是找出接口调用的SQL语句,有两个方法通过DB2top 命令 查找通过以下提…

【哇! C++】类和对象(三) - 构造函数和析构函数

目录 一、构造函数 1.1 构造函数的引入 1.2 构造函数的定义和语法 1.2.1 无参构造函数: 1.2.2 带参构造函数 1.3 构造函数的特性 1.4 默认构造函数 二、析构函数 2.1 析构函数的概念 2.2 特性 如果一个类中什么成员都没有,简称为空类。 空类中…

分布式存储—— HBase数据模型 详解

目录 1.3 HBase数据模型 1.3.1 两类数据模型 1.3.2 数据模型的重要概念 1.3.3 数据模型的操作 1.3.4 数据模型的特殊属性 1.3.5 CAP原理与最终一致性 1.3.6 小结 本文章参考、总结于学校教材课本《HBase开发与应用》 1.3 HBase数据模型 在开始学习HBase之前非常…

WebSocket:实现实时通信的利器

在现代Web应用中,实时通信变得越来越重要。无论是聊天应用、在线游戏,还是实时数据推送,传统的HTTP请求-响应模式已经无法满足需求。WebSocket作为一种全双工通信协议,应运而生,成为实现实时通信的利器。本文将深入探讨…

Aruco 库详解:计算机视觉中的高效标记检测工具

1. 引言:Aruco 在计算机视觉中的重要性 在计算机视觉领域,标记(Marker)检测和识别是许多应用的基础,包括 机器人导航、增强现实(AR)、相机标定(Calibration)以及物体跟踪…

SQL_语法

1 数据库 1.1 新增 create database [if not exists] 数据库名; 1.2 删除 drop database [if exists] 数据库名; 1.3 查询 (1) 查看所有数据库 show databases; (2) 查看当前数据库下的所有表 show tables; 2 数据表 2.1 新增 (1) 创建表 create table [if not exists…

react中的fiber和初次渲染

源码中定义了不同类型节点的枚举值 组件类型 文本节点HTML标签节点函数组件类组件等等 src/react/packages/react-reconciler/src/ReactWorkTags.js export const FunctionComponent 0; export const ClassComponent 1; export const IndeterminateComponent 2; // Befo…

AutoGen学习笔记系列(七)Tutorial - Managing State

这篇文章瞄准的是AutoGen框架官方教程中的 Tutorial 章节中的 Managing State 小节,主要介绍了如何对Team内的状态管理,特别是如何 保存 与 加载 状态,这对于Agent系统而言非常重要。 官网链接:https://microsoft.github.io/auto…

Compose Multiplatform+Kotlin Multiplatfrom 第四弹跨平台

文章目录 引言功能效果开发准备依赖使用gradle依赖库MVIFlow设计富文本显示 总结 引言 Compose Multiplatformkotlin Multiplatfrom 今天已经到compose v1.7.3,从界面UI框架上实战开发看,很多api都去掉实验性注解,表示稳定使用了!…

[Java基础-线程篇]7_线程设计模式与总结

摘要:懒汉单例模式怎么变得线程安全?Master-Worker归并模式,工作窃取算法。Java线程相关源码使用了什么设计模式? 资料引用:《Java高并发核心编程卷2》 目录 线程安全的单例模式 Master-Worker模式 工作窃取算法 …

Kubermetes 部署mysql pod

步骤 1: 创建 PersistentVolume 和 PersistentVolumeClaim 首先为 MySQL 创建一个 PersistentVolume (PV) 和 PersistentVolumeClaim (PVC) 来确保数据的持久性。 mysql-pv.yaml: apiVersion: v1 kind: PersistentVolume metadata:name: mysql-pv-volume spec:cap…

【四.RAG技术与应用】【12.阿里云百炼应用(下):RAG的云端优化与扩展】

在上一篇文章中,我们聊了如何通过阿里云百炼平台快速搭建一个RAG(检索增强生成)应用,实现文档智能问答、知识库管理等基础能力。今天咱们继续深入,聚焦两个核心问题:如何通过云端技术优化RAG的效果,以及如何扩展RAG的应用边界。文章会穿插实战案例,手把手带你踩坑避雷。…

交叉编译openssl及curl

操作环境:Ubuntu20.04 IDE工具:Clion2020.2 curl下载地址:https://curl.se/download/ openssl下载地址:https://openssl-library.org/source/old/index.html 直接交叉编译curl会报错找不到openssl,所以需要先交叉编…

在笔记本电脑上用DeepSeek搭建个人知识库

最近DeepSeek爆火,试用DeepSeek的企业和个人越来越多。最常见的应用场景就是知识库和知识问答。所以本人也试用了一下,在笔记本电脑上部署DeepSeek并使用开源工具搭建一套知识库,实现完全在本地环境下使用本地文档搭建个人知识库。操作过程共…

HarmonyOS 应用程序包结构 (发布态)

每个应用中至少包含一个.hap文件,可能包含若干个.hsp文件、也可能不含,一个应用中的所有.hap与.hsp文件合在一起称为Bundle,其对应的bundleName是应用的唯一标识(详见app.json5配置文件中的bundleName标签)。 当应用发…

idea中的查看git历史记录,不显示详细信息

一、正常情况显示 1、idea中git查看history正常显示如下图: 二、非正常情况下显示 1、idea中git查看history,现在不显示提交的历史文件详细信息,如下图: 三、解决方式 1、找到如下窗口中画红色框的黑色线条,鼠标放在…