PyDeequ库在AWS EMR启动集群中数据质量检查功能的配置方法和实现代码

PyDeequ是一个基于Apache Spark的Python API,专门用于定义和执行“数据单元测试”,从而在大规模数据集中测量数据质量。
PyDeequ框架在PySpark代码中提供了全面的数据质量检查功能,能够帮助用户&有效地监控和提升大规模数据集的数据质量。它在PySpark代码中的数据质量检查功能主要包括以下几个方面:

核心组件

  1. 指标计算(Metrics Computation):利用分析器(Analyzers)对数据集的每一列进行分析,生成数据概要。

  2. 约束建议:自动提出基于不同分析组的验证约束,以确保数据的一致性。

  3. 约束验证:依据设定的标准对数据集进行实时或批量验证。

  4. 度量存储库:实现对验证历史的跟踪与存储,便于持续监控数据质量。

功能特性

  1. 数据剖析:PyDeequ可以对数据集的每一列进行深入的剖析,包括数据的完整性、空值情况、唯一性统计等关键指标。

  2. 约束定义与验证:用户可以定义各种数据质量约束,如数据的类型、范围、唯一性、非空性等,并使用PyDeequ对这些约束进行验证。验证结果会明确指出哪些数据不符合预设的约束条件。

  3. 灵活性与可扩展性:PyDeequ支持用户根据业务需求自定义约束条件和分析规则,灵活应对各种数据质量挑战。同时,它也易于集成到现有的PySpark工作流中。

  4. 报告与监控:PyDeequ可以生成详细的数据质量报告,帮助用户了解数据集的整体质量情况。此外,它还支持对验证历史的跟踪与存储,便于用户持续监控数据质量的变化趋势。

应用场景

  1. 数据湖管理:在AWS Glue、Athena等服务的支持下,PyDeequ可以帮助用户监控数据湖中的数据质量。

  2. 数据仓库:在数据仓库中,PyDeequ可以用于定期检测数据质量,防止数据质量问题影响业务决策。

  3. 实时数据处理:在实时数据处理系统中,PyDeequ可以用于实时监控数据流的质量。

一、AWS EMR 集群配置 PyDeequ 的具体步骤

1. 创建 Bootstrap Script (引导脚本)

PyDeequ 依赖 Java 库和 Python 包,需在 EMR 集群初始化时自动安装。

#!/bin/bash
# bootstrap.sh

# 安装 Python 依赖
sudo pip3 install pydeequ

# 下载 Deequ JAR 包到 Spark 类路径
aws s3 cp s3://deequ/jars/deequ-2.0.3-spark-3.1.jar /usr/lib/spark/jars/
2. 启动 EMR 集群时指定 Bootstrap 动作

通过 AWS CLI 或控制台启动集群时添加以下参数:

aws emr create-cluster \
--name "PyDeequ_Cluster" \
--release-label emr-6.9.0 \
--applications Name=Spark Name=Hadoop \
--instance-type m5.xlarge \
--instance-count 3 \
--bootstrap-actions Path="s3://your-bucket/bootstrap.sh" \
--use-default-roles
3. 关键验证点
  • 确保 JAR 文件路径正确:/usr/lib/spark/jars/deequ-*.jar
  • Python 环境需为 3.x,可通过 EMR 配置 emr-release-label >= 6.0

二、PyDeequ 数据质量检查核心代码示例

1. 初始化 SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PyDeequ-Data-Quality") \
    .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.3") \
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .getOrCreate()
2. 指标计算(Metrics Computation)
from pydeequ.analyzers import *

df = spark.read.parquet("s3://your-data-bucket/transactions")

analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("customer_id")) \
    .addAnalyzer(ApproxCountDistinct("order_id")) \
    .addAnalyzer(Mean("total_amount")) \
    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
3. 约束建议(Constraint Suggestion)
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
    .onData(df) \
    .addConstraintRule(DEFAULT()) \
    .run()

print("Suggested Constraints:")
for constraint in suggestionResult['constraint_suggestions']:
    print(f"- {constraint['description']}")
4. 约束验证(Constraint Verification)
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Error, "DataQualityCheck")

result = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 1000) \
            .isComplete("customer_id") \
            .isUnique("order_id") \
            .isNonNegative("total_amount") \
            .hasPattern("email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$") \
    ).run()

result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)
5. 指标存储(Metric Repository)
from pydeequ.repository import *
from pydeequ.metrics import *

metrics_repository = FileSystemMetricsRepository(spark, path="s3://quality-metrics-bucket/")
result_key = ResultKey(spark, datetime.strptime("2024-01-01", "%Y-%m-%d"))

AnalysisRunner(spark) \
    .onData(df) \
    .useRepository(metrics_repository) \
    .saveOrAppendResult(result_key) \
    .addAnalyzer(Completeness("customer_id")) \
    .run()

三、关键配置说明

