Spark---RDD(Key-Value类型转换算子)

文章目录

  • 1.RDD Key-Value类型
      • 1.1 partitionBy
      • 1.2 reduceByKey
      • 1.3 groupByKey
          • reduceByKey和groupByKey的区别
          • 分区间和分区内
      • 1.4 aggregateByKey
          • 获取相同key的value的平均值
      • 1.5 foldByKey
      • 1.6 combineByKey
      • 1.7 sortByKey
      • 1.8 join
      • 1.9 leftOuterJoin
      • 1.10 cogroup

1.RDD Key-Value类型

Key-Value类型的算子即对键值对进行操作。

1.1 partitionBy

将数据按照指定的 Partitioner(分区器) 重新进行分区。Spark 默认的分区器为HashPartitioner,Spark除了默认的分区器外,常见的分区器还有:RangePartitioner、Custom Partitioner、SinglePartitioner等。

函数定义:
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

	//使用HashPartitioner分区器并设置分区个数为2
    val data1: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
    data1.partitionBy(new HashPartitioner(2));
    data1.collect().foreach(println)

在这里插入图片描述

1.2 reduceByKey

可以将数据按照相同的 Key 对 Value 进行聚合

函数定义:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

	//将数据按照相同的key对value进行聚合
    val data1: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))
    val data2: RDD[(String, Int)] = data1.reduceByKey((x: Int, y: Int) => {
      x + y
    })
    data2.collect().foreach(println)

在这里插入图片描述

1.3 groupByKey

将数据源的数据根据 key 对 value 进行分组

函数定义:
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

    val dataRDD1 = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))
    val data1 = dataRDD1.groupByKey()
    //指定分区个数为2
    val data2 = dataRDD1.groupByKey(2)
    //指定分区器和分区个数
    val data3 = dataRDD1.groupByKey(new HashPartitioner(2))
    
    data1.collect().foreach(println)
    println("-------------------->")
    data2.collect().foreach(println)
    println("-------------------->")
    data3.collect().foreach(println)

在这里插入图片描述

reduceByKey和groupByKey的区别

从功能的角度来看:reduceByKey包含了分组和聚合功能,而groupByKey只包含了分组功能。
从shuffle的角度来看:为了避免占用过多的内存空间,reduceByKey和groupByKey在执行的过程中,都会执行shuffle操作,将数据打散写入到磁盘的临时文件中,而reduceByKey在进行shuffle前会对数据进行预聚合的操作,致使shuffle的效率得到的提升,因为减少了落盘的数据量。但是groupByKey在shuffle前不会进行预聚合操作。所以,reduceByKey在进行分组的时候,效率相对groupByKey来说较高。

reduceByKey:
在这里插入图片描述

groupByKey:

在这里插入图片描述

分区间和分区内

分区间: 顾名思义,分区间就是指的多个分区之间的操作。如reduceByKey在shuffle操作后将不同分区的数据传输在同一个分区中进行聚合。
分区内: 分区内字面意思指的是单个分区内之间的操作。如reduceByKey的预聚合功能就是在分区内完成

1.4 aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算,如reduceByKey中分区间和分区内都是聚合操作,而使用aggregateByKey可以设置分区间和分区内执行不同的操作。

函数定义:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

    //取出每个分区内相同 key 的最大值然后分区间相加
    // aggregateByKey 算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示初始值
    // 2. 第二个参数列表中含有两个参数
    // 2.1 第一个参数表示分区内的计算规则
    // 2.2 第二个参数表示分区间的计算规则
    val data1 = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)),2)
    val data2 = data1.aggregateByKey(0)(
      (x,y)=>{Math.max(x,y)},
      (x,y)=>{x+y}
    )
    data2.collect().foreach(println)**

在这里插入图片描述
注意:最终的结果会受到设置的初始值的影响,返回结果的值的类型和初始值保持一致。

获取相同key的value的平均值
    val data1:RDD[(String,Int)] = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4),("b",5),("a",6)),2)
    //设置初始值,初始值为一个元组,元组第一个元素表示value,第二个表示出现次数,初始默认都为0
    val data2:RDD[(String,(Int,Int))] = data1.aggregateByKey((0,0))(
      (t, v)=> {
        (t._1 + v, t._2 + 1)
      } ,//分区内计算
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }//分区间计算
    )
       //和除以次数求出平均值
    val data3 = data2.mapValues({
      case (sum, count) => sum / count
    })
    data3.collect().foreach(println)

