Hdoop学习笔记(HDP)-Part.18 安装Flink

目录
Part.01 关于HDP
Part.02 核心组件原理
Part.03 资源规划
Part.04 基础环境配置
Part.05 Yum源配置
Part.06 安装OracleJDK
Part.07 安装MySQL
Part.08 部署Ambari集群
Part.09 安装OpenLDAP
Part.10 创建集群
Part.11 安装Kerberos
Part.12 安装HDFS
Part.13 安装Ranger
Part.14 安装YARN+MR
Part.15 安装HIVE
Part.16 安装HBase
Part.17 安装Spark2
Part.18 安装Flink
Part.19 安装Kafka
Part.20 安装Flume

十八、安装Flink

1.配置Ambari的flink资源

(1)创建flink源

下载链接为
https://repo.huaweicloud.com/apache/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.5-7.0/flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
http://www.java2s.com/Code/JarDownload/javax.ws/javax.ws.rs-api-2.0.jar.zip
上传到hdp01上,并复制到/var/www/html下

mkdir /var/www/html/flink
cp /opt/flink-1.9.3-bin-scala_2.12.tgz /var/www/html/flink/
cp /opt/flink-shaded-hadoop-2-uber-2.6.5-7.0.jar /var/www/html/flink/
cp /opt/javax.ws.rs-api-2.0.jar /var/www/html/flink/

(2)下载ambari-flink-service服务

在外网服务器上

git clone https://ghproxy.com/https://gitee.com/liujingwen-git/ambari-flink-service-master.git /root/FLINK

注:github.com无法直接下载,可以使用gproxy.com进行代理加速,拼接形成加速URL;或者将github.com在本地强制解析为140.82.114.4
将FLINK文件夹上传到hdp01的/var/lib/ambari-server/resources/stacks/HDP/3.1/services/上,对应的版本可在集群的服务器中查看

hdp-select status hadoop-client | sed 's/hadoop-client - \([0-9]\.[0-9]\).*/\1/'

(3)文件说明

从github上下载的ambari-flink-service文件较多,有些可以删除以及修改
在这里插入图片描述

(4)修改metainfo.xml文件

文件位置:
/var/lib/ambari-server/resources/stacks/HDP/3.1/services/FLINK/metainfo.xml
修改版本

            <name>FLINK</name>
            <displayName>Flink</displayName>
            <comment>Apache Flink is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams.</comment>
            <version>1.9.3</version>

删除FLINK_MASTER,仅保留安装FLINK_CLIENT,在Flink on YARN模式下,master与ResourceManager合设,无需单独安装

            <components>
                <component>
                  <name>FLINK_CLIENT</name>
                  <displayName>FlinkCLIENT</displayName>
                  <category>CLIENT</category>
                  <cardinality>1+</cardinality>
                  <commandScript>
                    <script>scripts/flink_client.py</script>
                    <scriptType>PYTHON</scriptType>
                    <timeout>10000</timeout>
                  </commandScript>
                </component>
            </components>

(5)修改flink-ambari-config.xml文件

文件位置:
/var/lib/ambari-server/resources/stacks/HDP/3.1/services/FLINK/configuration/flink-ambari-config.xml
修改安装路径

  <property>
    <name>flink_install_dir</name>
    <value>/usr/hdp/3.1.5.0-152/flink</value>
    <description>Location to install Flink</description>
  </property>

修改安装包下载地址

  <property>
    <name>flink_download_url</name>
    <value>http://hdp01.hdp.com/flink/flink-1.9.3-bin-scala_2.12.tgz</value>
    <description>Snapshot download location. Downloaded when setup_prebuilt is true</description>
  </property>

  <property>
    <name>flink_hadoop_shaded_jar</name>
    <value>http://hdp01.hdp.com/flink/flink-shaded-hadoop-2-uber-2.6.5-7.0.jar</value>
    <description>Flink shaded hadoop jar download location. Downloaded when setup_prebuilt is true</description>
  </property>
</configuration>

(6)修改flink-env.xml文件

文件位置:
/var/lib/ambari-server/resources/stacks/HDP/3.1/services/FLINK/configuration/flink-env.xml
修改JAVA环境变量

env.java.home: /usr/local/jdk1.8.0_351/jre/

(7)修改flink_client.py文件

文件位置:
/var/lib/ambari-server/resources/stacks/HDP/3.1/services/FLINK/package/scripts/flink_client.py
该文件的作用调用相关的环境变量,来实现整个安装过程
内容如下:

#!/usr/bin/env ptyhon

from resource_management import *

