Flink,spark对比

三:az 如何调度Spark、Flink,MR 任务
首先,使用java编写一个spark任务,定义一个类,它有main方法,里面写好逻辑,sparkConf 和JavaSparkContext 获取上下文,然后打成一个jar包,创建一个sh文件,使用spark提交任务的spark-submit 命令,指定jar包和对应的类名,和运行的参数,然后在job 文件里面指定sh 脚本,接着dependencies指定好依赖就行。最终打包成一个zip包上传。

如果是提交flink任务呢,也是定义一个类,在main方法里,Flink 流批任务只需要分别使用StreamExecutionEnvironment或者ExecutionEnvironment获取对应的执行环境,然后获取到DataStream 或者DataSet, 然后进行一系列的转换,最终达成一个jar 包,它是使用/bin/flink run 去提交任务的,后面的参数指定和spark 大同小异 ,az 也大同小异

MR 如何提交任务呢,肯定要编写Mapper和Reducer的实现处理类,然后有个主类,获取到Hadoop 的Configuration 的对应环境配置,获取到job 指定输入输出以及Mapper以及reducer类,然后打包成一个jar包,使用hadoop jar xx.jar 提交任务。

四:简单介绍下Flink
那就对比下Flink,Spark,MapReduce
Flink ,大数据分布式处理框架,从流处理开始,打造流批一体的框架,用于对无界和有界的数据流进行有状态计算,提供了诸多高级api供用户开发分布式任务,提供了数据分布,容错机制,资源管理和调度等功能

4.1: 首先从编程模型来看,MR的基础就是一条record,spark 就是RDD,rdd就是一批数据,而Flink 是DataStream 和 DataSet,这两个也是一批数据;从这个最开始的编程模型的输入来看就知道spark以及Flink 比 MR 快,后续的数据转换spark和Flink 都有丰富的算子(transform和collect 算子,flink是operator chain),而MR就很局限了,要自己定义
4.2:从数据流转的介质来看,MR会落盘,就是那个Map阶段的结尾会落盘,涉及到磁盘I/O,比较耗费时间;其实Flink 和Spark 也会进行数据的落盘,但是他们和mr的最大的本质不同就是他们可以把数据放在内存中,最后再落盘,而MR一定会落盘;

4.3:算子方面,flink是dataset api,DataStream API, table api, sql;而spark 是 RDD, DataSet, DataFrame, sparkSql;Flink 的核心引擎是runTime,spark的是SparkCore

五:Flink 和sparkStreaming 的区别
5.1: 一个实时,一个微批
5.2: 一个使用StreamingExecutionEnvironment, 一个使用JavaStreamingContext;
5.3: 一个DataStream, 一个是Dstream 的流数据
5.4: 任务调度来说,一个是会依次创建StreamGraph, JobGraph, ExecutionGraph,JobManager 调度ExecutionGraph;而另一个是 创建DstreamGraph, JobGenerator, 和JobScheduler
5.5: 时间机制方面,一个是有数据时间,摄入时间和处理时间;而sparkStreaming 是只有处理时间
5.6: 容错方面,Flink 有分布式快照,使用两阶段提交协议可以做到只有一次处理,而sparkStreaming 也有checkpoint ,能恢复数据,但是做不到恰好一次处理,可能会重复。

六:Flink 和spark的checkpoint 的异同点
6.1: checkpoint 说白了都是为了持久化数据的,Flink 是保存比如某个数据的状态,说白了就是会动态变化的值,比如用户的订单总额就是用户订单数据的状态,而spark 是保存RDD的数据到hdfs,截断RDD,防止数据异常中断,可以恢复;不过都是把内存中的数据持久化到外部的系统中,这里一般是hdfs,持久化嘛
6.2: checkpoint的触发方式不一样,Flink 的checkpoint 是由jobManager 定时触发的,如果配置了的话;而Spark是需要在代码中手动触发的
6.3: checkpoint 的触发机制不一样,Flink的checkpoint 说白了有两个阶段,预提交阶段和提交阶段,预提交阶段会做三个事,如下所示:
6.3.1: 进行checkpoint, 比如记录了用户1和2的订单金额分别是200和300
6.3.2: 写WAL 日志,就是用户1和2又有新的动作,由增加了订单金额100和50(这个可以认为是状态)
6.3.3: 锁定资源,告诉外部系统,用户1和2的订单总金额分别是300和350,但是让外部系统知道,并不是立马更新
如果上述有任何一步失败,我们都会滚到上个checkpoint,然后接下来就是提交阶段,会做两个事:
6.3.4: 把checkpoint 的状态提交
6.3.5: 外部系统更新对应的订单总金额300和350