组件配置要点
JAR 依赖Deequ JAR 必须位于 Spark 的 jars 目录,版本需与 Spark 兼容
Python 版本EMR 6.x 默认使用 Python 3.7+,需通过 pip3 安装
权限配置EMR 角色需有权限访问 S3 存储桶(读取数据/写入指标)
优化参数调整 Spark 内存分配(spark.executor.memory)以处理大规模数据

四、高级应用场景扩展

1. 实时数据质量监控(Kafka 集成)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-host:9092") \
    .option("subscribe", "transactions-topic") \
    .load()

def quality_check_microbatch(df, epoch_id):
    VerificationSuite(spark).onData(df).addCheck(...).run()

stream_df.writeStream \
    .foreachBatch(quality_check_microbatch) \
    .start()
2. 自定义分析规则
from pydeequ.analyzers import Analyzer

class CustomRangeAnalyzer(Analyzer):
    def __init__(self, column, min_val, max_val):
        super().__init__()
        self.column = column
        self.min = min_val
        self.max = max_val

    def to_metric(self, state):
        # 实现自定义指标计算逻辑
        pass

analysisResult = AnalysisRunner(spark) \
    .addAnalyzer(CustomRangeAnalyzer("temperature", 0, 100)) \
    .run()

以上配置和代码实现了 PyDeequ 在 AWS EMR 的完整数据质量流水线。实际部署时需根据数据规模调整 Spark 资源配置(spark-submit 参数),并建议将质量报告存储至 DynamoDB 或 Amazon CloudWatch 实现可视化监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/962523.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

因果推断与机器学习—用机器学习解决因果推断问题

Judea Pearl 将当前备受瞩目的机器学习研究戏谑地称为“仅限于曲线拟合”,然而,曲线拟合的实现绝非易事。机器学习模型在图像识别、语音识别、自然语言处理、蛋白质分子结构预测以及搜索推荐等多个领域均展现出显著的应用效果。 在因果推断任务中,在完成因果效应识别之后,需…

python算法和数据结构刷题[2]:链表、队列、栈

