Spark Core

Spark Core

一、Spark RDD

RDD概述

1.RDD基础

在这里插入图片描述

2.RDD源代码描述

在这里插入图片描述

3.RDD特性

在这里插入图片描述

4.Spark宽窄依赖

在这里插入图片描述

RDD创建

在驱动器中创建RDD
在这里插入图片描述

1.parallelize

在这里插入图片描述

读取外部数据集创建RDD

2.textFile

在这里插入图片描述
在这里插入图片描述

RDD操作

在这里插入图片描述

缓存rdd到内存
在这里插入图片描述

1.RDD转化操作

在这里插入图片描述

2.常见的转化操作

在这里插入图片描述

3.RDD行动操作

在这里插入图片描述

4.常见的行动操作

在这里插入图片描述

Spark传递函数

在这里插入图片描述

在这里插入图片描述

RDD惰性求值与持久化(缓存)

1.惰性求值

在这里插入图片描述

2.缓存方法
cache()
persist()

移除缓存

unpersist()
3.持节化级别

在这里插入图片描述

RDD练习

Test-01
scala>val rdd1=sc.parallelize(List(5,6,4,2,9,7,8,1,10))

scala>val rdd2=rdd1.map(x=>x*2)

scala>val sort=rdd2.sortBy(x=>x,true)

scala>val rdd3=sort.filter(_>=10)

scala>rdd3.collect()

在这里插入图片描述

Test-02
scala>val rdd1=sc.parallelize(Array("a b c","d e f","h i j"))

scala>val rdd2=rdd1.flatMap(_.split(" "))

scala>rdd2.collect()

在这里插入图片描述

Test-03

RDD并集、交集和去重操作

scala>val rdd1=sc.parallelize(List(1,2,3,4))

scala>val rdd2=sc.parallelize(List(5,6,4,3))

scala>val rdd3=rdd1.union(rdd2)

scala>rdd3.collect()

scala>val rdd4=rdd1.intersection(rdd2)

scala>rdd4.collect()

scala>rdd3.distinct.collect

在这里插入图片描述

Test-04

RDD求并集并按照Key进行分组

scala>val rdd1=sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))

scala>val rdd2=sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))

scala>val rdd3=rdd1.join(rdd2)

scala>rdd3.collect()

scala>val rdd4=rdd1.union(rdd2)

scala>rdd4.groupByKey.collect

在这里插入图片描述

Test-05

RDD聚合

scala>val rdd1=sc.parallelize(List(1,2,3,4,5))

scala>val rdd2=rdd1.reduce(_+_)

scala>rdd2

在这里插入图片描述

Test-06

RDD按照Key进行聚合,按照value进行排序

("tom",1)把值1作为key

(t=>(t._2,t._1)) ====(1,"tom")

最后再反转输出
scala>val rdd1=sc.parallelize(List(("tom",1),("kitty",2),("shuke",1)))

scala>val rdd2=sc.parallelize(List(("jerry",2),("tom",3),("shuke",2),("kitty",5)))

scala>val rdd3=rdd1.union(rdd2)

scala>val rdd4=rdd3.reduceByKey(_+_)

scala>rdd4.collect()

scala>val rdd5=rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))

scala>rdd5.collect()

在这里插入图片描述

二、Spark RDD键值对操作

Pair RDD(键值对)

1.概念

在这里插入图片描述

2.创建方式
A.直接在程序中创建
B.将普通的RDD转化为Pair RDD
	(K,V)=>Pair RDD
C.数据格式存储为键值对格式的读取(json)
3.创建Pair RDD-三种语言

在这里插入图片描述

file.txt

Hadoop HDFS MapReduce
Spark Core SQL Streaming

在Scala中使用第一个单词为键创建一个Pair RDD

package com.saddam.spark.SparkCore

import org.apache.spark.{SparkConf, SparkContext}

object PairRDD {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf()
      .setMaster("local[2]")
      .setAppName("pair rdd")

