[flink 实时流基础]源算子和转换算子

文章目录

    • 1. 源算子 Source
        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据
    • 2. 转换算子
        • 基本转换算子(map/ filter/ flatMap)


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));

        // 2. 直接填元素
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);

        source.print();

        env.execute();
    }
2. 从文件读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("input/world.txt"))
            .build();

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource")
            .print();


        env.execute();
    }
3. 从 socket 读取
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        source.print();


        env.execute();
    }

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("atguigu")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
5. 从数据生成器读取数据
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-datagen</artifactId>
			<version>${flink.version}</version>
		</dependency>
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:" + value;
            }
        }, 10, // 自动生成的数字序列
            RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条
            Types.STRING // 返回类型
        );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();


        env.execute();


    }

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/500953.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

hcia datacom课程学习(5):MAC地址与arp协议

1.MAC地址 1.1 含义与作用 &#xff08;1&#xff09;含义&#xff1a; mac地址也称物理地址&#xff0c;是网卡设备在数据链路层的地址&#xff0c;全世界每一块网卡的mac地址都是唯一的&#xff0c;出厂时烧录在网卡上不可更改 &#xff08;2&#xff09;作用&#xff1a…

OKCC的API资源管理平台怎么用?

API资源管理平台&#xff0c;重点是“资源”管理平台&#xff0c;不是API接口管理平台。 天天讯通推出的API资源管理平台&#xff0c;类似昆石的VOS系统&#xff0c;区别是VOS是SIP资源管理系统&#xff0c;我们的API资源管理平台是API资源管理系统&#xff08;AXB、AX、回拨AP…

科技下乡:数字乡村改变乡村生活方式

在科技飞速发展的时代&#xff0c;数字化、信息化浪潮正以前所未有的速度席卷全球。在这场科技革命中&#xff0c;乡村不再是滞后的代名词&#xff0c;而是成为了数字乡村建设的热土。科技下乡&#xff0c;让数字乡村成为了改变乡村生活方式的重要力量。 一、科技下乡&#xf…

京东云8核16G服务器配置租用优惠价格1198元1年、4688元三年

京东云轻量云主机8核16G服务器租用优惠价格1198元1年、4688元三年&#xff0c;配置为8C16G-270G SSD系统盘-5M带宽-500G月流量&#xff0c;华北-北京地域。京东云8核16G服务器活动页面 yunfuwuqiba.com/go/jd 活动链接打开如下图&#xff1a; 京东云8核16G服务器优惠价格 京东云…

操作系统OS Chapter1

操作系统OS 一、概念和功能1.概念2.功能3.目标 二、特征1.并发2.共享3.虚拟4.异步 三、发展四、运行机制五、中断和异常1.中断的作用2.中断的类型3.中断机制的原理 六、系统调用七、操作系统结构八、操作系统引导九、虚拟机 一、概念和功能 1.概念 操作系统&#xff08;OS&…

harbor api v2.0

harbor api v2.0 v2.0 v2.0 “harbor api v2.0”与原来区别较大&#xff0c;此处harbor也做了https。另外&#xff0c;通过接口拿到的数据也是只能默认1页10个&#xff0c;所以脚本根据实际情况一页页的抓取数据 脚本主要用于统计repo、image&#xff0c;以及所有镜像的tag数&…

HTML网站的概念

目录 前言&#xff1a; 1.什么是网页&#xff1a; 2.什么是网站&#xff1a; 示例&#xff1a; 3.服务器&#xff1a; 总结&#xff1a; 前言&#xff1a; HTML也称Hyper Text Markup Language&#xff0c;意思是超文本标记语言&#xff0c;同时HTML也是前端的基础&…

IF= 13.4| 当eDNA遇上机器学习法

近日&#xff0c;凌恩生物客户重庆医科大学在《Water Research》&#xff08;IF 13.4&#xff09;发表研究论文“Supervised machine learning improves general applicability of eDNA metabarcoding for reservoir health monitoring”。该研究主要介绍了一种基于eDNA的机器学…

mysql的主从配置

MySQL主从复制是一种常见的数据库复制技术&#xff0c;用于实现数据在一个主数据库服务器和一个或多个从数据库服务器之间的同步。在主从配置中&#xff0c;主服务器负责接收和处理写操作&#xff0c;然后将这些变更通过binlog日志传播到从服务器&#xff0c;从服务器根据主服务…

