Flink 算子

Flink 算子

用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。

这部分内容将描述 Flink DataStream API 中基本的数据转换 API,数据转换后各种数据分区方式,以及算子的链接策略。

数据流转换

Map

DataStream → DataStream #
输入一个元素同时输出一个元素。下面是将输入流中元素数值加倍的 map function:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
})

FlatMap

DataStream → DataStream #
输入一个元素同时产生零个、一个或多个元素。下面是将句子拆分为单词的 flatmap function:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Filter

DataStream → DataStream #
为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素。下面是过滤掉零值的 filter:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

DataStream → KeyedStream #
在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。

dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

以下情况,一个类不能作为 key: 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于
Object.hashCode() 实现。 它是任意类的数组。

Reduce

KeyedStream → DataStream #
在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

下面是创建局部求和流的 reduce function:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

Window

KeyedStream → WindowedStream #
可以在已经分区的 KeyedStreams 上定义 Window。Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组。请参阅 windows 获取有关 window 的完整说明。

dataStream
  .keyBy(value -> value.f0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))); 

WindowAll

DataStream → AllWindowedStream #
可以在普通 DataStream 上定义 Window。 Window 根据某些特征(例如,最近 5 秒内到达的数据)对所有流事件进行分组。请参阅windows获取有关 window 的完整说明。

这适用于非并行转换的大多数场景。所有记录都将收集到 windowAll 算子对应的一个任务中。

dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

Window Apply

WindowedStream → DataStream #
AllWindowedStream → DataStream #
将通用 function 应用于整个窗口。下面是一个手动对窗口内元素求和的 function。

如果你使用 windowAll 转换,则需要改用 AllWindowFunction。

windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// 在 non-keyed 窗口流上应用 AllWindowFunction
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

WindowReduce

WindowedStream → DataStream #
对窗口应用 reduce function 并返回 reduce 后的值。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Union

DataStream* → DataStream #
将两个或多个数据流联合来创建一个包含所有流中数据的新流。注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

dataStream.union(otherStream1, otherStream2, ...);

Window Join

DataStream,DataStream → DataStream #
根据指定的 key 和窗口 join 两个数据流。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

Interval Join

KeyedStream,KeyedStream → DataStream #
根据 key 相等并且满足指定的时间范围内(e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound)的条件将分别属于两个 keyed stream 的元素 e1 和 e2 Join 在一起。

keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});

Window CoGroup

DataStream,DataStream → DataStream #
根据指定的 key 和窗口将两个数据流组合在一起。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

Connect

DataStream,DataStream → ConnectedStream #
“连接” 两个数据流并保留各自的类型。connect 允许在两个流的处理逻辑之间共享状态。

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

ConnectedStream → DataStream #
类似于在连接的数据流上进行 map 和 flatMap。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

Cache

DataStream → CachedDataStream #
把算子的结果缓存起来。目前只支持批执行模式下运行的作业。算子的结果在算子第一次执行的时候会被缓存起来,之后的 作业中会复用该算子缓存的结果。如果算子的结果丢失了,它会被原来的算子重新计算并缓存。

DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.     
cachedDataStream.print(); // Consume cached result.
env.execute();

物理分区

Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。

自定义分区

DataStream → DataStream #
使用用户定义的 Partitioner 为每个元素选择目标任务。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

随机分区

DataStream → DataStream #
将元素随机地均匀划分到分区。

dataStream.shuffle();

Rescaling

DataStream → DataStream #
将元素以 Round-robin 轮询的方式分发到下游算子。如果你想实现数据管道,这将很有用,例如,想将数据源多个并发实例的数据分发到多个下游 map 来实现负载分配,但又不想像 rebalance() 那样引起完全重新平衡。该算子将只会到本地数据传输而不是网络数据传输,这取决于其它配置值,例如 TaskManager 的 slot 数量。

上游算子将元素发往哪些下游的算子实例集合同时取决于上游和下游算子的并行度。例如,如果上游算子并行度为 2,下游算子的并发度为 6, 那么上游算子的其中一个并行实例将数据分发到下游算子的三个并行实例, 另外一个上游算子的并行实例则将数据分发到下游算子的另外三个并行实例中。再如,当下游算子的并行度为2,而上游算子的并行度为 6 的时候,那么上游算子中的三个并行实例将会分发数据至下游算子的其中一个并行实例,而另外三个上游算子的并行实例则将数据分发至另下游算子的另外一个并行实例。

当算子的并行度不是彼此的倍数时,一个或多个下游算子将从上游算子获取到不同数量的输入。

请参阅下图来可视化地感知上述示例中的连接模式:
在这里插入图片描述
dataStream.rescale();

广播

DataStream → DataStream #
将元素广播到每个分区 。
dataStream.broadcast();

算子链和资源组

将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:

如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注意的是,这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(…).startNewChain() 这样调用,而不能 someStream.startNewChain() 这样。

一个资源组对应着 Flink 中的一个 slot 槽,更多细节请看 slots 。 你可以根据需要手动地将各个算子隔离到不同的 slot 中。

创建新链