    val sc=new SparkContext(conf)

    val lines = sc.textFile("D:\\Spark\\DataSets\\file.txt")

    val pairs=lines.map(x=>(x.split(" ")(0),x))
   
    pairs.foreach(p=>{println(p._1+":"+p._2)})
    /*
    Hadoop:Hadoop HDFS MapReduce
    Spark:Spark Core SQL Streaming
     */
	sc.stop()
  }
}

在Java中使用第一个单词为键创建一个Pair RDD

在这里插入图片描述

在Python中使用第一个单词为键创建一个Pair RDD

在这里插入图片描述

4.Pair RDD转化操作

在这里插入图片描述

元素过滤

对第二个元素进行筛选

a.读取文件生成rdd
b.将rdd转化为pair rdd
c.使用filter进行过滤

在这里插入图片描述

聚合操作

在这里插入图片描述

package com.saddam.spark.SparkCore

import org.apache.spark.{SparkConf, SparkContext}

object PairRDD_聚合操作 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf()
      .setMaster("local[2]")
      .setAppName("pair rdd")

    val sc=new SparkContext(conf)

    val rdd=sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))

    val rdd1=rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))

    rdd1.foreach(x=>{println(x._1+":"+x._2._1/x._2._2)})

  }

}

panda:0
pirate:3
pink:3
数据分组

在这里插入图片描述

package com.saddam.spark.SparkCore

import org.apache.spark.{SparkConf, SparkContext}

object PairRDD_数据分组 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf()
      .setMaster("local[2]")
      .setAppName("pair rdd")

    val sc=new SparkContext(conf)

    val rdd=sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))

    val rdd1=rdd.groupByKey()

    //持久化
    val rdd2 = rdd1.persist()

//    println(rdd2.first()._1+":"+rdd2.first()._2)

    rdd2.foreach(x=>{println(x._1+":"+x._2)})
  }

}

pirate:CompactBuffer(3)
panda:CompactBuffer(0, 1)
pink:CompactBuffer(3, 4)
连接操作

在这里插入图片描述

package com.saddam.spark.SparkCore

import org.apache.spark.{SparkConf, SparkContext}

object PairRDD_连接操作 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf()
      .setMaster("local[2]")
      .setAppName("pair rdd")

    val sc=new SparkContext(conf)

    val rdd1=sc.parallelize(List(("frank",30),("bob",9),("silly",3)))

    val rdd2=sc.parallelize(List(("frank",88),("bob",12),("marry",22),("frank",21),("bob",22)))

    //内连接
    val rdd3=rdd1.join(rdd2)
    rdd3.foreach(x=>{println(x._1+":"+x._2)})
    /*
frank:(30,88)
bob:(9,12)
frank:(30,21)
bob:(9,22)
     */

    //左外连接
    val rdd4=rdd1.leftOuterJoin(rdd2)
    rdd4.foreach(x=>{println(x._1+":"+x._2)})
    /*
silly:(3,None)
frank:(30,Some(88))
frank:(30,Some(21))
bob:(9,Some(12))
bob:(9,Some(22))
     */

    //右外连接
    val rdd5=rdd1.rightOuterJoin(rdd2)
    rdd5.foreach(x=>{println(x._1+":"+x._2)})
    /*
frank:(Some(30),88)
frank:(Some(30),21)
marry:(None,22)
bob:(Some(9),12)
bob:(Some(9),22)
     */

    //全连接
    val rdd6=rdd1.fullOuterJoin(rdd2)
    rdd6.foreach(x=>{println(x._1+":"+x._2)})
    /*
frank:(Some(30),Some(88))
frank:(Some(30),Some(21))
silly:(Some(3),None)
marry:(None,Some(22))
bob:(Some(9),Some(12))
bob:(Some(9),Some(22))
     */
  }
}
数据排序

在这里插入图片描述

package com.saddam.spark.SparkCore

