Flink四大基石之CheckPoint(检查点) 的使用详解

目录

一、Checkpoint 剖析

State 与 Checkpoint 概念区分

设置 Checkpoint 实战

执行代码所需的服务与遇到的问题

二、重启策略解读

重启策略意义

代码示例与效果展示

三、SavePoint 

与 Checkpoint 异同

操作步骤详解

四、总结


        在大数据流式处理领域,Apache Flink 凭借其卓越的性能和强大的功能占据重要地位。而理解 Flink 中的 Checkpoint(检查点)、重启策略以及 SavePoint(保存点)这些关键概念,对于保障流处理任务的稳定性、容错性以及可维护性至关重要。本文将深入剖析它们的原理、用法,并结合实际代码示例展示其效果,希望能帮助大家更好地掌握 Flink 相关知识。

一、Checkpoint 剖析

State 与 Checkpoint 概念区分

State(状态)

        在 Flink 中,State 代表某一个 Operator(算子)在某一时刻的状态,像常见的聚合算子 maxBysum 等操作过程中就会维护状态信息。比如在对数据流按某个字段做 sum 聚合时,它需要记住历史数据以便持续累加计算,并且这些状态数据默认存于内存之中,为算子的持续、准确运行提供依据。

Checkpoint(检查点 / 快照点)

        它是 Flink 中所有有状态的 Operator 在某一个特定时刻的 State 快照信息汇总,也就是 State 的存档记录。可以简单理解为对整个作业运行时状态拍了一张 “照片”,定格所有相关算子彼时的状态,方便后续在故障恢复等场景使用。

设置 Checkpoint 实战

以下是一段设置 Checkpoint 的 Flink Java 代码示例:

package com.bigdata.day06;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class _01CheckPointDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。
        System.setProperty("HADOOP_USER_NAME", "root");
        // 在这个基础之上,添加快照
        // 第一句:开启快照,每隔1s保存一次快照
        env.enableCheckpointing(1000);
        // 第二句:设置快照保存的位置
        env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
        // 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //2. source-加载数据
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
            }
        });
        //3. transformation-数据处理转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);


        result.print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

执行代码所需的服务与遇到的问题

启动本地的nc, 启动hdfs服务。

启动代码,发现有权限问题:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x

解决方案:

System.setProperty("HADOOP_USER_NAME", "root");

        在设置检查点之前,设置一句这样带权限的语句,如果是集群运行中,不存在该问题。可以不设置!!! 

查看快照情况:

        运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

启动HDFS、Flink

start-dfs.sh
start-cluster.sh

数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

第一次运行的时候

在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar

flink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

记得,先启动nc ,再启动任务,否则报错!

通过nc -lk 9999 输入以下内容:

想查看运行结果,可以通过使用的slot数量判断一下:

取消flink job的运行

查看一下这次的单词统计到哪个数字了:

第二次运行的时候

flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar

启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

观察数据:输入一个hello,1 得到新的结果hello,8

二、重启策略解读

重启策略意义


        流式数据如同永不干涸的河流持续流淌,一旦因某条错误数据致使程序异常退出,后续海量数据丢失风险极高,对企业而言,这意味着数据资产受损、业务分析结果偏差等严重后果,重启策略应运而生。它作为独立策略,与 Checkpoint 虽无必然绑定关系(即便没配置 Checkpoint 也能单独配置重启策略),却在保障程序持续运行层面协同发挥关键作用。

        一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

进行wordcount时,输入了一个bug,1 人为触发异常。

        注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

程序中添加log4j.properties的代码:

# Global logging configuration
#  Debug   info   warn  error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

        开启检查点之后,报错了程序还在运行是因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)

//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());

//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(
    RestartStrategies.failureRateRestart(3,
                                         Time.of(2,TimeUnit.MINUTES),
                                         Time.of(5,TimeUnit.SECONDS))
);


env.execute("checkpoint自动重启");   //最后一句execute可以设置jobName,显示在8081界面

程序如果上传至服务器端运行,可以看到重启状态

代码示例与效果展示

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;


public class Demo02 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的
        // 通过如下方式可将重试机制关掉
        // env.setRestartStrategy(RestartStrategies.noRestart());
        //
        // 两种办法
        // 第一种办法:重试3次,每一次间隔10S
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // 第二种写法:在2分钟内,重启3次,每次间隔10s
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(3,
                        Time.of(2,TimeUnit.MINUTES),
                        Time.of(5,TimeUnit.SECONDS))
        );
        //2. source-加载数据
        DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);
        streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {

            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] arr = value.split(",");
                String word = arr[0];
                if(word.equals("bug")){
                    throw new Exception("有异常,服务会挂掉.....");
                }
                // 将一个字符串变为int类型
                int num = Integer.valueOf(arr[1]);
                // 第二种将字符串变为数字的方法
                System.out.println(Integer.parseInt(arr[1]));
                Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);
                // 还有什么方法? 第二种创建tuple的方法
                Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);
                return tuple2;
            }
        }).keyBy(tuple->tuple.f0).sum(1).print();
        //3. transformation-数据处理转换
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

        在此代码中人为在 map 函数里设置异常触发点(输入包含 “bug” 的数据时抛出异常)。若开启 Checkpoint,因它自带重试机制(默认无限重启),异常可能被掩盖,需关闭 Checkpoint 相关代码才能看到异常打印情况。同时,要完整看到重启策略效果(如按设定的次数、间隔重启),需打包代码上传至集群运行,本地测试难以呈现完整现象,且提交时务必确认使用的类名准确无误。

