【动态读取配置文件】ParameterTool读取带环境的配置信息

不同环境Flink配置信息是不同的,为了区分不同环境的配置文件,使用ParameterTool工具读取带有环境的配置文件信息

区分环境的配置文件

image-20231217202925250

三个配置文件:

flink.properties:决定那个配置文件生效

flink-dev.properties:测试环境配置文件

flink-prod.properties:生产环境配置文件

flink.properties配置文件中只配置一项flink.env.active=dev,读取该配置项然后组装出生效的配置文件名

工具类实现

public class ParameterUtil {

    /**
     * 默认配置文件 flink.properties
     */
    private static final String DEFAULT_CONFIG = ParameterConstants.FLINK_ROOT_FILE;
    /**
     * 带环境配置文件 flink-%s.properties
     */
    private static final String FLINK_ENV_FILE = ParameterConstants.FLINK_ENV_FILE;
    /**
     * 环境变量 flink.env.active
     */
    private static final String ENV_ACTIVE = ParameterConstants.FLINK_ENV_ACTIVE;

    /**
     * 配置文件+启动参数+系统环境变量 生成ParameterTool
     */
    public static ParameterTool getParameters(final String[] args) {

        /* **********************
         * Java读取资源的方式:
         *
         * a. Class.getResourceAsStream(Path): Path 必须以 “/”,表示从ClassPath的根路径读取资源
         * b. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”, 默认从ClassPath的根路径读取资源
         *
         * 推荐使用第2种,也就是类加载器的方式获取静态资源文件, 不要通过ClassPath的相对路径查找
         * *********************/

        InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);

        try {
            // 读取根配置文件
            ParameterTool defaultPropertiesFile =
                    ParameterTool.fromPropertiesFile(inputStream);

            // 获取环境参数
            String envActive = getEnvActiveValue(defaultPropertiesFile);

            // 读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)

            return ParameterTool
                    // ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量

                    // 从配置文件获取配置
                    .fromPropertiesFile(
                            //当前线程
                            Thread.currentThread()
                                    //返回该线程的上下文信息, 获取类加载器
                                    .getContextClassLoader()
                                    .getResourceAsStream(envActive))
                    // 从启动参数中获取配置
                    .mergeWith(ParameterTool.fromArgs(args))
                    // 从系统环境变量获取配置
                    .mergeWith(ParameterTool.fromSystemProperties());
        } catch (IOException e) {
            throw new RuntimeException("");
        }

    }


    /**
     * 配置文件+系统环境变量 生成ParameterTool
     */
    public static ParameterTool getParameters() {

        InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);

        /* **********************
         *
         * 注意:
         *
         * ParameterTool 读取配置文件需要抛出 IOException,
         * IOException 的捕捉就在这里 catch
         *
         * 以前代码是直接抛出,没有进行catch,要注意对以前代码的修改
         *
         * *********************/

        try {
            ParameterTool defaultPropertiesFile =
                    ParameterTool.fromPropertiesFile(inputStream);

            //获取环境参数
            String envActive = getEnvActiveValue(defaultPropertiesFile);

            //读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)
            return ParameterTool
                    // ParameterTool读取变量优先级 系统环境变量>启动参数变量>配置文件变量

                    // 从配置文件获取配置
                    .fromPropertiesFile(
                            //当前线程
                            Thread.currentThread()
                                    //返回该线程的上下文信息, 获取类加载器
                                    .getContextClassLoader()
                                    .getResourceAsStream(envActive))
                    // 从系统环境变量获取配置
                    .mergeWith(ParameterTool.fromSystemProperties());
        } catch (Exception e) {
            throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);
        }
    }

    /**
     * 获取环境配置变量
     */
    private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {
        // 选择参数环境
        String envActive = null;
        if (defaultPropertiesFile.has(ENV_ACTIVE)) {
            envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));
        }

        return envActive;
    }

    /**
     * 从配置文件参数配置流式计算的上下文环境
     */
    public static void envWithConfig(
            StreamExecutionEnvironment env,
            ParameterTool parameterTool
    ) {

        /* **********************
         *
         * checkpoint 设置
         *
         * 1.
         * 若checkpoint 时间不要设置太短,
         * 这里的时间包括了超时时间
         *
         * 2.
         * 设置了周期性checkpoint,
         * 若上一个周期的checkpoint没完成,
         * 下一个周期的checkpoint不会开始的.
         *
         * 3.
         * 若checkpoint的持续时间超过了超时时间,
         * 会出现排队,
         * 过多的checkpoint排队会耗费资源
         *
         * 4.
         * 为了解决checkpoint排队堆积,
         * 需要优化checkpoint的完成效率
         *
         * *********************/
        // 每60秒触发checkpoint
        env.enableCheckpointing(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_INTERVAL));
        CheckpointConfig ck = env.getCheckpointConfig();
        // checkpoint 必须在60秒内结束,否则被丢弃
        ck.setCheckpointTimeout(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_TIMEOUT));
        // checkpoint间最小间隔 30秒 (指定了这个值, setMaxConcurrentCheckpoints自动默认为1)
        ck.setMinPauseBetweenCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MINPAUSE));
        // checkpoint 语义设置为 精确一致( EXACTLY_ONCE )
        ck.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 最多允许 checkpoint 失败 3 次
        ck.setTolerableCheckpointFailureNumber(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_FAILURENUMBER));
        // 同一时间只允许一个 checkpoint 进行
        ck.setMaxConcurrentCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MAXCONCURRENT));

        // 设置 State 存储
        env.setStateBackend(new HashMapStateBackend());
        // 并行度设置
        env.setParallelism(parameterTool.getInt(ParameterConstants.FLINK_PARALLELISM));

    }

}

