05 Flink 的 WordCount

前言

本文对应于 spark 系列的 Spark 的 WordCount

这里主要是 从宏观上面来看一下 flink 这边的几个角色, 以及其调度的整个流程 

一个宏观 大局上的任务的处理, 执行 

基于 一个本地的 flink 集群 

 

 

测试用例

/**
 * com.hx.test.Test01WordCount
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2021/4/12 10:14
 */
public class Test01WordCount {

    // com.hx.test.Test01WordCount
    // -Xmx100M -XX:+UseSerialGC -XX:+TraceClassUnloading
    public static void main(String[] args) throws Exception {

//        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String jarPath = "D:\\IdeaWorkStations\\HelloFlink\\target\\HelloFlink-0.0.1.jar";
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081, jarPath);
        env.setParallelism(1);

        String inputPath = "D:\\IdeaWorkStations\\HelloFlink\\src\\main\\resources\\Test01WordCount.txt";
        DataSource<String> inputDs = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> result = inputDs
                .flatMap(new MyFlatMapMapper())
                .map(new MyMapMapper())
                .groupBy(0)
                .sum(1);
        result.print();

        System.gc();
        System.in.read();
        System.out.println(" end ");

    }

    /**
     * MyFlatMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:24
     */
    private static class MyFlatMapMapper implements FlatMapFunction<String, String> {
        private static List<byte[]> dummyBytes = new ArrayList<>();

        static {
            try {
                for (int i = 0; i < 10; i++) {
                    byte[] tmpBytes = FileUtils.readAllBytes(Paths.get("D:\\IdeaWorkStations\\HelloFlink\\target\\logs\\ROOT.2021-12-27-9.log"));
                    dummyBytes.add(tmpBytes);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                new Thread(new MyRunnable()).start();
            }
        }

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            String[] splits = line.split("\\s+");
            for (String split : splits) {
                out.collect(split);
            }
        }
    }

    /**
     * MyRunnable
     *
     * @author Jerry.X.He
     * @return
     * @date 2021/12/27 16:16
     */
    public static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.err.println(" MyRunnable.run before ");
            IoUtils.sleep(1000_000);
            System.err.println(" MyRunnable.run after ");
        }
    }

    /**
     * MyMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:29
     */
    private static class MyMapMapper implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    }

}

 

Test01WordCount.txt 内容如下 

a0b994935dfb439a9500092dce659245.png

 

 

整体交互流程 

Driver 提交 Job 到 JobManager, JobManager 分配任务到 TaskManager

4488db13d6934b61b44b01fdd687c756.png

然后 TaskManager 和 JobManager 这边交互如下 

cde2d6cfdfbe49448caebb3e0219759e.png 

 

Driver 这边的处理  

这里是 driver 这边的根据 DataSet阶段 转换为 Plan阶段

a3e0f50291034d6d900d9a668aec3c62.png

 

这里是 Plan阶段 转换为 OptimizedPlan阶段 

801f3e46704d40aeb79d1dc04c32e660.png 

这里是 OptimizedPlan阶段 转换为 JobGraph阶段 

c2c7661153954cf3a8a4cbb62a4e7b0f.png 

然后 提交的就是 JobGraph, 然后 等待集群响应结果信息  

ca22a3db7d8c40d8b1155722c59b28d3.png 

这里是将 JobGraph 序列化为 为一个临时文件, 然后提交给 flink 集群 

然后 另外就是 job 这边需要使用的 jar 列表, 也需要提交给集群 

db20124bd6cd419b9a8b0b9a4d302d64.png

 

然后更详细的提交的请求内容如下, 合计传送了 47kb

然后传递的主要内容为 三部分 

第一部分为元数据, 内容为 “{"jobGraphFileName": "flink-jobgraph126256148600228610.bin", "jobJarFileNames": ["HelloFlink-0.0.1.jar"], "jobArtifactFileNames": []}” 

第二部分为 jobGraph 序列化之后的临时文件, “flink-jobgraph126256148600228610.bin”

第三部分为 job 执行需要的 jar 包, “HelloFlink-0.0.1.jar”

244ec2a2627e4c92abc08a542bb67e0c.png

 

 

JobManager 这边的处理 

JobManager 这边拿到如上 driver 这边提交的 HttpRequest 之后, 处理如下 

