DataStream API(源算子)

目录

源算子

1,从集合中读取数据

2,从文件读取数据

3,从 Socket 读取数据

4,从 Kafka 读取数据

5,自定义源算子

6,Flink 支持的数据类型

6.1 Flink 支持多种数据类型,包括但不限于: 

6.2对于 POJO 类型,Flink 有以下要求:

转换算子

输出算子


源算子

       

 Flink是一个强大的流处理框架,可以从各种数据源中获取数据,并构建DataStream进行转换处理。在Flink中,数据的输入来源通常被称为数据源,而读取数据的算子被称为源算子(Source)。因此,Source可以被视为整个处理程序的输入端。

        在Flink代码中,添加源算子的通用方式是调用执行环境的addSource()方法。通过调用addSource()方法,可以获取DataStream对象,并向该方法传入一个实现了SourceFunction接口的对象作为参数。该接口定义了从数据源读取数据的方法。

        以下是一个示例代码片段,演示如何使用addSource()方法添加源算子:


	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

	// 添加源算子 

	DataStream<String> stream = env.addSource(new CustomSourceFunction());

        在上面的示例中,CustomSourceFunction是一个实现了SourceFunction接口的自定义类,它定义了从特定数据源读取数据的方法。通过调用addSource()方法并将CustomSourceFunction实例作为参数传递,可以创建一个包含源算子的DataStream对象。

        通过这种方式,您可以轻松地将Flink与各种数据源集成,并开始处理流数据。请注意,Flink还提供了其他方法来添加数据源和构建DataStream,具体取决于您的需求和使用的编程语言。

1,从集合中读取数据

        在处理数据时,直接在代码中创建集合并使用执行环境的 fromCollection 方法进行读取是一种简单且常用的方式。这种方法将数据临时存储在内存中,并形成一个特殊的数据结构,作为数据源供后续处理使用。

        这种方式的优点在于简单易用,适用于数据量较小且不需要频繁更新的场景。它为开发者提供了一种快速在代码中创建数据集的方式,尤其在测试和验证阶段非常方便。

        然而,这种方式的局限性也比较明显。由于数据存储在内存中,它不适合处理大规模数据集,因为内存资源有限。此外,一旦程序结束,数据集就会丢失,无法用于持久化存储或后续分析。

        综上所述,虽然直接在代码中创建集合并使用执行环境的 fromCollection 方法进行读取是一种简单且常用的方式,但它主要用于测试和较小规模数据的处理。对于大规模数据或需要持久化存储的场景,建议使用更稳定和可靠的数据源读取方式,如从文件、数据库或消息队列系统中获取数据。

package com.atguigu.chapter05
import org.apache.flink.streaming.api.scala._
object SourceCollection {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(1)
 val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 
2000L))
 val stream = env.fromCollection(clicks)
 stream.print()
 env.execute()
 }
}

2,从文件读取数据

        在真实的应用场景中,将数据直接写入代码并不是常见的做法。这样做不仅降低了代码的可维护性和灵活性,而且难以应对数据变化或数据源增多的情况。

        为了提高数据处理和应用的灵活性与可扩展性,通常我们会从各种存储介质中获取数据。其中,读取日志文件是一种非常常见的方式。日志文件由于其结构化、标准化的特点,成为了许多应用进行数据收集、分析和处理的理想选择。

3,从 Socket 读取数据

        数据处理中,我们通常面对的是有界数据,无论是从集合还是文件中读取。然而,在流处理场景中,数据呈现出截然不同的特性。流数据通常是无限的或具有近乎无限的生命周期,因为它不断地生成并在持续的时间段内持续存在。

        当我们提到从 socket 读取文本流时,它作为一个简单的例子,突显了流处理的一个关键特点。这种方式的数据源是无界的,因为文本可以从 socket 源源不断地传输进来,没有明确的结束点。这种无界数据流的处理带来了独特的挑战,需要特定的策略和技术来应对。

        值得注意的是,这种方式由于其吞吐量较小和稳定性较差,通常仅用于测试和演示目的。在生产环境中,为了高效地处理流数据并获得可靠的性能,更常见的做法是利用消息队列系统如 Apache Kafka 作为数据传输层。

