大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD

28.RDD_为什么需要RDD

29.RDD_定义

30.RDD_五大特性总述

31.RDD_五大特性1

32.RDD_五大特性2

33.RDD_五大特性3

34.RDD_五大特性4

35.RDD_五大特性5

36.RDD_五大特性总结

37.RDD_创建概述

38.RDD_并行化创建

演示代码:
// 获取当前 RDD 的分区数
@Since ( "1.6.0" )
final def getNumPartitions : Int =
partitions . length
// 显示出 RDD 被分配到不同分区的信息
/**Return an RDD created by coalescing all
elements within each partition into an
array.*/
def glom (): RDD [ Array [ T ]]
1
2
3
4
5
6
package com . itbaizhan . rdd
//1. 导入 SparkConf 类、 SparkContext
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByParallelize {
def main ( args : Array [ String ]): Unit = {
   
//2. 构建 SparkConf 对象。并设置本地运行和程序的
名称
   
val conf = new
SparkConf (). setMaster ( "local[2]" ). setAppName
( "CreateRdd1" )
   
//3. 构建 SparkContext 对象
   
val sc = new SparkContext ( conf )
   
//4. 通过并行化创建 RDD 对象:将本地集合 -> 分布式的
RDD 对象
1
2
3
4
5
6
7
8
9
10
11
12
79    
//val rdd: RDD[Int] =
sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,
7, 8))
   
val rdd : RDD [ Int ] =
sc . parallelize ( List ( 1 , 2 , 3 , 4 , 5 , 6 , 7 ,
8 ), 3 )
   
//5. 输出默认的分区数
   
//5.1
setMaster("local[*]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
   
//println(" 默认分区
数: "+rdd.getNumPartitions)//8, 默认当前系统的
CPU
   
//5.2
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
   
//println(" 默认分区
数: "+rdd.getNumPartitions)//2
   
//5.3
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8),3)
   
println ( " 默认分区
数: " + rdd . getNumPartitions ) //3
   
//6.collect 方法:将 rdd 对象中每个分区的数据,都
发送到 Driver ,形成一个 Array 对象
   
val array1 : Array [ Int ] = rdd . collect ()
println ( "rdd.collect()=" + array1 . mkString ( ",
" ))
   
//7. 显示出 rdd 对象中元素被分布到不同分区的数据信
13
14
15
16
17
18
19
20
21
22
23
24
25
80 运行结果:
实时效果反馈
1. 以下关于并行化创建 RDD 的描述错误的是:
A
通过并行化集合创建,将本地集合对象转分布式 RDD
B
parallelize() 方法必须传递两个参数。
C
parallelize 没有给定分区数 , 默认分区数等于执行程序的当前
服务器 CPU 核数。
答案:
   
val array2 : Array [ Array [ Int ]] =
rdd . glom (). collect ()
   
println ( "rdd.glom().collect() 的内容是 :" )
   
/*for(eleArr<- array2){
     
println(eleArr.mkString(","))
   
}*/
array2 . foreach ( eleArr => println ( eleArr . mkStr
ing ( "," )))
}
}
26
27
28
29
30
31
32
33
默认分区数: 3
rdd.collect()=1,2,3,4,5,6,7,8
rdd.glom().collect() 的内容是 :
1,2
3,4,5
6,7,8

39.RDD_读取文件创建RDD

40.RDD_读取小文件创建RDD

扩展 wholeTextFiles 适合读取一堆小文件:
//path 指定小文件的路径目录
//minPartitions 最小分区数 可选参数
def wholeTextFiles ( path :
String , minPartitions : Int =
defaultMinPartitions ): RDD [( String , String )]
1
2
3
85 代码演示:
package com . itbaizhan . rdd
//1. 导入类
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByWholeTextFiles {
def main ( args : Array [ String ]): Unit = {
   
//2. 构建 SparkConf 对象,并设置本地运行和程序名
   
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "WholeTextFiles" )
   
//3. 使用 conf 对象构建 SparkContet 对象
   
val sc = new SparkContext ( conf )
   
//5. 读取指定目录下的小文件
   
val rdd : RDD [( String , String )] =
sc . wholeTextFiles ( "data/tiny_files" )
   
//(filePath1, " 内容 1"),(filePath2, " 内容
2"),...,(filePathN, " 内容 N")
   
val tuples : Array [( String , String )] =
rdd . collect ()
tuples . foreach ( ele => println ( ele . _1 , ele . _2 ))
   
//6. 获取小文件中的内容
   
val array : Array [ String ] =
rdd . map ( _ . _2 ). collect ()
   
println ( "---------------------------" )
   
println ( array . mkString ( "|" ))
   
//4. 关闭 sc 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
86 运行输出结果 :
RDD_ 算子概述
定义: 分布式集合 RDD 对象的方法被称为算子
算子分类:
Transformation 转换算子
1
Action 行动算子
2
   
sc . stop ()
}
}
22
23
24
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file1.txt,hello Linux
hello Zookeper
hello Maven
hello hive
hello spark)
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file2.txt,Spark Core
Spark RDD
Spark Sql)
----------------
hello Linux
hello Zookeper
hello Maven
hello hive
hello spark|Spark Core
Spark RDD
Spark Sql

