基于AWS Serverless的Glue服务进行ETL(提取、转换和加载)数据分析(二)——数据清洗、转换

2 数据清洗、转换

此实验使用S3作为数据源

ETL:

E    extract         输入
T    transform     转换
L    load             输出

大纲

  • 2 数据清洗、转换
    • 2.1 架构图
    • 2.2 数据清洗
    • 2.3 编辑脚本
      • 2.3.1 连接数据源(s3)
      • 2.3.2. 数据结构转换
      • 2.3.2 数据结构拆分、定义
      • 2.3.3 清洗后的数据写入新s3
      • 2.3.4 运行作业
    • 2.4 数据分区
      • 2.4.1 编辑脚本
      • 2.4.2 运行脚本
    • 2.5 总结

2.1 架构图

在这里插入图片描述

2.2 数据清洗

此步会将S3中的原始数据清洗成我们想要的自定义结构的数据。之后,我们可通过APIGateway+Lambda+Athena来实现一个无服务器的数据分析服务。

步骤图例
1、入口在这里插入图片描述
2、创建Job(s3作为数据源,则Type选择Spark,若为Kinesis等,选择Stream Spark)在这里插入图片描述
3、IAM角色需要有s3与Glue的权限在这里插入图片描述
4、选择s3脚本位置,若已经完成脚本的编写工作,则可以选择第二项或第三项,若无则Glue会提供默认脚本在这里插入图片描述
5、安全配置参数在这里插入图片描述建议:添加参数–enable-auto-scaling为true。每次在我们执行Job任务时,会根据运行 ETL 任务的数据处理单元(DPU)的个数来分配动态IP,在我们子网的动态IP数低于DPU数时,Job将会执行失败。此参数将会动态分配IP。
6、数据源()在这里插入图片描述
7、数据目标(我们会将清洗后的数据存储到新的s3桶)在这里插入图片描述
8、设计架构(在本案例中,我们会自定义脚本。所以不再在此处设计架构)(此处设计后,脚本会自动生成相关代码)在这里插入图片描述
9、保存在这里插入图片描述

2.3 编辑脚本

脚本中的args参数的键值需要从Job的安全配置参数中定义

2.3.1 连接数据源(s3)

#数据源
datasource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "datasource")

2.3.2. 数据结构转换

mapped_readings = ApplyMapping.apply(frame = datasource, mappings = [("lclid", "string", "meter_id", "string"), \
                                                                     ("datetime", "string", "reading_time", "string"), \
                                                                     ("KWH/hh (per half hour)", "double", "reading_value", "double")], \
                                     transformation_ctx = "mapped_readings")

2.3.2 数据结构拆分、定义

mapped_readings_df = DynamicFrame.toDF(mapped_readings)

mapped_readings_df = mapped_readings_df.withColumn("obis_code", lit(""))
mapped_readings_df = mapped_readings_df.withColumn("reading_type", lit("INT"))

reading_time = to_timestamp(col("reading_time"), "yyyy-MM-dd HH:mm:ss")
mapped_readings_df = mapped_readings_df \
    .withColumn("week_of_year", weekofyear(reading_time)) \
    .withColumn("date_str", regexp_replace(col("reading_time").substr(1,10), "-", "")) \
    .withColumn("day_of_month", dayofmonth(reading_time)) \
    .withColumn("month", month(reading_time)) \
    .withColumn("year", year(reading_time)) \
    .withColumn("hour", hour(reading_time)) \
    .withColumn("minute", minute(reading_time)) \
    .withColumn("reading_date_time", reading_time) \
    .drop("reading_time")

2.3.3 清洗后的数据写入新s3

# write data to S3
filteredMeterReads = DynamicFrame.fromDF(mapped_readings_df, glueContext, "filteredMeterReads")

s3_clean_path = "s3://" + args['clean_data_bucket']

glueContext.write_dynamic_frame.from_options(
    frame = filteredMeterReads,
    connection_type = "s3",
    connection_options = {"path": s3_clean_path},
    format = "parquet",
    transformation_ctx = "s3CleanDatasink")

2.3.4 运行作业

    执行成功后,状态将变为"SUCCESS",失败将会给出失败信息,可在CloudWatch 中查看详情

