Spark-Scala语言实战(7)

在之前的文章中,我们学习了如何在IDEA中导入jars包,并做了一道例题,了解了RDD。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(6)-CSDN博客文章浏览阅读695次,点赞15次,收藏24次。今天我会给大家带来如何在IDEA中导入jars包,以及使用SparkRDD,并正确使用它们同时也会给大家讲解一道实训题例。希望在本篇文章中,大家有所收获。也欢迎朋友们到评论区下一起交流学习,共同进步。https://blog.csdn.net/qq_49513817/article/details/137121524?spm=1001.2014.3001.5502

今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的map,sortby,collect三种方法。

目录

一、知识回顾

二、RDD方法

1.map

2.sortby

3.collect

拓展-RDD和DStream

1.RDD和DStream的区别

2.RDD和DStream的联系


一、知识回顾

导入jars包的过程在上一篇文章中以及讲解的很清楚了,图文一步一步带着做。

主要就是进入Libraries 添加java,然后选择spark的jars文件夹即可

如果还有不懂的朋友可以直接评论问我。

在就是文件的这几行代码

import org.apache.spark.{SparkConf, SparkContext}

 val conf=new SparkConf().setMaster("local").setAppName("123456")
    val sc=new SparkContext(conf)

这是配置与方法,记住它们的作用。

现在,开始今天的学习吧

二、RDD方法

1.map

  • map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD
  • map()方法是转换操作,不会立即进行计算。
  • 转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD

例:

import org.apache.spark.{SparkConf, SparkContext}  
  
// 定义一个名为p1的Scala对象  
object p1 {  
  // 定义main方法,作为程序的入口点  
  def main(args: Array[String]): Unit = {  
    // 创建一个Spark配置对象,并设置运行模式为"local"(本地模式),应用程序名称为"p2"  
    val conf = new SparkConf().setMaster("local").setAppName("p2")  
      
    // 使用Spark配置对象创建一个SparkContext对象,SparkContext是Spark功能的入口点  
    val sc = new SparkContext(conf)  
      
    // 创建一个包含整数的列表,并使用parallelize方法将其转换为RDD  
    val ppp = sc.parallelize(List(1, 2, 3, 4, 5))  
      
    // 使用map操作将RDD中的每个元素乘以2,并返回一个新的RDD  
    val ppppp = ppp.map(x => x * 2)  
      
    //oreach方法遍历并打印每个元素  
    ppppp.collect().foreach(println)  
         
  }  
}

可以看到我们输出的在原列表上*2,达到了代码预期效果

2.sortby

  • sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
  • 1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  • 2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false
  • 3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size
  • 第一个参数是必须输入的,而后面的两个参数可以不输入。

例:

import org.apache.spark.{SparkConf, SparkContext}  
    
object p1 {   
  def main(args: Array[String]): Unit = {     
    val conf = new SparkConf().setMaster("local").setAppName("p2")  
      
    // 使用配置好的conf对象创建一个SparkContext对象sc。   
    val sc = new SparkContext(conf)  
    // 使用SparkContext的parallelize方法将包含整数的序列转换成一个RDD。  
    // 这个RDD现在可以在Spark上并行处理。  
    val ppp = sc.parallelize(Seq(5, 1, 9, 3, 7))  
    // 对ppp RDD中的元素进行排序。  
    // 使用sortBy方法,并传递一个函数x => x作为参数,表示按照元素本身的值进行排序(升序)。  
    val pppp = ppp.sortBy(x => x)   
    // 这将返回一个包含RDD所有元素的数组,存储在ppppp中。  
    val ppppp = pppp.collect()  
      
    // 使用foreach方法遍历数组ppppp中的每个元素,并使用println函数打印它们。  
    ppppp.foreach(println)  

  }  
}

看下输出可以看到我们的元素已经排序了

3.collect

  • collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
  • 因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
  • 因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

例:

import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val pp = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val ppp = pp.collect()
    ppp.foreach(println)
  }
}

collect的作用是将RDD中的数据收集到驱动程序中,所以这里运行看不出区别。

拓展-RDD和DStream

在上一篇文章中,我们了解到了RDD,那么DStream是什么呢,我们先来了解一下:

DStream(离散流)是Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream的内部实际上是一系列持续不断产生的RDD,每个RDD包含特定时间间隔的数据。DStream的创建可以通过输入数据源如Kafka、Flume,或者通过对其他DStream应用高阶函数如map、reduce、join、window来实现。

