Flink之Task解析

Flink之Task解析

  对Flink的Task进行解析前,我们首先要清楚几个角色TaskManagerSlotTaskSubtaskTaskChain分别是什么

角色注释
TaskManager在Flink中TaskManager就是一个管理task的进程,每个节点只有一个TaskManager
SlotSlot就是TaskManager中的槽位,一个TaskManager中可以存在多个槽位,取决于服务器资源和用户配置,可以在槽位中运行Task实例
Task其实Task在Flink中就是一个类,其中可以包含一个或多个算子,这个取决于算子链的构成
SubTaskSubTask就是Task类的并行实例可以是一个或多个,也就是说当代码执行的那一刻开始,就根据用户所设置或者默认的并行度创建出多个SubTask
TaskChainTaskChain就是算子链,何为算子链?就是在一个Task实例中出现的串行算子,算子间必须是OneToOne模式且并行度相同.

  上面对几个角色进行了一个简单的阐述,后面会结合图解和伪代码进行讲解,这里我们以计算中比较经典wordcount为例子,伪代码如下所示:

public class FLinkWordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\
        // 设置并行度3
        env.setParallelism(3)
        // 读取数据文件
        DataStreamSource<String> streamSource = env.readTextFile("xxx");
        // 转大写
        DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())
        // 转成tuple2格式,计数1
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));
        // 按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);
        // 求和
        keyed.sum("f1")
        env.execute();
    }
}

  上面的代码中我们使用了两次map,一次keyBy,一次sum算子,我们下面就结合这几个算子进行讲解,讲解之前有两个条件需要先记住:

  • 同一个Task并行实例不能放在同一个TaskSlot上运行,一个TaskSlot上可以运行多个不同的Task并行实例
  • 同一个共享组的算子允许共享槽位,不同共享组的算子决不允许共享槽位

  上面这两句话一定要记牢,以便于后面的理解.

算子链划分及Task槽位分配

算子链划分

可以根据上面的代码理解下图:
在这里插入图片描述

上图中我们可以看到两个map组成一个task chain,keyBysum组成一个task chain,这里说一下原因,首先就是两个map的并行度是一致的,而且是OneToOne模式,所以可以将两个map绑定成一个算子链,并将其放入到一个SubTask中,而到了keyBy这里为什么不能再放入到一个task chain中,这里我们可以思考一下,keyBy时会发生什么?以spark的角度来说会发生shuffle对吧,这就导致了不能满足OneToOne的模式,简单来说我们也可以想清楚,如果keyBymap组成一个task chain那么还怎么做wordcount?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bckucHbv-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task2.png)]

通过上图应该很容易理解了.

Task槽位分配

  上面讲了关于task chain怎么划分的,为什么这样划分,这里讲一下为什么同一个Task的并行实例(SubTask)不能在同一个task slot中.其实这个也很容易就想清楚,如果同一Task的多个SubTask都出现在一个task slot中那么还有什么意义呢?当这些SubTask出现在一个task slot中时就会发生串行计算,那并行的意义也就没有了.

  同时这种机制也保证了任务的容错性,也就是说对于同一个Task一旦某一个task slot出现异常的情况,其他的task slot中的SubTask还能正常运行,如果将这些SubTask放到一个task slot中,当这个task slot出现异常情况时,就会影响整个任务的执行.

  总结来说,这种设计保证了Flink任务的隔离性、容错性、资源利用性.这里用图解的方式便于大家记忆,如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rlgqeo6A-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task3.png)]

槽位共享及算子链断/连

槽位共享

  前面讲过同一个Task的多个SubTask不能出现在一个task slot中,但是不同TaskSubTask是可以共享同一个task slot的,但是在Flink中有一个机制,就是用户(开发人员)可以自定义不同的算子间是否可以共享同一个task slot,如上面的例子中两个map的并行度一致并且符合OneToOne的模式,在正常情况下必然会会分到一个task chain中,但是Flink给用户提供了的slot group的概念,也就是说用户可以将这两个map分配到不同的slot group中,这种情况下两个map就不会划分到一个task chain中,试想一下当两个map都不允许共享同一个task slot时,怎么可能划分到同一个task chain中呢?

  伪代码如下:

public class FLinkWordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\
        // 设置并行度3
        env.setParallelism(1)
        // 读取数据文件
        DataStreamSource<String> streamSource = env.readTextFile("xxx");
        // 转大写
        DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())
        // 通过slotSharingGroup()将upperCaseSource作为一个分组"g1"
        SingleOutputStreamOperator<String> slotGroup1 = upperCaseSource.slotSharingGroup("g1");
          
        // 转成tuple2格式,计数1
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));
        // 通过slotSharingGroup()将mapStream作为一个分组"g3"
        SingleOutputStreamOperator<Tuple2<String, Integer>> slotGroup2 = mapStream.slotSharingGroup("g2");
        // 按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);
        // 求和
        keyed.sum("f1")
        env.execute();
    }
}

