Spark SQL 血缘解析方案

背景

项目背景建设数据中台,往往数据开发人员首先需要能够通过有效的途径检索到所需要的数据,然后根据检索的数据模型进行业务加工然后得到一些中间模型,最后再通过数据抽取工具或者OLAP分析工具直接将数据仓库中加工好的公共模型输出到应用层。这里我不在去介绍数据仓库为何需要分层以及该如何分层,这个逻辑已经有很多厂商在业务中实践过,这里就不再赘述,本次主要需要解决的事数据链路加工血缘采集的方案。本着知识积累的原则记录一下方案。

Hive DDL采集和血缘

目前这个是最简单的,如果没有特殊的需求,可以直接对Apache Atlas中的hive hook进行裁剪,最终可以得到业务所需的血缘采集插件,一般可以到字段级别血缘。

Spark SQL血缘采集

目前针对Spark SQL血缘采集,首先DDL元数据采集依旧使用Apache Atlas中的hive Hook,因为即使使用Spark操作Hive也是最终链接的是hive的metastore数据库。现在主要解决的是Spark SQL计算中如何记录下血缘信息:

  • 方案1:
    如果用过Kyuubi的同学可能知道在该项目的源码中已经集成了Spark SQL血缘采集的板块,这块同样如果需要可以直接裁剪出来。但是这里小编不推荐,因为这个插件解析出来的信息不算是多么丰富,在某些场景下的血缘解析甚至无法正确解析出来。项目地址https://github.com/apache/kyuubi/tree/master/extensions/spark/kyuubi-spark-lineage
    Kyuubi Spark Lineage
  • 方案2:
    开源真的很强大,除了kyuubi产品之外,还有个比较强大的产品Apche Linkis,在这个产品里面也集成了Spark SQL血缘,这个工具解析比较全面给出的信息也比较多,解析的准确率很高。但是输出的json结构比较复杂,这点其实无所谓了,我们可以在了解完它的结构之后,可以对结果进行处理。项目地址https://github.com/AbsaOSS/spline-spark-agent,项目打包也很简单直接选择scala-2.12和spark-xxx即可打包。原生插件集成步骤很多,这里小编就介绍一下kafka的集成。
  • 拷贝kafka-clients-2.4.0.jar和spark-版本-spline-agent-bundle_2.12-2.0.0.jar到spark安装目录下的jar目录
  • 配置conf/spark-default.conf文件