在这里插入图片描述

1.5 foldByKey

当分区内和分区间的计算规则相同的时候,aggregateByKey 就可以简化为 foldByKey

函数定义:
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("a",3)))
    val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
    dataRDD2.collect().foreach(println)

在这里插入图片描述

1.6 combineByKey

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

函数定义:

def combineByKey[C](
createCombiner: V => C,//对数据进行转换
mergeValue: (C, V) => C, //分区内合并
mergeCombiners: (C, C) => C): RDD[(K, C)] //分区间合并

//将数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个 key 的平均值
val rddSource: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
val combinRdd: RDD[(String, (Int, Int))] = rddSource.combineByKey(
      ((x:Int)=>{
        (x,1)
      }),//对每个value进行转换,转换后为(value,1),第一个元素为值,第二个元素为出现的次数
      ((t1:(Int,Int),v)=>{
        (t1._1+v,t1._2+1)
      }),//分区内合并
            ((t1,t2)=>{
        (t1._1+t2._1,t1._2+t2._2)
      })//分区间合并
)
    //mapValues算子是在key保持不变的时候对value进行操作
    val mapRdd: RDD[(String, Int)] = combinRdd.mapValues({
      case ((sum: Int, count: Int)) => sum / count
    })

    mapRdd.collect().foreach(println)

在这里插入图片描述
由此看出,combineByKey和aggreateByKey的不同之处在于,combineByKey可以不设置初始值,只需要对第一个元素进行转换,转换到合适的计算格式即可。

1.7 sortByKey

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

函数定义:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

//升序排序
    val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("c",3)))
    val sortRdd: RDD[(String, Int)] = dataRDD1.sortByKey()

    sortRdd.collect().foreach(print)

在这里插入图片描述
sortByKey默认为升序排序,如果想要降序排序,只需要将sortByKey第一个参数修改为false即可。
在这里插入图片描述

1.8 join

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD

函数定义:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

	//join操作相当于数据库中的内连接,在连接的时候自动去除两边的悬浮元组
    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5), (3, 6)))

    rdd0.join(rdd1).collect().foreach(print)
    //修改rdd1,使其少了key=3的这个元素
        val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5)))

    rdd0.join(rdd1).collect().foreach(print)

在这里插入图片描述
在这里插入图片描述

1.9 leftOuterJoin

类似于 SQL 语句的左外连接

函数定义:

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5),(3, 6)))

    val rddRes = rdd0.leftOuterJoin(rdd1)
    rddRes.collect().foreach(print)

在这里插入图片描述

1.10 cogroup

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD,即先对

函数定义:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b"),(3,"c")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5)))

    val rddRes = rdd0.cogroup(rdd1)
    rddRes.collect().foreach(print)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/317014.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Javaweb的网络投票系统的设计与实现

目的与意义 原始的投票管理基本上是人工操作,效率低下,缺乏方便性,网上投票管理系统运用计算机和其他附加设备,不再需要手工操作,基本上是全自动化,能够节省人力,大大的提高了效率。使投票变得…

【项目管理】CMMI-风险与机会管理过程

1、文档结构 2、风险与机会概率 风险与机会概率指的是风险与机会实际发生的可能性。可以用自然语言术语来映射数字概率范围。下表列出了七段概率分级中自然语言术语和数字概率范围映射关系。注意,用来计算的概率值等于概率范围的中间值取整。有了映射表格的帮助&am…

C++ Primer 6.3 返回类型和return语句 知识点+练习题

C Primer 6.3 返回类型和return语句 无返回值函数有返回值的函数两个错误值是如何被返回的返回类类型的函数和调用运算符引用返回左值列表初始化返回值主函数main的返回值返回数组指针 递归练习题疑问待更新 无返回值函数 用在返回值类型为void的函数中,可以不写re…

u盘监控系统—公司电脑如何监控U盘使用?【详解】

在当今的办公环境中,U盘等移动存储设备已成为数据传输和存储的重要工具。 然而,随着U盘的广泛使用,也带来了潜在的安全风险,如数据泄露、病毒传播等。 因此,对于随时会有数据泄露风险的企业而言,U盘的使用…

深入浅出的说地弹(即地噪声)

1. 什么是地弹,地弹的概念,为何叫地弹 地弹、振铃、串扰、信号反射这几个在信号完整性分析总是分析的重点对象。初学者一看:好高深!其实,感觉高深是因为你满天听到“地弹”二字,却到处找不到“地弹…

