Flink实操:Flink SQL实现SFTP文件读写操作

一、背景

公司需要将Doris数据库中的部分表数据同步至SFTP服务器,以供其他合作企业安全读取和使用。目前,平台数据同步功能统一使用Flink引擎进行实时同步、离线同步的工作。因此,希望能够充分利用现有的Flink引擎,并将其复用于这一需求。下图是我们的解决方案的结构示意图:

在这里插入图片描述

二、技术调研

  1. 由于我们选择使用Flink引擎来实现需求,我们需要进行调研以确定Flink是否支持SFTPDoris的连接器。经过查阅当前的Flink版本(v1.20-SNAPSHOT)的文档,我们发现Flink Table API仅提供FileSystem SQL Connector用于操作文件。
  2. 然而,在文档中并未提及该连接器是否支持SFTP协议,同时也没有提供指定SFTP的主机名、端口和秘钥文件的参数。因此,我们需要进一步验证是否可以使用该连接器来访问SFTP,并配置相关参数。以下是官方示例:
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',           -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = '...',                     -- required: file system connector requires to specify a format,
                                        -- Please refer to Table Formats
                                        -- section for more details
  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
                                        -- column value is null/empty string
  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the 
                                        -- directory of `path` option. This regex pattern should be
                                        -- matched with the absolute file path. If this option is set,
                                        -- the connector  will recursive all files under the directory
                                        -- of `path` option

  -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
  -- reduce the number of file for filesystem sink but may lead data skew, the default value is false.
  'sink.shuffle-by-partition.enable' = '...',
  ...
)
  1. 因此,我们需要进一步确认该连接器是否适用于SFTP。首先想到的是Flink自身的CheckPoint功能,它支持HDFS、S3等文件存储系统。底层实现是通过org.apache.flink.core.fs.FileSystem类来进行操作。而这个类同样是FileSystem SQL Connector的底层实现类。既然如此,Flink#FileSystem应该支持访问HDFS、S3等其他文件系统,那其内部必然会使用hadoop#FileSystem的api,而hadoop#FileSystem自身如果支持SFTP,则此路可以走通,为了确认这一点,开始查看源码并查看类注释中的相关信息,发现一段有用信息:

    /**
    * Flink implements and supports some file system types directly (for example the default machine-local file system). Other file * system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (such as for  
    * example HDFS).
    */
    
    // 翻译:Flink直接实现并支持一些文件系统类型(例如默认的机器本地文件系统)。其他文件系统类型由桥接到Hadoop支持的文件系统套件(例如HDFS)的实现访问。
    
  2. 看到此处信心倍增,继续翻阅后发现在Flink#FileSystem有一个getUnguardedFileSystem函数,如下图:

在这里插入图片描述

  1. 该函数会检测文件url路径,如果路径是file://则会走Flink#FileSystem的内部实现,如果是hdfs://sftp://这类前缀,则会调用loadHadoopFsFactory函数,如下图:

在这里插入图片描述

  1. 至此找到Flink#FileSystemHadoop#FileSystem 的桥接处,在该函数中会先加载hadoop#FileSystem并构建Flink#FileSystem的子类HadoopFileSystem,而在HadoopFileSystem类中使用hadoop#FileSystem提供的能力。
  2. 那么hadoop#FileSystem是否提供了读写SFTP的能力呢?经过调研发现只有Hadoop-2.8.x版本以上才支持SFTP,JIRA工单

在这里插入图片描述

在这里插入图片描述

  1. 至此总结以下:虽然Flink#FileSystem原生并未支持sftp读写,但Flink#FileSystem中如果遇见不支持的文件前缀如: hdfs:// 或者 sftp:// ,则会桥接到Hadoop#FileSystem类中,而Hadoop#FileSystem底层是支持丰富的文件类型,其中的SFTPFileSystem实现类即可读写SFTP,逻辑图如下:

在这里插入图片描述

三、程序验证

  1. 有了理论支撑后开始程序验证,先在sftp文件系统上放置一个user.csv文件供FlinkSQL读取,文件内容如下:

    1,jack
    2,tom
    3,lily
    
  2. 再在IDE中导入相关Flink、Hadoop相关依赖:

<dependencies>
        <!-- flink start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink end -->
        <!-- hadoop start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hadoop end -->
    </dependencies>
  1. 编写程序:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class SftpExample {
    public static void main(String[] args) {

        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 定义文件系统连接器
        tableEnv.executeSql(
                "CREATE TABLE SftpCsvTable (" +
                        "  age INT," +
                        "  name STRING" +
                        ") WITH (" +
                        "  'connector'='filesystem'," +
                        "  'path'='sftp://101.230.65.134:21821/user.csv'," +
                        "  'format'='csv'" +
                        ")"
        );

        // 执行 SQL 查询操作
        tableEnv.executeSql("SELECT * FROM SftpCsvTable").print();
    }

}
  1. 启动后报错如下: 没有格式为sftpFileSystem类。
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'sftp'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:543)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
	at org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator.enumerateSplits(NonSplittingRecursiveEnumerator.java:82)
	at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:141)
	... 34 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop File System abstraction does not support scheme 'sftp'. Either no file system implementation exists for that scheme, or the relevant classes are missing from the classpath.
	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:100)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526)
	... 38 more
Caused by: java.io.IOException: No FileSystem for scheme: sftp
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:98)
	... 39 more

  1. 随后debug定位异常代码发现 :在hadoop程序中默认不开启sftp的实现类,若要使用则需要在core-site.xml配置文件中配置fs.sftp.impl ,如下图:

在这里插入图片描述

  1. 此外,光配置fs.sftp.impl还不够,生产中访问sftp一般都是指定用户名密码或者秘钥,而这些参数的配置项在SFTPFileSystem中有变量可以参考,如下:
    在这里插入图片描述

  2. 最终我们将开发环境中hadoop#core-site.xml配置文件放置在IDE#Resource目录下,并根据SFTP相关信息配置好对应K,V,内容如下:

<configuration>
  <!-- 配置sftp实现类 -->
  <property>
    <name>fs.sftp.impl</name>
    <value>org.apache.hadoop.fs.sftp.SFTPFileSystem</value>
  </property>
  
  <!-- 配置sftp用户名 -->
  <property>
    <name>fs.sftp.user.101.230.65.134</name>
    <value>username</value>
  </property>
  
  <!-- 配置sftp秘钥路径 -->
  <property>
    <name>fs.sftp.keyfile</name>
    <value>D:\IdeaProjects\flink-sftp\src\main\resources\uat_sftp_qadmin.rsa</value>
  </property>
  
  <!-- 配置sftp密码 -->
  <property>
    <name>fs.sftp.password.101.230.65.134</name>
    <value>password</value>
  </property>

</configuration>
  1. 执行成功,如下图:

在这里插入图片描述

  1. 至此我们可以使用FlinkSQL#FileSystem已经写好的各种文件格式类型以及分区功能,还可以享受Hadoop#SFTPFileSystem读写sftp的能力,可以说完美的解决此需求。

四、总结

通过这次详细排查和研究,对Flink文件系统的实现有了更加深入的理解。起初,我对FlinkSQL是否支持sftp产生了疑问,然而,通过逐步追踪源代码,逐渐揭示了底层逻辑的实现机制:尽管原生的Flink#FileSystem并没有直接支持SFTP的读写操作,但它通过一个巧妙的桥接机制,将不支持的文件前缀(例如hdfs://或者sftp://)重新定向到Hadoop#FileSystem类来处理。而Hadoop#FileSystem底层提供了对多种文件类型的广泛支持,只要存在SFTP的实现类,就可以顺利进行操作。这个过程中,深刻体会到了理解底层原理和追踪代码的重要性。

这次经历让我明白,在后续的开发过程中,我们应该保持持续的好奇心,提出更多问题,积极思考,并深入探索底层实现原理,时刻保持探索精神,不断拓展我们的知识和技能,以便在开发过程中能够更加游刃有余地应对各种挑战。

五、相关资料

  • Hadoop#JIRA工单
  • FileSystem SQL Connector
  • org.apache.flink.core.fs.FileSystem
  • getUnguardedFileSystem
  • loadHadoopFsFactory
  • HadoopFileSystem

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/449422.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

