aws emr启动standalone的flink集群

关键组件

  • Client,代码由客户端获取并做转换,之后提交给JobManger
  • JobManager,对作业进行中央调度管理,获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。
  • TaskManager,数据的处理操作
image-20240105171119363

在emr上自建standalone集群,文件分发脚本xsync

#!/bin/bash
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for host in $nodelist
do
    echo ==================== $host ====================
    for file in $@
    do
        if [ -e $file ]
        then
            pdir=$(cd -P $(dirname $file); pwd)
            fname=$(basename $file)
            ssh $host "mkdir -p $pdir"
            rsync -av $pdir/$fname $host:$pdir
        else
            echo $file does not exists!
        fi
    done
done

先进入节点中创建flink文件(root用户)

#!/bin/bash
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for node in $nodelist; do
  echo "Executing commands on $node"
  ssh -o StrictHostKeyChecking=no $node "
    sudo mkdir -p /usr/lib/flink/
    sudo chown -R hadoop:hadoop /usr/lib/flink
  "
done

将/usr/lib/flink/同步到其他节点上

xsync /usr/lib/flink
xsync /etc/flink/conf.dist

重新软连接

#!/bin/bash
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for node in $nodelist; do
  echo "Executing commands on $node"
  ssh -o StrictHostKeyChecking=no $node "
    sudo rm /usr/lib/flink/conf
    sudo ln -s /etc/flink/conf.dist/ /usr/lib/flink/conf
  "
done

直接启动失败,需要指定workfile

No workers file. Please specify workers in 'conf/workers'.

root用户下写入workers配置

yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}' > /usr/lib/flink/conf/workers

echo $(hostname).cn-north-1.compute.internal:8081 > /usr/lib/flink/conf/masters

jobmanager配置

# JobManager 节点地址.
jobmanager.rpc.address: $(hostname).cn-north-1.compute.internal
jobmanager.bind-host: 0.0.0.0 # default
rest.address: 0.0.0.0 # webui
rest.bind-address: 0.0.0.0

taskmanager配置

echo taskmanager.host: $(hostname).cn-north-1.compute.internal >> /usr/lib/flink/conf/flink-conf.yaml
echo taskmanager.bind-host: 0.0.0.0 >> /usr/lib/flink/conf/flink-conf.yaml

修改权限

sudo chown -R hadoop:hadoop /var/lib/flink

启动集群

  • 不知道为什么worker节点始终无法启动taskmanagerrunner,最终发现没有权限

  • 这个路径是提供webui上传jar文件用的

    mkdir: cannot create directory ‘/var/run/flink’: Permission denied
    /usr/lib/flink/bin/flink-daemon.sh: line 82: /var/run/flink: No such file or directory
    flock: 200: Bad file descriptor
    Starting taskexecutor daemon on host ip-192-168-28-247.
    /usr/lib/flink/bin/flink-daemon.sh: line 145: /var/run/flink/flink-hadoop-taskexecutor.pid: No such file or directory
    
/usr/lib/flink/bin/start-cluster.sh

在master上查看日志

在这里插入图片描述

最终taskmanager成功注册

image-20240105201137482

关闭集群

/usr/lib/flink/bin/stop-cluster.sh

jpsall脚本

#!/bin/bash
masternode=$(hostname).cn-north-1.compute.internal
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for host in $masternode $nodelist
do
    echo =============== $host ===============
    ssh $host jps
done

在yarn模式下,提交 JAR 文件后,它就会变成由 Flink JobManager 管理的作业。

  • JobManager 位于托管 Flink 会话 Application Master 进程守护程序的 YARN 节点上

集群生命周期大于job,因此实际上对应session模式

flink run --jobmanager localhost:8081 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput 

flink run --jobmanager localhost:8081 -c org.example.wc.WordCountBatch flinkall-1.0.0.jar

部署模式

session模式

启动flink session

  • 5.5.0 版本中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行