1.RDD和DStream的区别

RDDDStream
定义弹性分布式数据集,是Spark中最基本的数据处理模型。离散流,是Spark Streaming提供的一种高级抽象,代表一个持续不断的数据流。
数据结构静态的、不可变的数据集,可以划分为多个分区。动态的、连续的数据流,内部由一系列RDD组成。
数据处理方式批处理,适用于静态数据的处理和分析。流处理,适用于实时数据流的处理和分析。
时间维度无特定的时间维度,主要关注数据的分区和处理。具有时间维度,每个RDD代表一段时间内的数据。
操作方式对整个RDD进行操作,结果生成新的RDD。对DStream进行操作,结果生成新的DStream,底层转换为RDD操作。
应用场景大规模数据的批处理任务,如机器学习、数据挖掘等。实时数据流处理任务,如日志分析、实时监控等。
容错性具有容错性,数据丢失可以自动恢复。继承了RDD的容错性特点。
与Spark的关系Spark的核心组件,用于构建各种数据处理和分析任务。Spark Streaming的核心组件,用于处理实时数据流。

2.RDD和DStream的联系

RDDDStream
基础构建单元RDD是Spark的基本数据处理单元。DStream基于RDD构建,每个时间间隔内的数据对应一个RDD。
计算模型RDD支持分布式计算模型,数据被划分为多个分区进行并行处理。DStream继承了RDD的计算模型,对流数据进行分布式处理。
容错性RDD具有容错性,可以自动恢复丢失的数据。DStream同样具有容错性,因为它基于RDD构建。
操作方式RDD提供了一系列转换操作(如map、reduce)和动作操作(如collect、save)。DStream也提供了类似的操作,这些操作最终会转换为底层RDD的操作。
数据处理能力RDD适用于批处理任务,可以对大规模数据集进行处理和分析。DStream适用于实时流处理任务,可以对连续的数据流进行实时分析和处理。
底层实现DStream内部实际上是由一系列RDD组成的,每个RDD代表一段时间内的数据。DStream的操作最终会转换为RDD的操作,利用RDD的分布式计算能力。
扩展性RDD可以通过自定义操作进行扩展,支持更多的数据处理场景。DStream同样可以通过自定义操作和转换函数进行扩展,以满足特定的实时处理需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/501129.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

conda使用记录

linux 使用conda创建新一个新的python环境过程 conda create -n recommendation_env python3.8.18 # 指定python版本 conda env list # 查看所有的环境 conda activate recommendation_env # 激活创建的新环境 pip install flask # 安装依赖 或者 pip install flask版本号 或者…

XUbuntu22.04之Typora快捷键Ctrl+5不生效问题(二百二十六)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…

5、axios请求、动画、组件、路由重定向、UI组件

一、axios请求 Axios是一个基于Promise的HTTP状态库&#xff0c;封装ajax。ajax包含axios安装 npm install axios 引入 import axios form “axios” 1、get请求 <script> // 1.本页面引入 import axios from "axios";data() {return {imgSrc: ""…

Springboot+vue的高校科研信息管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的高校科研信息管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#x…

C++:list类

list的介绍 1. list 是可以在常数范围内在任意位置进行插入和删除的序列式容器,并且该容器可以前后双向迭代 2. list 的底层是双向链表结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&#xff0c;在节点中通过指针指向其前一个元素和后一个元素。 3. list 与 …

蓝桥杯物联网竞赛_STM32L071_13_定时器

CubeMx配置LPTIM: counts internal clock events 计数内部时钟事件 prescaler 预分频器 updata end of period 更新期末 kil5配置&#xff1a; 中断回调函数完善一下&#xff1a; void HAL_LPTIM_AutoReloadMatchCallback(LPTIM_HandleTypeDef *hlptim){if(cnt ! 10) cnt…

算法——动态规划:01背包

原始01背包见下面这篇文章&#xff1a;http://t.csdnimg.cn/a1kCL 01背包的变种&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 简化一…

数据结构:单调栈和单调队列

文章目录 一、单调栈1.1、栈的思想1.2、单调栈1.2.1、单调栈的基本应用&#xff1a;找出数组中每个元素右侧第一个更大的元素1.2.2、单调栈的基本应用&#xff1a;找出数组中每个元素左侧第一个更大的元素1.2.3、单调栈拓展1.2.4、单调栈LeetCode题单 二、单调队列2.1、队列的思…

