Flink 数据序列化

为 Flink 量身定制的序列化框架

大家都知道现在大数据生态非常火,大多数技术组件都是运行在JVM上的,Flink也是运行在JVM上,基于JVM的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临JVM的一些问题,比如Java对象存储密度较低等。针对这些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来进行内存的分配回收,接着将序列化后的对象存储到内存块中。
现在Java生态圈中已经有许多序列化框架,比如说Java serialization, Kryo,Apache Avro等等。但是Flink依然是选择了自己定制的序列化框架,那么到底有什么意义呢?若Flink选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直接操作二进制数据。
在这里插入图片描述Flink在其内部构建了一套自己的类型系统,Flink现阶段支持的类型分类如图所示,从图中可以看到Flink类型可以分为基础类型Basic、数组Arrays、复合类型Composite、辅助类型Auxiliary、泛型和其它类型GenericFlink支持任意的Java或是Scala类型。不需要像Hadoop一样去实现一个特定的接口org.apache.hadoop.io.WritableFlink能够自动识别数据类型。
在这里插入图片描述
TypeInformation的思维导图如图所示,从图中可以看出,在Flink中每一个具体的类型都对应了一个具体的TypeInformation实现类,例如BasicTypeInformation中的IntegerTypeInformationFractionalTypeInformation都具体的对应了一个TypeInformation。然后还有BasicArrayTypeInformationCompositeType以及一些其它类型,也都具体对应了一个TypeInformation

TypeInformationFlink类型系统的核心类。对于用户自定义的Function来说,Flink需要一个类型信息来作为该函数的输入输出类型,即TypeInfomation。该类型信息类作为一个工具来生成对应类型的序列化器TypeSerializer,并用于执行语义检查,比如当一些字段在作为joingrouping的键时,检查这些字段是否在该类型中存在。

Flink 的序列化过程

Flink序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何而来?
每一个具体的数据类型都对应一个TypeInformation的具体实现,每一个TypeInformation都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink的序列化过程图可以看到TypeInformation会提供一个createSerialize()方法,通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象TypeSerializer
在这里插入图片描述对于大多数数据类型 Flink可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化,比如,BasicTypeInfoWritableTypeInfo等,但针对GenericTypeInfo类型,Flink会使用Kyro进行序列化和反序列化。其中,TuplePojoCaseClass类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。

简单的介绍下Pojo的类型规则,即在满足一些条件的情况下,才会选用Pojo的序列化进行相应的序列化与反序列化的一个操作。即类必须是Public的,且类有一个public的无参数构造函数,该类(以及所有超类)中的所有非静态no-static、非瞬态no-transient字段都是public的(和非最终的final)或者具有公共gettersetter方法,该方法遵循gettersetterJava bean命名约定。当用户定义的数据类型无法识别为POJO类型时,必须将其作为GenericType处理并使用Kryo进行序列化。

Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足你的需求,Flink的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需要实现 TypeInformationTypeSerializerTypeComparator即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。

序列化就是将数据结构或者对象转换成一个二进制串的过程,在Java里面可以简单地理解成一个byte数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。下面就以内嵌型的Tuple3这个对象为例,简述一下它的序列化过程。
[点击并拖拽以移动] ​

Tuple3包含三个层面,一是int类型,一是double类型,还有一个是PersonPerson包含两个字段,一是int型的ID,另一个是 String 类型的name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到Tuple3 会把 int类型通过IntSerializer进行序列化操作,此时int只需要占用四个字节就可以了。根据int占用四个字节,这个能够体现出Flink可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。相反,如果采用Java的序列化,虽然能够存储更多的属性信息,但一次占据的存储空间会受到一定的损耗。Person类会被当成一个Pojo对象来进行处理,PojoSerializer序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由MemorySegment去支持。

MemorySegment具有什么作用呢? MemorySegmentFlink中会将对象序列化到预分配的内存块上,它代表1个固定长度的内存,默认大小为32 kbMemorySegment代表Flink中的一个最小的内存分配单元,相当于是Java的一个byte数组。 每条记录都会以序列化的形式存储在一个或多个MemorySegment中。

Flink 序列化的最佳实践

