第三期【Demo教程】教你使用SeaTunnel把数据从MySQL导到Hive

随着数据技术的快速发展,了解并掌握各种工具和技术变得尤为重要。为此,我们准备在Apache SeaTunnel社区发起如何使用连接器的Demo演示计划,邀请所有热爱数据同步技术的同学分享他们的知识和实操经验!

file

我们第三期主题是:如何使用SeaTunnel连接器从MySQL同步到Hive,如果您对此计划感兴趣,也欢迎联系社区运营同学参与Demo录制!无论您是数据工程师、开发者还是技术爱好者,都欢迎您参与并展示您的技术才能。

敲重点敲重点如果你是用户,想看什么同步场景的Demo!请下滑到最底部留言,我们优先出品呼声最高的同步场景Demo!

Demo计划目标

我们的目标是创建一个共享和学习的平台,通过具体的Demo演示和对应的文档帮助社区成员更好地理解和应用各种数据连接器。这些Demo可以帮助新手快速学习,同时也为资深专家提供一个展示创新解决方案的舞台。

src="//player.bilibili.com/player.html?isOutside=true&aid=1855834722&bvid=BV1ks421T7ZV&cid=1586444145&p=1" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true">

如何使用 SeaTunnel进行高效数据同步

关于从MySQL同步到Hive,前段时间也有用户投稿,感兴趣的同学可以搜索看看:

【最佳实践】2个步骤教你从Mysql同步到Hive

如何使用 SeaTunnel 同步 MySQL 数据到 Hive

Mysql Source连接器相关请参考之前的教程: 全方位解读SeaTunnel MySQL CDC连接器:实现数据高效同步的强大工具

需要参考的文档及代码原文链接:https://seatunnel.apache.org/docs/2.3.5/connector-v2/sink/Hive (预计2.3.6版本才能正式使用)

描述

将数据写入到 Hive。

要使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hive。

如果您使用 SeaTunnel Zeta Engine,则需要将 seatunnel-hadoop3-3.1.4-uber.jarhive-exec-3.1.3.jarlibfb303-0.9.3.jar 放置在 $SEATUNNEL_HOME/lib/ 目录下。 :::

关键特性

  • 精确一次

默认情况下,我们使用两阶段提交(2PC)来确保 精确一次

  • 文件格式
    • text
    • csv
    • parquet
    • orc
    • json
  • 压缩编码
    • lzo

选项

名称类型必需默认值
table_namestring-
metastore_uristring-
compress_codecstringnone
hdfs_site_pathstring-
hive_site_pathstring-
hive.hadoop.confMap-
hive.hadoop.conf-pathstring-
krb5_pathstring/etc/krb5.conf
kerberos_principalstring-
kerberos_keytab_pathstring-
abort_drop_partition_metadatabooleantrue
common-options-

table_name [string]

目标 Hive 表的名称,例如:db1.table1。如果源是多模式的,您可以使用 ${database_name}.${table_name} 来生成表名,它会用源中生成的 CatalogTable 的值替换 ${database_name}${table_name}

metastore_uri [string]

Hive Metastore 的 URI。

hdfs_site_path [string]

hdfs-site.xml 的路径,用于加载 namenodes 的高可用配置。

hive_site_path [string]

hive-site.xml 的路径。

hive.hadoop.conf [map]

Hadoop 配置文件中的属性(core-site.xmlhdfs-site.xmlhive-site.xml)。

hive.hadoop.conf-path [string]

core-site.xmlhdfs-site.xmlhive-site.xml 文件的指定加载路径。

krb5_path [string]

krb5.conf 的路径,用于 Kerberos 认证。

kerberos_principal [string]

Kerberos 的 principal。

kerberos_keytab_path [string]

Kerberos 的 keytab 路径。

abort_drop_partition_metadata [list]

决定在中止操作期间是否从 Hive Metastore 中删除分区元数据的标志。

注意:这仅影响 metastore 中的元数据,同步过程中生成的数据将始终被删除。

common options

Sink 插件的常用参数,请参阅 Sink Common Options 获取详细信息。

示例

Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://namenode001:9083"
}

示例 1

我们有一个源表,如下所示:

create table test_hive_source(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE,
  test_array ARRAY<INT>,
  test_map MAP<STRING, FLOAT>,
  test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

我们需要从源表读取数据并写入到另一个表中:

create table test_hive_sink_text_simple(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

作业配置文件如下:

env {
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    table_name = "test_hive.test_hive_source"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
    }
  }
}

Hive on S3

1、为 EMR 的 Hive 创建 lib 目录

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

2、从 Maven 中心获取 jar 到 lib 目录

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

3、从 EMR 环境中复制 jar 到 lib 目录

cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib

4、运行测试用例

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_s3"
    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
    hive.hadoop.conf = {
       bucket="s3://ws-package"
    }
  }
}

Hive on OSS

1、为 EMR 的 Hive 创建 lib 目录

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

2、从 Maven 中心获取 jar 到 lib 目录

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

3、从 EMR 环境中复制 jar 到 lib 目录并删除冲突的 jar

cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar

4、运行测试用例

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_oss"
    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    hive.hadoop.conf = {
        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
    }
  }
}

示例 2

我们有多个源表,如下所示:

create table test_1(
)
PARTITIONED BY (xx);

create table test_2(
)
PARTITIONED BY (xx);
...

我们需要从这些源表读取数据并写入到其他表中:

作业配置文件如下:

env {
  # 在这里设置 Flink 配置
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    tables_configs = [
      {
        table_name = "test_hive.test_1"
        metastore_uri = "thrift://ctyun6:9083"
      },
      {
        table_name = "test_hive.test_2"
        metastore_uri = "thrift://ctyun7:9083"
      }
    ]
  }
}

sink {
  Hive {
    table_name = "${database_name}.${table_name}"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

通过视频教程,我们探讨了如何使用 Apache SeaTunnel 的 Hive Sink Connector 将数据高效地写入 Hive 表。

无论是在本地环境还是云上部署,使用 Hive Sink Connector 都能够帮助企业构建高效、可靠的数据处理流程。希望通过本文的指导,您能更好地理解和应用这一强大的工具,以满足您的数据处理需求。

如果您对本文内容有任何疑问或建议,欢迎在评论区分享您的想法。让我们共同探讨和进步,不断推动数据技术的边界。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/727948.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Cesium源码解析六(3dtiles属性获取、建筑物距离计算、建筑物着色及其原理分析)

快速导航 Cesium源码解析一&#xff08;搭建开发环境&#xff09; Cesium源码解析二&#xff08;terrain文件的加载、解析与渲染全过程梳理&#xff09; Cesium源码解析三&#xff08;metadataAvailability的含义&#xff09; Cesium源码解析四&#xff08;metadata元数据拓展…

物联网设备安装相关知识整理

拓扑图 对于ADAM-4150先接设备的整体的供电。 ADAM-4150就涉及到几个电子元器件的连接&#xff0c;一个是485-232的转换器&#xff0c;一个是将RS-232转换为USB的转接口&#xff0c;因为现在的计算机很多都去掉了RS-232接口而使用USB接口。 4150右侧有个拨码&#xff0c;分别两…

在Linux服务器上安装Anaconda使用conda

1. 下载安装包 wget https://repo.anaconda.com/archive/Anaconda3-5.3.0-Linux-x86_64.sh 安装成功 2. 安装anaconda chmod x Anaconda3-5.3.0-Linux-x86_64.sh./Anaconda3-5.3.0-Linux-x86_64.sh 一直回车 直到出现 yes or no&#xff0c; 输入 yes 继续回车&#xff0c;然…

链表OJ

GDUFE 在期末前再刷一次链表题 ~ 203. 移除链表元素 - 力扣&#xff08;LeetCode&#xff09; /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ struct ListNode* removeElements(struct ListNode* head, int …

文本高效管理神器:支持自定义行数拆分,轻松实现批量高效编辑与管理新体验

在信息爆炸的时代&#xff0c;文本处理成为了我们日常工作中不可或缺的一部分。然而&#xff0c;面对大量的文本数据&#xff0c;如何高效地进行编辑和管理&#xff0c;却成为了许多人头疼的问题。现在&#xff0c;有了我们的文本批量高效编辑管理工具&#xff0c;一切将变得简…

Srouce Insight 4出现乱码

今天用SI4打开一个工程文件&#xff0c;一打开发现注释全是乱码。中文全部看不出来&#xff0c;英文和数字可以看得出来。 那是因为中文的编码格式不算特别兼容。所以需要调整编码格式。 于是我在这里调整了编码格式&#xff1a; 找到菜单的Options-Preferences里面的Files 调…

web中间件漏洞-Tomcat漏洞-密码爆破、war包上传

web中间件漏洞-Tomcat漏洞-密码爆破、war包上传 密码爆破 步骤: 抓登陆包、对字典进行base64编码&#xff0c;爆破得到账号密码tomcat/tomcat,登陆即可 tomcat/tomcat登陆成功 服务器 查看 tomcat-users.xml里的账号密码 war包上传 步骤 上传war包、访问即可

JAVA每日作业day6.20

ok了家人们&#xff0c;今天学习了面向对象的继承&#xff0c;话不多说让我们看看怎么个事。 我们先把昨天学 面向对象-封装 的温习一下&#xff0c;来个例子 1&#xff0c;综合案例 做一个关于学生的随机点名器 定义了两个变量&#xff0c;name和age&#xff0c;给他们封装一…

自动化办公04 使用pyecharts制图

目录 一、柱状图 二、折线图 三、饼图 四、地图 1. 中国地图 2. 世界地图 3. 省会地图 五、词云 Pyecharts是一个用于数据可视化的Python库。它基于Echarts库&#xff0c;可以通过Python代码生成各种类型的图表&#xff0c;如折线图、柱状图、饼图、散点图等。 Pyecha…

Python之scapy(1)基础使用

Python之scapy(1)基础使用 Author: Once Day Date: 2024年6月4日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文章可参考专栏: Python开发_Once-Day的博客-CSDN博…

审稿人:拜托,请把模型时间序列去趋势!!

大侠幸会&#xff0c;在下全网同名「算法金」 0 基础转 AI 上岸&#xff0c;多个算法赛 Top 「日更万日&#xff0c;让更多人享受智能乐趣」 时间序列分析是数据科学中一个重要的领域。通过对时间序列数据的分析&#xff0c;我们可以从数据中发现规律、预测未来趋势以及做出决策…

【python - 函数】

一、递归函数 如果函数体中直接或间接调用了函数本身&#xff0c;则函数称为递归&#xff08;recursive&#xff09;函数。也就是说&#xff0c;执行递归函数主体的过程中可能需要再次调用该函数。在 Python 中&#xff0c;递归函数不需要使用任何特殊语法&#xff0c;但它们确…

智慧消防新篇章:可视化数据分析平台引领未来

一、什么是智慧消防可视化数据分析平台&#xff1f; 智慧消防可视化数据分析平台&#xff0c;运用大数据、云计算、物联网等先进技术&#xff0c;将消防信息以直观、易懂的图形化方式展示出来。它不仅能够实时监控消防设备的运行状态&#xff0c;还能对火灾风险进行预测和评估…

大数据助力电商发展||电商API接口接入

伴随互联网尤其是移动互联网的高速发展&#xff0c;电子商务已经成为人们生活中不可或缺的一部分&#xff0c;人们的购物理念和消费模式正在发生颠覆性的转变。基于天然的数据优势&#xff0c;电子商务平台利用大数据计算技术不断实施数据的累积、分析和处理&#xff0c;消费者…

如何设计一个点赞系统

首先我们定义出一个点赞系统需要对外提供哪些接口&#xff1a; 1.用户对特定的消息进行点赞&#xff1b; 2.用户查看自己发布的某条消息点赞数量以及被哪些人赞过&#xff1b; 3.用户查看自己给哪些消息点赞过&#xff1b; 这里假设每条消息都有一个message_id, 每一个用户都…

[17] 使用Opencv_CUDA 进行滤波操作

使用Opencv_CUDA 进行滤波操作 邻域处理操作 > 滤波操作&#xff0c;拒绝或者允许某特定频段通过如果图像某处的灰度级变化缓慢&#xff0c;那么就是低频区域&#xff0c;如果灰度级变化剧烈&#xff0c;就是高频区域邻域滤波即卷积操作形态学处理&#xff1a;膨胀&#xf…

【论文通读】VideoGUI: A Benchmark for GUI Automation from Instructional Videos

VideoGUI: A Benchmark for GUI Automation from Instructional Videos 前言AbstractMotivationVideoGUIPipelineEvaluation ExperimentsMain ResultsAnalysis Conclusion 前言 数字智能体的探索又来到了新的阶段&#xff0c;除了常见的桌面工具如PPT&#xff0c;Word&#xf…

HTML(15)——盒子模型

盒子模型组成 内容区域 -width&height内边距-padding &#xff08;出现在内容与盒子边缘之间&#xff09;边框线-border外边距-margin &#xff08;出现在盒子外面&#xff09; div { width: 200px; height: 200px; background-color: rgb(85, 226, 193); padding: 20px; …

【项目实践】Ulike充电牙刷拆解

前言 用了一段时间的充电牙刷&#xff0c;某一次突然没电了&#xff0c;按键也没有反应。无奈只能废弃。最近略微得了些空闲&#xff0c;想着把它拆解看看里面的结构和电路。以下是鼓捣过程记录。 为什么不能直接抽出来&#xff1f; 在网上看到很多拆解视频&#xff0c;都是打开…

基于Windows API DialogBox的对话框

在C中&#xff0c;DialogBox函数是Windows API的一部分&#xff0c;它用于在Win32应用程序中创建并显示一个模态对话框。DialogBox函数是USER32.DLL中的一个导出函数&#xff0c;因此你需要在你的C Win32应用程序中链接到这个库。 #include "framework.h" #include …