class FlinkClient(Script):
  def install(self, env):
    print 'Install the Flink Client'
    import params
    env.set_params(params)
    self.configure(env)
    # create flink log dir
    Directory(params.flink_log_dir,
              owner=params.flink_user,
              group=params.flink_group,
              create_parents=True,
              mode=0775
             )
    Execute(format("rm -rf {flink_install_dir}/log/flink"))
    Execute(format("ln -s {flink_log_dir} {flink_install_dir}/log/flink"))

  def configure(self, env):
    import params
    env.set_params(params)
    # write out flink-conf.yaml
    properties_content = InlineTemplate(params.flink_yaml_content)
    File(format(params.flink_install_dir + "/conf/flink-conf.yaml"), content=properties_content)

  def status(self, env):
    raise ClientComponentHasNoStatus()

if __name__ == "__main__":
  FlinkClient().execute()

(8)修改parameter.py文件

文件位置:
/var/lib/ambari-server/resources/stacks/HDP/3.1/services/FLINK/package/scripts/params.py
该文件定义了部分环境变量
内容如下:

#!/usr/bin/env python
from resource_management import *
from resource_management.libraries.script.script import Script
import sys, os, glob
from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.functions.default import default

# server configurations
config = Script.get_config()

# params from flink-ambari-config
flink_install_dir = config['configurations']['flink-ambari-config']['flink_install_dir']
flink_numcontainers = config['configurations']['flink-ambari-config']['flink_numcontainers']
flink_numberoftaskslots= config['configurations']['flink-ambari-config']['flink_numberoftaskslots']
flink_jobmanager_memory = config['configurations']['flink-ambari-config']['flink_jobmanager_memory']
flink_container_memory = config['configurations']['flink-ambari-config']['flink_container_memory']
setup_prebuilt = config['configurations']['flink-ambari-config']['setup_prebuilt']
flink_appname = config['configurations']['flink-ambari-config']['flink_appname']
flink_queue = config['configurations']['flink-ambari-config']['flink_queue']
flink_streaming = config['configurations']['flink-ambari-config']['flink_streaming']

hadoop_conf_dir = config['configurations']['flink-ambari-config']['hadoop_conf_dir']
flink_download_url = config['configurations']['flink-ambari-config']['flink_download_url']
flink_hadoop_shaded_jar_url = config['configurations']['flink-ambari-config']['flink_hadoop_shaded_jar']
javax_ws_rs_api_jar = config['configurations']['flink-ambari-config']['javax_ws_rs_api_jar']

conf_dir=''
bin_dir=''

# params from flink-conf.yaml
flink_yaml_content = config['configurations']['flink-env']['content']
flink_user = config['configurations']['flink-env']['flink_user']
flink_group = config['configurations']['flink-env']['flink_group']
flink_log_dir = config['configurations']['flink-env']['flink_log_dir']
flink_log_file = os.path.join(flink_log_dir,'flink-setup.log')

temp_file='/tmp/flink.tgz'

(9)添加用户及组

添加用户和组

groupadd flink
useradd -d /home/flink -g flink flink

(10)重启ambari服务

重启服务

ambari-server restart

在ambari中的Stack and Versions中可以看到flink的信息
在这里插入图片描述

2.安装

添加flink服务
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在Custom flink-env中新增
Key:yarn.client.failover-proxy-provider
Value:org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.实验:wordcount

实验:wordcount
从尚硅谷下载实验用的程序代码,在idea中对以socket形式接收数据流的代码进行修改,从192.168.111.1的nc处接收数据流,然后对词频统计后输出到本地文件中
在这里插入图片描述

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStream<String> stream = env.socketTextStream("192.168.111.1", 1234);
    DataStream<Tuple2<String, Integer>> resultStream = stream.flatMap(new Tokenizer())
            .keyBy(0)
            .sum(1);
    resultStream.print();
    resultStream.writeAsText("/tmp/wc_result.txt");

    env.execute("Flink Streaming Java API Skeleton");
}

在idea中的Build->Build Artifacts中选择build生成jar包,然后上传到hdp05上,使用一个租户进行kerberos认证后,提交flink任务。

cd /usr/hdp/3.1.5.0-152/flink/bin/
./flink run -m yarn-cluster /root/flink-tutorial-master.jar -c WordCountFromSocket

在这里插入图片描述
在192.168.111.1上启动nc

nc -l 1234

在这里插入图片描述
输入数据流后,中断nc进程,然后在hdp05上查看结果文件/tmp/wc_result.txt
在这里插入图片描述
在yarn上可以查看到对应的应用信息
在这里插入图片描述

4.常见报错

(1)安装时报错Unable to run the custom hook script

