【Flink网络数据传输(4)】RecordWriter(下)封装数据并发送到网络的过程

文章目录

  • 一. RecordWriter封装数据并发送到网络
    • 1. 数据发送到网络的具体流程
    • 2. 源码层面
      • 2.1. Serializer的实现逻辑
        • a. SpanningRecordSerializer的实现
        • b. SpanningRecordSerializer中如何对数据元素进行序列化
      • 2.2. 将ByteBuffer中间数据写入BufferBuilder
  • 二. BufferBuilder申请资源并创建
    • 1. ChannelSelectorRecordWriter创建BufferBuilder
    • 2. BroadcastRecordWriter创建BufferBuilder

一. RecordWriter封装数据并发送到网络

1. 数据发送到网络的具体流程

RecordWriter对接入的StreamRecord数据进行序列化并等待下游任务消费的过程,整个过程细节如下。

  1. StreamRecord通过RecordWriterOutput写入RecordWriter,并在RecordWriter中通过RecordSerializer组件将StreamRecord序列化为ByteBuffer数据格式。

  2. RecordWriter向ResultPartition申请BufferBuilder对象,用于构建BufferConsumer对象,将序列化后的二进制数据存储在申请到的Buffer中。ResultPartition会向LocalBufferPool申请MemorySegment内存块,用于存储Buffer数据

  3. BufferBuilder中会不断接入ByteBuffer数据,直到将BufferBuilder中的Buffer空间占满,此时会申请新的BufferBuilder继续构建BufferConsumer数据集。

  4. Buffer构建完成后,会调用flushTargetPartition()方法,让ResultPartition向下游输出数据,此时会通知NetworkSequenceViewReader组件开始消费ResultSubPartition中的BufferConsumer对象。

  5. 当BufferConsumer中Buffer数据被推送到网络后,回收BufferConsumer中的MemorySegment内存空间,继续用于后续的消息处理。

在这里插入图片描述

 

2. 源码层面

接下来我们从源码的角度了解RecordWriter具体处理数据的逻辑。在RecordWriterOutput中调用pushToRecordWriter方法将数据写出。

在这里插入图片描述

通过recordWriter.emit(serializationDelegate)方法,将数据元素发送到RecordWriter中进行处理。主要逻辑如下

  1. 序列化数据为ByteBuffer二进制数据,并缓存在SpanningRecordSerializer.serializationBuffer对象中。
  2. 将序列化器生成的中间数据复制到指定分区中,实际上就是将ByteBuffer数据复制到BufferBuiler对象中。
  3. 如果BufferBuiler中存储了完整的数据元素,就会清空序列化器的中间数据,因为序列化器中累积的数据不宜过大。
protected void emit(T record, int targetSubpartition) throws IOException {  
    checkErroneous();  
  
    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);  
  
    if (flushAlways) {  
        targetPartition.flush(targetSubpartition);  
    }  
}



protected void emit(T record, int targetChannel) throws IOException, 
   InterruptedException {
   checkErroneous();
   // 数据序列化
   serializer.serializeRecord(record);
   // 将序列化器中的数据复制到指定分区中
   if (copyFromSerializerToTargetChannel(targetChannel)) {
      // 清空序列化器
      serializer.prune();
   }
}

 

2.1. Serializer的实现逻辑

接着了解如何将序列化器中的数据转换成Buffer并存储到ResultPartiton中,最终将数据发送到下游。

a. SpanningRecordSerializer的实现

SpanningRecordSerializer实现将序列化后的BytesBuffer数据写入BufferBuilder。

SpanningRecordSerializer对象主要包含了DataOutputSerializer serializationBuffer和ByteBuffer dataBuffer两个成员变量。

  • DataOutputSerializer可以将数据转换成二进制格式并存储在byte[]数组中。在serialization中会调用serializationBuffer.wrapAsByteBuffer()方法,将serializationBuffer中生成的byte[]数组转换成ByteBuffer数据结构,并赋值给dataBuffer对象。
  • ByteBuffer是Java NIO中用于对二进制数据进行操作的Buffer接口,底层有DirectByteBuffer和HeapByteBuffer等实现,通过ByteBuffer提供的方法,可以轻松实现对二进制数据的操作。

 

