[Flink04] Flink部署实践

    Flink部署支持三种模式:本地部署、Standalone部署、Flink on Yarn部署。

    独立(Standalone)模式由Flink自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    Flink on Yarn模式,把资源管理交给Yarn实现,计算机资源统一由Haoop Yarn管理,生产环境测试。

     Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处.

1 基础环境

1.1 服务器环境

操作系统环境为CentOS 8。

1)配置规划集群节点间免密访问

参考相关章节或附录的指南配置,可以有效提供部署和管理效率。

2)配置JAVA环境

参考相关章节或附录的指南配置。

3)配置HDFS存储集群

如果需要与HDFS存储集群集成,则需要提前完成配置。

参考相关章节或附录的指南配置,并且Flink规划集群或设备可网络访问。

4)配置zookeeper集群

如果需要部署Standalone模式,则需要提前完成配置。参考相关章节或附录的指南配置。

5)配置 Yarn集群

如果需要部署Flink on Yarn模式,则需要提前完成配置。参考相关章节或附录的指南配置。

1.2 Flink软件基础配置

在本实践案例中,采用的Flink软件包版本 1.14.5,Hadoop的版本为3.2,Spark软件的根目录(SPARK_HOME)为/opt/flink/flink。

源码下载可以通过官方源和国内源两种方式下载,官方源再国外,下载速度慢,国内源采用清华大学的源,速度相对较快,但只保留最新版本。

Apache官方:https://archive.apache.org/dist/flink/

清华大学:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

下载软件包:

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz

解压软件包:

# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz
# ln -s flink-1.14.5 flink
# ls flink/
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt

本地模式

最简单的启动方式,其实是不搭建集群,直接本地模式启动。

2.1 配置部署

在本地模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟 Flink 的进程,适用于测试开发调试等,不用更改任何配置信息,只需要保证 JDK8 安装正常即可。

1)启动命令

# /usr/local/flink/bin/start-cluster.sh

2)关闭命令

# /usr/local/flink/bin/stop-cluster.sh

2.2 测试验证

1)Flink启动

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake01.
Starting taskexecutor daemon on host lake01

2)访问验证

通过<local主机地址>:8081打开

3)jps查看

# jps
3968 Jps
1941 NameNode
3685 TaskManagerRunner
2790 NodeManager
3418 StandaloneSessionClusterEntrypoint
2159 DataNode

4)执行官方用例WordCount

