【数据挖掘】bytewax 与 ydata工具可实时了解您的数据

一、说明

         在这篇博文中,我们将介绍如何将开源流式处理解决方案 bytewax 与 ydata 分析相结合并加以利用,以提高流式处理流的质量。

        STream 处理支持在传输中和存储之前对数据进行实时分析,并且可以是有状态的,也可以是无状态的。

        有状态流处理用于实时建议、模式检测或复杂事件处理,其中处理需要已发生事件的历史记录(窗口、按键连接等)。

        无状态流处理用于内联转换,不需要了解流中的其他数据点,例如屏蔽电子邮件或转换类型。        

        总体而言,数据流在工业中被广泛使用,并且可以应用于欺诈检测患者监控事件预测维护等用例。

二、 数据流必须考虑关键是数据的质量

        与通常在创建数据仓库或仪表板解决方案期间评估数据质量的传统模型不同,流数据需要持续监视

        在从收集到馈送下游应用程序的整个过程中保持数据质量至关重要。毕竟,对于组织来说,糟糕的数据质量的成本可能很高。

        在本文中,我们将向您展示如何结合以分析和提高流媒体流的质量!bytewaxydata-profiling

三、使用 Bytewax 为数据专业人员提供流处理

          是专门为Python开发人员设计的OSS流处理框架。

        它允许用户构建具有类似于Flink,Spark和Kafka Streams功能的流数据管道和实时应用程序,同时提供友好和熟悉的界面以及与Python生态系统的100%兼容性。

        使用内置连接器或现有的 Python 库,您可以连接到实时和流数据源(Kafka、RedPanda、WebSocket 等),并将转换后的数据写入各种下游系统(Kafka、拼花地板文件、数据湖等)。

        对于转换,Bytewax 通过映射窗口聚合方法促进有状态和无状态转换,并具有恢复和可伸缩性等熟悉的功能。

        Bytewax 促进了 Python 优先和以数据为中心的数据流体验,并且是为数据工程师和数据科学家构建的。它允许用户构建流数据管道和实时应用程序,并创建满足其需求所需的自定义项,而无需学习和维护基于 JVM 的流平台,如 Spark 或 Flink。

        Bytewax 非常适合许多用例,即为生成 AI 嵌入管道、处理数据流中的缺失值、在流上下文中使用语言模型来理解金融市场等等。有关用例灵感和更多信息,如文档、教程和指南,请随时查看字节蜡网站。

四、为什么要对数据流进行数据剖析?

        数据剖析是成功启动任何机器学习任务的关键,指的是彻底了解数据的步骤:其结构、行为和质量。

        简而言之,数据分析涉及分析与数据格式和基本描述符相关的方面(例如,样本数量、特征的数量/类型、重复值)、其内在特征(例如存在缺失数据或不平衡的特征)以及在数据收集或处理过程中可能出现的其他复杂因素(例如,错误值或不一致的特征)。

        确保高数据质量标准对所有领域和组织都至关重要,但对于使用输出连续数据的域运营的领域尤其重要,其中情况可能会快速变化,可能需要立即采取行动(例如,医疗保健监测、股票价值、空气质量政策)。

        对于许多领域,从探索性数据分析的角度使用数据分析,考虑存储在数据库中的历史数据。相反,对于数据流,数据分析对于沿流持续验证和质量控制变得至关重要,需要在流程的不同时间范围或阶段检查数据。

        通过将自动分析嵌入到我们的数据流中,我们可以立即获得有关数据当前状态的反馈,并收到任何潜在关键问题的警报 - 无论是与数据一致性和完整性有关(例如,损坏的值或更改格式),还是与短时间内发生的事件(例如,数据漂移, 偏离业务规则和结果)。

        在现实世界的领域——你只知道墨菲定律一定会发生,“一切都可能出错”——自动分析可能会让我们免于多个大脑难题和需要停止生产的系统!

        在涉及数据剖析方面,无论是表格数据还是时间序列数据,它一直是大众的最爱。难怪为什么 - 它是一组广泛的分析和见解的一行代码。ydata-profiling

        复杂且耗时的操作是在后台完成的:ydata 分析会自动检测数据中包含的特征类型,并根据特征类型(数字或分类)调整分析报告中显示的汇总统计数据和可视化效果。

        该软件包促进了以数据为中心的分析,还突出了特征之间的现有关系,重点关注它们的成对交互相关性,并提供了对数据质量警报的全面评估,从重复常量值到偏斜不平衡的特征。

        它实际上是我们数据质量的 360º 视图 - 只需最少的努力。

        分析报告:突出显示潜在的数据质量问题。图片由作者提供。