三、SavePoint 

与 Checkpoint 异同

相同点

        本质都是对 Flink 作业状态的一种保存方式,以便后续恢复作业时复用状态,保障数据处理连贯性。

不同点

        Checkpoint 是 Flink 自动按设定规则周期性完成 State 快照保存,旨在应对故障自动恢复场景;而 SavePoint 是手动触发的快照操作,提供更灵活的作业状态管理时机,比如在版本升级、业务规则调整需暂停并后续重启作业场景发挥优势。

操作步骤详解

提交作业并输入数据

        提交含重启策略代码打包成的 jar 包运行作业(类似 flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar),输入数据观察单词对应数字变化。

执行 SavePoint 操作

以下是 -->  停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41

后面的序号为Job 的ID

以下是 -->  不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint

备注:如何正确停止一个 flink 的任务

flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)

查看与重启作业

        查看最近完成作业对应的 SavePoint,之后依据之前保存路径重启作业(flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar),再次输入数据可看到基于之前状态的累加效果。

        此外,在集群运行 Flink 程序时,默认并行度常为 1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:


这个图形化界面,跟我们使用如下命令是一个效果:

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

四、总结

        通过对 Flink 中 Checkpoint、重启策略和 SavePoint 的详细解读与代码实践展示,我们明晰它们各自在保障流处理任务稳定、容错与灵活运维层面的独特价值。合理运用这些机制,能助我们打造更健壮、高效的 Flink 大数据处理应用,从容应对复杂多变的业务需求与运行环境挑战,后续大家可在实际项目中深入实践优化,挖掘其更大潜力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/927610.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

springboot旅游管理系统的设计与实现

springboot旅游管理系统的设计与实现 如需源码pc端&#x1f449;&#x1f449;&#x1f449;资源 手机端&#x1f449;&#x1f449;&#x1f449;资源 摘 要 信息化社会内需要与之针对性的信息获取途径&#xff0c;但是途径的扩展基本上为人们所努力的方向&#xff0c;由于…

16asm - 汇编介绍 和 debug使用

文章目录 前言硬件运行机制微机系统硬件组成计算机系统组成8086cpu组织架构dosbox安装配置debug debug使用R命令D命令E命令U命令T命令A命令标志寄存器 总结 前言 各位师傅大家好&#xff0c;我是qmx_07&#xff0c;今天给大家讲解 十六位汇编 和 debug调试器的使用 硬件运行…

多级缓存设计实践

缓存是什么&#xff1f; 缓存技术是一种用于加速数据访问的优化策略。它通过将频繁访问的数据存储在高速存储介质&#xff08;如内存&#xff09;中&#xff0c;减少对慢速存储设备&#xff08;如硬盘或远程服务器&#xff09;的访问次数&#xff0c;从而提升系统的响应速度和…

鸿蒙开发-HMS Kit能力集(地图服务、华为支付服务)

地图服务 Map Kit&#xff08;地图服务&#xff09;是鸿蒙生态下的一个地图服务&#xff0c;为开发者提供强大而便捷的地图能力&#xff0c;助力全球开发者实现个性化地图呈现、地图搜索和路线规划等功能&#xff0c;轻松完成地图构建工作。 Map Kit提供了千万级别的 Poi&…

【k8s深入学习之 event 记录】初步了解 k8s event 记录机制

event 事件记录初始化 一般在控制器都会有如下的初始化函数&#xff0c;初始化 event 记录器等参数 1. 创建 EventBroadcaster record.NewBroadcaster(): 创建事件广播器&#xff0c;用于记录和分发事件。StartLogging(klog.Infof): 将事件以日志的形式输出。StartRecording…

STM32 ADC --- 知识点总结

STM32 ADC — 知识点总结 文章目录 STM32 ADC --- 知识点总结cubeMX中配置注解单次转换模式、连续转换模式、扫描模式单通道采样的情况单次转换模式&#xff1a;连续转换模式&#xff1a; 多通道采样的情况禁止扫描模式&#xff08;单次转换模式或连续转换模式&#xff09;单次…

如何打开链接中的网址

文章目录 1 概念介绍2 使用方法3 示例代码我们在上一章回中介绍了包管理相关的内容,本章回中将介绍如何使用url_launcher包.闲话休提,让我们一起Talk Flutter吧。 1 概念介绍 我们在这里介绍url_launcher包主要用来打开Url中的内容,Url可以是电话号码,网址,邮箱等内容。如…

