【Flink网络数据传输】OperatorChain的设计与实现

文章目录

    • 1.OperatorChain的设计与实现
    • 2.OperatorChain的创建和初始化
    • 3.创建RecordWriterOutput

1.OperatorChain的设计与实现

OperatorChain的大致逻辑

在JobGraph对象的创建过程中,将链化可以连在一起的算子,常见的有StreamMap、StreamFilter等类型的算子。OperatorChain中的所有算子都会被运行在同一个Task实例中。StreamTaskNetworkOutput会将接入的数据元素写入算子链的HeadOperator中,从而开启整个OperatorChain的数据处理。

 
OperatorChain的Output组件:将数据发送到下游

如图所示,在OperatorChain中通过Output组件将上下游算子相连,当上游算子数据处理完毕后,会通过Output组件发送到下游的算子中继续处理。

 
OperatorChain的collect():收集处理完的数据

如图所示,OperatorChain内部定义了WatermarkGaugeExposingOutput接口,且该接口分别继承了Output和Collector接口。Collector接口提供了collect()方法,用于收集处理完的数据。

 
OperatorChain的Output接口:也能输出Watermark和LatencyMarker等事件

Output接口提供了emitWatermark()、emitLatencyMarker()等方法,用于对Collector接口进行拓展,使得Output接口实现类可以输出Watermark和LatencyMarker等事件。WatermarkGaugeExposingOutput接口则提供了获取WatermarkGauge的方法,用于监控最新的Watermark。

 
OperatorChain内部定义了不同的WatermarkGaugeExposingOutput接口实现类。

  1. RecordWriterOutput:用于输出OperatorChain中尾端算子处理完成的数据,借助RecordWriter组件将数据元素写入网络
  2. ChainingOutput/CopyingChainingOutput:适用于上下游算子连接在一起且上游算子属于单输出类型的情况。
  3. BroadcastingOutputCollector/CopyingBroadcastingOutputCollector:上游算子是多输出类型但上下游算子之间的Selector为空时,创建广播类型的BroadcastingOutputCollector。
  4. DirectedOutput/CopyingDirectedOutput:上游算子是多输出类型且Selector不为空时,创建DirectedOutput或CopyingDirectedOutput连接上下游算子

在这里插入图片描述

例子:收集数据并通过Output发数据数据到下游

例如在WordCount的程序中定义flatMap()方法时,会调用Collector.collect()方法收集数据元素,每个算子在定义的函数或使用Output接口的实现类中,完成了上游算子向下游算子发送数据元素的操作

 

2.OperatorChain的创建和初始化

接下来我们看OperatorChain的初始化过程,如下代码,OperatorChain的构造器包含如下逻辑。

  1. 创建StreamOperator(即算子)实例,这里StreamOperator会封装为StreamOperatorFactory并存储在StreamGraph结构中。
  2. 获取算子之间的链接配置。chainedConfigs的配置决定了算子之间Output接口的具体实现。
  3. 遍历当前作业所有节点的输出边,并构建RecordWriterOutput组件,最终通过RecordWriterOutput组件将数据元素输出到网络中。
  4. 创建OperatorChain内部算子之间的上下游连接,完成OperatorChain内部上下游算子之间的数据传输
  5. 单独创建headOperator。headOperator是OperatorChain的头部节点,创建完成后将headOperator暴露到StreamTask实例,供DataOutput接口实现类调用
  6. 如果OperatorChain构建失败,则关闭实例,防止出现内存泄漏。