b. SpanningRecordSerializer中如何对数据元素进行序列化

SpanningRecordSerializer.serializeRecord()方法主要逻辑如下。

1)清理serializationBuffer的中间数据,实际上就是将byte[]数组的position参数置为0。
2)设定serialization buffer的初始容量,默认不小于4。
3)将数据元素写入serializationBuffer的bytes[]数组。(所有数据元素都实现了IOReadableWritable接口,可以直接将数据对象转换为二进制格式)
4)获取serializationBuffer的长度信息,并写入serializationBuffer。
5)将serializationBuffer中的byte[]数据封装为java.io.ByteBuffer数据结构,最终赋值到dataBuffer的中间结果中。

public void serializeRecord(T record) throws IOException {
   if (CHECKED) {
      if (dataBuffer.hasRemaining()) {
         throw new IllegalStateException("Pending serialization of previous record.");
      }
   }
   // 首先清理serializationBuffer中的数据
   serializationBuffer.clear();
   // 设定serialization buffer数量
   serializationBuffer.skipBytesToWrite(4);
   // 将record数据写入serializationBuffer
   record.write(serializationBuffer);
   // 获取serializationBuffer的长度信息并记录到serializationBuffer对象中
   int len = serializationBuffer.length() - 4;
   serializationBuffer.setPosition(0);
   serializationBuffer.writeInt(len);
   serializationBuffer.skipBytesToWrite(len);
   // 对serializationBuffer进行wrapp处理,转换成ByteBuffer数据结构
   dataBuffer = serializationBuffer.wrapAsByteBuffer();
}

Flink 1.12版本中RecordWriter就提供了serializeRecord的能力,没有单拎出来实现。

 

2.2. 将ByteBuffer中间数据写入BufferBuilder

首先BufferBuilder用于构建完整的Buffer数据。在copyFromSerializerToTargetChannel()方法中实现了将RecordSerializer中的ByteBuffer中间数据写入BufferBuilder的逻辑:

  1. 对序列化器进行Reset操作,重置初始化位置。
  2. 将序列化器的ByteBuffer中间数据写入BufferBuilder。
  3. 判断当前BufferBuilder是否构建了完整的Buffer数据,完成BufferBuilder中Buffer的构建。
  4. 判断SerializationResult中是否具有完整的数据元素,如果是则将pruneTriggered置为True,然后清空当前的BufferBuilder,并跳出循环。
  5. 创建新的bufferBuilder,继续从序列化器中将中间数据复制到BufferBuilder中。
  6. 指定flushAlways参数为True,调用flushTargetPartition()方法将数据写入ResultPartition。为防止过度频繁地将数据写入ResultPartiton,在RecordWriter中会有独立的outputFlusher线程(在构造器中),周期性地将构建出来的Buffer数据推送到ResultPartiton本地队列中存储,默认延迟为100ms。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws 
   IOException, InterruptedException {
   // 对序列化器进行Reset操作,初始化initial position
   serializer.reset();
   // 创建BufferBuilder
   boolean pruneTriggered = false;
   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
   // 调用序列化器将数据写入bufferBuilder
   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
   // 如果SerializationResult是完整Buffer
   while (result.isFullBuffer()) {
      // 则完成创建Buffer数据的操作
      finishBufferBuilder(bufferBuilder);
      // 如果是完整记录,则将pruneTriggered置为True
      if (result.isFullRecord()) {
         pruneTriggered = true;
         emptyCurrentBufferBuilder(targetChannel);
         break;
      }
      // 创建新的bufferBuilder,继续复制序列化器中的数据到BufferBuilder中
      bufferBuilder = requestNewBufferBuilder(targetChannel);
      result = serializer.copyToBufferBuilder(bufferBuilder);
   }
   checkState(!serializer.hasSerializedData(), "All data should be written at once");
     // 如果指定的flushAlways,则直接调用flushTargetPartition将数据写入ResultPartition
   if (flushAlways) {
      flushTargetPartition(targetChannel);
   }
   return pruneTriggered;
}

 