根据 jobGraphFileName 反序列化 JobGraph, 上传 job 所需要的 jar 到 BlobServer

然后就是向 Dispatcher 提交 jobGraph

返回当前提交的 job 的相关信息, 主要是 jobId

a727614e3b2c49f7a1fdfa267dab09a9.png

 

然后是 Dispatcher 这边 persistAndRunJob, 创建 JobManagerRunner, JobMaster

450ee6fb0ab04a34808411d478dd51fc.png 

然后是 JobManagerRunner 启动 JobMaster

ff9c69b8b0504b279343dff237ae828a.png

 

 然后是 JobMaster 这边基于内部的 scheudlerNG 来开始调度任务

2dc84b523fd34aa0815b323dd907dd7b.png

 

然后是根据 ExecutionVertex 封装 TaskDeploymentDescriptor, 然后 向 TaskManager 执行 job 拆解之后的任务 

28daa5ab7488408fb96cf3ba5b5da170.png

 

 

 TaskManager 这边的处理

TaskExecutor 这边收到了 TaskDeploymentDescriptor 之后, 反序列化 jobInformation, taskInformation, 创建 Task, 然后执行 Task 

这里可以从 taskInformation 的上下文信息, 可以看到当前 Task 属于哪一个 JobVertex, 以及改 JobVertex 总共有一个 SubTask, 以及当前 Task 是属于第几个 SubTask 

3298bcfb67ed4f55809691b51527a2c0.png

 

然后就是 Task 这边的执行 

945e20bee9f041fa99207604df6d8239.png 

这边执行的就是基于 jar 包 和 DataSourceTask, ChainnedFlatMapDriver, ChainnedMapDriver 等等 来执行具体的业务处理

业务这边执行流程如下

DataSource 逐行读取 Test01WordCount.txt 中的字符串信息, 这里读取的是第一行 “123 234 456”

ChainedFlatMapDriver 这边是将一行的字符串转换为多行字符串, “123 234 456” 转换为 “123”, “234”, “456”

ChainedMapDriver 这边是将一个输入进行映射, 这里的是 “123” 转换为 (“123”, 1)

SynchronousChainedCombineDriver 这边是将到目前为止的结果, hash partition 之后输出到下游的 SubTask 

SynchronousChainedCombineDriver 这边先是将记录写入到 sorter, 然后 close的时候, 在迭代记录将记录输出到下游 

495e543574d145fc93cf59704922f9e4.png

 

输出当前 Task 的各个记录的地方 

2b2c1476c3f84c6c8c8aaec7778d27d5.png 

repartition 的地方 

b3a65c985ba1472e83268605a15c5756.png 

每一个 partition 的数据是写出到 RecordWriter 下面的 ReleaseOnSonsumptionResultPartition 下面的 subpartitions[partitionIndex]

然后这部分 ResultPartition 数据是维护在 ResultPartitionManager, 这个是每一个 TaskExecutor 维护一个 ResultPartitionManager 用于相同的任务之间的不同的 SubTask 的数据交互 

所以说一个 SubTask 维护了一个 ReleaseOnSonsumptionResultPartition, 然后维护了 parallelism 个 subpartitions

然后下游的 SubTask 依次来遍历上游的 SubTask, 获取当前 SubTask 需要读取的 subpartitions[index] 来作为输入 

ef806d3fc228449aabe980c3c34f3241.png

 

输出一条记录之后信息如下, bufferBuilder 中输出了 12 字节, 前面四个字节为长度标记 0x04 长度为 4

然后接下来 8个字节为 ”0431323300000001”, 表示的是 “123” + 1, 最前面的 0x04 表示 0x03 + 1[参见 StringValue.writeString]

6812aa0d6c2843f18bdf651390caf66f.png

 

然后下游的 SubTask 这边读取的是上游的 N 个 SubTask 中的当前 subpartitionIndex 部分的输出, 这里的 partitionId 为上游输入 SubTask 的 partitionId

这里当前 SubTask 会对应 parallelism 个 InputChannel, 每一个分别关联上上游 SubTask 的输出 

2cb90acbfbe144d0bcddfc31787829cb.png

 

 

各个 SubTask 的数据交互

上游任务启动的时候, 会向 PartitionManager 注册输出的 ResultPartition 的信息 

