spark中Rdd依赖和SparkSQL介绍--学习笔记

1,RDD的依赖

1.1概念

rdd的特性之一

相邻rdd之间存在依赖关系(因果关系)

窄依赖

每个父RDD的一个Partition最多被子RDD的一个Partition所使用

父rdd和子rdd的分区是一对一(多对一)

触发窄依赖的算子

map(),flatMap(),filter()

宽依赖

父RDD的一个partition会被子rdd的多个Partition所使用
父rdd和子rdd的分区是一对多
触发宽依赖的算子:
grouBy(),groupByKey,sortBy(),SortByKey(),reduceByKey,distinct()

1.3DAG图计算

DAG又叫做有向无环图

管理rdd依赖关系,保证rdd按照依赖关系进行数据的顺序计算

会根据rdd的依赖关系将计算过程分成多个计算步骤,每个计算步骤成为一个fasse

在计算的rdd依赖关系中,一旦发生了宽依赖就会进行数据才分生成新的stage

1.4Spark术语

app-应用程序(一个py文件/一个交互式页面)

job->作业(调用action算子时会触发job)

stage->计算步骤,由DAG图根据宽依赖产生新的stage

task->任务,有多少个分区数,就有多少个task任务,task任务是以线程方式执行

1.5为什么划分stage计算步骤

spark的task的任务是以线程方式并行计算

线程方式并行计算会有资源竞争导致计算不准确问题

通过stage来解决计算不准确的问题

同一个stage中数据不会进行shuffle(重新洗牌),多个task是可以并行计算

不同stage之间是需要等待上一个stage执行完成后(获取所有数据),再执行下一个stage
如何划分stage?
对于窄依赖,partition的转换处理在说stage中完成计算,不划分(将窄依赖尽量放在同一个stage中,可以实现流水线计算)
对于宽依赖,只能在父EDD处理完成后,才能开始接下来的计算也就是说需要划分stage
遇到宽依赖就需要划分stage
在这里插入图片描述

2,Spark的运行流程(内核调度)

Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。

  • DAGScheduler
    • 根据rdd间的依赖关系,将提交的job划分成多个stage。
    • 对每个stage中的task进行描述(task编号,task执行的rdd算子)
  • TaskScheduler
    • 获取DAGScheduler提交的task
    • 调用SchedulerBackend获取executor的资源信息
    • 给task分配资源,维护task和executor对应关系
    • 管理task任务队列
    • 将task给到SchedulerBackend,然后由SchedulerBackend分发对应的executor执行
  • SchedulerBackend
    • 向RM申请资源
    • 获取executor信息
    • 分发task任务
      在这里插入图片描述

3,Spark的shuffle过程

spark shuffle的两个部分

  • shuffle write 写 map阶段,上一个stage得到最后的结果写入
  • shuffle read 读 reduce阶段,下一个stage拉取上一个stage进行合并
  • 会进行文件的读写,影响spark的计算速度

sortshuffle

  • 进行的是排序计算
  • bypass模式版本和普通模式版本
  • bypass模式版本不会排序,会进行hash操作
  • 普通模式版本会进行排序
  • 可以通过配置指定按照哪种模式执行

在这里插入图片描述
无论是hash还是排序都是将相同key值放在一起处理

  • [(‘a’,1),(‘b’,2),(‘a’,1)]
  • hash(key)%分区数,相同的key数据余数是相同的,会放一起,交给同一个分区进行处理
  • 按照key排序,相同key的数据也会放在一起 ,然后交给同一分区处理

3.1 sparkShuffle配置

spark.shuffle.file.buffer
(默认是32K)。将数据写到磁盘文件之前会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 可以将其调整为原来的2倍 3倍
spark.reducer.maxSizeInFlight
如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m,默认48M),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
spark.shuffle.io.maxRetries :
shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)
spark.shuffle.io.retryWait:
该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
spark.shuffle.memoryFraction=10
该参数代表了Executor 1G内存中,分配给shuffle read task进行聚合操作内存比例。
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认为sort)

Hash:spark1.x版本的默认值,HashShuffleManager

Sort:spark2.x版本的默认值,普通机制。当shuffle read task 的数量小于等于200采用bypass机制
spark.shuffle.sort.bypassMergeThreshold=200

  • 根据task数量决定sortshuffle的模式
  • task数量小于等于200 就采用bypass task大于200就采用普通模式
    当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
pyspark --master yarn --name shuffle_demo --conf 'spark.shuffle.sort.bypassMergeThreshold=300'

代码中配置

SparkConf().set('spark.shuffle.sort.bypassMergeThreshold','300')

# 将配置添加到sparkcontext中
# appName 就是计算任务名称指定和--name作用一样
# conf 参数就算是指定配置信息的
sc = SparkContext(master='yarn',appName='shuffle_demo',conf=conf)

