Seatunnel本地模式快速测验

前言

SeaTunnel(先前称为WaterDrop)是一个分布式、高性能、易于扩展的数据集成平台,旨在实现海量数据的同步和转换。它支持多种数据处理引擎,包括Apache Spark和Apache Flink,并在某个版本中引入了自主研发的Zeta引擎。SeaTunnel不仅适用于离线数据同步,还能支持CDC(Change Data Capture)实时数据同步,这使得它在处理多样化数据集成场景时表现出色。

本节内容作为官方的一个补充测验,快速开始体验吧。


一、Apache Seatunnel是什么?

从官网的介绍看:
Next-generation high-performance, distributed, massive data integration tool.
通过这几个关键词你能看到它的定位:下一代,高性能,分布式,大规模数据集成工具。

那到底好不好用呢?

二、安装

  1. 下载
https://seatunnel.apache.org/download

推荐:v2.3.5

  1. 安装
    环境:Java8
    安装插件:修改 config/plugin_config 以下是一些常用的,不要的暂时不装。
--connectors-v2--
connector-cdc-mysql
connector-clickhouse
connector-file-local
connector-hive
connector-jdbc
connector-kafka
connector-redis
connector-doris
connector-fake
connector-console
connector-elasticsearch
--end--

然后执行:
➜ seatunnel bin/install-plugin.sh 2.3.5
等待执行完毕,就安装完了,很简单。

三、测试

1. 测试 local模式下的用例

修改下模板的测试用例,然后执行如下命令:

bin/seatunnel.sh --config ./config/v2.batch.config -e local

任务的配置很简单:
这里使用了FakeSource来模拟输出两列,通过设置并行度=2 来打印 16 条输出数据。
2024-07-01 21:56:06,617 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file:
{
    "env" : {
        "parallelism" : 2,
        "job.mode" : "BATCH",
        "checkpoint.interval" : 10000
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "name" : "string",
                    "age" : "int"
                }
            },
            "row.num" : 16,
            "parallelism" : 2,
            "result_table_name" : "fake",
            "plugin_name" : "FakeSource"
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

 任务的输出信息,这里的输出组件是 Console所以打印到了控制台
2024-07-01 21:56:07,559 INFO  [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=860156818549112833, pipelineId=1, taskGroupId=30000}] - Closed the bounded fake source
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : hECbG, 520364021
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : LnGDW, 105727523
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : UYXBT, 1212484110
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : NYiCn, 1208734703
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : cSZan, 151817804
任务的统计信息:
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-07-01 21:56:06
End Time                  : 2024-07-01 21:56:08
Total Time(s)             :                   2
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

2. 使用 Flink引擎

在上面的测试用例中可以看到如下的日志输出:

 Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='FakeSource'

这表示默认情况下它使用的是 seatunnel engine 执行的,官方称之为 zeta 。 这一块内容我们先看下 Flink引擎这边是如何执行的。

  1. 下载安装 flink1.17
    https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/

    启动local cluster 模式

    ➜  flink bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host MacBook-Pro-2.local.
    Starting taskexecutor daemon on host MacBook-Pro-2.local.
    
  2. 配置环境变量

    ➜  config cat seatunnel-env.sh
    # Home directory of spark distribution.
    SPARK_HOME=${SPARK_HOME:-/Users/mac/apps/spark}
    # Home directory of flink distribution.
    FLINK_HOME=${FLINK_HOME:-/Users/mac/apps/flink}
    
  3. 修改slot插槽数量为大于等于 2
    为什么?因为默认的配置中配置了 2 个并行度,而 local启动的默认情况下只有个插槽可供使用,因此任务无法运行。
    在这里插入图片描述
    默认启动后资源插槽:
    在这里插入图片描述
    提交程序运行后,发现一直无法对 sourcez做任务切分:
    在这里插入图片描述

    这是因为 job 的并行度是 2,如下所示:
    在这里插入图片描述

    在这里插入图片描述

    因此需要修改插槽数量才可以运行,官方这点可没说清楚,需要注意下。

  4. 运行测试用例

    ➜  seatunnel bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
    Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-flink-15-starter.jar --config ./config/v2.streaming.conf.template --name SeaTunnel
    Job has been submitted with JobID 9a949409a6f218d50b66ca22cc49b9c4
    

    现在我们修改插槽数量为 2,测试如下:
    访问:http://localhost:8081/#/overview
    在这里插入图片描述
    TaskManager输出日志如下:
    在这里插入图片描述

