Spark-Scala语言实战(13)

在之前的文章中,我们学习了如何在spark中使用键值对中的keys和values,reduceByKey,groupByKey三种方法。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(12)-CSDN博客文章浏览阅读722次,点赞19次,收藏15次。今天开始的文章,我会带给大家如何在spark的中使用我们的键值对方法,今天学习键值对方法中的keys和values,reduceByKey,groupByKey三种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137385224今天的文章开始,我会继续带着大家如何在spark的中使用我们的键值对里的方法。今天学习键值对方法中的fullOuterJoin,zip,combineByKey三种方法。

目录

一、知识回顾

二、键值对方法

1.fullOuterJoin

2.zip

3.combineByKey

拓展-方法参数设置


一、知识回顾

 上一篇文章中我们学习了键值对的三种方法,分别是keys和values,reduceByKey,groupByKey。

keys和values分别对应了我们的键与值。

我们可以用它们来创建我们的RDD

 reduceByKey可以进行统计,将有相同键的值进行相加,统一输出。

而 groupByKey方法就是对我们的键值对RDD进行分组了

它可以将我们的相同的键,不同的值组合成一个组。

那么,开始今天的学习吧~ 

二、键值对方法

1.fullOuterJoin

  •  fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    // 创建两个RDD(弹性分布式数据集)
    val p1 = sc.parallelize(Seq(("a1", "1"), ("a2", "2"), ("a3", "3")))
    val p2 = sc.parallelize(Seq(("a2", "A"), ("a3", "B"), ("a4", "C")))
    // 将RDD转换为键值对
    val pp1 = p1.map { case (key, value) => (key, value) }
    val pp2 = p2.map { case (key, value) => (key, value) }
    // 执行fullOuterJoin操作
    val ppp = pp1.fullOuterJoin(pp2)
    // 收集结果并打印
    ppp.collect().foreach(println)
    }
}

我们的代码创建了两个键值对RDD,那么使用 fullOuterJoin方法全外连接那么两个键值对都会连接。

可以看到两个键值对里的键与值都连接上了,互相没有的值即显示None值。 

2.zip

  • zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。
  • 将两个RDD组合成Key/Value形式的RDD,这里要求两个RDDpartition数量以及元素数量都相同,否则会抛出异常
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    // 创建两个RDD
    val p1 = sc.parallelize(Seq(1, 2, 3))
    val p2 = sc.parallelize(Seq("a", "b", "c"))
    // 使用zip方法将两个RDD组合在一起
    val pp1 = p1.zip(p2)
    val pp2 = p2.zip(p1)
    // 收集结果并打印
    pp1.collect().foreach(println)
    pp2.collect().foreach(println)
    }
}

 代码创建了两个不同的RDD键值对,分别使用p1zip方法p2与p2zip方法p1,那么它们输出的结果会是一样的吗?

可以看到是不一样的,谁在前面谁就是键,反之是值。 

3.combineByKey

  • combineByKey()方法是Spark中一个比较核心的高级方法,键值对的其他一些高级方法底层均是使用combineByKey()方法实现的,如groupByKey()方法、reduceByKey()方法等。
  • combineByKey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。
  • combineByKey()方法的使用方式如下。
    • combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions=None)
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))
    val p2 = p1.combineByKey(
      // createCombiner: 将第一个值转换为累加器
      (v: Int) => v,
      // mergeValue: 将新的值加到累加器上
      (c: Int, v: Int) => c + v,
      // mergeCombiners: 合并两个累加器
      (c1: Int, c2: Int) => c1 + c2
    )
    p2.collect().foreach { case (key, value) =>
      println(s"Key: $key, Value: $value")
    }
  }
}

我的代码中: 

createCombiner: 这个函数定义了如何将每个键的第一个值转换为初始的累加器值。 

代表着每个键,第一个出现的值将作为累加器的初始值。

mergeValue: 这个函数定义了如何将新值与当前的累加器值合并。在我的代码中,我将新值与累加器相加。

代表着每个键的后续值,它们都会被加到当前的累加器值上。