然后这里的 partition.getPartitionId 是来自于当前 SubTask 的 partition, 每一个 SubTask 单独生成一个 

a1489d547145440e845e525d200c8595.png

 

在 TaskManager 这边传递过程如下, TaskDeploymentDescriptor.producedPartitions.shuffleDescriptor.resultPartitionID -> ReleaseOnConsumptionResultPartition -> ResultPartition

然后再到 后面的 ResultPartition.setup 的向 PartitionManager 注册 

52c1f8dc2c9b49b1a42bf81f5f8ee621.png

 

然后 JobManager 这边是生成这个 shuffleDescriptor 相关, 传递流程如下 

IntermediateResultPartition -> Vertex.resultPartitions -> Partition -> PartitionDescriptor -> NettyShuffleDescriptor

131e23b40835406d8674334574beffac.png

 

然后 Vertex.resultPartitions 这边初始化如下, 里面的组合了一个 IntermediateResultPartition, 它的 partitionId 是传递到后面 NettyShuffleDescriptor 的 partitionID, 然后这个 partitionId 是随机生成的

根据上下文来看, 就是每一个 SubTask 一个 

d931bbcb2aff4e708703f8a0f6ec7675.png

 

然后我们看一下 下游的 SubTask 这边来消费上游的 SubTask 的处理, 这里获取的是 InputChannel 的 partitionId

ed6b6ac662fb486a86b97ffd26ab9e84.png 

InputChannel 的 partitionId 是来自于上游的 inputChannelDescriptor.partitionId, 这样就把整个流程串联起来了 

下游的 SubTask 可以读取到所有上游SubTask 的结果信息 

2899588ab8c84074a34257a68b1a9e95.png

 

 

各个 Task 执行的通知 

Task 和 Task 之间是有依赖关系的, 下游的 Task 相对于 上游的Task 称之为 consumers

当上游的 Task 有数据提交到之后, 这里会通知到 JobMaster 通知 partitionId 已经在产出数据了 

然后 jobMaster 通知该 Task 的下游的 Task 开始执行 

d68d2b90be3a4d2aa39ad5846508af00.png

 

然后 JobMaster 这边收到 scheduleOrUpdateConsumers 之后的处理如下 

开始 调度下游的 consumers, 即下游的 SubTask 开始申请资源, 然后 执行 等等

6fdc0b75e85e478597b8c60771b2ef91.png

2da373267e9f442da186f19fed875c2f.png 

 

计算结果的交互 

首先是 driver 这边 

Job 提交了之后, 会执行 requestJobResult, 这里面是向 JobManager 这边发送 http 请求, 获取 给定的 Job 的执行结果 

8ef34abcbe5c40d9a085e32148669366.png

 

发送的 http 请求如下, 请求的是 “/v1/jobs/682debfc2ac22e73847c23b1953343e1/execution-result”

d03f858b568d43c6aeba184ddbffb14e.png 

然后 JobMaster 这边的处理如下, 我们这里 需要关注的是 这个 accumulatorResults, 这里面暂存的我们计算的结果 

5b385e2b66714324b34ca16e69fcb1e3.png 

然后这个 accumulatorResults 的数据来自于各个 Task 执行完成之后通知到 JobMaster 这边的 accumulators 

如下图 这里是 Task 执行完成之后提交更新任务执行状态的请求到 JobMaster, state 中携带了 accumulators

07c14a7d69e149f9960a1f68b7cdcdd7.png

 

接下来是封装 ArchivedExecutionGraph, 这里封装的 accumulators 是使用的各个 Task 执行完成之后响应的 accumulators

2b8227576bf74c7f98410f769258836c.png

 

然后 executionGraph.getAccumulatorsSerialized 遍历的基础 accumulatos 是来自于如下, 可以看到的是遍历的当前执行计划的所有的 Vertex 的 accumulators

然后 结合上上一张图可以看到的是 这里是从 vertex.attempt 中获取的数据, 然后 vertex.attempt 的数据是来自于 ExecutionGraph.updateStateInternal

然后外层 JobMaster.jobStatusChanged 将这上面生成的 ArchivedExecutionGraph 设置到了 JobManagerRunner.resultFuture 中 

