Flink Task退出流程与Failover机制

这里写目录标题

  • 1 TaskExecutor端Task退出逻辑
  • 2 JobMaster端failover流程
    • 2.1 Task Execute State Handle
    • 2.2 Job Failover
      • 2.2.1 Task Failure Handle
      • 2.2.2 Restart Task
      • 2.2.3 Cancel Task:
      • 2.2.4 Start Task
  • 3 Task失败的自动重启策略

1 TaskExecutor端Task退出逻辑

Task.doRun() 引导Task初始化并执行其相关代码的核心方法,
构造并实例化Task的可执行对象: AbstractInvokable invokable。
调用 AbstractInvokable.invoke() 开始启动Task包含的计算逻辑。
在这里插入图片描述
在这里插入图片描述

当AbstractInvokable.invoke()执行退出后,根据退出类型执行相应操作:

  1. 正常执行完毕退出:输出ResultPartition缓冲区数据,并关闭缓冲区,标记Task为Finished;
  2. 取消操作导致退出:标记Task为CANCELED,关闭用户代码;
  3. AbstractInvokable.invoke()执行过程中抛出异常退出:标记Task为FAILED,关闭用户代码,记录异常;
  4. AbstractInvokable.invoke()执行过程中JVM抛出错误:强制终止虚拟机,退出当前进程。

紧接着释放Task相关的网络、内存、文件系统资源。最后通过Task->TaskManager->JobMaster的传递链路将Task的终止状态通知给Leader JobMaster线程。

Task.notifyFinalState() -> TaskManagerActions.updateTaskExecutionState(TaskExecutionState) -> JobMasterGateway.updateTaskExecutionState(TaskExecutionState)

  • TaskExecutionState携带的关键信息:
TaskExecutionState {
    JobID // 任务ID
    ExecutionAttemptID  // Task执行的唯一ID,标示每次执行
    ExecutionState  // 枚举值,Task执行状态
    SerializedThrowable // 若Task抛出异常,该字段记录异常堆栈信息
    ...
}
  • Task 执行状态转换:
   CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
      |            |            |          |
      |            |            |   +------+
      |            |            V   V
      |            |         CANCELLING -----+----> CANCELED
      |            |                         |
      |            +-------------------------+
      |
      |                                   ... -> FAILED
      V
  RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED

2 JobMaster端failover流程

2.1 Task Execute State Handle

JobMaster收到TaskManager通过rpc发送的task执行状态变更信息,将通知当前Flink作业的调度器(SchedulerNG)处理,因为都是通过同个线程调用,后续对ExecutionGraph(运行时执行计划)、failover计数等有状态实例的read/write操作都不会出现线程安全问题。

JobMaster的核心处理逻辑在SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition) 中(TaskExecutionStateTransition主要是TaskExecutionState的可读性封装)。
处理逻辑:尝试将收到的Task执行状态信息更新到ExecutionGraph中。若更新成功且target状态为FINISHED,根据具体的SchedulingStrategy实现策略,选择可消费的结果分区并调度相应的消费者Task;若更新成功且target状态为FAILED,进入具体的failover流程。

  • SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition) :
    public final boolean updateTaskExecutionState(
            final TaskExecutionStateTransition taskExecutionState) {
        final Optional<ExecutionVertexID> executionVertexId =
                getExecutionVertexId(taskExecutionState.getID());

        boolean updateSuccess = executionGraph.updateState(taskExecutionState);

        if (updateSuccess) {
            checkState(executionVertexId.isPresent());
            if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
                updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
            }
            return true;
        } else {
            return false;
        }
    }

  • ExecutionGraph.updateState(TaskExecutionStateTransition): 在当前的物理执行拓扑中找不到目标ExecutionAttemptID 时,将更新失败。需要注意的是这个ID用于唯一标示一个Execution,而Execution则代表ExecutionVertex(代表拓扑顶点的一个subTask计划)的一次执行实例,ExecutionVertex可以重复多次执行。这意味着当有subTask重新运行,currentExecutions将不再持有上一次执行的ID信息。
   /**
     * Updates the state of one of the ExecutionVertex's Execution attempts. If the new status if
     * "FINISHED", this also updates the accumulators.
     *
     * @param state The state update.
     * @return True, if the task update was properly applied, false, if the execution attempt was
     *     not found.
     */
    public boolean updateState(TaskExecutionStateTransition state) {
       assertRunningInJobMasterMainThread();
        final Execution attempt = currentExecutions.get(state.getID());
        if (attempt != null) {
            try {
                final boolean stateUpdated = updateStateInternal(state, attempt);
                maybeReleasePartitions(attempt);
                return stateUpdated;
            } catch (Throwable t) {
                ......
                return false;
            }
        } else {
            return false;
        }
    }

  • JobMaster: 负责一个任务拓扑的中心操作类,涉及作业调度,资源管理,对外通讯等…

  • SchedulerNG:负责调度作业拓扑。所有对该类对象方法的调用都会通过ComponentMainThreadExecutor触发,将不会出现并发调用的情况。

  • ExecutionGraph: 当前执行拓扑的中心数据结构,协调分布在各个节点上的Execution。描述了整个任务的各个SubTask及其分区数据,并与其保持通讯。