mergeCombiners: 这个函数定义了当两个累加器(对应于同一个键但可能来自不同的分区)需要合并时应该执行的操作。在我的代码中,也是将两个累加器值相加

这确保了无论数据如何在分区之间分布,最终每个键都会得到正确的累加结果。

看看输出效果

可以看到我们的键值对成功累加。

快去试试吧~ 

拓展-方法参数设置

方法参数描述例子
fullOuterJoinotherRDD另一个要与之进行全外连接的RDDrdd1.fullOuterJoin(rdd2)
fullOuterJoinnumPartitions结果RDD的分区数(可选)rdd1.fullOuterJoin(rdd2, numPartitions=10)
zipotherRDD要与之进行zip操作的另一个RDDrdd1.zip(rdd2)
combineByKeycreateCombiner处理第一个出现的每个键的值的函数lambda v: (v, 1)
combineByKeymergeValue合并具有相同键的值的函数lambda acc, v: (acc[0] + v, acc[1] + 1)
combineByKeymergeCombiners合并两个累积器的函数lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
combineByKeynumPartitions结果RDD的分区数(可选)rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=5)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/519598.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

海康摄像头插件嵌入iframe时视频播放插件位置问题

参考:https://juejin.cn/post/6857670423971758094 原因:没有按照iframe相对位置计算视频插件位置。 解决: $(window).on(resize, resize);function resize(){// 解决iframe中嵌入海康插件初始化问题:// 1. 获取iframe相比于窗口的偏移量;c…

第二节课《轻松玩转书生·浦语大模型趣味 Demo》

比较匆忙,假期前仿照第一期课程的内容好像被清空了,重新搭建一次。 https://github.com/InternLM/Tutorial/blob/camp2/helloworld/hello_world.md 按照那老师写好的,一步步复制就好了 浦语灵笔2的大概率是会超出显存,先不测试了…

水泥5G智能制造工厂数字孪生可视化平台,推进水泥行业数字化转型

水泥5G智能制造工厂数字孪生可视化平台,推进水泥行业数字化转型。水泥5G智能制造工厂数字孪生可视化平台,是水泥行业数字化转型的关键推手。数字孪生平台运用先进的信息技术和数字化手段,实现水泥生产过程的数字化模拟、可视化监控和智能化管…

泰坦尼克号幸存者数据分析

泰坦尼克号幸存者数据分析 1、泰坦尼克号数据集2、数据集加载与概览3、泰坦尼克号幸存者数据分析4、哪些人可能成为幸存者? 1、泰坦尼克号数据集 泰坦尼克号的沉没是世界上最严重的海难事故之一,造成了大量的人员伤亡。这是一艘号称当时世界上最大的邮轮…

LoRa自组网络设计 6

1 深入了解LoRaWan 1.1 LoRaWan概述 LoRaWAN采用星型无线拓扑 End Nodes 节点 Gateway 网关 Network Server 网络服务器 Application Server 应用服务器 LoRa联盟是2015年3月Semtech牵头成立的一个开放的、非盈利的组织,发起成员还有法国Actility,中国…