读取环境信息

该方法会读取 flink.properties 配置的生效的配置文件,组装成要读取的配置文件

		// flink.properties	
		InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);
        ParameterTool defaultPropertiesFile =
            ParameterTool.fromPropertiesFile(inputStream);  

	private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {
        // 选择参数环境
        String envActive = null;
        
        // 配置文件中是否有该属性 flink.env.active
        if (defaultPropertiesFile.has(ENV_ACTIVE)) {
            // 有的话,直接拼装 flink-%s.properties -> flink-dev.properties
            envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));
        }

        return envActive;
    }

ParameterTool 获取参数的3种方式

  1. fromPropertiesFile 配置文件

  2. fromArgs 程序启动参数

    - 或者 -- 开头 空格分隔, 如:-name likelong --age 21

  3. fromSystemProperties 系统环境变量, 包括程序 -D启动的变量

    内部调用的是 Java提供的 System.getProperties()

ParameterTool 获取参数优先级, 可通过 mergeWith() 设置优先级, 但 mergeWith() 会覆盖前面的同名变量

因此,上述ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量

ParameterTool 注册 global 变量

ParameterTool 注册为 global 变量:env.getConfig().setGlobalJobParameter()

这样, 在上下文中就能获取 ParameterTool

(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters()

【该方法可以在富函数生命周期方法中调用】

如下:

    private static void initEnv(String[] args) {

        // ParameterTool 注册为 global
        parameterTool = ParameterUtil.getParameters();
        env.getConfig().setGlobalJobParameters(parameterTool);
        // 配置上下文环境
        ParameterUtil.envWithConfig(env, parameterTool);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/254019.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

国产or进口?台阶仪为何要选择国产

在微观轮廓测量领域,选择一款合适的台阶仪对于获得精准的测量结果至关重要。随着科技的不断发展,台阶仪市场上涌现了许多国产和进口产品,消费者在选择时可能会面临一些疑虑。 什么是台阶仪 台阶仪是一种超精密接触式微观轮廓测量仪&#xf…

小程序地图检索

<template><view style"background-color: #f5f5f5"><!-- 头部开始 --><viewstyle"position: fixed;left: -5rpx;right: -5rpx;z-index: 99;top: -5rpx;width: 101vw;"><view style"position: relative"><view…

常用模块之(time/datetime)

【 一 】时间模块&#xff08;time/datetime&#xff09; 【 二 】 表示时间的三种方式 *时间戳&#xff08;Timestamp&#xff09;是指1970年1月1日00:00:00开始计算的偏移量。可以使用time模块中的time()函数获取当前时间的时间戳&#xff0c;也可以使用datetime模块中的tim…

低代码开发平台的优势及应用场景分析

文章目录 低代码是什么&#xff1f;低代码起源低代码分类低代码的能力低代码的需求市场需要专业开发者需要数字化转型需要 低代码的趋势如何快速入门低代码开发低代码应用领域 低代码是什么&#xff1f; 低代码&#xff08;Low-code&#xff09;是著名研究机构Forrester于2014…

lseek()函数的原型及使用方法,超详细

对于所有打开的文件都有一个当前文件偏移量(current file offset)&#xff0c;文件偏移量通常是一个非负整数&#xff0c;用于表明文件开始处到文件当前位置的字节数。 读写操作通常开始于当前文件偏移量的位置&#xff0c;并且使其增大&#xff0c;增量为读写的字节数。文件被…

机器视觉技术与应用实战(Chapter Two-04)

2.6 图像形态学及常见的图像处理工具 图像形态学&#xff1a;是分析几何形状和结构的数字方法&#xff0c;是建立在集合代数的基础上用集合论方法定量描述几何结构的学科。基本的图像形态学算法有&#xff1a;腐蚀&#xff08;Erode&#xff09;、膨胀&#xff08;Dilate&…

geemap学习笔记028:Landsat8计算时间序列NDVI并导出

前言 本节则是以Landsat8影像数据为例&#xff0c;进行NDVI时间序列计算&#xff0c;并将得到的时间序列NDVI进行展示并导出。 1 导入库并显示地图 import ee import geemap import datetime import pandas as pd import os ee.Initialize()2 定义时间范围 # 定义日期范围 …

【ECMAScript笔记四】自定义对象(创建,遍历)、内置对象(Math、Data、Array、String)、数据类型比较

文章目录 10 自定义对象10.1 创建对象方式10.1.1 字面量10.1.2 new object10.1.3 构造函数 10.2 遍历对象 11 内置对象11.1 Math 数学对象11.2 Date 时间对象11.3 Array 数组对象11.4 String 字符串对象 12 简单数据类型和复杂数据类型 10 自定义对象 JavaScript中的对象分为3…

频谱论文:基于张量Tucker分解的频谱地图构建算法

#频谱# [1]陈智博,胡景明,张邦宁 郭道省.(2023).基于张量Tucker分解的频谱地图构建算法.电子与信息学报(11),4161-4169. &#xff08;陆军工程大学&#xff09; 研究内容 将动态电磁环境的时变频谱地图建模为3维频谱张量&#xff0c;通过张量Tucker分解提取出具有物理意义的核…

使用级联毫米波传感器的成像雷达参考设计2(TI文档)

3.4 级联雷达信号处理链 3.4.1 MIMO雷达信号处理链 以MIMO模式收集的原始信号按照图16所示的流程进行处理。 图16 MIMO信号处理链 3.4.1.1 ADC数据读取和校准 每次数据采集后&#xff0c;将保存4个级联设备对应的二进制数据文件和相应的chirp配置文件。这两个文件是ADC数据读…

Zabbix+Grafana

背景 对指标采集 将采集的信息存储 可视化 报警 因为节点上本身就是zabbix&#xff0c;但对应的server在数据中心&#xff0c;不知道一个agent可否服务于多个server端&#xff0c;而且不确定数据中心是否会提供用户。所以还是放弃zabbix 架构

干货下载丨不分业态、不关注核心需求,怎么做得好项目管理?!

项目管理&#xff1a;装备制造业的破局利刃 对于装备制造行业而言&#xff0c;每一笔订单都是非标定制、小批量制造。这种特性决定了其行业企业普遍存在新品开发周期长、生产效率低、质量不稳定、交货期不稳定、成本预算难控制、非标品报价慢等问题。 如何提升企业的管理水平…

回溯算法去重的两种写法

回溯算法去重的两种写法 关于回溯&#xff0c;无论是排列、组合、子集&#xff0c;都会涉及到两个问题&#xff0c;一个是去重&#xff0c;另一个则是剪枝&#xff1b; 去重通常有几种方法。 以这道题来做验证。 90.子集II 力扣题目链接(opens new window) 给定一个可能包…

【数据结构】树状数组总结

知识概览 树状数组有两个作用&#xff1a; 快速求前缀和 时间复杂度O(log(n))修改某一个数 时间复杂度O(log(n)) 例题展示 1. 单点修改&#xff0c;区间查询 题目链接 活动 - AcWing本活动组织刷《算法竞赛进阶指南》&#xff0c;系统学习各种编程算法。主要面向…

JavaSE第7篇:封装

文章目录 前言一、封装1、好处:2、使用 二、四种权限修饰符三、构造器1、作用2、说明3、属性赋值的过程 四 、JavaBean的使用五、UML类图六 、Java关键字1、this说明2 、this可以用来修饰属性、方法3、 this调用构造器 前言 不管学什么都可以按3w: what? why? how?&#xf…

AttributeError: module ‘jax‘ has no attribute ‘Array‘解决方案

大家好&#xff0c;我是爱编程的喵喵。双985硕士毕业&#xff0c;现担任全栈工程师一职&#xff0c;热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…

NET模式和桥接模式简要概述

NET模式 NAT是Network Address Translation的缩写&#xff0c;即网络地址转换。NAT模式也是VMware创建虚拟机的默认网络连接模式。在NAT模式中&#xff0c;让虚拟机借助NAT功能&#xff0c;通过宿主机器所在的网络来访问公网。这里的宿主机相当于有两个网卡&#xff0c;一个是…

【FPGA】电梯楼层显示(简易)

前言 这是作者室友的项目&#xff0c;本来不管作者事儿的&#xff0c;但是后来听到说是室友去网上找人花了80块买了个劣质的&#xff0c;不仅是从CSDN上抄的&#xff0c;而且使用的板子还不符合室友的要求。可叹作者心软啊&#xff0c;顺便给室友做了。 在代码实现部分会给出设…

【学习笔记】V8垃圾回收策略

V8 V8是一款主流的JavaScript执行引擎V8采用即时编译,速度比较快V8内存设限,64位操作系统中上限为1.5G,32位系统中不超过800M V8垃圾回收策略 采用分代回收的思想内存分为新生代\老生代针对不同对象采用不同算法 v8常用的GC算法: 分代回收、空间复制、标记清除、标记整理、…

RDD编程

目录 一、RDD编程基础 &#xff08;一&#xff09;RDD创建 &#xff08;二&#xff09;RDD操作 1、转换操作 2、行动操作 3、惰性机制 &#xff08;三&#xff09;持久化 &#xff08;四&#xff09;分区 &#xff08;五&#xff09;一个综合实例 二、键值对RDD &am…