2.2 Job Failover

2.2.1 Task Failure Handle

  • Task异常的主要流程在 DefaultScheduler.handleTaskFailure(ExecutionVertexID, Throwable), 根据RestartBackoffTimeStrategy判断是重启还是failed-job;根据FailoverStrategy选择需要重启的SubTask;最后根据任务当前的SchedulingStrategy执行相应的调度策略重启相应的Subtask。
    private void handleTaskFailure(
            final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
        // 更新当前任务异常信息
        setGlobalFailureCause(error);
        // 如果相关的算子(source、sink)存在coordinator,同知其进一步操作
        notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
        // 应用当前的restart-stratege并获取FailureHandlingResult
        final FailureHandlingResult failureHandlingResult =
                executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
        // 根据结果重启Task或将任务失败
        maybeRestartTasks(failureHandlingResult);
    }


    public class FailureHandlingResult {
        //恢复所需要重启的所有SubTask
          Set<ExecutionVertexID> verticesToRestart;
        //重启延迟
          long restartDelayMS;
        //万恶之源
          Throwable error;
        //是否全局失败
          boolean globalFailure;
    }

  • ExecutionFailureHandler:处理异常信息,根据当前应用策略返回异常处理结果。
    public FailureHandlingResult getFailureHandlingResult(
            ExecutionVertexID failedTask, Throwable cause) {
        return handleFailure(
                cause, 
                failoverStrategy.getTasksNeedingRestart(failedTask, cause),  // 选择出需要重启的SubTask
                false); 
    }

    private FailureHandlingResult handleFailure(
            final Throwable cause,
            final Set<ExecutionVertexID> verticesToRestart,
            final boolean globalFailure) {

        if (isUnrecoverableError(cause)) {
            return FailureHandlingResult.unrecoverable(
                    new JobException("The failure is not recoverable", cause), globalFailure);
        }

        restartBackoffTimeStrategy.notifyFailure(cause);
        if (restartBackoffTimeStrategy.canRestart()) {
            numberOfRestarts++;

            return FailureHandlingResult.restartable(
                    verticesToRestart, restartBackoffTimeStrategy.getBackoffTime(), globalFailure);
        } else {
            return FailureHandlingResult.unrecoverable(
                    new JobException(
                            "Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
                    globalFailure);
        }
    }

  • FailoverStrategy: 故障转移策略。

    • RestartAllFailoverStrategy: 使用该策略,当出现故障,将重启整个作业,即重启所有Subtask。
    • RestartPipelinedRegionFailoverStrategy:当出现故障,重启故障出现Subtask所在的的Region。
    • RestartBackoffTimeStrategy: 当Task发生故障时,决定是否重启以及重启的延迟时间。
  • FixedDelayRestartBackoffTimeStrategy:允许任务以指定延迟重启固定次数。

    • FailureRateRestartBackoffTimeStrategy:允许在固定失败频率内,以固定延迟重启。
    • NoRestartBackoffTimeStrategy:不重启。
  • SchedulingStrategy: Task执行实例的调度策略

    • EagerSchedulingStrategy: 饥饿调度,同时调度所有Task。
    • LazyFromSourcesSchedulingStrategy:当消费的分区数据都准备好后才开始调度其后续Task,用于批处理任务。
    • PipelinedRegionSchedulingStrategy:以pipline链接的Task为一个Region,作为其调度粒度。

2.2.2 Restart Task


    private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
        if (failureHandlingResult.canRestart()) {
            restartTasksWithDelay(failureHandlingResult);
        } else {
            failJob(failureHandlingResult.getError());
        }
    }

    private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
        final Set<ExecutionVertexID> verticesToRestart =
                failureHandlingResult.getVerticesToRestart();

        final Set<ExecutionVertexVersion> executionVertexVersions =
                new HashSet<>(
                        executionVertexVersioner
                                .recordVertexModifications(verticesToRestart)
                                .values());
        final boolean globalRecovery = failureHandlingResult.isGlobalFailure();

        addVerticesToRestartPending(verticesToRestart);

        // 取消所有需要重启的SubTask
        final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);

        delayExecutor.schedule(
                () ->
                        FutureUtils.assertNoException(
                                cancelFuture.thenRunAsync(   // 停止后才能重新启动
                                        restartTasks(executionVertexVersions, globalRecovery), 
                                        getMainThreadExecutor())),
                failureHandlingResult.getRestartDelayMS(),
                TimeUnit.MILLISECONDS);
    }