public OperatorChain(
      StreamTask<OUT, OP> containingTask,
      RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> 
         recordWriterDelegate
      ) {
   // 获取当前StreamTask的userCodeClassloader
   final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
   // 获取StreamConfig
   final StreamConfig configuration = containingTask.getConfiguration();
   // 获取StreamOperatorFactory
   StreamOperatorFactory<OUT> operatorFactory = 
   configuration.getStreamOperatorFactory(userCodeClassloader);
   // 读取chainedConfigs
   Map<Integer, StreamConfig> chainedConfigs = 
configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
   // 根据StreamEdge创建RecordWriterOutput组件
   List<StreamEdge> outEdgesInOrder = 
   configuration.getOutEdgesInOrder(userCodeClassloader);
   Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = 
   new HashMap<>(outEdgesInOrder.size());
   this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
   boolean success = false;
   try {
      for (int i = 0; i < outEdgesInOrder.size(); i++) {
         StreamEdge outEdge = outEdgesInOrder.get(i);
         // 为每个输出边创建RecordWriterOutput
         RecordWriterOutput<?> streamOutput = createStreamOutput(
            recordWriterDelegate.getRecordWriter(i),
            outEdge,
            chainedConfigs.get(outEdge.getSourceId()),
            containingTask.getEnvironment());
         this.streamOutputs[i] = streamOutput;
         streamOutputMap.put(outEdge, streamOutput);
      }
      // 创建OperatorChain内部算子之间的连接
      List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
      this.chainEntryPoint = createOutputCollector(
         containingTask,
         configuration,
         chainedConfigs,
         userCodeClassloader,
         streamOutputMap,
         allOps,
         containingTask.getMailboxExecutorFactory());
      if (operatorFactory != null) {
         WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = 
            getChainEntryPoint();
         // 创建headOperator
         headOperator = StreamOperatorFactoryUtil.createOperator(
               operatorFactory,
               containingTask,
               configuration,
               output);
         headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
         output.getWatermarkGauge());
      } else {
         headOperator = null;
      }
      allOps.add(headOperator);
      this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
      success = true;
   }
   finally {
      // 如果创建不成功,则关闭StreamOutputs中的RecordWriterOutput
      // 这里防止内存泄漏
      if (!success) {
         for (RecordWriterOutput<?> output : this.streamOutputs) {
            if (output != null) {
               output.close();
            }
         }
      }
   }
}

OperatorChain作用小结

当OperatorChain创建完成后,就能正常接收StreamTaskInput中的数据元素了。在OperatorChain内部算子之间进行数据传递和处理,最终通过RecordWriterOutput组件将处理完成的数据元素发送到网络中,供下游的Task实例使用。

对于OperatorChain内部Output接口的实现,这里暂不展开。

 

3.创建RecordWriterOutput

RecordWriterOutput用于将数据输出到网络指定位置。

OperatorChain.createStreamOutput()逻辑如下:

  1. 获取输出边的OutputTag标签,判断当前Stream节点输出边是否为旁路输出,即在DataStream API中是否使用了旁路输出的相关方法。
  2. 返回RecordWriterOutput。RecordWriterOutput中包含RecordWriter组件,最终会通过RecordWriter将算子链处理完成的数据写入网络。
private RecordWriterOutput<OUT> createStreamOutput(
      RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
      StreamEdge edge,
      StreamConfig upStreamConfig,
      Environment taskEnvironment) {
   // 获取OutputTag
   OutputTag sideOutputTag = edge.getOutputTag(); 
   // 获取数据序列化器TypeSerializer
   TypeSerializer outSerializer = null;
   // 如果StreamEdge指定了OutputTag
   if (edge.getOutputTag() != null) {
      // 则进行边路输出
      outSerializer = upStreamConfig.getTypeSerializerSideOut(
            edge.getOutputTag(), taskEnvironment.getUserClassLoader());
   } else {
      // 正常输出
      outSerializer = 
      upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
   }
   // 返回创建的RecordWriterOutput实例
   return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}

 

StreamRecord将数据输出的逻辑

在RecordWriterOutput.collect()方法中定义了StreamRecord数据的输出逻辑,实际上是调用pushToRecordWriter()方法将数据写入RecordWriter,最终通过RecordWriter组件进行数据元素的网络输出

public void collect(StreamRecord<OUT> record) {
   if (this.outputTag != null) {
      return;
   }
   pushToRecordWriter(record);
}

 

pushToRecordWriter发送数据

  1. 调用serializationDelegate.setInstance()方法,对接入的数据元素进行序列化操作,将数据元素转换成二进制格式。
  2. 调用recordWriter.emit()方法通过RecordWriter组件将serializationDelegate中序列化后的二进制数据输出到下游网络中。