如果是spark的checkpoint ,则直接把数据存储到hdfs了,没有啥特殊的。

7:Flink 和Spark的集群规模
Flink on yarn,一般是10台;cpu核数是36;内存是128G;
spark on yarn,是200台,pb级别的数据,cpu 核数是36,内存是128G

8:Flink 和spark, yarn 的集群角色
8.1:说明
Flink 是有client,jobManager 以及taskManger;client 是提交任务的作用,并且接收结果返回;而JobManager 接收提交任务,进行任务调度,故障恢复,容错管理;管理tm;
spark 也是有driver,master 以及 worker,和flink的一一对应,此外还有个executor 和 clusterManager
yarn 则是有ResourceManager(整体资源的管理), NoderManager(管理节点上的资源), ApplicationMaster(一个应用程序的管理者),Container(实际运行程序的容器)以及Client

9:flink 以及 spark 还有Mr 提交任务到yarn上的流程对比
9.1:Flink 提交任务流程如下,Flink 支持三种模式,session 模式,perJob模式和Application 模式,前面两者都相当于spark的yarn-cleint 模式,一个是共享资源,一个独享资源;而Application 模式是相当于spark的yarn-cluster 模式,客户端在yarn上,生产环境使用application模式,如下所示:
在这里插入图片描述
这里的ResourceManager 是flink 自己的,不是yarn的

9.2:spark 在yarn上有yarn-client 模式和yarn-cluster 模式之分,一般我们使用yarn-cluster 模式,这个最主要的点就是driver 是在客户端还是yarn上,这里的applicationMaster 就可以理解为Driver,生产环境如下:
在这里插入图片描述
10. Flink 的TaskSlot
它的目的是为了控制一个taskManager 能运行多少个task,所以对资源进行了分配,划分成不同的slot,一般和cpu是1:1 的关系,所以一个算子分布在不同的taskManger 上面,在一个tm的并行度和slot是一比一的关系,那么全局的并行度就是我们自己设置的并行度了,不过我们在考虑的时候就是考虑单个tm里面的并行度好点;slot 做了内存隔离,没有做cpu的隔离。

11:Flink 和spark的常用算子比较
FLink 独有的算子,keyBy, process, window
spark 独有的,mapPartition, repartition,colease, union ; transformation 和 action 算子

12.Flink 分区策略
GlobalPartitioner; ShufflePartitioner, RebalancePartitioner; RescalePartitioner(根据上下游算子的并行度分发数据), BrodcastPartitioner,ForwardPartitioner(上下游算子并行度一致);KeyGroupStreamPartitioner(Hash分区),CustomPartitioner(自定义分区策略)
Flink的默认分区数就是等于并行度

spark的默认分区数等于cpu的核数,也可以使用repartition,

13:Flink 和Spark的编程流转区别
Flink 流式这边一直返回的会是DataStream, 批返回的是DataSet的数据集
而Spark这边流失返回的会是Dstream以及衍生类的数据集,而批返回的则是RDD以及衍生类的数据集

14: Spark 和Flink 的序列化
为什么这两者都要实现自己的序列化框架呢,因为Java的序列化存储密度低,分布式计算的话内存要用在刀刃上,所以他们实现了自己的序列化框架,Spark 是使用了KyroSerializer 序列化,Flink的序列化的基本类是TypeInfomation.

15: Spark 和flink的反压机制
spark.streaming.backpressure.enabled, sparkStreaming 动态调整,
Flink 手动调整,看并行度,算子处理情况。