2.2.3 Cancel Task:

  • 取消正在等待Slot分配的SubTask,若已经处于部署/运行状态,则需要通知TaskExecutor执行停止操作并等待操作完成。
    private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
        // clean up all the related pending requests to avoid that immediately returned slot
        // is used to fulfill the pending requests of these tasks
        verticesToRestart.stream().forEach(executionSlotAllocator::cancel); // 取消可能正处于等待分配Slot的SubTask

        final List<CompletableFuture<?>> cancelFutures =
                verticesToRestart.stream()
                        .map(this::cancelExecutionVertex) // 开始停止SubTask
                        .collect(Collectors.toList());

        return FutureUtils.combineAll(cancelFutures);
    }

    public void cancel() {
        while (true) { // 状态变更失败则重试
            ExecutionState current = this.state;
            if (current == CANCELING || current == CANCELED) {
                // already taken care of, no need to cancel again
                return;
            }
            else if (current == RUNNING || current == DEPLOYING) {
                // 当前状态设为CANCELING,并向TaskExecutor发送RPC请求停止SubTask
                if (startCancelling(NUM_CANCEL_CALL_TRIES)) {
                    return;
                }
            } else if (current == FINISHED) {
                // 即使完成运行,后续也无法消费,释放分区结果
                sendReleaseIntermediateResultPartitionsRpcCall();
                return;
            } else if (current == FAILED) {
                return;
            } else if (current == CREATED || current == SCHEDULED) {
                // from here, we can directly switch to cancelled, because no task has been deployed
                if (cancelAtomically()) {
                    return;
                }
            } else {
                throw new IllegalStateException(current.name());
            }
        }
    }

  • 操作完毕后又会执行Task退出流程通知ExecutionGraph执行相应数据更新: ExecutionGraph.updateState(TaskExecutionStateTransition)->ExecutionGraph.updateStateInternal(TaskExecutionStateTransition, Execution) -> Execution.completeCancelling(…) -> Execution.finishCancellation(boolean) -> ExecutionGraph.deregisterExecution(Execution) 。在deregisterExecution操作会在currentExecutions中移除已停止的执行ExecutionTask。

