CDPHudi实战-集成spark

[一]使用Spark-shell

1-配置hudi Jar包

[root@cdp73-1 ~]# for i in $(seq 1 6); do scp /opt/software/hudi-1.0.0/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0.jar   cdp73-$i:/opt/cloudera/parcels/CDH/lib/spark3/jars/; done
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 418.2MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 304.8MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 365.0MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 406.1MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 472.7MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 447.1MB/s   00:00
[root@cdp73-1 ~]#

2-进入Spark-shell

spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

3-初始化项目

// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableName = "trips_table"
val basePath = "hdfs:///tmp/trips_table"

4-创建表

首次提交将自动初始化表,如果指定的基本路径中尚不存在该表。

5-出入数据

// spark-shell
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));

var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.embed.timeline.server", "false").
  option("hoodie.table.name", tableName).
  mode(Overwrite).
  save(basePath)

【映射到Hudi写操作】​​​​​​​Hudi提供了多种写操作——包括批量和增量写操作——以将数据写入Hudi表,这些操作具有不同的语义和性能。当未配置记录键(请参见下面的键)时,将选择bulk_insert作为写操作,这与Spark的Parquet数据源的非默认行为相匹配。

6-查询数据

// spark-shell
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")

spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM  trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM  trips_table").show()

7-更新数据

// Lets read data from target Hudi table, modify fare column for rider-D and update it. 
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)

updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.embed.timeline.server", "false").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  mode(Append).
  save(basePath)

8-合并数据

// spark-shell
val adjustedFareDF = spark.read.format("hudi").
  load(basePath).limit(2).
  withColumn("fare", col("fare") * 10)
adjustedFareDF.write.format("hudi").
option("hoodie.datasource.write.payload.class","com.payloads.CustomMergeIntoConnector").
mode(Append).
save(basePath)
// Notice Fare column has been updated but all other columns remain intact.
spark.read.format("hudi").load(basePath).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/947519.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

mac m2 安装 docker

文章目录 安装1.下载安装包2.在downloads中打开3.在启动台打开打开终端验证 修改国内镜像地址小结 安装 1.下载安装包 到官网下载适配的安装包:https://www.docker.com/products/docker-desktop/ 2.在downloads中打开 拖过去 3.在启动台打开 选择推荐设置 …

Power BI如何连接Azure Databricks数据源?

故事背景: 近期有朋友询问,自己公司有一些项目使用了Azure Databricks用于数据存储。如何使用Power BI Desktop桌面开发软件连接Azure Databricks的数据源呢? 解决方案: 其实Power BI是提供了连接Azure Databricks数据源的选项的,只是配置…

Python入门教程 —— 进制转换

找其他编译器,系统解释器,这样速度会快很多。 进制 现代的计算机和依赖计算机的设备里都用到二进制(即0和1)来保存和表示数据,一个二进制表示一个比特(Bit)。 在二进制的基础上,计算机还支持八进制和十六进制这两种进制。 除了…

HTML5新特性|05 CSS3边框CSS3背景

CSS3边框 1、CSS3边框: 通过CSS3,您能够创建圆角边框,向矩形添加阴影,使用图片来绘制边框-并且不需使用设计软件,比如PhotoShop。 属性: border-radius 圆角box-shadow:水平阴影 垂直阴影 阴影的清晰度 阴影的大小 阴影的颜色…

《Vue3实战教程》26:Vue3Transition

如果您有疑问,请观看视频教程《Vue3实战教程》

SpringCloudAlibaba实战入门之Sentinel服务降级和服务熔断(十五)

一、Sentinel概述 1、Sentinel是什么 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。 一句话概括:sentinel即Hystrix的替代品,官网: https://sentinelguard.io/zh…

Scratch教学作品 | 白水急流——急流勇进,挑战反应极限! ‍♂️

今天为大家推荐一款刺激又好玩的Scratch冒险作品——《白水急流》!由AgentFransidium制作,这款作品将带你体验惊险的急流救援任务,帮助那位“睡着的疯狂人”安全穿越湍急水域!想要挑战自己的反应极限?快来试试吧&#…

