【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.
        if (!isRunning) {
            LOG.debug("Restoring during invoke will be called.");
            restoreInternal();
        }

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  
    Iterator var2 = this.getAllOperators(true).iterator();  
  
    while(var2.hasNext()) {  
        StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();  
        operator.initializeState(streamTaskStateInitializer);  
        operator.open();  
    }  
  
}

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeState

public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  
        throws Exception {  
    //1. 获取类型序列化器
    final TypeSerializer<?> keySerializer =  
            config.getStateKeySerializer(getUserCodeClassloader());  
    //2. get containingTask
    final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  
    final CloseableRegistry streamTaskCloseableRegistry =  
            Preconditions.checkNotNull(containingTask.getCancelables());  
   //3. create StreamOperatorStateContext
    final StreamOperatorStateContext context =  
            streamTaskStateManager.streamOperatorStateContext(  
                    getOperatorID(),  
                    getClass().getSimpleName(),  
                    getProcessingTimeService(),  
                    this,  
                    keySerializer,  
                    streamTaskCloseableRegistry,  
                    metrics,  
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(  
                            ManagedMemoryUseCase.STATE_BACKEND,  
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  
                            runtimeContext.getUserCodeClassLoader()),  
                    isUsingCustomRawKeyedState());  
   //4. create stateHandler
    stateHandler =  
            new StreamOperatorStateHandler(  
                    context, getExecutionConfig(), streamTaskCloseableRegistry);  
    timeServiceManager = context.internalTimerServiceManager();  
    //5. initialize OperatorState
    stateHandler.initializeOperatorState(this);  
    //6. set KeyedStateStore in runtimeContext
    runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  
        @Nonnull OperatorID operatorID,  
        @Nonnull String operatorClassName,  
        @Nonnull ProcessingTimeService processingTimeService,  
        @Nonnull KeyContext keyContext,  
        @Nullable TypeSerializer<?> keySerializer,  
        @Nonnull CloseableRegistry streamTaskCloseableRegistry,  
        @Nonnull MetricGroup metricGroup,  
        double managedMemoryFraction,  
        boolean isUsingCustomRawKeyedState)  
        throws Exception {  
    //1. 获取task实例信息
    TaskInfo taskInfo = environment.getTaskInfo();  
    OperatorSubtaskDescriptionText operatorSubtaskDescription =  
            new OperatorSubtaskDescriptionText(  
                    operatorID,  
                    operatorClassName,  
                    taskInfo.getIndexOfThisSubtask(),  
                    taskInfo.getNumberOfParallelSubtasks());  
  
    final String operatorIdentifierText = operatorSubtaskDescription.toString();  
  
    final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  
            taskStateManager.prioritizedOperatorState(operatorID);  
  
    CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  
    OperatorStateBackend operatorStateBackend = null;  
    CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  
    CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  
    InternalTimeServiceManager<?> timeServiceManager;  
  
    try {  
        // 创建keyed类型的状态后端
        // -------------- Keyed State Backend --------------  
        keyedStatedBackend =  
                keyedStatedBackend(  
                        keySerializer,  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry,  
                        metricGroup,  
                        managedMemoryFraction);  
        //创建operator类型的状态后端
        // -------------- Operator State Backend --------------  
        operatorStateBackend =  
                operatorStateBackend(  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry);  
        //创建原生类型状态后端
        // -------------- Raw State Streams --------------  
        rawKeyedStateInputs =  
                rawKeyedStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawKeyedState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  
  
        rawOperatorStateInputs =  
                rawOperatorStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawOperatorState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  
        //创建Internal Timer Service Manager
        // -------------- Internal Timer Service Manager --------------  
        if (keyedStatedBackend != null) {  
  
            // if the operator indicates that it is using custom raw keyed state,  
            // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  
                    (prioritizedOperatorSubtaskStates.isRestored()  
                                    && !isUsingCustomRawKeyedState)  
                            ? rawKeyedStateInputs  
                            : Collections.emptyList();  
  
            timeServiceManager =  
                    timeServiceManagerProvider.create(  
                            keyedStatedBackend,  
                            environment.getUserCodeClassLoader().asClassLoader(),  
                            keyContext,  
                            processingTimeService,  
                            restoredRawKeyedStateTimers);  
        } else {  
            timeServiceManager = null;  
        }  
  
        // -------------- Preparing return value --------------  
  
        return new StreamOperatorStateContextImpl(  
                prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  
                operatorStateBackend,  
                keyedStatedBackend,  
                timeServiceManager,  
                rawOperatorStateInputs,  
                rawKeyedStateInputs);  
    } catch (Exception ex) {  
  
。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {
   super.initializeState(context);
   StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/378665.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SolidWorks学习笔记——草图绘制的基本命令

目录 一、进入草图绘制 二、直线命令与删除命令 三、圆弧命令与矩形命令 四、槽口命令以及多边形命令 五、椭圆以及倒角命令 六。草图绘制中的剪裁命令 七、草图中的几何关系 八、草图绘制中的智能尺寸 九、从外部粘贴草图&#xff08;CAD&#xff09; 一、进入草图绘…

机器人运动学林沛群——变换矩阵

对于仅有移动&#xff0c;由上图可知&#xff1a; A P B P A P B o r g ^AP^BP^AP_{B org} APBPAPBorg​ 对于仅有转动&#xff0c;可得&#xff1a; A P B A R B P ^AP^A_BR^BP APBA​RBP 将转动与移动混合后&#xff0c;可得&#xff1a; 一个例子 在向量中&#xff…

JCIM | MD揭示PTP1B磷酸酶激活RtcB连接酶的机制

Background 内质网应激反应&#xff08;UPR&#xff09; 中的一个重要过程。UPR是由内质网中的三种跨膜传感器&#xff08;IRE1、PERK和ATF6&#xff09;控制的细胞应激反应&#xff0c;当内质网中的蛋白质折叠能力受到压力时&#xff0c;UPR通过减少蛋白质合成和增加未折叠或错…

MySQL 时间索引的选择

背景 MySQL 在使用过程中经常会对时间加索引&#xff0c;方便进行时间范围的查询&#xff0c;常见的时间类型有 data、datetime、long、timestamp 等&#xff0c;在此分析下这几种时间类型的索引大小&#xff0c;以找到比较合适的时间类型。 时间类型对比 常用的索引类型是 …

一篇文章入门Shell编程

一、初始Shell shell 是一个命令行解释器&#xff0c;它接收应用程序/用户命令&#xff0c;然后调用操作系统内核。 shell 还是一个功能相当强大的编程语言&#xff0c;易编写&#xff0c;易调试&#xff0c;灵活性强。 Linux提供的Shell解析器有&#xff1a; /bin/sh /bin/b…

字节3面真题,LeetCode上hard难度,极具启发性题解

文章目录 &#x1f680;前言&#x1f680;LeetCode&#xff1a;41. 缺失的第一个正整数&#x1f680;思路&#x1f680;整个代码思路串一下&#x1f680;Code &#x1f680;前言 铁子们好啊&#xff01;阿辉来讲道题&#xff0c;这道题据说是23年字节3面真题&#xff0c;LeetC…

中科大计网学习记录笔记(四):Internet 和 ISP | 分组延时、丢失和吞吐量

前言&#xff1a; 学习视频&#xff1a;中科大郑烇、杨坚全套《计算机网络&#xff08;自顶向下方法 第7版&#xff0c;James F.Kurose&#xff0c;Keith W.Ross&#xff09;》课程 该视频是B站非常著名的计网学习视频&#xff0c;但相信很多朋友和我一样在听完前面的部分发现信…

iPhone解锁 AnyMP4 iPhone Unlocker

AnyMP4 iPhone Unlocker是一款功能强大的iPhone解锁软件&#xff0c;旨在帮助用户轻松解决iPhone密码忘记、设备锁定等问题。无论是屏幕密码、指纹解锁还是Face ID&#xff0c;该软件都能提供有效的解决方案。 这款软件支持多种iPhone型号&#xff0c;包括最新的iPhone 14系列…

微服务OAuth 2.1认证授权可行性方案(Spring Security 6)

文章目录 一、背景二、微服务架构介绍三、认证服务器1. 数据库创建2. 新建模块3. 导入依赖和配置4. 安全认证配置类 四、认证服务器测试1. AUTHORIZATION_CODE&#xff08;授权码模式&#xff09;1. 获取授权码2. 获取JWT 2. CLIENT_CREDENTIALS(客户端凭证模式) 五、Gateway1.…

LeetCode Python - 1.两数之和

文章目录 题目答案运行结果 题目 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能…

svg基础(五)滤镜-高斯模糊,混合模式,偏移,颜色变换

1 作用 滤镜用于对SVG图形增加特殊效果 2 效果 feBlend - 与图像相结合的滤镜feColorMatrix - 用于彩色滤光片转换feComponentTransferfeCompositefeConvolveMatrixfeDiffuseLightingfeDisplacementMapfeFloodfeGaussianBlur 高斯模糊feImagefeMergefeMorphologyfeOffset - …

VBA中类的解读及应用第九讲:用WithEvents关键字声明实例化对象类变量

《VBA中类的解读及应用》教程【10165646】是我推出的第五套教程&#xff0c;目前已经是第一版修订了。这套教程定位于最高级&#xff0c;是学完初级&#xff0c;中级后的教程。 类&#xff0c;是非常抽象的&#xff0c;更具研究的价值。随着我们学习、应用VBA的深入&#xff0…

nacos配置自动刷新源码解析

文章目录 一、前言二、源码解析1、nacos客户端如何监听服务端配置变化的2、ConfigurationProperties注解的bean是如何自动刷新的3、RefreshScope 注解的bean是如何自动刷新的 三、总结 一、前言 最近好奇 nacos 是怎么做到配置自动刷新的&#xff0c;于是就去debug跟了下源码&…

LeetCode 0993. 二叉树的堂兄弟节点:深度优先搜索(BFS)

【LetMeFly】993.二叉树的堂兄弟节点&#xff1a;深度优先搜索(BFS) 力扣题目链接&#xff1a;https://leetcode.cn/problems/cousins-in-binary-tree/ 在二叉树中&#xff0c;根节点位于深度 0 处&#xff0c;每个深度为 k 的节点的子节点位于深度 k1 处。 如果二叉树的两个…

C语言:操作符详解

创作不易&#xff0c;给个三连吧&#xff01;&#xff01; 一、算术操作符 C语言中为了方便计算&#xff0c;提供了算数操作符&#xff0c;分别是:,-,*,/,% 由于这些操作符都是有两个操作数&#xff08;位于操作符两边&#xff09;&#xff0c;所以这种操作符也叫做双目操作…

114.乐理基础-五线谱-快速识别五线谱的谱号

内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;113.乐理基础-五线谱-五线谱的调号&#xff08;二&#xff09;-CSDN博客 15个调号&#xff0c;如下图&#xff0c;该怎样才能随便拿出一个来就能快速的知道这是什么调号呢&#xff1f; 一共分为三个要点&#xff1…

【芯片设计- RTL 数字逻辑设计入门 11 -- 移位运算与乘法】

请阅读【嵌入式开发学习必备专栏 】 文章目录 移位运算与乘法Verilog Codeverilog 拼接运算符&#xff08;{}&#xff09;Testbench CodeVCS 波形仿真 问题小结 移位运算与乘法 已知d为一个8位数&#xff0c;请在每个时钟周期分别输出该数乘1/3/7/8,并输出一个信号通知此时刻输…

Redis篇之分布式锁

一、为什么要使用分布式锁 1.抢劵场景 &#xff08;1&#xff09;代码及流程图 &#xff08;2&#xff09;抢劵执行的正常流程 就是正好线程1执行完整个操作&#xff0c;线程2再执行。 &#xff08;3&#xff09;抢劵执行的非正常流程 因为线程是交替进行的&#xff0c;所以有…

python适配器模式开发实践

1. 什么是适配器设计模式&#xff1f; 适配器&#xff08;Adapter&#xff09;设计模式是一种结构型设计模式&#xff0c;它允许接口不兼容的类之间进行合作。适配器模式充当两个不兼容接口之间的桥梁&#xff0c;使得它们可以一起工作&#xff0c;而无需修改它们的源代码。 …

[linux]:匿名管道和命名管道(什么是管道,怎么创建管道(函数),匿名管道和命名管道的区别,代码例子)

目录 一、匿名管道 1.什么是管道&#xff1f;什么是匿名管道&#xff1f; 2.怎么创建匿名管道&#xff08;函数&#xff09; 3.匿名管道的4种情况 4.匿名管道有5种特性 5.怎么使用匿名管道&#xff1f;匿名管道有什么用&#xff1f;&#xff08;例子&#xff09; 二、命名…