//RecordWriterOutput.pushToRecordWriter()
private <X> void pushToRecordWriter(StreamRecord<X> record) {
   serializationDelegate.setInstance(record);
   try {
      recordWriter.emit(serializationDelegate);
   }
   catch (Exception e) {
      throw new RuntimeException(e.getMessage(), e);
   }
}

 
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/435206.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[C语言]——分支和循环(1)

目录 一.if语句 1.if 2.else 3.分支中包含多条语句 4.嵌套if 5.悬空else问题 二.关系操作符 三.条件操作符 C语⾔是结构化的程序设计语⾔&#xff0c;这⾥的结构指的是顺序结构、选择结构、循环结构&#xff0c;C语⾔是能够实现这三种结构的&#xff0c;其实我们如果仔细分析&a…

xinput1_3.dll丢失都有什么办法可以有效的解决、xinput1_3.dll导致游戏不能启动怎么办?

使用电脑的过程中是不是会遇到关于某个dll文件丢失的提示&#xff0c;今天想和大家聊的是xinput1_3.dll文件&#xff0c;如果电脑提示xinput1_3.dll丢失有什么办法可以有效的解决&#xff0c;解决办法都有哪些&#xff0c;如果xinput1_3.dll丢失会对电脑有什么影响。&#xff0…

8、Linux-软件安装:rpm和yum;配置yum阿里云镜像源

一、介绍 Linux安装软件有两种方式&#xff0c; ①rpm&#xff1a;安装已有的安装包&#xff0c;类似于Windows中双击exe的安装包程序 ②yum&#xff1a;拉取远程仓库的文件&#xff0c;类似于python的pip install 区别&#xff1a;假设软件A依赖软件B&#xff0c;软件B依赖…

【打工日常】Linux实现可回滚的回收站功能

1.为什么创建可回滚的回收站功能&#xff1f; 为了让运维人员可以有回旋的余地&#xff0c;但是也要保证可以清理不需要的文件。 2.涉及到的文件安全概念&#xff1f; Linux的文件安全概念主要涉及到文件权限和文件系统安全两个方面。 那什么是文件权限&#xff1f; 在Linux系统…

数学建模【整数规划】

一、整数规划简介 整数规划其实是线性规划和非线性规划的一个特殊情况&#xff0c;即有的变量取值只能是整数&#xff0c;不能是小数。这时候就需要一个新的函数来解决问题。 对于整数规划&#xff0c;分为线性整数规划和非线性整数规划 线性整数规划&#xff1a;MATLAB可进…

VS2022打包C#安装包(最新、最全)

开发c#的一个小工具到打包环境碰壁了&#xff0c;在网上找了很多资料耶踩了很多坑&#xff0c;耗时1hour才打包完毕&#xff0c;避免以后碰到类似的问题再次记录&#xff0c;自认为步骤比较全面&#xff0c;如果有帮助麻烦点个赞呗&#xff01;&#xff01;&#xff01; 一、Mi…

MATLAB知识点:循环语句的经典练习题

​讲解视频&#xff1a;可以在bilibili搜索《MATLAB教程新手入门篇——数学建模清风主讲》。​ MATLAB教程新手入门篇&#xff08;数学建模清风主讲&#xff0c;适合零基础同学观看&#xff09;_哔哩哔哩_bilibili 节选自​第4章&#xff1a;MATLAB程序流程控制 下面我们来看…

代码随想录(day1)

1. 二分查找&#xff1a; 704. 二分查找 - 力扣&#xff08;LeetCode&#xff09; 对于二分查找的思想较为简单&#xff0c;具体如下&#xff1a; 假设寻找的值为&#xff0c;分别定义两个变量,其中&#xff0c;。 再定义一个变量&#xff0c;如果&#xff0c;表示需要查找的元…

灵魂指针,教给(一)

欢迎来到白刘的领域 Miracle_86.-CSDN博客 系列专栏 C语言知识 先赞后看&#xff0c;已成习惯 创作不易&#xff0c;多多支持&#xff01; 一、内存和地址 1.1 内存 在介绍知识之前&#xff0c;先来想一个生活中的小栗子&#xff1a; 假如把你放在一个有100间屋子的酒店…