[C#]OpenCvSharp使用帧差法或者三帧差法检测移动物体

关于C版本帧差法可以参考博客 [C]OpenCV基于帧差法的运动检测-CSDN博客https://blog.csdn.net/FL1768317420/article/details/137397811?spm1001.2014.3001.5501 我们将参考C版本转成opencvsharp版本。 帧差法,也叫做帧间差分法,这里引用百度百科上的…

C语言数据结构专题(3应用-通讯录的实现)

前言 前面的两节我们弄清了顺序表是什么?顺序表是怎么实现的?此时大家可能有疑问了:顺序表被创造出来具体有什么用呢?那么本节就给大家带来顺序表的应用--通讯录的实现,废话不多说,我们正式进入本节的学习 …

探寻马来西亚服务器托管的优势与魅力

随着全球跨境业务的不断增加,境外服务器成为越来越受欢迎的选择。在这其中,马来西亚服务器备受关注,其机房通常位于马来西亚首都吉隆坡。对于客户群体主要分布在东南亚、澳大利亚和新西兰等地区的用户来说,马来西亚服务器是一个理…

MATLAB近红外光谱分析技术应用

郁磊副教授,主要从事MATLAB编程、机器学习与数据挖掘、数据可视化和软件开发、生理系统建模与仿真、生物医学信号处理,具有丰富的实战应用经验,主编《MATLAB智能算法30个案例分析》、《MATLAB神经网络43个案例分析》相关著作。已发表多篇高水…

JVM基础:类的生命周期详解

JDK版本:jdk8 IDEA版本:IntelliJ IDEA 2022.1.3 文章目录 一. 生命周期概述二. 加载阶段(Loading)2.1 加载步骤2.2 查看内存中的对象 三. 连接阶段(Linking)3.1 连接之验证3.2 连接之准备3.3 连接阶段之解析 四. 初始化阶段(Initialization)4.1 单个类的…

约数与倍数-第12届蓝桥杯选拔赛Python真题精选

[导读]:超平老师的Scratch蓝桥杯真题解读系列在推出之后,受到了广大老师和家长的好评,非常感谢各位的认可和厚爱。作为回馈,超平老师计划推出《Python蓝桥杯真题解析100讲》,这是解读系列的第45讲。 约数与倍数&#…

rust 面向对象编程特性、模式与模式匹配、高级特征

面向对象编程OOP 学习了结构体、枚举,它们可以包含自定义数据字段,也可以定义内部方法,它们提供了与对象相同的功能。 面向对象的四大特征:封装、继承、多态 通过pub标记为公有的结构体,在其他模块中可以访问使用这…

python爬虫———post请求方式(第十四天)

🎈🎈作者主页: 喔的嘛呀🎈🎈 🎈🎈所属专栏:python爬虫学习🎈🎈 ✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天…

C语言【编译和链接】

1.程序执行过程 C语言的编译和链接是将源代码转换为可执行程序的过程。下面是C语言编译和链接的基本步骤: 预处理:在编译前,预处理器会对源代码进行。它会处理以"#"开头的预处理指令,#include和#define,并将…

算法笔记————ST表

运用了倍增思想,从小到大处理 1.【模板】ST 表 // Problem: // P3865 【模板】ST 表 // // Contest: Luogu // URL: https://www.luogu.com.cn/problem/P3865 // Memory Limit: 125 MB // Time Limit: 800 ms // // Powered by CP Editor (https://cpedi…

Kotlin学习日志(一)TextView、Button、Toast的使用(1)

android:layout_width“wrap_content” android:layout_height“wrap_content”/> import kotlinx.android.synthetic.main.activity_main.* 这句话的意思是引进Kotlin的的控件变量自动映射功能,接下来只要是这个activity_main.xml文件中的控件,我…

非关系型数据库——Redis基本操作

目录 一、Redis数据库常用命令 1.Set——存放数据 2.Get——获取数据 3.Keys——获取符合条件的键值 4.Exists——判断键值是否存在 5.Del——删除指定键值 6.Type——获取键值对应的类型 7.Rename——对已有键值重命名(覆盖) 8.Renamenx——对…

160 Linux C++ 通讯架构实战14,epoll 反应堆模型

到这里,我们需要整理一下之前学习的epoll模型,并根据之前的epoll模型,提出弊端,进而整理epoll反应堆模型,进一步深刻理解,这是因为epoll实在是太重要了。 复习之前的epoll的整体流程以及思路。 参考之前写…

虚幻UE5智慧城市全流程开发教学

一、背景 这几年,智慧城市/智慧交通/智慧水利等飞速发展,骑士特意为大家做了一个这块的学习路线。 二、这是学习大纲 1.给虚幻UE5初学者准备的智慧城市/数字孪生蓝图开发教程 https://www.bilibili.com/video/BV1894y1u78G 2.UE5数字孪生蓝图开发教学…

【软件工程】测试规格

1. 引言 1.1简介 本次的测试用例是基于核心代码基本开发完毕,在第一代系统基本正常运行后编写的,主要目的是为了后续开发与维护的便利性。 该文档主要受众为该系统后续开发人员,并且在阅读此文档前最后先阅读本系统的需求文档、概要设计文…