二. BufferBuilder申请资源并创建

1. ChannelSelectorRecordWriter创建BufferBuilder

在ChannelSelectorRecordWriter.getBufferBuilder()方法中定义了BufferBuilder的创建过程。

//1. targetChannel确认数据写入的分区,ID与下游InputGate中的InputChannelID是对应的
//2. 
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, 
   InterruptedException {
	//在ChannelSelectorRecordWriter中维护了
	//bufferBuilders[]数组,用于存储创建好的BufferBuilder对象
   if (bufferBuilders[targetChannel] != null) {
      return bufferBuilders[targetChannel];
   } else {
   //只有在无法从bufferBuilders[]中获取BufferBuilder时,
   //才会调用requestNewBufferBuilder()方法创建新的BufferBuilder对象。
      return requestNewBufferBuilder(targetChannel);
   }
}

requestNewBufferBuilder()方法逻辑如下

  1. 检查bufferBuilders[]的状态,确保bufferBuilders[targetChannel]为空或者bufferBuilders[targetChannel].isFinished()方法返回值为True。
  2. 调用targetPartition.getBufferBuilder()方法获取新的BufferBuilder,这里的targetPartition就是前面提到的ResultPartition。在ResultPartition中会向LocalBufferPool申请Buffer内存空间,用于存储序列化后的ByteBuffer数据。
  3. 向targetPartition添加通过bufferBuilder构建的BufferConsumer对象,bufferBuilder和BufferConsumer内部维护了同一个Buffer数据。BufferConsumer会被存储到ResultSubpartition的BufferConsumer队列中。
  4. 将创建好的bufferBuilder添加至数组,用于下次直接获取和构建BufferConsumer对象。
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
   IOException, InterruptedException {
   checkState(bufferBuilders[targetChannel] == null 
              || bufferBuilders[targetChannel].isFinished());
   // 调用targetPartition获取BufferBuilder
   BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
   // 向targetPartition中添加BufferConsumer
   targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),
                                     targetChannel);
   // 将创建好的bufferBuilder添加至数组
   bufferBuilders[targetChannel] = bufferBuilder;
   return bufferBuilder;
}

 

2. BroadcastRecordWriter创建BufferBuilder

在BroadcastRecordWriter内部创建BufferBuilder的过程中,会将创建的bufferConsumer对象添加到所有的ResultSubPartition中,实现将Buffer数据下发至所有InputChannel,如下代码:

public BufferBuilder requestNewBufferBuilder(int targetChannel) 
    throws IOException, InterruptedException {
   checkState(bufferBuilder == null || bufferBuilder.isFinished());
   BufferBuilder builder = targetPartition.getBufferBuilder();
   if (randomTriggered) {
      targetPartition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);
   } else {
      try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
         for (int channel = 0; channel < numberOfChannels; channel++) {
            targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);
         }
      }
   }
   bufferBuilder = builder;
   return builder;
}

 

以上步骤就是在RecordWriter组件中将数据元素序列化成二进制格式,然后通过BufferBuilder构建成Buffer类型数据,最终存储在ResultPartition的ResultSubPartition中。

这是从Task的层面了解数据网络传输过程,下篇了解在TaskManager中如何构建底层的网络传输通道。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/435394.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OpenHarmony教程指南—Navigation开发 页面切换场景范例

简介 在应用开发时&#xff0c;我们常常遇到&#xff0c;需要在应用内多页面跳转场景时中使用Navigation导航组件做统一的页面跳转管理&#xff0c;它提供了一系列属性方法来设置页面的标题栏、工具栏以及菜单栏的各种展示样式。除此之外还拥有动态加载&#xff0c;navPathSta…

