Flink四大基石之CheckPoint

1、State Vs Checkpoint

State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

2、设置Checkpoint

package com.bigdata.day06;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class _01CheckPointDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。
        System.setProperty("HADOOP_USER_NAME", "root");
        // 在这个基础之上,添加快照
        // 第一句:开启快照,每隔1s保存一次快照
        env.enableCheckpointing(1000);
        // 第二句:设置快照保存的位置
        env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
        // 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //2. source-加载数据
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
            }
        });
        //3. transformation-数据处理转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);


        result.print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

测试代码效果:启动本地的nc, 启动hdfs服务。

启动代码,发现有权限问题:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x

解决方案:

System.setProperty("HADOOP_USER_NAME", "root");

在设置检查点之前,设置一句这样带权限的语句,如果是集群运行,不存在该问题。可以不设置!!!

查看快照情况:

运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

 

启动HDFS、Flink

[root@hadoop10 app]#start-dfs.sh
[root@hadoop10 app]#start-cluster.sh

数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

第一次运行的时候 

在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar

flink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

记得,先启动nc ,再启动任务,否则报错!

 通过nc -lk 9999 输入以下内容:

 

想查看运行结果,可以通过使用的slot数量判断一下:

 取消flink job的运行

 查看一下这次的单词统计到哪个数字了:

第二次运行的时候 

flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar

启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

 

 观察数据:输入一个hello,1 得到新的结果hello,8

3、重启策略

重启策略的意义:流式数据是不可能停止的,假如有一条错误数据导致程序直接退出,后面的大量数据是会丢失的,对公司来讲,意义是重大的,损失是惨重的。

重启策略是一个单独的策略,如果你配置了 checkpoint 含有重启策略的,如果你没有 checkpoint 也可以自行配置重启策略,总之重启策略和 checkpoint 没有必然联系。

就是一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

 进行wordcount时,输入了一个bug,1 人为触发异常。

注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

程序中添加log4J的代码:

# Global logging configuration
#  Debug   info   warn  error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

 

那为什么开启检查点之后,报错了程序还在运行?因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)

//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());

//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(
    RestartStrategies.failureRateRestart(3,
                                         Time.of(2,TimeUnit.MINUTES),
                                         Time.of(5,TimeUnit.SECONDS))
);


env.execute("checkpoint自动重启");   //最后一句execute可以设置jobName,显示在8081界面

 程序如果上传至服务器端运行,可以看到重启状态

完整代码如下:

package com.bigdata.day06;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;


public class Demo02 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的
        // 通过如下方式可将重试机制关掉
        // env.setRestartStrategy(RestartStrategies.noRestart());
        //
        // 两种办法
        // 第一种办法:重试3次,每一次间隔10S
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // 第二种写法:在2分钟内,重启3次,每次间隔10s
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(3,
                        Time.of(2,TimeUnit.MINUTES),
                        Time.of(5,TimeUnit.SECONDS))
        );
        //2. source-加载数据
        DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);
        streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {

            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] arr = value.split(",");
                String word = arr[0];
                if(word.equals("bug")){
                    throw new Exception("有异常,服务会挂掉.....");
                }
                // 将一个字符串变为int类型
                int num = Integer.valueOf(arr[1]);
                // 第二种将字符串变为数字的方法
                System.out.println(Integer.parseInt(arr[1]));
                Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);
                // 还有什么方法? 第二种创建tuple的方法
                Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);
                return tuple2;
            }
        }).keyBy(tuple->tuple.f0).sum(1).print();
        //3. transformation-数据处理转换
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

在本地测试,是没有办法看到重试机制的现象的,需要打包上传至集群,特别注意:使用的类名到底是哪一个。

4、savePoint

checkpoint自动完成state快照、savePoint是手动的完成快照。

如果程序在没有设置checkpoint的情况,可以通过savePoint设置state快照

1.提交一个flink job --提交的还是重启策略的代码打成的jar包

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar

2.输入一些数据,观察单词对应的数字的变化

3.执行savepoint操作

以下是 -->  停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41

后面的序号为Job 的ID

以下是 -->  不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint

备注:如何正确停止一个 flink 的任务

flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)

4.查看最近完成的flink job对应的savepoint