import org.apache.spark.{SparkConf, SparkContext}

object PairRDD_数据排序 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf()
      .setMaster("local[2]")
      .setAppName("pair rdd")

    val sc=new SparkContext(conf)

    var rdd=sc.parallelize(List(("frank",30),("bob",9),("silly",3)))

    //排序
    //按照字典序
    val rdd2=rdd.sortByKey()

    rdd2.collect().foreach(println)
    /*
(bob,9)
(frank,30)
(silly,3)
     */

    //按照值排序
    val rdd3=rdd.map(x=>(x._2,x._1)).sortByKey().map(x=>(x._2,x._1)) //sortByKey(false)-降序
    rdd3.collect().foreach(println)
/*
(silly,3)
(bob,9)
(frank,30)
 */
  }
}
5.Pair RDD行动操作

在这里插入图片描述

6.数据分区
数据分区原因

在这里插入图片描述

数据分区操作

在这里插入图片描述

示例:PageRank算法

计算网页价值
在这里插入图片描述

package com.saddam.spark.SparkCore

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PageRank {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf()
      .setMaster("local[2]")
      .setAppName("pair rdd")

    val sc=new SparkContext(conf)

    val links=sc.parallelize(List(("A",List("B","C")),("B",List("A","C")),("C",List("A","B","D")),("D",List("C"))))
      .partitionBy(new HashPartitioner(10)).persist()

    var ranks = links.mapValues(v => 1.0)

    for (i<-0 until 10){
      val contributions=links.join(ranks).flatMap{
        case (pageId,(links,rank))=>links.map(dest=>(dest,rank/links.size))
      }
      ranks=contributions.reduceByKey((x,y)=>x+y).mapValues(v=>0.15+0.85*v)
    }
    ranks.sortByKey().collect().foreach(println)
    ranks.saveAsTextFile("D:\\Spark\\OutPut\\PageRank")
  }
/*
(A,0.9850243302878132)
(B,0.9850243302878132)
(C,1.4621033282930214)
(D,0.5678480111313515)
 */
}
7.宽窄依赖
概念

在这里插入图片描述

窄依赖优点

在这里插入图片描述

宽窄依赖的使用

在这里插入图片描述

三、Spark数据读取与保存

Spark数据源

在这里插入图片描述

概述及分类

在这里插入图片描述

文件格式

在这里插入图片描述

文件系统

在这里插入图片描述

Spark SQL中的结构化数据

在这里插入图片描述

Spark访问数据库系统

1.MySQL数据库连接

在这里插入图片描述

1.代码实战
package com.saddam.spark.SparkCore

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

/**
  * Spark访问MySQL数据库
  */
object SparkConnectMySQL {
  //第一步:定义函数:创建一个连接连接数据库
  def createConnection()={
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/saddam","root","324419")
  }

  //第二步:定义一个函数:映射
  def extractValues(r:ResultSet)={
    (r.getInt(1),r.getString(2))
  }

  def main(args: Array[String]): Unit = {
    val conf=new SparkConf()
      .setMaster("local[2]")
      .setAppName("pair")

    val sc=new SparkContext(conf)

    //第三步:创建RDD读取数据
    val data=new JdbcRDD(
      sc,createConnection,
      "select * from users where ?<=id and id<=?",
      lowerBound = 1,
      upperBound = 3,
      numPartitions = 2,
      mapRow = extractValues
    )

    data.foreach(println)
    sc.stop()

  }

}

(1,admin)
(1,zhangsan)
(1,lisi)
2.问题发现与解决
javax.net.ssl.SSLException MESSAGE: 
closing inbound before receiving peer's close_notify
    
解决:useSSL=false
jdbc:mysql://localhost:3306/saddam?useSSL=false
2.Cassandra连接

开源的NoSQL数据库

在这里插入图片描述

3.HBase连接

在这里插入图片描述

四、Spark编程进阶

Spark的高级特性

累加器

广播变量