4,spark并行度

4.1 资源并行度(物理并行)

资源并行度调整只能通过交互式,不能通过脚本,由executors节点数和cores核数决定

spark中cpu核心数据设置

  • –num-executors=2 设置executors数量
  • –executor-cores=2 设置每个executors中的cpu核心数,不能超过服务器cpu核心数

4.2数据并行度(逻辑并行)

由task数量决定,task由分区数决定。

为了保证task能充分利用cpu资源,实现并行计算,需要设置的task数量应该和资源并行度(cpu核心数)一致

  • task = cpu core 这样会导致计算快的task执行结束后,一些资源就会处于等待状态,浪费资源

    在实际公司中就要根据公司资源并行度设置分区数

  • 有的场景下公司会要求数据并行度大于资源并行度

  • 建议task数量是cpu core的2~3倍

  • 只有task足够多才能更好的利用资源,但是如果task很多的话,资源少,那么就会先执行一批后再执行下一批

4.3并行度设置

交互式模式

pyspark --master yarn --num-executors=3  --executor-cores=2

开发模式设置

spark-submit --master yarn --num-executors=3 --executor-cores=2  /root/python_spark/a.py

4.1spark调优

  • 并行度
    • 调整集群节点数和核心数
    • 调整数据分区数
  • shuffle
    • 调整缓冲区的大小 32kb 32M
    • 调整shuffle模式
    • 调整分配给read的过程聚合操作的内存大小
  • cache和checkpoint
    • 提升计算效率
    • 容错性
  • sql代码编写调优

5,sparkSql介绍

5.1什么是sparksql

是spark的一个模块

是Apache Spark用于处理数据结构化数据的模块

结构化数据

表数据 包含行,列字段

python ->DataFrame数据类型

java/scala - >Dataset数据类型

1.2特点

融合性

可以使用纯sql进行数据计算

可以使用DSL方式进行数据计算->将sql中的关键字替换成方法进行使用

统一数据访问

read->读取mysql/hdfs/es/kafka/hbase/文件数据转换成DataFrame数据类型

write->将计算结果保存到mysql/hdfs/es/kafka/hbase/文件兼容Hive

支持Hivesql转换成spark计算任务

标准化数据连接

可以使用三方工具(pycharm/datagrip)通过jdbc或odbc方式连接Spark Sql

5.2数据类型

三种数据类型

RDD:spark中最基础的数据类型,所有的组件的代码都是要转换成rdd任务进行执行,只是存储的数据值

Dataframe

sparksql中数据类型,结构化数据类型。

存储了数据值行数据,row对象

存储了表结构(schema)->schema对象

一条数据就是rdd中的一个元素

dataset类型->java/scale

一条数据就是一个dataframe

5.3DataFrame基本使用

from pyspark.sql import Row
from pyspark.sql.types import  *
from pyspark.sql import SparkSession

#SparkSession类型
#SparkSession.builder  类名.属性名  -》返回builser类的对象


ss = SparkSession.builder.getOrCreate()

#row对象
row1 = Row(id = 1,name = '小明',age = 22)
row2 = Row(id = 2,name = '小红',age = 22)

#创建schema对象(指定数据类型,方式一)
schemal1 = StructType().add('id',IntegerType(),True).\
    add('name',StringType(),False).\
    add(field='age',data_type=IntegerType(),nullable=True)

#创建dataframe数据df对象
#data:接受的是一个结构化数据类型,二维数据类型[[],[]],[(),()],[{},{}]等都可以

df1 = ss.createDataFrame(data=[row1,row2],schema=schemal1)

df1.show()
# 结果
# +---+----+---+
# | id|name|age|
# +---+----+---+
# |  1|小明| 22|
# |  2|小红| 22|
# +---+----+---+
df1.printSchema()
# 结果
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = false)
#  |-- age: integer (nullable = true)
#指定数据类型(方式二)
schemal2 = 'id int ,name string,age int'

data_list = [(1,'张三',20),(2,'李四',30)]
df2 = ss.createDataFrame(data=data_list,schema=schemal2)
df2.show()


data_list2 = [{'id':1,'name':'阿三','age':5},{'id':2,'name':'王六','age':80}]
#不指定数据类型会自动创建
df3 = ss.createDataFrame(data_list2)
df3.show()
# 结果
# +---+---+----+
# |age| id|name|
# +---+---+----+
# |  5|  1|阿三|
# | 80|  2|王六|
# +---+---+----+
df3.printSchema()
# 结果
# root
#  |-- age: long (nullable = true)
#  |-- id: long (nullable = true)
#  |-- name: string (nullable = true)