5.根据之前的savepoint路径,重新启动flink job

flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar
Job has been submitted with JobID 3766ec9ff6f34b46376493a04b50a1f4

 再次输入单词,可以看到在之前的基础上累加

另外,在集群中运行我们的程序,默认并行度为1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:

 

这个图形化界面,跟我们使用如下命令是一个效果:

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/925377.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何给GitHub的开源项目贡献PR

&#x1f3af;导读&#xff1a;本文详细介绍了如何向开源项目“代码随想录”贡献自己的题解。首先&#xff0c;需要Fork原项目的仓库至个人GitHub账户&#xff0c;然后解决克隆仓库时可能遇到的SSH密钥问题。接着&#xff0c;按照标准流程对本地仓库进行代码或文档的修改&#…

不可分割的整体—系统思考的微妙法则

不可分割的整体——系统思考的微妙法则 作为企业领导者&#xff0c;我们经常需要做出决策&#xff0c;但有时候&#xff0c;我们会忽略一个事实&#xff1a;每个决策都不是孤立的&#xff0c;它背后都是一个复杂系统的一部分。 无论是市场动态、团队协作&#xff0c;还是产品…

layui-vue第三方库表格列事件怎么写

插槽写入列点击事件 <div class"le-table-box" ref"TableBoxRef" :style"{ height: ShowPage ? calc(100% - 60px) : 100% }"><lay-table row-double"dbRowClick" :columns"TableColumn" :data-source"Table…

第一个 C++ 程序 001

1. main 函数 同 c 语言的 main 函数 2. 字符和字符串 在C的 STL 中⼜引⼊了 string 来表⽰字符串&#xff0c;功能更加强⼤&#xff0c;C 语⾔不⽀持&#xff0c;后期会详细介绍。 其他的和 c 语言类似 3. 头文件 和 C 语言中的头文件一样&#xff0c;相当于一个工具箱 不过…

修改插槽样式,el-input 插槽 append 的样式

需缩少插槽 append 的 宽度 方法1、使用内联样式直接修改&#xff0c;指定 width 为 30px <el-input v-model"props.applyBasicInfo.outerApplyId" :disabled"props.operateCommandType input-modify"><template #append><el-button click…

Hot100 - 搜索二维矩阵II

Hot100 - 搜索二维矩阵II 最佳思路&#xff1a; 利用矩阵的特性&#xff0c;针对搜索操作可以从右上角或者左下角开始。通过判断当前位置的元素与目标值的关系&#xff0c;逐步缩小搜索范围&#xff0c;从而达到较高的效率。 从右上角开始&#xff1a;假设矩阵是升序排列的&a…

杂七杂八的网络安全知识

一、信息安全概述# 1.信息与信息安全# 信息与信息技术 信息奠基人&#xff1a;香农&#xff1a;信息是用来消除随机不确定性的东西 信息的定义&#xff1a;信息是有意义的数据&#xff0c;是一种要适当保护的资产。数据经过加工处理之后&#xff0c;就成为信息。而信息需要…

Vision Transformer(vit)的Embedding层结构

代码&#xff1a; class PatchEmbed(nn.Module):"""2D Image to Patch Embedding"""def __init__(self, img_size224, patch_size16, in_c3, embed_dim768, norm_layerNone):super().__init__()img_size (img_size, img_size) #图像尺寸默认22…

Spring Boot 实战:基于 Validation 注解实现分层数据校验与校验异常拦截器统一返回处理

1. 概述 本文介绍了在spring boot框架下&#xff0c;使用validation数据校验注解&#xff0c;针对不同请求链接的前端传参数据&#xff0c;进行分层视图对象的校验&#xff0c;并通过配置全局异常处理器捕获传参校验失败异常&#xff0c;自动返回校验出错的异常数据。 2. 依赖…

Linux查看网络基础命令

文章目录 Linux网络基础命令1. ifconfig 和 ip一、ifconfig命令二、ip命令 2. ss命令一、基本用法二、常用选项三、输出信息四、使用示例 3. sar 命令一、使用sar查看网络使用情况 4. ping 命令一、基本用法二、常用选项三、输出结果四、使用示例 Linux网络基础命令 1. ifconf…

Python酷库之旅-第三方库Pandas(245)