安全增强型 Linux

书接上篇 一查看selinux状态 SELinux的状态&#xff1a; enforcing&#xff1a;强制&#xff0c;每个受限的进程都必然受限 permissive&#xff1a;允许&#xff0c;每个受限的进程违规操作不会被禁止&#xff0c;但会被记录于审计日志 disabled&#xff1a;禁用 相关命令…

操作系统原理与实验——实验四短进程优先调度

实验指南 运行环境&#xff1a; Dev c 算法思想&#xff1a; 短进程优先 (SPF)调度算法则是从就绪队列中选出一个估计运行时间最短的进程&#xff0c;将处理机分配给它&#xff0c;使它立即执行并一直执行到完成 核心数据结构&#xff1a; typedef struct data{ int hour; int…

kafka消费端消息去重方案

背景 我们在日常工作中&#xff0c;消费kafka消息是一个最常见的操作&#xff0c;不过由于kafka队列中经常包含重复的消息&#xff0c;并且消息量巨大&#xff0c;所以我们消费端总是需要先把消息进行去重后在消费&#xff0c;以减少消费端的压力&#xff0c;那么日常中我们一…

Java面试(1)之 JVM篇

内存模型及原理 1, JVM内存模型 2, 类加载器及双亲委派模型 2.1 类加载器的作用? 将Java文件解析成Class文件对象,即 通过一个类的全限定名来得到其二进制字节流.(不同类加载器加载的对象一定不同) 2.2 什么是双亲委派模型? 如果一个类接收到类加载的请求不会自己去加载,…

微服务系列(一)springcloudAlibaba之Nacos注册和配置中心及openFeign远程调用

一&#xff0c;认识微服务 我们先看看开发大型项目采用单体架构存在哪些问题&#xff0c;而微服务架构又是如何解决这些问题的。 1.1 单体架构 单体架构&#xff08;monolithic structure&#xff09;&#xff1a;整个项目中所有功能模块都在一个工程中开发&#xff1b;项目部署…

MySQL 备份方案

优质博文&#xff1a;IT-BLOG-CN 一、为什么要备份 【1】容灾恢复&#xff1a;硬件故障、不经意的 Bug 导致数据损坏&#xff0c;或者服务器及其数据由于某些原因不可获取或无法使用等&#xff08;例如&#xff1a;机房大楼烧毁&#xff0c;恶意的黑客攻击或 Mysql 的 Bug 等&…

React_ 三、Router路由配置

文章目录 [TOC](文章目录) Router路由配置安装和封装使用声明式导航Link和编程式导航useNavigate 导航传参useSearchParams 接收传参useParams 接收传参 路由嵌套children和菜单式渲染404路由配置 路由模式history模式&#xff0c;无/#/ 需要后端支持hash模式&#xff0c;有/#/…

开源模型应用落地-工具使用篇-Spring AI(七)

一、前言 在AI大模型百花齐放的时代&#xff0c;很多人都对新兴技术充满了热情&#xff0c;都想尝试一下。但是&#xff0c;实际上要入门AI技术的门槛非常高。除了需要高端设备&#xff0c;还需要面临复杂的部署和安装过程&#xff0c;这让很多人望而却步。不过&#xff0c;随着…

删除的文件能恢复吗?分享3个恢复方法

我们经常会遇到文件夹里的文件不小心被删除的情况&#xff0c;面对这种情况很多人会感到焦虑和无助。但实际上文件恢复并不是一件难事。在本文中我将分享一些实用的文件恢复方法&#xff0c;并深入探讨各种方法的优缺点&#xff0c;帮助大家更好地应对文件误删的问题。 首先让我…

集简云新增通义千问qwen 72b chat、qwen1.5 等多种大语言模型,提升多语言支持能力