Flink常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型提示、手动创建TypeInformation,具体如下:
【1】注册子类型: 如果函数签名只描述了超类型,但是它们实际上在执行期间使用了超类型的子类型,那么让Flink了解这些子类型会大大提高性能。可以在StreamExecutionEnvironmentExecutionEnvironment中调用.registertype (clazz) 注册子类型信息。
【2】注册自定义序列化器: 对于不适用于自己的序列化框架的数据类型,Flink会使用Kryo来进行序列化,并不是所有的类型都与Kryo无缝连接,具体注册方法在下文介绍。
【3】添加类型提示: 有时,当Flink用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint,这个通常只在Java API中需要。
【4】手动创建TypeInformation 在某些API调用中,这可能是必需的,因为Java的泛型类型擦除导致Flink无法推断数据类型。
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为Flink已经提供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景下,需要去做一些相应的处理。

实践 - 类型声明: 类型声明去创建一个类型信息的对象是通过哪种方式?通常是用TypeInformation.of()方法来创建一个类型信息的对象,具体说明如下:
【1】对于非泛型类,直接传入class对象即可

PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);

【2】对于泛型类,需要通过TypeHint来保存泛型类型信息

final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});

【3】预定义常量:BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于StringBooleanByteShortIntegerLongFloatDoubleChar等基本类型的类型声明,可以直接使用。而且Flink还提供了完全等价的Typesorg.apache.flink.api.common.typeinfo.Types。特别需要注意的是,flink-table模块也有一个Typesorg.apache.flink.table.api.Types,用于table模块内部的类型定义信息,用法稍有不同。使用IDE 的自动import时一定要小心。
【4】自定义TypeInfoTypeInfoFactory 通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),使存
储更紧凑,运行时也更高效。需要注意在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo()方法。

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0,T1>{
    public T0 myfield0;
    public T1 myfield1;
}

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple>{

    @Override
    public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInfomation<?>> genericParameters){
        return new MyTupleTypeInfo(genericParameters.get("T0").genericParameters.get("T1"));
    }
}

实践 - 注册子类型

Flink认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。StreamExecutionEnvironmentExecutionEnvironment提供registerType()方法用来向Flink注册子类信息。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);

registerType()方法内部,会使用TypeExtractor来提取类型信息,如上所示,获取到的类型信息属于PojoTypeInfo及其子类,那么需要将其注册到一起,否则统一交给Kryo去处理,Flink并不过问 ( 这种情况下性能会变差 )。

实践 - Kryo 序列化

对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfoTypeInfoFactory),默认会交给 Kryo处理,如果Kryo仍然无法处理(例如GuavaThriftProtobuf等第三方库的一些类),有两种解决方案:
【1】强制使用Avro来代替Kryo

env.getConfig().enableForceAvro();

【2】为Kryo增加自定义的Serializer以增强Kryo的功能

env.getConfig().addDefaultKryoSerializer(clazz, serializer);

注:如果希望完全禁用Kryo100%使用Flink的序列化机制),可以通过Kryoenv.getConfig().disableGenericTypes()的方式完成,但注意一切无法处理的类都将导致异常,这种对于调试非常有效。

Flink 通信层的序列化

FlinkTask之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之后写入NetworkBufferPool,然后下层的Task读出之后再进行反序列化操作,最后进行逻辑处理。为了使得记录以及事件能够被写入 Buffer,随后在消费时再从Buffer中读出,
Flink提供了数据记录序列化器RecordSerializer与反序列化器RecordDeserializer以及事件序列化器EventSerializer

Function发送的数据被封装成SerializationDelegate,它将任意元素公开为IOReadableWritable以进行序列化,通过setInstance()来传入要序列化的数据。在Flink通信层的序列化中,有几个问题值得关注,具体如下:
【1】何时确定Function的输入输出类型?
[点击并拖拽以移动] ​

在构建StreamTransformation的时候通过TypeExtractor工具确定Function的输入输出类型。TypeExtractor类可以根据方法签名、子类信息等蛛丝马迹自动提取或恢复类型信息。
【2】何时确定Function的序列化 / 反序列化器?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers() 操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【3】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers()操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【4】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
[点击并拖拽以移动] ​