4,从 Kafka 读取数据

        Kafka 和 Flink 的结合,使得流处理能力得到了极大的提升。Kafka 作为分布式消息传输队列,具有高吞吐、易于扩展的特性,为实时流处理提供了强大的数据传输能力。与此同时,Flink 作为一个流处理框架,具备高效的分析计算能力,能够快速处理大量的实时数据。

        这种结合的优势在于,Kafka 负责数据的收集和传输,能够保证数据在分布式环境中的可靠传输和高吞吐量;而 Flink 则专注于数据的分析和处理,能够快速地处理实时数据流,提供实时的分析结果。

        为了方便地使用 Kafka 作为数据源,Flink 官方提供了 flink-connector-kafka 连接工具,其中包含了一个名为 FlinkKafkaConsumer 的消费者类,它是一个 SourceFunction,用于从 Kafka 中读取数据。通过引入相应的依赖,开发者可以轻松地将 Kafka 作为数据源集成到 Flink 流处理应用中。

        这种架构的优点在于,它能够帮助企业快速构建实时流处理应用,提高数据处理和分析的效率。同时,由于 Kafka 和 Flink 的高度集成和优化,这种架构还能够提供更好的扩展性和稳定性,满足企业不断增长的数据处理需求。

5,自定义源算子

        在 Apache Flink 中,自定义算子是一种高级功能,允许用户根据特定的业务逻辑或数据处理需求编写自定义的算子。通过自定义算子,您可以扩展 Flink 的内置功能,实现更灵活和定制化的数据处理流程。

要创建自定义算子,您需要遵循以下步骤:

  1. 定义数据类型:首先,确定您要处理的输入和输出数据类型。这可以是 Flink 支持的基本类型、复合类型或自定义类型。
  2. 创建算子类:创建一个 Java 类来实现您所需的自定义算子逻辑。您需要继承 org.apache.flink.api.common.operators.Operator 类或其子类,并实现必要的方法。
  3. 实现处理函数:在自定义算子类中,您需要实现处理函数(process() 方法)。该方法定义了您的自定义逻辑,用于处理输入数据并生成输出数据。
  4. 注册用户定义的函数:使用 Flink 的 UDF(User-Defined Function)机制注册您的自定义算子。这允许您在 Flink SQL 或 DataStream API 中使用自定义算子。
  5. 测试和验证:编写测试用例来验证您的自定义算子的功能和性能。确保它能够正确地处理输入数据并产生期望的输出结果。
  6. 使用自定义算子:一旦您完成了自定义算子的开发和测试,就可以将其集成到 Flink 作业中,并与其他 Flink 算子一起使用。

6,Flink 支持的数据类型

        

        Flink 拥有自己完整的数据类型系统,为数据处理提供了便利。该系统通过使用“类型信息”(TypeInformation)来统一描述数据类型。TypeInformation 不仅是 Flink 中所有类型描述符的基类,还涵盖了类型的基本属性,并为每种数据类型生成特定的序列化器、反序列化器和比较器。

6.1 Flink 支持多种数据类型,包括但不限于: 
  1. 基本类型:Java 和 Scala 的基本类型及其包装类,如 Int、Long、Double 等,以及 Void、String、Date、BigDecimal 和 BigInteger。
  2. 数组类型:包括基本类型数组(如 int[]、double[])和对象数组(如 String[])。
  3. 复合数据类型:
  • Java 元组类型(TUPLE):Flink 内置的元组类型,最多支持 25 个字段。
  • Scala 样例类及 Scala 元组:不支持空字段。
  • 行类型(ROW):具有任意个字段的元组,支持空字段。
  • POJO(Plain Old Java Object):Flink 自定义的类似于 Java bean 的类。

     4.辅助类型:Option、Either、List、Map 等。

     5.泛型类型(GENERIC):Flink 支持所有 Java 类和 Scala 类,但如果没有按照 POJO 类型的定义要求来定义,将被当作泛型类处理。

        在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。而 POJO 还支持在键(key)的定义中直接使用字段名,大大提高了代码的可读性。因此,在项目实践中,流处理程序中的元素类型往往被定义为 Flink 的 POJO 类型。