通义千问再开源&#xff01;继发布多模态模型后&#xff0c;通义千问 1.5 版本也在春节前上线。 此次大模型包括六个型号&#xff1a;0.5B、1.8B、4B、7B、14B 和 72B&#xff0c;性能评测基础能力在在语言理解、代码生成、推理能力等多项基准测试中均展现出优异的性能&#x…

Jupyter如何开启Debug调试功能

由于需要对算子做远程调试功能&#xff0c;需要在jupyter中开启远程断点调试功能&#xff0c;特此记录。 本文写作时用到的系统是Ubuntu22&#xff0c;Python的版本是3.8. 首先&#xff0c;创建虚拟环境。 python -m venv venv source venv/bin/activate接着&#xff0c;安装…

hardlock.sys蓝屏解决办法【windows】

微软系统有时会蓝屏无法开机&#xff0c; 需要记下导致蓝屏的文件。 这里是【hardlock.sys】文件导致的。 解决办法是找到这个文件&#xff0c;把文件改名字&#xff0c;让系统找不到这个文件。 可以参考路径&#xff1a;C盘》C:\Windows\System32\drivers\hardlock.sys 把…

回归预测 | Matlab实现BiTCN-BiGRU-Attention双向时间卷积双向门控循环单元融合注意力机制多变量回归预测

回归预测 | Matlab实现BiTCN-BiGRU-Attention双向时间卷积双向门控循环单元融合注意力机制多变量回归预测 目录 回归预测 | Matlab实现BiTCN-BiGRU-Attention双向时间卷积双向门控循环单元融合注意力机制多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.M…

金鸣识别(OCR)与人眼识别哪个更准?

关于OCR&#xff08;Optical Character Recognition&#xff0c;光学字符识别&#xff09;金鸣识别与人眼识别率的对比&#xff0c;确实是一个引人入胜的话题。首先&#xff0c;我们要明确一点&#xff0c;虽然OCR技术在过去几十年里取得了巨大的进步&#xff0c;但要达到与人类…

QCustomPlot / C++ 追踪点、标签绘制开发

一、项目介绍&#xff1a; QCustomPlot曲线相关 1、曲线&#xff08;折线&#xff09;的后面有一个标签&#xff1b;点击标签可移动垂直方向移动曲线 2、曲线下方有纯文本标签 3、曲线设置多个追踪点 4、追踪点可跟随鼠标沿着曲线移动 5、多条曲线移动不卡顿 二、项目展示…

[IDE工具]Ubuntu18.04 VSCode版本升级

一、下载新版本 https://code.visualstudio.com/Download 二、安装deb sudo dpkg -i code_1.87.0-1709078641_amd64.deb 升级完成&#xff01; 三、问题解决 1. 依赖于 libc6 (> 2.28)&#xff1b;然而&#xff1a;系统中 libc6:amd64 的版本为 2.27-3ubuntu1.6 1.1…

代码学习记录13

随想录日记part13 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.06 主要内容&#xff1a;今天的主要内容是二叉树的第二部分哦&#xff0c;主要有层序遍历&#xff1b;翻转二叉树&#xff1b;对称二叉树。 102.二叉树的层序遍历226.翻转二叉树101. 对称二叉…

什么是ElasticSearch的深度分页问题?如何解决?

在ElasticSearch中进行分页查询通常使用from和size参数。当我们对ElasticSearch发起一个带有分页参数的查询(如使用from和size参数)时,ElasticSearch需要遍历所以匹配的文档直到达到指定的起始点(from),然后返回从这一点开始的size个文档 在这个例子中: 1.from 参数定义…

华为配置智能升级功能升级设备示例

配置智能升级功能升级设备示例 组网图形 图1 配置智能升级功能组网图 背景信息组网需求配置思路前提条件操作步骤操作结果 背景信息 为了方便用户及时了解设备主流运行版本&#xff0c;快速完成升级修复&#xff0c;华为设备支持自动下载、自助升级功能。用户在设备Web网管…