实验五 Spark SQL编程初级实践

Spark SQL编程初级实践

  • Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Scala语句完成下列操作:

  1. 查询所有数据;
  2. 查询所有数据,并去除重复的数据;
  3. 查询所有数据,打印时去除id字段;
  4. 筛选出age>30的记录;
  5. 将数据按age分组;
  6. 将数据按name升序排列;
  7. 取出前3行数据;
  8. 查询所有记录的name列,并为其取别名为username;
  9. 查询年龄age的平均值;
  10. 查询年龄age的最小值。

  • 编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

  • 编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。

表6-2 employee表原有数据

id

name

gender

Age

1

Alice

F

22

2

John

M

25

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表6-3 employee表新增数据

id

name

gender

age

3

Mary

F

26

4

Tom

M

23

实验一 :Spark SQL基本操作

1)
// 导入必要的库
import org.apache.spark.sql.SparkSession

// 创建SparkSession
val spark = SparkSession.builder()
  .appName("Spark SQL Basic Operations")
  .getOrCreate()

// 读取JSON文件创建DataFrame
 	val df = spark.read.json("file:///home/hadoop/employee.json")
          // (1) 查询所有数据
df.show()
(2)查询所有数据,并去除重复的数据
df.distinct().show()

(3)
查询所有数据,打印时去除id字段
df.drop("id").show()

(4)
筛选出age>30的记录
df.filter("age > 30").show()

(5)
将数据按age分组
df.groupBy("age").count().show()


(6)
将数据按name升序排列
df.orderBy("name").show()


(7)
取出前3行数据
df.limit(3).show()

(8)
查询所有记录的name列,并为其取别名为username
df.select($"name".alias("username")).show()

(9)
查询年龄age的平均值
df.selectExpr("avg(age)").show()

(10)
查询年龄age的最小值
df.selectExpr("min(age)").show()

实验二 :编程实现将RDD转换为DataFrame

编程代码:

import org.apache.spark.sql.{SparkSession, Row}  
import org.apache.spark.sql.types._  
  
object RDDToDataFrameExample {  
  def main(args: Array[String]): Unit = {  
    // 创建SparkSession  
    val spark = SparkSession.builder()  
      .appName("RDD to DataFrame Example")  
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  
      .getOrCreate()  
  
    import spark.implicits._  
  
    // 指定employee.txt文件的位置  
    val inputFilePath = "file:///home/hadoop/employee.txt"  
  
    // 从文本文件读取数据创建RDD  
    val rdd = spark.sparkContext.textFile(inputFilePath)  
  
    // 定义DataFrame的schema  
    val schema = StructType(Array(  
      StructField("id", IntegerType, nullable = false),  
      StructField("name", StringType, nullable = false),  
      StructField("age", IntegerType, nullable = false)  
    ))  
  
    // 将RDD转换为DataFrame  
    val dataFrame = spark.createDataFrame(rdd.map { line =>  
      val parts = line.split(",")  
      Row(parts(0).toInt, parts(1), parts(2).toInt)  
    }, schema)  
  
    // 显示DataFrame内容  
    dataFrame.show(false)  
  
    // 按照指定格式打印所有数据  
    dataFrame.collect().foreach { row =>  
      println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")  
    }  
  
    // 停止SparkSession  
    spark.stop()  
  }  
}

 命令

/usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

 具体操作参考博客

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

实验三:编程实现利用DataFrame读写MySQL的数据

mysql代码

CREATE DATABASE sparktest;  
USE sparktest;  
  
CREATE TABLE employee (  
  id INT PRIMARY KEY,  
  name VARCHAR(50),  
  gender CHAR(1),  
  age INT  
);  
  
INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);  
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);

如何安装msyql参考博客

 在ubuntu上安装mysql(在线安装需要)-CSDN博客

如何安装mysl驱动程序jar包-CSDN博客

编程代码

import org.apache.spark.sql.{SparkSession, Row}  
import java.util.Properties  
import org.apache.spark.sql.SparkSession  
import org.apache.spark.sql.Dataset  
import org.apache.spark.sql.Row  
import org.apache.spark.sql.functions.max  
import org.apache.spark.sql.functions.sum  
  