flink-yarn-session -d

image-20240105110801556

启动session后提交任务

flink run --jobmanager yarn-cluster -yid application_1704427099392_0001  /usr/lib/flink/examples/streaming/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput 

Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: 8XKQF6W01QDQCRTD, Extended Request ID: Bsi6AK3alrxV5OYL5aZhu05h/RusTGUBm9P9hRu5dFu0whCv68DKpvFjf8CYL9Wc5zSEoaL759M=)

可以设置region

  • 看起来是6.15版本的emr_flink存在问题,默认region不对
-Dfs.s3a.bucket.endpoint.region=cn-north-1

在resourcemanager的Tracking UI会跳转到flink jobmanager

但是flink的taskmanager日志并没有汇聚到yarn历史服务器中

查看session fluster的ip和端口号

image-20240105222430792

在这里插入图片描述

或者直接在提交jar界面看ip和端口地址,监听的是内网ip

  • 找到后可以在idea中注册

per-job模式

直接运行batch任务

flink run -m yarn-cluster -Dexecution.runtime-mode=BATCH flinktutorial17-1.0.jar

不同类型任务的name

image-20240105155322448

application模式

提交任务

  • 将jar拷贝到/usr/lib/flink/lib

  • 指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包

/usr/lib/flink/bin/standalone-job.sh start --job-classname org.example.wc.WordCountBatch

Starting standalonejob daemon on host ip-192-168-30-184.

启动taskmanager

/usr/lib/flink/bin/taskmanager.sh start

在application模式下,yarn中同样没有记录

查看jps进程

image-20240105220805065

在master上查看日志

image-20240105220010908

yarn运行模式

yarn运行模式下同样可以使用三种部署模式

session模式

flink-yarn-session -d命令参数

  • Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配 TaskManager 和 slot
-d:分离模式
-jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB
-nm(--name):配置在 YARN UI 界面上显示的任务名
-qu(--queue):指定 YARN 队列名
-tm(--taskManager):配置每个 TaskManager 所使用内存。

per-job模式

提交任务

/usr/lib/flink/bin/flink run -t yarn-per-job -c org.example.wc.WordCountBatch flinkall-1.0.0.jar

 -t,--target <arg>     The deployment target for the given application,
                       which is equivalent to the "execution.target" config
                       option. For the "run" action the currently available
                       targets are: "remote", "local", "kubernetes-session",
                       "yarn-per-job" (deprecated), "yarn-session". For the
                       "run-application" action the currently available
                       targets are: "kubernetes-application",
                       "yarn-application".

如下报错的解决

  • 在 flink 的/opt/module/flink-1.17.0/conf/flink-conf.yaml 配置文件中设置classloader.check-leaked-classloader: false

image-20240105223157373

application模式

提交任务

/usr/lib/flink/bin/flink run-application -t yarn-application -c org.example.wc.WordCountBatch flinkall-1.0.0.jar

/usr/lib/flink/bin/flink run-application -t yarn-application s3://zhaojiew-bigdata/app/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput 

可以看到print输出此时在jobmanager中

image-20240105223507271

但是看起来并不支持将jar存储在s3?已知问题,怀疑和flink本身有关,因为使用s3作为input和output是没有问题的

在这里插入图片描述

可以在提交任务时指定依赖,并非任务jar

bin/flink run-application -t yarnapplication -Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flinkdist" -c com.atguigu.wc.SocketStreamWordCount
hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/650489.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ResNet残差网络的学习【概念+翻译】

基于何明凯前辈论文的学习 1.主要内容&#xff08;背景&#xff09; 1、首先提了一个base&#xff1a;神经网络的深度越深&#xff0c;越难以训练。 2、原因&#xff1a;因为随着神经网络层数的增加&#xff0c;通常会遇到梯度消失或梯度爆炸等问题&#xff0c;这会导致训练变…

二十八、openlayers官网示例Data Tiles解析——自定义绘制DataTile源数据