上面的代码中我们将upperCaseSourcemapStream分成了两个task slot,这样两个map就不可以共享相同的task slot,同时代码中将并行度改为了1,这样便于图解,如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-saLgMu0Q-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task4.png)]
如果说集群中总task slot只有3个,并且在代码中两个map设置了不同的task slot且两个map的并行度都为3时会怎么样?很简单,提交任务时就会报错,因为提交任务所需要的资源已经超出了集群的资源.

  这里说一下对于对task slot进行分组处理的实际用处,就以代码中两个map为例子,在实际的业务中如果两个map处理的数据量都极大,如果将两个map的计算都放到一个节点的一个task slot时会发生什么?数据的积压、任务异常失败等等都有可能发生,但是有slotSharingGroup我们就可以保证同一个task slot不会承载过大的计算任务,也就达到了资源合理分配的目的.

算子链断/连

  前面讲了关于将两个map进行slotSharingGroup后会将两个map划分到不同的task chain,如果有这样一个情况两个map满足OneToOne的模式且并行度相同时,我们不使用slotSharingGroup能否将两个map划分成不同的task chain?答案是当然可以的,Flink为我们提供了对应的API,伪代码如下:

public class FLinkWordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\
        // 设置并行度3
        env.setParallelism(3)
        // 读取数据文件
        DataStreamSource<String> streamSource = env.readTextFile("xxx");
        // 转大写
        DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())
  
        // 转成tuple2格式,计数1
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));
        // 将mapStream划分到一个新的task chain中
        SingleOutputStreamOperator<Tuple2<String, Integer>> newTaskChainMapStream = mapStream.startNewChain();
        // 按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);
        // 求和
        keyed.sum("f1")
        env.execute();
    }
}

在上面代码中我们调用了startNewChain()后就可以将mapStream划分到一个新的task chain中,这样的情况下,两个map既属于不同的task chain又可以共享同一个task slot,如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jOIlz8uH-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task5.png)]
以上就是对于Task的讲解,如有错误欢迎指出,如有问题共同探讨.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/77346.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

k8s问题汇总

作者前言 本文章为记录使用k8s遇到的问题和解决方法&#xff0c;文章持续更新中… 目录 作者前言正常配置ingress&#xff0c;但是访问错误添加工作节点报错安装k8s报错使用kubectl命令报错container没有运行安装会出现kubelet异常&#xff0c;无法识别删除k8s集群访问dashboa…

IT运维:使用数据分析平台监控深信服防火墙

概述 深信服防火墙自身监控可以满足绝大部分需求&#xff0c;比如哪个应用占了最大带宽&#xff0c;哪个用户访问了哪些网站&#xff1f;这里我们为什么使用鸿鹄呢&#xff1f;因为我们要的是数据的处理和分析&#xff0c;比如某个用户在某个事件都做了哪些行为&#xff0c;这个…

Nginx运行Vue项目:基本运行

需求 在Nginx服务器中&#xff0c;运行Vue项目。 说明 Vue项目打包生成的生产文件&#xff0c;是无法直接在浏览器打开的。需要放到Nginx服务器中&#xff0c;才能够访问。 本文章只介绍最基本的情况&#xff1a;Nginx中运行一个Vue项目。 实际生产环境&#xff0c;一个Ng…

Linux-C++开发项目:基于主从Reactor模式的高性能并发服务器

目录 1.项目介绍2.1项目部署2.2安装版本较高的编译器 2.项目开发过程2.1网络库模块开发2.1.1简单日志宏的实现2.1.2Buffer模块实现2.1.3Socket模块实现2.1.4Channel模块实现2.1.5Poller模块实现2.1.6TimerWheel模块实现2.1.7EventLoop模块实现2.1.8整合测试12.1.9LoopThread模块…

centos7离线安装gdal3.6.3

本文档以纯离线环境为基础&#xff0c;所有的安装包都是提前下载好的。以gdal3.6.3为例&#xff08;其他版本安装步骤或方式可能不同&#xff09;&#xff0c;在centos7系统离线安装&#xff0c;并运行java项目&#xff0c;实现在java服务中调用gdal库解析地理数据。以下任意组…

【JavaEE基础学习打卡04】JDBC之MySQL数据库安装

目录 前言一、JDBC与数据库二、MySQL数据库1.MySQL数据库2.MySQL服务下载安装3.MySQL服务启动停止4.MySQL命令 三、MySQL客户端安装总结 前言 &#x1f4dc; 本系列教程适用于JavaWeb初学者、爱好者&#xff0c;小白白。我们的天赋并不高&#xff0c;可贵在努力&#xff0c;坚持…

Ajax及前端工程化

Ajax&#xff1a;异步的js与xml。 作用&#xff1a; 1、通过ajax给服务器发送数据&#xff0c;并获得其响应的数据。 2、可以在不更新整个网页的情况下&#xff0c;与服务器交换数据并更新部分网页的技术。 一、同步与异步 二、原生Ajax 1、准备数据地址 2、创建XMLHttpReq…

