【极数系列】Flink配置参数如何获取?(06)

文章目录

  • gitee码云地址
  • 简介概述
  • 01 配置值来自.properties文件
    • 1.通过路径读取
    • 2.通过文件流读取
    • 3.通过IO流读取
  • 02 配置值来自命令行
  • 03 配置来自系统属性
  • 04 注册以及使用全局变量
  • 05 Flink获取参数值Demo
    • 1.项目结构
    • 2.pom.xml文件如下
    • 3.配置文件
    • 4.项目主类
    • 5.运行查看相关日志

gitee码云地址

直接下载解压可用 https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:GetParamsStreamingJob

简介概述

​ 1.几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。

​ 2.为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。请注意,这里说的 Parametertool 并不是必须使用的。Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。

​ 3.**ParameterTool**定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map` 类型,这样使得它可以很容易地与你的配置集成在一起。

01 配置值来自.properties文件

1.通过路径读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//方式一:直接使用内置工具类
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
String jobName_01 = parameter_01.get("jobName");
logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);

2.通过文件流读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//方式二:使用文件
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
String jobName_02 = parameter_02.get("jobName");
logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);

3.通过IO流读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//方式三:使用IO流
InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
String jobName_03 = parameter_03.get("jobName");
logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);

02 配置值来自命令行

tips:在idea的命令行传参,格式:–jobName program_job_aurora

在这里插入图片描述

ParameterTool parameter_04 = ParameterTool.fromArgs(args);
String jobName_04 = parameter_04.get("jobName");
logger.info("方式四:命令行传参key值={}",jobName_04);

03 配置来自系统属性

tips:在idea的的jvm系统参数设置,格式:-Dinput=hdfs:///mydata

在这里插入图片描述

//方式五:获取jvm参数值
ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
String jobName_05 = parameter_05.get("input");
logger.info("方式五:获取jvm参数key值={}",jobName_05);

04 注册以及使用全局变量

注意:Flink全局变量仅支持在富函数中使用,即Rich开头的类使用

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//直接使用内置工具类获取参数
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);

//方式六:注册全局参数
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameter_01);
        //在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用
        //1.创建富函数
        RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                //获取运行环境
                ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                //获取对应的值
                String jobName = parameters.getRequired("jobName");
                logger.info("方式六:获取全局注册参数key值={}",jobName_05);
            }
        };
        //2.创建数据集
        ArrayList<String> list = new ArrayList<>();
        list.add("001");
        list.add("002");
        list.add("003");
        //3.把有限数据集转换为数据源
        DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);
        //4.执行富文本处理
        dataStreamSource.flatMap(richFlatMap);
        //5.启动程序
        env.execute();

05 Flink获取参数值Demo

1.项目结构

在这里插入图片描述

2.pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

3.配置文件

(1)application.properties

jobName=job_aurora
jobMemory=1024
taskName=task_aurora

(2)log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

4.项目主类

package com.aurora;


import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

/**
 * @description flink获取外部参数作业
 *
 * @author 浅夏的猫
 * @datetime 15:54 2024/1/28
*/
public class GetParamsStreamingJob {

    private static final Logger logger = LoggerFactory.getLogger(GetParamsStreamingJob.class);

    public static void main(String[] args) throws Exception {

        //定义文件路径
        String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

        //方式一:直接使用内置工具类
        ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
        String jobName_01 = parameter_01.get("jobName");
        logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);

        //方式二:使用文件
        File propertiesFile = new File(propertiesFilePath);
        ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
        String jobName_02 = parameter_02.get("jobName");
        logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);

        //方式三:使用IO流
        InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
        ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
        String jobName_03 = parameter_03.get("jobName");
        logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);

        //方式四:命令行传参格式:--jobName program_job_aurora
        ParameterTool parameter_04 = ParameterTool.fromArgs(args);
        String jobName_04 = parameter_04.get("jobName");
        logger.info("方式四:命令行传参key值={}",jobName_04);

        //方式五:获取jvm参数值
        ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
        String jobName_05 = parameter_05.get("input");
        logger.info("方式五:获取jvm参数key值={}",jobName_05);

        //方式六:注册全局参数
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameter_01);
        //在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用
        //1.创建富函数
        RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                //获取运行环境
                ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                //获取对应的值
                String jobName = parameters.getRequired("jobName");
                logger.info("方式六:获取全局注册参数key值={}",jobName_05);
            }
        };
        //2.创建数据集
        ArrayList<String> list = new ArrayList<>();
        list.add("001");
        list.add("002");
        list.add("003");
        //3.把有限数据集转换为数据源
        DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);
        //4.执行富文本处理
        dataStreamSource.flatMap(richFlatMap);
        //5.启动程序
        env.execute();
    }

}

5.运行查看相关日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/355425.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Spark系列2】Spark编程模型RDD

RDD概述 RDD最初的概述来源于一片论文-伯克利实验室的Resilient Distributed Datasets&#xff1a;A Fault-Tolerant Abstraction for In-Memory Cluster Computing。这篇论文奠定了RDD基本功能的思想 RDD实际为Resilient Distribution Datasets的简称&#xff0c;意为弹性分…

04 Redis之命令(Hash型Value命令+List型Value命令+Set型Value命令+有序集合ZSET型Value命令)

3.4 Hash型Value命令 Hash 表就是一个映射表 Map&#xff0c;也是由键-值对构成&#xff0c;为了与整体的 key 进行区分&#xff0c;这里的键称为 field&#xff0c;值称为 value。注意&#xff0c;Redis 的 Hash 表中的 field-value 对均为 String 类型。 3.4.1 hset  格…

Python笔记14-实战小游戏飞机大战(上)

文章目录 功能规划安装pygame绘制游戏窗口添加玩家飞机图像屏幕上绘制飞船代码重构驾驶飞船全屏模式射击 本示例源码地址 点击下载 功能规划 玩家控制一艘最初出现在屏幕底部中央的飞船。玩家可以使用箭头键左右移动飞船&#xff0c;还可使用空格键射击。游戏开始时&#xff…

【华为 ICT HCIA eNSP 习题汇总】——题目集11

1、某公司的内网用户采用 NAT 技术的 NO-pat 方式访问互联网&#xff0c;若所有的公网地址均被使用&#xff0c;则后续上网的内网用户会&#xff08;&#xff09;。 A、挤掉前一个用户&#xff0c;强制进行 NAT 转换上网 B、将报文同步到其他 NAT 转换设备上进行 NAT 转换 C、自…

259:vue+openlayers: 显示海量多边形数据,10ms加载完成

第259个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+openlayers项目中通过WebGLVectorLayerRenderer方式加载海量多边形数据。这里相当于将海量的数据放在同一个层的source中,然后通过webglTile的方式渲染出这一层。 本示例数据为5000个多边形,加载速度超级快。 直接…

初识人工智能,一文读懂贝叶斯优化进阶的知识文集(9)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

解决Linux部署报错No main manifest attribute, in XXX.jar

这是我近期遇到的一个问题&#xff0c;报错原因就是没找到主类&#xff0c;首先你在你本地运行&#xff0c;本地运行ok的话&#xff0c;解压生成的jar包&#xff0c;里面有个META-INF文件&#xff0c;打开MANIFEST.MF文件&#xff0c;该文件是一个清单文件。该文件包含有关JAR文…

【Spring实战】31 Spring Boot3 集成 Gateway 微服务网关

文章目录 1. 定义2. 功能3. 示例代码1) 创建一个业务服务2&#xff09;创建一个网关服务3&#xff09;启动服务4&#xff09;验证 4. 代码参考结语 1. 定义 Spring Cloud Gateway 是一个基于 Spring Framework 的开源网关服务&#xff0c;用于构建微服务架构中的 API 网关。它…

MySQL:三大日志(binlog、redolog、undolog)

再了解三个日志前我们先了解一下MySQL的两层架构&#xff1a; Server 层负责建立连接、分析和执行 SQL。MySQL 大多数的核心功能模块都在这实现&#xff0c;主要包括连接器&#xff0c;查询缓存、解析器、预处理器、优化器、执行器等。另外&#xff0c;所有的内置函数和所有跨…

HCIP复习课(三层架构)

1、ip配置 R1&#xff1a; R2&#xff1a; SW1&#xff1a; SW2: 2、vlanif配置&#xff1a; SW1&#xff1a; SW2&#xff1a; 3、ospf配置&#xff1a; R1&#xff1a; SW1&#xff1a; SW2&#xff1a; 4、vlan配置 SW1&#xff1a; SW2&#xff1a; SW3&#xff1a; SW…

解决:IDEA无法下载源码,Cannot download sources, sources not found for: xxxx

原因 Maven版本太高&#xff0c;遇到http协议的镜像网站会阻塞&#xff0c;要改为使用https协议的镜像网站 解决方案 1.打开设置 2. 拿到settings.xml路径 3. 将步骤2里箭头2的User settings file&#xff1a;settings.xml打开&#xff0c;作以下修改 保存即可。如果还不行…

Python爬虫请求库安装

请求库的安装 爬虫可以简单分为几步&#xff1a;抓取页面、分析页面和存储数据。 在抓取页面的过程中&#xff0c;我们需要模拟浏览器向服务器发出请求&#xff0c;所以需要用到一些 Python 库来实现 HTTP 请求操作。在本教程中&#xff0c;我们用到的第三方库有 requests、S…

[嵌入式软件][启蒙篇][仿真平台] STM32F103实现SPI控制OLED屏幕

上一篇&#xff1a; [嵌入式软件][启蒙篇][仿真平台] STM32F103实现LED、按键 [嵌入式软件][启蒙篇][仿真平台] STM32F103实现串口输出输入、ADC采集 [嵌入式软件][启蒙篇][仿真平台]STM32F103实现定时器 [嵌入式软件][启蒙篇][仿真平台] STM32F103实现IIC控制OLED屏幕 文章目…

【微服务】springboot集成ELK使用详解

目录 一、前言 二、为什么需要ELK 三、ELK介绍 3.1 什么是elk 3.2 elk工作原理 四、ELK环境搭建 4.1 搭建es环境 4.1.1 获取es镜像 4.1.2 启动es容器 2.1.3 配置es参数 2.1.4 重启es容器并访问 4.2 搭建kibana 4.2.1 拉取kibana镜像 4.2.2 启动kibana容器 4.2.3 …

第八篇【传奇开心果短博文系列】Python的OpenCV技术点案例示例:深度学习

传奇开心果短博文系列 系列短博文目录Python的OpenCV技术点案例示例系列 短博文目录一、前言二、OpenCV深度学习介绍三、OpenCV常用深度学习算法和实现分别示例代码四、归纳总结 系列短博文目录 Python的OpenCV技术点案例示例系列 短博文目录 一、前言 OpenCV深度学习&…

【Android】MediaCodec学习

在开源Android屏幕投屏代码scrcpy中&#xff0c;使用了MediaCodec去获取和display关联的surface的内容&#xff0c;再通过写fd的方式&#xff08;socket等&#xff09;传给PC端&#xff0c; MediaCodec的处理看起来比较清楚&#xff0c;数据in和数据out 这里我们做另外一个尝试…

SharedPreferences卡顿分析

SP的使用及存在的问题 SharedPreferences(以下简称SP)是Android本地存储的一种方式&#xff0c;是以key-value的形式存储在/data/data/项目包名/shared_prefs/sp_name.xml里&#xff0c;SP的使用示例及源码解析参见&#xff1a;Android本地存储之SharedPreferences源码解析。以…

ASP .NET Core Api 使用过滤器

过滤器说明 过滤器与中间件很相似&#xff0c;过滤器&#xff08;Filters&#xff09;可在管道&#xff08;pipeline&#xff09;特定阶段&#xff08;particular stage&#xff09;前后执行操作。可以将过滤器视为拦截器&#xff08;interceptors&#xff09;。 过滤器级别范围…

第二证券:外围突传大利好!看涨期权交易骤增,中国资产大反攻继续?

外资正在做多我国财物。 据海外买卖网站marketchameleon的最新数据显现&#xff0c;海外挂钩追寻我国股票的iShares我国大型股ETF(FXI)的看涨期权买卖量在近一周内出现骤增&#xff0c;到达一年多来的最高点。别的&#xff0c;专心于科技范畴的KraneShares CSI我国互联网ETF&a…

开源之力与GPT的碰撞:探索未来技术的无限可能

摘要&#xff1a; 在本文中&#xff0c;我们将探讨开源软件与GPT&#xff08;大型预训练语言模型&#xff09;的完美结合如何推动技术的飞速发展。我们将简要介绍开源文化的价值观及其对技术创新的影响&#xff0c;分析GPT系列模型在开源社区中的发展与应用&#xff0c;并通过代…