使用pandas按照商品和下单人统计下单数据

目录 一:需求描述 二:代码实现 三:注意事项 一:需求描述 最近运营那边给到一个excel表格,是一个小程序用户的下单数据,要以商品为维度,统计用户下单情况,主要是下单的商品总金额&…

DNS解析和主从复制

一、DNS名称解析协议 二、DNS正向解析 三、DNS主从复制 主服务器 从服务器

CAN总线通信详解 (超详细配34张高清图)

CAN总线通信详解 (超详细配34张高清图) 1. CAN总线历史 CAN 是 Controller Area Network 的缩写(以下称为 CAN),是 ISO国际标准化的串行通信协议。 在当前的汽车产业中,出于对安全性、舒适性、方便性、低公害、低成本的要求&#…

案例119:基于微信小程序的宿舍管理系统设计与实现

文末获取源码 开发语言:Java 框架:SSM JDK版本:JDK1.8 数据库:mysql 5.7 开发软件:eclipse/myeclipse/idea Maven包:Maven3.5.4 小程序框架:uniapp 小程序开发软件:HBuilder X 小程序…

【Python机器学习】分类器的不确定估计——决策函数

scikit-learn接口的分类器能够给出预测的不确定度估计,一般来说,分类器会预测一个测试点属于哪个类别,还包括它对这个预测的置信程度。 scikit-learn中有两个函数可以用于获取分类器的不确定度估计:decidion_function和predict_pr…

kubebuilder+code-generator开发k8s的controller

本文记录用kubebuilder和code-generator开发k8s的crd控制器。 概览 和k8s.io/code-generator类似,是一个码生成工具,用于为你的CRD生成kubernetes-style API实现。区别在于: Kubebuilder不会生成informers、listers、clientsets&#xff0c…

【工具栏】jclasslib 插件的安装和使用

1. 安装 2.使用 安装之后 在 view 的 ToolWindows 里也有一个这样的窗口 jclasslib 的主要作用是查看字节码的相关信息 package com.test;public class Test {public static void main(String[] args) {Integer a 1;int b a 2;} }例如我写了一段这样的代码,然后去…

语义分割发展现状

语义分割是对图像中的每一个像素进行分类,目前广泛应用于医学图像与无人驾驶等。从这几年的论文来看,这一领域主要分为有监督语义分割、无监督语义分割、视频语义分割等。 语意分割究竟有什么用呢?似乎看起来没有目标检测/跟踪等应用范围广。…

P1379 八数码难题

题目描述 在 33 的棋盘上,摆有八个棋子,每个棋子上标有 1 至 8 的某一数字。棋盘中留有一个空格,空格用 0 来表示。空格周围的棋子可以移到空格中。要求解的问题是:给出一种初始布局(初始状态)和目标布局&…

Linux centos stream9 parted

在Linux中,常用的磁盘管理工具包括 fdisk、parted、gdisk 等。它们可以用于创建、删除、调整分区、查看分区表等操作。 传统的MBR分区表(即主引导记录)大家都很熟悉,是过去我们使用windows时常见的。所支持的最大卷2T,且对分区有限制&#x…

SRM供应商招标采购管理系统(源码)

软件相关资料获取:点我获取 一、SRM供应商在线采购 SRM供应商在线采购是指企业通过互联网平台,实现对供应商的在线招募、选择、关系管理等一系列活动。这种采购方式具有高效、透明、便于管理的特点,能够帮助企业降低采购成本,提…

Vue中v-if与v-show区别详解

✨ 专栏介绍 在当今Web开发领域中,构建交互性强、可复用且易于维护的用户界面是至关重要的。而Vue.js作为一款现代化且流行的JavaScript框架,正是为了满足这些需求而诞生。它采用了MVVM架构模式,并通过数据驱动和组件化的方式,使…

Nightingale 夜莺监控系统 - 告警篇(3)

Author:rab 官方文档:https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v6/usage/alert/alert-rule/ 目录 前言一、配置1.1 创建钉钉机器人1.2 n9e 创建通知用户1.3 n9e 创建团队(组)1.4 将通知用户添加团队1.…

C++核心编程——文件操作

本专栏记录C学习过程包括C基础以及数据结构和算法,其中第一部分计划时间一个月,主要跟着黑马视频教程,学习路线如下,不定时更新,欢迎关注。 当前章节处于: ---------第1阶段-C基础入门 ---------第2阶段实战…