Spark-Scala语言实战(9)

之前的文章中,我们学习了如何在spark中使用RDD方法的flatMap,take,union。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(8)-CSDN博客文章浏览阅读675次,点赞16次,收藏10次。​今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的flatMap,take,union三种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137157697?今天的文章,我会继续带着大家如何在spark的中使用我们的RDD方法。今天学习RDD方法中的filter,distinct,intersection三种方法,并做一道相关例题。

一、知识回顾

昨天我们学习了RDD的三种方法,分别是flatMap,take,union。

flatMap的一般作用是用来切分我们的单词

它会构建一个新的RDD 

take方法是用来获取我们RDD中前n个元素,n可以自行设置

union可以将我们的两个RDD进行合并操作

但使用我们的union方法时,需保证两个RDD的数据类型相同,否则无法运行。

现在,开始今天的学习吧。

二、RDD方法

1.filter

  • filter()方法是一种转换操作,用于过滤RDD中的元素。
  • filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
  • filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD
import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val p = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rdd = sc.parallelize(p)
    // 使用filter操作过滤出所有偶数
    val pp = rdd.filter(x => x % 2 == 0)
    // 收集结果并打印
    val ppp = pp.collect()
    ppp.foreach(println)
  }
}

可以看到我们的代码创建了一个1到10的数组,也可以看到注释中我们的需求是筛出里面包括的偶数,那么我们运行代码得到的就应该是2,4,6,8,10,现在,运行我们的代码看看是否得到预期的值吧。

可以看到左下角成功输出代码预期值。

2.distinct

  •  distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val p = Array(1, 2, 2, 3, 4, 4, 5, 5, 5)
    val pp = sc.parallelize(p)
    // 使用distinct操作去除重复元素
    val ppp = pp.distinct()
    // 收集结果并打印
    val pppp = ppp.collect()
    pppp.foreach(println)

  }
}

可以看到我们的代码给了一组重复数据特别多的数组,那么我们的distinct方法肯定就是要将它进行降重操作了,那么我们现在运行代码来看一下。

可以看到我们预期的降重实现了,但是它的输出顺序特别混乱,这是因为Spark 的分布式计算模型决定了数据在不同分区之间可能会被打乱,并且在执行 distinct 操作时可能会进行重分区。因此,即使你的输入数组  是有序的,经过 distinct 处理后的输出数组很可能不是有序的。

那么要解决这个问题,我们肯定需要手动排序了

在这里我们就可以使用到sorted进行排序。

    val ppppp=pppp.sorted
    ppppp.foreach(println)

把这两行代码加到末尾,运行代码

可以看到输出预期中降重并排序的结果了。 

3.intersection

  •  intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val p1 = sc.parallelize(Array(1, 2, 3, 4, 5))
    val p2 = sc.parallelize(Array(4, 5, 6, 7, 8))
    // 计算两个RDD的交集
    val ppp = p1.intersection(p2)
    // 收集结果并打印
    val ppppp = ppp.collect()
    ppppp.foreach(println)

  }
}

 看代码,我们定义了两个数组,那么既然intersection是求交集,那么运行代码输出的肯定是两个数组中的共同元素,即4,5。运行代码

可以看到成功输出我们交集4与5

三、任务实现

现在,我们有两个csv文件,里面有我们大量的薪资信息,我们现在需要做的事情如下: 

  • 输出上半年或下半年实际薪资大于20万元的员工姓名。
  • 首先需要过滤出两个RDD中实际薪资大于20万元的员工姓名。
  • 再将两个RDD得到的员工姓名合并到一个RDD中,对员工姓名进行去重。
  • 即可得到上半年或下半年实际薪资大于20万元的员工姓名。

想要完成它,并不困难,现在我们把文件放在C盘的根目录下,方便寻找,当然这个位置可以自己随便放。

然后编写我们的代码:

import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    // 从C盘根目录读取第一个CSV文件
    val p1 = sc.textFile("C:\\Employee_salary_first_half.csv")
    // 从C盘根目录读取第二个CSV文件
    val p2 = sc.textFile("C:\\Employee_salary_second_half.csv")
    // 使用mapPartitionsWithIndex方法跳过CSV文件的标题行
    val pp1 = p1.mapPartitionsWithIndex((ix,it) => {
      if (ix ==0) it.drop(1)
      it
    })
    val pp2 = p2.mapPartitionsWithIndex((ix, it) => {
      if (ix == 0) it.drop(1)
      it
    })
    // 将pp1中的每一行转换为(员工名, 工资)元组
    val ppp1 = pp1.map(
      Line => {val data = Line.split(",");(data(1),data(6).toInt)})//使用逗号分割每行数据, 提取第二列和第七列数据,并将第七列转换为整数
    val ppp2 = pp2.map(
      Line => {val data = Line.split(",");(data(1),data(6).toInt)})
    val pppp1=ppp1.filter(x => x._2 > 200000).map(x => x._1)// 找出ppp1中工资超过200,000的元组,并只保留员工名
    val pppp2=ppp2.filter(x => x._2 > 200000).map(x => x._1)//x._n,n即使你要找的元素,通过 ._1 来访问第一个元素 a,通过 ._2 来访问第二个元素 b。
    val ppppp=pppp1.union(pppp2).distinct()//合并并降重
    ppppp.collect().foreach(println)//逐行打印
  }
}

我们先读取了两个文件,在将文件的标题行进行跳过,再分割数据找出需要的两行,最后找出工资大于200000的数据打印

来看看运行效果

可以看到我们预期的输出效果达到了。

拓展-方法参数设置

方法参数描述使用例子不同参数/效果
filterfunc对RDD中的每个元素应用函数func,返回True的元素保留,返回False的元素被过滤掉rdd.filter(lambda x: x > 3)通过修改func,可以定义不同的过滤条件,从而保留或过滤掉不同的元素。例如,lambda x: x % 2 == 0会保留偶数。
distinct返回一个包含RDD中所有不同元素的新RDD,去重rdd.distinct()此方法没有参数,它直接返回一个新的RDD,其中包含了原始RDD中的所有不同元素。这对于去除重复项非常有用。
intersectionother返回当前RDD与另一个RDDother的交集,结果中不包含重复元素rdd1.intersection(rdd2)other参数指定了另一个RDD,该方法将返回两个RDD中共有的元素。改变other的值将会影响交集的结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/505635.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据分析web可视化神器---streamlit框架,无需懂前端也能搭建出精美的web网站页面

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 所属的专栏:数据分析系统化教学,零基础到进阶实战 景天的主页:景天科技苑 文章目录 Streamlit什么是streamli…

基于springboot+vue实现的学校田径运动会管理系统

作者主页:Java码库 主营内容:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app等设计与开发。 收藏点赞不迷路 关注作者有好处 文末获取源码 技术选型 【后端】:Java 【框架】:spring…

CommunityToolkit.Mvvm----配置

一、介绍: CommunityToolkit.Mvvm 包(又名 MVVM 工具包,以前称为 Microsoft.Toolkit.Mvvm)是一个现代、快速和模块化的 MVVM 库。 它是 .NET 社区工具包的一部分,围绕以下原则生成: 独立于平台和运行时 - …

MySQL MHA高可用配置以及故障切换

目录 什么是 MHA 什么是 MHA MHA(MasterHigh Availability)是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。解决主从架构中的主服务器的单点问题 MySQL故障切换过程中,MHA能做到0-30秒内自动…

计算机网络:数据链路层 - 封装成帧 透明传输 差错检测

计算机网络:数据链路层 - 封装成帧 & 透明传输 & 差错检测 数据链路层概述封装成帧透明传输差错检测 数据链路层概述 从数据链路层来看,主机 H1 到 H2 的通信可以看成是在四段不同的链路上的通信组成的,所谓链路就是从一个节点到相邻…

python distribute是什么

Python的包管理工具常见的有easy_install, setuptools, 还有pip, distribute,那麽这几个工具有什么关系呢,看一下下面这个图就明白了: 可以看到distribute是setuptools的替代方案,pip是easy_install的替代方案。 Distribute提供一…

古代书法名家墨迹范本,中国法书碑帖图片合集

一、图片描述 在书法作品里,什么是法书?这是书法用语,又称法帖,学习书法可以作为楷模的范本,以及对古代名家墨迹的敬称,或以此誉称表达对书法作者的尊重之意,法书墨迹是最能反映古代书法艺术面…

Windows-安装infercnv包(自备)

目录 安装基础 ①安装JAGS a,找到适配版本 b,install for me only安装路径 ②安装"rjags"包 ③安装inferCNV 安装基础 版本: R version 4.2.2 (2022-10-31 ucrt) -- "Innocent and Trusting"安装的JAGS版本为JAGS 4.3.1 首…

Nginx_简介 + Linux系统下详细安装教程指路