81fc8e086dc64fecbefbd61c137fde9d.png

 

Task 这边, 任务执行完成之后, 将 accumulators 封装到 TaskExecutionState, 然后响应给 JobMaster

e2551d79a7dc434ba9c7dd63c8672e63.png 

 

完 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/410807.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

架构设计:流式处理与实时计算

引言 随着大数据技术的不断发展&#xff0c;流式处理和实时计算在各行各业中变得越来越重要。那么什么是流式处理呢&#xff1f;我们又该怎么使用它&#xff1f;流式处理允许我们对数据流进行实时分析和处理&#xff0c;而实时计算则使我们能够以低延迟和高吞吐量处理数据。本…

Bert基础(四)--解码器(上)

1 理解解码器 假设我们想把英语句子I am good&#xff08;原句&#xff09;翻译成法语句子Je vais bien&#xff08;目标句&#xff09;。首先&#xff0c;将原句I am good送入编码器&#xff0c;使编码器学习原句&#xff0c;并计算特征值。在前文中&#xff0c;我们学习了编…

4.测试教程 - 用例篇

文章目录 1.测试用例的基本要素2.测试用例的给我们带来的好处3.测试用例的设计方法3.1基于需求进行测试用例的设计3.1.1功能需求测试分析3.1.2非功能需求测试分析 3.2具体的设计方法3.2.1等价类3.2.2边界值3.2.3错误猜测法3.2.4判定表3.2.5场景设计法3.2.6因果图3.2.7因果图的需…

c++:vector的相关oj题(136. 只出现一次的数字、118. 杨辉三角、26. 删除有序数组中的重复项、JZ39 数组中出现次数超过一半的数字)

