Data Bricks Delta Lake 入门

Delta Lake 是一个开源存储层,它将关系数据库语义添加到基于 Spark 的数据湖处理中。 适用于 PySpark、Scala 和 .NET 代码的 Azure Synapse Analytics Spark , Azure DataBricks 都支持 Delta Lake。在大数据这个领域,对象存储的最影响效率的问题就是针对对象存储数据的更新,传统的对象存储如AWS 的S3 , Azure的 Blob等如果要更新要给对象数据的时候,必须要先将对象查找到并删除,然后再追加。这通常会导致性能效率低下。Delta Lake很高的的解决了对象数据更新的问题,并同时支持实时数据流的更新,主要功能如下:

  • 支持查询和数据修改的关系表。 使用 Delta Lake,可以将数据存储在支持 CRUD(创建、读取、更新和删除)操作的表中。 换句话说,可以采用与在关系数据库系统中相同的方式选择、插入、更新和删除数据行。
  • 支持 ACID 事务。 关系数据库旨在支持事务数据修改,这些修改提供原子性(事务作为单个工作单元完成)、一致性(事务使数据库保持一致状态)、隔离(进行中的事务不能相互干扰)和持久性(事务完成时,它所做的更改将保留)。 Delta Lake 通过实现事务日志并强制实施并发操作的可序列化隔离,为 Spark 提供相同的事务支持。
  • 数据版本控制和按时间顺序查看。 由于所有事务都记录在事务日志中,因此可以跟踪每个表行的多个版本,甚至使用按时间顺序查看功能在查询中检索某行的先前版本。
  • 支持批处理和流式处理数据。 虽然大多数关系数据库包括存储静态数据的表,但 Spark 包含通过 Spark 结构化流式处理 API 流式处理数据的本机支持。 Delta Lake 表可用作流式处理数据的接收器(目标)和源。
  • 标准格式和互操作性。 Delta Lake 表的基础数据以 Parquet 格式存储,该格式通常用于数据湖引入管道。

以下开始Delta的入门操作:

使用免费的Azure Data Bricks 的工作区,参加如下链接:

利用 Azure Data Bricks的免费资源学习云上大数据-CSDN博客

一、创建 Delta Lake 表 

1、从数据帧创建 Delta Lake 表

创建Ddelta Lake表,以增量格式保存数据帧,指定应存储表的数据文件和相关元数据信息的路径

下载试验用数据:

 %sh
 rm -r /dbfs/delta_lab
 mkdir /dbfs/delta_lab
 wget -O /dbfs/delta_lab/products.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/products.csv

例如:使用现有文件中的数据加载数据帧,然后将该数据帧以增量格式保存到新文件夹位置:

# Load a file into a dataframe
df = spark.read.load('/delta_lab/products.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

2、验证保存的Delta Lake数据文件

保存 delta 表后,指定的路径位置包括数据的 parquet 文件

%sh 
ls /dbfs/delta/mydata

执行后结果如下:

 可以使用覆盖模式将现有 Delta Lake 表替换为数据帧的内容,如下所示:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

还可以使用追加模式将数据帧中的行添加到现有表:

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

二、根据条件进行表的更新

1、首先查看原始表的数据:

df1 = df.select("ProductName", "ListPrice").where((df["ProductName"]=="Road-750 Black, 58"))
display(df1)

执行结果:

2、执行数据更新语句

虽然可以在数据帧中进行数据修改,然后通过覆盖数据来替换 Delta Lake 表,但数据库中的一种更常见的模式是插入、更新或删除现有表中的行作为离散事务操作。 若要对 Delta Lake 表进行此类修改,可以使用 Delta Lake API 中的 DeltaTable 对象,该对象支持更新、删除和合并操作。 例如,可以使用以下代码更新 category 列值为“Accessories”的所有行的 price 列:

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "ProductName=='Road-750 Black, 58'",
    set = { "ListPrice": "ListPrice * 0.9" })

执行结果:

3、查看验证数据是否更新:

df2 = spark.read.load('/delta/mydata')
df3= df2.select("ProductName", "ListPrice").where((df2["ProductName"]=="Road-750 Black, 58"))
display(df3)