object MySQLDataFrameExample {  
  def main(args: Array[String]): Unit = {  
    // 创建SparkSession  
    val spark = SparkSession.builder()  
      .appName("MySQL DataFrame Example")  
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  
      .getOrCreate()  
  
    import spark.implicits._  
  
    // 配置MySQL JDBC连接  
    val jdbcProperties = new Properties()  
    jdbcProperties.setProperty("user", "root")  
    jdbcProperties.setProperty("password", "mysql")  
    jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")  
  
    // 定义MySQL的JDBC连接URL  
    val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"  
  
    // 创建DataFrame以插入数据  
    val newEmployeeData = Seq(  
      (3, "Mary", "F", 26),  
      (4, "Tom", "M", 23)  
    ).toDF("id", "name", "gender", "age")  
  
    // 将DataFrame数据插入到MySQL的employee表中  
    newEmployeeData.write  
      .mode("append") // 使用append模式来添加数据,而不是覆盖  
      .jdbc(jdbcUrl, "employee", jdbcProperties)  
  
    // 从MySQL读取employee表的数据  
    val employeeDF = spark.read  
      .jdbc(jdbcUrl, "employee", jdbcProperties)  
  
    // 打印age的最大值  
    val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)  
    println(s"Max age: $maxAge")  
  
    // 打印age的总和  
    val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)  
    println(s"Sum of ages: $sumAge")  
  
    // 停止SparkSession  
    spark.stop()  
  }  
}

编程详细步骤参考

 如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

 运行命令

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

产生错误

主要问题都在实验三中,因为实验三中涉及到一个mysql数据库连接

命令更新为

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

加了一个mysl驱动的jar的引用

如何安装mysql驱动参考博客

如何安装mysl驱动程序jar包-CSDN博客

打包失败

这个问题是代码错误

代码未引入一些包

加上下面这些就可以了

import org.apache.spark.sql.{SparkSession, Row}  

import java.util.Properties  

import org.apache.spark.sql.SparkSession  

import org.apache.spark.sql.Dataset  

import org.apache.spark.sql.Row  

import org.apache.spark.sql.functions.max  

import org.apache.spark.sql.functions.sum  

运行失败

未引入mysl驱动程序

要下载mysql驱动

采用命令引入

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

参考链接

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/572728.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32学习和实践笔记(21):定时器中断实验

通用定时器配置步骤如下: 第一步:使能定时器时钟 RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM4,ENABLE);//使能TIM4时钟 第二步:初始化定时器参数,包含自动重装值,分频系数,计数方式等 voidTIM_TimeBaseInit(TIM_T…

C++编译器如何实现 const(常量)?

C编译器如何实现 const(常量)? 表面上看,我们在讨论 “编译器怎么保证一个常量不会被程序员强行改变呢?”;其实,我们说的是:如果你表明自己就是要强行修改一个常量,那么…

每日一题:托普利茨矩阵

给你一个 m x n 的矩阵 matrix 。如果这个矩阵是托普利茨矩阵,返回 true ;否则,返回 false 。 如果矩阵上每一条由左上到右下的对角线上的元素都相同,那么这个矩阵是 托普利茨矩阵 。 示例 1: 输入:matrix…

【原创教程】EPLAN如何制作专属的封面

想要给EPLAN制作专属封面吗?没问题,我来给你支个招。在EPLAN设计电气图纸时,封面就是第一印象,得好好弄。咱们以口罩机项目为例,来看看怎么做吧! 首先,得新建个封面。在项目属性里找到表格名称,点那个数值下拉菜单,选择“查找”。在弹出的表格里挑个你喜欢的模版,点击…

【IC设计】边沿检测电路(上升沿、下降沿、双沿,附带源代码和仿真波形)

文章目录 边沿检测电路的概念上升沿检测电路下降沿检测电路双边沿检测电路代码和仿真RTL代码Testbench代码仿真波形 参考资料 边沿检测电路的概念 边沿检测指的是检测一个信号的上升沿或者下降沿,如果发现了信号的上升沿或下降沿,则给出一个信号指示出来…

OurBMC开源大赛高校获奖队伍专访来啦!

精彩纷呈的 OurBMC 开源大赛已告一段落,经历为期四个月的实战,各个参赛队伍也积淀了丰富的实践经验与参赛心得。本期,社区特别邀请 OurBMC 开源大赛获奖高校团队分享「走进OurBMC开源大赛,共同践行开放包容、共创共赢的开源精神」…

【春秋云境】文件上传漏洞合集

CVE-2022-30887 1.题目简介 2.CVE-2022-30887简介 使用工具: 蚁剑 burpsuite 一句话木马 3.渗透测试 输入用户名密码进行抓包 猜测账号密码 无有用信息,根据页面现有信息找到作者邮箱: mayuri.infospacegmail.com,猜测密码为&a…

每日一题:跳跃游戏II

给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1] 的最…

YOLOv3没有比这详细的了吧