基于当前算子创建一个新的算子链。
后面两个 map 将被链接起来,而 filter 和第一个 map 不会链接在一起。

someStream.filter(...).map(...).startNewChain().map(...);

禁止链接

禁止和 map 算子链接在一起。

someStream.map(...).disableChaining();

配置 Slot 共享组

为某个算子设置 slot 共享组。Flink 会将同一个 slot 共享组的算子放在同一个 slot 中,而将不在同一 slot 共享组的算子保留在其它 slot 中。这可用于隔离 slot 。如果所有输入算子都属于同一个 slot 共享组,那么 slot 共享组从将继承输入算子所在的 slot。slot 共享组的默认名称是 “default”,可以调用 slotSharingGroup(“default”) 来显式地将算子放入该组。

someStream.filter(...).slotSharingGroup("name");

名字和描述

Flink里的算子和作业节点会有一个名字和一个描述。名字和描述。名字和描述都是用来介绍一个算子或者节点是在做什么操作,但是他们会被用在不同地方。

名字会用在用户界面、线程名、日志、指标等场景。节点的名字会根据节点中算子的名字来构建。 名字需要尽可能的简洁,避免对外部系统产生大的压力。

描述主要用在执行计划展示,以及用户界面展示。节点的描述同样是根据节点中算子的描述来构建。 描述可以包括详细的算子行为的信息,以便我们在运行时进行debug分析。

someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");

节点的描述默认是按照一个多行的树形结构来构建的,用户可以通过把pipeline.vertex-description-mode设为CASCADING, 实现将描述改为老版本的单行递归模式。

Flink SQL框架生成的算子默认会有一个由算子的类型以及id构成的名字,以及一个带有详细信息的描述。 用户可以通过将table.exec.simplify-operator-name-enabled设为false,将名字改为和以前的版本一样的详细描述。

当一个作业的拓扑很复杂时,用户可以把pipeline.vertex-name-include-index-prefix设为true,在节点的名字前增加一个拓扑序的前缀,这样就可以很容易根据指标以及日志的信息快速找到拓扑图中对应节点。

本来来自官网文档:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/operators/overview/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/620743.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Excel 两层分类后的行转列

例题描述 Excel 文件中有下图所示的数据&#xff0c;同 Name 的物品可能有多种颜色。 现在想要把数据列出下图的形式&#xff0c;每种Type一行&#xff0c;其后依次列出每种Name及其Color。 实现方法 使用 Excel 插件 SPL XLL 在空白单元格写入公式&#xff1a; spl("…

阿里云短信提示被攻击怎么解决!!

你是否收到过这样的短信&#xff0c;【阿里云】尊敬的用户&#xff1a;您的IP: 实例名称&#xff1a; 受到攻击流量已超过云盾DDoS基础防护的带宽峰值&#xff0c;服务器的所有访问已被屏蔽&#xff0c;如果35分钟后攻击停止将自动解除否则会延期解除。详情请登录云盾控制台DDo…

5.12学习总结

一.JAVA聊天室项目 文件发送 使用 Java Socket 实现聊天内容或文件的传输的原理如下&#xff1a; 服务器端启动&#xff1a;聊天室的服务器端在指定的端口上监听客户端的连接。它创建一个 ServerSocket 对象&#xff0c;并通过调用 accept() 方法等待客户端的连接请求。客户…

LSTM计算指示图

掌握网络结构组件构成 输入门、遗忘门、输出门候选记忆细胞记忆细胞隐藏状态ref&#xff1a;6.8. 长短期记忆&#xff08;LSTM&#xff09; — 《动手学深度学习》 文档 (gluon.ai)

《深入浅出LLM基础篇》(四):主流大模型介绍

&#x1f389;AI学习星球推荐&#xff1a; GoAI的学习社区 知识星球是一个致力于提供《机器学习 | 深度学习 | CV | NLP | 大模型 | 多模态 | AIGC 》各个最新AI方向综述、论文等成体系的学习资料&#xff0c;配有全面而有深度的专栏内容&#xff0c;包括不限于 前沿论文解读、…

冯喜运:5.13黄金晚间还会跌吗?原油还会涨吗?

【黄金消息面分析】&#xff1a;自5月初以来&#xff0c;黄金和白银一直在享受需求的回归&#xff0c;买家在过去几天加大了力度&#xff0c;一度推动金价重返2370美元上方&#xff0c;白银重返28.5美元上方。不过&#xff0c;经过几天的盘整后&#xff0c;黄金白银价格双双下跌…

限流算法(令牌桶漏桶计数器)

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Spring⛺️稳中求进&#xff0c;晒太阳 业务重的三种情况&#xff1a;突发流量、恶意流量、业务本身需要 限流: 是为了保护自身系统和下游系统不被高并发流量冲垮&#xff0c;导致系统雪崩…

AI智剪轻松学:一键操作技巧,批量视频剪辑不求人

随着科技的飞速发展&#xff0c;人工智能已经渗透到我们生活的方方面面&#xff0c;其中AI在视频剪辑领域的应用更是为众多创作者带来了福音。AI智剪技术&#xff0c;以其高效、便捷的特点&#xff0c;正在逐步改变着传统视频剪辑的方式。今天&#xff0c;我们就来探讨一下如何…

