【详细介绍及演示】Flink之checkpoint检查点的使用

目录

一、介绍

二、 设置checkpoint检查点演示

1、 代码演示

 2、测试代码效果

3、查看快照情况

​编辑

三、在集群上运行

1、第一次运行

2、第二次运行

四、自定义检查点savePoint

1、提交一个flink job  打成jar包

2、输入一些数据,观察单词对应的数字的变化

​编辑 3、执行savepoint操作,添加检查点

 4、查看最近完成的flink job对应的savepoint

5、重新启动flink job,进行测试

6、观察变化

五、总结


一、介绍

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

二、 设置checkpoint检查点演示

简单的举例说明:

1、 代码演示

package com.bigdata.checkpoint;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class CheckPointWordCountDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。
        System.setProperty("HADOOP_USER_NAME", "root");
        // 在这个基础之上,添加快照
        // 第一句:开启快照,每隔1s保存一次快照
        env.enableCheckpointing(1000);
        // 第二句:设置快照保存的位置
        env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
        // 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //checkpoint默认重启策略是一直重启,可以自己定义重启策略
        //重启策略可以单独使用,不设置checkpoint也可使用   //savepoint可以手动使用命令设置checkpoint
        //2分钟内重启3次,重启时间间隔是5s
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(3,
                        Time.of(2, TimeUnit.MINUTES),
                        Time.of(5,TimeUnit.SECONDS))
        );

        //2. source-加载数据
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        //3. transformation-数据处理转换
        source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word,1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        }).sum(1).print();
        //4. sink-数据输出

        //5. execute-执行
        env.execute();
    }
}

 2、测试代码效果

首先启动本地的nc, 启动hdfs服务

3、查看快照情况

运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

三、在集群上运行

 首先启动flink:start-cluster.sh

由上一步可以发现数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

1、第一次运行

在本地先clean, 再package ,再Wagon一下:

在bigdata01服务器上执行以下命令 

#flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar

flink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

#记得,先启动nc ,再启动任务,否则报错!

通过nc -lk 9999 输入以下内容:

进入bigdata01:8081页面上查看结果 

想查看运行结果,可以通过使用的slot数量判断一下:

取消flink job的运行

 查看一下这次的单词统计到哪个数字了:

2、第二次运行

#flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar

#启动
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

观察数据:在nc 上输入一个hello,1 得到新的结果hello,8

四、自定义检查点savePoint

checkpoint自动完成state快照、savePoint是手动的完成快照。

如果程序在没有设置checkpoint的情况,可以通过savePoint设置state快照

1、提交一个flink job  打成jar包

package com.bigdata.checkpoint;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;


public class CheckPointWordCountDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //重启策略可以单独使用,不设置checkpoint也可使用  
        //savepoint可以手动使用命令设置checkpoint
        //2分钟内重启3次,重启时间间隔是5s
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(3,
                        Time.of(2, TimeUnit.MINUTES),
                        Time.of(5,TimeUnit.SECONDS))
        );

        //2. source-加载数据
        DataStreamSource<String> source = env.socketTextStream("bigdata01", 2727);
        //3. transformation-数据处理转换
        source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(",");
                for (String word : arr) {
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                //自制一个bug用来测试
                if(word.equals("bug")){
                    throw new Exception("出错了,请重试");
                }
                return Tuple2.of(word,1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        }).sum(1).print();
        //4. sink-数据输出

        //5. execute-执行
        env.execute();
    }
}

执行改任务 

2、输入一些数据,观察单词对应的数字的变化

 3、执行savepoint操作,添加检查点

  • 停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41

    后面的序号为Job 的ID

  • 不会停止flink的job,只是完成savepoint操作(执行这个操作)
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint
  •  停止一个 flink 的任务
flink stop 6a27b580aa5c6b57766ae6241d9270ce

    后面的序号为Job 的ID

 4、查看最近完成的flink job对应的savepoint

 

发现任务中已经有检查点 

5、重新启动flink job,进行测试

停止任务后,查看最终检查点的路径

然后重新启动

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink-savepoint/savepoint-79f53c-64b5d94771eb 
 /opt/app/flink-test-1.0-SNAPSHOT.jar