spark.sql.queryExecutionListeners	za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.lineageDispatcher	kafka
spark.spline.lineageDispatcher.kafka.topic	linkis_spark_lineage
spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers	localhost:9092
# 添加额外属性,适合多租户场景下的血缘采集
spark.spline.postProcessingFilter	userExtraMeta
spark.spline.postProcessingFilter.userExtraMeta.className	za.co.absa.spline.harvester.postprocessing.metadata.MetadataCollectingFilter
spark.spline.postProcessingFilter.userExtraMeta.rules	{
   \"executionPlan\":{
   \"extra\":{
   \"companyCode\":\"1200202023020320\"\\,\"originQuery\":{
   \"$js\":\"session.conf().get('sql'\\,'')\"}}}}

到这里就可以启动Spark SQL客户端查看效果,例如小编执行如下sql

CREATE TABLE test.t_order (
  id INT,
  uid INT,
  amount INT,
  price DOUBLE,
  c_time TIMESTAMP
 );
 
CREATE TABLE test.t_user (
  uid INT,
  name STRING,
  age INT
);

CREATE TABLE test.t_order_detail (
  id INT,
  name STRING,
  cost DOUBLE,
  c_time TIMESTAMP
);
set sql= insert into t_order_detail select o.id,u.name,(o.amount * o.price) as cost,o.c_time from t_user u left join t_order o on o.uid=u.uid;

insert into t_order_detail select o.id,u.name,(o.amount * o.price) as cost,o.c_time from t_user u left join t_order o on o.uid=u.uid;

消费kafka的topiclinkis_spark_lineage可以消费到如下数据:

{
   
    "id": "49a81e8e-51f2-5a05-96c3-bc22a1bc3f81",
    "name": "SparkSQL::10.253.30.205",
    "operations": {
   
        "write": {
   
            "outputSource": "file://ZBMac-C02CW08SM:8020/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_order_detail",
            "append": true,
            "id": "op-0",
            "name": "InsertIntoHiveTable",
            "childIds": [
                "op-1"
            ],
            "params": {
   
                "table": {
   
                    "identifier": {
   
                        "table": "t_order_detail",
                        "database": "test"
                    },
                    "storage": "Storage(Location: file:/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_order_detail, Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, Storage Properties: [serialization.format=1])"
                }
            },
            "extra": {
   
                "destinationType": "hive"
            }
        },
        "reads": [
            {
   
                "inputSources": [
                    "file://ZBMac-C02CW08SM:8020/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_user"
                ],
                "id": "op-5",
                

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/743265.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Qt】day2

文章目录 菜单栏工具栏状态栏铆接部件(浮动窗口)中心部件添加图片对话框模态对话框非模态对话框标准对话框(信息对话框)错误对话框信息对话框提问对话框警告对话框 其他标准对话框颜色对话框文件对话框 字体对话框 登录窗口布局按…

《重构》读书笔记【第1章 重构,第一个示例,第2章 重构原则】

文章目录 第1章 重构,第一个示例1.1 重构前1.2 重构后 第2章 重构原则2.1 何谓重构2.2 两顶帽子2.3 为何重构2.4 何时重构2.5 重构和开发过程 第1章 重构,第一个示例 我这里使用的IDE是IntelliJ IDEA 1.1 重构前 plays.js export const plays {&quo…

将 MinIO 与 Keycloak OIDC 集成

Keycloak是一种单点登录解决方案。使用Keycloak,用户使用Keycloak而不是MinIO进行身份验证。如果没有Keycloak,您将不得不为每个用户创建一个单独的身份 - 从长远来看,这将很麻烦。您需要一个集中身份解决方案来管理 MinIO 的身份验证和授权。…

Python深度学习技术

原文链接:Python深度学习技术 近年来,伴随着以卷积神经网络(CNN)为代表的深度学习的快速发展,人工智能迈入了第三次发展浪潮,AI技术在各个领域中的应用越来越广泛。Transformer模型(BERT、GPT-…

3.2 文件包含漏洞渗透实战(OWASP实战训练)

3.2 文件包含漏洞渗透实战(OWASP实战训练) 原理及危害3.低安全级别渗透3.1本地文件包含漏洞3.2 本地文件包含漏洞webshell3.3远程文件包含漏洞 上一节讲了本地文件包含和远程文件包含 本地文件包含需要将文件传上去或者不传上去(本地系统的一…

导出 S 参数扫描结果供 INTERCONNECT 使用

导出 S 参数扫描结果供 INTERCONNECT 使用 正文正文 有时候,对于 FDTD 无法直接进行仿真的大型仿真链路,我们需要使用 FDTD 针对单个小的模块进行仿真,再将得到的 S 参数结果导入到 INTERCONNECT 中使用,最终完成整个链路的仿真。通常完成 S 参数扫描后其状态如下图所示:…

【数据结构】栈的定义与实现(附完整运行代码)

目录 一、栈的定义 二、顺序栈 链栈比较 三、栈的实现(顺序栈) 3.1 ❥ 定义栈结构 3.2 ❥ 初始化 3.3 ❥ 销毁 3.4 ❥ 插入(入栈) 3.5 ❥ 删除 (出栈) 3.6 ❥ 获取栈顶元素 3.7 ❥ 判空 3.8 ❥…

1962springboot VUE社区服务平台系统开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot VUE社区服务平台系统是一套完善的完整信息管理类型系统,结合springboot框架和VUE完成本系统,对理解vue java编程开发语言有帮助系统采用springboot框架(MVC模式开发),系统具有完整的源代码和…

光泽正在褪去,所以我们又回到了人工智能领域。

光泽正在褪去,所以我们又回到了人工智能领域。 人工智能冬天将被私有化 自从“人工智能”这个流行词在20世纪50年代被创造出来以来,人工智能经历了几次繁荣和萧条周期。 一种新的技术方法看起来很有趣,并取得了一些成果。它被荒谬地炒作并获…

EdgeOne 边缘函数 + Hono.js + Fauna 搭建个人博客

一、背景 虽然 “博客” 已经是很多很多年前流行的东西了,但是时至今日,仍然有一部分人在维护自己的博客站点,输出不少高质量的文章。 我使用过几种博客托管平台或静态博客生成框架,前段时间使用Hono.jsFauna ,基于 …

热电发电机越来越受到研发关注

热电发电机 (TEG) 利用热量(或更准确地说,温差)和众所周知的塞贝克效应来发电。它们的应用范围从收集可用热能,尤其是在工业和其他情况下“浪费”的热能,到在放射性同位素热发电机 (RTG) 中使用航天器的放射性电源作为…

静电场的基本方程

目录 场积分方程 通量(高斯定理) 环量 场微分方程 散度 旋度 小结 补充知识 立体角 场积分方程 通量(高斯定理) 环量 场微分方程 散度 旋度 小结 补充知识 立体角

慢性病防治新策略:诊所管理系统助力健康管理变革

慢性病,如高血压、糖尿病等,正逐渐成为全球健康领域的重要挑战。据尚普咨询的数据显示,全球每年有4100万人死于慢性非传染性疾病,占全球死亡总数的71%。而在中国,随着经济社会发展和卫生健康服务水平的提高&#xff0c…

OpenAI突然宣布停止向中国提供API服务!

标题 🌟 OpenAI突然宣布停止向中国提供API服务! 🌟摘要 📜引言 📢正文 📝1. OpenAI API的重要性2. 停止服务的原因分析3. 对中国市场的影响4. 应对措施代码案例 📂常见问题解答(QA)❓…

使用AES,前端加密,后端解密,spring工具类了

学习python的时候,看到很多会对参数进行加密,于是好奇心驱使下,让我去了解了下AES加密如何在java中实现。 首先 npm install crypto-js 然后在你的方法中,给你们前端源码看看,因为我用的ruoyi框架做的实验&#xff…

iSCSI driver not found和Failed to start Open-iSCSI的解决方法

案例1:iscsi的配置有问题 方法:一般的处理方法为重装iscsi-initiator-utils。 案例2:linux安装了多个内核,启动所选的内核与iscsi服务不匹配 方法:重启系统,选择对应的内核版本启动系统。 (注…

Python 围棋

效果图 完整代码 源码地址:Python 围棋 # 使用Python内置GUI模块tkinter from tkinter import * # ttk覆盖tkinter部分对象,ttk对tkinter进行了优化 from tkinter.ttk import * # 深拷贝时需要用到copy模块 import copy import tkinter.me…

机器学习课程复习——奇异值分解

1. 三种奇异值分解 奇异值分解(Singular Value Decomposition, SVD)包含了: 完全奇异值分解(Complete Singular Value Decomposition, CSVD)紧奇异值分解(Tight Singular Value Decomposition, TSVD&…

赶快收藏!全网最佳 WebSocket 封装:完美支持断网重连、自动心跳!

文章目录 一、WebSocket 基础WebSocket 的基本使用 二、封装 WebSocket 客户端WebSocketClient 类使用 WebSocketClient 类解释代码实现 三、总结优点未来改进 🎉欢迎来到SpringBoot框架学习专栏~ ☆* o(≧▽≦)o *☆嗨~我是IT陈寒🍹✨博客主页&#xff…

【复旦邱锡鹏教授《神经网络与深度学习公开课》笔记】卷积

卷积经常用在信号处理中,用于计算信号的延迟累积。假设一个信号发射器每个时刻 t t t产生一个信号 x t x_t xt​,其信息的衰减率为 w k w_k wk​,即在 k − 1 k-1 k−1个时间步长后,信息为原来的 w k w_k wk​倍,时刻 …