16:flink 和spark 数据在内存的抽象
16.1: 就是java对象 --StreamRecord–Buffer–memorySegment–Byte数组
16.2 RDD在缓存到内存之前,partition中record对象实例在堆内other内存区域中的不连续空间中存储。RDD的缓存过程中, 不连续存储空间内的partition被转换为连续存储空间的Block对象,并在Storage内存区域存储,此过程被称作为Unroll(展开)。

17: Spark 和Flink以及Hive 调优
都是从三个方面来说,
分别是资源调优,代码性能调优,业务调优
17.1: 对于spark 和Flink 来说,资源调优方面,可以使得单个executor 或者taskManager 可以使用的内存和cpu最大的话就尽量可以配置最大,先说spark;
17.1.1: spark一般调整的就是num-executors ,相当于flink的tm的个数;executro-memory, executor-cores,以及driver-memory 分别相当于tm的内存,tm的slot 个数,jm的内存;spark.default.parallelism 也相当于flink的并行度,spark.storage.memoryFraction 是用来持久化RDD的那部分内存,一般是executor-memory 堆内内存的60%的50%;spark.shuffle.memoryFraction就是用来shuffle的内存,和刚刚的一样,占有堆内内存的60%的50%;所以实际生产看看到底哪个用的多一点,就多给点

17.1.2: 在资源参数这里,hive需要调整的无非也是内存和cpu这方面,如下所示:
mapreduce.map.java.opts, map 阶段的jvm进程的堆内存;
mapreduce.map.memory.mb,map阶段的jvm 进程的堆内存和堆外内存的和;
mapreduce.reduce.java.opts,reduce 阶段的jvm进程的堆内存;
mapreduce.reduce.memory.mb,reduce 阶段的 的jvm 进程的堆内存和堆外内存的和;
mapreduce.map/reduce.cpu.vcores, map 和reduce 阶段可用的cpu 的个数;当给大点

但是hive中的map和reduce 的task的数量取决于总文件的个数和每个文件数的大小,一般是每个文件数的大小起作用,如下所示:
mapred.min/max.split.size,就是可以分割文件的最小和最大文件大小,但是map的task数量还不是由这个决定的,还是由多个因素决定的,看下图
在这里插入图片描述
因为hadoop系统中dfs.block.size 一般是128M,所以如果我们没有设置上述的最小和最大的话,就是默认按照128去分割,如果要提高task数量,要么提高mapred.map.tasks的数量,要么增大mapred.min.split.size 的大小,到256M也可以。

那么reduce的task的数量呢?
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, mapred.reduce.tasks);
所以最直接的办法是通过mapred.reduce.tasks = 10 来设定就可以,当然设定太小了执行时间会长,所以要居中;太大的话则小文件过多,也不好。

17.2: 算子性能调优
17.2.1: spark算子性能调优
spark.sql.adaptive.enabled 默认为false 自适应执行框架的开关
spark.sql.adaptive.skewedJoin.enabled 默认为 false 倾斜处理开关
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 是用于聚合input的小文件,用于控制每个mapTask的输入文件,防止小文件过多时候,产生太多的task
spark.sql.autoBroadcastJoinThreshold 用于控制在spark sql中使用BroadcastJoin时候表的大小阈值,适当增大可以让一些表走BroadcastJoin,提升性能,但是如果设置太大又会造成driver内存压力
用 reduceByKey( ) 和 aggregrateByKey( ) 来取代 groupByKey,因为前者会进行预聚合
操作数据库建义采用foreachPartition( ) ,资源可以的情况下使用mapPartitions 代替map
数据复用使用persist
减少数据碎片使用 coalesce( )进行重分区
spark.shuffle.file.buffer参数是调节map端缓冲区大小,单位是kb,减少磁盘溢写次数;
spark.reducer.maxSizeInFlight 参数是调节shuffle的时候reduce端的缓冲区大小,单位是MB
spark.shuffle.io.maxRetries reduce端拉取重试次数,以及拉取失败等待间隔,spark.shuffle.io.retryWait,单位是s,比如60s
spark.shuffle.sort.bypassMergeThreshold, 如果确实不需要排序操纵,那就调大sortByPass的阈值,调大到400等,默认是200