执行结果:

 三、查询表以前的版本

Delta Lake 表支持通过事务日志进行版本控制。 事务日志记录对表进行的修改,指出每个事务的时间戳和版本号。 可以使用此记录的版本数据查看表以前的版本 - 称为按时间顺序查看的功能。

可以通过将数据从 delta 表位置读取到数据帧中,将所需版本指定为 versionAsOf 选项,从 Delta Lake 表的特定版本检索数据:

df4 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

执行结果:

可以使用 timestampAsOf 选项指定时间戳:

df4 = spark.read.format("delta").option("timestampAsOf", '2024-01-16 09:36:23').load(delta_table_path)

查看历史变化

deltaTable.history(10).show(10, False, True)

 执行结果:

四、创建和查询目录表

下面将Delta Lake 表定义为 Spark 群集的 Hive 元存储中的目录表,并使用 SQL 来进行处理。

Spark 目录中的表(包括 Delta Lake 表)可以是托管或外部表(非托管表)

  • 托管表是在没有指定位置的情况下定义的,数据文件存储在元存储使用的存储中。 删除表不仅会从目录中删除其元数据,还删除存储其数据文件的文件夹。
  • 外部表指的是为自定义文件位置定义外部表,其中存储了表的数据。 表的元数据定义在 Spark 目录中。 删除表会从目录中删除元数据,但不会影响数据文件。

1、使用数据帧来创建目录表

可以使用 saveAsTable 操作写入数据帧来创建托管表和非托管表,如以下示例所示:

# 托管表 Save a dataframe as a managed table 
df.write.format("delta").saveAsTable("MyManagedTable")

# 非托管表 specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")

 2、使用 SQL 创建目录表

可以使用含 USING DELTA 子句的 CREATE TABLE SQL 语句和用于外部表的可选 LOCATION 参数来创建目录表。 可以使用 SparkSQL API 运行语句,如以下示例所示:

使用Spark SQL 创建一个外部表

spark.sql("CREATE DATABASE AdventureWorks")
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)

执行结果:

使用SQL直接查询

%sql
USE AdventureWorks;
SELECT * FROM ProductsExternal;

 执行结果: 

创建一个完全托管的表

df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)

 

 查询该表:

%sql
USE AdventureWorks;
SELECT * FROM ProductsManaged;

执行结果:

3、比较外部表和托管表的差别

执行如下语句:

%sql
USE AdventureWorks;
SHOW TABLES;

结果如下: 

执行下面语句来看两个表所在的不同位置:

 %sh
 echo "External table:"
 ls /dbfs/delta/mydata
 echo
 echo "Managed table:"
 ls /dbfs/user/hive/warehouse/adventureworks.db/productsmanaged

执行结果:

执行下面命令将表删除,看有何不同

%sql
USE AdventureWorks;
DROP TABLE IF EXISTS ProductsExternal;
DROP TABLE IF EXISTS ProductsManaged;
SHOW TABLES;
 %sh
 echo "External table:"
 ls /dbfs/delta/mydata
 echo
 echo "Managed table:"
 ls /dbfs/user/hive/warehouse/adventureworks.db/productsmanaged

 执行结果如下:Managed Table的数据文件已经不在,外部表的数据文件还在

五、使用 Delta Lake 对数据进行流式处理 

假设我们是一个IoT设备的流式数据,数据结构如下:

 1、下载JSON格式的流式数据文件

 %sh
 rm -r /dbfs/device_stream
 mkdir /dbfs/device_stream
 wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json

 执行结果如下:

下面基于JSON文件所在的文件夹创建iotstream,如下命令: 

from pyspark.sql.types import *
from pyspark.sql.functions import *
   
# Create a stream that reads data from the folder, using a JSON schema
inputPath = '/device_stream/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
print("Source stream created...")

执行结果:

 

 将数据流写入delta表

# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")

如下图:数据作业已经启动,开始实时写入 

 

读取实时表中的数据:

# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)

执行结果: 9条记录

 基于这个stream创建一个目录表

# create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

 查询这张表:

%sql
SELECT * FROM IotDeviceData;

执行结果 如下:9条记录 

 