3. 使用 Spark引擎

  1. 提交命令
➜  seatunnel bin/start-seatunnel-spark-3-connector-v2.sh \
--master 'local[4]' \
--deploy-mode client \
--config ./config/v2.streaming.conf.template

Execute SeaTunnel Spark Job: ${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" --master "local[4]" --deploy-mode "client" --jars "/Users/mac/server/seatunnel/lib/seatunnel-transforms-v2.jar,/Users/mac/server/seatunnel/lib/seatunnel-hadoop3-3.1.4-uber.jar,/Users/mac/server/seatunnel/connectors/connector-fake-2.3.5.jar,/Users/mac/server/seatunnel/connectors/connector-console-2.3.5.jar" --conf "job.mode=STREAMING" --conf "parallelism=2" --conf "checkpoint.interval=2000" /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-spark-3-starter.jar --config "./config/v2.streaming.conf.template" --master "local[4]" --deploy-mode "client" --name "SeaTunnel"

遇到报错:

2024-07-01 23:25:04,610 INFO v2.V2ScanRelationPushDown:
Pushing operators to SeaTunnelSourceTable
Pushed Filters:
Post-Scan Filters:
Output: name#0, age#1

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/write/Write

看样子是缺少包导致的导致的,可以参见 issue讨论https://github.com/apache/seatunnel/issues/4879 貌似需要 spark 版本 >=3.2 ,而我的是 3.1.1 因此当前这个问题暂时无解。

Since spark 3.2.0, buildForBatch and buildForStreaming have been deprecated in org.apache.spark.sql.connector.write.WriteBuilder. So you should keep spark version >= 3.2.0.

于是,我便下载了 3.2.4(spark -> spark-3.2.4-bin-without-hadoop) 测试后出现了新的问题。

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
	at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
	at java.lang.Class.getMethod0(Class.java:3018)
	at java.lang.Class.getMethod(Class.java:1784)
	at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:684)
	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:666)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.Filter
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:359)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more

这说的是 log4j的 jar包似乎不存在,由于我们使用的 spark 版本没有 hadoop的依赖,因此需要在 spark-env.sh里面配置相关的属性,如下:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home
export HADOOP_HOME=/Users/mac/apps/hadoop
export HADOOP_CONF_DIR=/Users/mac/apps/hadoop/etc/hadoop
export SPARK_DIST_CLASSPATH=$(/Users/mac/apps/hadoop/bin/hadoop classpath)
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077

再次提交测试后,结果如下:

24/07/02 13:40:19 INFO ConfigBuilder: Parsed config file:
{
    "env" : {
        "parallelism" : 2,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 2000
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "name" : "string",
                    "age" : "int"
                }
            },
            "row.num" : 16,
            "parallelism" : 2,
            "result_table_name" : "fake",
            "plugin_name" : "FakeSource"
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

24/07/02 13:40:19 INFO SparkContext: Running Spark version 3.2.4
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 1 [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16)]
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 0 [FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)]
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_1) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_0) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : eMaly, 2131476727
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : Osfqi, 257240275
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : BYVKb, 730735331

看结果符合预期,也就是使用 spark 提交 seatunnl引擎的流任务,通过FakeSource模拟两列输出了 16 条数据。看来的确是需要 spark3.2.x版本的才能成功了。


参考

https://www.modb.pro/db/605827

总结

本节主要总结了单机模式下使用 seatunel完成官方示例程序,初步体会使用,其实使用起来还是很简单的,模式同我之前介绍的 DataX如出一辙,可喜的是它有自己的 web页面可以配置,
因此后面我将分享下如何在页面中进行配置同步任务,最后时间允许的情况下,分析起优秀的源码设计思路,千里之行始于足下,要持续学习,持续成长,然后持续分享,再会~。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/776468.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【c++】通过写一个C++函数来模拟跨境洗钱和系统警告