17.2.2: Hive 性能调优
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 自动合并小文件
set hive.merge.mapredfiles = true; 设置reduce 端对输出文件的合并
set hive.archive.enabled=true; 使用hadoop archive 文件对小文件归档
set hive.mapred.mode=strict 开启严格模式;不允许对分区表查询where不带分区,order by 必须加上limit,不允许笛卡尔积等;
set hive.exec.parallel=true; //打开任务并行执行
set mapred.job.reuse.jvm.num.tasks=10 设置jvm重用
set hive.map.aggr=true; set hive.groupby.skewindata = true; 进行数据负载均衡,数据倾斜优化
set hive.fetch.task.conversion=more; 可以减少不必要的走mapreduce 任务
set hive.auto.convert.join = true; 开启map join

17.2.3: Flink 性能调优
算子方面暂无,主要是资源和倾斜方面,要改代码

17.3: 业务代码调优
最典型的问题,数据倾斜怎么办?
hive只能是自己可以通过刚刚那个skew_in_data 去均衡,那么flink 和spark呢?
17.3.1: spark和flink 数据倾斜处理
17.3.1.1: 碰到大量空值的或者就是某个大量值的,加上随机字符串,均匀shuffle
17.3…1.2: 把聚合的步骤往前放,放到hive或者mapreudce 里面去做
17.3.1.3: 过滤掉少数导致倾斜的key
17.3.1.4: 提高shuffle操作的并行度,增加并行处理能力
17.3.1.5: 两阶段聚合,局部聚合+全局聚合,对于倾斜的key打上随机浅醉,聚合后再去掉再聚合,这个适合聚合算子,不适合join
17.3.1.6: Reduce join 换成MapJoin
17.3.1.7: 倾斜key 拆分join,打上随机前缀,然后后续不倾斜的扩容和它join,最终过滤掉前缀得到正确结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/779696.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于机器学习(霍特林统计量,高斯混合模型,支持向量机)的工业数据异常检测(MATLAB R2021B)

近年来,隨着集散控制系统、工业物联网、智能仪表等信息技术在现代工业生产系统中的应用,生产过程的运行状态能够以大量数据的形式被感知和记录。基于数据的故障诊断方法以过程数据为基础,采用统计分析、统计学习、信号处理等方法,…

笔记:SpringBoot+Vue全栈开发2

笔记:SpringBootVue全栈开发2 1. MVVM模式2. Vue组件化开发3. 第三方组件element-ui的使用4. axios网络请求5. 前端路由VueRouter 1. MVVM模式 MVVM是Model-View-ViewModel的缩写,是一种基于前端开发的架构模式,其核心是提供对View和ViewMod…

【简单介绍下Memcached】

🌈个人主页: 程序员不想敲代码啊 🏆CSDN优质创作者,CSDN实力新星,CSDN博客专家 👍点赞⭐评论⭐收藏 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共…

独立开发者系列(21)——HTTP协议的使用

作为网络访问的必备知识点,http协议,我们已经知道http协议属于tcp的一种,而且一般是用于网络通讯的,但是本身http协议本身包含的内容也很多,正是因为有这种协议,前后端和各种硬件接口/服务器接口/前端&…

VSCode远程服务器如何上传下载文件(超方便!)

方法一: 1、在VSCode应用商店安装SFTP插件 2、然后就可以直接把文件拖进VSCode即可,如下图所示: 这里的目录是我远程服务器上的目录,可以直接将要上传的文件直接拖进需要的文件夹 3、如果要从远程服务器上下载文件到本地&#x…

手写实现一个ORM框架

手写实现一个ORM框架 什么是ORM框架、ORM框架的作用效果演示框架设计代码细节SqlBuilderSqlExecutorStatementHandlerParameterHandlerResultSetHandler逆序生成实体类 大家好,本人最近写了一个ORM框架,想在这里分享给大家,让大家来学习学习。…

axios的使用,处理请求和响应,axios拦截器

1、axios官网 https://www.axios-http.cn/docs/interceptors 2、安装 npm install axios 3、在onMouunted钩子函数中使用axios来发送请求,接受响应 4.出现的问题: (1) 但是如果发送请求请求时间过长,回出现请求待处…

分布式共识算法