-c后是全类名,-s 后是检查的路径 ,最后一部分是jar包的位置

6、观察变化

再次输入单词,可以看到在之前的基础上累加

另外,在集群中运行我们的程序,默认并行度为1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的

五、总结

有两种添加检查点的方式:

1、在java代码中自动添加

在执行任务时会在hdfs上创建检查点

// 第一句:开启快照,每隔1s保存一次快照
env.enableCheckpointing(1000);
// 第二句:设置快照保存的位置
env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
  env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2、在集群上通过命令在指定位置手动添加

flink savepoint 任务号  hdfs://bigdata01:9820/flink-savepoint

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/926893.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【进阶篇-Day15:JAVA线程-Thread的介绍】

目录 1、进程和线程1.1 进程的介绍1.2 并行和并发1.3 线程的介绍 2、JAVA开启线程的三种方法2.1 继承Thread类&#xff1a;2.2 实现Runnable接口2.3 实现Callable接口2.4 总结&#xff1a; 3、线程相关方法3.1 获取和设置线程名字的方法3.2 线程休眠方法&#xff1a;3.3 线程优…

springboot(20)(删除文章分类。获取、更新、删除文章详细)(Validation分组校验)

目录 一、删除文章分类功能。 &#xff08;1&#xff09;接口文档。 1、请求路径、请求参数。 2、请求参数。 3、响应数据。 &#xff08;2&#xff09;实现思路与代码书写。 1、controller层。 2、service接口业务层。 3、serviceImpl实现类。 4、mapper层。 5、后端接口测试。…

如何搭建JMeter分布式集群环境来进行性能测试

在性能测试中&#xff0c;当面对海量用户请求的压力测试时&#xff0c;单机模式的JMeter往往力不从心。如何通过分布式集群环境&#xff0c;充分发挥JMeter的性能测试能力&#xff1f;这正是许多测试工程师在面临高并发、海量数据时最关注的问题。那么&#xff0c;如何轻松搭建…

Y20030025基于php+mysql的幼儿健康管理系统设计与实现 源代码 配置 文档

幼儿健康管理系统的设计与实现 1.摘要2.开发目的和意义3.系统功能设计4.系统界面截图5.源码获取 1.摘要 在信息化时代的浪潮中&#xff0c;幼儿健康管理面临着前所未有的挑战与机遇。为了更好地满足家长和幼儿园对幼儿健康管理的需求&#xff0c;我们致力于开发一套基于PHP的幼…

时频转换 | Matlab基于垂直二阶同步压缩变换vertical second-order synchrosqueezing一维数据转二维图像方法

目录 基本介绍程序设计参考资料获取方式基本介绍 时频转换 | Matlab基于垂直二阶同步压缩变换vertical second-order synchrosqueezing一维数据转二维图像方法 程序设计 clear clc % close all load x.mat % 导入数据 x

1.1 数据结构的基本概念

1.1.1 基本概念和术语 一、数据、数据对象、数据元素和数据项的概念和关系 数据&#xff1a;是客观事物的符号表示&#xff0c;是所有能输入到计算机中并被计算机程序处理的符号的总称。 数据是计算机程序加工的原料。 数据对象&#xff1a;是具有相同性质的数据元素的集合&…

SpringBoot小知识(2):日志

日志是开发项目中非常重要的一个环节&#xff0c;它是程序员在检查程序运行的手段之一。 1.日志的基础操作 1.1 日志的作用 编程期调试代码运营期记录信息&#xff1a; * 记录日常运营重要信息(峰值流量、平均响应时长……) * 记录应用报错信息(错误堆栈) * 记录运维过程数据(…

传输控制协议(TCP)

传输控制协议是Internet一个重要的传输层协议。TCP提供面向连接、可靠、有序、字节流传输服务。 1、TCP报文段结构 注&#xff1a;TCP默认采用累积确认机制。 2、三次握手、四次挥手 &#xff08;1&#xff09;当客户向服务器发送完最后一个数据段后&#xff0c;发送一个FIN段…

输出保留3位小数的浮点数

输出保留3位小数的浮点数 C语言代码C代码Java代码Python代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 读入一个单精度浮点数&#xff0c;保留3位小数输出这个浮点数。 输入 只有一行&#xff0c;一个单精度浮点数。 输出 也只有一…