【chemistry 5】脂类代谢、氨基酸代谢、核酸代谢

&#x1f31e;欢迎来到生物化学的世界 &#x1f308;博客主页&#xff1a;卿云阁 &#x1f48c;欢迎关注&#x1f389;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f31f;本文由卿云阁原创&#xff01; &#x1f4c6;首发时间&#xff1a;&#x1f339;2024年3月29日&…

Discourse 用户可以自己修改用户名吗

Discourse 是可以修改用户名的&#xff0c;但用户修改自己的用户名会有时间的限制。 这是因为根据官方的说法就是当用户修改用户名后可能会导致内容的失效等问题。 在默认的安装配置下&#xff0c;用户可以在完成注册后的 3 天自己对用户名进行修改。 3 天以后&#xff0c;用…

【CASS精品教程】CASS11台阶画法大全

文章目录 一、无边台阶二、有边台阶三、圆弧无边台阶四、U型台阶五、曲线U型台阶六、L型台阶一、无边台阶 点击【居民地】→【房屋附属】→【台阶】: 选择【两点边】即可。 两点边的绘制方法是,依次点击四个点,或者点击三个点后空格,注意台阶缺口(有白色线条)为下。 四…

HarmonyOS实战开发-Stage模型下Ability的创建和使用

介绍 本篇Codelab基于Stage模型&#xff0c;对Ability的创建和使用进行讲解。首先在课程中我们将带领大家使用DevEco Studio创建一个Stage模型Ability&#xff0c;并使用UIAbilityContext启动另一个Ability&#xff0c;然后借助Want&#xff0c;在Ability之间传递参数&#xf…

基于SpringBoot的“财务管理系统”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“财务管理系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 系统登录界面图 管理员功能界面图…

Deepspeed、ZeRO、FSDP、ZeRO-Offload、all reduce、reduce-scatter

Transformer为基础的大模型应该如何并行 数据并行。但是如果模型太大放不到一块卡上就没用了。为了解决把参数放到一块卡上的问题&#xff0c;演进出了论文Zero的思想&#xff0c;分为Zero-DP和Zero-R两部分。Zero-DP是解决Data parallel的问题&#xff0c;并行过程中内容不够…

Plecs电力电子仿真专业教程-软件操作

Plecs仿真软件基本操作方法&#xff1a; 从连线中引出线&#xff1a;Ctrl 鼠标左键 设置元件参数&#xff1a;双击元件&#xff0c;进行设置&#xff0c;若要显示参数&#xff0c;则在参数后的方框打勾。 CTRL E ---- 仿真参数设置 Ctrl T -----开始仿真 CtrlF …

如何检查电脑的最近历史记录?这里提供详细步骤

如果你怀疑有人在使用你的计算机,并且你想查看他们在做什么,下面是如何查看是否有访问内容的痕迹。 如何检查我的计算机的最近历史记录 要检查计算机的最近历史记录,应该从web浏览器历史记录开始,然后移动到文件。但是,可以修改或删除浏览器历史记录,也可以隐藏Windows…

Redis 和 Mysql 数据库数据如何保持一致性????

1、前言 我们在实际项目中经常会使用到Redis缓存用来缓解数据库压力&#xff0c;但是当更新数据库时&#xff0c;如何保证缓存及数据库一致性&#xff0c;一般我们采用延时双删策略。 目前系统中常用的做法是一个查询接口&#xff0c;先查询Redis&#xff0c;如果不存在则查询…

Vue——案例01(查询用户)

一、案例实现页面 二、案例实现效果 1. 查询效果 2. 年龄升序 3. 年龄降序 4. 原顺序 三、案例实现思路 1. 定义界面所需标签样式 <div id"app"><h2>查询用户:</h2><input type"text" placeholder"请输入名字"/><b…

RabbitMQ高级笔记

视频链接&#xff1a;【黑马程序员RabbitMQ入门到实战教程】 文章目录 1.发送者的可靠性1.1.生产者重试机制1.2.生产者确认机制1.3.实现生产者确认1.3.1.开启生产者确认1.3.2.定义ReturnCallback1.3.3.定义ConfirmCallback 2.MQ的可靠性2.1.数据持久化2.1.1.交换机持久化2.1.2.…

Python基础之函数

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言1.收集函数2.收集关键字函数3.分配关键字函数4.调用自己编写的模块函数5.匿名函数lambda6.lambda函数和filter函数联用7.lambda函数和map函数联用 前言 1.收集…