五、把所有东西放在一起:字节蜡和ydata-profile。

        在开始项目之前,我们需要先设置 python 依赖项并配置数据源。

        首先,让我们安装 和 软件包(您可能希望为此使用虚拟环境 - 如果您需要一些额外的指导,请查看这些说明!bytewaxydata-profiling

pip install bytewax==0.16.2 ydata-profiling==4.3.1

        然后,我们将上传环境传感器遥测数据集(许可证 — CC0:公共域),其中包含来自不同 IoT 设备的温度、湿度、一氧化碳液化石油气、烟雾、光线和运动的多个测量值:        在生产环境中,这些测量将由每个设备连续生成,输入看起来像我们在 Kafka 等流媒体平台中期望的。在本文中,为了模拟我们在流数据中找到的上下文,我们将一次一行地从 CSV 文件中读取数据,并使用字节蜡创建数据流。

        (作为快速旁注,数据流本质上是一个数据管道,可以描述为有向无环图 — DAG)

        首先,让我们进行一些必要的导入

from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main
        然后,我们定义数据流对象。之后,我们将使用无状态映射方法,在其中传入一个函数将字符串转换为 datetime 对象并将数据重组为格式(device_id,数据)。

        map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是,我们可以在接下来的步骤中轻松地对数据进行分组,以单独分析每个设备的数据,而不是同时分析所有设备的数据。

flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))

# parse timestamp
def parse_time(reading_data):
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data

flow.map(parse_time)


# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data['device'], reading_data))

        现在,我们将利用有状态功能,在我们定义的时间段内收集每个设备的数据。 需要一段时间内的数据快照,这使得窗口运算符成为执行此操作的完美方法。

        在bytewaxydata-profiling 中,我们能够为为特定上下文指定的数据帧生成汇总统计信息。例如,在我们的示例中,我们可以生成引用每个物联网设备或特定时间框架的数据快照:ydata-profiling

from bytewax.window import EventClockConfig, TumblingWindow

# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):
    acc.append(reading)
    return acc


# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
    return reading["ts"]

  
# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))

# And a tumbling window
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))

flow.fold_window("running_average", cc, wc, list, acc_values)

flow.inspect(print)

        定义快照后,利用就像为我们要分析的每个数据帧调用 一样简单:ydata-profilingPorfileReport

import pandas as pd
from ydata_profiling import ProfileReport

def profile(device_id__readings):
    print(device_id__readings)
    device_id, readings = device_id__readings
    start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
    df = pd.DataFrame(readings)
    profile = ProfileReport(
        df,
        tsmode=True,
        sortby="ts",
        title=f"Sensor Readings - device: {device_id}"
    )

    profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"device {device_id} profiled at hour {start_time}"


flow.map(profile)

        在此示例中,我们将图像作为 map 方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来,或者我们可以在将来将它们保存到一些远程存储中。配置文件完成后,数据流需要一些输出,因此我们可以使用内置设备打印已分析的设备,以及从映射步骤中的配置文件函数传递的配置文件时间:StdOutput

flow.output("out", StdOutput())

        有多种方法可以执行字节蜡数据流。在这个例子中,我们使用相同的本地机器,但 Bytewax 也可以在多个 Python 进程上运行,跨多个主机,在 Docker 容器中运行,使用 Kubernetes 集群等等。

        在本文中,我们将继续使用本地设置,但我们鼓励你查看我们的帮助程序工具 waxctl,该工具在管道准备好过渡到生产环境后管理 Kubernetes 数据流部署。

假设我们与具有数据流定义的文件位于同一目录中,则可以使用以下方法运行它:

python -m bytewax.run ydata-profiling-streaming:flow

        然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的更改,并比较不同设备或时间窗口之间的数据特征

        事实上,我们可以利用比较报告功能,以直接的方式突出显示两个数据配置文件之间的差异,从而更容易检测需要调查的重要模式或必须解决的问题:

snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")

comparison_report =snapshot_a_report(snapshot_b_report)
comparison_report.to_file("comparison_report.html")

六、准备好探索您自己的数据流了吗?

        验证数据流对于连续识别数据质量问题并比较不同时间段的数据状态至关重要。

        对于医疗保健能源制造娱乐领域的组织(所有组织都在处理连续的数据流),分析是建立从质量评估到数据隐私的数据治理最佳实践的关键

        这需要对数据快照进行分析,如本文所示,可以通过组合 和 无缝实现数据快照。bytewaxydata-profiling

        Bytewax负责处理数据流并将其构建为快照所需的所有过程,然后可以通过数据特征的综合报告对其进行汇总并与ydata分析进行比较。

        能够适当地处理和分析传入的数据开启了跨不同领域的大量用例,从纠正数据架构和格式中的错误到突出显示和缓解实际活动产生的其他问题,例如异常检测(例如,欺诈或入侵/威胁检测)、设备故障以及其他偏离预期的事件(例如,数据偏移或与业务规则不一致)。

        现在,您就可以开始探索数据流了!让我们知道您发现了哪些其他用例,并一如既往地在评论中给我们留言,或在以数据为中心的 AI 社区中找到我们以获取进一步的问题和建议!再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/42597.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

双向循环链表的基本操作(创建节点,头插,头删,尾插,尾删)

头定义: typedef char datatype[20];//datatypechar[20] typedef struct Node {//数据域 数据元素datatype data;//指针域 下一个节点地址struct Node* next;//指针域 上一个节点地址struct Node* prev; }*DoubleLink; 创建链表节点: DoubleLink crea…

轻量级Web报表工具ActiveReportsJS全新发布v4.0,支持集成更多前端框架!

ActiveReportsJS 是一款基于 JavaScript 和 HTML5 的轻量级Web报表工具,采用拖拽式设计模式,不需任何服务器和组件支持,即可在 Mac、Linux 和 Windows 操作系统中,设计多种类型的报表。ActiveReportsJS 同时提供跨平台报表设计、纯…

第五章 编程之免交互

免交互:不需要人为控制就可以完成的自动化操作(自动化运维) shell脚本和面交互是一个概念,但是两种写法 shell:默认解释器是bash 使用i/o(输入/输出)重定向的方式,将命令的列表提供…

Flask 定制日志并输出到文件

Flask 定制日志并输出到文件 定制日志器flask缺省日志器配置自定义日志器 定制日志器 flask缺省日志器配置 flask自带的日志系统,缺省配置dictConfig(),但必须在Flask()应用之前使用 # flask缺省配置 from logging.config import dictConfig dictConfig…

文心千帆大模型测评分享,效果超出预期

一、前言 现如今,随着ChatGPT的爆火越来越多的人开始关注人工智能领域了,大家都在尝试使用它来帮助自己在工作上提高效率亦或是解决一些问题。但ChatGPT是有一定的使用门槛的:首先需要我们“科学上网”才能访问,其次GPT4的价格相…

C语言库函数 — 错误信息报告函数

前言 本文介绍错误信息报告函数 错误信息报告函数的作用: 帮助程序员快速定位代码中的错误,以便更快地进行调试和修复问题。 文章目录 前言一、错误信息报告函数什么是错误信息报告函数错误信息报告函数的作用strerror函数介绍strerror函数使用错误码对应…

Data Structure, Algorithm,and Applications in C++

在学习这本书进阶内容之前,我们可以跟着它的第一章部分再巩固和复习。本书由Sartaj Sahni撰写,由王立柱和刘志红翻译。全书通俗易懂,内容丰富,是巩固C内容的不二选择。希望本文对各位有所帮助。 目录 1.函数与参数 1.1.传值参数…

C++的类型转换

文章目录 一. C语言的类型转换二. C的四种类型转换1. static_cast2. reinterpret_cast3. const_cast4. dynamic_cast 三. RTTI结束语 一. C语言的类型转换 在C语言中,如果赋值运算符左右两侧类型不同,或者形参与实参类型不匹配,或者返回值类…

25 MFC 数据库