【MySQL】7.MHA高可用配置及故障切换

什么是MHA MHA&#xff08;MasterHigh Availability&#xff09;是一套优秀的MySQL高可用环境下故障切换和主从复制的软件 mha用于解决mysql的单点故障问题&#xff1b; 出现故障时&#xff0c;mha能在0~30秒内自动完成故障切换&#xff1b; 并且能在故障切换过程中&#xff0…

《让你的时间多一倍》逃离时间陷阱,你没有自己想的那么懒 - 三余书屋 3ysw.net

让你的时间多一倍 今天我们来阅读法比安奥利卡尔的作品《让你的时间多一倍》。或许你会心生疑虑&#xff0c;这本书是否又是一本沉闷的时间管理指南&#xff1f;但我要告诉你的是&#xff0c;尽管时间管理这个话题已经为大众所熟知&#xff0c;这本书却为我们揭示了一个全新的…

【Roadmap to learn LLM】Large Language Models in Five Formulas

by Alexander Rush Our hope: reasoning about LLMs Our Issue 文章目录 Perpexity(Generation)Attention(Memory)GEMM(Efficiency)用矩阵乘法说明GPU的工作原理 Chinchilla(Scaling)RASP(Reasoning)结论参考资料 the five formulas perpexity —— generationattention —— m…

PyCharm中配置PyQt5并添加外部工具

Qt Designer、PyUIC和PyRcc是Qt框架下的三个重要工具&#xff0c;总的来说&#xff0c;这三个工具各司其职&#xff0c;相辅相成&#xff0c;能显著提升Qt开发的速度与效率。 Qt Designer&#xff1a;是一个用于创建图形用户界面的工具&#xff0c;可轻松构建复杂的用户界面。…

matlab及其在数字信号处理中的应用001:软件下载及安装

目录 一&#xff0c;matlab的概述 matlab是什么 matlab适用于的问题 matlab的易扩展性 二&#xff0c;matlab的安装 1&#xff0c;解压所有压缩文件 2&#xff0c;解压镜像压缩文件 3&#xff0c;运行setup.exe 4&#xff0c;开始安装 5&#xff0c;不要运行软件…

EasyBoss ERP上线实时数据大屏,Shopee本土店铺数据实时监测

近日&#xff0c;灵隐寺PPT汇报用上数据大屏疯狂刷屏&#xff0c;有做东南亚本土电商的老板发现这种数据大屏的模式可以很好地展现店铺运营状况。 所以就有老板来问&#xff1a;EasyBoss能不能也上线实时数据大屏的功能&#xff1f;没问题&#xff01;立马安排&#xff01; 要有…

BasicVSR++模型转JIT并用c++libtorch推理

BasicVSR模型转JIT并用clibtorch推理 文章目录 BasicVSR模型转JIT并用clibtorch推理安装BasicVSR 环境1.下载源码2. 新建一个conda环境3. 安装pytorch4. 安装 mim 和 mmcv-full5. 安装 mmedit6. 下载模型文件7. 测试一下能否正常运行 转换为JIT模型用c libtorch推理效果 安装Ba…

只出现一次的数字 II

题目链接 只出现一次的数字 II 题目描述 注意点 nums中&#xff0c;除某个元素仅出现一次外&#xff0c;其余每个元素都恰出现三次设计并实现线性时间复杂度的算法且使用常数级空间来解决此问题 解答思路 本题与只出现一次的数字的数字类似&#xff0c;区别是重复的数字会…

深度学习InputStreamReader类

咦咦咦&#xff0c;各位小可爱&#xff0c;我是你们的好伙伴——bug菌&#xff0c;今天又来给大家普及Java SE相关知识点了&#xff0c;别躲起来啊&#xff0c;听我讲干货还不快点赞&#xff0c;赞多了我就有动力讲得更嗨啦&#xff01;所以呀&#xff0c;养成先点赞后阅读的好…

SpringMVC注解及使用规则

文章目录 前言一、SpringMVC注解是什么&#xff1f;二、使用步骤1.注解使用2创建JSP3 SpringMVC视图1. 逻辑视图&#xff08;Logical View&#xff09;2. 物理视图&#xff08;Physical View&#xff09;区别和关系 4 SpringMVC注解总结 总结 前言 提示&#xff1a;这里可以添…

Java运算符-三元运算符,这你必须得会!

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一个人虽可以走的更快&#xff0c;但一群人可以走的更远。 我是一名后…