官网demo地址&#xff1a; https://openlayers.org/en/latest/examples/data-tiles.html 这篇示例讲解的是自定义加载DataTile源格式的数据。 先来看一下什么是DataTile&#xff0c;这个源是一个数组&#xff0c;与我们之前XYZ切片源有所不同。DataTile主要适用于需要动态生成…

【CSharp】将ushort数组保存为1通道位深16bit的Tiff图片

【CSharp】将ushort数组保存为1通道位深16bit的Tiff图片 1.背景2.接口 1.背景 System.Drawing.Common 是一个用于图像处理和图形操作的库&#xff0c;它是 System.Drawing 命名空间的一部分。由于 .NET Core 和 .NET 5 的跨平台特性&#xff0c;许多以前内置于 .NET Framework…

10.SpringBoot 统一处理功能

文章目录 1.拦截器1.1在代码中的应用1.1.1定义拦截器1.1.2注册配置拦截器 1.2拦截器的作用1.3拦截器的实现 2.统一数据返回格式2.1 为什么需要统⼀数据返回格式&#xff1f;2.2 统⼀数据返回格式的实现 3.统一异常处理4.SpringBoot专业版创建项目无Java8版本怎么办&#xff1f;…

[转载]同一台电脑同时使用GitHub和GitLab

原文地址&#xff1a;https://developer.aliyun.com/article/893801 简介&#xff1a; 工作中我们有时可能会在同一台电脑上使用多个git账号&#xff0c;例如&#xff1a;公司的gitLab账号&#xff0c;个人的gitHub账号。怎样才能在使用gitlab与github时&#xff0c;切换成对应…

Vue.js - 计算属性与侦听器 【0基础向 Vue 基础学习】

文章目录 计算属性 computedcomputed 的使用方法computed 与 method 的区别计算属性完整写法 watch 侦听器&#xff08;监视器&#xff09;简单写法 → 简单类型数据&#xff0c;直接监视完整写法 → 添加额外配置项 计算属性 computed computed 的使用方法 **概念&#xff1…

红外超声波雷达测距

文章目录 一HC-SR04介绍1HC-SR04简介及工作原理 二用HAL库实现HC-SR04测量距离1STM32CubeMX配置2keil53代码的添加 三效果 一HC-SR04介绍 1HC-SR04简介及工作原理 超声波是振动频率高于20kHz的机械波。它具有频率高、波长短、绕射现象小、方向性好、能够成为射线而定向传播等…

如何使用 Re-Ranking 改进大模型 RAG 检索

基于大型语言模型&#xff08;LLMs&#xff09;的聊天机器人可以通过检索增强生成&#xff08;RAG&#xff09;提供外部知识来改进。 这种外部知识可以减少错误答案&#xff08;幻觉&#xff09;&#xff0c;并且使模型能够访问其训练数据中未包含的信息。 通过RAG&#xff0…

【Docker学习】详细讲解docker ps

docker ps是我们操作容器次数最多的命令之一&#xff0c;但我们往往使用docker ps或是docker ps -a&#xff0c;对于该命令的其它选项&#xff0c;我们关注比较少。那么这一讲&#xff0c;我给大家详细讲讲该命令的全部方法。 命令&#xff1a; docker container ls 描述&am…

web题解,基础知识巩固(qsnctf)

1.文章管理系统 1&#xff09;打开题目&#xff0c;把它页面翻完了&#xff0c;没看懂它有啥用 2&#xff09;看了看源码&#xff0c;也是一样的&#xff0c;没找到有用的东西 3&#xff09;想着可能还是在隐藏文件里找&#xff0c;那我就直接用dirsearch扫扫看 4&#xff09;…

常见API(JDK7时间、JDK8时间、包装类、综合练习)