四 超级数据查看器 讲解稿 列表功能1

四 超级数据查看器 讲解稿 列表功能1 点击此处 以新页面 打开B站 播放教学视频 APP下载地址 百度手机助手 下载地址4 讲解稿全文&#xff1a; 大家好&#xff0c;今天我们讲解一下&#xff0c;超级数据查看器列表界面&#xff0c;分为1-2两集。 首先&#xff0c…

ChatGPT+MATLAB应用

MatGPT是一个由chatGPT类支持的MATLAB应用程序&#xff0c;由官方Toshiaki Takeuchi开发&#xff0c;允许您轻松访问OpenAI提供的chatGPT API。作为官方发布的内容&#xff0c;可靠性较高&#xff0c;而且也是完全免费开源的&#xff0c;全程自己配置&#xff0c;无需注册码或用…

MySQL的加锁规则

学习了MySQL的锁后&#xff0c;知道其有这么多锁&#xff0c;那应该会有些疑惑&#xff0c;这么多锁&#xff0c;究竟我在写sql语句时候用到哪个锁的&#xff0c;什么情况是用什么锁的&#xff1f;在哪里查看该sql语句是用了哪些锁的呢&#xff1f;加锁的规则是什么呢&#xff…

【C++初阶】第六站 : 模板初阶

前言&#xff1a; 本章知识点&#xff1a;泛型编程、函数模板、类模板 专栏&#xff1a; C初阶 目录 泛型编程 函数模板 1.函数模板概念 2.函数模板格式 3.函数模板的原理 4.函数模板的实例化 5.模板参数的匹配原则 类模板 类模板的定义格式 类模板的实例化 泛型编程 如何实现一…

Redis 的基本全局命令

前言 Redis 常用的有 5 种数据结构&#xff0c;字符串&#xff0c;列表&#xff0c;哈希表&#xff0c;集合&#xff0c;有序集合&#xff0c;每一种数据结构都有自己独特的命令&#xff0c;但也有些通用的全局命令&#xff0c;本文所提到的是最基本的命令&#xff0c;Redis 的…

linux查看文件内容cat,less,vi,vim

学习记录 目录 catlessvi vim cat 输出 FILE 文件的全部内容 $ cat [OPTION] FILE示例 输出 file.txt 的全部内容 $ cat file.txt查看 file1.txt 与 file2.txt 连接后的内容 $ cat file1.txt file2.txt为什么名字叫 cat&#xff1f; 当然和猫咪没有关系。 cat 这里是 co…

使用 IDEA 将本地jar上传到本地maven仓库

IDEA中的操作步骤 创建一个 Maven 运行配置 在开发工具的导航栏中&#xff0c;点击选择配置&#xff1a; 在配置界面点击左上角的加号&#xff0c;随后选择增加一个maven运行配置&#xff1a; 编辑 Maven 配置 上图中的含义&#xff1a; Name 对应的是本配置的名字、用处或功…

PyTorch搭建AlexNet训练集

本次项目是使用AlexNet实现5种花类的识别。 训练集搭建与LeNet大致代码差不多&#xff0c;但是也有许多新的内容和知识点。 1.导包&#xff0c;不必多说。 import torch import torch.nn as nn from torchvision import transforms, datasets, utils import matplotlib as p…

NFTScan | 03.04~03.10 NFT 市场热点汇总

欢迎来到由 NFT 基础设施 NFTScan 出品的 NFT 生态热点事件每周汇总。 周期&#xff1a;2024.03.04~ 2024.03.10 NFT Hot News 01/ 数据&#xff1a;比特币链上 NFT 过去 24 小时销售额超 3100 万美元 3 月 4 日&#xff0c;据数据显示&#xff0c;比特币链上 NFT 过去 24 小…

设计模式十:原型模式

文章目录 1、原型模式1.1 类创建过程1.2 浅拷贝1.3 深拷贝 2、示例2.1 简单形式2.2 复杂形式 3、spring中的原型模式3.1 ArrayList的原型模式3.2 spring中的原型模式 1、原型模式 原型模式就是从一个对象再创建另外一个可定制的对象&#xff0c; 而且不需要知道任何创建的细节。…