链表 链表的节点定义: class Node():def __init__(self,item,nextNone):self.itemitemself.nextNone 删除节点: 删除节点前的节点的next指针指向删除节点的后一个节点 添加节点: 单链表 class Node():"""单链表的结点&quo…

AJAX案例——图片上传个人信息操作

黑马程序员视频地址&#xff1a; AJAX-Day02-11.图片上传https://www.bilibili.com/video/BV1MN411y7pw?vd_source0a2d366696f87e241adc64419bf12cab&spm_id_from333.788.videopod.episodes&p26 图片上传 <!-- 文件选择元素 --><input type"file"…

deepseek大模型本机部署

2024年1月20日晚&#xff0c;中国DeepSeek发布了最新推理模型DeepSeek-R1&#xff0c;引发广泛关注。这款模型不仅在性能上与OpenAI的GPT-4相媲美&#xff0c;更以开源和创新训练方法&#xff0c;为AI发展带来了新的可能性。 本文讲解如何在本地部署deepseek r1模型。deepseek官…

使用 Ollama 和 Kibana 在本地为 RAG 测试 DeepSeek R1

作者&#xff1a;来自 Elastic Dave Erickson 及 Jakob Reiter 每个人都在谈论 DeepSeek R1&#xff0c;这是中国对冲基金 High-Flyer 的新大型语言模型。现在他们推出了一款功能强大、具有开放权重的思想链推理 LLM&#xff0c;这则新闻充满了对行业意味着什么的猜测。对于那些…

Greenplum临时表未清除导致库龄过高处理

1.问题 Greenplum集群segment后台日志报错 2.回收库龄 master上执行 vacuumdb -F -d cxy vacuumdb -F -d template1 vacuumdb -F -d rptdb 3.回收完成后检查 仍然发现segment还是有库龄报警警告信息发出 4.检查 4.1 在master上检查库年龄 SELECT datname, datfrozen…

栈和队列特别篇:栈和队列的经典算法问题

图均为手绘,代码基于vs2022实现 系列文章目录 数据结构初探: 顺序表 数据结构初探:链表之单链表篇 数据结构初探:链表之双向链表篇 链表特别篇:链表经典算法问题 数据结构:栈篇 数据结构:队列篇 文章目录 系列文章目录前言一.有效的括号(leetcode 20)二.用队列实现栈(leetcode…

记录一次,PyQT的报错,多线程Udp失效,使用工具如netstat来检查端口使用情况。

1.问题 报错Exception in thread Thread-1: Traceback (most recent call last): File "threading.py", line 932, in _bootstrap_inner File "threading.py", line 870, in run File "main.py", line 456, in udp_recv IndexError: list…

论文阅读(十):用可分解图模型模拟连锁不平衡

1.论文链接&#xff1a;Modeling Linkage Disequilibrium with Decomposable Graphical Models 摘要&#xff1a; 本章介绍了使用可分解的图形模型&#xff08;DGMs&#xff09;表示遗传数据&#xff0c;或连锁不平衡&#xff08;LD&#xff09;&#xff0c;各种下游应用程序之…

本地部署DeepSeek开源多模态大模型Janus-Pro-7B实操

本地部署DeepSeek开源多模态大模型Janus-Pro-7B实操 Janus-Pro-7B介绍 Janus-Pro-7B 是由 DeepSeek 开发的多模态 AI 模型&#xff0c;它在理解和生成方面取得了显著的进步。这意味着它不仅可以处理文本&#xff0c;还可以处理图像等其他模态的信息。 模型主要特点:Permalink…

从 UTC 日期时间字符串获取 Unix 时间戳:C 和 C++ 中的挑战与解决方案

在编程世界里&#xff0c;从 UTC 日期时间字符串获取 Unix 时间戳&#xff0c;看似简单&#xff0c;实则暗藏玄机。你以为输入一个像 “Fri, 17 Jan 2025 06:07:07” 这样的 UTC 时间&#xff0c;然后轻松得到 1737094027&#xff08;从 1970 年 1 月 1 日 00:00:00 UTC 开始经…

Linux——网络(tcp)

文章目录 目录 文章目录 前言 一、TCP逻辑 1. 面向连接 三次握手&#xff08;建立连接&#xff09; 四次挥手&#xff08;关闭连接&#xff09; 2. 可靠性 3. 流量控制 4. 拥塞控制 5. 基于字节流 6. 全双工通信 7. 状态机 8. TCP头部结构 9. TCP的应用场景 二、编写tcp代码函数…

51单片机(STC89C52)开发:点亮一个小灯

软件安装&#xff1a; 安装开发板CH340驱动。 安装KEILC51开发软件&#xff1a;C51V901.exe。 下载软件&#xff1a;PZ-ISP.exe 创建项目&#xff1a; 新建main.c 将main.c加入至项目中&#xff1a; main.c:点亮一个小灯 #include "reg52.h"sbit LED1P2^0; //P2的…

力扣116. 填充每个节点的下一个右侧节点指针

Problem: 116. 填充每个节点的下一个右侧节点指针 文章目录 题目描述思路复杂度Code 题目描述 思路 遍历思想(利用二叉树的先序遍历) 本题目的难点在于对于不同父节点的邻接问题因此我们可以抽象将两两节点为一组&#xff08;不同父节点的两个孩子节点也抽象为一组&#xff09…

k8s简介,k8s环境搭建

目录 K8s简介环境搭建和准备工作修改主机名&#xff08;所有节点&#xff09;配置静态IP&#xff08;所有节点&#xff09;关闭防火墙和seLinux&#xff0c;清除iptables规则&#xff08;所有节点&#xff09;关闭交换分区&#xff08;所有节点&#xff09;修改/etc/hosts文件&…

苯乙醇苷类化合物的从头生物合成-文献精读108

Complete pathway elucidation of echinacoside in Cistanche tubulosa and de novo biosynthesis of phenylethanoid glycosides 管花肉苁蓉中松果菊苷全生物合成途径解析及苯乙醇苷类化合物的从头生物合成 摘要 松果菊苷&#xff08;ECH&#xff09;是最具代表性的苯乙醇苷…

C++ 新特性实现 ThreadPool

序言 在之前我们实现过线程池&#xff0c;但是非常基础。答题思路就是实现一个安全的队列&#xff0c;再通过 ThreadPool 来管理队列和线程&#xff0c;对外提供一个接口放入需要执行的函数&#xff0c;但是这个函数是无参无返回值的。  参数的问题我们可以使用 bind 来封装&a…

网络攻防实战指北专栏讲解大纲与网络安全法

专栏 本专栏为网络攻防实战指北&#xff0c;大纲如下所示 进度&#xff1a;目前已更完准备篇、HTML基础 计划&#xff1a;所谓基础不牢&#xff0c;地动山摇。所以下一步将持续更新基础篇内容 讲解信息安全时&#xff0c;结合《中华人民共和国网络安全法》&#xff08;以下简…

计算机网络——流量控制

流量控制的基本方法是确保发送方不会以超过接收方处理能力的速度发送数据包。 通常的做法是接收方会向发送方提供某种反馈&#xff0c;如&#xff1a; &#xff08;1&#xff09;停止&等待 在任何时候只有一个数据包在传输&#xff0c;发送方发送一个数据包&#xff0c;…

知识库管理系统助力企业实现知识共享与创新价值的转型之道

内容概要 知识库管理系统&#xff08;KMS&#xff09;作为现代企业知识管理的重要组成部分&#xff0c;其定义涵盖了系统化捕捉、存储、共享和应用知识的过程。这类系统通过集成各种信息来源&#xff0c;不仅为员工提供了一个集中式的知识平台&#xff0c;还以其结构化的方式提…