Flink—— Data Source 介绍

Data Source 简介

        Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

        Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。

        Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink Data Source分类

Flink的数据源可以根据数据的来源和特性进行分类。以下是常见的Flink数据源分类:

集合数据源

        集合数据源(Collection Data Source):集合数据源指的是将本地的集合或数组作为输入数据的数据源。在Flink中,可以使用fromCollection、fromElements等方法将Java或Scala中的集合数据转化为数据流进行处理。

1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。

2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。

3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。

5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import java.util.Arrays;
import java.util.List;

public class CollectionDataSourceExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 创建一个包含整数的集合
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

        // 将集合转化为Flink的DataSet
        DataSet<Integer> dataset = env.fromCollection(data);

        // 打印数据集中的元素
        dataset.print();
    }
}

关于使用集合数据源的注意事项:

  1. 数据规模:集合数据源适用于小规模数据集。确保你的数据集在内存中能够合理存放,不至于导致内存溢出。

  2. 内存消耗:集合数据源会将所有数据存储在内存中,因此需要谨慎处理大型数据集,避免对内存资源造成过大压力。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高作业的执行效率。

  4. 调试和测试:集合数据源非常适合用于本地调试和测试,可以快速验证处理逻辑并观察输出结果。

使用集合数据源时需要注意这些方面,以确保作业能够稳定运行并获得良好的性能表现。

文件数据源

        文件数据源(File Data Source):文件数据源用于从文件系统中读取数据,可以是本地文件系统或分布式文件系统(如HDFS)。Flink提供了readTextFile、readCsvFile等方法来支持常见文件格式的数据读取。

1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

public class FileDataSourceExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 从文件创建数据集
        String filePath = "path/to/your/file.txt";
        DataSet<String> text = env.readTextFile(filePath);

        // 打印文件中的内容
        text.print();
    }
}

关于使用文件数据源的注意事项:

  1. 文件路径:确保提供的文件路径是正确的,可以是本地文件系统路径,也可以是HDFS路径或其他支持的文件系统路径。

  2. 文件格式:Flink支持多种文件格式,包括文本文件、CSV文件、Parquet文件等。根据实际情况选择合适的文件格式进行读取。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高文件读取的并行处理能力。

  4. 文件分区:对于大型文件,可以考虑文件分区和并行读取,以加速数据的加载和处理过程。

  5. 文件读取性能:尽量避免频繁的小文件读取操作,因为这会增加文件系统的负担并降低整体性能。

使用文件数据源时需要注意以上方面,以确保能够有效地读取文件数据,并且提高作业的执行效率。

Socket数据源

        Socket数据源(Socket Data Source):Socket数据源允许通过网络套接字接收数据,通常用于测试和演示目的。Flink可以使用socketTextStream方法从TCP socket接收数据流。

socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketDataSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从socket创建数据流
        String hostname = "localhost";
        int port = 9999;
        env.socketTextStream(hostname, port)
           .print();

        // 执行作业
        env.execute("Socket Data Source Example");
    }
}

关于使用Socket数据源的注意事项:

  1. 主机和端口:确保指定的主机和端口是正确的,并且能够与数据源通信。

  2. 网络延迟:由于Socket数据源涉及网络通信,因此可能受到网络延迟的影响。需要考虑网络性能对作业整体性能的影响。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据格式:需要确保从Socket接收到的数据能够被正确解析和处理,例如按行读取文本数据等。

  5. 容错机制:在使用Socket数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Socket数据源时需要注意以上方面,以确保能够有效地接收数据并提高作业的执行效率。

自定义数据源

        自定义数据源(Custom Data Source):除了上述内置的数据源外,Flink还支持自定义数据源。用户可以实现自己的SourceFunction接口来定义特定的数据生成逻辑,例如从消息队列、数据库、传感器等实时数据源中读取数据。

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomDataSource extends RichParallelSourceFunction<String> {
    private boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // 生成数据
            String data = generateData();
            // 发射数据
            ctx.collect(data);
            // 控制数据生成频率
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String generateData() {
        // 实现自定义的数据生成逻辑
        return "some data";
    }
}

        在这个示例中,我们创建了一个名为CustomDataSource的类,它继承自RichParallelSourceFunction并指定了数据类型为String。在run方法中,我们使用一个循环来生成数据并通过collect方法将数据发射出去。在cancel方法中,我们设置了一个标志位来控制数据源的运行状态。

关于使用自定义数据源的注意事项:

  1. 并行度设置:根据数据源的性质和数据量合理地设置并行度,以充分利用集群资源。

  2. 数据生成频率:确保数据生成的频率和速度能够适应作业的处理能力,避免数据源产生过快导致作业无法及时处理。

  3. 容错机制:在自定义数据源中,需要考虑作业的容错机制,例如在发生故障时如何正确处理和恢复。

  4. 数据格式:确保从自定义数据源产生的数据能够被正确解析和处理,符合作业的输入要求。

  5. 资源管理:需要确保自定义数据源的资源占用和生命周期管理,避免资源泄露或过度占用资源。