大家都应该清楚TaskStreamTask两个概念,Task是直接受TaskManager管理和调度的,而Task又会调用StreamTask,而StreamTask中真正封装了算子的处理逻辑。在run()方法中,首先将反序列化后的数据封装成StreamRecord交给算子处理;然后将处理结果通过Collector发送给下游 ( 在构建Collector时已经确定了SerializtionDelegate),并通过RecordWriter写入器将序列化后的结果写入DataOutput;最后序列化的操作交给SerializerDelegate处理,实际还是通过TypeSerializerserialize()方法完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/266459.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

第4章 | 安徽某高校《统计建模与R软件》期末复习

第4章 参数估计 参数估计是统计建模的关键步骤之一&#xff0c;它涉及根据样本数据推断总体参数的过程。在统计学中&#xff0c;参数通常用于描述总体的特征&#xff0c;如均值、方差等。通过参数估计&#xff0c;我们可以利用样本信息对这些未知参数进行推断&#xff0c;从而…

kubernetes集群 应用实践 kafka部署

kubernetes集群 应用实践 kafka部署 零.1、环境说明 零.2、kafka架构说明 zookeeper在kafka集群中的作用 一、Broker注册 二、Topic注册 三、Topic Partition选主 四、生产者负载均衡 五、消费者负载均衡 一、持久化存储资源准备 1.1 创建共享目录 [rootnfsserver ~]# mkdir -…

深度学习 | 基础卷积神经网络

卷积神经网络是人脸识别、自动驾驶汽车等大多数计算机视觉应用的支柱。可以认为是一种特殊的神经网络架构&#xff0c;其中基本的矩阵乘法运算被卷积运算取代&#xff0c;专门处理具有网格状拓扑结构的数据。 1、全连接层的问题 1.1、全连接层的问题 “全连接层”的特点是每个…

用C的递归函数求n!-----(C每日一编程)

用递归函数求n&#xff01; 有了上面这个递归公式就能写C代码了。 参考代码&#xff1a; int fac(int n) {if (n 1 || n 0)return 1;else return n * fac(n - 1); } void main() {int n;scanf("%d",&n);int f fac(n);printf("\n%d!%d\n", n, f); …

linux设置线程优先级以及调度策略浅析

linux线程调度策略 Linux内核会根据线程的优先级和调度策略来分配处理器时间。线程的优先级越高&#xff0c;它在竞争处理器时间时就越有可能被选中执行。调度策略定义了内核在选择下一个要执行的线程时所遵循的规则。 在Linux中&#xff0c;有以下几种常见的调度策略&#x…

【iOS】UICollectionView

文章目录 前言一、实现简单九宫格布局二、UICollectionView中的常用方法和属性1.UICollectionViewFlowLayout相关属性2.UICollectionView相关属性 三、协议和代理方法&#xff1a;四、九宫格式的布局进行升级五、实现瀑布流布局实现思路实现原理代码调用顺序实现步骤实现效果 总…

ospf学习纪要

1、为避免区域&#xff08;area0,area1等&#xff09;间的路由形成环路&#xff0c;非骨干区域之间不允许直接相互发布区域间的路由。因此&#xff0c;所有的ABR&#xff08;Area Border Router,区域边界路由器&#xff09;都至少有一个借口属于Area0,所以Area0始终包含所有的A…

深度学习第6天:ResNet深度残差网络

☁️主页 Nowl &#x1f525;专栏 《深度学习》 &#x1f4d1;君子坐而论道&#xff0c;少年起而行之 ​​ 文章目录 什么是ResNet模型结构整体架构残差块 模型特性示例代码 什么是ResNet ResNet是一种用于图像识别的深度残差网络&#xff0c;是卷积神经网络的一种重要模型&…

哈希拓展攻击CTF题做法

目录 基础&#xff1a; 盐&#xff08;Salt&#xff09;&#xff1a; 哈希长度拓展攻击&#xff1a; kali下载相关工具hash-ext-attack&#xff1a; hash拓展题目特征&#xff1a; 哈希拓展ctf题&#xff1a; 2023楚慧杯upload_shell 实验吧之让我进去&#xff1a; 前言…

【量化金融】证券投资学

韭菜的自我修养 第一章&#xff1a; 基本框架和概念1.1 大盘底部形成的技术条件1.2 牛市与熊市1.3 交易系统1.3.1 树懒型交易系统1.3.2 止损止损的4个技术 第二章&#xff1a;证券家族4兄弟2.1 债券&#xff08;1&#xff09;债券&#xff0c;是伟大的创新&#xff08;2&#x…