文章目录 导入ADO库 导入ADO库 #import "C:\Program Files\Common Files\System\ado\msado15.dll" no_namespace rename("EOF","rsEOF")void CADODlg::OnBnClickedBtnQuery() {//导入ADO库::CoInitialize(NULL);//初始化COM库_ConnectionPtr pCo…

ChatGPT 最佳实践指南

GPT Best Practices GPT 最佳实践指南 This guide shares strategies and tactics for getting better results from GPTs. The methods described here can sometimes be deployed in combination for greater effect. We encourage experimentation to find the methods that…

Python爬虫-贝壳二手房

前言 本文是该专栏的第3篇,后面会持续分享python爬虫案例干货,记得关注。 本文以某二手房网为例,如下图所示,采集对应城市的二手房源数据。具体思路和方法跟着笔者直接往下看正文详细内容。(附带完整代码) 正文 地址:aHR0cHM6Ly9zei5rZS5jb20vZXJzaG91ZmFuZy8= 目标:…

QT调用torch的环境配置(2023.7.19 / Win10+Qt+libtorch(1.9.1)+cuda11.1+cuDNN v8.0.4)

QT/C成功调用libtorch的环境配置(2023.7.19) QT/C成功调用libtorch的环境配置Pytorch 模型训练下载训练转化 libtorch模型使用下载C使用Qt使用 最后的话 QT/C成功调用libtorch的环境配置 背景:和同门一起搭的新系统是基于QT的,如…

Monocular 3D Object Detection with Depth from Motion 论文学习

论文链接:Monocular 3D Object Detection with Depth from Motion 1. 解决了什么问题? 从单目输入感知 3D 目标对于自动驾驶非常重要,因为单目 3D 的成本要比多传感器的方案低许多。但单目方法很难取得令人满意的效果,因为单张图…

NAT技术是什么?谈谈它的实现方式、优缺点以及作用

作者:Insist-- 个人主页:insist--个人主页 作者会持续更新网络知识和python基础知识,期待你的关注 前言 随着网络的不断发展,网络的应用也越来越多,有限的IPV4地址就显得不怎么够用,所以出现了NAT技术&…

Rust 数据类型 之 结构体(Struct)

目录 结构体(Struct) 定义与声明 结构体定义 结构体实例 结构体分类 单元结构体(Unit Struct) 元组结构体(Tuple Struct) 具名结构体(Named Struct) 结构体嵌套 结构体方法…

jenkins war包 centos启动安装指导

文章目录 步骤1:进入官网,下载到Jenkins的war包1.1 放置在指定位置1.2 放置安装包和创建文件放置路径1.3 检查环境1.4 配置启动命令和结束命令 步骤2: 启动后进入到Jenkins页面2.1 安装插件,例如流水线2.2 依然出现安装插件失败的…

疑问:为什么我的手机不能同时放两张电信卡呢?联通移动可以

很多后台的小伙伴私信我:“为什么我的双卡双待手机不能用两张电信卡呢?”其实我一直在认真的去查证这个问题,因为现在普遍网上的大流量手机卡套餐,电信是主力,如果第一张卡是电信,第二张卡不能使用电信了&a…

公网访问的Linux CentOS本地Web站点搭建指南

文章目录 前言1. 本地搭建web站点2. 测试局域网访问3. 公开本地web网站3.1 安装cpolar内网穿透3.2 创建http隧道,指向本地80端口3.3 配置后台服务 4. 配置固定二级子域名5. 测试使用固定二级子域名访问本地web站点 前言 在web项目中,部署的web站点需要被外部访问,则…

ES6基础知识一:说说var、let、const之间的区别

一、var 在ES5中,顶层对象的属性和全局变量是等价的,用var声明的变量既是全局变量,也是顶层变量 注意:顶层对象,在浏览器环境指的是window对象,在 Node 指的是global对象 var a 10; console.log(window.…

uview2.0使用u-calendar 的formatter属性,在formatter方法里无法访问this的bug,解决办法!!!!

uview 版本2.0.36 文档 使用该文档的案例,在 formatter打印this也会是undefined。 自己写了个demo 父给子传值v-bind传一个函数,然后在这个函数里面打印this,this是子组件的实例,但是不知道为什么formatter里会打印undefined。希…