通过执行官方示例,可以看到flink任务运行成功

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID d6987ed5b263fc5e297d5be1e28b465a
Program execution finished
Job with JobID d6987ed5b263fc5e297d5be1e28b465a has finished.
Job Runtime: 373 ms
Accumulator Results:
- 843a1470cb2c3e3169dfb25bcda7369d (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
……

观察Flink WebUI,如下图

2.3 问题-提示无法连接Yarn服务

一、问题描述

从flink on yarn模式切换为本地模式,执行start-cluster.sh提示如下错误:

2022-10-13 20:38:05,757 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink/flink-1.14.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2022-10-13 20:38:05,946 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at **lake02/******:8032
2022-10-13 20:38:06,017 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-10-13 20:38:07,043 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: **lake02/******:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
……
2022-10-13 20:38:16,052 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: **lake02/******:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused

二、问题分析

通过jps发现缺少TaskManagerRunner。

# jps
1902910 StandaloneSessionClusterEntrypoint
1861160 RunJar
1903349 SqlClient
1861350 RunJar
128746 NameNode
200744 QuorumPeerMain
1865793 Kafka
1911504 Jps

发现workers和masters文件均为空

三、解决方案

恢复masters和workers的内容

# cat masters
localhost:8081

# cat workers
localhost

3 Standalone模式

3.1 概述

     Standalone模式是最简单的一种集群模式,不需要Yarn、mesos等资源调度平台,自带集群,资源管理由flink集群管理,开发环境测试使用。

    Standalone模式是一种主从模式,主要有两个组件构成分别是JobManager(Master)和TaskManager(Slave)。

当一个应用提交执行时,Flink的各个组件是如何交互协作的:

1)App程序通过rest接口提交给Dispatcher(rest接口是跨平台,并且可以直接穿过防火墙,不需考虑拦截)。

2)Dispatcher把JobManager进程启动,把应用交给JobManager。

3)JobManager拿到应用后,向ResourceManager申请资源(slots),ResouceManager会启动对应的TaskManager进程,TaskManager空闲的slots会向ResourceManager注册。

4)ResourceManager会根据JobManager申请的资源数量,向TaskManager发出指令(这些slots由你提供给JobManager)。

5)接着,TaskManager可以直接和JobManager通信了(它们之间会有心跳包的连接),TaskManager向JobManager提供slots,JobManager向TaskManager分配在slots中执行的任务。

6)最后,在执行任务过程中,不同的TaskManager会有数据之间的交换。

3.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

OScentos8.0

labnode02

192.168.80.132

slavetaskmanager

OScentos8.0

labnode03

192.168.80.133

slavetaskmanager

OScentos8.0

二、修改配置文件

1)修改flink-conf.yaml配置文件:

##配置master节点ip
jobmanager.rpc.address: 192.168.1.100

##配置每个节点的可用slot,1 核CPU对应 1 slot
##the number of available CPUs per machine
taskmanager.numberOfTaskSlots: 30

##默认并行度 1 slot资源
parallelism.default: 1

2)修改master和work配置文件

Master文件

# cat masters
labnode01:8081

workers文件

labnode02
labnode03

将以上文件分发各节点对应文件夹。

三、集群启动和关闭

在master节点上执行此脚本,就可以启动集群,前提要保证master节点到slaver节点可以免密登录。

因为它的启动过程是:先在master节点启动jobmanager进程,然后ssh到各slaver节点启动taskmanager进程。

启动集群

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake02.
Starting taskexecutor daemon on host lake03.
Starting taskexecutor daemon on host lake04.
Starting taskexecutor daemon on host slake05.

停止集群:

# /usr/local/flink/bin/stop-cluster.sh

3.3 运行验证

1)启动Flink

# /usr/local/flink/bin/start-cluster.sh

2)访问flink webUI

3)执行官方用例WordCount

执行命令:

# /usr/local/flink/bin/flink run /usr/local/flink/examples/batch/WordCount.jar
……
- f27663f6191a378629eea720a988cc53 (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
……

查看Flink WebUI

4 Flink On Yarn模式

4.1 概述

    独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    在目前大数据生态中,国内应用最为广泛的资源管理平台是Yarn。Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处。

    Flink on Yarn 企业生产环境运行Flink任务大多数的选择。

    在强大的Yarn平台上,Flink是如何在Yarn上集成部署的,其过程是:客户端把Flink 应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的 NodeManager 申请容器。在这些容器上,Flink会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的 Slot 数量动态分配TaskManager资源。

https://upload-images.jianshu.io/upload_images/6178553-c51c1c9b2f695e5b.png

1)提交App之前,先上传Flink的Jar包和配置到HDFS,以便JobManager和TaskManager共享HDFS的数据。

2)客户端向ResourceManager提交Job,ResouceManager接到请求后,先分配container资源,然后通知NodeManager启动ApplicationMaster。

3)ApplicationMaster会加载HDFS的配置,启动对应的JobManager,然后JobManager会分析当前的作业图,将它转化成执行图(包含了所有可以并发执行的任务),从而知道当前需要的具体资源。

4)接着,JobManager会向ResourceManager申请资源,ResouceManager接到请求后,继续分配container资源,然后通知ApplictaionMaster启动更多的TaskManager(先分配好container资源,再启动TaskManager)。container在启动TaskManager时也会从HDFS加载数据。

5)最后,TaskManager启动后,会向JobManager发送心跳包。JobManager向TaskManager分配任务。

Flink提供了yarn上运行的3模式,分别为Session-Cluster,Application Mode和Per-Job-Cluster模式。

4.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

labnode02

192.168.80.132

slavetaskmanager

labnode03

192.168.80.133

slavetaskmanager

二、Yarn环境配置

在Yarn-site.xml中配置关闭内存校验。

Yarn-site.xml是hadoop中/etc/hadoop下的配置文件,否则flink任务可能会因为内存超标而被Yarn集群主动杀死。

<!-- Mem Check Start -->
<!-- 设置不检查虚拟内存的值,不然内存不够会报错 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
    <!-- Mem Check end -->

将修改后的配置文件分发到各节点,然后重启Yarn集群。

三、将Flink软件和配置文件分发到Flink集群规划节点

将Flink的配置文件conf/flink-conf.yaml恢复为初始状态。

4.3 Session-Cluster模式(yarn-session)

4.3.1 概述

    Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手工停止。

    在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.

    缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.

    所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job。

会话模式有两种操作模式:

  1. 附加模式(默认):yarn-session.sh客户端将 Flink 集群提交给 YARN,但客户端一直在运行,跟踪集群的状态。如果集群失败,客户端将显示错误。如果客户端被终止,它也会发出集群关闭的信号。
  2. 分离模式(-d或--detached):yarn-session.sh客户端将 Flink 集群提交给 YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。

4.3.2 常用命令

1)yarn-session.sh参数说明

使用bin/yarn-session.sh --help 查看可用参数:

Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on Yarn
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running Yarn session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Set to Yarn-cluster to use Yarn execution mode.
     -nl,--nodeLabel <arg>           Specify Yarn node label for the Yarn application
     -nm,--name <arg>                Set a custom name for the application on Yarn
     -q,--query                      Display available Yarn resources (memory, cores)
     -qu,--queue <arg>               Specify Yarn queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--Yarndetached              If present, runs the job in detached mode (deprecated; use non-Yarn specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

2)启动命令

使用Yarn-session.sh命令申请资源初始化一个Flink集群,命令格式如下:

bin/yarn-session.sh <参数>

如示例:

# /opt/flink/flink/bin/yarn-session.sh -d

3)关闭Flink

停止 flink on Yarn 会话模式中的flink集群

yarn application -kill <appid>

echo "stop" | ./bin/flink -id <appid>

如示例:

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.3.3 运行验证

1)启动Flink

# /opt/flink/flink/bin/yarn-session.sh -d

执行结果:

访问Yarn WebUI:

访问Flink WebUI,http://lake04:38347

2)运行官方用例WordCount

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar

命令行执行结果:

Flink WebUI的首页:

Flink WebUI中的结果:

3)关闭

执行命令关闭Flink

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.4 Per-Job-Cluster模式(yarn-cluster)

4.4.1 概述

一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

4.4.2 常用命令

1)参数说明

flink run -m yarn-cluster --help;可用参数:

该模式下不需要先启动 yarn-session,确保 Hadoop 集群是健康的情况下直接提交 Job 命令:

bin/flink -m yarn-cluster <参数> <jar file>

如示例:

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

4.4.3 运行验证

1)启动并执行官方用例WordCount

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

访问Yarn WebUI:

4.4.4 优缺点

优点:随到随用,只有任务需要运行时才会开启flink集群;运行完就关闭释放资源,资源利用更合理;

缺点:对于小作业不太友好,

适用场景:适合大作业,长时间运行的大作业。

4.5 Application Mode

4.5.1 概述

    Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.

与Per-Job-Cluster的区别: 就是Application Mode下, 用户的main函数式在集群中执行的

官方建议:

出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!

4.5.2 运行验证

启动

# /opt/flink/flink/bin/flink run-application -t yarn-application /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

访问Yarn WebUI:

4.5.3 常见问题

任务提示 Could not allocate the required slot within slot request tim

一、错误日志

Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResource$8(DefaultScheduler.java:539)
        ... 37 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        ... 35 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
        ... 28 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
        ... 29 more

解决方案

将flink的配置文件conf/flink-conf.yaml恢复为初始状态,重新启动flink的Yarn session。

问题FLINK Could not get job jar and dependencies from JAR file: JAR file does not exist:

一、问题描述

使用flink客户端将执行flink提交到Yarn,输入-yjm参数提示错误

# /opt/flink/flink/bin/flink run -m Yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-3.2.4/share/hadoop/common/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Could not get job jar and dependencies from JAR file: JAR file does not exist: -yn

二、问题分析

flink1.8版本之后已弃用该参数,ResourceManager将自动启动所需的尽可能多的容器,以满足作业请求的并行性。

三、解决方案

去掉即可

Deployment took more than 60 seconds. Please check if the requested resources are

一、问题描述

日志信息如下:

INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

截图如下:

二、解决方案

配置yarn-site.xml

  <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>102400</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>32</value>
    </property>
   <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>51200</value>
    </property>

Flink读取Hudi表时报错lassNotFoundException: *mapred.FileInputFormat

一、问题现象:

执行“select * from t1;”报错,报错信息如下:

Flink SQL> select * from t1;
……
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hadoop-mapreduce-client-core.jar到Flink/lib中。

读取数据表失败NoSuchMethodError: *Preconditions.checkArgument

一、问题描述

创建表格式如下

CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://10.101.12.140:9000/datas/flink-hudi/test0907/t1',
'table.type' = 'MERGE_ON_READ',
'read.tasks' = '1',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210316134557',
'read.streaming.check-interval' = '4'
);
INSERT INTO t2 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

执行成功,然后执行表内容查询

select * from t2;

报出如下错误:

Flink SQL> select * from t2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputForm
Flink SQL>

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hive-exec-3.1.3.jar到各节点的flink/lib中。(注意hive-exec和hadoop版本的匹配)

启动失败NoSuchMethodError: *Preconditions.checkArgument

一、错误描述

通过bin/yarn-session.sh -d启动yarn-session失败,报错信息如下:

The program finished with the following exception:

org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl.getClient(RpcClientFactoryPBImpl.java:81)
        at org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC.getProxy(HadoopYarnProtoRPC.java:48)
        ……
        ... 21 more
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
        at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
        ……
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.<init>(ApplicationClientProtocolPBClientImpl.java:209)
        ... 26 more

二、错误原因

Preconditions是guava下的工具类,hudi的源码依赖了不同的项目,这些项目使用了不同的guava版本,所报错误是由于运行时guava版本过旧,没有相应的方法。

三、解决方案

在HADOOP_HOME下查询hadoop使用的guava版本,将其拷贝到FLINK_HOME/lib下:

# find ./ -name guava*
./share/hadoop/common/lib/guava-27.0-jre.jar
./share/hadoop/hdfs/lib/guava-27.0-jre.jar

将文件复制到所有yarn集群的FLINK_HOME/lib下。

重新执行bin/yarn-session.sh -d,成功。

通过yarn启动flink失败-连接yarn失败

通过yarn模式启动flink,报出如下异常,关键信息如下:

# /usr/local/flink/bin/yarn-session.sh -d
….
2022-10-27 11:04:30,332 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at /0.0.0.0:8032
…
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: 0.0.0.0/0.0.0.0:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused
……

原因分析:

1)检查是否启动hadoop集群, 如果没有启动, 是无法连接到hadoop的yarn。

2)flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs。

如果正常启动还无法连接yarn, 可以查看一下hadoop的环境变量是否配置好。

在本实例中,时因为无法获取HADOOP_CONF_DIR的配置信息导致问题发生。

二、解决方案

设置HADOOP_CONF_DIR环境变量,并使之生效。

# cat /etc/profile | grep HADOOP_CONF_DIR
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
# source /etc/profile

然后重新启动flink。

BTW:如果已经设置HADOOP_CONF_DIR环境变量,可能由于某种原因HADOOP_CONF_DIR环境变量没有生效,这个原因有很多。

5 参考资料

[01] https://blog.csdn.net/Vector97/article/details/117398947

[02] https://www.jianshu.com/p/8c9c897ea72a

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/393503.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

老兵(11)

百度文心一格&#xff0c;大约是一年前上线并免费向用户开放的。其实也不是免费&#xff0c;而是“电量”比较好获得&#xff0c;白送的就16/每天&#xff0c;如果只是好奇玩玩的话也算够吧。 当时就很开心&#xff0c;因为一直想着把一些文案图像化&#xff0c;做成漫画的形式…

【教程】Linux使用aria2c多线程满速下载

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 安装aria2c&#xff1a; sudo apt-get install aria2多线程下载&#xff1a; aria2c -x 16 -s 16 <url> 比如&#xff1a; aria2c -x 16 -s 16 http://images.cocodataset.org/zips/test2017.zip

轨道交通信号增强与覆盖解决方案——经济高效,灵活应用于各类轨道交通场景!

方案背景 我国是世界上轨道交通里程最长的国家&#xff0c;轨道交通也为我们的日常出行带来极大的便利。伴随着无线通信技术的快速发展将我们带入电子时代&#xff0c;出行的过程中对无线通信的依赖程度越来越高&#xff0c;无论是车站还是车内都需要强大、高质量的解决方案以…

【JavaEE】_HTTP响应

目录 1. 首行 2. 报头header 3.空行 4. 正文body 1. 首行 响应首行&#xff1a;版本号状态码状态码描述&#xff1b; HTTP状态码描述了这次响应的结果&#xff08;比如成功、失败&#xff0c;以及失败原因等&#xff09;&#xff1b; 1. HTTP状态码有&#xff1a; &#…

三种vcruntime140.dll丢失解决方法,有效解决vcruntime140.dll文件丢失

vcruntime140.dll作为一个动态链接库文件&#xff0c;具有重要的功能和用途。它是由Microsoft Visual C Redistributable软件包提供的一个重要组件&#xff0c;用于支持运行在Windows操作系统上的应当vcruntime140.dll文件丢失时&#xff0c;将会对计算机系统产生一系列的影响。…

CVE-2022-24652 漏洞复现

CVE-2022-24652 开题 后台管理是thinkphp的&#xff0c;但是工具没检测出漏洞。 登陆后界面如下&#xff0c;上传头像功能值得引起注意 这其实就是CVE-2022-24652&#xff0c;危险类型文件的不加限制上传&#xff0c;是文件上传漏洞。漏洞路由/user/upload/upload 参考文章&a…

如何减少 HTTP 响应的数据大小

资料来源 : 小林coding 小林官方网站 : 小林coding (xiaolincoding.com) 如何减少 HTTP 响应的数据大小? 对于 HTTP 的请求和响应&#xff0c;通常 HTTP 的响应的数据大小会比较大&#xff0c;也就是服务器返回的资源会比较大。 于是&#xff0c;我们可以考虑对响应的资源进…

【蓝桥杯】算法模板题(Floyd算法)

一.弗洛伊德算法 用途&#xff1a;用来求解多源点最短路径问题。 思想&#xff1a;Floyd算法又称为插点法&#xff0c;是一种利用动态规划的思想寻找给定的加权图中多源点之间最短路径的算法。 主要步骤&#xff1a; 1&#xff09;初始化&#xff1a;使用邻接矩阵初始化dis…

并发CPU伪共享及优化

目录 伪共享 解决 伪共享 缓存系统中是以缓存行&#xff08;cache line&#xff09;为单位存储的。缓存行是2的整数幂个连续字节&#xff0c;一般为32-256个字节。最常见的缓存行大小是64个字节。当多线程修改互相独立的变量时&#xff0c;如果这些变量共享同一个缓存行&am…

代码随想录算法训练营第十六天 | 654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树

代码随想录算法训练营第十六天 | 654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树 文章目录 代码随想录算法训练营第十六天 | 654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树1 LeetCode 654.最大二叉树2 LeetCode 61…

04_device_bus_driverLinux内核模块

01_basicLinux内核模块-CSDN博客文章浏览阅读45次。环境IDubuntuMakefilemodules:clean:basic.creturn 0;运行效果。https://blog.csdn.net/m0_37132481/article/details/136157384?csdn_share_tail%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%…

c高级 函数+Makefile

一、作业 1.写一个函数&#xff0c;输出当前用户的uid和gid&#xff0c;并使用变量接收结果 #!/bin/bash function fun(){retid -uret1id -gecho $ret $ret1 } retfun echo $ret二、练习回顾 1.分文件编译&#xff08;实现冒泡排序&#xff09; 正确的&#xff1a;将数组的…

<网络安全>《35 网络攻防专业课<第一课 - 网络攻防准备>》

1 主要内容 认识黑客 认识端口 常见术语与命令 网络攻击流程 VMWare虚拟环境靶机搭建 2 认识黑客 2.1 白帽、灰帽和黑帽黑客 白帽黑客是指有能力破坏电脑安全但不具恶意目的黑客。 灰帽黑客是指对于伦理和法律态度不明的黑客。 黑帽黑客经常用于区别于一般&#xff08;正面…

【性能测试】分布式压测之locust和Jmeter的使用

受限于单台机器的配置问题&#xff0c;我们在单台机器上达不到一个很高的压测并发数&#xff0c;那这个时候就需要引入分布式压测 分布式压测原理&#xff1a; 一般通过局域网把不同测试计算机链接到一起&#xff0c;达到测试共享、分散操作、集中管理的目的。 选择一台作为…

二叉树(4)——链式二叉树

1 二叉树的概念 二叉树是&#xff1a; 空树非空&#xff1a;根节点&#xff0c;根节点的左子树、根节点的右子树组成的。 二叉树定义是递归式的&#xff0c;因此后序基本操作中基本都是按照该概念实现的。 2 二叉树的遍历 2.1 前序、中序以及后序遍历 学习二叉树结构&#xf…

单片机学习笔记---AD模数转换DA数模转换

目录 AD模数转换 XPT2046.c XPT2046.h main.c DA数模转换 main.c 上一篇博客讲了AD/DA转换的工作原理&#xff0c;也介绍了运算放大器的工作原理&#xff0c;这节开始代码演示&#xff01; AD模数转换 新创建一个工程&#xff1a;AD模数转换 第一个工程将用到LCD1602和…

Day20 -- learning english

一、积累 1.gulp 2.clog 3.artery 4.bloat 5.kidnap 6.groom 7.prey 8.cargo 9. jerk 10.treadmill 11.shatter 12. acrobatic 13. aggravate 14.moldy 15.curl 16.manual 17.slay 18.sibling 19.hatch 20.dense 二、练习 1.牛津原译 Gulp /ɡʌlp 1.~ sth (down)to swallow …

[计算机网络]深度学习传输层TCP协议

&#x1f493; 博客主页&#xff1a;从零开始的-CodeNinja之路 ⏩ 收录专栏&#xff1a;深度学习传输层TCP协议 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 [计算机网络]深度学习传输层TCP协议 前提概括一: TCP协议段格式二:确认应答三:超时重传四:…

YOLO v5项目实战 P5 解决运行detect文件时设置了--view-img但是显示不出来的问题

up主讲的实时显示目标检测后的图片的两种方法&#xff1a; &#xff08;1&#xff09;在下面的Terminal中输入下列命令&#xff1a; python detect.py --view-img &#xff08;2&#xff09;点击进入右上方的detect的Edit Configurations 然后在这个参数这里输入 --view img…

企业建站用什么服务器,多线BGP线路服务器值得信赖

随着数字化时代的到来&#xff0c;很多企业会选择建立自己的网站&#xff0c;让企业网站成为对外展示、业务开展的平台。但是有的企业在建站时&#xff0c;选择了不适合的服务器&#xff0c;导致出现访问延迟、数据加载缓慢等问题&#xff0c;对企业的形象和业务造成很大影响。…