在这里插入图片描述

在这里插入图片描述


清洗后的数据保存到了s3


在这里插入图片描述
数据清洗完毕后,可通过上一篇中的爬网程序步骤,将清洗后的数据的结构创建表到数据目录中,
此时我们可以使用Athena对清洗后的数据进行分析。

2.4 数据分区

接下来我们对数据进行分区处理(此处只提供了按天分区
重新进行数据清洗中的创建Job操作后,重写脚本

2.4.1 编辑脚本

连接数据源。表为上一步最后重新爬取生成的新表。

cleanedMeterDataSource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "cleanedMeterDataSource")

根据type与data_str分区

business_zone_bucket_path_daily = "s3://{}/daily".format(args['business_zone_bucket'])

businessZone = glueContext.write_dynamic_frame.from_options(frame = cleanedMeterDataSource, \
    connection_type = "s3", \
    connection_options = {"path": business_zone_bucket_path_daily, "partitionKeys": ["reading_type", "date_str"]},\
    format = "parquet", \
    transformation_ctx = "businessZone")

2.4.2 运行脚本

分区后的数据结果:
在这里插入图片描述
再次创建、运行爬网程序,将会在数据目录中生成新的分区表。

2.5 总结

到这一步,我们已经使用Glue ETL对s3桶中的数据进行了清洗、分区操作。在进行上篇中的Athena操作后,我们已经可以通过Athena直接查询到清洗、分区后的数据集了。
接下来,我们会通过使用APIGateway+Lambda+Athena来构建一个无服务器的数据查询分析服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/218551.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CSS-200个小案例(一)

文章目录 1.Simple Parallax Scrolling Effect(简单的视差滚动效果)2.Fullscreen Video Background(全屏视频背景)3.Transform Effects on Scroll(滚动时产生的变换效果)4.Fullscreen Overlay Responsive Navigation M…

今日实施|解读新国标对数据库审计的能力要求

数据库审计是数据安全建设不可或缺的技术工具之一,无论是国家级的法律或标准,还是等保以及行业级的安全标准均对使用数据库审计有明确要求。据相关数据统计显示,数据库审计产品的市场需求已占据中国数据库安全市场容量的6成以上。 12月1日&am…

身份统一管理创新与优化 ——华为云OneAccess应用身份管理服务的2023年

2023年,随着云计算、物联网、人工智能等技术的快速发展,企业面临着数字化转型的巨大挑战与机遇。身份统一管理是企业数字化转型的基础,也是业务发展的关键。如何高效、安全、灵活地实现身份统一管理,成为企业亟待解决的首要课题。…

如何将四元数转换为旋转矩阵

什么是四元数? 四元数是表示物体在三维空间中的方向和旋转的几种数学方法之一。另一种方法是使用基于欧拉角的旋转矩阵,即滚动、俯仰和偏航,就像的封面图片。 通常使用四元数代替欧拉角旋转矩阵,因为“与 旋转矩阵相比 &#xff…

使用Python Flask搭建Web问答应用程序并发布到公网远程访问

使用Python Flask搭建web问答应用程序框架,并发布到公网上访问 文章目录 使用Python Flask搭建web问答应用程序框架,并发布到公网上访问前言1. 安装部署Flask并制作SayHello问答界面2. 安装Cpolar内网穿透3. 配置Flask的问答界面公网访问地址4. 公网远程…

数字化时代的保镖:实人认证API在身份验证中的角色

前言 随着数字化时代的迅猛发展,个人信息的安全性和隐私保护成为了当今社会中备受关注的话题。在这个背景下,实人认证API崭露头角,成为数字领域中的一项重要技术,为身份验证提供了全新的保障机制。本文将探讨实人认证API在身份验…

实现用户登陆

输入用户名和密码,如果输入用户名和密码正确,允许登录 编程过程中采用字符串拉接。 SQL注入,当使用拼接的sql语句. 输入密码时把语句拼接成or,or后面跟上一个条件正确的式子。 Java 防止sql注入,预编译手段&#xff…

网上下载的pdf文件,为什么不能复制文字?