使用自定义数据源时需要考虑以上方面,并确保能够有效地产生数据并提高作业的执行效率。

Apache Kafka数据源

        Apache Kafka数据源(Kafka Data Source):作为流数据处理框架,Flink对Kafka提供了良好的集成支持。可以使用addSource方法结合Flink的Kafka Connector来从Kafka主题中读取数据。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaDataSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-consumer-group");

        // 创建Kafka数据流
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
        DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);

        kafkaDataStream.print();

        // 执行作业
        env.execute("Kafka Data Source Example");
    }
}

在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后设置Kafka的连接配置,包括bootstrap servers和consumer group id等。接下来,我们创建了一个FlinkKafkaConsumer对象,指定了要消费的topic以及数据的序列化方式,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Kafka数据源的注意事项:

  1. Kafka配置:确保指定的Kafka配置正确,并能够与Kafka集群进行通信。

  2. 序列化方式:根据实际情况选择合适的数据序列化方式,例如SimpleStringSchema、JSON、Avro等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Kafka数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Kafka数据源时需要注意以上方面,以确保能够有效地消费Kafka中的数据并提高作业的执行效率。

Apache Pulsar数据源

        Apache Pulsar数据源(Pulsar Data Source):类似于Kafka,Flink也集成了对Pulsar的支持,可以直接从Pulsar主题中读取数据。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarDataSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String serviceUrl = "pulsar://localhost:6650";
        String topic = "my-topic";

        FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
                serviceUrl,
                topic,
                Schema.STRING
        );

        DataStream<String> pulsarDataStream = env.addSource(pulsarSource);

        pulsarDataStream.print();

        env.execute("Pulsar Data Source Example");
    }
}

        在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后指定了Pulsar的连接信息和要消费的topic。接下来,我们创建了一个FlinkPulsarSource对象,并指定了Pulsar的serviceUrl、topic以及数据的Schema,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Pulsar数据源的注意事项:

  1. Pulsar连接配置:确保指定的Pulsar连接信息正确,并能够与Pulsar集群进行通信。

  2. Schema设置:根据实际情况选择合适的数据Schema,例如STRING、JSON、AVRO等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Pulsar数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

        使用Pulsar数据源时需要注意以上方面,以确保能够有效地消费Pulsar中的数据并提高作业的执行效率。

        这些不同类型的数据源为Flink应用程序提供了灵活的数据接入方式,使得Flink可以轻松地处理不同来源和格式的数据。根据具体的业务需求和场景特点,可以选择合适的数据源类型来构建流处理和批处理应用程序。

更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/123842.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

福州湾107㎡三室两厅两卫,温柔如风的奶油原木风,自由浪漫的灵魂。福州中宅装饰,福州装修

今天要分享的是一套面积107平米的奶油原木风三室两厅的案例。设计师于业主诉求中抽丝剥茧&#xff0c;汲取灵感&#xff0c;以大热的现代风格为主&#xff0c;暖色为主基调&#xff0c;配合原木肌理和巧思的质感细节装饰&#xff0c;最终打造出一种自由与惬意的空间。 01丨业 主…

React路由与导航

目录 前言&#xff1a; 什么是React路由&#xff1f; 导航和页面切换 路由参数和动态路由 路由守卫和权限控制 总结 前言&#xff1a; React是一个流行的JavaScript库&#xff0c;用于构建用户界面。在使用React开发Web应用程序时&#xff0c;路由和导航是必不可少的功能…

大语言模型研究进展综述

1、历史 自20世纪50年代图灵测试被提出以来&#xff0c;研究人员一直在探索和开发能够理解并掌握语言的人工智能技术。 作为重要的研究方向之一&#xff0c;语言模型得到了学术界的广泛研究&#xff0c;从早期的统计语言模型和神经语言模型开始&#xff0c;发展到基于Transform…

1、Sentinel基本应用限流规则(1)

Sentinel基本应用&限流规则 1.1 概述与作用 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。缓存、降级和限流是保护微服务系统运行稳定性的三大利器。 缓存&#xff1a;提升系统访问速度和增大系统能处理的容量 降级&#xff1a;当服务出问题或者影…

osgEarth之添加shp

目录 效果 代码 代码分析 加载模式 效果 代码 #include "stdafx.h" #include <osg/Notify> #include <osgGA/StateSetManipulator> #include <osgViewer/Viewer> #include <osgViewer/ViewerEventHandlers>#include <osgEarth/MapNo…

IP-guard WebServer 远程命令执行漏洞

IP-guard WebServer 远程命令执行漏洞 免责声明漏洞描述漏洞影响漏洞危害网络测绘Fofa: app="ip-guard"漏洞复现1. 构造poc2. 访问文件3. 执行命令免责声明 仅用于技术交流,目的是向相关安全人员展示漏洞利用方式,以便更好地提高网络安全意识和技术水平。 任何人不得…