6.2对于 POJO 类型,Flink 有以下要求:
  1. 类必须是公共的(public)且独立的(没有非静态的内部类)。
  2. 类必须有一个公共的无参构造方法。
  3. 类中的所有字段必须是 public 且非 final 修饰的;或者提供一个公共的 getter 和 setter 方法,这些方法需符合 Java bean 的命名规范。
  4. 对于 Scala 的样例类,它类似于 Java 中的 POJO 类。例如,Scala 的 Event 就是一个样例类,使用起来非常方便。

转换算子

转换算子

输出算子

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/342319.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一、认识 JVM 规范(JVM 概述、字节码指令集、Class文件解析、ASM)

1. JVM 概述 JVM&#xff1a;Java Virtual Machine&#xff0c;也就是 Java 虚拟机 所谓虚拟机是指&#xff1a;通过软件模拟的具有完整硬件系统功能的、运行在一个完全隔离环境中的计算机系统。 即&#xff1a;虚拟机是一个计算机系统。这种计算机系统运行在完全隔离的环境中…

Linux Centos7环境下安装Redis5

Centos7环境下安装Redis5 使用 yum 安装创建符号链接针对可执⾏程序设置符号链接针对配置⽂件设置符号链接 修改配置文件启动Redis停止Redis 一般情况下我们在 linux 系统中想要安装一些程序首先会想到使用 yum 源来安装, 但是在下图中我们可以看到 Redis 版本还是3.X的版本, 所…

优化用户体验测试应用领域:提升产品质量与用户满意度

在当今数字化时代&#xff0c;用户体验测试应用已经成为确保产品质量、提升用户满意度的关键工具。随着技术的不断发展&#xff0c;用户的期望也在不断演变&#xff0c;因此&#xff0c;为了保持竞争力&#xff0c;企业必须将用户体验置于产品开发的核心位置。本文将探讨用户体…

vcruntime140_1.dll文件丢失有什么办法可以解决

vcruntime140_1.dll文件丢失是电脑中常见的事情&#xff0c;解决vcruntime140_1.dll丢失的办法也有很多种&#xff0c;今天我们就来聊聊为什么要修复vcruntime140_1.dll文件&#xff0c;vcruntime140_1.dll在电脑中的重要性&#xff0c;以及详细的解决vcruntime140_1.dll丢失的…

GPT-5不叫GPT-5?下一代模型会有哪些新功能?

OpenAI首席执行官奥特曼在上周三达沃斯论坛接受媒体采访时表示&#xff0c;他现在的首要任务就是推出下一代大模型&#xff0c;这款模型不一定会命名GPT-5。虽然GPT-5的商标早已经注册。 如果GPT-4目前解决了人类任务的10%&#xff0c;GPT-5应该是15%或者20%。 OpenAI从去年开…

使用DockerFile构建镜像与镜像上传

目录 前言&#xff1a;为什么要使用Dockerfile &#xff1f; DockerFile构建镜像 1、构建基础对象 2、Dockerfile文件结构 3、构建Dockerfile文件镜像 二、镜像上传&#xff08;阿里云&#xff09; 前言&#xff1a;为什么要使用Dockerfile &#xff1f; 首先Dockerfile …

Element组件完整引入、按需引入、样式修改(全局、局部)、简单安装less以及npm命令证书过期等

目录 一、npm 安装二、完整引入三、按需引入四、样式修改1.按需加载的全局样式修改2. 局部样式修改1. 在 css 预处理器如 less scss 等直接使用::v-deep2. 只能用在原生 CSS 语法中:/deep/ 或者 >>> 五、 拓展&#xff1a;npm 安装less报错&#xff0c;提示证书过期六…

Java Web(二)--HTML

基本介绍 官网文档地址: HTML 教程 HTML&#xff08;HyperText Mark-up Language&#xff09;即超文本标签语言&#xff1b;HTML 文本是由 HTML 标签组成的文本&#xff0c;可以包括文字、图形、动画、声音、表格、链接等&#xff1b;HTML 的结构包括头部&#xff08;Head&…

CWE、CVE