cpp的set

一、关联式容器和键值对 1.关联式容器 关联式容器也是用来存储数据的&#xff0c;与序列式容器不同的是&#xff0c;其里面存储的是<key, value>结构的键值对&#xff0c;在数据检索时比序列式容器效率更高 2.键值对 用来表示具有一一对应关系的一种结构&#xff0c;…

Vue3学习宝典

1.ref函数调用的方式生成响应式数据&#xff0c;可以传复杂和简单数据类型 <script setup> // reactive接收一个对象类型的数据 import { reactive } from vue;// ref用函数调用的方式生成响应式数据&#xff0c;可以传复杂和简单数据类型 import { ref } from vue // 简…

数据结构——排序第三幕(深究快排(非递归实现)、快排的优化、内省排序,排序总结)超详细!!!!

文章目录 前言一、非递归实现快排二、快排的优化版本三、内省排序四、排序算法复杂度以及稳定性的分析总结 前言 继上一篇博客基于递归的方式学习了快速排序和归并排序 今天我们来深究快速排序&#xff0c;使用栈的数据结构非递归实现快排&#xff0c;优化快排&#xff08;三路…

数字经济发展的新视角:数字产业化、数据资产化、产业数字化与数据产业

在当今数字化、网络化和智能化的时代&#xff0c;数字经济的快速发展催生了一系列新兴概念&#xff0c;其中“数字产业化、数据资产化、产业数字化与数据产业”尤为引人注目。这四个概念不仅代表了数字经济发展的不同阶段和方向&#xff0c;也深刻影响着传统产业的转型升级和经…

springboot370高校宣讲会管理系统(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 高校宣讲会管理系统设计与实现 摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c…

ansible自动化运维(一)配置主机清单

目录 一、介绍 1.1了解自动化运维 1.2 ansible简介 1.3 ansible自动化运维的优势 1.4 ansible架构图 二、部署ansible 2.1 基本参数 2.2 Ansible帮助命令 2.3 配置主机清单 2.3.1 查看ansible的所有配置文件 2.3.2 /etc/ansible/ansible.cfg常用配置选项 2.3.3 ssh密…

计算机网络 —— HTTP 协议(详解)

前一篇文章&#xff1a;网页版五子棋—— WebSocket 协议_网页可以实现websocket吗-CSDN博客 目录 前言 一、HTTP 协议简介 二、HTTP 协议格式 1.抓包工具的使用 2.抓包工具的原理 3.抓包结果 4.HTTP协议格式总结 三、HTTP 请求 1. URL &#xff08;1&#xff09;UR…

GaussDB的BTree索引和UBTree索引

目录 一、简介 二、BTree索引和UBTree索引结构 三、BTree索引和UBTree索引优势 四、总结与展望 一、简介 数据库通常使用索引来提高业务查询的速度。本文将深入介绍GaussDB中最常用的两种索引&#xff1a;BTree索引和UBTree索引。我们将重点解读BTree索引和UBTree索引的存储…

通义灵码走进北京大学创新课堂丨阿里云云原生 10 月产品月报

云原生月度动态 云原生是企业数字创新的最短路径。 《阿里云云原生每月动态》&#xff0c;从趋势热点、产品新功能、服务客户、开源与开发者动态等方面&#xff0c;为企业提供数字化的路径与指南。 趋势热点 &#x1f947; 通义灵码走进北京大学创新课堂&#xff0c;与 400…

python 练习题

目录 1&#xff0c;输入三个整数&#xff0c;按升序输出 2&#xff0c;输入年份及1-12月份&#xff0c;判断月份属于大月&#xff0c;小月&#xff0c;闰月&#xff0c;平月&#xff0c;并输出本月天数 3&#xff0c;输入一个整数&#xff0c;显示其所有是素数因子 4&#…

IDEA 2024 配置Maven

Step 1:确定下载Apache Maven版本 在IDEA 2024中&#xff0c;随便新建一个Maven项目&#xff1b; 在File下拉菜单栏中&#xff0c;找到Setings&#xff1b; 在Build&#xff0c;Execution&#xff0c;Deployment中找到Maven 确定下载的Apache Maven版本应略低于或等于IDEA绑…

ubuntu20.04更换安装高版本CUDA以及多个CUDA版本管理

Ubuntu 20.04下多版本CUDA的安装与切换 CUDA安装配置环境变量软连接附上参考博客CUDA安装 cuda官方下载地址 因为我需要安装的是11.1版本的,所以这里按着11.1举例安装 安装命令如下: wget https://developer.download.nvidia.com/compute/cuda/11.1.0/local_installers/cu…

Web基础

实践目标 &#xff08;1&#xff09;Web前端HTML&#xff08;2&#xff09;Web前端javascipt&#xff08;3&#xff09;Web后端&#xff1a;MySQL基础&#xff1a;正常安装、启动MySQL&#xff0c;建库、创建用户、修改密码、建表&#xff08;4&#xff09;Web后端&#xff1a…