Flink DataSource介绍

介绍

Flink的Data Source(数据源、源算子)是Flink作业的起点,它定义了数据输入的来源。Flink可以从各种数据来源获取数据,例如文件系统、消息队列、数据库等。以下是对Flink Data Source的详细介绍:

概述:

  • Flink中的Data Source用于定义数据输入的来源。
  • 将数据源添加到Flink执行环境中,可以创建一个数据流。
  • Flink支持多种类型的数据源,包括内置数据源和自定义数据源。

内置数据源:

  • 基于集合构建:使用Flink的API(如fromCollection、fromElements等)将Java或Scala中的集合数据转化为数据流进行处理。
  • 基于文件构建:从文件系统中读取数据,支持多种文件格式,如CSV、JSON等。
  • 基于Socket构建:从Socket连接中读取数据,适用于实时流数据场景。

自定义数据源:

  • Flink允许用户通过实现SourceFunction接口或扩展RichParallelSourceFunction来自定义数据源。
  • 常见的自定义数据源包括从第三方系统连接器(如Kafka、RabbitMQ、MongoDB等)中读取数据。

添加数据源到Flink执行环境:

  • 使用StreamExecutionEnvironment.addSource(sourceFunction)方法将数据源添加到Flink执行环境中。
  • sourceFunction需要实现SourceFunction接口或扩展RichParallelSourceFunction。

数据流处理:

  • 一旦数据源被添加到Flink执行环境中,就可以创建一个数据流(DataStream)。
  • 接下来,可以使用Flink的各种算子(如map、filter、reduce等)对数据流进行转换处理。

输出结果:

  • 处理后的数据可以写入其他系统,如文件系统、数据库、消息队列等。
  • Flink支持多种输出方式,如使用DataStream的writeAsText、writeAsCsv等方法将数据写入文件,或使用Flink的连接器将数据写入Kafka、HBase等系统。

总之,Flink的Data Source是构建Flink数据流处理应用的重要组成部分。通过选择合适的数据源和输出方式,可以方便地构建高效、可靠的数据流处理应用。

样例

程序中添加数据源

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new SourceMySQL()).print();
        env.execute("Flink add mysql sourc");

Flink 已经提供了若干实现好了的SourceFunctions也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source,

stream sources

StreamExecutionEnvironment 加载Source

基于集合:

  1. fromCollection(Collection)
  2. fromCollection(Iterator, Class)
  3. fromElements(T …)
  4. fromParallelCollection(SplittableIterator, Class)
  5. generateSequence(from, to)
    例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
	new Event(1, "ba", 4.0),
	new Event(2, "st", 5.0),
	new Event(3, "fo", 6.0),
	...
);

文件

  1. readTextFile(String filePath)
  2. readTextFile(String filePath, String charsetName)
  3. readFile(FileInputFormat inputFormat, String filePath)
    样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/file");

Socket

  1. socketTextStream(String hostname, int port)
  2. socketTextStream(String hostname, int port, String delimiter)
  3. socketTextStream(String hostname, int port, String delimiter, long maxRetry)
    样例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 8888) // 监听 localhost 的 8888端口过来的数据
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

自定义资源

通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。
继承类

SourceFunction 非并行

SourceFunction 是一个用于定义数据源的函数接口。这个接口的实现通常负责从外部系统(如 Kafka、文件系统、数据库等)读取数据,并将这些数据作为 Flink 流处理或批处理作业的输入。

SourceFunction 通常与 Flink 的 DataStream API 一起使用,以定义和构建数据流处理任务。尽管 Flink 的内部实现和 API 可能会随着版本的更新而有所变化,但通常,一个 SourceFunction 会被实现为:

  1. 创建一个可以持续生成数据或等待外部数据到达的组件。
  2. 当有新数据到达时,将数据作为 Flink 的 DataStream 的一部分进行发射(emit)。

在 Flink 的内部,SourceFunction 可能会有多种不同的实现,具体取决于其要处理的数据源类型。例如,对于 Kafka 这样的消息队列,Flink 提供了专门的 Kafka 连接器和相应的 SourceFunction 实现,用于从 Kafka 主题中读取数据。

在实现自定义的 SourceFunction 时,你需要考虑以下几个方面:

  • 数据源的连接和断开连接逻辑。
  • 数据的读取和解析逻辑。
  • 如何在 Flink 运行时环境中优雅地处理可能的错误和失败。
  • 如何将读取的数据转换为 Flink 可以理解的格式(如 Tuple、POJO 或其他自定义类型)。

ParallelSourceFunction 并行

ParallelSourceFunction 是一个接口,用于定义并行数据源的行为。这个接口允许你创建自定义的数据源,这些数据源能够并行地读取数据并传递给 Flink 的数据处理管道。

ParallelSourceFunction 继承自 SourceFunction,但增加了并行处理的能力。当 Flink 任务需要并行处理多个数据流时,你可以通过实现 ParallelSourceFunction 来创建并行数据源。

