Delta lake with Java--数据增删改查

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java以真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

package detal.lake.java;

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;

import java.text.SimpleDateFormat;
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashMap;

public class DeltaLakeCURD {

    //将字符串转换成java.sql.Timestamp
    public static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {
        SimpleDateFormat sf = new SimpleDateFormat(dateFormat);
        java.util.Date date = null;
        try {
            date = sf.parse(strDate);
        } catch (Exception e) {
            e.printStackTrace();
        }
        java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());
        return dateSQL;
    }

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("delta_lake")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.databricks.delta.autoCompact.enabled", "true")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();


        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";
        String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";
        String tableName = "taxidb.YellowTaxis";

        spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");

        //定义表
        DeltaTable.createIfNotExists(spark)
                .tableName(tableName)
                .addColumn("RideId","INT")
                .addColumn("VendorId","INT")
                .addColumn("PickupTime","TIMESTAMP")
                .addColumn("DropTime","TIMESTAMP")
                .location(savePath)
                .execute();


        //加载csv数据并导入delta表
        var df=spark.read().format("delta").table(tableName);
        var schema=df.schema();
        System.out.println(schema.simpleString());
        var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);
        System.out.println("记录总行数:"+df_for_append.count());
        System.out.println("导入数据,开始时间"+  sdf.format(new Date()));
        df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);
        System.out.println("导入数据,结束时间" + sdf.format(new Date()));


        DeltaTable deltaTable = DeltaTable.forName(spark,tableName);


        //插入数据
        List<Row> list = new ArrayList<Row>();
        list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));
        structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));
        StructType structType = DataTypes.createStructType(structFields);
        var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframe

        System.out.println("插入数据,开始时间"+  sdf.format(new Date()));
        yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);
        System.out.println("插入数据,结束时间"+  sdf.format(new Date()));
        System.out.println("插入后数据");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);


        //更新数据
        System.out.println("更新前数据");
        deltaTable.toDF().select("*").where("RideId=999994").show(false);
        System.out.println("更新数据,开始时间"+  sdf.format(new Date()));
        deltaTable.updateExpr(
                "RideId = 999994",
                new HashMap<String, String>() {{
                    put("VendorId", "250");
                }}
        );
        System.out.println("更新数据,结束时间"+  sdf.format(new Date()));
        System.out.println("更新后数据");
        deltaTable.toDF().select("*").where("RideId=999994").show(false);


        //查询数据
        System.out.println("查询数据,开始时间"+  sdf.format(new Date()));
        var selectDf= deltaTable.toDF().select("*").where("RideId=1");
        selectDf.show(false);
        System.out.println("查询数据,结束时间" + sdf.format(new Date()));


        //删除数据
        System.out.println("删除数据,开始时间"+  sdf.format(new Date()));
        deltaTable.delete("RideId=1");
        System.out.println("删除数据,结束时间"+  sdf.format(new Date()));
        deltaTable.toDF().select("*").where("RideId=1").show(false);

    }
}

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下连接,原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个方法,实现了转换,转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/590317.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软件无线电系列——信道编译码

微信公众号上线&#xff0c;搜索公众号小灰灰的FPGA,关注可获取相关源码&#xff0c;定期更新有关FPGA的项目以及开源项目源码&#xff0c;包括但不限于各类检测芯片驱动、低速接口驱动、高速接口驱动、数据信号处理、图像处理以及AXI总线等 本节目录 一、信道编译码 1、数字…

开源的贴吧数据查询工具

贴吧数据查询工具 这是一个贴吧数据查询工具&#xff0c;目前仍处于开发阶段。 本地运行 要本地部署这个项目&#xff0c;请 克隆这个仓库并前往项目目录 git clone https://github.com/Dilettante258/tieba-tools.git cd tieba-tools安装依赖 pnpm install运行项目 np…

服务器数据恢复—异常断电导致RAID模块故障的数据恢复案例

服务器数据恢复环境&#xff1a; 某品牌ProLiant DL380系列服务器&#xff0c;服务器中有一组由6块SAS硬盘组建的RAID5阵列&#xff0c;WINDOWS SERVER操作系统&#xff0c;作为企业内部文件服务器使用。 服务器故障&#xff1a; 机房供电几次意外中断&#xff0c;服务器出现故…

RMQ从入门到精通

一.概述与安装 //RabbitMQ //1.核心部分-高级部分-集群部分 //2.什么是MQ 消息队列message queue 先入先出原则;消息通信服务 //3.MQ的大三功能 流量消峰 应用解耦 消息中间件 //&#xff08;1&#xff09;人-订单系统(1万次/S)—> 人 - MQ(流量消峰,对访问人员进行排队) -…

Java 【数据结构】常见排序算法实用详解(上) 插入排序/希尔排序/选择排序/堆排序【贤者的庇护】

登神长阶 上古神器-常见排序算法 插入排序/选择排序/堆排序 &#x1f4d4; 一.排序算法 &#x1f4d5;1.排序的概念 排序 &#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。 稳定性&a…

【Python】函数设计

1.联系函数的设计 2.找质数 3.找因子 4.判断水仙花数 5.斐波拉契数列递归调用&#xff0c;并用数组存储已计算过的数&#xff0c;减少重复计算 1、计算利息和本息 编写两个函数分别按单利和复利计算利息,根据本金、年利率、存款年限得到本息和和利息。调用这两个函数计算1…

