Spark SQL编程

1. Spark SQL概述

1.1 什么是Spark SQL

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种,包括SQL和Dataset API。计算结果时,使用相同的执行引擎,与您用于表达计算的API/语言无关。

1.2 为什么要有Spark SQL

1.3 SparkSQL的发展

1)发展历史

RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的他们的执行效率和执行方式在现在的版本中,dataSet性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSet<Row>。

2)三者的共性

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算

(3)三者有许多共同的函数,如filter,排序等

(4)三者都会根据Spark的内存情况自动缓存运算

(5)三者都有分区的概念

1.4 Spark SQL的特点

1)易整合

无缝的整合了SQL查询和Spark编程。

2)统一的数据访问方式

使用相同的方式连接不同的数据源。

3)兼容Hive

在已有的仓库上直接运行SQL或者HQL。

4)标准的数据连接

通过JDBC或者ODBC来连接

1.5 SparkSession新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

  • 一个叫SQLContext,用于Spark自己提供的SQL查询;
  • 一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

[atguigu@hadoop102 spark-local]$ bin/spark-shell

20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Spark context Web UI available at http://hadoop102:4040

Spark context available as 'sc' (master = local[*], app id = local-1599880621394).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1

      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)

Type in expressions to have them evaluated.

Type :help for more information.

2 常用方式

2.1 方法调用

1)创建一个maven工程SparkSQL

2)创建包名为com.atguigu.sparksql

3)输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}{"age":19,"name":"xuzhu"}{"age":18,"name":"duanyu"}

{"age":22,"name":"qiaofeng"}

{"age":11,"name":"xuzhu"}

{"age":12,"name":"duanyu"}

5)在pom.xml文件中添加spark-sql的依赖

<dependencies>

    <dependency>

       <groupId>org.apache.spark</groupId>

       <artifactId>spark-sql_2.12</artifactId>

       <version>3.3.1</version>

    </dependency>

    <dependency>

       <groupId>org.projectlombok</groupId>

       <artifactId>lombok</artifactId>

       <version>1.18.22</version>

    </dependency>

</dependencies>

6)代码实现

添加javaBean的User

package com.atguigu.sparksql.Bean;

import lombok.Data;

import java.io.Serializable;

@Data

public class User implements Serializable {

    public Long age;

    public String name;

    public User() {

    }

    public User(Long age, String name) {

        this.age = age;

        this.name = name;

    }

}

代码编写

package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.api.java.function.ReduceFunction;

import org.apache.spark.sql.*;

import scala.Tuple2;

public class Test01_Method {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        // 按照行读取

        Dataset<Row> lineDS = spark.read().json("input/user.json");

        // 转换为类和对象

        Dataset<User> userDS = lineDS.as(Encoders.bean(User.class));

//        userDS.show();

        // 使用方法操作

        // 函数式的方法

        Dataset<User> userDataset = lineDS.map(new MapFunction<Row, User>() {

            @Override

            public User call(Row value) throws Exception {

                return new User(value.getLong(0), value.getString(1));

            }

        },

                // 使用kryo在底层会有部分算子无法使用

                Encoders.bean(User.class));

        // 常规方法

        Dataset<User> sortDS = userDataset.sort(new Column("age"));

        sortDS.show();

        // 区分

        RelationalGroupedDataset groupByDS = userDataset.groupBy("name");

        // 后续方法不同

        Dataset<Row> count = groupByDS.count();

        // 推荐使用函数式的方法  使用更灵活

        KeyValueGroupedDataset<String, User> groupedDataset = userDataset.groupByKey(new MapFunction<User, String>() {

            @Override

            public String call(User value) throws Exception {

                return value.name;

            }

        }, Encoders.STRING());

        // 聚合算子都是从groupByKey开始

        // 推荐使用reduceGroup