Kafka日志

位置 server.properties配置文件中通过log.dir指定日志存储目录 log.dir/{topic}-{partition} 核心文件 .log 存储消息的日志文件&#xff0c;固定大小为1G&#xff0c;写满后会新增一个文件&#xff0c;文件名表示当前日志文件记录的第一条消息的偏移量。 .index 以偏移…

【Spring实战】04 Lombok集成及常用注解

文章目录 0. 集成1. Data2. Getter 和 Setter3. NoArgsConstructor&#xff0c;AllArgsConstructor和RequiredArgsConstructor4. ToString5. EqualsAndHashCode6. NonNull7. Builder总结 Lombok 是一款 Java 开发的工具&#xff0c;它通过注解的方式简化了 Java 代码的编写&…

nodejs+vue+微信小程序+python+PHP计算机网络在线考试系统-计算机毕业设计推荐

信息数据的处理完全依赖人工进行操作&#xff0c; 所以电子化信息管理的出现就能缓解以及改变传统人工方式面临的处境&#xff0c;一方面可以确保信息数据在短时间被高效处理&#xff0c;还能节省人力成本&#xff0c;另一方面可以确保信息数据的安全性&#xff0c;可靠性&…

SQL进阶理论篇(二十):什么是SQL注入

文章目录 简介SQL注入的原理SQL注入的实例搭建sqli-labs注入环境实例一&#xff1a;猜测where条件判断查询语句的字段数获取当前数据库和用户信息获取MySQL中的所有数据库名称查询wucai数据库中的所有数据表查询heros数据表中的所有字段参考文献 简介 这节是纯兴趣篇了。 web…

【快速开发】使用SvelteKit

自我介绍 做一个简单介绍&#xff0c;酒架年近48 &#xff0c;有20多年IT工作经历&#xff0c;目前在一家500强做企业架构&#xff0e;因为工作需要&#xff0c;另外也因为兴趣涉猎比较广&#xff0c;为了自己学习建立了三个博客&#xff0c;分别是【全球IT瞭望】&#xff0c;【…

MATLAB遗传算法工具箱的三种使用方法

MATLAB中有三种调用遗传算法的方式&#xff1a; 一、遗传算法的开源文件 下载“gatbx”压缩包文件&#xff0c;解压后&#xff0c;里面有多个.m文件&#xff0c;可以看到这些文件的编辑日期都是1998年&#xff0c;很古老了。 这些文件包含了遗传算法的基础操作&#xff0c;包含…

YOLOv5算法改进(23)— 更换主干网络GhostNet + 添加CA注意力机制 + 引入GhostConv

前言:Hello大家好,我是小哥谈。本节课就让我们结合论文来对YOLOv5进行组合改进(更换主干网络GhostNet + 添加CA注意力机制 + 引入GhostConv),希望同学们学完本节课可以有所启迪,并且后期可以自行进行YOLOv5算法的改进!🌈 前期回顾: YOLOv5算法改进(1)— 如何去…

MyBatis笔记

Mybatis Mybatis介绍 什么是Mybatis? mybatis是支持普通SQL查询、存储过程和高级映射的优秀持久层框架。 Mybatis优点 几乎消除了JDBC代码和参数的手动设置消除结果集的检索使用XML或注解用于配置和原始映射&#xff0c;将接口和POJOs(实体类)映射成数据库中的记录。 My…

Qt/QML编程学习之心得:在QML中调用fileDialog(十六)

Qt中有一些内置的对话框dialog,比如 在QWidget工程中使用比较容易,比如 #include <QFileDialog>fileName = QFileDialog::getOpenFileName(this, tr("Open Image"), "/home/jana", tr("Image Files (*.png *.jpg *.bmp)")); 那么在QM…

oracle即时客户端(Instant Client)安装与配置

之前的文章记录了oracle客户端和服务端的下载与安装&#xff0c;内容参见&#xff1a; 在Windows中安装Oracle_windows安装oracle 如果不想安装oracle客户端&#xff08;或者是电脑因为某些原因无法安装oracle客户端&#xff09;&#xff0c;还想能够连接oracle远程服务&#…