高速信号PCB布局怎么布?(电子硬件)

对于高速信号&#xff0c;pcb的设计要求会更多&#xff0c;因为高速信号很容易收到其他外在因素的干扰&#xff0c;导致实际设计出来的东西和原本预期的效果相差很多。 所以在高速信号pcb设计中&#xff0c;需要提前考虑好整体的布局布线&#xff0c;良好的布局可以很好的决定布…

AI:67-基于深度学习的脱机手写汉字识别

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌在这个漫长的过程,中途遇到了不少问题,但是…

Django(二、静态文件的配置、链接数据库MySQL)

文章目录 一、静态文件及相关配置1.以登录功能为例2.静态文件3.资源访问4.静态文件资源访问如何解决&#xff1f; 二、静态文件相关配置1. 如何配置静态文件配置&#xff1f;2.接口前缀3. 接口前缀动态匹配4. form表单请求方法补充form表单要注意的点 三、request对象方法reque…

阿里云 :推出通义大模型编码助手产品【通义灵码】

本心、输入输出、结果 文章目录 阿里云 &#xff1a;推出通义大模型编码助手产品【通义灵码】前言通义灵码简介主要功能主要功能点 支持的语言和 IDEjetbrains IDEA 安装计费相关弘扬爱国精神 阿里云 &#xff1a;推出通义大模型编码助手产品【通义灵码】 编辑&#xff1a;简简…

SOLIDWORKS --电磁仿真篇

什么是 SIMULIA? 基于3DEXPERIENCE平台的品牌 多学科多领域的协同仿真与分析优化 三大核心仿真领域 结构仿真 流体仿真 SIMULIA电磁仿真是什么? 完备的求解技术&#xff0c;支持从静场、低频到高频、光波的电磁仿真&#xff0c;支持全波仿真、混合仿真、多物理场仿真和场路…

任务管理器的正确使用教程

快捷键 Ctrlshiftesc&#xff1a;进入任务管理器 我以Win11举例 如何给XX排序 给XX排序&#xff0c;点击空白处可以选择某项降序排列&#xff08;可以找到最占用某项资料的程序&#xff09;&#xff0c;再点击空白处可以选择某项升序排列 文件正在使用&#xff0c;如何解决 …

台式电脑怎么无损备份迁移系统到新硬盘(使用傲梅,免费的就可以)

文章目录 前言一、想要将源硬盘上的系统原封不动地迁移到新硬盘上二、准备工作2.具体步骤 总结 前言 半路接手公司一台台式电脑&#xff0c;C盘&#xff08;120g&#xff09;爆红&#xff0c;仅剩几个G&#xff0c;优化了几次&#xff0c;无果后。准备换一个大一点的增到500g。…

Python实现从Labelme数据集中挑选出含有指定类别的数据集

Python实现从Labelme数据集中挑选出含有指定类别的数据集 前言前提条件相关介绍实验环境Labelme数据集中挑选出含有指定类别的数据集代码实现输出结果 前言 由于本人水平有限&#xff0c;难免出现错漏&#xff0c;敬请批评改正。更多精彩内容&#xff0c;可点击进入Python日常小…

【STM32 开发】| INA219采集电压、电流值

目录 前言1 原理图2 IIC地址说明3 寄存器地址说明4 开始工作前配置5 程序代码1&#xff09;驱动程序2&#xff09;头文件3) 测试代码 前言 INA219 是一款具备 I2C 或 SMBUS 兼容接口的分流器和功率监测计。该器件监测分流器电压降和总线电源电压&#xff0c;转换次数和滤波选项…

算法打卡01——求两数之和

题目&#xff1a; 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能重复出现。 你…

Python进行数据可视化,探索和发现数据中的模式和趋势。

文章目录 前言第一步&#xff1a;导入必要的库第二步&#xff1a;加载数据第三步&#xff1a;创建基本图表第四步&#xff1a;添加更多细节第五步&#xff1a;使用Seaborn库创建更复杂的图表关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Pyth…

2023年度API安全状况详解

随着云计算和移动应用的快速发展&#xff0c;API&#xff08;应用程序接口&#xff09;已成为不可或缺的技术组成部分。然而&#xff0c;API的广泛使用也带来了安全风险。本文将探讨2023年的API安全状况&#xff0c;并介绍了一些应对这些安全挑战的最佳实践。 引言 随着全球互联…

【Leetcode】【每日一题】【简单】2609. 最长平衡子字符串

力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能&#xff0c;轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/find-the-longest-balanced-subs…

在linux安装单机版hadoop-3.3.6

一、下载hadoop https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/core/hadoop-3.3.6/ 二、配置环境变量 1、配置java环境变量 2、配置hadoop环境变量 export HADOOP_HOME/usr/local/bigdata/hadoop-3.3.6 export HBASE_HOME/usr/local/bigdata/hbase-2.5.6 export JA…