目录 一、用法精讲 1156、pandas.tseries.offsets.MonthEnd.is_month_start方法 1156-1、语法 1156-2、参数 1156-3、功能 1156-4、返回值 1156-5、说明 1156-6、用法 1156-6-1、数据准备 1156-6-2、代码示例 1156-6-3、结果输出 1157、pandas.tseries.offsets.Mon…

DAMODEL丹摩|Faster-Rcnn训练与部署实战

本文仅做测评体验&#xff0c;非广告。 文章目录 1. 丹摩介绍2. Faster-Rcnn介绍3. 准备3. 1 丹摩平台准备实例 3. 2 Faster-Rcnn4. 部署开始5. 训练5. 资源释放6. 结语 1. 丹摩介绍 详细介绍请看&#xff1a;丹摩平台介绍。 丹摩智算平台&#xff08;DAMODEL&#xff09;是…

NLP信息抽取大总结:三大任务(带Prompt模板)

信息抽取大总结 1.NLP的信息抽取的本质&#xff1f;2.信息抽取三大任务&#xff1f;3.开放域VS限定域4.信息抽取三大范式&#xff1f;范式一&#xff1a;基于自定义规则抽取&#xff08;2018年前&#xff09;范式二&#xff1a;基于Bert下游任务建模抽取&#xff08;2018年后&a…

LLM*:路径规划的大型语言模型增强增量启发式搜索

路径规划是机器人技术和自主导航中的一个基本科学问题&#xff0c;需要从起点到目的地推导出有效的路线&#xff0c;同时避开障碍物。A* 及其变体等传统算法能够确保路径有效性&#xff0c;但随着状态空间的增长&#xff0c;计算和内存效率会严重降低。相反&#xff0c;大型语言…

C#基础题总结

16.一张单据上有一个5位数的号码为6**42&#xff0c;其中百位数和千位数已模糊不清&#xff0c;但知道该数能被 57 和 67 除尽。设计一个算法&#xff0c;找出该单据所有可能的号码。 17.编程序求2&#xff5e;10000以内的完全数。一个数的因子&#xff08;除了这个数本身&…

Android数据存储——文件存储、SharedPreferences、SQLite、Litepal

数据存储全方案——详解持久化技术 Android系统中主要提供了3中方式用于简单地实现数据持久化功能&#xff0c;即文件存储、SharedPreference存储以及数据库存储。除了这三种方式外&#xff0c;还可以将数据保存在手机的SD卡中&#xff0c;不给使用文件、SharedPreference或者…

【动手学电机驱动】STM32-FOC(8)MCSDK Profiler 电机参数辨识

STM32-FOC&#xff08;1&#xff09;STM32 电机控制的软件开发环境 STM32-FOC&#xff08;2&#xff09;STM32 导入和创建项目 STM32-FOC&#xff08;3&#xff09;STM32 三路互补 PWM 输出 STM32-FOC&#xff08;4&#xff09;IHM03 电机控制套件介绍 STM32-FOC&#xff08;5&…

ubuntu 安装proxychains

在Ubuntu上安装Proxychains&#xff0c;你可以按照以下步骤操作&#xff1a; 1、更新列表 sudo apt-update 2、安装Proxychains sudo apt-get install proxychains 3、安装完成后&#xff0c;你可以通过编辑/etc/proxychains.conf文件来配置代理规则 以下是一个简单的配置示例&…

ZooKeeper 基础知识总结

先赞后看&#xff0c;Java进阶一大半 ZooKeeper 官网这样介绍道&#xff1a;ZooKeeper 是一种集中式服务&#xff0c;用于维护配置信息、命名、提供分布式同步和提供组服务。 各位hao&#xff0c;我是南哥&#xff0c;相信对你通关面试、拿下Offer有所帮助。 ⭐⭐⭐一份南哥编写…

visionpro官方示例分析(一) 模板匹配工具 缺陷检测工具

1.需求&#xff1a;找出图像中的这个图形。 2.步骤 使用CogPMAlignTool工具&#xff0c;该工具是模板匹配工具&#xff0c;见名知意&#xff0c;所谓模板匹配工具就是说先使用该工具对一张图像建立模板&#xff0c;然后用这个模板在其他图像上进行匹配&#xff0c;匹配上了就说…