        Dataset<Tuple2<String, User>> result = groupedDataset.reduceGroups(new ReduceFunction<User>() {

            @Override

            public User call(User v1, User v2) throws Exception {

                // 取用户的大年龄

                return new User(Math.max(v1.age, v2.age), v1.name);

            }

        });

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}

在sparkSql中DS直接支持的转换算子有:map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。

2.2 SQL使用方式

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class Test02_SQL {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> lineDS = spark.read().json("input/user.json");

        // 创建视图 => 转换为表格 填写表名

        // 临时视图的生命周期和当前的sparkSession绑定

        // orReplace表示覆盖之前相同名称的视图

        lineDS.createOrReplaceTempView("t1");

        // 支持所有的hive sql语法,并且会使用spark的又花钱

        Dataset<Row> result = spark.sql("select * from t1 where age > 18");

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}}

2.3 DSL特殊语法(扩展)

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class Test03_DSL {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        // 导入特殊的依赖 import static org.apache.spark.sql.functions.col;

        Dataset<Row> lineRDD = spark.read().json("input/user.json");

        Dataset<Row> result = lineRDD.select(col("name").as("newName"),col("age").plus(1).as("newAge"))

                .filter(col("age").gt(18));

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}

3 SQL语法的用户自定义函数

3.1 UDF 用户自定义函数

1)UDF:一行进入,一行出

2)代码实现

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.api.java.UDF1;

import org.apache.spark.sql.expressions.UserDefinedFunction;

import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;

public class Test04_UDF {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> lineRDD = spark.read().json("input/user.json");

        lineRDD.createOrReplaceTempView("user");

        // 定义一个函数

        // 需要首先导入依赖import static org.apache.spark.sql.functions.udf;

        UserDefinedFunction addName = udf(new UDF1<String, String>() {

            @Override

            public String call(String s) throws Exception {

                return s + " 大侠";

            }

        }, DataTypes.StringType);

        spark.udf().register("addName",addName);

        spark.sql("select addName(name) newName from user")

                .show();

        // lambda表达式写法

        spark.udf().register("addName1",(UDF1<String,String>) name -> name + " 大侠",DataTypes.StringType);

        //4. 关闭sparkSession

        spark.close();

    }

}

3.2 UDAF 用户自定义聚合函数

1)UDAF:输入多行,返回一行。通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有的数据合并在一起。

2)Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

3)Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame

4)案例实操

需求:实现求平均年龄,自定义UDAFMyAvg(age)

(1)自定义聚合函数实现-强类型

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Encoder;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.expressions.Aggregator;

import java.io.Serializable;

import static org.apache.spark.sql.functions.udaf;

public class Test05_UDAF {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        spark.read().json("input/user.json").createOrReplaceTempView("user");

        // 注册需要导入依赖 import static org.apache.spark.sql.functions.udaf;

        spark.udf().register("avgAge",udaf(new MyAvg(),Encoders.LONG()));

        spark.sql("select avgAge(age) newAge from user").show();

        //4. 关闭sparkSession

        spark.close();

    }

    public static class Buffer implements Serializable {

        private Long sum;

        private Long count;

        public Buffer() {

        }

        public Buffer(Long sum, Long count) {

            this.sum = sum;

            this.count = count;

        }

        public Long getSum() {

            return sum;

        }

        public void setSum(Long sum) {

            this.sum = sum;

        }

        public Long getCount() {

            return count;

        }

        public void setCount(Long count) {

            this.count = count;

        }

    }

    public static class MyAvg extends Aggregator<Long,Buffer,Double>{

        @Override

        public Buffer zero() {

            return new Buffer(0L,0L);

        }

        @Override

        public Buffer reduce(Buffer b, Long a) {

            b.setSum(b.getSum() + a);

            b.setCount(b.getCount() + 1);

            return b;

        }

        @Override

        public Buffer merge(Buffer b1, Buffer b2) {

            b1.setSum(b1.getSum() + b2.getSum());

            b1.setCount(b1.getCount() + b2.getCount());

            return b1;

        }

        @Override

        public Double finish(Buffer reduction) {

            return reduction.getSum().doubleValue() / reduction.getCount();

        }

        @Override

        public Encoder<Buffer> bufferEncoder() {

            // 可以用kryo进行优化

            return Encoders.kryo(Buffer.class);

        }

        @Override

        public Encoder<Double> outputEncoder() {

            return Encoders.DOUBLE();

        }

    }

}