2.2.4 Start Task

        private Runnable restartTasks(
                final Set<ExecutionVertexVersion> executionVertexVersions,
                final boolean isGlobalRecovery) {
            return () -> {
                final Set<ExecutionVertexID> verticesToRestart =
                        executionVertexVersioner.getUnmodifiedExecutionVertices(
                                executionVertexVersions);
    
                removeVerticesFromRestartPending(verticesToRestart);
                // 实例化新的SubTask执行实例(Execution)
                resetForNewExecutions(verticesToRestart);
    
                try {
                    // 恢复状态
                    restoreState(verticesToRestart, isGlobalRecovery);
                } catch (Throwable t) {
                    handleGlobalFailure(t);
                    return;
                }
                // 开始调度,申请Slot并部署
                schedulingStrategy.restartTasks(verticesToRestart);
            };
        }

3 Task失败的自动重启策略

Task 级别的故障重启,是系统自动进行的;

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/399098.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

算法项目(2)—— LSTM、RNN、GRU(SE注意力)、卡尔曼轨迹预测

本文包含什么? 项目运行的方式(包教会)项目代码LSTM、RNN、GRU(SE注意力)、卡尔曼四种算法进行轨迹预测.各种效果图运行有问题? csdn上后台随时售后.项目说明 本文实现了三种深度学习算法加传统算法卡尔曼滤波进行轨迹预测, 预测效果图 首先看下不同模型的指标: 模型RM…

MySQL学习Day19——索引的数据结构

一、为什么使用索引: 索引是存储引擎用于快速找到数据记录的一种数据结构&#xff0c;就好比一本教课书的目录部分&#xff0c;通过目录中找到对应文章的页码&#xff0c;便可快速定位到需要的文章。MySQL中也是一样的道理&#xff0c;进行数据査找时&#xff0c;首先查看查询…

相机图像质量研究(26)常见问题总结:CMOS期间对成像的影响--坏点

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

FreeRTOS学习笔记——(FreeRTOS中断管理)

这里写目录标题 一、什么是中断&#xff1f;&#xff08;了解&#xff09;二、中断优先级分组设置&#xff08;熟悉&#xff09;三、中断相关寄存器&#xff08;熟悉&#xff09;四、FreeRTOS中断管理实验&#xff08;掌握&#xff09; 一、什么是中断&#xff1f;&#xff08;…

【Azure 架构师学习笔记】- Azure Databricks (8) --UC架构简介

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Databricks】系列。 接上文 【Azure 架构师学习笔记】- Azure Databricks (7) --Unity Catalog(UC) 基本概念和组件 前言 UC 简单来说&#xff0c;就是管理两样东西&#xff1a;用户和元存储。 用户管理 所有Databri…

Flink 在蚂蚁实时特征平台的深度应用

摘要&#xff1a;本文整理自蚂蚁集团高级技术专家赵亮星云&#xff0c;在 Flink Forward Asia 2023 AI 特征工程专场的分享。本篇内容主要分为以下四部分&#xff1a; 蚂蚁特征平台特征实时计算特征 Serving特征仿真回溯 一、蚂蚁特征平台 蚂蚁特征平台是一个多计算模式融合的高…

小程序红包服务端请求一直是签名错误如何解决

当小程序红包服务端请求一直显示签名错误时&#xff0c;这可能是由于多种原因导致的&#xff0c;包括密钥错误、参数错误、签名算法错误、时间戳问题以及网络请求问题等。解决这个问题需要细心检查和分析&#xff0c;下面将简单的介绍一下如何针对这些可能的原因进行排查和解决…

19个Web前端交互式3D JavaScript框架和库

JavaScript &#xff08;JS&#xff09; 是一种轻量级的解释&#xff08;或即时编译&#xff09;编程语言&#xff0c;是世界上最流行的编程语言。JavaScript 是一种基于原型的多范式、单线程的动态语言&#xff0c;支持面向对象、命令式和声明式&#xff08;例如函数式编程&am…

使用 Next.js 连接 mysql 数据库

前言 本文主要为大家介绍&#xff0c;如何使用 Next 框架实现一个简单的后端接口&#xff0c;并且从数据库中请求数据返回给前端。 实现 创建api/getData文件夹 项目创建完成后在 app 文件下新建api文件夹&#xff0c;在 api 文件夹下新建 getData 文件夹&#xff0c;在 ge…

