解决SeaTunnel 2.3.4版本写入S3文件报错问题

在使用Apache SeaTunnel时,我遇到了一个写入S3文件的报错问题。通过深入调试和分析,找到了问题所在,并提出了相应的解决方案。 file

本文将详细介绍报错情况、参考资料、解决思路以及后续研究方向,希望对大家有帮助!

一、详细报错

2024-04-12 20:44:18,647 ERROR [.c.FileSinkAggregatedCommitter] [hz.main.generic-operation.thread-43] - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1/NON_PARTITION/output_params_0.json=/xugurtp/seatunnel/tmp/6af80b38f3434aceb573cc65b9cd12216a/39111/output_params_0.json}}, partitionDirAndValuesMap={}) java.lang.IllegalStateException: Connection pool shut down

二、参考资料

  • HADOOP-16027:https://issues.apache.org/jira/browse/HADOOP-16027
  • CSDN Blog:https://blog.csdn.net/a18262285324/article/details/112470363
  • AWS SDK Java Issue #2337:https://github.com/aws/aws-sdk-java/issues/2337
  • Amazon SQS Java Messaging Lib Issue #96:https://github.com/awslabs/amazon-sqs-java-messaging-lib/issues/96
  • 博客园:https://www.cnblogs.com/xhy-shine/p/10772736.html

三、解决思路

1. 远程调试

在本地IDEA中进行debug未发现报错,但在服务器上执行时却报错,因此决定进行远程debug。执行以下命令添加JVM参数:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