Vscode+QT+Python

参考链接&#xff1a;VSCodePyQt之Python界面编写_vscode编写图形化界面-CSDN博客 1.安装库 pip install PyQt5 pip install PyQt5-tools pip install qt5_applications 2.在VSCode里下载并安装PYQT Integration 3.配置pyqt integration 4.打开qt designer 在工程文件的空白…

python自动化之pytest框架以及数据驱动(第五天)

1.pytest框架需要遵循的规则 &#xff08;1&#xff09;.py 测试文件必须以test 开头(或者以 test结尾) &#xff08;2&#xff09;测试类必须以Test开头&#xff0c;并且不能有 init 方法 &#xff08;3&#xff09;测试方法必须以test 开头 &#xff08;4&#xff09;断言…

分享个好用的GPT网站

目录 一、背景 二、功能描述 1、写代码 2、联网查询 3、AI绘图 一、背景 我现在的开发工作都依靠ChatGPT&#xff0c;效率提升了好几倍。这样一来&#xff0c;我有更多时间来摸鱼&#xff0c;真是嘎嘎香~ ⭐⭐⭐点击直达 ⭐⭐⭐ 二、功能描述 1、写代码 import java.ut…

机器学习之分类回归模型(决策数、随机森林)

回归分析 回归分析属于监督学习方法的一种&#xff0c;主要用于预测连续型目标变量&#xff0c;可以预测、计算趋势以及确定变量之间的关系等。 Regession Evaluation Metrics 以下是一些最流行的回归评估指标: 平均绝对误差(MAE):目标变量的预测值与实际值之间的平均绝对差…

基于PHP+Amaze+JQuery的学习论坛的设计与实现1.99

摘 要 互联网教育服务是在互联网技术、通信技术、计算机技术不断发展融合的基础之上&#xff0c;人们在对以信息为基础的各种各样应用需求快速增长的激励之下&#xff0c;在现在社会信息化的水平日益提高前提之下&#xff0c;迅速发展起来的一种全新大众服务方式。 笔者拟设计…

前端食堂技术周刊第 115 期:Rolldown 正式开源、马斯克宣布 xAI 本周将开源 Grok、如何使用 Copilot 完成 50% 的日常工作?

美味值&#xff1a;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f; 口味&#xff1a;手打柠檬茶 食堂技术周刊仓库地址&#xff1a;https://github.com/Geekhyt/weekly 大家好&#xff0c;我是童欧巴。欢迎来到前端食堂技术周刊&#xff0c;我们先来看…

Docker的安装及镜像加速的配置

文章目录 一.切换到root二.卸载旧版docker三.配置docker的yum库四.安装Docker五.Docker的启动和验证六.配置Docker阿里云镜像加速(全程免费) 该文章文章演示在Linux系统中安装docker&#xff0c;Windows安装docker请参考以下文章 Windows系统中安装docker及镜像加速的配置 一…

基于android的物业管理系统的设计与实现19.8

目录 基于android的物业管理系统的设计与实现 3 摘 要 3 Android property managemengt system 5 Abstract 5 1 绪论 6 1.1 选题背景 6 1.2 课题研究现状 6 1.3 设计研究主要内容 7 1.4 系统主要设计思想 8 2 开发环境 8 2.1 Android系统的结构 8 图2-1 Android系统架构图 9 2…

kibana新增查看更新删除es中的数据

登录kibana&#xff0c;打开开发工具 写入数据 PUT test20240311/person/1 {"name": "张三","mobile":"13011111111" } 查询数据 GET /test20240311/person/_search {"query": {"term": {"mobile": {…

中科数安|公司办公终端、电脑文件数据 \ 资料防泄密系统

#中科数安# 中科数安是一家专注于信息安全技术与产品研发的高新技术企业&#xff0c;其提供的公司办公终端、电脑文件数据及资料防泄密系统&#xff08;也称为终端数据防泄漏系统或简称DLP系统&#xff09;主要服务于企业对内部敏感信息的安全管理需求。 www.weaem.com 该系统…