Windows使用NVM安装NodeJS

*注 1、安装NVM前&#xff0c;建议先卸载电脑上现有的NodeJS&#xff0c;避免冗余。 一、NVM介绍 NVM&#xff1a;Node Version Manage&#xff0c;即Node的版本管理工具。使用NVM&#xff0c;可以很方便地在多个NodeJS版本之间进行切换。 由于项目开发当中&#xff0c;不同…

网关服务gateway注册Consul时报错Consul service ids must not be empty

网关服务gateway启动时&#xff0c;初始化Consul相关配置时报错。 Consul service ids must not be empty, must start with a letter, end with a letter or digit, and have as interior characters only letters, digits, and hyphen: cbda-server-gateway:10.111.236.142:…

什么是 Wake-on-LAN?如何使用 Splashtop 远程喊醒电脑

在当今数字互联的世界里&#xff0c;远程访问电脑已不仅仅是一种便利&#xff0c;而是许多人的需要。无论是远程工作、IT 支持&#xff0c;还是管理整个网络中的计算机群&#xff0c;我们都必须掌握正确的工具和技术。 其中一项在远程访问中发挥关键作用的技术是 Wake-on-LAN …

跨境ERP定制指南:5大误区,如何避免项目失败?

随着全球化进程不断推进&#xff0c;越来越多的企业选择跨境ERP定制&#xff0c;以适应不同国家和地区的业务需求。然而&#xff0c;ERP定制项目常常面临诸多挑战&#xff0c;如果不正确处理&#xff0c;容易导致项目失败。作为跨境ERP定制领域的专家&#xff0c;下面我将分享5…

科技助力快乐养老,山东恒康养老服务中心与清雷科技达成合作

谈到养老服务&#xff0c;大家或许会有一些刻板印象。 如果说一个落落大方、笑容温柔的90后女孩是一家养老院的院长&#xff0c;很多人都会感到诧异。但就是这位来自山东省龙口市恒康养老服务中心的90后院长韩雨&#xff0c;实现了百分百入住率、百分百好评的养老服务奇迹。 韩…

印刷企业实施MES管理系统有哪些注意事项

随着信息技术的不断发展&#xff0c;MES管理系统解决方案已成为提升企业生产效率、优化资源配置、加强过程控制的重要手段。对于印刷企业而言&#xff0c;实施MES管理系统不仅可以提高生产过程的透明度&#xff0c;还可以减少浪费、提升产品质量&#xff0c;从而增强市场竞争力…

相机图像质量研究(23)常见问题总结:CMOS期间对成像的影响--紫晕

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

github开启两步验证

https://docs.github.com/en/authentication/securing-your-account-with-two-factor-authentication-2fa/configuring-two-factor-authentication 我开发的chatgpt网站&#xff1a; https://chat.xutongbao.top/

Adobe将类ChatGPT集成到PDF中

2月21日&#xff0c;全球多媒体巨头Adobe在官网宣布&#xff0c;推出生成式AI助手AI Assistant&#xff0c;并将其集成在Reader 和Acrobat 两款PDF阅读器中。 据悉&#xff0c;AI Assistant的功能与ChatGPT相似&#xff0c;可以基于PDF文档提供摘要、核心见解、基于文档内容&a…

ansible及其模块

一、ansible是什么&#xff1f; Ansible是一个基于Python开发的配置管理和应用部署工具&#xff0c;现在也在自动化管理领域大放异彩。它融合了众多老牌运维工具的优点&#xff0c;Pubbet和Saltstack能实现的功能&#xff0c;Ansible基本上都可以实现。 Ansible能批量配置、部…

使用 Nuxt 构建简单后端接口及数据库数据请求

写在前面 本文主要为大家介绍&#xff0c;如何使用 Nuxt 框架实现一个简单的后端接口&#xff0c;并且从数据库中请求数据返回给前端。 实现 创建 serverMiddleware 文件夹 首先我们新建一个名字为 serverMiddleware 文件夹用来存储接口相关信息 目录结构如下&#xff1a;…