文章目录 前言一、CVE是什么&#xff1f;二、官网浏览主页词汇 三、CWE 前言 一、CVE是什么&#xff1f; 关于CVE是什么&#xff0c;前辈已经阐述得很详细通透&#xff0c;这里不再赘述或生产一些垃圾信息&#xff0c;CVE公共漏洞和暴露的学习 总结&#xff1a; 标识某个漏洞…

不就业,纯兴趣,应该自学C#还是JAVA?

不就业&#xff0c;纯兴趣&#xff0c;应该自学C#还是JAVA? 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「JAVA的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff…

测试老司机聊聊测试设计都包含什么?

一、数据组合测试设计 数据组合测试设计&#xff08;Combinatorial Test Design&#xff0c;CTD&#xff09;是一种优化测试用例的方法&#xff0c;它通过系统地组合不同的测试数据输入&#xff0c;以确保全面覆盖各种可能的测试情况。这种方法主要应用于软件测试领域&#xff…

Aria2 WebUI控制台 任意文件读取漏洞复现(CVE-2023-39141)

0x01 产品简介 Aria2 WebUI控制台是用于下载文件的实用程序。它支持 HTTP(S)/FTP/SFTP/BitTorrent 和 Metalink 协议。aria2可以从多个来源/协议下载文件,并尝试利用您的最大下载带宽。它支持同时从HTTP(S)/FTP/SFTP和BitTorrent下载文件,而从HTTP(S)/FTP/SFTP下载的数据上…

【算法】选择最佳路线(超级源点)

题目 有一天&#xff0c;琪琪想乘坐公交车去拜访她的一位朋友。 由于琪琪非常容易晕车&#xff0c;所以她想尽快到达朋友家。 现在给定你一张城市交通路线图&#xff0c;上面包含城市的公交站台以及公交线路的具体分布。 已知城市中共包含 n 个车站&#xff08;编号1~n&…

银行数据仓库体系实践(4)--数据抽取和加载

1、ETL和ELT ETL是Extract、Transfrom、Load即抽取、转换、加载三个英文单词首字母的集合&#xff1a; E&#xff1a;抽取&#xff0c;从源系统(Souce)获取数据&#xff1b; T&#xff1a;转换&#xff0c;将源系统获取的数据进行处理加工&#xff0c;比如数据格式转化、数据精…

原来岳云鹏背后的女人竟然是她?有她,岳云鹏红遍大江南北。

♥ 为方便您进行讨论和分享&#xff0c;同时也为能带给您不一样的参与感。请您在阅读本文之前&#xff0c;点击一下“关注”&#xff0c;非常感谢您的支持&#xff01; 文 |猴哥聊娱乐 编 辑|徐 婷 校 对|侯欢庭 岳云鹏&#xff0c;一个出身于农村的普通孩子&#xff0c;曾经…

如何在容器内部进行抓包

//先获取POD 的容器ID号 //去pod容器所在节点进行解析id为pid号 //通过pid号进入这个容器的网络命名空间 docker inspect --format {{.State.Pid}} 05f38d2a61e29b5a9d24fc7a3906991ab92ecd58ff7e0eb4e339a4cc6b2c4fc4 //访问容器内部&#xff0c;Node01节点

第10次修改了可删除可持久保存的前端html备忘录

第10次修改了可删除可持久保存的前端html备忘录 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>…

JAVA的面试题四

1.电商行业特点 &#xff08;1&#xff09;分布式&#xff1a; ①垂直拆分:根据功能模块进行拆分 ②水平拆分:根据业务层级进行拆分 &#xff08;2&#xff09;高并发&#xff1a; 用户单位时间内访问服务器数量,是电商行业中面临的主要问题 &#xff08;3&#xff09;集群&…

备战2个月,面试被问麻了....

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

中小学班主任工作规定

作为一名长期扎根在中小学教育一线的教师&#xff0c;您是否曾经思考过&#xff1a;作为班主任&#xff0c;我们的工作有哪些明确的规定和要求&#xff1f; 众所周知&#xff0c;班主任工作是中小学教育的重要组成部分。那么&#xff0c;具体来说&#xff0c;班主任的工作有哪…