实际命令是:

 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dhazelcast.client.config=/opt/module/seatunnel-2.3.4/config/hazelcast-client.yaml -Dseatunnel.config=/opt/module/seatunnel-2.3.4/config/seatunnel.yaml -Dhazelcast.config=/opt/module/seatunnel-2.3.4/config/hazelcast.yaml -Dlog4j2.configurationFile=/opt/module/seatunnel-2.3.4/config/log4j2_client.properties -Dseatunnel.logs.path=/opt/module/seatunnel-2.3.4/logs -Dseatunnel.logs.file_name=seatunnel-starter-client -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client -XX:MaxMetaspaceSize=1g -XX:+UseG1GC -cp /opt/module/seatunnel-2.3.4/lib/*:/opt/module/seatunnel-2.3.4/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient -e local --config job/s3_sink.conf -cn xxx

2. 定位问题

通过调试发现问题出在hadoop-aws使用的缓存连接池对象。关键在于if判断部分,如果上游传递了fs.s3a.impl.disable.cache=true,则不使用缓存。深入debug发现:有时hadoopConf.getSchema获取的不是s3a而是s3n

s3和s3n / s3a的区别

  • s3:基于块的文件系统
  • s3n:基于对象存储的文件系统,支持高达5GB的对象
  • s3a:基于对象存储的文件系统,支持高达5TB的对象,并具有更高的性能

在配置文件中设置的是s3a,但实际获取到的是s3n,这显然不合理。

3. 深入挖掘

我仔细看了一下报错的截图发现:

file

确实是commit期间报的错:那么也就是说commit初始化s3conf并没有走buildWithConfig方法,而是用的默认值,而且我根本没找到commit里面有new s3Conf的代码,再次debug看看谁去重新初始化了S3Conf

file

定位到这里就很头疼了,已经涉及到引擎层而非插件层面了,涉及到classloader的使用以及反序列化操作:

file

反序列化代码:

        logicalDag =
                CustomClassLoadedObject.deserializeWithCustomClassLoader(
                        nodeEngine.getSerializationService(),
                        classLoader,
                        jobImmutableInformation.getLogicalDag());

很明显可以看出,S3Conf(静态类)被重新初始化了,导致SHEMA被重新赋值成s3n

file

因为s3conf它本身的属性都是静态的,而对classloader反序列化是时会重新加载静态属性的,所以导致shema被重新赋值为默认s3n

综上所述

除了sourcesink阶段,AggregatedCommit操作也会写入s3File。错误发生在commit期间,说明初始化S3Conf时并没有走buildWithConfig方法,而是使用了默认值。

由于S3Conf类的属性是静态的,反序列化时会重新加载静态属性,导致SCHEMA被重新赋值为默认的s3n

资料参考:https://wiki.apache.org/hadoop/AmazonS3

s3:基于Block块的文件系统

S3 Block FileSystem(URI scheme:s3)由S3支持的基于块的文件系统。 文件存储为块,就像HDFS一样。 这样可以有效地实现重命名。 此文件系统需要您为文件系统专用一个存储桶 - 您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区。 此文件系统存储的文件大于5GB,但不能与其他S3工具进行互操作。

s3n:基于对象存储的文件系统

S3 Native FileSystem(URI scheme:s3n)用于在S3上读取和写入常规文件的本机文件系统。 这个文件系统的优点是您可以访问使用其他工具编写的S3上的文件。 相反,其他工具可以访问使用Hadoop编写的文件。 缺点是S3的文件大小限制为5GB。

s3a:基于对象存储的文件系统

S3A(URI方案:s3a)是S3 Native,s3n fs的继承者,S3a:系统使用Amazon的库与S3进行交互。 这允许S3A支持较大的文件(不超过5GB的限制),更高的性能操作等等。 文件系统旨在替代S3 Native:从s3n:// URL可访问的所有对象也应该通过替换URL模式从s3a访问。

public class S3Conf extends HadoopConf {
    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
    private static final String S3A_SCHEMA = "s3a";
    private static final String DEFAULT_SCHEMA = "s3n";
    private static String SCHEMA = DEFAULT_SCHEMA;

    @Override
    public String getFsHdfsImpl() {
        return switchHdfsImpl();
    }

    @Override
    public String getSchema() {
        return SCHEMA;
    }

    private S3Conf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

    public static HadoopConf buildWithConfig(Config config) {

        HadoopConf hadoopConf = new S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));
        String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
        if (bucketName.startsWith(S3A_SCHEMA)) {
            SCHEMA = S3A_SCHEMA;
        }
        HashMap<String, String> s3Options = new HashMap<>();
        putS3SK(s3Options, config);
        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
        }

        s3Options.put(
                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));
        s3Options.put(
                S3ConfigOptions.FS_S3A_ENDPOINT.key(),
                config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));
        hadoopConf.setExtraOptions(s3Options);
        return hadoopConf;
    }

    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
        Config config = readonlyConfig.toConfig();
        HadoopConf hadoopConf = new S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));
        String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
        if (bucketName.startsWith(S3A_SCHEMA)) {
            SCHEMA = S3A_SCHEMA;
        }
        HashMap<String, String> s3Options = new HashMap<>();
        putS3SK(s3Options, config);
        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
        }

        s3Options.put(
                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
        s3Options.put(
                S3ConfigOptions.FS_S3A_ENDPOINT.key(),
                readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));
        hadoopConf.setExtraOptions(s3Options);
        return hadoopConf;
    }

    private String switchHdfsImpl() {
        switch (SCHEMA) {
            case S3A_SCHEMA:
                return HDFS_S3A_IMPL;
            default:
                return HDFS_S3N_IMPL;
        }
    }

    private static void putS3SK(Map<String, String> s3Options, Config config) {
        if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())
                && !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {
            return;
        }
        String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
        String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
        if (S3A_SCHEMA.equals(SCHEMA)) {
            s3Options.put("fs.s3a.access.key", accessKey);
            s3Options.put("fs.s3a.secret.key", secretKey);
            return;
        }
        // default s3n
        s3Options.put("fs.s3n.awsAccessKeyId", accessKey);
        s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);
    }
}

参考了反序列的知识才了解到这个情况:

当对一个包含静态成员的类进行反序列化时,静态成员不会恢复为之前的状态,而是保持在其初始状态。任何静态变量的值都是与该类本身相关的,

4. 解决方案

  • 1.去掉stastic修饰,把有参构造换成无参构造和静态工厂方法:

  • 2.保留stastic静态方法,使用getSchema方法代替静态属性调用:

由此可见,代码中的细节问题,即使看似微不足道,也可能引发严重的后果。一个简单的静态修饰符的误用,不仅能导致程序行为异常,更可能导致系统稳定性和安全性的大问题。

相关的issues已提交,大家有兴趣可以查看:

  • [bigfix][S3 File]:Change the [SCHEMA] attribute of the [S3CONF class] to be non-static to avoid being reassigned after deserialization by LeonYoah · Pull Request #6717 · apache/seatunnel (github.com)

  • [Bug] [S3File] [zeta-local] Error writing to S3File in version 2.3.4:: Java lang. An IllegalStateException: Connection pool shut down · Issue #6678 · apache/seatunnel (github.com)

四、有待研究

1.为什么只有local模式会报错:

推测可能是cluster模式是分布式的,每个算子分布在不同的机器上,所以本地缓存不会被使用,类似于没有走缓存。

2.为什么本地IDEA执行local模式却没问题

可能是Windows和Linux的线程调度机制不同导致的。

结论

通过这次对Apache SeaTunnel S3 File写入报错问题的分析与解决,希望这些经验能帮助到遇到类似问题的开发者,同时也提醒大家在处理分布式系统时注意细节问题,以免引发不必要的故障。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/772088.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PyTorch - 神经网络基础

神经网络的主要原理包括一组基本元素&#xff0c;即人工神经元或感知器。它包括几个基本输入&#xff0c;例如 x1、x2… xn &#xff0c;如果总和大于激活电位&#xff0c;则会产生二进制输出。 样本神经元的示意图如下所述。 产生的输出可以被认为是具有激活电位或偏差的加权…

Java通过GeoLite2-City.mmdb 进行IP信息查询地理定位和经纬度筛选。

引入依赖 <dependency><groupId>com.maxmind.geoip2</groupId><artifactId>geoip2</artifactId><version>4.2.0</version> </dependency>下载数据文件&#xff1a;https://download.lin2ur.cn/GeoLite2/ package com.cqclo…

经典递归分析

在前面一篇中, 已经看过许多直观的递归的例子, 在这篇里, 将分析两个经典的递归问题, 阶乘与菲波那契数列数列, 在此过程中, 还将对比递归与循环(迭代)间的异同, 探讨递归与内存中的栈的关系, 以及递归的效率等问题. 如无特别说明, 示例使用的是 Java, IDE 则为 Eclipse. 阶乘(…

Matplotlib 简介

import matplotlib.pyplot as plt plt.plot([1, 2, 3, 4]) plt.ylabel(some numbers) plt.show() 当使用plot只传入单个数组时&#xff0c;matplotlib会认为这是y的值&#xff0c;并自动生成长度相同&#xff0c;但是从0开始的x值&#xff0c;所以这里的x会自动生成为 [0,1,2,…

python自动化办公之BeautifulSoup爬取并解析html文本

用到的库&#xff1a;BeautifulSoup 实现效果&#xff1a;爬取网站内容&#xff0c;拿到html文本并解析html文本 代码&#xff1a; 先爬取 # 先导入requests包 import requests urlhttps://www.baidu.com responserequests.get(url) # 做1个断言&#xff0c;如果执行成功&a…

java的工厂设备管理系统-计算机毕业设计源码16179

摘要 在现代制造业中&#xff0c;高效的设备管理对于确保生产过程的顺利进行至关重要。为了满足工厂对于设备管理的需求&#xff0c;我们设计并实现了一个基于 Java 的工厂设备管理系统。 该系统旨在提供一个全面、可靠且易于使用的解决方案&#xff0c;以帮助工厂有效地管理…

QT截屏,截取控件为图片,指定范围截屏三种截屏方式

项目中我们常用到截取屏幕&#xff0c;Qt给我的们多种方式&#xff1a; 主要有以下三种&#xff1a; 截取全屏&#xff1b;截取控件为图片&#xff1b;指定位置截屏三种截屏方式&#xff1b; 1.截取全屏 常用&#xff1a; 实现&#xff1a; QScreen *screen QGuiApplicat…

【超万卡GPU集群关键技术深度分析 2024】

文末有福利&#xff01; 1. 集群高能效计算技术 随着大模型从千亿参数的自然语言模型向万亿参数的多模态模型升级演进&#xff0c;超万卡集群吸需全面提升底层计算能力。 具体而言&#xff0c;包括增强单芯片能力、提升超节点计算能力、基于 DPU (Data Processing Unit) 实现…

波动方程 - 在三维图中动态显示二维波动方程的解就像水面波澜起伏

波动方程 - 在三维图中动态显示二维波动方程的解就像水面波澜起伏 flyfish 波动方程的求解结果通常不是一个单一的数值&#xff0c;而是一个函数或一组函数&#xff0c;这些函数描述了波随时间和空间的传播情况。具体来说&#xff0c;波动方程的解可以是关于时间和空间变量的…

#LinuxC高级 笔记二

makefile gcc gdb makefile 1. 分文件编程 1.1 源文件&#xff1a;.c结尾的文件 包含main函数的.c 包含子函数的.c 1.2 头文件&#xff1a;.h结尾的文件 头文件、宏定义、typedef 、结构体、共用体、枚举、函数声明 include引用时“”和<>的区别&#xff1a; <>去系…

JSON字符串中获取一个指定字段的值

一、方式一&#xff0c;引用gson工具 测试报文&#xff1a; {"account":"yanxiaosheng","password":"123456" } 引入pom <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> <dependency><gr…

假设性文档嵌入 HyDE:大模型 + 对比学习,从关键词相似度搜索到语义搜索

假设性文档嵌入 HyDE&#xff1a;大模型 对比学习&#xff0c;从关键词相似度搜索到语义搜索 提出背景流程图解法拆解类比1. 单一文档嵌入空间的搜索2. 指令跟随型语言模型&#xff08;InstructLM&#xff09;的引入3. 生成文档的嵌入编码 提出背景 论文&#xff1a;https://…

保存huggingface缓存中AI模型(从本地加载AI模型数据)

在github下拉项目后,首次运行时会下拉一堆模型数据&#xff0c;默认是保存在缓存的&#xff0c;如果你的系统盘空间快满的时候就会被系统清理掉&#xff0c;每次运行又重新下拉一次&#xff0c;特别麻烦。 默认下载的缓存路径如下&#xff1a;C:\Users\用户名\.cache\huggingf…

【Unity性能消耗】ScriptableObject复用数据节省内存占用

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 专栏交流&#x1f9e7;&…

APP INVENTOR硬件交互学习教程05——自动连接蓝牙实现

每次打开手机APP&#xff0c;需要选择蓝牙&#xff0c;用起来很麻烦。有没有方法实现自动连接上次的地址呢&#xff0c;接下来请看吧&#xff01;1.界面设计增加了一个微数据库组件&#xff0c;借助它用来存储硬件地址 2.程序设计这里主要用两个方法&#xff0c;存储地址方法…

Python基础语法(与C++对比)(持续更新ing)

代码块 Python在统一缩进体系内&#xff0c;为同一代码块C{...}内部的为同一代码块 注释 Python 单行注释&#xff1a;#... 多行注释&#xff1a;... C 单行注释&#xff1a;//... 多行注释: /*...*/ 数据类型 1. Python数据类型 Python中支持数字之间使用下划线 _ 分割…