新火种AI|清华开发首个AI医院小镇!开启智能医疗新纪元。

作者&#xff1a;小岩 编辑&#xff1a;彩云 AI技术飞速发展&#xff0c;它的影响力正在逐渐渗透到各行各业&#xff0c;医疗领域也不例外。 人生无非是生老病死&#xff0c;医疗领域与其中的“病”息息相关。所以&#xff0c;每每医疗领域产生什么重大进展&#xff0c;都会…

ranger配置ha高可用方案

变更影响面 变更完需要重启所有组件 配置lb(需要客户侧配置并提供LB地址) 转发方式选择ip hash(哈希) 监听端口为6080 协议为tcp 配置后端监听

考研数学|24像张宇那样的题?李林880和李永乐660不够用了?

以前的卷子就不说了&#xff0c;就说说最近的24年的考研数学题 24年考研数学真题评价&#xff1a; 首先数学二在计算量上超过了数学三&#xff0c;尤其是在高等数学的选择题部分&#xff0c;这使得数学二的难度可能略高于数学三&#xff0c;尽管两者之间并没有本质的差异。与…

测试二(测试点)

能掌握80% 一、能对穷举场景设计测试点 能对穷举场景设计测试点——>等价类划分法 1.1、等价类划分法 1.1.1. 说明 | 分类 | 步骤 说明&#xff1a;在所有测试数据中&#xff0c;具有某种共同特征的数据集合进行划分。 分类&#xff1a;有效等价类&#xff1a;满足…

Multisim14 安装教程

1、下载压缩包 链接&#xff1a;https://pan.baidu.com/s/1L50kBBKWFtud6GhmmqHLiw?pwd8888 提取码&#xff1a;8888 2、解压 3、运行应用程序&#xff0c;开始安装&#xff0c; 4、点击确定 5、点击unzip&#xff0c;解压 6、点击确定 7、点击安装 8、填写name和organ&a…

排序-快速排序(Quick Sort)

快排的简介 快速排序&#xff08;Quick Sort&#xff09;是一种高效的排序算法&#xff0c;采用分治法的策略&#xff0c;其基本思想是选择一个基准元素&#xff0c;通过一趟排序将待排序的数据分割成独立的两部分&#xff0c;其中一部分的所有数据都比另外一部分的所有数据要…

pywinauto,一款Win自动化利器!

pywinauto&#xff0c;一款Win自动化利器&#xff01; 1.安装 pywinauto是一个用于自动化Python模块&#xff0c;适合Windows系统的软件&#xff08;GUI&#xff09;&#xff0c;可以通过Pywinauto遍历窗口&#xff08;对话框&#xff09;和窗口里的控件&#xff0c;也可以控…

11、FreeRTOS 队列、队列集,邮箱的使用

文章目录 一、队列的特性1.1 队列常规操作1.2 传输数据的两种方法1.3 队列的阻塞访问 二 队列函数2.1创建2.2 复位2.3 删除2.4 写队列2.5 读队列2.6 查询2.7 覆盖/偷看 三、示例3.1示例 队列的基本使用3.2 示例: 分辨数据源3.3 示例: 传输大块数据3.4 : 邮箱(Mailbox) 四、队列…

多人同时导出 Excel 干崩服务器!新来的阿里大佬给出的解决方案太优雅了!

前言 业务诉求&#xff1a;考虑到数据库数据日渐增多&#xff0c;导出会有全量数据的导出&#xff0c;多人同时导出可以会对服务性能造成影响&#xff0c;导出涉及到mysql查询的io操作&#xff0c;还涉及文件输入、输出流的io操作&#xff0c;所以对服务器的性能会影响的比较大…

十进制整数转平衡三进制

求解原视频&#xff1a;平衡三进制 求赞&#xff01;100赞买个乒乓球拍&#xff01;_哔哩哔哩_bilibili 题目&#xff1a; 上海市计算机学会竞赛平台 | YACS 求解程序&#xff1a; using namespace std; #include <iostream> #include <cstring>string work(int n…

网页版五子棋的自动化测试

目录 前言 一、主要技术 二、测试环境的准备部署 三、测试用例 四、执行测试 4.1、公共类设计 创建浏览器驱动对象 测试套件 释放驱动类 4.2、功能测试 登录页面 注册页面 游戏大厅页面 游戏房间页面 测试套件结果 4.3、界面测试 登录页面 注册页面 游戏大…

【Linux基础】Vim保姆级一键配置教程(手把手教你把Vim打造成高效率C++开发环境)

目录 一、前言 二、安装Vim 三、原始Vim编译器的缺陷分析 四、Vim配置 &#x1f95d;预备知识----.vimrc 隐藏文件 &#x1f34b;手动配置 Vim --- &#xff08;不推荐&#xff09; &#x1f347;自动化一键配置 Vim --- (强烈推荐) ✨功能演示 五、共勉 一、前言 Vim作为…