执行以下语句刷新Iot数据到Stream

 %sh
 wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json

 执行结果:

 查看数据是否已经更新到表中,执行如下语句:

%sql
SELECT * FROM IotDeviceData;

执行结果如下:从9条增加到16条 

 

 执行如下语句:停止Steam

deltastream.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/332633.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】linux内核ipoib模块 - ipoib_start_xmit

一、ipoib_start_xmit函数定义 static netdev_tx_t ipoib_start_xmit(struct sk_buff *skb, struct net_device *dev) {struct ipoib_dev_priv *priv ipoib_priv(dev);struct rdma_netdev *rn netdev_priv(dev);struct ipoib_neigh *neigh;struct ipoib_pseudo_header *phdr…

【保姆级教程|YOLOv8改进】【3】使用FasterBlock替换C2f中的Bottleneck

《博主简介》 小伙伴们好,我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源,可关注公-仲-hao:【阿旭算法与机器学习】,共同学习交流~ 👍感谢小伙伴们点赞、关注! 《------往期经典推…

代码、课程、教学的一些思考-2024

1 代码、算法、艺术品 1.1 代码 最典型的C代码示例。 以下是一个简单的C代码示例&#xff0c;它打印出“Hello, World!”&#xff1a; #include <iostream> int main() { std::cout << "Hello, World!"; return 0; } 这段代码定义了一个程序&a…

鲁大师2023年牛角尖颁奖盛典落幕,顶尖产品之间的又一次碰撞

1月18日&#xff0c;鲁大师2023年度牛角尖颁奖典礼在四川省内江市威远县船石湖豪生温泉度假酒店完美落幕。 本届鲁大师牛角尖颁奖盛典举办地选在了威远县可谓是深有其意&#xff0c;其名称的由来最早可追溯到隋朝&#xff0c;取“威名远震”之意。而这也与鲁大师牛角尖奖项的设…

Apache安全及优化

配置第一台虚拟机 VM1网卡 yum仓库 挂载磁盘 上传3个软件包到/目录 到/目录下进行解压缩 tar xf apr-1.6.2.tar.gz tar xf apr-util-1.6.0.tar.gz tar -xjf httpd-2.4.29.tar.bz2 mv apr-1.6.2 httpd-2.4.29/srclib/apr mv apr-util-1.6…

基于Springboot+vue鲜花商城系统(前后端分离)

该项目完全免费 项目技术栈&#xff1a; 前端&#xff1a;vueelementUIecharts 后端&#xff1a;SpringbootmybatisMySQL 项目主要功能&#xff1a; 商品信息 商品分类 角色管理 公告管理 轮播图管理 订单管理 收货地址管理 日志管理 部分功能截图&#xff1a;

大数据工作岗位分析

前言&#xff1a;随着大数据需求的增多&#xff0c;许多中小公司和团队也新增或扩展了大数据工作岗位&#xff1b;但是却对大数据要做什么和能做什么&#xff0c;没有深入的认识&#xff1b;往往是招了大数据岗位&#xff0c;搭建起基础能力后&#xff0c;就一直处于重复开发和…

【Linux】

Linux零基础入门 列出文件/文件夹新建/切换路径查看当前路径重命名或者移动文件夹拷贝文件/文件夹删除文件夹设置环境变量编辑文本文件压缩和解压查看cpu的信息查看/杀死进程查看进程的CPU和内存占用重定向日志场景一场景二场景三场景四 列出文件/文件夹 命令&#xff1a;Ls(L…

2017年认证杯SPSSPRO杯数学建模A题(第一阶段)安全的后视镜全过程文档及程序

2017年认证杯SPSSPRO杯数学建模 A题 安全的后视镜 原题再现&#xff1a; 汽车后视镜的视野对行车安全非常重要。一般来说&#xff0c;汽车的后视镜需要有良好的视野范围&#xff0c;以便驾驶员能够全面地了解车后方的道路情况。同时&#xff0c;后视镜也要使图像的畸变尽可能…

shopee选品案例分析:如何在Shopee平台上进行选品并取得成功

在Shopee平台上进行选品是卖家们开设店铺的重要步骤之一。通过分析成功案例&#xff0c;卖家们可以获取灵感和策略&#xff0c;从而更好地进行选品。本文将以一个女装店铺为例&#xff0c;介绍如何在Shopee平台上进行选品并取得成功。 先给大家推荐一款shopee知虾数据运营工具…

人工智能之卷积神经网络(CNN)

前言&#xff1a;今天我们重点探讨一下卷积神经网络(CNN)算法。 _ 20世纪60年代&#xff0c;Hubel和Wiesel在研究猫脑皮层中用于局部敏感和方向选择的神经元时发现其独特的网络结构可以有效地降低反馈神经网络的复杂性&#xff0c;继而提出了卷积神经网络CNN&#xff08;Convo…

详解IP安全:IPSec协议簇 | AH协议 | ESP协议 | IKE协议_ipsec esp

目录 IP安全概述 IPSec协议簇 IPSec的实现方式 AH&#xff08;Authentication Header&#xff0c;认证头&#xff09; ESP&#xff08;Encapsulating Security Payload&#xff0c;封装安全载荷&#xff09; IKE&#xff08;Internet Key Exchange&#xff0c;因特网密钥…

分布式文件系统协议:NFS(Network File System)网络文件系统

文章目录 NFS名词解释NFS的历史版本NFS支持的操作系统NFS工作原理NFS使用的端口NFS的认证机制NFS的优点NFS使用场景推荐阅读 NFS名词解释 NFS&#xff08;Network File System&#xff09;网络文件系统是一种分布式文件系统协议&#xff0c;最初由Sun Microsystems开发&#x…

Vue中的日历组件 Calendar 实现 考勤打卡记录

日历组件 Calendar 可以自定义在页面添加内容。 实现效果图 1.由于Calendar没有右上角月份切换的API事件&#xff0c;可以给组件源码添加自定义添加一个事件 2.也可以通过自带的input事件来获取日历 3.vue页面完整代码 注释&#xff1a;this.$m(this.beginTime).format(…

揭秘程序栈:你的代码在幕后是怎么运行的?

计算机科学中&#xff0c;许多概念和原理可能会让开发者感到头疼&#xff0c;比如程序栈。这个看似晦涩的概念&#xff0c;实对我们理解程序运行至关重要。本文将以通俗易懂的方式&#xff0c;带你深入理解程序栈的工作原理和优化策略。 一、为什么需要栈&#xff1f; 栈是一…

Jupyter-Notebook无法创建ipynb文件

文章目录 概述排查问题恢复方法参考资料 概述 用户反馈在 Notebook 上无法创建 ipynb 文件&#xff0c;并且会返回以下的错误。 报错的信息是: Unexpected error while saving file: Untitled5.ipynb attempt to write a readonly database 排查问题 这个是一个比较新的问…

保姆版Vps安装灯塔(ARL)

因为灯塔的默认端口为5003&#xff0c;所以我们在安装之前就在防火墙里把我们的5003端口打开 打开端口步骤如下&#xff1a; 1.我们打开控制面板&#xff0c;在控制面板里点击 系统和安全 。如下图&#xff1a; 2.接着点击 Windows Defender防火墙,如下图&#xff1a; 3.再…

IPhone、IPad、安卓手机、平板以及鸿蒙系统使用惠普无线打印教程

演示机型&#xff1a;惠普M281fdw&#xff0c;测试可行机型&#xff1a;惠普M277&#xff0c;惠普M452、惠普M283 点击右上角图标。 点击WI-FI Direct 开&#xff0c;(如果WI-FI Direct关闭&#xff0c;请打开&#xff01;) 记录打印机的wifi名称(SSID)和密码。 打开IPhone、I…

kotlin Kmp多平台模板生成

地址: Kotlin Multiplatform Wizard | JetBrains 可生成kotlin多个平台模板 https://terrakok.github.io/Compose-Multiplatform-Wizard/

冻结Prompt微调LM: PET(b) LM-BFF

PET-TC(B) paper b: 2020.9 It’s not just size that matters: Small language models are also few-shot learners. Prompt&#xff1a; 多字完形填空式人工Prompt Task&#xff1a;Text Classification Model: Albert-xxlarge-v2 Take Away: 支持多字的完形填空Prompt&a…