Flink 还提供了一个 RichParallelSourceFunction 抽象类,它是 ParallelSourceFunction 的子类,并提供了更多的生命周期方法和上下文信息。使用 RichParallelSourceFunction 可以让你更容易地管理你的并行数据源,因为它提供了诸如 open()、close() 和 cancel() 等方法,这些方法可以在数据源的生命周期中的不同阶段被调用。

下面是一个简单的示例,演示了如何使用 RichParallelSourceFunction 创建一个并行数据源,该数据源生成递增的数字:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;  
public class IncrementingNumberSource extends RichParallelSourceFunction<Long> {  
    private volatile boolean running = true;  
    private long count = 0L;  
    @Override  
    public void run(SourceContext<Long> ctx) throws Exception {  
        while (running) {  
            synchronized (ctx.getCheckpointLock()) {  
                ctx.collect(count++);  
                // 这里可以添加一些休眠或其他逻辑来控制数据的生成速度  
                Thread.sleep(100);  
            }  
        }  
    }  
    @Override  
    public void cancel() {  
        running = false;  
    }  
}

在这个示例中,IncrementingNumberSource 类继承了 RichParallelSourceFunction,并覆盖了 run() 和 cancel() 方法。在 run() 方法中,我们创建了一个无限循环来生成递增的数字,并使用 ctx.collect() 方法将每个数字发送到 Flink 的数据处理管道中。在 cancel() 方法中,我们设置了一个标志来停止 run() 方法中的循环,以便在需要时可以优雅地关闭数据源。

自定义资源DEMO

/**
 * Desc: 自定义 source mysql 数据
 */
public class SourceMySQL extends RichSourceFunction<Map<String, Object>> {

    PreparedStatement ps;
    private Connection connection;

    /**
     * open 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = MySQLUtil.getConnection("com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456");
        String sql = "select * from ST;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * 关闭连接和释放资源的动作
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * DataStream 从run方法用来获取数据
     */
    @Override
    public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Map<String, Object> rs = new HashMap<>();
            rs.put("id", resultSet.getInt("id"));
            rs.put("name", resultSet.getString("name").trim());
            ctx.collect(rs);
        }
    }
    @Override
    public void cancel() {
    }
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/614459.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ZigBee设备入网流程抓包分析(以飞利浦灯泡为例)

1.第一步&#xff0c;网关打开入网许可&#xff0c;广播Pjoin 2.第二步&#xff0c;设备开始扫网&#xff0c;发送Beacon Request 3.第三步&#xff0c;网关收到Beacon Request请求后&#xff0c;应答Beacon数据帧 4.收到可入网的Beacon帧后&#xff0c;发送关联请求&#xff0…

Java的时间类

1. 日期类 1.1 第一代日期类 1) Date: 精确到毫秒&#xff0c;代表特定的瞬间 2) SimpleDateFormat: 格式和解析日期的类 SimpleDateFormat 格式化和解析日期的具体类。它允许进行格式化(日期-→>文本)、解析(文本->日期)和规范化. import java.text.ParseExce…

Android AOSP Ubuntu源码编译电脑卡顿问题定位解决

文章目录 问题概述分析问题解决问题查看交换分区创建交换分区删除交换分区调整交换分区的活跃度 问题概述 开发SystemUI时&#xff0c;使用内存为16G的主机&#xff0c;Ubuntu 20.04的系统编译SystemUI的源码&#xff0c;编译的过程中发现电脑卡顿&#xff0c;鼠标不能移动。必…

在Linux上安装并运行RabbitMQ

目录 准备CentOS服务器 下载rabbit-server和erlang文件 启动RabbitMQ服务 准备CentOS服务器 两个命令&#xff0c;选一个能用的&#xff0c;查看CentOS服务器的版本 lsb_release -a下载rabbit-server和erlang文件 参考文章&#xff1a;http://t.csdnimg.cn/t8BbM 1、创建新…

IPv6资产测绘哪家强?揭秘新一代网络空间资产测绘平台的独门秘籍

网络空间资产测绘&#xff0c;即通过一系列技术手段&#xff0c;对网络中的各类资产进行全面的发现、分类和定位&#xff0c;为各类用户提供精准的数据支撑和决策依据。网络空间资产测绘作为一门新兴的交叉学科&#xff0c;融合了计算机网络技术、数据挖掘、人工智能、信息安全…

Mat: Unknown HPROF Version

问题&#xff1a;Mat 加载 android studio 导出的 hprof 文件失败 原因&#xff1a;android hprof 文件不是标准的 java hprof 文件 解决办法&#xff1a; 使用 android sdk 自带的命令将 hprof 转换成标准的 java hprof

去哪里找高清视频素材?推荐几个短视频素材免费网站

在数字时代&#xff0c;视频内容的质量直接影响观众的吸引力和留存率。尤其是高清、4K视频素材和可商用素材&#xff0c;它们在提升视觉质量和叙事深度方面起到了至关重要的作用。以下是一些国内外的顶级视频素材网站&#xff0c;它们提供的资源将为您的创作提供极大的支持和灵…