YOLOv3&#xff1a;目标检测基于YOLOv2的改进 在目标检测领域&#xff0c;YOLO&#xff08;You Only Look Once&#xff09;系列以其出色的性能和速度而闻名。YOLOv3作为该系列的第三个版本&#xff0c;不仅继承了前身YOLOv2的优势&#xff0c;还在多个方面进行了创新和改进。…

机器学习理论基础—支持向量机的推导(一)

机器学习理论基础—支持向量机的推导 算法原理 SVM:从几何角度&#xff0c;对于线性可分数据集&#xff0c;支持向量机就是找距离正负样本都最远的超平面&#xff0c;相比于感知机&#xff0c;其解是唯一的&#xff0c;且不偏不倚&#xff0c;泛化性能更好。 超平面 n维空间…

如何拿取 macOS 系统中的图标文件

如何拿取 macOS 系统中的图标文件 比如在 Finder 中看到这个文件夹图标很好看&#xff0c;想用一下&#xff0c;就是不知道它在什么位置&#xff0c;我来告诉你。 它在系统中的位置是 /System/Library/CoreServices/CoreTypes.bundle/Contents/Resources/如何打开这个位置&am…

计算机网络物理层思维导图+大纲笔记

大纲笔记&#xff1a; 物理层的基本概念 解决如何在连接各种计算机的传输媒体上传输数据比特流&#xff0c;而不是具体的传输媒体 主要任务 确定与传输媒体接口有关的一些特性 机械特性 电气特性 功能特性 规程特性信道上传送的信号 基带信号 来自信源的信号&#xff0c;直接表…

【CLI命令行接口和Java连接openLooKeng查询数据 】

CLI命令行接口和Java连接openLooKeng查询数据 一、摘要二、正文0. 环境说明1. CLI命令行工具的使用2. Java API 的使用三、小结一、摘要 通过CLI命令行接口工具连接openLooKeng,可帮助初学者能够使用SQL语句的方式快速操作openLooKeng,任何只要熟悉SQL的人都可以快速切换到op…

解决 uniapp uni.getLocation 定位经纬度不准问题

【问题描述】 直接使用uni.getLocation获取经纬度不准确&#xff0c;有几百米的偏移。 【解决办法】 加偏移量 //加偏移 let x longitude let y latitude let x_pi (3.14159265358979324 * 3000.0) / 180.0 let z Math.sqrt(x * x y * y) 0.00002 * Math.sin(y * x_pi)…

ArcGIS Pro专题地图系列教程

专题地图系列是ArcGIS Pro3.2的新功能。之前&#xff0c;如果要做8张相同区域的专题图&#xff0c;可能需要新建8个布局&#xff0c;分别进行排版&#xff0c;再导出。现在&#xff0c;一幅地图&#xff0c;一个布局&#xff0c;就可以完成这个流程。 原理是&#xff0c;根据单…

Swift-24-集合对象

概述 在了解正式内容之前可以先回顾下objectiveC中提供的集合特性。 它的特点是&#xff0c;拿NSArray举例&#xff0c;包含NSArray 和 NSMutableArray两个API&#xff0c;前者是不可变数组&#xff0c;一旦创建其值和数量就不能改变了&#xff1b;NSMutableArray是可变数组&…

tableau基础学习——添加标靶图、甘特图、瀑布图

标靶图 添加参考线 添加参考分布 甘特图 创建新的字段 如设置延迟天数****计划交货日期-实际交货日期 为正代表提前交货&#xff0c;负则代表延迟交货 步骤&#xff1a;创建——计算新字段 把延迟天数放在颜色、大小里面就可以 瀑布图 两个表按照地区连接 先做个条形图&…

工业4.0的基石:探索工业级光模块的力量

引言 工业4.0代表着智能制造的新时代&#xff0c;而工业级光模块则是这一革命性转变的基石。这些高科技组件不仅是现代通信网络的核心&#xff0c;更是连接智能工厂、智慧城市和远程服务的关键。本文将深入探讨工业级光模块的技术特性、应用领域以及它们如何塑造未来工业的面貌…

公司网页制作需要多少钱

公司网页制作需要多少钱&#xff1f;这是一个非常常见的问题。答案取决于您需要的功能和设计。一些小型企业网站可能只需要一些基本的功能&#xff0c;花费可能低至几百美元&#xff0c;而一些大型企业网站可能需要高级功能和设计&#xff0c;可能需要几万美元。 以下是一些考虑…

js如何获取对象的属性值

获取对象的属性值&#xff0c;有两种方式。 方式一&#xff1a; 对象.属性名 let obj {name:张三,age:23 }; console.log(obj.name); //张三方式二&#xff1a; 对象[属性名] let obj {name:张三,age:23 }; console.log(obj[name]); //张三 两种方式有什么不同&am…