安装 RabbitMQ 服务

安装 RabbitMQ 服务 一. RabbitMQ 需要依赖 Erlang/OTP 环境 (1) 先去 RabbitMQ 官网&#xff0c;查看 RabbitMQ 需要的 Erlang 支持&#xff1a;https://www.rabbitmq.com/ 进入官网&#xff0c;在 Docs -> Install and Upgrade -> Erlang Version Requirements (2) …

【竞技宝】CS2-上海major:MongoLZ成为亚洲之光

北京时间2024年12月1日,上海major在昨日正式拉开比赛序幕,首日第六轮迎来MongolZ对阵MIBR、COL对阵PUA。以下是本轮比赛的详细战报。 MongoLz 13-6 MIBR(比赛地图:远古遗迹) 上半场,MongoLz先做进攻方。手枪局,MongoLz抱团进攻遭遇MIBR重防被接连秒掉三人,然而在5V2的残局中,M…

【绘图】数据可视化(python)

对于数据绝对值差异较大&#xff08;数据离散&#xff09; 1. 对数坐标直方图&#xff08;Histogram with Log Scale&#xff09; import pandas as pd import matplotlib.pyplot as plt import numpy as np# 示例数据 data {count: [10, 20, 55, 90, 15, 5, 45, 80, 1000, …

MySQL - Why Do We Need a Thread Pool? - mysql8.0

MySQL - Why Do We Need a Thread Pool? - mysql8.0 本文主要由于上次写的感觉又长又臭&#xff0c; 感觉学习方法有问题&#xff0c; 我们这次直接找来了 thread pool 的原文&#xff0c;一起来看看官方的开发者给出的blog – 感觉是个大神 但是好像不是最官方的 &#xff0c…

【JS】栈内存、堆内存、事件机制区别

js中&#xff0c;内存主要分为两种类型&#xff1a;栈内存&#xff08;stack&#xff09;、堆内存&#xff08;heap&#xff09;&#xff0c;两种内存区域在存储和管理数据时有各自的特点和用途。 栈内存 访问顺序 栈是先进后出、后进先出的数据结构&#xff0c;栈内存是内存用…

glog在vs2022 hello world中使用

准备工作 设置dns为阿里云dns 223.5.5.5&#xff0c;下载cmake&#xff0c;vs2022&#xff0c;git git clone https://github.com/google/glog.git cd glog mkdir build cd build cmake .. 拷贝文件 新建hello world并设置 设置预处理器增加GLOG_USE_GLOG_EXPORT;GLOG_NO_AB…

20241127 给typecho文章编辑附件 添加视频 图片预览

Typecho在写文章时&#xff0c;如果一次性上传太多张图片可能分不清哪张&#xff0c;因为附件没有略缩图&#xff0c;无法实时阅览图片&#xff0c;给文章插入图片时很不方便。 编辑admin/file-upload.php 大约十八行的位置 一个while 循环里面,这是在进行html元素更新操作,在合…

重生之我在异世界学编程之C语言:二维数组篇

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文一 二维数组的创建1. 二维数组的…

Tree搜索二叉树、map和set_数据结构

数据结构专栏 如烟花般绚烂却又稍纵即逝的个人主页 本章讲述数据结构中搜索二叉树与HashMap的学习&#xff0c;感谢大家的支持&#xff01;欢迎大家踊跃评论&#xff0c;感谢大佬们的支持! 目录 搜索二叉树的概念二叉树搜索模拟实现搜索二叉树查找搜索二叉树插入搜索二叉树删除…

分离整数的各个数

分离整数的各个数 C语言代码C 语言代码Java语言代码Python语言代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 给定一个整数&#xff0c;要求从个位开始分离出它的每一位数字。 输入 输入一个整数&#xff0c;整数在1到100000000之间…

OpenAI Whisper 语音识别 模型部署及接口封装

环境配置: 一、安装依赖&#xff1a; pip install -U openai-whisper 或者&#xff0c;以下命令会从这个存储库拉取并安装最新的提交&#xff0c;以及其Python依赖项&#xff1a; pip install githttps://github.com/openai/whisper.git 二、安装ffmpeg&#xff1a; cd …