Vitis高层次综合学习——FPGA

高层次综合 什么是高层次综合&#xff1f;就是使用高级语言&#xff08;如C/C&#xff09;来编写FPGA算法程序。 在高层次综合上并不需要制定微架构决策&#xff0c;如创建状态机、数据路径、寄存器流水线等。这些细节可以留给 HLS 工具&#xff0c;通过提供输入约束&#xff…

tauri-vue:快速开发跨平台软件的架子,支持自定义头部UI拖拽移动和窗口阴影效果

Tauri Vue Typescript 一个使用 taurivuets 开发跨平台软件的模板&#xff0c;支持窗口头部自定义 UI 和拖拽和窗口阴影&#xff0c;不用再自己做适配了&#xff0c;拿来即用&#xff0c;非常 nice。而且已经封装好了 tauri 的 http 请求工具&#xff0c;省去很多弯路。开源…

(二)结构型模式:8、代理模式(Proxy Pattern)(C++示例)

目录 1、代理模式&#xff08;Proxy Pattern&#xff09;含义 2、代理模式的UML图学习 3、代理模式的应用场景 4、代理模式的优缺点 5、C实现代理模式的实例 1、代理模式&#xff08;Proxy Pattern&#xff09;含义 代理模式&#xff08;Proxy&#xff09;&#xff0c;为…

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明 如果Flink没有提供给我们可以直接使用的连接器&#xff0c;那我们如果想将数据存储到我们自己的存储设备中&#xff0c;mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…

Wi-Fi 安全在学校中的重要性

Wi-Fi 是教育机构的基础设施&#xff0c;从在线家庭作业门户到虚拟教师会议&#xff0c;应有尽有。大多数 K-12 管理员对自己的 Wi-Fi 网络的安全性充满信心&#xff0c;并认为他们现有的网络安全措施已经足够。 不幸的是&#xff0c;这种信心往往是错误的。Wi-Fi 安全虽然经常…

Layui列表表头去掉复选框改为选择

效果&#xff1a; 代码&#xff1a; // 表头复选框去掉改为选择 $(".layui-table th[data-field"0"] .layui-table-cell").html("<span>选择</span>");

keil构建STM32工程并使用proteus仿真led点灯实验

STM32单片机与51单片机有很大区别&#xff0c;不仅结构上有很大差异&#xff0c;STM32更复杂一些&#xff0c;在操作上来说&#xff0c;STM32也要复杂很多&#xff0c;51单片机上手写代码&#xff0c;可以很直接操作引脚&#xff0c;但是STM32单片机在操作引脚之前需要作很多初…

数据结构——栈(C语言)

需求&#xff1a;无 栈的概念&#xff1a; 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端为栈底。栈中的数据元素遵守后进先出&#xff08;LIFO&#xff09;原则。压栈&…

分类预测 | MATLAB实现MTBO-CNN多输入分类预测

分类预测 | MATLAB实现MTBO-CNN多输入分类预测 目录 分类预测 | MATLAB实现MTBO-CNN多输入分类预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.MATLAB实现MTBO-CNN多输入分类预测 2.代码说明&#xff1a;基于登山队优化算法&#xff08;MTBO&#xff09;、卷积神经…

ABAP: SQL 多值查询

基础查数据 问题举例&#xff1a;例如查物料类型为ZFRT、ZROH和ZRSA的物料编码。 1、直接查询&#xff0c;三种不同类型的物料类型是或的关系。 SELECT DISTINCT ma~matnr ma~mtartFROM mara AS maINNER JOIN mbewh AS mbON ma~matnr mb~matnrINTO CORRESPONDING FIELDS OF…

EmbedPress Pro 在WordPress网站中嵌入任何内容

EmbedPress Pro可让您通过高级自定义、自定义品牌、延迟加载和更多惊人功能嵌入源。为古腾堡块和Elementor编辑器提供支持的一体化 WordPress 嵌入解决方案。使用 EmbedPress 在古腾堡创建交互式内容。使用 EmbedPress 的古腾堡块立即将任何内容嵌入到您的网站。 网址: EmbedP…

最强自动化测试框架Playwright(29)-文件选择对象

FileChooser对象通过page.on("filechoose")事件监听。 如下代码实现点击百度搜图按钮&#xff0c;上传文件进行搜索。 from playwright.sync_api import Playwright, sync_playwright, expectdef run(playwright: Playwright) -> None:browser playwright.chro…

linux 学习————LNMP之分布式部署

目录 一、概述 二、LNMP环境部署 三、配置nginx 四、 配置php使nginx能够解析.php 五、配置mysql 六、配置discuz进行登录论坛访问测试 一、概述 LNMP代表 Linux、Nginx、MySQL、PHP&#xff0c;是一种常用的服务器架构。它由以下组件组成&#xff1a; Linux&#xff1a;作…