基于分区的操作

与外部程序间的广播

数值RDD操作

Spark的高级特性

在这里插入图片描述

累加器

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/447946.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面试官:MySQL的七种日志

哪七种日志日志&#xff1f; 错误日志&#xff08;error log&#xff09; error log主要记录MySQL在启动、关闭或者运行过程中的错误信息&#xff0c;在MySQL的配置文件my.cnf中&#xff0c; 可以通过log-error/var/log/mysqld.log 执行mysql错误日志的位置。 慢查询日志&a…

kpm算法

这里写自定义目录标题 介绍KMP算法的核心思想KMP算法的步骤例题问题分析图解代码输出结果 介绍 KMP&#xff08;Knuth-Morris-Pratt&#xff09;算法是一种用于在一个文本串&#xff08;text&#xff09;中查找一个模式串&#xff08;pattern&#xff09;的高效字符串匹配算法…

基于java的足球联赛管理系统(程序+数据库+文档)

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目&#xff0c;希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一、研究背景…

大数据与云计算

目录 一、大数据时代二、云计算——大数据的计算三、云计算发展现状四、云计算实现机制五、云计算压倒性的成本优势 一、大数据时代 我们先来看看百度关于 “大数据”&#xff08;Big Data&#xff09;的搜索指数。 可以看出&#xff0c;“大数据” 这个词是从2012年才引起关注…

趣学前端 | Taro迁移完成之后,总结了一些踩坑经验

背景 四月份的时候&#xff0c;尝试将老的移动端项目改造成多端。因为老项目使用的React框架&#xff0c;综合考量&#xff0c;保障当前业务开发的进度同时&#xff0c;进行项目迁移&#xff0c;所以最后选择了Taro框架。迁移成本会低一些&#xff0c;上手快一些。 上个月&am…

docker部署在线聊天室平台Fiora

Fiora 是一款开源免费的在线聊天系统 https://github.com/yinxin630/fiora 部署 创建docker网络 docker network create fiora-networkdocker-compose部署 vim docker-compose.yml version: 3 services:fiora_redis:image: rediscontainer_name: fiora_redisrestart: alway…

Linux 地址空间

目录 一、程序地址空间 1、虚拟地址 Makefile新写法 2、进程地址空间分布 3、栈&堆 4、static修饰局部变量 5、字符串常量不可修改 6、虚拟地址与物理地址的联系 二、CPU读取程序全过程 1、形成可执行程序 2、生成虚拟地址 3、程序的启动 4、创建进程 5、地…

11---数字温度 OR 湿度传感器电路设计

视频链接 数字温度or湿度传感器电路设计02_哔哩哔哩_bilibili 数字温度 OR 湿度传感器电路设计 1、温湿度传感器 DHT11 DHT11是一款有已校准数字信号输出的温湿度传感器。 其精度湿度-5%RH&#xff0c; 温度-2℃&#xff0c;量程湿度20-90%RH&#xff0c; 温度0~50℃。 D…

maven运行spring boot项目

我用idea想跑一个整合flowable的spring boot项目&#xff0c;但是跑不起来&#xff0c;原因是jdk版本不够高。但是我的idea是2018版本&#xff0c;最高只能支持到jdk11。就想办法不用idea编译、打包、运行项目。因为spring boot是maven项目&#xff0c;所以可以用maven进行打包…

纵行科技邀您相聚“2024全球物流技术大会”

3月27-29日&#xff0c;中国物流与采购联合将在海口召开“2024全球物流技术大会”&#xff0c;大会将全方位、多层次、立体化展现前沿物流技术的发展&#xff0c;加速前沿技术装备在物流行业的应用。作为供应链物流物联网技术及产品厂商的代表&#xff0c;纵行科技将携“ZETag云…

vite vue3 路由配置@找不到文件问题描述

