头歌:SparkSQL简单使用

第1关:SparkSQL初识
 

任务描述


本关任务:编写一个sparksql基础程序。

相关知识


为了完成本关任务,你需要掌握:1. 什么是SparkSQL 2. 什么是SparkSession。  

什么是SparkSQL


Spark SQL是用来操作结构化和半结构化数据的接口。
当每条存储记录共用已知的字段集合,数据符合此条件时,Spark SQL就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,Spark SQL提供了以下三大功能:
(1) Spark SQL可以从各种结构化数据源(例如JSON、Parquet等)中读取数据。

(2) Spark SQL不仅支持在Spark程序内使用SQL语句进行数据查询,也支持从类似商业智能软件Tableau这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接sparkSQL进行查询。

(3) 当在Spark程序内使用Spark SQL时,Spark SQL支持SQL与常规的Python/Java/Scala代码高度整合,包括连接RDD与SQL表、公开的自定义SQL函数接口等。

什么是SparkSession


Spark中所有功能的入口点都是SparkSession类。要创建基本的SparkSession,只需使用SparkSession.builder()。

import  org.apache.spark.sql.SparkSession;
SparkSession  spark  =  SparkSession 
                  .builder()
                  .appName("Java Spark SQL基本示例")
                  .master("local")
                  .config("spark.some.config.option" , "some-value")
                  .getOrCreate();
 //打印spark版本号
 System.out.println(spark.version());


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,具体任务如下:

打印spark的版本号。
测试说明
补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;
 
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
 
 
public class Test1 {
	
 
	public static void main(String[] args) throws AnalysisException {
		/********* Begin *********/
		SparkSession  spark  =  SparkSession   
                  .builder()  
                  .appName("Java Spark SQL基本示例")  
                  .master("local")  
                  .config("spark.some.config.option" , "some-value")  
                  .getOrCreate();  
 //打印spark版本号  
       System.out.println(spark.version());  
		
		
		
		
		
		
		/********* End *********/
	}
 
	
 
}

第2关:Dataset创建及使用
 

任务描述


本关任务:创建Dataset并使用

相关知识


为了完成本关任务,你需要掌握:

什么是Dataset;
Dataset如何创建 ;
Dataset如何操作数据。
什么是Dataset
在Spark2.0版本以后,DataFrame API将会和Dataset  API合并,统一数据处理API。故实训中的Dateset和DataFrame可看成一个概念。
Dataset和RDD一样,也是Spark的一种弹性分布式数据集,它是一个由列组成的数据集,概念上等同于关系型数据库中的一张表,但在底层具有更丰富的优化。Dataset可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。有人肯定会问,已经有了弹性分布式数据集RDD,为什么还要引入Dataset呢?因为在Spark中,我们可以像在关系型数据库中使用SQL操作数据库表一样,使用Spark SQL操作Dataset。这让熟悉关系型数据库SQL人员也能轻松掌握。

上图直观地体现了 Dataset 和 RDD 的区别。左侧的 RDD[Person] 虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 Dataset 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。Dataset 除了提供了比 RDD 更丰富的算子以外,更重要的特点能提升执行效率、减少数据读取以及执行计划的优化等。

Dataset 上可用的操作分为转换和操作。转换是产生新 Dataset 的转换,动作是触发计算和返回结果的转换。示例转换包括map,filter,select和aggregate(groupBy)。示例操作将数据计数,显示或写入文件系统。

Dataset 是“懒惰的”,即只有在调用动作时才会触发计算。在内部,Dataset 表示描述生成数据所需计算的逻辑计划。调用操作时,Spark 的查询优化器会优化逻辑计划,并以并行和分布式方式生成有效执行的物理计划。要探索逻辑计划以及优化的物理计划,请使用explain函数。

要有效地支持特定于域的对象,需要使用编码器。编码器将域特定类型T映射到 Spark 的内部类型系统。例如,给定一个具有两个字段的  Person,name(string)和age(int),编码器用于告诉 Spark 在运行时生成代码以将 Person 对象序列化为二进制结构。该二进制结构通常具有低得多的存储器占用面积以及针对数据处理(例如,柱状格式)的效率进行优化。要了解数据的内部二进制表示,请使用模式函数。

Dataset如何创建
通常有两种方法来创建Dataset。最常见的方法是使用SparkSession上提供的读取功能将Spark指向存储系统上的某些文件。

//创建泛型的Dataset
Dataset<Row> df = spark.read().json("people.json");
//创建Person类型的Dataset
   Dataset<Person> people = spark.read().json("people.json").as(Encoders.bean(Person.class));
//以表格形式显示前20行Dataset
df.show();
people.show();


也可以通过现有数据集上的转换来创建Dataset。 例如,以下内容通过对现有数据集应用过滤器来创建新Dataset:

   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING));