计算机毕业设计Django+Tensorflow音乐推荐系统 音乐可视化 卷积神经网络CNN LSTM音乐情感分析 机器学习 深度学习 Flask

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

Nginx服务器配置SSL证书

1.执行以下命令,在Nginx的conf目录下创建一个用于存放证书的目录。 cd /usr/local/nginx/conf #进入Nginx默认配置文件目录。该目录为手动编译安装Nginx时的默认目录,如果您修改过默认安装目录或使用其他方式安装,请根据实际配置调整。 mkd…

Gemini和ChatGPT全面对比分析,有什么区别和优势?

当 AI 聊天机器人首次出现时,每个人都在竞相发布自己的足够好的第一版 AI 聊天机器人,很容易在 Gemini 与 ChatGPT 等应用程序之间进行比较。但随着 Google 和 OpenAI 不断添加新功能、模型和访问其聊天机器人的方式,差异变得不那么明显。 现…

从0到机器视觉工程师(二):封装调用静态库和动态库

目录 静态库 编写静态库 使用静态库 方案一 方案二 动态库 编写动态库 使用动态库 方案一 方案二 方案三 总结 静态库 静态库是在编译时将库的代码合并到最终可执行程序中的库。静态库的优势是在编译时将所有代码包含在程序中,可以使程序独立运行&…

低代码开发:开启企业数智化转型“快捷键”

一、低代码开发浪潮来袭,企业转型正当时 在当今数字化飞速发展的时代,低代码开发已如汹涌浪潮,席卷全球。从国际市场来看,诸多企业巨头纷纷布局低代码领域,像微软的 PowerApps、OutSystems 等平台,凭借强大…

UE5动画蓝图

动画蓝图,混合空间,状态机,瞄准偏移,动画蒙太奇,动画混合,骨骼绑定,动画重定向,动画通知,Control Rig…… 虚幻动画模块是一个庞大的系统,大模块里又包含很多…

[redux] useDispatch的两种用法

先重写2个方法先, 方便ts类型推导,如果你看不懂为什么这么写, 先看我这篇 [redux] ts声明useSelector和useDispatch-CSDN博客 export type RootState ReturnType<typeof store.getState>; export type AppDispatch typeof store.dispatch; export const useAppDispat…

javaEE-网络原理-1初识

目录 一.网络发展史 1.独立模式 2.网络互联 二.局域网LAN 1.基于网线直连&#xff1a; 2.基于集线器组件&#xff1a; 3.基于交换机组件&#xff1a; 4.基于交换机和路由器组件 ​编辑 三、广域网WAN 四、网络通信基础 1.ip地址 2.端口号&#xff1a; 3.协议 4.五…

jenkins入门3

1、新建视图 视图可以理解为是item的集合&#xff0c;这样可以将item分类。新建视频可以选择加入已有的item 2、新建item 1)输入任务名称、选择一个类型&#xff0c;常用的是第一个freestyle project 2&#xff09;进行item相关配置&#xff0c;general 设置项目名字,描述,参数…

【C语言的小角落】--- 深度理解取余/取模运算

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; C语言的小角落 本篇博客我们来深度理解取余/取模&#xff0c;以及它们在不同语言中出现不同现象的原因。 &#x1f3e0; 关于取整 &#x1f3b5; 向0取整…

mapbox进阶,添加路径规划控件

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象1.2 ☘️MapboxDirections 控件二、🍀添加路径规划控件1. ☘️实现思路2. ☘️…

linux-25 文件管理(三)复制、移动文件,cp,mv

命令cp是copy的简写&#xff0c;而mv则是move的简写。那既然copy是用于实现复制文件的&#xff0c;那通常一般我们要指定其要复制的是谁&#xff1f;而且复制完以后保存在什么地方&#xff0c;对吧&#xff1f;那因此它的使用格式很简单&#xff0c;那就是cp srcfile dest&…

IDEA开发Java应用的初始化设置

一、插件安装 如下图所示&#xff1a; 1、Alibaba Java Coding Guidelines 2.1.1 阿里开发者规范&#xff0c;可以帮忙本地自动扫描出不符合开发者规范的代码&#xff0c;甚至是代码漏洞提示。 右击项目&#xff0c;选择《编码规约扫描》&#xff0c;可以进行本地代码规范扫…