报错信息:Error: Error: Unable to run the custom hook script [‘/usr/bin/python’, ‘/var/lib/ambari-agent/cache/stack-hooks/before-ANY/scripts/hook.py’, ‘ANY’, ‘/var/lib/ambari-agent/data/command-1360.json’, ‘/var/lib/ambari-agent/cache/stack-hooks/before-ANY’, ‘/var/lib/ambari-agent/data/structured-out-1360.json’, ‘INFO’, ‘/var/lib/ambari-agent/tmp’, ‘PROTOCOL_TLSv1_2’, ‘’]
2023-03-26 22:23:31,445 - The repository with version 3.1.5.0-152 for this command has been marked as resolved. It will be used to report the version of the component which was installed
在这里插入图片描述
通过ambari添加自定义服务时,总是不能自动增加service账号

python configs.py -u admin -p lnyd@LNsy115 -n HDP315 -l hdp01 -t 8080 -a get -c cluster-env | grep -i ignore_groupsusers_create
python configs.py -u admin -p lnyd@LNsy115 -n HDP315 -l hdp01 -t 8080 -a set -c cluster-env -k ignore_groupsusers_create -v true

在这里插入图片描述

(2)安装后启动报错parent directory /usr/local/flink/conf doesn’t exist

安装后在启动时报错:resource_management.core.exceptions.Fail: Applying File[‘/usr/local/flink/conf/flink-conf.yaml’] failed, parent directory /usr/local/flink/conf doesn’t exist
在这里插入图片描述
不知什么原因没解压过去,需要手动解压到该目录