41.RDD_算子概述

42.RDD_转换算子map

43.RDD_转换算子flatmap

44.RDD_转换算子reducebykey

45.RDD_转换算子filter

46.RDD_转换算子distinct

47.RDD_转换算子glom

48.RDD_转换算子groupby

object RddGroupBy {
def main ( args : Array [ String ]): Unit = {
   
//2. 构建 SparkConf 对象,并设置本地运行和程序名
   
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "groupBy" )
   
//3. 使用 conf 对象构建 SparkContet 对象
   
val sc = new SparkContext ( conf )
   
//5. 创建 Rdd
   
val rdd : RDD [( Char , Int )] =
sc . parallelize ( Array (( 'a' , 1 ), ( 'a' , 2 ),
( 'b' , 1 ), ( 'b' , 2 ), ( 'a' , 3 ), ( 'a' , 4 )))
   
//6. 通过 groupBy 算子对 rdd 对象中的数据进行分组
   
//groupBy 插入的函数的用意是指定按照谁进行分组
   
// 分组后的结果是有二元组组成的 RDD
   
val gbRdd : RDD [( Char , Iterable [( Char ,
Int )])] = rdd . groupBy ( tupEle => tupEle . _1 )
   
// 收集到 Driver
   
val result1 : Array [( Char ,
Iterable [( Char , Int )])] = gbRdd . collect ()
   
//(a,CompactBuffer((a,1), (a,2), (a,3),
(a,4))),(b,CompactBuffer((b,1), (b,2)))
   
println ( result1 . mkString ( "," ))
   
//7. 使用 map 转换算子
   
//(a,List((a,1), (a,2), (a,3), (a,4))),
(b,List((b,1), (b,2)))
   
val result2 : Array [( Char , List [( Char ,
Int )])] = gbRdd . map ( tup => ( tup . _1 ,
tup . _2 . toList )). collect ()
   
println ( result2 . mkString ( "," ))
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
104 实时效果反馈
1. 以下关于
rdd.groupBy(tupEle => tupEle._1)
的描述错误的是:
A
groupBy 传入的函数的意思是 : 通过这个函数 , 确定按照谁来
分组。
B
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
数只能为 2
C
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
>=2
答案:
1=>B

49.RDD_转换算子groupbyKey

50.RDD_转换算子sortby

51.RDD_转换算子sortbyKey

52.RDD_转换算子union并集

53.RDD_转换算子交集和差集

54.RDD_转换算子关联算子

55.RDD_转换算子partitionBy

56.RDD_转换算子mapPatitions

57.RDD_转换算子sample

58.RDD_行动算子foreachPartition

59.RDD_行动算子foreach

60.RDD_行动算子saveAsTestFile

61.RDD_行动算子countByKey

62.RDD_行动算子reduce

63.RDD_行动算子fold

64.RDD_行动算子first_take_count

65.RDD_行动算子top_takeOrderd

66.RDD_行动算子takeSample

二.内核进阶

67.内核进阶_DAG概述

68.内核进阶_血缘关系

69.内核进阶_宽窄依赖关系

70.内核进阶_stage划分

71.内核进阶_任务调度概述

72.内核进阶_管道计算模式上

73.内核进阶_管道计算模式下

74.内核进阶_cache缓存

75.内核进阶_checkpoint检查点

76.内核进阶_cache和checkpoint区别

77.内核进阶_并行度

78.内核进阶_广播变量

79.内核进阶_累加器一

80.内核进阶_累加器二

81.内核进阶_累加器之重复计算

82.内核进阶_项目实战PVUV需求分析

83.内核进阶_项目实战PV分析

84.内核进阶_项目实战UV分析