3.3 UDTF(没有)

输入一行,返回多行(Hive)

SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/146304.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

qt+opengl 着色器VAO、VBO、EBO(四)

文章目录 一、顶点着色器和片段着色器代码分析1. 着色器12. 顶点着色器2 二、使用步骤1. 使用着色器12. 使用着色器23. 在着色器2中使用EBO 三、完整代码 一、顶点着色器和片段着色器代码分析 1. 着色器1 用到的坐标矩阵, 四个四边形顶点坐标 float vertices_data[36] {// 所…

PlantUML基础使用教程

环境搭建 IDEA插件下载 打开IEDA系列IDE&#xff0c;从FIle–>Settings–>Plugins–>Marketplace 进入到插件下载界面&#xff0c;搜索PlantUML&#xff0c;安装PlantUML Integration和PlantUML Parser两个插件&#xff0c;并重启IDE 安装和配置Graphviz 进入官网…

C/C++轻量级并发TCP服务器框架Zinx-框架开发001: 读取标准输入,回显到标准输出

文章目录 完整代码实现参考-非项目使用项目使用的代码 - 乱-但是思路与上面的相同创建Kernel类添加删除修改epoll&#xff0c;才能写run方法创建stdin_Channel类在Kernel类中实现run方法 完整代码实现参考-非项目使用 #include <errno.h> #include <signal.h> #in…

蓝桥杯每日一题2023.11.14

题目描述 题目分析 此题目的最终目标是将字母都填上数使等式符合条件&#xff0c;实际我们发现可以使用搜索将所有符合条件的进行判断&#xff08;答案&#xff1a;29&#xff09; 由于小数可能会出现错误故我们将其进行简单变化进行搜索 #include<bits/stdc.h> using…

No207.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

Pytorch自动混合精度的计算:torch.cuda.amp.autocast

1 autocast介绍 1.1 什么是AMP? 默认情况下&#xff0c;大多数深度学习框架都采用32位浮点算法进行训练。2017年&#xff0c;NVIDIA研究了一种用于混合精度训练的方法&#xff0c;该方法在训练网络时将单精度&#xff08;FP32&#xff09;与半精度(FP16)结合在一起&#xff…

Dart利用私有构造函数_()创建单例模式

文章目录 类的构造函数_()函数dart中构造函数定义 类的构造函数 类的构造函数有两种&#xff1a; 1&#xff09;默认构造函数&#xff1a; 当实例化对象的时候&#xff0c;会自动调用的函数&#xff0c;构造函数的名称和类的名称相同&#xff0c;在一个类中默认构造函数只能由…

post 和get参数 请求

json参数 post请求格式 RestController public class HelloController { //json参数 post 请求RequestMapping("/jsonParam")public String jsonParam(RequestBody User user){System.out.println(user);return "OK";} } postman 接口测试工具…

spring cloud alibaba 简介

微服务搭建组件选型 1.服务注册中心 Nacos(spring-cloud-alibaba) 2.服务通信 OpenFeign(spring-cloud) 3.服务熔断、降级、限流 Sentinel(spring-cloud-alibaba) 4.网关 Gateway(spring-cloud) 5.服务配置中心 …

MySQL被攻击后创建数据库报错1044 - Access denied for user ‘root‘@‘%‘ to database ‘xxx‘

MySQL被攻击后创建数据库报错1044 - Access denied for user root% to database xxx 一、问题二、解决过程1、正常过程2、踩坑&#xff08;已经解决问题的可以不看&#xff09; 一、问题 最近数据库被攻击了&#xff0c;业务数据库都没了 还好也不是有重要数据&#xff0c;但再…

【HUST】网安纳米|2023年研究生纳米技术考试参考

目录 1 纳米材料是什么 2 纳米材料的结构特性 3 纳米结构的其他特性 4 纳米结构的检测技术 5 纳米材料的应用 打印建议&#xff1a;PPT彩印&#xff08;这样重点比较突出&#xff09;&#xff0c;每面12张PPT&#xff0c;简单做一下关键词目录&#xff0c;亲测可以看清。如…

Uniapp开发 购物商城源码 在线电商商城源码 适配移动终端项目及各小程序

lilishop电商商城系统 商城移动端&#xff0c;使用Uniapp开发&#xff0c;可编译为所有移动终端项目及各小程序 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/88487579 源码下载2&#xff1a;关注我留言

02 # 类型基础:强类型与弱类型

宽泛的定义 在强类型语言中&#xff0c;当一个对象从调用函数传递到被调用函数时&#xff0c;其类型必须与被调用函数中声明的类型兼容 – Liskov, Zilles 1974 通俗定义 强类型语言不允许改变变量的数据类型&#xff0c;除非进行强制类型转换 比如下面 Java 里不能将布尔类…

最小二乘法及参数辨识

文章目录 一、最小二乘法1.1 定义1.2 SISO系统运用最小二乘估计进行辨识1.3 几何解释1.4 最小二乘法性质 二、加权最小二乘法三、递推最小二乘法四、增广最小二乘法 一、最小二乘法 1.1 定义 1974年高斯提出的最小二乘法的基本原理是未知量的最可能值是使各项实际观测值和计算…

〖大前端 - 基础入门三大核心之JS篇㉟〗- JavaScript 的DOM简介

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;不渴望力量的哈士奇(哈哥)&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xf…

基于Python实现汽车销售数据可视化【500010086】

导入模块 import numpy as np import pandas as pd import plotly.graph_objects as go import plotly.express as px获取数据 df1 pd.read_excel(r"./data/中国汽车总体销量.xlsx") print(df1.head(5))df1.info()df1[年份] df1[时间].dt.year df1[月份] df1[时…

科研学习|科研软件——有序多分类Logistic回归的SPSS教程!

一、问题与数据 研究者想调查人们对“本国税收过高”的赞同程度&#xff1a;Strongly Disagree——非常不同意&#xff0c;用“0”表示&#xff1b;Disagree——不同意&#xff0c;用“1”表示&#xff1b;Agree--同意&#xff0c;用“2”表示&#xff1b;Strongly Agree--非常…

【VBA】基于EXCEL生成Insert语句工具

工具介绍 基于Excel生成INSERT语句工具是一个辅助工具&#xff0c;用于帮助用户根据Excel数据生成INSERT语句。通常&#xff0c;在数据库中插入大量数据时&#xff0c;手动编写INSERT语句会非常繁琐和耗时。而使用这个工具&#xff0c;可以通过Excel中的数据自动生成相应的INS…

第五章ARM处理器的嵌入式硬件系统设计——课后习题

1ARM处理器的工作状态 ARM处理器有两种工作状态。具体而言&#xff0c;ARM处理器执行32位ARM指令集时&#xff0c;工作在ARM状态&#xff0c;当ARM处理器执行16位thumb指令集时候&#xff0c;工作在thumb状态。 1ARM指令特点 1一个大的&#xff0c;统一的寄存器文件。 2基于…

nginx四层tcp负载均衡及主备、四层udp负载均衡及主备、7层http负载均衡及主备配置(wndows系统主备、负载均衡)

准备工作 服务器上安装、配置网络负载平衡管理器 windows服务器热备、负载均衡配置-CSDN博客 在windows服务器上安装vmware17 win10 上安装vmware17-CSDN博客 在windows上利用vmware17 搭建centos7 mini版 在windows上利用vmware17 搭建centos7 mini版本服务器-CSDN博客 …