文章目录 1. 136. 只出现一次的数字题目详情代码(直接来异或&#xff09;思路 2. 118. 杨辉三角题目详情代码1思路代码2思路2 3. 26. 删除有序数组中的重复项题目详情代码思路 4. JZ39 数组中出现次数超过一半的数字题目详情代码1&#xff08;暴力&#xff09;思路1代码2&#…

A Visual Guide to Mamba and State Space Models

用于语言建模的 Transformers 的替代方案 Transformer 架构一直是大型语言模型 &#xff08;LLMs&#xff09; 成功的主要组成部分。它已被用于当今几乎所有LLMs正在使用的产品&#xff0c;从 Mistral 等开源模型到 ChatGPT 等闭源模型。 为了进一步改进LLMs&#xff0c;开发…

【HarmonyOS】鸿蒙开发之Stage模型-基本概念——第4.1章

Stage模型-基本概念 名词解释 AbilityStage:应用组件的“舞台“ UIAbility:包含UI界面的应用组件&#xff0c;是系统调度的基本单元 WindowStage:组件内窗口的“舞台“ Window&#xff1a;用来绘制UI页面的窗口 HAP:Harmony Ability Package(鸿蒙能力类型的包) HSP:Harmony Sh…

【算法 - 动态规划】找零钱问题Ⅰ

在前面的动态规划系列文章中&#xff0c;关于如何对递归进行分析的四种基本模型都介绍完了&#xff0c;再来回顾一下&#xff1a; 从左到右模型 &#xff1a;arr[index ...] 从 index 之前的不用考虑&#xff0c;只考虑后面的该如何选择 。范围尝试模型 &#xff1a;思考 [L ,…

C++——二叉搜索树

二叉搜索树 二叉搜索树&#xff1a; 又为搜索二叉树&#xff0c;一般具有以下的性质 若它的左子树不为空&#xff0c;则左子树上所有的节点的值都小于父亲节点若它的右子树不为空&#xff0c;则右子树上所有的节点的值都大于父亲节点它的左右子树也都为二叉搜索树 二叉搜索树…

Vue前端实现一个本地消息队列(MQ), 让消息延迟消费或者做缓存

MQ功能实现的具体代码(TsMQ.ts)&#xff1a; import { v4 as uuidx } from uuid;import emitter from /utils/mittclass Message {// 过期时间&#xff0c;0表示马上就消费exp: number;// 消费标识&#xff0c;避免重复消费tag : string;// 消息体body : any;constructor( exp…

Docker基础篇(六) dockerfile体系结构语法

FROM&#xff1a;基础镜像&#xff0c;当前新镜像是基于哪个镜像的 MAINTAINER &#xff1a;镜像维护者的姓名和邮箱地址 RUN&#xff1a;容器构建时需要运行的命令 EXPOSE &#xff1a;当前容器对外暴露出的端口号 WORKDIR&#xff1a;指定在创建容器后&#xff0c;终端默认登…

python中的类与对象(1)

目录 一. 引子&#xff1a;模板 二. 面向过程与面向对象 &#xff08;1&#xff09;面向过程编程 &#xff08;2&#xff09;面向对象编程 三. 对象与类 &#xff08;1&#xff09;对象 &#xff08;2&#xff09;类 四. 面向对象程序设计的特点&#xff1a;封装&#…

daydayEXP: 支持自定义Poc文件的图形化漏洞利用工具

daydayEXP: 支持自定义Poc文件的图形化漏洞利用工具 基于java fx写的一款支持加载自定义poc文件的、可扩展的的图形化渗透测试框架。支持批量漏洞扫描、漏洞利用、结果导出等功能。 使用 经过测试,项目可在jdk8环境下正常使用。jdk11因为缺少一些必要的组件,所以jdk11版本工…

sqli-labs第46关

注&#xff1a;说明借鉴&#xff08;现阶段水平不够&#xff0c;只能靠借鉴来完成本次作业&#xff0c;若侵权&#xff0c;必删&#xff09; 基于Sqli-Labs靶场的SQL注入-46~53关_sqli-lab less46-CSDN博客 SQL-Labs46关order by注入姿势-CSDN博客 一、首先需要sql-labs的环…

计算机设计大赛 深度学习图像风格迁移

文章目录 0 前言1 VGG网络2 风格迁移3 内容损失4 风格损失5 主代码实现6 迁移模型实现7 效果展示8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习图像风格迁移 - opencv python 该项目较为新颖&#xff0c;适合作为竞赛课题…

StringBuffer StringBuilder

String 为什么StringBuilder是线程不安全的&#xff1f;StringBuffer是线程安全的&#xff1f; - Jacian - 博客园 (cnblogs.com) StringBuilder 线程安全的可变字符学序列 速度快 StringBuffer 线程不安全的可变字符序列 创建StringBuilder对象 new StringBuilder&…

【Java程序员面试专栏 算法思维】二 高频面试算法题:二分查找

一轮的算法训练完成后,对相关的题目有了一个初步理解了,接下来进行专题训练,以下这些题目就是汇总的高频题目,本篇主要聊聊二分查找,包括基础二分,寻找目标值的左右边界,搜索旋转数组以及波峰,以及x的平方根问题,所以放到一篇Blog中集中练习 题目关键字解题思路时间空…

BlackWidow靶场

kali&#xff1a;192.168.223.128 主机发现 nmap -sP 192.168.223.0/24 目标IP:192.168.223.153 端口扫描 nmap -sV -p- -A 192.168.223.153 22/tcp open ssh OpenSSH 7.9p1 Debian 10deb10u2 (protocol 2.0) 80/tcp open http Apache httpd 2.4.38 ((Deb…

【C++】类与对象——友元,内部类,匿名对象

类与对象 1 友元1.1 概念&#xff1a;1.2 友元函数1.3 友元类 2 内部类概念&#xff1a;特性&#xff1a;举例&#xff1a; 3 匿名对象Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读&#xff01;&#xff01;&#xff01;下一篇文章见&#xff01;&#xff01;&am…

定制红酒:设计专属标签与包装,打造与众不同个性

在云仓酒庄洒派的定制红酒服务中&#xff0c;为消费者提供个性化、专属的标签与包装设计是提升红酒与众不同性和纪念价值的关键环节。通过巧妙的设计&#xff0c;消费者可以打造出与众不同的红酒&#xff0c;展现自己的个性与品味。 首先&#xff0c;标签设计是展现红酒个性的重…

Mysql 的高可用详解

Mysql 高可用 复制 复制是解决系统高可用的常见手段。其思路就是&#xff1a;不要把鸡蛋都放在一个篮子里。 复制解决的基本问题是让一台服务器的数据与其他服务器保持同步。一台主库的数据可以同步到多台备库上&#xff0c;备库本身也可以被配置成另外一台服务器的主库。主…