85.内核进阶_二次排序实战

86.内核进阶_分组取topN实战

87.内核进阶_卡口统计项目需求分析

88.内核进阶_卡口统计项目统计正常的卡口

89.内核进阶_卡口统计项目TOP5

90.内核进阶_卡口统计项目统计不同区域同时出现的车辆

91.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹一

92.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹二

93.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹三

94.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹四

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/965090.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

第一性原理:游戏开发成本的思考

利润 营收-成本 营收定价x销量x分成比例 销量 曝光量x 点击率x &#xff08;购买率- 退款率&#xff09; 分成比例 100%- 平台抽成- 税- 引擎费- 发行抽成 成本开发成本运营成本 开发成本 人工外包办公地点租金水电设备折旧 人工成本设计成本开发成本迭代修改成本后续内容…

MLA 架构

注&#xff1a;本文为 “MLA 架构” 相关文章合辑。 未整理去重。 DeepSeek 的 MLA 架构 原创 老彭坚持 产品经理修炼之道 2025 年 01 月 28 日 10:15 江西 DeepSeek 的 MLA&#xff08;Multi-head Latent Attention&#xff0c;多头潜在注意力&#xff09;架构 是一种优化…

数据结构-堆和PriorityQueue

1.堆&#xff08;Heap&#xff09; 1.1堆的概念 堆是一种非常重要的数据结构&#xff0c;通常被实现为一种特殊的完全二叉树 如果有一个关键码的集合K{k0,k1,k2,...,kn-1}&#xff0c;把它所有的元素按照完全二叉树的顺序存储在一个一维数组中&#xff0c;如果满足ki<k2i…

BUUCTF_[安洵杯 2019]easy_web(preg_match绕过/MD5强碰撞绕过/代码审计)

打开靶场&#xff0c;出现下面的静态html页面&#xff0c;也没有找到什么有价值的信息。 查看页面源代码 在url里发现了img传参还有cmd 求img参数 这里先从img传参入手&#xff0c;这里我发现img传参好像是base64的样子 进行解码&#xff0c;解码之后还像是base64的样子再次进…

Linux的简单使用和部署4asszaaa0