tar -zxvf /opt/flink-1.9.3-bin-scala_2.12.tgz -C /root/
mv /root/flink-1.9.3/* /usr/local/flink/

(3)提交任务报错java.lang.NoClassDefFoundError

报错信息:
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
在这里插入图片描述
maven会自动下载相关的依赖jar包,因此需要将project下的jersey依赖jar包拷贝至flink的lib目录下
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/215316.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

⾃定义类型:联合和枚举(C语言版)

一.联合体. 1.1.联合体类型的声明,定义 像结构体⼀样&#xff0c;联合体也是由⼀个或者多个成员构成&#xff0c;这些成员可以不同的类型,但是编译器只为最⼤的成员分配⾜够的内存空间。联合体的特点是 所有成员共⽤同⼀块内存空间 。所以联合体也叫&#xff1a;共⽤体。 给联…

TextCNN文本分类快速上手

这里写目录标题 TextCNN介绍&#xff1a;Docker从0安装Docker基于镜像安装容器打包操作&#xff08;生成镜像时使用的命令&#xff09;安装时命令 页面访问模型训练API访问性能测试其他查看显卡信息 TextCNN介绍&#xff1a; 1.支持语义识别和分类置信度输出。 2.训练速度快&…

10倍提升启动的时间?Graalvm打包Springboot+MyBatis实测

graalvm使用前后对比图 相关代码博客&#xff1a;https://blog.csdn.net/weixin_43914278/article/details/134446327 工具大小时间graalvm打包的exe文件84.14MB0.251秒graalvm打包的docker文件121.27MB0.253秒jar包51.34MB2.153秒 解析 文件大小: graalvm打包的Docker文件…

如何提高WhatsApp回复率,附相关进阶技巧

现在WhatsApp开发客户基本上也是成为了很多外贸人的共识了&#xff0c;基本上大部分的外贸朋友都有在用。但是用多了就会发现&#xff0c;虽然WhatsApp的回复率是比较高的&#xff0c;但是很多都是礼貌性回复&#xff0c;真正进行营销的时候&#xff0c;其实客户还是很多不会回…

scikit-learn线性回归法进行利润预测

大家好&#xff0c;生成式人工智能无疑是一个改变游戏规则的技术&#xff0c;但对于大多数商业问题来说&#xff0c;回归和分类等传统的机器学习模型仍然是首选。 私募股权或风险投资这样的投资者利用机器学习&#xff0c;首先必须了解关注的数据以及它是如何被使用的。投资公…

TimeGPT:时间序列预测模型实例

时间序列预测领域正在经历一个非常激动人心的时期。在过去的三年里&#xff0c;我们见证了许多重要的贡献&#xff0c;如N-BEATS、N-HiTS、PatchTST和TimesNet等。同时&#xff0c;大型语言模型&#xff08;LLM&#xff09;近来在流行度方面取得了很大的成功&#xff0c;例如Ch…

【源码解析】聊聊线程池 实现原理与源码深度解析(一)

一、Java 线程池 实现原理与源码深度解析 架构 总揽线程池设计&#xff0c;其实可以发现都是符合顶层的接口设计&#xff0c;中间抽象类&#xff0c;最终是实际工作类 使用示例 public class MyRunnable implements Runnable{Overridepublic void run() {System.out.println…

深度学习——第3章 Python程序设计语言(3.2 Python程序流程控制)

3.2 Python程序流程控制 目录 1.布尔数据类型及相关运算 2.顺序结构 3.选择&#xff08;分支&#xff09;结构 4.循环结构 无论是在机器学习还是深度学习中&#xff0c;Python已经成为主导性的编程语言。而且&#xff0c;现在许多主流的深度学习框架&#xff0c;例如PyTorc…

EXPLAIN解析

针对以下sql进行解析 EXPLAIN SELECTdauk.id AS daukId,dasm.mailbox AS storeAccount,dau.id,dau.id AS userId,das.score AS score,das.sell_num AS sellNum,dapa.product_link AS productLink,dapa.able_category_ids AS ableCategoryIds,dac.parent_name AS parentName,da…

SAP_ABAP_RZ11解决SAP运行超时问题 TIME_OUT / rdisp/scheduler/prio_high/max_runtime

SAP ABAP 顾问&#xff08;开发工程师&#xff09;能力模型_Terry谈企业数字化的博客-CSDN博客文章浏览阅读510次。目标&#xff1a;基于对SAP abap 顾问能力模型的梳理&#xff0c;给一年左右经验的abaper 快速成长为三年经验提供超级燃料&#xff01;https://blog.csdn.net/j…

Unity | 渡鸦避难所-1 | 修复资源导入后呈现洋红色(Built-in 转 URP)

1 前言 Unity 编辑器导入 Asset Store 的资源包后&#xff0c;在预览和使用时&#xff0c;发现对象显示为洋红色 以小狐狸为例&#xff0c;打开资源包中的场景&#xff0c;可以看到小狐狸和地板均显示为洋红色 这是因为 Asset Store 中的资源包大部分是针对内置渲染管线项目制…

自然语言处理:电脑如何理解我们的语言?

☁️主页 Nowl &#x1f525;专栏《机器学习实战》 《机器学习》 &#x1f4d1;君子坐而论道&#xff0c;少年起而行之 ​ 文章目录 ​编辑 常见方法 1.基于词典的方法 2.基于计数的方法 基于推理的方法 Bert input_ids attention_mask token_type_ids 结语 在广…

施密特触发器

1、作用 简单来说&#xff0c;施密特触发器可以将模拟信号转变成数字信号 2、为什么不使用比较器将模拟信号转变成数字信号 当输入电平高于参考电压时&#xff0c;输出高电平&#xff1b;输入电压低于参考电压时&#xff0c;输出低电平。这样比较器也可以实现模拟信号转换成数…

封装Servlet使用自定义注解进行参数接收

文章目录 前言一、前后对比✨二、具体实现&#x1f387;三、效果展示&#x1f38f; 前言 先说项目背景&#xff0c;本项目是本人在校期间老师布置的作业&#xff08;就一个CRUD&#xff09;&#xff0c;课程是后端应用程序设计&#xff0c;其实就是servlet和jsp那一套&#xf…

Python异常处理【侯小啾python领航班系列(二十六)】

Python异常处理【侯小啾python领航班系列(二十六)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹…

Java三种代理模式:静态代理、动态代理和CGLIB代理

Java三种代理模式&#xff1a;静态代理、动态代理和CGLIB代理 代理模式 代理模式是23种设计模式种的一种。代理模式是一种结构型设计模式&#xff0c;它允许为其他对象提供一个替代品或占位符&#xff0c;以控制对这个对象的访问。代理模式可以在不修改被代理对象的基础上&am…

SPM/SCM 流量跟踪体系

SPM SPM&#xff08;shopping page mark&#xff0c;导购页面标记&#xff09; 是淘宝社区电商业务&#xff08;xTao&#xff09;为外部合作伙伴&#xff08;外站&#xff09;提供的跟踪引导成交效果数据的解决方案 注&#xff1a;也有解释 SPM 为超级位置模型(Super Position…

2024年甘肃省职业院校技能大赛(中职教师组)网络安全竞赛样题卷④

2024年甘肃省职业院校技能大赛&#xff08;中职教师组&#xff09;网络安全竞赛样题卷④ 2024年甘肃省职业院校技能大赛&#xff08;中职教师组&#xff09;网络安全竞赛样题卷④A模块基础设施设置/安全加固&#xff08;本模块200分&#xff09;A-1任务一 登录安全加固&#xf…

(C++)和为s的两个数字--双指针算法

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 和为S的两个数字_牛客题霸_牛客网输入一个升序数组 array 和一个数字S&#xff0c;在数组中查找两个数&#xff0c;使得他们的和正好是S&#xff0c;如果。题目来自【牛客题霸】https://www.nowcoder.com/practice/390da4f7a…

关于你对 Zookeeper 的理解

看看普通人和高手是如何回答这个问题的&#xff1f; 普通人 Zookeeper 是一种开放源码的分布式应用程序协调服务 是一个分布式的小文件存储系统 一般对开发者屏蔽分布式应用开发过过程种的底层细节 用来解决分布式集群中应用系统的一致性问题 高手 对于 Zookeeper 的理解…