一、JDK7时间——Date 1、事件相关知识点 2、Date时间类 Data类是一个JDK写好的Javabean类&#xff0c;用来描述时间&#xff0c;精确到毫秒。 利用空参构造创建的对象&#xff0c;默认表示系统当前时间。 利用有参构造创建的对象&#xff0c;表示指定的时间。 练习——时间计…

Flink 数据源

原理 在 Flink 中&#xff0c;数据源&#xff08;Source&#xff09;是其中一个核心组件&#xff0c;负责从各种来源读取数据供 Flink 程序处理。 Flink 的数据源类型丰富&#xff0c;涵盖了从简单测试到生产环境使用的各种场景。Kafka、Socket、文件和集合是 Flink 中最常见…

Java后端面经

1.可重复读&#xff0c;已提交读&#xff0c;这两个隔离级别表现的现象是什么&#xff0c;区别是什么样的&#xff1f; 可重复读&#xff1a;表示整个事务看到的事务和开启后的事务能看到的数据是一致的&#xff0c;既然数据是一致的&#xff0c;所以不存在不可重复读。而且不…

【傻呱呱】VirtualHere共享局域网中的USB设备(使用Pavadan老毛子固件搭建篇)

前期准备 SSH工具&#xff08;FinalShell&#xff09;老毛子固件路由器一台 搭建VirtualHere服务端 进入VirtualHere官网下载对应处理器架构的包&#xff0c;我的是RT-N14U-GPIO路由器刷的老毛子固件&#xff0c;这种一般选择最后一个或者倒数第二个包&#xff0c;这里我选择…

[NOIP 2014] 寻找道路

[NOIP 2014] 寻找道路 在有向图 G 中&#xff0c;每条边的长度均为 11&#xff0c;现给定起点和终点&#xff0c;请你在图中找一条从起点到终点的路径&#xff0c;该路径满足以下条件&#xff1a; 路径上的所有点的出边所指向的点都直接或间接与终点连通。在满足条件 11 的情…

01Python相关基础学习

Python基础 模块相关导入模块sys模块 模块相关 导入模块 1. import 模块名 2. import 模块名 as 别名 3. from 模块名 import 成员名 as 别名sys模块 1. sys.argv 介绍: 实现从程序的外部想程序传递参数返回的是一个列表,第一个元素是程序文件名,第二个元素是程序外部传入的…

景源畅信:新手做抖音运营难不难?

在这个信息爆炸的时代&#xff0c;社交媒体平台如抖音已经成为了人们日常生活中不可或缺的一部分。随着抖音的兴起&#xff0c;越来越多的人开始尝试进入这个领域&#xff0c;希望通过抖音运营实现自己的价值。然而&#xff0c;对于新手来说&#xff0c;抖音运营是否真的容易呢…

网络安全等级保护2.0(等保)是什么

等保的全称是信息安全等级保护&#xff0c;是《网络安全法》规定的必须强制执行的&#xff0c;保障公民、社会、国家利益的重要工作。 通俗来讲就是&#xff1a;公司或者单位因为要用互联网&#xff0c;但是网上有坏人&#xff0c;我们不仅要防御外部坏人&#xff0c;还要看看…

我的世界开服保姆级教程

前言 Minecraft开服教程 如果你要和朋友联机时&#xff0c;可以选择的方法有这样几种&#xff1a; 局域网联机&#xff1a;优点&#xff1a;简单方便&#xff0c;在MC客户端里自带。缺点&#xff1a;必须在同一局域网内。 有些工具会带有联机功能&#xff1a;优点&#xff1a;一…

云原生Kubernetes: 云主机部署K8S 1.30版本 单Master架构

目录 一、实验 1.环境 2.Termius连接云主机 3.网络连通性与安全机制 4.云主机部署docker 5.云主机配置linux内核路由转发与网桥过滤 6.云主机部署cri-dockerd 7.云主机部署kubelet,kubeadm,kubectl 8.kubernetes集群初始化 9.容器网络&#xff08;CNI&#xff09;部署…