使用java远程提交flink任务到yarn集群

使用java远程提交flink任务到yarn集群

背景

由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。接下来就记录一下实现过程。这里用flink on yarn 的Application模式实现

环境准备

  • 大数据集群,只要有hadoop就行
  • 后端服务器,linux mac都行,windows不行

正式开始

1. 上传flink jar包到hdfs

去flink官网下载你需要的版本,我这里用的是flink-1.18.1,把flink lib目录下的jar包传到hdfs中。

在这里插入图片描述
其中flink-yarn-1.18.1.jar需要大家自己去maven仓库下载。

2. 编写一段flink代码

随便写一段flink代码就行,我们目的是测试

package com.azt;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};
                Random random = new Random();
                while (true) {
                    ctx.collect(words[random.nextInt(words.length)]);
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {

            }
        });

        source.print();
        env.execute();
    }
}

3. 打包第二步的代码,上传到hdfs

在这里插入图片描述

4. 拷贝配置文件

  • 拷贝flink conf下的所有文件到java项目的resource中
  • 拷贝hadoop配置文件到到java项目的resource中

具体看截图
在这里插入图片描述

5. 编写java远程提交任务的程序

这一步有个注意的地方就是,如果你跟我一样是windows电脑,那么本地用idea提交会报错;如果你是mac或者linux,那么可以直接在idea中提交任务。

package com.test;


import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

/**
 * @date :2021/5/12 7:16 下午
 */
public class Main {
    public static void main(String[] args) throws Exception {
        ///home/root/flink/lib/lib
        System.setProperty("HADOOP_USER_NAME","root");
//        String configurationDirectory = "C:\\project\\test_flink_mode\\src\\main\\resources\\conf";
        String configurationDirectory = "/export/server/flink-1.18.1/conf";
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        String flinkLibs = "hdfs://node1.itcast.cn/flink/lib";
        String userJarPath = "hdfs://node1.itcast.cn/flink/user-lib/original.jar";
        String flinkDistJar = "hdfs://node1.itcast.cn/flink/lib/flink-yarn-1.18.1.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
                .create(yarnClient);
        //获取flink的配置
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
                configurationDirectory);
        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        flinkConfiguration.set(
                PipelineOptions.JARS,
                Collections.singletonList(
                        userJarPath));
        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,configurationDirectory);
        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString()));

        flinkConfiguration.set(
                YarnConfigOptions.FLINK_DIST_JAR,
                flinkDistJar);
        //设置为application模式
        flinkConfiguration.set(
                DeploymentOptions.TARGET,
                YarnDeploymentTarget.APPLICATION.getName());
        //yarn application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");
        //设置配置,可以设置很多
        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));
        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));
        flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
        flinkConfiguration.setInteger("parallelism.default", 4);

        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                .createClusterSpecification();

//		设置用户jar的参数和主类
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args,"com.azt.WordCount");


        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true);
        ClusterClientProvider<ApplicationId> clusterClientProvider = null;
        try {
            clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                    clusterSpecification,
                    appConfig);
        } catch (ClusterDeploymentException e){
            e.printStackTrace();
        }

        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        System.out.println(clusterClient.getWebInterfaceURL());
        ApplicationId applicationId = clusterClient.getClusterId();

        System.out.println(applicationId);

        Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
        int counts = 30;
        while (jobStatusMessages.size() == 0 && counts > 0) {
            Thread.sleep(1000);
            counts--;
            jobStatusMessages = clusterClient.listJobs().get();
            if (jobStatusMessages.size() > 0) {
                break;
            }
        }
        if (jobStatusMessages.size() > 0) {
            List<String> jids = new ArrayList<>();
            for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
                jids.add(jobStatusMessage.getJobId().toHexString());
            }
            System.out.println(String.join(",",jids));
        }

    }
}


由于我这里是windows电脑,所以我打包放到服务器上去运行
执行命令 :

java -cp test_flink_mode-1.0-SNAPSHOT.jar com.test.Main

不出以外的话,会打印如下日志

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
http://node2:33811
application_1715418089838_0017
6d4d6ed5277a62fc9a3a274c4f34a468

复制打印的url连接,就可以打开flink的webui了,在yarn的前端页面中也可以看到flink任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/622428.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

为什么3d重制变换模型会变形?---模大狮模型网

3D建模和渲染过程中&#xff0c;设计师经常会遇到一个让人头疼的问题&#xff0c;那就是模型在进行重制变换后出现的意外变形。这种变形不仅影响了模型的外观和质量&#xff0c;也给设计工作带来了额外的麻烦。本文将深入探讨3D模型进行重制变换后出现变形的原因&#xff0c;帮…

Hystrix服务熔断

服务熔断 熔断机制是应对雪崩效应的一种微服务链路保护机制。当某个微服务不可用或者响应时间太长时&#xff0c; 会进行服务降级&#xff0c;进而熔断该节点微服务的调用&#xff0c;快速返回“错误”的响应信息。当检测到该节点微 服务调用响应正常后恢复调用链路。 在Spri…

【Java】HOT100+代码随想录 动态规划(上)背包问题

目录 理论基础 一、基础题目 LeetCode509&#xff1a;斐波那契数 LeetCode70&#xff1a;爬楼梯 LeetCode746&#xff1a;使用最小花费爬楼梯 LeetCode62&#xff1a;不同路径 LeetCode63&#xff1a;不同路径ii LeetCode343&#xff1a;整数拆分 LeetCode96&#xff1a;不…

海外动态IP:揭秘其背后的技术与应用

在数字化时代&#xff0c;网络技术的发展日新月异&#xff0c;其中海外动态IP作为网络通信技术的重要一环&#xff0c;逐渐走进公众视野。海外动态IP不仅为跨国企业提供了灵活的网络接入方案&#xff0c;还为个人用户带来了更多样化的网络体验。本文将深入探讨海外动态IP的技术…