不知道大家有没有到过这种情况?在网上下载的PDF文件打开之后,发现选中文字之后无法复制。甚至其他功能也都无法使用,这是怎么回事?该怎么办? 当我们发现文件打开之后,编辑功能无法使用,很可能是…

【教3妹学编程-算法题】到达首都的最少油耗

3妹:“太阳当空照,花儿对我笑,小鸟说早早早,你为什么背上炸药包” 2哥 :3妹,什么事呀这么开发。 3妹:2哥你看今天的天气多好啊,阳光明媚、万里无云、秋高气爽,适合秋游。 2哥&#x…

距离传感器VL6180V1NR/1

参考模块 【优信电子】VL6180X近距离感测器 环境光线传感器 手势识别-淘宝网 (taobao.com)https://item.taobao.com/item.htm?spma21n57.1.0.0.13f7523csXwKYp&id584789574087&ns1&abbucket6#detail检测原理 系统框图 价格参考 电路连接 怎样快速生成距离传感器曲…

STM32上模拟CH340芯片的功能 (一)

#虚拟串口模拟CH340# 后续代码更新放gitee 上 一、思路 1. 确定通信接口:CH340是一款USB转串口芯片,因此您需要选择STM32上的某个USB接口来实现USB通信。通常情况下,STM32系列芯片都有内置的USB接口,您可以根据您的具体型号选择…

数据库——索引的数据结构

在数据库中引入索引可以提高查询速率,那么为什么引入索引可以提高查询速率呢,其底层组织数据的数据结构又是什么呢?接下来,一起来探讨索引的数据结构吧!!! 在介绍数据库索引数据库前&#xff0…

适用于 Windows 的最佳(免费/付费)数据恢复软件

借助最佳数据恢复工具从 Windows PC 恢复丢失和删除的数据 您是否正在寻找一种巧妙的方法来从计算机中取消删除或恢复已删除的文件?如果是,那么这篇文章就是为您准备的!在本教程中,我们整理了一份全面的数据恢复软件列表&#xf…

[BJDCTF2020]ZJCTF,不过如此1

提示 伪协议的各种应用 首先我们来一步一步分析 首先要判断text是否传入参数,并且将传入的text以只读的方式打开要绝对等于I have a dream才会进入下一步 这里需要用到伪协议data://text/plain或者php://input都可以 最终要利用到这个include包含函数 这里提示了…

论文解读:Axial-DeepLab: Stand-Alone Axial-Attention forPanoptic Segmentation

论文是一个分割任务,但这里的方法不局限于分割,运用到检测、分类都可以。 论文下载 https://www.yuque.com/yuqueyonghupjh9oc/ovceh4/onilw42ux6e9n1ne?singleDoc# 《轴注意力机制》 一个问题 为什么transformer一开始都有CNN:降低H、W…

Open3D 最小二乘拟合空间直线(方法二)

目录 一、算法原理1、算法过程2、参考文献二、代码实现三、结果展示四、相关链接本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理

分享70个节日PPT,总有一款适合您

分享70个节日PPT,总有一款适合您 70个节日PPT下载链接:https://pan.baidu.com/s/1IRIKuFoGjQJ14OVkeW_mDQ?pwd6666 提取码:6666 Python采集代码下载链接:采集代码.zip - 蓝奏云 学习知识费力气,收集整理更不易…

Python GUI 新手入门教程:轻松构建图形用户界面

概要 Python 凭借其简单性和多功能性,已经成为最流行的编程语言之一。被广泛应用于从 web 开发到数据科学的各个领域。 在本教程中,我们将探索用于创建图形用户界面(GUIs)的 Python 内置库: Tkinter:无论…

「Go框架」gin框架是如何处理panic的?

本文我们介绍下recover在gin框架中的应用。 首先,在golang中,如果在子协程中遇到了panic,那么主协程也会被终止。如下: package mainimport ("github.com/gin-gonic/gin" )func main() {r : gin.Default()// 在子协程中…

20:kotlin 类和对象 --泛型(Generics)

类可以有类型参数 class Box<T>(t: T) {var value t }要创建类实例&#xff0c;需提供类型参数 val box: Box<Int> Box<Int>(1)如果类型可以被推断出来&#xff0c;可以省略 val box Box(1)通配符 在JAVA泛型中有通配符?、? extends E、? super E&…