分布式的基石 分布式共识算法 前置知识:分布式的 CAP 问题,在事务一章中已有详细介绍。 正式开始探讨分布式环境中面临的各种技术问题和解决方案以前,我们先把目光从工业界转到学术界,学习两三种具有代表性的分布式共识算法&…

昇思MindSpore学习总结十——ResNet50迁移学习

1、迁移学习 (抄自CS231n Convolutional Neural Networks for Visual Recognition) 在实践中,很少有人从头开始训练整个卷积网络(使用随机初始化),因为拥有足够大小的数据集相对罕见。相反,通常…

Flask之电子邮件

前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除 目录 一、使用Flask-Mail发送电子邮件 1.1、配置Flask-Mail 1.2、构建邮件数据 1.3、发送邮件 二、使用事务邮件服务SendGrid 2.1、注册SendGr…

昇思25天学习打卡营第11天|MindSpore 助力下的 GPT2:数据集加载处理及模型全攻略

目录 环境配置 数据集下载和获取 数据集拆分 处理数据集 模型构建 ​​​​​​​模型训练 ​​​​​​​模型推理 环境配置 “%%capture captured_output”这一行指令通常旨在捕获后续整个代码块所产生的输出结果。首先,将已预装的 mindspore 库予以卸载。随后&a…

68.WEB渗透测试-信息收集- WAF、框架组件识别(8)

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 内容参考于: 易锦网校会员专享课 上一个内容:67.WEB渗透测试-信息收集- WAF、框架组件识别(7) 右边这些是waf的…

【Java学习笔记】方法的使用

【Java学习笔记】方法的使用 一、一个例子二、方法的概念及使用(一)什么是方法(二)方法的定义(三)方法调用的执行过程(四)实参和形参的关系(重要)&#xff08…

第1节、基于太阳能的环境监测系统——MPPT充电板

一、更新时间: 本篇文章更新于:2024年7月6日23:33:30 二、内容简介: 整体系统使用太阳能板为锂电池充电和系统供电,天黑后锂电池为系统供电,本节主要介绍基于CN3722的MPPT太阳能充电模块,这块主要是硬件…

如何从相机的存储卡中恢复原始照片

“不好了。” 当您意识到自己不小心从存储卡中删除了照片,或者错误地格式化了相机的记忆棒时,您首先会喊出这两个词。这是一种常见的情况,每个人一生中都会遇到这种情况。幸运的是,有办法从相机的 RAW 记忆棒中恢复已删除的照片。…

关于小爱同学自定义指令执行

1.前言 之前买了小爱同学音响,一直想让其让我的生活变得更智能,编写一些程序来完成一些自动化任务,但是经过搜索发现,官方开发者平台不能用了,寻找api阶段浪费了我很长时间。最后在github 开源项目发现了俩个比较关键…

gcc的编译C语言的过程

gcc的简介 GCC(GNU Compiler Collection)是由GNU项目开发和维护的一套开源编程语言编译器集合。它支持多种编程语言,包括但不限于C、C、Objective-C、Fortran、Ada等。GCC被广泛应用于编译和优化各种程序,是许多开发者和组织的首选…

防火墙基础及登录(华为)

目录 防火墙概述防火墙发展进程包过滤防火墙代理防火墙状态检测防火墙UTM下一代防火墙(NGFW) 防火墙分类按物理特性划分软件防火墙硬件防火墙 按性能划分百兆级别和千兆级别 按防火墙结构划分单一主机防火墙路由集成式防火墙分布式防火墙 华为防火墙利用…

ubuntu22.04+pytorch2.3安装PyG图神经网络库

ubuntu下安装torch-geometric库,图神经网络 开发环境 ubuntu22.04 conda 24.5.0 python 3.9 pytorch 2.0.1 cuda 11.8 pyg的安装网上教程流传着许多安装方式,这些安装方式主要是:预先安装好pyg的依赖库,这些依赖库需要对应上pyth…

C++11|包装器

目录 引入 一、function包装器 1.1包装器使用 1.2包装器解决类型复杂 二、bind包装器 引入 在我们学过的回调中,函数指针,仿函数,lambda都可以完成,但他们都有一个缺点,就是类型的推导复杂性,从而会…