【Docker学习】重启容器的docker restart

命令&#xff1a; docker container restart 描述&#xff1a; 重启一个或多个容器 用法&#xff1a; docker container restart [OPTIONS] CONTAINER [CONTAINER...] 别名&#xff1a; docker restart(docker的一些命令可以简写&#xff0c;docker restart就等同于docker cont…

树莓派|连接CSI接口摄像头+opencv

CSI&#xff08;Camera Serial Interface&#xff09;接口摄像头是一种常见的嵌入式系统或移动设备中使用的摄像头接口。它通常用于与处理器或图像传感器进行直接连接&#xff0c;实现高速的图像数据传输。 CSI接口摄像头具有以下特点&#xff1a; 高速传输&#xff1a;CSI接口…

免翻,剪映出品的AI作图和AI视频官网免费体验!

哈喽&#xff0c;各位小伙伴们好&#xff0c;我是给大家带来各类黑科技与前沿资讯的小武。 近日&#xff0c;据剪映 Dreamina 官方消息&#xff0c;Deramina正式更名为即梦&#xff0c;同时宣布其AI作图和AI视频生成功能已全量上线。 ▲ 官网主页面 AI作图 1、通过文字描述或…

华中科大:感谢大家,我的春招之旅结束了

今天在论坛上看到一个帖子&#xff0c;一位华中科大的同学&#xff0c;因为家中父亲突然病倒&#xff0c;发求助帖&#xff1a; 请问大家&#xff0c;春招走哪个方向能最快找到工作&#xff1f;还是说继续读研呢&#xff0c;但是家里急需钱…… 当时这个帖子直接热榜第一&…

Python练习04

目录 制作一个简易的注册登陆系统 实现过程 声明需要用到的库 构造一个判断用户文件是否存在的函数 构造一个存储用户文件的函数 制作UI 制作系统主体 运行效果 制作一个简易的注册登陆系统 通过所学知识制作一个简易的注册登陆系统&#xff0c;要求可以存储账户及密码&#…

省级生活垃圾无害化处理率面板数据(2004-2022年)

01、数据简介 生活垃圾无害化处理率是指经过处理的生活垃圾中&#xff0c;达到无害化标准的垃圾所占的比例。这一指标是衡量城市垃圾处理水平的重要标准&#xff0c;反映了城市对垃圾进行有效管理和处理的能力。 生活垃圾无害化处理的主要方式包括生活垃圾焚烧、生活垃圾卫生…

2024生日快乐祝福HTNL源码修复版

源码介绍 2024生日快乐祝福HTNL源码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面&#xff0c; 源码截图 源码下载 2024生日快乐祝福HTNL源码

数据库编程

PL/SQL程序 1.PL/SOL程序块 整个PL/SQL块分三部分&#xff1a;声明部分、执行部分、异常处理部分&#xff1b; 示例&#xff1a; declare --变量声明 v_sno varchar2(10) : ‘04001’; v_cno varchar2(10) :‘001’; v_grade number : 90; begin --程序入口 insert…

PyQt程序的打包

Qt hello - 专注于Qt的技术分享平台 记录下PyQt程序的打包。 一&#xff0c;安装 pip3 install PyInstaller 二&#xff0c;打包 pyinstaller -w -n app app.py 根据需要选择打包参数&#xff0c;例如&#xff1a;-F表示生成单文件模式&#xff0c;即只有一个可执行文件…

使用Eigen将经纬度、高程、偏北角转成变换矩阵

目录 1、前言 2、示例 3、代码解析 4、垂直于给定点的切平面变换 5、代码解析 1、前言 在地球表面进行刚体变换时候&#xff0c;要将具有经纬度、高程和偏北角的坐标信息转换为变换矩阵表达&#xff0c;首先需要了解坐标系之间的转换关系。 通常&#xff0c;我们会将经纬…

攻防演练-防守单位常见防守策略

为方便您的阅读&#xff0c;可点击下方蓝色字体&#xff0c;进行跳转↓↓↓ 01 防守单位常见防守策略 01 防守单位常见防守策略 为普及网络安全知识&#xff0c;提高网络安全防范意识&#xff0c;和网络安全工作技能。我们将向大家介绍网络安全攻防演练中防守单位的一些关键策…

自回归模型的优缺点及改进方向

在学术界和人工智能产业中&#xff0c;关于自回归模型的演进与应用一直是一个引发深入讨论和多方观点交锋的热门议题。尤其是Yann LeCun&#xff0c;这位享誉全球的AI领域学者、图灵奖的获得者&#xff0c;以及被誉为人工智能领域的三大巨擘之一&#xff0c;他对于自回归模型持…

2.三极管

2.习题 3.知识补充

ssm125四六级报名与成绩查询系统+jsp

四六级报名与成绩查询系统的设计与实现 摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对四六级报名信息管理混乱&am…

最短路(图论学习总结部分内容)

文章目录 前言二、最短路多源最短路 F l o y d Floyd Floyd​ 算法例题及变形 e g 1 &#xff1a; S o r t i n g I t A l l O u t eg1&#xff1a;Sorting\ It\ All\ Out eg1&#xff1a;Sorting It All Out ( 蓝书例题&#xff0c;传递闭包 ) (蓝书例题&#xff0c;传递闭包…

pytest教程-44-钩子函数-pytest_report_collectionfinish

领取资料&#xff0c;咨询答疑&#xff0c;请➕wei: June__Go 上一小节我们学习了pytest_report_header钩子函数的使用方法&#xff0c;本小节我们讲解一下pytest_report_collectionfinish钩子函数的使用方法。 pytest_report_collectionfinish 钩子函数在 pytest 完成所有测…