安装教程指路 可参看该视频【尚硅谷Nginx教程(亿级流量nginx架构设计)】 https://www.bilibili.com/video/BV1yS4y1N76R/?p2&share_sourcecopy_web&vd_source4c2f33f3ba1a0dd45bfdf574befd0069 的p2-p7。从安装centos虚拟机到在虚拟机上安装ng…

考研数学|听完一遍汤家凤基础,1800都没思路,怎么办?

看了我这篇回答,保证你可以顺利的做1800题! 如果你听了汤家凤老师的课,但是做题没思路,请不要担心,也不要急着换老师,你很有可能是方法错了。 请你反思一下: 1、你是不是听完课立刻就去做题。…

Qt QWebSocket讲解

QWebSocket 是 Qt 框架中用于处理 WebSocket 通信的类。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。这使得客户端和服务器之间可以进行实时交互,而无需频繁地建立和关闭连接。 QWebSocket 的基本使用 创建 QWebSocket 对象: 你可以创建一个…

LoRa物联网行业解决方案 1

1 行业应用 智慧停车 智能抄表 智慧牧场 智能生产 智能物流 智能健康 2 物联网智慧农场项目需求 3 为什么选lora? 4 设计 5 模块性能参数 sx1278 lora扩频无线模块 SEMTECH公司SX1278芯片 LoRa 扩频技术 通信距离10000米 SPI通信接口 mcu选型 硬件平台介绍 …

【Java多线程】7——阻塞队列线程池

7 线程池 ⭐⭐⭐⭐⭐⭐ Github主页👉https://github.com/A-BigTree 笔记仓库👉https://github.com/A-BigTree/tree-learning-notes 个人主页👉https://www.abigtree.top ⭐⭐⭐⭐⭐⭐ 如果可以,麻烦各位看官顺手点个star~&#x…

卷积篇 | 引入可改变核卷积AKConv:具有任意采样形状和任意数目参数的卷积核

前言:Hello大家好,我是小哥谈。可改变核卷积(AKConv)是一种深度学习中的卷积神经网络(CNN)结构,它可以根据需要自适应地改变其卷积核。AKConv相对于传统的卷积神经网络,具有更高的灵活性和适应性,可以在不同的任务和数据集上实现更好的性能。🌈 目录 🚀1…

【SpringBoot】-- Spring Validation参数校验框架

SPringle Validation是Spring提供的一个参数校验框架&#xff0c;使用预定义的注解完成参数校验。 一、自定义参数校验 1、引入Spring Valiation依赖 <!-- validation依赖--><dependency><groupId>org.springframework.boot</groupId><arti…

提升 RAG 效果的实践

提升 RAG 效果的实践 0. 引言1. 测试数据2. cohere/embed-multilingual-v3.0 的几组测试结果2-1. 第1组测试2-2. 第2组测试 3. BAAI/bge-m3 的几组测试结果3-1. 第1组测试 0. 引言 AI 大语言模型的主要应用方式之一就是 RAG&#xff0c;接下来计划陆续分享工作中提升 RAG 效果…

数据库之迁移常规操作(Mongodb篇)

一、部署mongodb数据库。 注&#xff1a;采用docker容器进行安装 部署详情参考此文 》》https://blog.csdn.net/u014642921/article/details/136022683 二、在admin用户创建testdb文档插入两条数据 admin> db.testdb.insertMany([{name:"1",age:1,addr:"…

【IP组播】PIM-SM的RP、RPF校验

目录 一&#xff1a;PIM-SM的RP 原理概述 实验目的 实验内容 实验拓扑 1.基本配置 2.配置IGP 3.配置PIM-SM和静态RP 4.配置动态RP 5.配置Anycast RP 二&#xff1a; RPF校验 原理概述 实验目的 实验内容 实验拓扑 1.基本配置 2.配置IGP 3.配置PIM-DM 4.RPF校…

centos7.5安装gitlab-runner,配置CI/CD流水线

一般不建议gitlab-server和gitlab-runner装在同一台服务器 第一步&#xff1a;安装gitlab-runner,最好和gitlab实例版本一致 # 下载官方gitlab-runner安装脚本 curl -L "https://packages.gitlab.com/install/repositories/runner/gitlab-runner/script.rpm.sh" | s…

springboot简历系统

摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;简历系统当然也不能排除在外。简历系统是以实际运用为开发背景&#xff0c;运用软件工程原理和开发方法&#xff0c;采用…