5.4Rdd和Dataframe的相互转换

    from pyspark.sql import SparkSession
    from  pyspark.sql.types import *
    
    
    #创建ss对象
    ss = SparkSession.builder.getOrCreate()
    
    #创建sc对象(ss可以通过sparkContext方法将自己转化为sc对象)
    #@property装饰器可以实现调用方法时通过属性方式来调用
    sc = ss.sparkContext
    
    
    #sc对象才能创建rdd对象
    #rdd数据结构时列表嵌套->二位数据结构
    rdd1 = sc.parallelize([[1,'张三',20],[2,'李四',34]])
    
    df1 = ss.createDataFrame(data=rdd1)
    df1.show()
    # +---+----+---+
    # | _1|  _2| _3|
    # +---+----+---+
    # |  1|张三| 20|
    # |  2|李四| 34|
    # +---+----+---+
    df1.printSchema()
    
    #创建schema对象(指定数据类型,方式一)
    schemal1 = StructType().add('id',IntegerType(),True).\
        add('name',StringType(),False).\
        add(field='age',data_type=IntegerType(),nullable=True)
    
    #将rdd结构的数据转换为dataFrame
    df2 = ss.createDataFrame(data=rdd1,schema=schemal1)
    print(type(df2))
    #结果<class 'pyspark.sql.dataframe.DataFrame'>
    df2.show()
    
    rdd1 = df2.rdd
    print(type(rdd1))
    #结果<class 'pyspark.rdd.RDD'>
    
    
    
    ######################
    
    schemal2 = 'id int ,name string,age int'
    
    df3 = ss.createDataFrame(data=rdd1,schema=schemal2)
    df3.show()
    
    #schema:后面只需要传入列名列表不需要类型
    df4 = rdd1.toDF(schema=['id' ,'name','age'])
    print(type(df4))
    #<class 'pyspark.sql.dataframe.DataFrame'>
    df4.show()
    #结果
    # +---+----+---+
    # | id|name|age|
    # +---+----+---+
    # |  1|张三| 20|
    # |  2|李四| 34|
    # +---+----+---+
    
    new_rdd = df3.rdd
    print(type(new_rdd))
    #结果<class 'pyspark.rdd.RDD'>
    print(new_rdd.collect())
    
    #获取rdd中每个row对象元素中的id列的值
    #x->
    rdd_map = new_rdd.map(lambda x:x.id)
    print(rdd_map.collect())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/318505.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

强化学习应用(七):基于Q-learning的无人机物流路径规划研究(提供Python代码)

一、Q-learning简介 Q-learning是一种强化学习算法&#xff0c;用于解决基于马尔可夫决策过程&#xff08;MDP&#xff09;的问题。它通过学习一个价值函数来指导智能体在环境中做出决策&#xff0c;以最大化累积奖励。 Q-learning算法的核心思想是通过不断更新一个称为Q值的…

【每日小bug】mybatis plus id注解错误导致的问题

插入数据 id不为自增 指定了主键&#xff0c;没有指定自增。会导致出现 修改如上 报错 Data truncation: Out of range value for column ‘id’ at row 1 数据库是bigint&#xff0c;java中是Integer。 修改如上

GCC工具源码编译

文章目录 背景一、下载源码二、编译前依赖准备2.1 相关工具依赖2.2 相关lib&#xff08;gmp/ mpfr /mpc&#xff09;依赖2.2.1 lib源码下载2.2.2 lib源码编译 三、编译GCC3.1 编译3.2 链接 四、报错处理 背景 日常可能涉及到系统里自带GCC版本与被编译源码存在不兼容&#xff…

1 - Spring 基本介绍

官网&#xff1a;https://spring.io/ Spring 是一个可以管理整合其他框架的框架 1. IOC 开发模式 程序不再负责对象的创建&#xff0c;而是直接使用ioc容器的对象来完成相关的业务逻辑 1.1 控制反转实现思想 1&#xff09;Spring 根据配置文件 xml/注解&#xff0c;创建对象…

AR HUD全面「上新」

AR HUD赛道正在迎来新的时代。 上周&#xff0c;蔚来ET9正式发布亮相&#xff0c;新车定位为D级行政旗舰轿车&#xff0c;其中&#xff0c;在智能座舱交互层面&#xff0c;继理想L系列、长安深蓝S7之后&#xff0c;也首次取消仪表盘&#xff0c;取而代之的是业内首个全焦段AR H…

9.5.1 函数模板特化

函数模板 有了泛化版本比较函数&#xff0c;我们可以比较两个整数&#xff0c;两个字符&#xff0c;两个指针 6~10行&#xff0c;是一个函数模板 13~16行&#xff0c;都可以得到正常结果 22行&#xff0c;得到的结果是&#xff0c;"A001" < "A000", …

亚马逊时尚如何运用人工智能帮助您找到合适的尺码

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

命令行(无图形界面)登录dlut-lingshui