flutter_gen依赖

flutter_gen 5.4.0 flutter项目内终端&#xff1a; dart pub global activate flutter_gen export PATH“ P A T H " : " PATH":" PATH":"HOME/.pub-cache/bin” fluttergen

为 OpenBMC 添加一个新的系统

1. 前言 在上一篇文章中向大家介绍了 OpenBMC 的是什么以及它的作用和应用场景&#xff0c;并且以一个自带的示例平台 romulus 展示了从下载源码包开始到启动系统并访问 Web 控制页面的整体构建流程。 通过前文已经了解到如何为已有的平台构建系统镜像&#xff0c;下面我们来…

AcWing 1027. 方格取数

解题思路 相关代码 import java.util.Scanner;public class Main {public static void main(String[] args){Scanner scanner = new Scanner(System.in);int n = scanner.nextInt();int nums[][] = new int[n+1][n+1];while(true){int a = scanner.nextInt();int b = scanner.…

1.初识python

1.初识python 编程语言是用来定义计算机程序的语言&#xff0c;用来向计算机发出指令。 1.python语言是一种面向对象的解释型高级编程语言。 解释型语言&#xff1a;使用专门的解释器对源码程序逐行解释成特定平台的机器并立即执行&#xff0c;是代码在执行时才被解释器一行行…

数据库(mysql)-新手笔记-基本知识点(1)

基本概念 数据库 Database :存储数据的容器 表 Table : 在数据库中存储的基本结构,它由行和列组成 行 Row : 表中的一条记录 列 Column : 表中的字段,定义了数据的类型和约束 数据类型 数据值 如 INT(整型),FLAOT(浮点型) ,DECIMAL (精确小数点) 字符串 如 VARCHAR(可变长度字…

Redis报错NOAUTH Authentication required怎么解决?

问题描述 在使用redis-cli时&#xff0c;可能会遇到报错 (error) NOAUTH Authentication required. 问题分析 这是因为在redis的配置文件 redis.windows-service.conf 中设置了密码&#xff0c;导致了这个问题 问题解决 1. 在redis.windows-service.conf文件查看密码 2. 输…

苹果曝出两个 iOS 系统 0-Day 漏洞

最近&#xff0c;苹果公司发布了紧急安全更新&#xff0c;解决了两个 iOS 零日漏洞。这些漏洞存在于 iOS 内核&#xff08;CVE-2024-23225&#xff09;和 RTKit&#xff08;CVE-2024-23296&#xff09;中&#xff0c;威胁攻击者可利用其绕过内核内存保护&#xff0c;这就给了具…

[R] ggplot2 - exercise (“fill =“)

We have made the plots like: Lets practice with what we have learnt in: [R] How to communicate with your data? - ggplot2-CSDN博客https://blog.csdn.net/m0_74331272/article/details/136513694 #tutorial 5 -script #Exercise 1 #1.1# ggplot(smoking_and_drug_use_…

20 easy 70. 爬楼梯

//假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 // // 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; // // // // 示例 1&#xff1a; // // //输入&#xff1a;n 2 //输出&#xff1a;2 //解释&#xff1a;有两种方法可以爬到楼顶。 /…

electron 架构

文章目录 Chromium 架构Electron 架构 Chromium 架构 主体架构&#xff1a;主进程 Browser&#xff0c;打开一个页面就会启动一个 Render 渲染进程&#xff0c;进程间通信就是 IPC 机制&#xff08;Inter-Process Communication&#xff09;。 主进程的 RenderProcessHost 和 R…

算法 - 【受限条件下可到达节点的数目】

受限条件下可到达节点的数目 题目示例1示例2 分析代码 题目 现有一棵由 n 个节点组成的无向树&#xff0c;节点编号从 0 到 n - 1 &#xff0c;共有 n - 1 条边。给你一个二维整数数组 edges &#xff0c;长度为 n - 1 &#xff0c;其中 edges[i] [ai, bi] 表示树中节点 ai 和…