效果图&#xff1a; 源码&#xff1a; #include <iostream> #include <cstdlib> #include <ctime> #include <iomanip> #include <chrono> #include <thread> // 引入线程头文件#ifdef _WIN32 // 确保只在Windows上包含Windows.h #inclu…

zigbee笔记:六、看门狗定时器(Watch Dog)

一、看门狗基础 1、看门狗功能&#xff1a; 由于单片机的工作常常会受到来自外界电磁场的干扰&#xff0c;造成各种寄存器和内存的数据混乱&#xff0c;会导致程序指针错误等&#xff0c;程序运行可能会陷入死循环。程序的正常运行被打断&#xff0c;由单片机控制的系统无法继…

芯片的PPA-笔记

写在前面&#xff1a;这个仅记录自己对芯片PPA的一些思考&#xff0c;不一定正确&#xff0c;还请各位网友思辨的看待&#xff0c;欢迎大家谈谈自己的想法。 1 此次笔记的起因 记录的原因&#xff1a;自己在整理这段时间的功耗总结&#xff0c;又看到工艺对功耗的影响&#x…

12.SQL注入-盲注基于时间(base on time)

SQL注入-盲注基于时间(base on time) boolian的盲注类型还有返回信息的状态&#xff0c;但是基于时间的盲注就什么都没有返回信息。 输入payload语句进行睡5秒中&#xff0c;通过开发这工具查看时间&#xff0c;如图所示&#xff0c;会在5秒钟后在执行&#xff0c;因此存在基于…

面试篇-系统设计题总结

文章目录 1、设计一个抢红包系统1.1 高可用的解决方案&#xff1a;1.2 抢红包系统的设计1.3 其他 2、秒杀系统设计 这里记录一些有趣的系统设计类的题目&#xff0c;一般大家比较喜欢出的设计类面试题目会和高可用系统相关比如秒杀和抢红包等。欢迎大家在评论中评论自己遇到的题…

磁钢生产领域上下料解决方案

随着智能制造技术的不断革新&#xff0c;磁钢生产领域正逐步引入自动化生产线。然而&#xff0c;传统的人工上下料方式存在诸多问题&#xff0c;难以满足现代生产需求。富唯智能提出了一款复合机器人磁钢上下料解决方案&#xff0c;通过先进的自动化技术&#xff0c;提高生产效…

填报高考志愿,怎样正确地选择大学专业?

大学专业的选择&#xff0c;会关系到未来几年甚至一辈子的发展方向。这也是为什么很多人结束高考之后就开始愁眉苦脸&#xff0c;因为他们不知道应该如何选择大学专业&#xff0c;生怕一个错误的决定会影响自己一生。 毋庸置疑&#xff0c;在面对这种选择的时候&#xff0c;我…

Keycloak SSO 如何验证已添加的 SPN 是否生效

使用 Kerberos Ticket 验证&#xff1a; 在客户端计算机上&#xff0c;运行以下命令以获取 Kerberos Ticket&#xff1a; klist检查是否存在与 HTTP/yourdomain.com 相关的票证。如果存在&#xff0c;说明 SPN 已生效。 测试应用程序&#xff1a; 使用具有 HTTP/yourdomain.com…

windows USB 设备驱动开发-控制传输的数据包

每次在主机控制器和 USB 设备之间移动数据时&#xff0c;都会发生传输。 通常&#xff0c;USB 传输可大致分为控制传输和数据传输。 所有 USB 设备都必须支持控制传输&#xff0c;并且可以支持用于数据传输的端点。 每种类型的传输都与设备缓冲区USB 端点 的类型相关联。 控制传…

Linux 查看磁盘是不是 ssd 的方法

lsblk 命令检查 $ lsblk -d -o name,rota如果 ROTA 值为 1&#xff0c;则磁盘类型为 HDD&#xff0c;如果 ROTA 值为 0&#xff0c;则磁盘类型为 SSD。可以在上面的屏幕截图中看到 sda 的 ROTA 值是 1&#xff0c;表示它是 HDD。 2. 检查磁盘是否旋转 $ cat /sys/block/sda/q…