Dataset如何操作数据
Dataset操作数据有两种方式:API方式处理数据和以编程方式处理数据。

API方式处理数据


Dataset操作也可以通过以下定义的各种特定于域的语言(DSL)函数进行无类型操作:Dataset(类),列和函数。 这些操作与R或Python中的数据框抽象中可用的操作非常相似。
要从数据集中选择列,请在在Java中使用col 。

  

 Column ageCol = people.col("age");


请注意,Column类型也可以通过其各种功能进行操作。

import static org.apache.spark.sql.functions.col;
以树格式
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 仅选择“名称”列
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 选择所有人,但将年龄增加1 
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
// 选择年龄超过21 
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 计数按年龄的人
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+


以编程方式处理数据
SparkSession支持让应用程序以编程方式运行SQL查询并返回结果。

//读取json,并将Dataset,并注册为SQL临时视图
sparkSession.read().json("people.json").createOrReplaceTempView("people");
//以表格形式显示前20行Dataset
sparkSession.sql("select * from people").show();
// + ---- + ------- + 
// | 年龄| 名称| 
// + ---- + ------- + 
// | null | Michael | 
// | 30 | 安迪| 
// | 19 | 贾斯汀| 
// + ---- + ------- +


编程要求


根据提示,在右侧编辑器补充代码,读取people.json文件,过滤age为23的数据,并以表格形式显示前20行Dataset。

people.json文件内容如下:

{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}

package com.educoder.bigData.sparksql;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;


public class Test2 {


	public static void main(String[] args) throws AnalysisException {
		
		SparkSession  spark  =  SparkSession 
				  .builder()
				  .appName("test1")
				  .master("local")
				  .config("spark.some.config.option" , "some-value")
				  .getOrCreate();
		/********* Begin *********/
		spark.read().json("people.json").createOrReplaceTempView("people");  

spark.sql("select * from people where age != '23'").show();
		
		
		
		
		
		/********* End *********/
	}

	

}

第3关:Dataset自定义函数

 

任务描述


本关任务:编写Dataset自定义函数。

相关知识


为了完成本关任务,你需要掌握:

UserDefinedAggregateFunction介绍;
如何使用。
UserDefinedAggregateFunction
UserDefinedAggregateFunction是实现用户定义的聚合函数基础类,用户实现自定义无类型聚合函数必须扩展UserDefinedAggregateFunction 抽象类,相关方法如下:

方法及方法返回    描述


StructType bufferSchema()    StructType表示聚合缓冲区中值的数据类型。
DataType dataType()    UserDefinedAggregateFunction的返回值的数据类型
boolean deterministic()    如果此函数是确定性的,则返回true
Object evaluate(Row buffer)    根据给定的聚合缓冲区计算此UserDefinedAggregateFunction的最终结果
void initialize(MutableAggregationBuffer buffer)    初始化给定的聚合缓冲区
StructType inputSchema()    StructType表示此聚合函数的输入参数的数据类型。
void merge(MutableAggregationBuffer buffer1, Row buffer2)    合并两个聚合缓冲区并将更新的缓冲区值存储回buffer1
void update(MutableAggregationBuffer buffer, Row input)    使用来自输入的新输入数据更新给定的聚合缓冲区
如何使用
我们以计算员工薪水平均值的例子来说:
首先在用户自定义函数的构造函数中,定义聚合函数的输入参数的数据类型和聚合缓冲区中值的数据类型。

//定义员工薪水的输入参数类型为LongType
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
//定义员工薪水总数、员工个数的参数类型
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);


对聚合缓冲区中值设置初始值。

@Override
    public void initialize(MutableAggregationBuffer buffer) {
        // TODO Auto-generated method stub
        buffer.update(0, 0L);
        buffer.update(1, 0L);
    }


把自定义函数的输入薪水数据转化为定义的聚合缓冲区的值(薪水总数、员工个数),并更新。

@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
        long updatedSum = buffer.getLong(0) + input.getLong(0);
        long updatedCount = buffer.getLong(1) + 1;
        buffer.update(0, updatedSum);
        buffer.update(1, updatedCount);
    }
}


把多个聚合缓冲区的值进行合并。

@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    // TODO Auto-generated method stub
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
}


最后通过聚合缓冲区的值计算输出结果。

@Override
public Object evaluate(Row buffer) {
    // TODO Auto-generated method stub
     return ((double) buffer.getLong(0)) / buffer.getLong(1);
}


就此自定义函数就开发完了,通过SparkSession的udf()方法会返回注册用户定义函数的方法集合UDFRegistration
通过UDFRegistration调用register方法进行自定义函数注册,使用如下:

// 注册自定义函数myAverage
spark.udf().register("myAverage", new MyAverage());
//读取json文件
spark.read().json("people.json").createOrReplaceTempView("people");
//使用自定义函数计算薪水平均值
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();
// +--------------+
// |average_salary|
// +--------------+
// |        5000|
// +--------------+


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,编写自定义函数类MyAverage,用来计算用户薪水平均值,平台已提供了最后的实现:

spark.udf().register("myAverage", new MyAverage());
spark.read().json("people.json").createOrReplaceTempView("people");
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();

测试说明


补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class MyAverage extends UserDefinedAggregateFunction {
private static final long serialVersionUID = 1L;
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
@Override
public StructType bufferSchema() {
// TODO Auto-generated method stub
return bufferSchema;
}
@Override
public DataType dataType() {
// TODO Auto-generated method stub
return DataTypes.DoubleType;
}
@Override
public boolean deterministic() {
// TODO Auto-generated method stub
return true;
}
@Override
public Object evaluate(Row buffer) {
// TODO Auto-generated method stub
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
// TODO Auto-generated method stub
buffer.update(0, 0L);
buffer.update(1, 0L);
}
@Override
public StructType inputSchema() {
// TODO Auto-generated method stub
return inputSchema;
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
// TODO Auto-generated method stub
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/587888.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【深耕 Python】Data Science with Python 数据科学(18)Scikit-learn机器学习(三)

写在前面 关于数据科学环境的建立&#xff0c;可以参考我的博客&#xff1a; 【深耕 Python】Data Science with Python 数据科学&#xff08;1&#xff09;环境搭建 往期数据科学博文一览&#xff1a; 【深耕 Python】Data Science with Python 数据科学&#xff08;2&…

2024五一杯数学建模C题思路分享 - 煤矿深部开采冲击地压危险预测

文章目录 1 赛题选题分析 2 解题思路2.1 问题重述2.2 第一问完整思路2.2 二、三问思路更新 3 最新思路更新 1 赛题 C题 煤矿深部开采冲击地压危险预测 煤炭是中国的主要能源和重要的工业原料。然而&#xff0c;随着开采深度的增加&#xff0c;地应力增大&#xff0c;井下煤岩动…

搜索引擎的设计与实现参考论文(论文 + 源码)

【免费】搜索引擎的设计与实现.zip资源-CSDN文库https://download.csdn.net/download/JW_559/89249705?spm1001.2014.3001.5501 搜索引擎的设计与实现 摘要&#xff1a; 我们处在一个大数据的时代&#xff0c;伴随着网络信息资源的庞大&#xff0c;人们越来越多地注重怎样才能…

汽车车灯的材料是什么?汽车车灯的灯罩如果破损破裂破洞了要怎么修复?

汽车车灯的材料主要包括灯罩和灯底座两部分&#xff0c;它们所使用的材料各不相同。 车灯罩的材料主要是透明且具有良好耐热性和耐紫外线性能的塑料。其中&#xff0c;聚碳酸酯&#xff08;PC&#xff09;是一种常用的材料&#xff0c;它具有高抗冲击性、耐化学品腐蚀和优良的…

Pandas入门篇(二)-------Dataframe篇4(进阶)(Dataframe的进阶用法)(机器学习前置技术栈)

目录 概述一、复合索引&#xff08;一&#xff09;创建具有复合索引的 DataFrame1. 使用 set_index 方法&#xff1a;2.在创建 DataFrame 时直接指定索引&#xff1a; &#xff08;二&#xff09;使用复合索引进行数据选择和切片&#xff08;三&#xff09;重置索引&#xff08…

Spring Cloud Kubernetes 本地开发环境调试

一、Spring Cloud Kubernetes 本地开发环境调试 上面文章使用 Spring Cloud Kubernetes 在 k8s 环境中实现了服务注册发现、服务动态配置&#xff0c;但是需要放在 k8s 环境中才能正常使用&#xff0c;在本地开发环境中可能没有 k8s 环境&#xff0c;如何本地开发调试呢&#…

1. 深度学习笔记--神经网络中常见的激活函数

1. 介绍 每个激活函数的输入都是一个数字&#xff0c;然后对其进行某种固定的数学操作。激活函数给神经元引入了非线性因素&#xff0c;如果不用激活函数的话&#xff0c;无论神经网络有多少层&#xff0c;输出都是输入的线性组合。激活函数的意义在于它能够引入非线性特性&am…

小程序wx.getlocation接口如何开通?

小程序地理位置接口有什么功能&#xff1f; 随着小程序生态的发展&#xff0c;越来越多的小程序开发者会通过官方提供的自带接口来给用户提供便捷的服务。但是当涉及到地理位置接口时&#xff0c;却经常遇到申请驳回的问题&#xff0c;反复修改也无法通过&#xff0c;给的理由…

计算机网络chapter1——家庭作业

文章目录 复习题1.1节&#xff08;1&#xff09; “主机”和“端系统”之间有何不同&#xff1f;列举几种不同类型的端系统。web服务器是一种端系统吗&#xff1f;&#xff08;2&#xff09;协议一词常用来用来描述外交关系&#xff0c;维基百科是如何描述外交关系的&#xff1…

十大排序算法之->插入排序

一、插入排序 插入排序的基本思想是将一个记录插入到已经排好序的有序表中&#xff0c;从而形成一个新的、记录数增1的有序表。 排序过程&#xff1a; 1、外层循环&#xff1a;从第二个元素开始&#xff0c;依次选取未排序的元素。 2、内层循环&#xff1a;将当前选取的元素…

【UnityRPG游戏制作】Unity_RPG项目_玩家逻辑相关

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;就业…

Typescript精进:前端必备的5大技巧(AI写作)

首先&#xff0c;这篇文章是基于笔尖AI写作进行文章创作的&#xff0c;喜欢的宝子&#xff0c;也可以去体验下&#xff0c;解放双手&#xff0c;上班直接摸鱼~ 按照惯例&#xff0c;先介绍下这款笔尖AI写作&#xff0c;宝子也可以直接下滑跳过看正文~ 笔尖Ai写作&#xff1a;…

通过自然语言处理执行特定任务的AI Agents;大模型控制NPC执行一系列的动作;个人化的电子邮件助手Panza

✨ 1: OpenAgents 通过自然语言处理执行特定任务的AI代理 OpenAgents是一个开放平台&#xff0c;旨在使语言代理&#xff08;即通过自然语言处理执行特定任务的AI代理&#xff09;的使用和托管变得更加便捷和实用。它特别适合于日常生活中对数据分析、工具插件获取和网络浏览…

【Mac】Mac安装软件常见问题解决办法

前言 刚开始用Mac系统的小伙伴或者在更新系统版本后运行App的朋友会经常碰到弹窗提示「xxx已损坏&#xff0c;无法打开&#xff0c;您应该将它移到废纸篓」、「打不开xxx&#xff0c;因为Apple无法检查其是否包含恶意软件」、「打不开xxx&#xff0c;因为它来自身份不明的开发…

Pandas入门篇(三)-------数据可视化篇3(seaborn篇)(pandas完结撒花!!!)

目录 概述一、语法二、常用单变量绘图1. 直方图&#xff08;histplot&#xff09;2. 核密度预估图&#xff08;kdeplot&#xff09;3. 计数柱状图&#xff08;countplot&#xff09; 三、常用多变量绘图1.散点图(1) scatterplot(2)regplot 散点图拟合回归线(3)jointplot 散点图…

【Spring 】Spring MVC 入门Ⅱ

Spring MVC 入门Ⅱ 一、接收Cookie / Session 这两者都是用来保存用户信息的&#xff0c;但不同的是&#xff1a; Cookie存在客户端 Session存在服务器 Session产生时会生成一个唯一性的SessionID&#xff0c;这个SessionID可以用于匹配Session和Cookie SessionID可以在Cooki…

【kettle003】kettle访问SQL Server数据库并处理数据至execl文件

一直以来想写下基于kettle的系列文章&#xff0c;作为较火的数据ETL工具&#xff0c;也是日常项目开发中常用的一款工具&#xff0c;最近刚好挤时间梳理、总结下这块儿的知识体系。 熟悉、梳理、总结下Microsoft SQL Server 2022关系数据库相关知识体系 3.欢迎批评指正&#xf…

ChatGPT 记忆功能上线 能记住你和GPT互动的所有内容

你和ChatGPT的互动从今天开始变得更加智能&#xff01;ChatGPT现在可以记住你的偏好和对话细节&#xff0c;为你提供更加相关的回应。和它聊天&#xff0c;你可以教它记住新的东西&#xff0c;例如&#xff1a;“记住我是素食主义者&#xff0c;当你推荐食谱时。”想了解ChatGP…

吴恩达机器学习笔记:第 9 周-15 异常检测(Anomaly Detection) 15.1-15.2

目录 第 9 周 15、 异常检测(Anomaly Detection)15.1 问题的动机15.2 高斯分布 第 9 周 15、 异常检测(Anomaly Detection) 15.1 问题的动机 在接下来的一系列视频中&#xff0c;我将向大家介绍异常检测(Anomaly detection)问题。这是机器学习算法的一个常见应用。这种算法的…

Qwen-Audio:推动通用音频理解的统一大规模音频-语言模型(开源)

随着人工智能技术的不断进步&#xff0c;音频语言模型&#xff08;Audio-Language Models&#xff09;在人机交互领域变得越来越重要。然而&#xff0c;由于缺乏能够处理多样化音频类型和任务的预训练模型&#xff0c;该领域的进展受到了限制。为了克服这一挑战&#xff0c;研究…