一.部署 1 环境搭建方式主要有四种: 1. 直接安装在物理机上.但是Linux桌面使用起来非常不友好.所以不建议.[不推荐]. 2. 使用虚拟机软件,将Linux搭建在虚拟机上.但是由于当前的虚拟机软件(如VMWare之类的)存在⼀些bug,会导致环境上出现各种莫名其妙的问题比较折腾.[非常不推荐…

RK3566-移植5.10内核Ubuntu22.04

说明 记录了本人使用泰山派&#xff08;RK3566&#xff09;作为平台并且成功移植5.10.160版本kernel和ubuntu22.04&#xff0c;并且成功配置&连接网络的完整过程。 本文章所用ubuntu下载地址&#xff1a;ubuntu-cdimage-ubuntu-base-releases-22.04-release安装包下载_开源…

二级C语言题解:十进制转其他进制、非素数求和、重复数统计

目录 一、程序填空&#x1f4dd; --- 十进制转其他进制 题目&#x1f4c3; 分析&#x1f9d0; 二、程序修改&#x1f6e0;️ --- 非素数求和 题目&#x1f4c3; 分析&#x1f9d0; 三、程序设计&#x1f4bb; --- 重复数统计 题目&#x1f4c3; 分析&#x1f9d0; 前言…

UE求职Demo开发日志#22 显示人物信息,完善装备的穿脱

1 创建一个人物信息显示的面板&#xff0c;方便测试 简单弄一下&#xff1a; UpdateInfo函数&#xff1a; 就是获取ASC后用属性更新&#xff0c;就不细看了 2 实现思路 在操作目标为装备栏&#xff0c;或者操作起点为装备栏时&#xff0c;交换前先判断能否交换&#xff08;只…

在游戏本(6G显存)上本地部署Deepseek,运行一个14B大语言模型,并使用API访问

在游戏本6G显存上本地部署Deepseek&#xff0c;运行一个14B大语言模型&#xff0c;并使用API访问 环境说明环境准备下载lmstudio运行lmstudio 下载模型从huggingface.co下载模型 配置模型加载模型测试模型API启动API服务代码测试 deepseek在大语言模型上的进步确实不错&#xf…

专业学习|一文了解并实操自适应大邻域搜索(讲解代码)

一、自适应大邻域搜索概念介绍 自适应大邻域搜索&#xff08;Adaptive Large Neighborhood Search&#xff0c;ALNS&#xff09;是一种用于解决组合优化问题的元启发式算法。以下是关于它的详细介绍&#xff1a; -自适应大领域搜索的核心思想是&#xff1a;破坏解、修复解、动…

记录一下 在Mac下用pyinstallter 打包 Django项目

安装: pip install pyinstaller 在urls.py from SheepMasterOneToOne import settings from django.conf.urls.static import staticurlpatterns [path("admin/", admin.site.urls),path(generate_report/export/, ReportAdmin(models.Report, admin.site).generat…

如何在Intellij IDEA中识别一个文件夹下的多个Maven module?

目录 问题描述 理想情况 手动添加Module&#xff0c;配置Intellij IDEA的Project Structure 问题描述 一个文件夹下有多个Maven项目&#xff0c;一个一个开窗口打开可行但是太麻烦。直接open整个文件夹会发现Intellij IDEA默认可能就识别一个或者几个Maven项目&#xff0c;如…

Linux 文件和目录

Linux 文件和目录 文章目录 Linux 文件和目录Linux 目录Linux 目录配置的依据 --FHS目录树文件属性文件的分类一般权限 UGO特殊权限 suid\sgid\sticky隐藏属性 ATTR文件访问控制列表 ACL文件相关的命令权限的修改 chmod chown chgrp umaskchmodchgrpumask相关文档 /etc/profile…

【大数据技术】本机DataGrip远程连接虚拟机MySQL/Hive

本机DataGrip远程连接虚拟机MySQL/Hive datagrip-2024.3.4VMware Workstation Pro 16CentOS-Stream-10-latest-x86_64-dvd1.iso写在前面 本文主要介绍如何使用本机的DataGrip连接虚拟机的MySQL数据库和Hive数据库,提高编程效率。 安装DataGrip 请按照以下步骤安装DataGrip软…

【大模型】DeepSeek大模型技术路径

【大模型】DeepSeek大模型技术路径 一、总体架构(一)Transformer架构:奠定坚实基础(二)Mixture-of-Experts(MoE)架构:提升灵活性与效率二、技术突破(一)训练方法创新(二)架构优化(三)训练效率与成本优化(四)推理能力提升三、总结一、总体架构 DeepSeek大模型以…

【LLM-agent】(task2)用llama-index搭建AI Agent

note LlamaIndex 实现 Agent 需要导入 ReActAgent 和 Function Tool&#xff0c;循环执行&#xff1a;推理、行动、观察、优化推理、重复进行。可以在 arize_phoenix 中看到 agent 的具体提示词&#xff0c;工具被装换成了提示词ReActAgent 使得业务自动向代码转换成为可能&am…

解决Mac安装软件的“已损坏,无法打开。 您应该将它移到废纸篓”问题

mac安装软件时&#xff0c;如果出现这个问题&#xff0c;其实很简单 首先打开终端&#xff0c;输入下面的命令 sudo xattr -r -d com.apple.quarantine 输入完成后&#xff0c;先不要回车&#xff0c;点击访达--应用程序--找到你无法打开的app图标&#xff0c;拖到终端窗口中…

(9) 上:学习与验证 linux 里的 epoll 对象里的 EPOLLIN、 EPOLLHUP 与 EPOLLRDHUP 的不同

&#xff08;1&#xff09;经过之前的学习。俺认为结论是这样的&#xff0c;因为三次握手到四次挥手&#xff0c;到 RST 报文&#xff0c;都是 tcp 连接上收到了报文&#xff0c;这都属于读事件。所以&#xff1a; EPOLLIN : 包含了读事件&#xff0c; FIN 报文的正常四次挥手、…

一文讲解Spring如何解决循环依赖

Spring 通过三级缓存机制来解决循环依赖&#xff1a; 一级缓存&#xff1a;存放完全初始化好的单例 Bean。 二级缓存&#xff1a;存放正在创建但未完全初始化的 Bean 实例。 三级缓存&#xff1a;存放 Bean 工厂对象&#xff0c;用于提前暴露 Bean。 试问:三级缓存解决循环依…

Vue canvas画图画线例子,数据回显与隔离,点拖拽修改

组件 <template><divstyle"display: flex; height: 342px; width: 760px; border: 1px solid #000"><divstyle"position: relative; height: 100%; width: 608px; min-width: 608px"><canvasid"mycanvas"ref"mycanva…