北京十大拆迁律师事务所排名

历史时刻在重演&#xff0c;土地征地拆迁作为城市发展中不可或缺的环节备受地方政府重视。然而&#xff0c;在土地征收过程中&#xff0c;往往因为拆迁补偿引发各种纠纷案件&#xff0c;给拆迁方和被拆迁方带来重大损失&#xff0c;侵害双方利益&#xff0c;尤其是被征收人。因…

CentOS修复OpenSSH漏洞升级到openssh 9.7 RPM更新包

在做政府和学校单位网站时&#xff0c;经常需要服务器扫描检测&#xff0c;经常被OpenSSH Server远程代码执行漏洞&#xff08;CVE-2024-6387&#xff09;安全风险通告&#xff0c;出了报告需要升级OpenSSH。 使用yum update openssh是无法更新到最新的&#xff0c;因为系统里的…

YOLOX算法实现血细胞检测

原文:YOLOX算法实现血细胞检测 - 知乎 (zhihu.com) 目标检测一直是计算机视觉中比较热门的研究领域。本文将使用一个非常酷且有用的数据集来实现YOLOX算法,这些数据集具有潜在的真实应用场景。 问题陈述 数据来源于医疗相关数据集,目的是解决血细胞检测问题。任务是通过显微…

谷粒商城学习-09-配置Docker阿里云镜像加速及各种docker问题记录

文章目录 一&#xff0c;配置Docker阿里云镜像加速二&#xff0c;Docker安装过程中的几个问题1&#xff0c;安装报错&#xff1a;Could not resolve host: mirrorlist.centos.org; Unknown error1.1 检测虚拟机网络1.2 重设yum源 2&#xff0c;报错&#xff1a;Could not fetch…

给我的 IM 系统加上监控两件套:【Prometheus + Grafana】

监控是一个系统必不可少的组成部分&#xff0c;实时&#xff0c;准确的监控&#xff0c;将会大大有助于我们排查问题。而当今微服务系统的话有一个监控组合很火那就是 Prometheus Grafana&#xff0c;嘿你别说 这俩兄弟配合的相当完美&#xff0c;Prometheus负责数据采集&…

MyBatis入门程序详解

目录 一、MyBatis概述 二、编写MyBatis入门程序 三、配置SQL提示 四、传统jdbc的劣势 一、MyBatis概述 MyBatis是一个基于Java的持久层框架&#xff0c;它内部封装了JDBC操作&#xff0c;使得开发人员可以更专注于SQL语句本身而非繁琐的JDBC操作细节。在MyBatis中&#xff0…

LLM - 词向量 Word2vec

1. 词向量是一个词的低维表示&#xff0c;词向量可以反应语言的一些规律&#xff0c;词意相近的词向量之间近乎于平行。 2. 词向量的实现&#xff1a; &#xff08;1&#xff09;首先使用滑动窗口来构造数据&#xff0c;一个滑动窗口是指在一段文本中连续出现的几个单词&#x…

逻辑这回事(八)---- 时钟与复位

时钟设计总结 时钟和复位是FPGA设计的基础&#xff0c;本章总结了一些逻辑时钟复位设计、使用中出现的问题&#xff0c;给出了设计要点&#xff0c;避免后续问题重犯。时钟和复位&#xff0c;本文都先从板级谈起&#xff0c;再到FPGA芯片级&#xff0c;最后到模块级别。仅在此…

Java需要英语基础吗?

Java编程语言本身并不要求必须有很强的英语基础&#xff0c;因为Java的语法和逻辑是独立于任何特定语言的。我收集归类了一份嵌入式学习包&#xff0c;对于新手而言简直不要太棒&#xff0c;里面包括了新手各个时期的学习方向编程教学、问题视频讲解、毕设800套和语言类教学&am…

Nginx的安装与配置 —— Linux系统

一、Nginx 简介 1.1 什么是 Nginx Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器&#xff0c;在BSD-like 协议下发行。其特点是占有内存少&#xff0c;并发能力强&#xff0c;事实上nginx的并发能力在同类型的网页服务…