1 登录原理 利用python的requests库向校园网认证服务器发送认证请求。 2 登录步骤 获取校园网认证界面的用户名和密码。用户名是自己学号&#xff1b;密码由网页加密&#xff0c;需要一台有图形界面的电脑辅助获取&#xff0c;获取方法见下一节。把获取到的用户名和密码填入…

【AIGC】IP-Adapter:文本兼容图像提示适配器,用于文本到图像扩散模型

前言 IPAdapter能够通过图像给Stable Diffusion模型以内容提示&#xff0c;让其生成参考该图像画风&#xff0c;可以免去Lora的训练&#xff0c;达到参考画风人物的生成效果。 摘要 通过文本提示词生成的图像&#xff0c;往往需要设置复杂的提示词&#xff0c;通常设计提示词变…

【JavaSE语法】图书管理系统实现详解

图片出处&#xff1a;The worlds biggest drone photo and video sharing platform | SkyPixel.com 导言 在学完JavaSE语法后&#xff0c;我们就可以去尝试写一个简单的图书管理系统来进一步提升我们面对对象编程的思想。在该系统中会涉及到数组&#xff0c;接口&#xff0c;封…

案例118:基于微信小程序的电影院订票选座系统设计及实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

安卓手机变iOS!

Launcher iOS 16 - 安卓手机秒变iOS Launcher iOS 16 是一款iOS启动器&#xff0c;可以将安卓手机桌面变成iOS样子&#xff0c;还有iOS的开机动画和景深效果&#xff01; 下载链接&#xff1a;【Launcher iOS 16】 ​

python flask学生管理系统

预览 前端 jquery css html bootstrap: 4.x 后端 python: 3.6.x flask: 2.0.x 数据库 mysql: 5.7 学生管理模块 登录、退出查看个人信息、修改个人信息成绩查询查看已选课程选课、取消选课搜索课程课程列表分页功能 教师模块 登录、退出查看个人信息、修改个人信息录入…

为什么选择Go语言编写网络应用程序

关注公众号【爱发白日梦的后端】分享技术干货、读书笔记、开源项目、实战经验、高效开发工具等&#xff0c;您的关注将是我的更新动力&#xff01; 作为一名后端开发者&#xff0c;你一定对选择合适的编程语言来编写网络应用程序非常重视。在众多的编程语言中&#xff0c;Go语言…

Vue keep-alive的使用和原理解析

✨ 专栏介绍 在当今Web开发领域中&#xff0c;构建交互性强、可复用且易于维护的用户界面是至关重要的。而Vue.js作为一款现代化且流行的JavaScript框架&#xff0c;正是为了满足这些需求而诞生。它采用了MVVM架构模式&#xff0c;并通过数据驱动和组件化的方式&#xff0c;使…

持续集成-Jenkins显示HTML报告

1 需要安装startup-trigger-plugin和Groovy插件。 2 在Job配置页面&#xff0c;构建触发器&#xff0c;勾选Build when job nodes start&#xff1b; 3 在Job配置页面&#xff0c;增加构建步骤Execute system Groovy script&#xff0c;在Groovy Command中输入上面命令&…

图神经网络|图注意网络Graph Attention Network

图注意网络Graph Attention Network Leaky ReLU 有利于压低负数对结局的影响。 图注意网络Graph Attention Network的流程 输入向量 h i h_i hi​乘上权重矩阵W得到对应的向量 h i ∗ h_i^* hi∗​,并将 h i ∗ h_i^* hi∗​计算出对应的 a i a_i ai​,从而得到最终对结果向量…

AcWing 103. 电影(map、pair连用or离散化)

题目 方法一&#xff08;mappair&#xff09; 其实上面这么长巴拉巴拉就是在说 首先&#xff0c;每个科学家会的语言都不同。但是呢每部电影的字幕和语言是不一样的&#xff08;字幕和语言一定不相同&#xff09; 要求找到一部电影使得在场能听懂的科学家最多&#xff08;如果存…

neo4j 图数据库 py2neo 操作 示例代码

文章目录 摘要前置NodeMatcher & RelationshipMatcher创建节点查询获取节点节点有则查询&#xff0c;无则创建创建关系查询关系关系有则查询&#xff0c;无则创建 Cypher语句创建节点 摘要 利用py2neo包&#xff0c;实现把excel表里面的数据&#xff0c;插入到neo4j 图数据…

5288 SDH/PDH数字传输分析仪

5288 SDH/PDH数字传输分析仪 数字通信测量仪器 5288 SDH/PDH数字传输分析仪为高性能手持式数字传输分析仪&#xff0c;符合ITU-T SDH/PDH技术规范和我国光同步传输网技术体制的规定,支持2.048、34.368、139.264Mb/s及155.520Mb/s传输速率的测试。可进行SDH/PDH传输设备和网络的…