企业微信hook接口协议,ipad协议http,群发消息(每天每人一次)

群发消息&#xff08;每天每人一次&#xff09; 参数名必选类型说明uuid是String每个实例的唯一标识&#xff0c;根据uuid操作具体企业微信vids是long要发送的人或者群ididisroom是bool是否是群消息 请求示例 {"uuid":"xxxxxxxx","vids": [&qu…

渗透思考题

一&#xff0c;尝试登录。 客户端对密码进行哈希处理并缓存密码hash&#xff0c;丢弃实际的明文密码&#xff0c;然后将用户名发送到服务器&#xff0c;发起认证请求 密文存储位置&#xff1a;数据库文件位于C:WindowsSystem32configsam&#xff0c;同时挂载在注册表中的HKLMSA…

用python写算法——栈笔记

栈 栈的定义相关算法题 栈的定义 1.它是一种运算受限的线性表。限定仅在表尾进行插入和删除操作的线性表。这一端被称为栈顶&#xff0c;相对地&#xff0c;把另一端称为栈底。向一个栈插入新元素又称作进栈、入栈或压栈&#xff0c;它是把新元素放到栈顶元素的上面&#xff0…

12个网上赚钱野路子信息差,人人可做的赚钱小项目!

在这个多元化的时代&#xff0c;副业已经成为许多人增加收入、实现自我价值的重要途径。今天&#xff0c;我们就来聊聊那些既有趣又能赚钱的副业项目&#xff0c;让你的钱包鼓起来&#xff01; 1.文字创作 写作不仅是情感的宣泄&#xff0c;更是财富的积累。无论是自媒体文、软…

【3dmax笔记】036:FDD修改器

一、FDD修改器简介 FDD修改器是对模型进行变形处理的命令,FDD后面的数字越大,编辑节点越多,编辑越精细,但是FDD控制点多的同时,模型上的节点也要多才可以。 FFD修改器是一种非常灵活的修改器,可以让我们对模型进行自由的变形操作。通过在FFD修改器中设置变形点,我们可…

HarmonyOS开发案例:【生活健康app之获取成就】(3)

获取成就 本节将介绍成就页面。 功能概述 成就页面展示用户可以获取的所有勋章&#xff0c;当用户满足一定的条件时&#xff0c;将点亮本页面对应的勋章&#xff0c;没有得到的成就勋章处于熄灭状态。共有六种勋章&#xff0c;当用户连续完成任务打卡3天、7天、30天、50天、…

域基础-NTLM协议

简介 NTLM(New Technology LAN Manager)协议是微软用于Windows身份验证的主要协议之一。继SMB、LM协议之后微软提出了NTLM协议&#xff0c;这一协议安全性更高&#xff0c;不仅可以用于工作组中的机器身份验证&#xff0c;又可以用于域环境身份验证&#xff0c;还可以为SMB、H…

任务:单域,域树的搭建

一、单域&#xff1a; 搭建所需的系统&#xff1a;win2016 sever&#xff0c;win10 1.在创建域前&#xff0c;先设置静态ip 先查看win2016 sever的IP&#xff0c; ip&#xff1a;192.168.154.133 网关&#xff1a;192.168.154.2 DNS服务器&#xff1a;192.168.154.2 设置…

【计算机网络原理】初识网络原理和一些名词解释​​

˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN 如…

前端笔记-day04

文章目录 01-后代选择器02-子代选择器03-并集选择器04-交集选择器05-伪类选择器06-拓展-超链接伪类07-CSS特性-继承性08-CSS特性-层叠性09-CSS特性-优先级11-Emmet写法12-背景图13-背景图平铺方式14-背景图位置15-背景图缩放16-背景图固定17-background属性18-显示模式19-显示模…

Agisoft Metashape Pro for Mac/win:开启三维建模新视界

在当今数字化的时代&#xff0c;三维建模技术正发挥着越来越重要的作用。而 Agisoft Metashape Pro for Mac/win 无疑是该领域的一颗璀璨明星。 这款强大的三维建模软件为专业人士和爱好者提供了无与伦比的工具和功能。无论你是从事建筑设计、考古研究、影视特效制作还是地理信…

acer笔记本怎样进行系统还原?教你两招!

acer笔记本怎样进行系统还原&#xff1f;教你两招&#xff01; 作为笔记本用户&#xff0c;你在日常使用中可能会遇到各种各样的电脑问题。一般来说&#xff0c;对于一些小问题&#xff0c;我们可以通过一些简单的操作来解决&#xff0c;比如重新启动电脑或者长按电源键强制关机…

经典多模态大模型

“浅对齐”模型 经典多模态结构BLIP2 Motivation 端到端的进行vision-language预训练成本太大了&#xff0c;之前存在很多预训练好的模型&#xff0c;这篇文章希望能够使用这些训练好的参数&#xff0c;节约成本。 如果直接冻结预训练好的参数&#xff0c;去做下游任务&…