学习Rust的第22天:mini_grep第2部分

书接上文&#xff0c;在本文中&#xff0c;我们学习了如何通过将 Rust 程序的逻辑移至单独的库箱中并采用测试驱动开发 (TDD) 实践来重构 Rust 程序。通过在实现功能之前编写测试&#xff0c;我们确保了代码的可靠性。我们涵盖了基本的 Rust 概念&#xff0c;例如错误处理、环境…

【linux-汇编-点灯之思路-程序】

目录 1. ARM汇编中的一些注意事项2. IMXULL汇编点灯的前序&#xff1a;3. IMXULL汇编点灯之确定引脚&#xff1a;4. IMXULL汇编点灯之引脚功能编写&#xff1a;4.1 第一步&#xff0c;开时钟4.2 第二步&#xff0c;定功能&#xff08;MUX&#xff09;4.3 第三步&#xff0c;定电…

【笔试训练】day17

1.小乐乐该数字 遇到按位处理的情况可以考虑用字符串去读 代码&#xff1a; #define _CRT_SECURE_NO_WARNINGS 1 #include <iostream> #include<string> using namespace std;int main() {string str;cin >> str;int ans 0;for (int i 0; i < str.siz…

JavaEE 初阶篇-深入了解 Junit 单元测试框架和 Java 中的反射机制(使用反射做一个简易版框架)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 Junit 单元测试框架概述 1.1 使用 Junit 框架进行测试业务代码 1.2 Junit 单元测试框架的常用注解&#xff08;Junit 4.xxx 版本&#xff09; 2.0 反射概述 2.1 获…

神经网络中的优化方法

一、引入 在传统的梯度下降优化算法中&#xff0c;如果碰到平缓区域&#xff0c;梯度值较小&#xff0c;参数优化变慢 &#xff0c;遇到鞍点&#xff08;是指在某些方向上梯度为零而在其他方向上梯度非零的点。&#xff09;&#xff0c;梯度为 0&#xff0c;参数无法优化&…

机器人系统ros2-开发实践04-ROS2 中 tf2的定义及示例说明

1. what ros2 tf2 &#xff1f; tf2的全称是transform2&#xff0c;在ROS&#xff08;Robot Operating System&#xff09;中&#xff0c;它是专门用于处理和变换不同坐标系间位置和方向的库。这个名字来源于“transform”这个词&#xff0c;表示坐标变换&#xff0c;而“2”则…

【Leetcode每日一题】 动态规划 - 简单多状态 dp 问题 - 删除并获得点数(难度⭐⭐)(70)

1. 题目解析 题目链接&#xff1a;740. 删除并获得点数 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 问题分析 本题是「打家劫舍」问题的变种&#xff0c;但核心逻辑依然保持一致。题目要求从给定的数组nums中选择…

C++ stack和queue的使用方法与模拟实现

文章目录 一、 stack的使用方法二、 queue的使用方法三、 容器适配器四、 stack的模拟实现五、 queue的模拟实现 一、 stack的使用方法 stack介绍文档 stack是一种容器适配器&#xff0c;专门用在具有后进先出操作的上下文环境中&#xff0c;其删除只能从容器的一端进行元素的…

Windows如何通过wsl2迅速启动Docker desktop的PHP的Hyperf项目容器?

一、安装WSL 什么是WSL&#xff1f; 官网&#xff1a;什么是WSL&#xff1f; Windows Subsystem for Linux (WSL) 是一个在Windows 10和Windows 11上运行原生Linux二进制可执行文件的兼容性层。 换句话说&#xff0c;WSL让你可以在Windows系统上运行Linux环境&#xff0c;而无需…

【linux】unzip解压乱码或者报错处理办法

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…

2023-2024年数字化转型报告/方案合集(精选198份)

数字化转型报告/方案&#xff08;精选198份&#xff09; 2023-2024年 来源&#xff1a;2023-2024年数字化转型报告/方案合集&#xff08;精选198份&#xff09; 【以下是资料目录】 2023-2024年度医药健康行业数字化调研报告 2023-2024中国财务数字化报告 2023⻝品饮料行业…

Redis运维篇-快速面试笔记(速成版)

文章目录 1. Redis的持久化1.1 RDB&#xff08;快照模式&#xff09;1.2 AOF 模式 2. Redis主从模型&#xff08;高可用&#xff09;2.1 Redis的主从复制2.2 Redis拓扑结构 3. Redis集群模式&#xff08;高并发&#xff09;3.1 Redis的Slots3.2 集群模式的常用命令3.3 多主多从…

使用 MediaMTX 和 FFmpeg 推拉 RTSP 流媒体

实时流传输协议 RTSP&#xff08;Real-Time Streaming Protocol&#xff09;是 TCP/IP 协议体系中的一个应用层协议&#xff0c;由哥伦比亚大学、网景和 RealNetworks 公司提交的 IETF RFC 标准。该协议定义了一对多应用程序如何有效地通过 IP 网络传送多媒体数据。RTSP 在体系…

Unity射击游戏开发教程:(8)构建 UI 元素:添加分数显示

用户界面决定用户如何与屏幕交互。UI 适用于所有类型的游戏和应用程序,在此示例中,我们将为我的太空射击游戏设置一个简单的记分板。 第一步是在层次结构中创建一个 UI 元素。只需在层次结构中右键单击,滚动 UI 并选择要添加的 UI 元素类型。在本例中,我们将使用文本元素。…