问题描述 在vite.config.js文件中配置路由的时候&#xff0c;添加路由界面&#xff0c;找不到指定的文件&#xff0c;提示错误&#xff0c;如图所示&#xff1a; 但是换成 ./ 或者 ../ 就正常了&#xff0c;也没有报错问题 解决办法 1.安装一个path的插件 npm install --sav…

什么是Git引用和分支?

一. 引言 什么是Git引用和分支&#xff1f;比如我在 Github 上一个项目的 .git/refs目录下&#xff1a; ├─heads │ dev │ master │ ├─remotes │ └─origin │ master │ └─tags refs 目录下包含了 heads、remote、tags 三个子目录&#xff0…

300分钟吃透分布式缓存-22讲:怎么认识和应用Redis内部数据结构?

Redis 内部数据结构 RdeisDb Redis 中所有数据都保存在 DB 中&#xff0c;一个 Redis 默认最多支持 16 个 DB。Redis 中的每个 DB 都对应一个 redisDb 结构&#xff0c;即每个 Redis 实例&#xff0c;默认有 16 个 redisDb。用户访问时&#xff0c;默认使用的是 0 号 DB&…

【CSS】简单的抽屉面板展开收起自然过渡效果的css

目录 效果展示css固定梯形按钮至抽屉面板中间梯形按钮css过渡动画 效果展示 1.收起时点击蓝色梯形按钮展开 2. 展开时点击蓝色按钮收起 3.展开收起时需要过渡自然&#xff0c;有抽屉推拉效果 css 固定梯形按钮至抽屉面板中间 .toggle{ position: absolute;left:-21px;top…

【CSP】2022-03-3 计算资源调度器 stl大模拟使用map优化索引 完整思路+完整的写代码过程(遇到的问题)+完整代码

2022-03-3 计算资源调度器 stl大模拟使用map优化索引 2022-03-3 计算资源调度器 stl大模拟使用map优化索引思路写代码的过程&#xff08;遇到的问题&#xff09;完整代码 2022-03-3 计算资源调度器 stl大模拟使用map优化索引 在联系了之前那么多道stl大模拟题后&#xff0c;终…

Flutter does not exist

Flutter does not exist 原因&#xff1a;Generated.config 配置文件内路径缺失 原因&#xff1a;Flutter SDK缺失 通过配置文件查到Flutter SDK在本地的存放位置FLUTTER_FRAMEWORK_DIR/Users/haijunyan/Documents/flutter/bin/cache/artifacts/engine/ios 真机所需&#xf…

day06、07-MySQL

文章目录 一、MySQL概述1.1 安装1.2 数据模型1.3 SQL简介1.3.1 SQL通用语法1.3.2 分类 二. 数据库设计-DDL2.1 项目开发流程2.2 数据库操作2.2.1 查询数据库2.2.2 创建数据库2.2.3 使用数据库2.2.4 删除数据库 2.3 图形化工具2.3.1 介绍2.3.2 安装2.3.3 使用2.2.3.1 连接数据库…

【数据结构与算法】贪心算法题解(一)

这里写目录标题 一、455. 分发饼干二、56. 合并区间三、53. 最大子数组和 一、455. 分发饼干 简单 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这…

五种常见户外LED显示屏模组故障维修方法

在户外LED显示屏的使用过程中&#xff0c;可能会出现各种故障&#xff0c;例如连续不亮、坏点等问题&#xff0c;这通常是由LED显示屏模组上出现问题所导致的。以下是针对五种常见户外LED显示屏模组故障的解决办法&#xff1a; 连续不亮或有异常&#xff1a; 这种情况通常导致L…

matlab去除图片上的噪声

本问题来自CSDN-问答板块,题主提问。 如何利用matlab去除图片上的噪声? 一、运行效果图 左边是原图,右边是去掉噪音后的图片。 二、中文说明 中值滤波是一种常见的图像处理技术,用于去除图像中的噪声。其原理如下: 1. 滤波器移动:中值滤波器是一个小的窗口,在图像上移…