LVS+Nginx高可用集群--基础篇

1.集群概述 单体部署&#xff1a; 可以将上面内容分别部署在不同的服务器上。 单体架构的优点&#xff1a; 小团队成型就可完成开发&#xff0c;测试&#xff0c;上线 迭代周期短&#xff0c;速度快 打包方便&#xff0c;运维简单 单体架构的挑战&#xff1a;单节点宕机造成…

day03-numpy数据类型

numpy数据类型 名称描述名称描述bool_布尔型数据类型&#xff08;True 或者 False&#xff09;float_float64 类型的简写int_默认的整数类型&#xff08;类似于 C 语言中的 long&#xff0c;int32 或 int64&#xff09;float16/32/64半精度浮点数:1 个符号位&#xff0c;5 个指…

《ClipCap》论文笔记(上)

原文出处 [2111.09734] ClipCap: CLIP Prefix for Image Captioning (arxiv.org) 原文笔记 What ClipCap&#xff1a; CLIP Prefix for Image Captioning 一言以蔽之&#xff1a;使用 CLIP 编码作为标题的前缀&#xff0c;使用简单的映射网络&#xff0c;然后微调语言模型…

datawhale大模型应用开发夏令营学习笔记一

参考自 基于LangChainLLM的本地知识库问答&#xff1a;从企业单文档问答到批量文档问答datawhale的llm-universe 作者现在在datawhale夏令营的大模型应用开发这个班中&#xff0c;作为一个小白&#xff0c;为了能为团队做出一点贡献&#xff0c;现在就要开始学习怎么使用langch…