day07_Spark SQL

文章目录

  • day07_Spark SQL课程笔记
    • 一、今日课程内容
    • 二、Spark SQL函数定义(掌握)
      • 1、窗口函数
      • 2、自定义函数背景
        • 2.1 回顾函数分类标准:
          • SQL最开始是_内置函数&自定义函数_两种
        • 2.2 自定义函数背景
      • 3、Spark原生自定义UDF函数
        • 3.1 自定义函数流程:
        • 3.2 自定义演示一:
        • 3.3 自定义演示二:
        • 3.4 自定义演示三:
    • 4、Pandas的自定义函数
        • 4.1 Apache Arrow框架
        • 4.2 基于Arrow完成Pandas和Spark的DataFrame互转
        • 4.3 基于Pandas自定义函数
          • 4.3.1 自定义函数流程
          • 4.3.2 自定义UDF函数
          • 4.3.3 自定义UDAF函数
    • 三、Spark on Hive(操作)
      • 1、集成原理
      • 2、集成环境配置
      • 3、启动metastore服务
      • 4、SparkOnHive操作
        • 4.1 黑窗口测试spark-sql
        • 4.2 python代码测试spark-sql
    • 四、SparkSQL的分布式执行引擎(了解)
      • 1、启动Thrift服务
      • 2、beeline连接Thrift服务
      • 3、开发工具连接Thrift服务
      • 4、控制台编写SQL代码
    • 五、Spark SQL的运行机制(掌握)
      • 5.1 **Catalyst**内部具体的执行流程:
      • **为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线”?**
      • **实际意义**
      • 5.2 SparkSQL的执行流程总结:
  • 01_spark原生自定义UDF函数_返回字符串.py
    • 结果
  • 02_spark原生自定义UDF函数_返回列表.py
    • 结果
  • 03_spark原生自定义UDF函数_返回字典.py
    • 结果
  • 04_sparkSQL和pandas中df对象互转操作.py
  • 05_spark基于pandas定义udf函数_s到s.py
  • 06_spark基于pandas定义udaf函数_s到标量.py
  • 07_spark_sql操作数据库.py

day07_Spark SQL课程笔记

在这里插入图片描述

一、今日课程内容

  • 1- Spark SQL函数定义(掌握)
  • 2- Spark On Hive(操作)
  • 3- Spark SQL的分布式执行引擎(了解)
  • 4- Spark SQL的运行机制(掌握)

今日目的:掌握Spark SQL函数定义的两种方式;理解->掌握Spark SQL的运行机制

二、Spark SQL函数定义(掌握)

1、窗口函数

回顾之前学习过的窗口函数:

分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

分析函数可以大致分成如下3类:
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: 排序函数 row_number() rank() dense_rank() 
3- 第三类: 其他函数 ntile()  first_value() last_value() lead() lag() 

三个排序函数的区别?
row_number(): 巧记 1234  特点: 唯一且连续
rank(): 巧记 1224 特点: 并列不连续
dense_rank(): 巧记 1223  特点: 并列且连续

在Spark SQL中使用窗口函数案例:

已知数据如下:

cookie1,2018-04-10,1
cookie1,2018-04-11,5
cookie1,2018-04-12,7
cookie1,2018-04-13,3
cookie1,2018-04-14,2
cookie1,2018-04-15,4
cookie1,2018-04-16,4
cookie2,2018-04-10,2
cookie2,2018-04-11,3
cookie2,2018-04-12,5
cookie2,2018-04-13,6
cookie2,2018-04-14,3
cookie2,2018-04-15,9
cookie2,2018-04-16,7

需求: 要求找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题

# 导包
import os
from pyspark.sql import SparkSession,functions as F,Window as W

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # 2.数据输入
    df = spark.read.csv(
        path='file:///export/data/spark_project/spark_sql/data/cookie.txt',
        sep=',',
        schema='cookie string,datestr string,pv int'
    )
    # 3.数据处理(切分,转换,分组聚合)
    # 4.数据输出
    etldf = df.dropDuplicates().dropna()
    # SQL方式
    etldf.createTempView('cookie_logs')
    spark.sql(
        """
        select cookie,datestr,pv
        from (
           select cookie,datestr,pv,
              dense_rank() over(partition by cookie order by pv desc) as rn
           from cookie_logs
        ) temp where rn <=3 
        """
    ).show()
    # DSL方式
    etldf.select(
        'cookie', 'datestr', 'pv',
        F.dense_rank().over( W.partitionBy('cookie').orderBy(F.desc('pv')) ).alias('rn')
    ).where('rn <=3').select('cookie', 'datestr', 'pv').show()


    # 5.关闭资源
    spark.stop()

运行结果截图:
在这里插入图片描述

2、自定义函数背景

2.1 回顾函数分类标准:
SQL最开始是_内置函数&自定义函数_两种

SQL函数,主要分为以下三大类:

  • UDF函数:普通函数
    • 特点:一对一,输入一个得到一个
    • 例如:split() …
  • UDAF函数:聚合函数
    • 特点:多对一,输入多个得到一个
    • 例如:sum() avg() count() min() max() …
  • UDTF函数:表生成函数
    • 特点:一对多,输入一个得到多个
    • 例如:explode() …

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

  1. 简单来说:UDF、UDAF和UDTF是Spark SQL中用于扩展SQL功能的三种自定义函数,分别像是“单兵作战”、“团队协作”和“多面手”,满足不同的数据处理需求。

  2. 具体而言

    • UDF(用户自定义函数)
      • 功能:对单行数据进行操作,输入一行输出一行。
      • 场景:适合简单的数据转换,比如将字符串转换为大写。
      • 示例spark.udf.register("to_upper", lambda x: x.upper())
    • UDAF(用户自定义聚合函数)
      • 功能:对多行数据进行聚合操作,输入多行输出一行。
      • 场景:适合复杂的聚合计算,比如自定义加权平均。
      • 示例:继承UserDefinedAggregateFunction类,实现initializeupdatemerge等方法。
    • UDTF(用户自定义表生成函数)
      • 功能:对单行数据进行操作,输入一行输出多行。
      • 场景:适合数据展开操作,比如将JSON数组拆分为多行。
      • 示例:继承GenericUDTF类,实现initializeprocessclose等方法。
  3. 实际生产场景

    • 在数据清洗中,使用UDF将日期格式统一为标准格式。
    • 在数据分析中,使用UDAF计算复杂的业务指标,如客户生命周期价值(CLV)。
    • 在数据展开中,使用UDTF将嵌套的JSON数据拆分为多行,便于后续分析。
  4. 总之:UDF、UDAF和UDTF是Spark SQL中强大的扩展工具,能够满足从简单转换到复杂聚合、数据展开的多种需求,为数据处理提供了极大的灵活性。

2.2 自定义函数背景

思考:有这么多的内置函数,为啥还需要自定义函数呢?

	为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数

​ 在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。

​ 在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。

1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件(Arrow,pandas...),Python可以开发UDF、UDAF函数,同时也提升效率

在这里插入图片描述

Spark SQL原生UDF函数存在的问题:大量的序列化和反序列

	虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好
	
早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可
	
目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

在这里插入图片描述

3、Spark原生自定义UDF函数

3.1 自定义函数流程:
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数注册到Spark SQL中
	注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
		参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
		参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数
		参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型
		udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
	
		说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】
	
	注册方式二:  udf对象 = F.udf(参数1,参数2)
		参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数
		参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型
		udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
		
		说明: 如果通过方式二来注册函数,【仅能用在DSL中】
		
	注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面
		说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】
	
		
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

3.2 自定义演示一:

需求1: 请自定义一个函数,完成对 数据 统一添加一个后缀名的操作 , 例如后缀名 ‘_itheima’

效果如下:

在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType

# 绑定指定的python解释器

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()


    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三','广州'),(2,'李四','深圳')],
        schema='id int,name string,address string'
    )
    df.show()
    
    # 3.SparkSQL自定义udf函数
    # 第一步.自定义python函数
    def add_suffix(data):
        return data+'_itheima'

    # 第二步.把python函数注册到SparkSQL
    # ① spark.udf.register注册
    dsl1_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())
    # ②F.udf注册
    dsl2_add_suffix = F.udf(add_suffix, StringType())
    # ③@F.udf注册
    @F.udf( StringType())
    def candy_add_suffix(data):
        return data+'_itheima'

    # 第三步.在SparkSQL中调用自定义函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,name,sql_add_suffix(address) as new_address from temp"""
    ).show()
    
    # DSL方式
    # 调用dsl1_add_suffix
    df.select(
        'id', 'name', dsl1_add_suffix('address').alias('new_address')
    ).show()
    # 调用dsl2_add_suffix
    df.select(
        'id', 'name', dsl2_add_suffix('address').alias('new_address')
    ).show()
    # 调用candy_add_suffix
    df.select(
        'id', 'name', candy_add_suffix('address').alias('new_address')
    ).show()

    # 4.关闭资源
    spark.stop()

斌哥友情提醒: 可能遇到的问题如下

在这里插入图片描述

原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。
3.3 自定义演示二:

需求2: 请自定义一个函数,返回值类型为复杂类型: 列表

效果如下:

在这里插入图片描述

参考代码:

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType

# 绑定指定的python解释器

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()


    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三_广州'),(2,'李四_深圳')],
        schema='id int,name_address string'
    )
    df.show()

    # 3.SparkSQL自定义udf函数
    # 第一步.自定义python函数
    def my_split(data:str):
        list1 = data.split('_')
        return list1

    # 第二步.把python函数注册到SparkSQL
    # ① spark.udf.register注册
    dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,ArrayType(StringType()))
    # ②F.udf注册
    dsl2_add_suffix = F.udf(my_split, ArrayType(StringType()))
    # ③@F.udf注册
    @F.udf(ArrayType(StringType()))
    def candy_add_suffix(data):
        list1 = data.split('_')
        return list1

    # 第三步.在SparkSQL中调用自定义函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,sql_add_suffix(name_address) as new_address from temp"""
    ).show()

    # DSL方式
    # 调用dsl1_add_suffix
    df.select(
        'id',  dsl1_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用dsl2_add_suffix
    df.select(
        'id',dsl2_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用candy_add_suffix
    df.select(
        'id',candy_add_suffix('name_address').alias('new_name_address')
    ).show()


    # 4.关闭资源
    spark.stop()
3.4 自定义演示三:

需求3: 请自定义一个函数,返回值类型为复杂类型: 字典

效果如下:

在这里插入图片描述

注意: 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null补充
# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType, StructType

# 绑定指定的python解释器

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()


    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三_广州'),(2,'李四_深圳')],
        schema='id int,name_address string'
    )
    df.show()

    # 3.SparkSQL自定义udf函数
    # 第一步.自定义python函数
    def my_split(data:str):
        list1 = data.split('_')
        return {'name':list1[0],'address':list1[1]}

    # 第二步.把python函数注册到SparkSQL
    # 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null
    t = StructType().add('name',StringType()).add('address',StringType())
    # ① spark.udf.register注册
    dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,t)
    # ②F.udf注册
    dsl2_add_suffix = F.udf(my_split, t)
    # ③@F.udf注册
    @F.udf(t)
    def candy_add_suffix(data):
        list1 = data.split('_')
        return {'name':list1[0],'address':list1[1]}

    # 第三步.在SparkSQL中调用自定义函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,sql_add_suffix(name_address) as new_name_address from temp"""
    ).show()

    # DSL方式
    # 调用dsl1_add_suffix
    df.select(
        'id', dsl1_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用dsl2_add_suffix
    df.select(
        'id',dsl2_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用candy_add_suffix
    df.select(
        'id',candy_add_suffix('name_address').alias('new_name_address')
    ).show()

    # 4.关闭资源
    spark.stop()

4、Pandas的自定义函数

2-如果不是3.1.2版本,那么先卸载pyspark

命令: pip uninstall pyspark

3- 再按照【Spark课程阶段_部署文档.doc】中重新安装3.1.2版本pyspark

命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple
pyspark==3.1.2

4.1 Apache Arrow框架

​ Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

​ Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

如何安装? 三个节点建议都安装

检查服务器上是否有安装pyspark
pip list | grep pyspark  或者 conda list | grep pyspark
 pip list | grep pyarrow  
如果服务器已经安装了pyspark的库,那么仅需要执行以下内容,即可安装。例如在 node1安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]
	
如果服务器中python环境中没有安装pyspark,建议执行以下操作,即可安装。例如在 node2 和 node3安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow==10.0.0

在这里插入图片描述

Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

如何使用呢? 默认不会自动启动的, 一般建议手动配置

spark.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)
4.2 基于Arrow完成Pandas和Spark的DataFrame互转

Pandas中DataFrame:

DataFrame:表示一个二维表对象,就是表示整个表

字段、列、索引;Series表示一列
在这里插入图片描述

Spark SQL中DataFrame:
在这里插入图片描述

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

示例:

# 导包
import os
from pyspark.sql import SparkSession

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # TODO: 手动开启arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三_广州'),(2,'李四_深圳')],
        schema='id int ,name_address string'
    )
    df.show()
    print(type(df))
    print('------------------------')

    # 3.数据处理(切分,转换,分组聚合)
    # 4.数据输出
    # spark->pandas
    pd_df = df.toPandas()
    print(pd_df)
    print(type(pd_df))

    print('------------------------')
    # pandas->spark
    df2 = spark.createDataFrame(pd_df)
    df2.show()
    print(type(df2))
    

    # 5.关闭资源
    spark.stop()

4.3 基于Pandas自定义函数

​ 基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

​ Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

4.3.1 自定义函数流程
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数包装成Spark SQL的函数
	注册方式一: udf对象 = spark.udf.register(参数1, 参数2)
		参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
		参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数
		使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用
		
		
	注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)
		参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数
		参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型
		udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用
	
	注册方式三: 语法糖写法  @F.pandas_udf(returnType)  放置到对应Python的函数上面
		说明: 实际是方式二的扩展。仅能用在DSL中使用
	
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
4.3.2 自定义UDF函数
  • 自定义Python函数的要求:SeriesToSeries

    在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
import pandas as pd

# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # TODO: 开启Arrow的使用
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')

    # 2.数据输入
    df = spark.createDataFrame(
        data = [(1,1),(2,2),(3,3)],
        schema= 'num1 int,num2 int'
    )
    df.show()
    # 3.基于pandas自定义函数 :SeriesTOSeries
    # 第一步: 自定义python函数
    def multiply(num1:pd.Series,num2:pd.Series)->pd.Series:
        return num1*num2

    # 第二步: 把python注册为SparkSQL函数
    # ①spark.udf.register注册
    dsl1_multiply = spark.udf.register('sql_multiply',multiply)
    # ②F.pandas_udf注册
    dsl2_multiply = F.pandas_udf(multiply,IntegerType())
    # ③@F.pandas_udf注册
    @F.pandas_udf(IntegerType())
    def candy_multiply(num1: pd.Series, num2: pd.Series) -> pd.Series:
        return num1 * num2

    # 第三步: 在SparkSQL中调用注册后函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select num1,num2,sql_multiply(num1,num2) as result from temp"""
    ).show()
    # DSL方式
    #调用dsl1_multiply
    df.select(
        'num1','num2',dsl1_multiply('num1','num2').alias('result')
    ).show()
    # 调用dsl2_multiply
    df.select(
        'num1', 'num2', dsl2_multiply('num1', 'num2').alias('result')
    ).show()
    # 调用candy_multiply
    df.select(
        'num1', 'num2', candy_multiply('num1', 'num2').alias('result')
    ).show()

    # 4.关闭资源
    spark.stop()

4.3.3 自定义UDAF函数
  • 自定义Python函数的要求:Series To 标量
  1. 简单来说:Series To 标量是指将一个Pandas Series(一维数组)转换为一个标量值(单个值),就像是“把一串数据浓缩成一个结果”。

  2. 具体而言

    • Series:Pandas中的一维数据结构,类似于带标签的数组,可以存储任意类型的数据。
    • 标量:单个值,比如整数、浮点数、字符串等。
    • 转换场景
      • 聚合操作:将Series中的所有值通过某种计算(如求和、平均值)转换为一个标量值。
      • 提取操作:从Series中提取某个特定位置的值作为标量。
    • 示例
      import pandas as pd
      
      # 创建一个Series
      s = pd.Series([1, 2, 3, 4, 5])
      
      # 聚合操作:求和
      sum_result = s.sum()  # 输出:15
      
      # 提取操作:获取第一个值
      first_value = s[0]    # 输出:1
      
  3. 实际生产场景

    • 在数据分析中,使用聚合操作将一列数据(如销售额)转换为总销售额或平均销售额。
    • 在数据处理中,从时间序列数据中提取某个时间点的值作为标量。
  4. 总之:Series To 标量是Pandas中常见的操作,通过聚合或提取,将一维数据转换为单个值,为数据分析和处理提供了便利。

表示:自定义函数的输入数据类型是Pandas中的Series对象,返回值数据类型是标量数据类型。也就是Python中的数据类型,例如:int、float、bool、list…

在这里插入图片描述

基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd

# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType, FloatType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # TODO: 开启Arrow的使用
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')

    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        schema='id int,value float'
    )
    df.show()


    # 3.基于pandas自定义函数 :SeriesTOSeries
    # 第一步: 自定义python函数
    # ③@F.pandas_udf注册  注意: 理论上UDAF只能用注册方式三语法糖方式,也就意味着只能DSL使用
    @F.pandas_udf(FloatType())
    def candy_mean_v(value: pd.Series) -> float:
        return value.mean()


    # 第二步: 注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式一register注册
    # ①spark.udf.register注册
    dsl1_mean_v = spark.udf.register('sql_mean_v', candy_mean_v)

    # 第三步: 在SparkSQL中调用注册后函数
    # DSL方式
    # 调用candy_mean_v
    df.groupBy('id').agg(
        candy_mean_v('value').alias('result')
    ).show()

    # 调用dsl1_mean_v
    df.groupBy('id').agg(
        dsl1_mean_v('value').alias('result')
    ).show()

    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,sql_mean_v(value) as result from temp group by id"""
    ).show()

    # 4.关闭资源
    spark.stop()

三、Spark on Hive(操作)

1、集成原理

在这里插入图片描述

HiveServer2的主要作用: 接收SQL语句,进行语法检查;解析SQL语句;优化;将SQL转变成MapReduce程序,提交到Yarn集群上运行

SparkSQL与Hive集成,实际上是替换掉HiveServer2。是SparkSQL中的HiveServer2替换掉了Hive中的HiveServer2。

集成以后优点如下:
1- 对于SparkSQL来说,可以避免在代码中编写schema信息。直接向MetaStore请求元数据信息
2- 对于SparkSQL来说,多个人可以共用同一套元数据信息,避免每个人对数据理解不同造成代码功能兼容性问题
3- 对于Hive来说,底层执行引擎由之前的MapReduce变成了Spark Core,能够提升运行效率
4- 对于使用者/程序员来说,SparkSQL与Hive集成,对于上层使用者来说,是完全透明的。

2、集成环境配置

环境搭建,参考【Spark课程阶段_部署文档.doc】的7章节内容。

1-node1上将hive-site.xml拷贝到spark安装路径conf目录

cd /export/server/hive/conf

cp hive-site.xml /export/server/spark/conf/

2-node1上执行以下命令将mysql的连接驱动包拷贝到spark的jars目录下

注意: 之前拷贝过的可以忽略此操作

cd /export/server/hive/lib

cp mysql-connector-java-5.1.32.jar  /export/server/spark/jars/

3、启动metastore服务

# 注意: 
# 启动 hadoop集群
start-all.sh

# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore &

# 测试spark-sql
/export/server/spark/bin/spark-sql

4、SparkOnHive操作

4.1 黑窗口测试spark-sql
[root@node1 bin]# /export/server/spark/bin/spark-sql
...
spark-sql>show databases;
...
spark-sql>create database if not exists spark_demo;
...
spark-sql>create table if not exists spark_demo.stu(id int,name string);
...
spark-sql>insert into  spark_demo.stu values(1,'张三'),(2,'李四');
...
4.2 python代码测试spark-sql

SparkOnHive配置:

spark.sql.warehouse.dir: 告知Spark数据表存放的地方。推荐使用HDFS。如果不配置,默认使用本地磁盘存储。
hive.metastore.uris: 告知Spark,MetaStore元数据管理服务的连接信息
enableHiveSupport() : 开启Spark和Hive的集成

使用格式如下:
 spark = SparkSession.builder\
        .config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\
        .config('hive.metastore.uris','thrift://node1.itcast.cn:9083')\
        .appName('pyspark_demo')\
        .master('local[1]')\
        .enableHiveSupport()\
        .getOrCreate()

示例:

# 导包
import os
import time

from pyspark.sql import SparkSession

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder\
        .config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\
        .config('hive.metastore.uris','thrift://node1.itcast.cn:9083')\
        .appName('pyspark_demo')\
        .master('local[1]')\
        .enableHiveSupport()\
        .getOrCreate()


    # 2.执行sql
    # 查看所有库
    spark.sql( "show databases").show()

    # 查看demo1的student表内容
    spark.sql("select * from demo1.student").show()

    # 测试是否能建库: 可以
    spark.sql( "create database if not exists spark_demo" )

    # 测试是否能在spark_demo建表: 可以
    spark.sql("""create table if not exists spark_demo.stu(id int,name string)""")

    # 测试是否可以往spark_demo.stu表插入数据: 可以
    spark.sql("""insert into  spark_demo.stu values(1,'张三'),(2,'李四')""")

    # 为了方便查看web页面
    time.sleep(500)

    # 3.关闭资源
    spark.stop()

四、SparkSQL的分布式执行引擎(了解)

分布式执行引擎 == Thrift服务 == ThriftServer == SparkSQL中的Hiveserver2

1、启动Thrift服务

​ 目前,我们已经完成Spark集成Hive的配置。但是目前集成后,如果需要连接Hive,此时需要启动一个Spark的客户端(spark-sql、代码)才可以。这个客户端底层相当于启动服务项,用于连接Hive的metastore的服务,进行处理操作。一旦退出客户端,相当于这个服务也就没有了,无法再使用

​ 目前的情况非常类似于在Hive部署的时候,有一个本地模式部署(在启动Hive客户端的时候,内部自动启动一个Hive的hiveserver2服务项)

大白话: 目前在Spark后台,并没有一个长期挂载的Spark的服务(Spark HiveServer2服务)。导致每次启动Spark客户端,都需要在内部启动一个服务项。这种方式,不适合测试使用,不合适后续的快速开发

​ 如何启动Spark 提供的分布式的执行引擎呢? 这个引擎大家完全可以将其理解为Spark的HiveServer2服务,实际上就是Spark的Thrift服务项

# 注意: 要启动sparkThriftServer2服务,必须要保证先启动好Hadoop以及Hive的metastore,不能启动Hive的hiveserver2服务!
# 启动 hadoop集群
start-all.sh

# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore &

# 最后执行以下命令启动sparkThriftServer2:
/export/server/spark/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--hiveconf hive.server2.thrift.bind.host=node1 \
--hiveconf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse \
--master local[2]

校验是否成功:
在这里插入图片描述

访问界面:默认4040

在这里插入图片描述

2、beeline连接Thrift服务

启动后,可以通过spark提供beeline的方式连接这个服务。连接后,直接编写SQL即可

相当于模拟了一个Hive的客户端,但是底层执行的是Spark SQL,最终将其转换为Spark RDD的程序




启动命令:/export/server/spark/bin/beeline

然后输入:!connect jdbc:hive2://node1:10000

继续输入用户名: root
注意密码: 不需要写,直接回车

在这里插入图片描述

3、开发工具连接Thrift服务

如何通过DataGrip或者PyCharm连接Spark进行操作
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、控制台编写SQL代码

进入以下页面就可以愉快的编写sql了,再也不用担心在spark.sql()中编写没有提示了:)
在这里插入图片描述

五、Spark SQL的运行机制(掌握)

Spark SQL底层依然运行的是Spark RDD的程序,所以说Spark RDD程序的运行的流程,在Spark SQL中依然是存在的,只不过在这个流程的基础上增加了从SQL翻译为RDD的过程

​ Spark SQL的运行机制,其实就是在描述如何将Spark SQL翻译为RDD程序:
在这里插入图片描述

​ 整个Spark SQL 转换为RDD 是基于Catalyst 优化器实施,基于这个优化器即可完成整个转换操作

5.1 Catalyst内部具体的执行流程:

在这里插入图片描述
在这里插入图片描述

大白话:

SQL执行顺序: from->join on->where->groupby->聚合操作->having->select [distinct] ->order by ->limit

1- 接收客户端提交过来的SQL/DSL代码,首先会校验SQL/DSL的语法是否正常。如果通过校验,根据SQL/DSL的执行顺序,生成未解析的逻辑计划,也叫做AST抽象语法树

2- 对于AST抽象语法树加入元数据信息,确定一共涉及到哪些字段、字段的数据类型是什么,以及涉及到的表的其他相关元数据信息。加入元数据信息以后,就得到了(已经解析但是未优化的)逻辑计划

3- 对(未优化的)逻辑计划执行优化操作,整个优化通过优化器来执行。在优化器匹配相对应的优化规则,实时具体的优化。SparkSQL底层提供了一两百中优化规则,得到优化的逻辑计划。例如: 谓词下推(断言下推)、列值裁剪
	3.1- 谓词下推: 也叫做断言下推。将数据过滤操作提前到数据扫描的时候执行,减少后续处理数据量,提升效率。
	3.2- 列值裁剪: 在表中只加载数据分析用到的字段,不相关的字段不加载进来。减少后续处理数据量,提升效率。
	
4- 由于优化规则很多,导致会得到多个优化的逻辑计划。在转换成物理执行计划的过程中,会根据 成本模型(对比每个计划运行的耗时、资源消耗等)得到最优的一个物理执行计划

5- 将物理执行计划通过code generation(代码生成器),转变成Spark RDD的代码

6- 最后就是将Spark RDD代码部署到集群上运行。

后续过程与Spark内核调度中Job的调度流程完全一致。

专业的术语:

1- Spark SQL底层解析是由RBO(基于规则的优化器)和CBO(基于代价的优化器)优化完成的

2- RBO是基于规则优化,对于SQL或DSL的语句通过执行引擎得到未执行逻辑计划,在根据元数据得到逻辑计划,之后加入列值裁剪或谓词下推等优化手段形成优化的逻辑计划

3- CBO是基于优化的逻辑计划得到多个物理执行计划,根据 代价函数(成本模型) 选择出最优的物理执行计划

4- 通过code genaration代码生成器完成RDD的代码构建

5- 底层依赖于DAGScheduler和TaskScheduler完成任务计算执行

后续过程与Spark内核调度中Job的调度流程完全一致。
  1. 简单来说:SparkSQL的执行流程就像是“从SQL语句到结果的流水线”,通过解析、优化和执行,将SQL查询转化为分布式计算任务,最终返回结果。

  2. 具体而言

    • SQL解析
      • 将SQL语句解析为抽象语法树(AST)。
      • 使用ANTLR工具将AST转换为逻辑计划(Logical Plan)。
    • 逻辑优化
      • 对逻辑计划进行优化,如谓词下推、列剪裁等。
      • 生成优化后的逻辑计划。
    • 物理计划生成
      • 将逻辑计划转换为物理计划(Physical Plan),选择最优的执行策略。
      • 物理计划包括RDD转换、数据源读取等具体操作。
    • 任务调度与执行
      • 将物理计划分解为多个Stage和Task。
      • 通过DAGScheduler和TaskScheduler将Task分配到集群节点上执行。
    • 结果返回
      • 将计算结果返回给客户端,如DataFrame或直接输出。
  3. 实际生产场景

    • 在数据仓库中,使用SparkSQL查询海量数据,生成报表和洞察。
    • 在实时分析中,结合Structured Streaming,使用SparkSQL处理实时数据流。
  4. 总之:SparkSQL的执行流程通过解析、优化和执行,将SQL查询高效地转化为分布式计算任务,为大规模数据处理提供了强大的支持。

为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线”?

  1. 流水线:分阶段处理

    • 流水线:将复杂任务分解为多个阶段,每个阶段专注于特定任务。
    • SparkSQL:将SQL查询分解为SQL解析逻辑优化物理计划生成任务调度与执行结果返回等多个阶段,每个阶段完成特定任务。
  2. 高效流转:逐步优化和执行

    • 流水线:每个阶段完成后,数据会流转到下一个阶段,逐步完成最终目标。
    • SparkSQL:SQL语句经过解析、优化、物理计划生成等步骤,逐步转化为分布式计算任务,最终高效执行并返回结果。
  3. 自动化:无需手动干预

    • 流水线:自动化完成每个阶段的任务,无需人工干预。
    • SparkSQL:通过Catalyst优化器和Tungsten引擎,自动优化查询计划并执行,开发者只需关注SQL语句和结果。
  4. 结果导向:最终输出

    • 流水线:最终输出成品。
    • SparkSQL:最终输出查询结果(如DataFrame或报表),为业务决策提供支持。

实际意义

SparkSQL的执行流程就像“从SQL语句到结果的流水线”,通过分阶段、高效流转和自动化的方式,将SQL查询转化为分布式计算任务,最终返回结果,为大规模数据处理提供了强大的支持。

5.2 SparkSQL的执行流程总结:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

01_spark原生自定义UDF函数_返回字符串.py

# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 先创建spark session对象
    spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()

    # 创建DF对象
    df = spark.createDataFrame([
        (1, "张三", '广州'),
        (2, "李四", '深圳'),
        (3, "王五", '上海')
    ], schema=["id", "name", "address"])
    # 测试是否有数据
    df.show()


    # 需求: 自定义函数,功能是给df的所有地址都添加一个后缀'_itheima'
    # 一.自定义能添加后缀'_itheima'功能的python函数
    def add_suffix(address):
        return address + '_itheima'


    # 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)
    # 注册方式1: 适用于sql和dsl风格
    dsl_add_suffix = spark.udf.register('sql_add_suffix', add_suffix, StringType())

    # 三.使用UDF函数
    # 方式1: SQL风格
    # 先有临时表,再调用sql执行
    df.createTempView("stu_tb")
    spark.sql("""
        select *,
            sql_add_suffix(address) as address_new
        from stu_tb
    """).show()

    # 方式2: DSL风格
    # df.select(
    #     "*",
    #     dsl_add_suffix("address").alias("address_new")
    # ).show()

    # 注意: 最后一定释放资源
    spark.stop()

在这里插入图片描述

结果

在这里插入图片描述

02_spark原生自定义UDF函数_返回列表.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType

# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 先创建spark session对象
    spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()

    # 创建DF对象
    df = spark.createDataFrame([
        (1, "张三_广州"),
        (2, "李四_深圳"),
        (3, "王五_上海")
    ], schema=["id", "name_address"])
    # 测试是否有数据
    df.show()


    # 需求: 自定义函数
    # 一.自定义能返回列表的功能的python函数
    def my_split(name_address):
        return name_address.split('_')


    # 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)
    # 注册方式1: 适用于sql和dsl风格
    dsl_my_split = spark.udf.register('sql_my_split', my_split, ArrayType(StringType()))

    # 三.使用UDF函数
    # 方式1: SQL风格
    # 先有临时表,再调用sql执行
    df.createTempView("stu_tb")
    spark.sql("""
        select *,
            sql_my_split(name_address)[0] as name,
            sql_my_split(name_address)[1] as address
        from stu_tb
    """).show()

    # 方式2: DSL风格
    df.select(
        "*",
        dsl_my_split("name_address")[0].alias("name"),
        dsl_my_split("name_address")[1].alias("address")
    ).show()

    # 注意: 最后一定释放资源
    spark.stop()

结果

在这里插入图片描述

03_spark原生自定义UDF函数_返回字典.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType, StructType

# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 先创建spark session对象
    spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()

    # 创建DF对象
    df = spark.createDataFrame([
        (1, "张三_广州"),
        (2, "李四_深圳"),
        (3, "王五_上海")
    ], schema=["id", "name_address"])
    # 测试是否有数据
    df.show()


    # 需求: 自定义函数
    # 一.自定义能返回字典的功能的python函数
    def my_split(name_address):
        list1 = name_address.split('_')
        dict1 = {"name": list1[0], "address": list1[1]}
        return dict1


    # 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)
    # 注册方式1: 适用于sql和dsl风格
    # 注意: 如果原始函数返回的是字典,就必须用StructType()且字段名必须和原生字典的key值一样,否则null补充
    t = StructType().add("name", StringType()).add("address", StringType())
    dsl_my_split = spark.udf.register('sql_my_split', my_split, t)

    # 三.使用UDF函数
    # 方式1: SQL风格
    # 先有临时表,再调用sql执行
    df.createTempView("stu_tb")
    spark.sql("""
        select *,
            sql_my_split(name_address)['name'] as name,
            sql_my_split(name_address)['address'] as address
        from stu_tb
    """).show()

    # 方式2: DSL风格
    df.select(
        "*",
        dsl_my_split("name_address")['name'].alias("name"),
        dsl_my_split("name_address")['address'].alias("address")
    ).show()

    # 注意: 最后一定释放资源
    spark.stop()

结果

在这里插入图片描述

04_sparkSQL和pandas中df对象互转操作.py

# 导包
import os
from pyspark.sql import SparkSession

# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 先创建spark session对象
    spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()

    # 注意: 如果想优化createDataFrame()效率可以手动开启arrow设置
    # TODO: 手动开启arrow设置
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    # 1.先创建sparkSQL的df对象
    spark_df = spark.createDataFrame([
        (1, "张三"),
        (2, "李四"),
        (3, "王五")
    ], schema=["id", "name"])
    # 查看数据类型
    print(type(spark_df))  # <class 'pyspark.sql.dataframe.DataFrame'>
    spark_df.show()

    # 2.把saprk_df转换为pandas的df对象
    pd_df = spark_df.toPandas()
    # 查看数据类型
    print(type(pd_df))  # <class 'pandas.core.frame.DataFrame'>
    print(pd_df)

    # 3.把pandas的df对象转换为sparkSQL的df对象

    spark_df2 = spark.createDataFrame(pd_df)
    # 查看数据类型
    print(type(spark_df2))  # <class 'pyspark.sql.dataframe.DataFrame'>
    spark_df2.show()

    # 注意: 最后一定释放资源
    spark.stop()

05_spark基于pandas定义udf函数_s到s.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd
from pyspark.sql.types import DoubleType, IntegerType

# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 先创建spark session对象
    spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()

    # TODO: 手动开启arrow设置
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    # 创建DF对象
    df = spark.createDataFrame([(1, 1), (2, 2), (3, 3)], schema=["n1", "n2"])
    df.show()


    # 一.自定义python函数
    # 功能:输入两列,输出对应乘积1列
    def mul(n1: pd.Series, n2: pd.Series) -> pd.Series:
        return n1 * n2


    # 二.把python函数包装成spark的UDF函数(sql和dsl风格)
    # 注册方式1: 适用于sql和dsl风格
    dsl_mul = spark.udf.register("sql_mul", mul)
    # 注册方式2: 仅适用于dsl风格
    dsl2_mul = F.pandas_udf(mul, IntegerType())


    # 注册方式3: 仅适用于dsl风格
    @F.pandas_udf(IntegerType())
    def candy_mul(n1: pd.Series, n2: pd.Series) -> pd.Series:
        return n1 * n2


    # 三.使用UDF函数
    # 方式1: SQL风格
    # 先有临时表,再调用sql执行
    df.createTempView("nums_tb")
    spark.sql("""
        select n1,n2,sql_mul(n1, n2) as n3 from nums_tb
    """).show()

    # 方式2: DSL风格
    df.select(
        "n1", "n2",
        dsl_mul("n1", "n2").alias("n3"),
        dsl2_mul("n1", "n2").alias("n4"),
        candy_mul("n1", "n2").alias("n5")
    ).show()

    # 注意: 最后一定释放资源
    spark.stop()

06_spark基于pandas定义udaf函数_s到标量.py

# 导包
import os

import pandas as pd
from pyspark.sql import SparkSession, functions as F

# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 先创建spark session对象
    spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()

    # 创建DF对象
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        schema="id int, value float"
    )
    df.show()


    # 二.使用语法糖方式注册原始函数为udaf函数
    @F.pandas_udf("float")
    # 一.定义原始python函数
    def candy_my_avg(values: pd.Series) -> float:
        return values.mean()


    # 三.使用自定义的udaf函数
    # dsl方式
    df.groupby("id").agg(
        candy_my_avg("value").alias("avg_value")
    ).show()
    # 如果想用sql方式怎么办?把添加了语法糖的函数,再注册为udaf函数
    dsl_my_avg = spark.udf.register("sql_my_avg", candy_my_avg)
    df.createTempView('nums_tb')
    spark.sql("""
        select id,sql_my_avg(value) as avg_value
        from nums_tb
        group by id
    """).show()

    # 注意: 最后一定释放资源
    spark.stop()

07_spark_sql操作数据库.py

# 导包
import os
from pyspark.sql import SparkSession

# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 先创建spark session对象
    spark = (SparkSession.builder
            .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
            .config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")
             .appName("spark_demo")
             .master("local[1]")
             .enableHiveSupport()
             .getOrCreate()
             )

    spark.sql("""
        create database if not exists spark_demo2
    """)
    spark.sql("""
        create table if not exists spark_demo2.stu(
            id int,
            name string,
            age int
        );
    """)
    spark.sql("""
        insert into spark_demo2.stu values(1,'张三',18),(2,'李四',28)
    """)
    spark.sql("""
           select * from  spark_demo2.stu
     """).show()

    # 注意: 最后一定释放资源
    spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/953246.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

presto不支持concat_ws

在sparksql/hive中&#xff0c;将一个数据集合已指定的分隔符拼接可以用concat_ws&#xff0c;但是在presto中没有这个函数&#xff0c;不过presto提供了一个集合方法array_join&#xff0c;来达到相同的目的 同样的对数据集去重可以用array_distinct 如果你不需要去重就直接…

【日常小记】Ubuntu启动后无图形界面且网络配置消失

【日常小记】Ubuntu启动后无图形界面且网络配置消失 解决方法&#xff1a; 1. 输入后恢复网络: #sudo dhclient 2. 重新安装ubuntu-desktop #sudo apt-get install ubuntu-desktop&#xff01;&#xff01;&#xff01;请关注是否能ping通某网站&#xff08;例如百度&…

01、kafka知识点综合

kafka是一个优秀大吞吐消息队列&#xff0c;下面我就从实用的角度来讲讲kafka中&#xff0c;“kafka为何有大吞吐的机制”&#xff0c;“数据不丢失问题”&#xff0c;“精准一次消费问题” 01、kafka的架构组织和运行原理 kafka集群各个节点的名称叫broker&#xff0c;因为kaf…

【ArcGIS微课1000例】0137:色彩映射表转为RGB全彩模式

本文讲述ArcGIS中,将tif格式的影像数据从色彩映射表转为RGB全彩模式。 参考阅读:【GlobalMapper精品教程】093:将tif影像色彩映射表(调色板)转为RGB全彩模式 文章目录 一、色彩映射表预览二、色彩映射表转为RGB全彩模式一、色彩映射表预览 加载配套数据包中的0137.rar中的…

Python教程丨Python环境搭建 (含IDE安装)——保姆级教程!

工欲善其事&#xff0c;必先利其器。 学习Python的第一步不要再加收藏夹了&#xff01;提高执行力&#xff0c;先给自己装好Python。 1. Python 下载 1.1. 下载安装包 既然要下载Python&#xff0c;我们直接进入python官网下载即可 Python 官网&#xff1a;Welcome to Pyt…

2025.1.13运算符重载和继承

作业 #include <iostream> #include <cstring> using namespace std; //在之前做的mystring类的基础上&#xff0c;将能够重载的运算符全部进行重载class mystring { private:char *str;int size;public://无参构造mystring():size(10){str new char[size];str[0…

慧集通(DataLinkX)iPaaS集成平台-业务建模之业务对象(二)

3.UI模板 当我们选择一条已经建好的业务对象点击功能按钮【UI模板】进入该业务对象的UI显示配置界面。 右边填写的是UI模板的编码以及对应名称&#xff1b;菜单界面配置以业务对象UI模板编码获取显示界面。 3.1【列表-按钮】 展示的对应业务对象界面的功能按钮配置&#xff1…

springboot使用Easy Excel导出列表数据为Excel

springboot使用Easy Excel导出列表数据为Excel Easy Excel官网&#xff1a;https://easyexcel.opensource.alibaba.com/docs/current/quickstart/write 主要记录一下引入时候的pom&#xff0c;直接引入会依赖冲突 解决方法&#xff1a; <!-- 引入Easy Excel的依赖 -->&l…

计算机的错误计算(二百一十)

摘要 利用两个大模型计算 . 若可能&#xff0c;保留10位有效数字。实验表明&#xff0c;一个大模型给出了错误结果。另外一个大模型提供了 Python代码&#xff1b;运行代码后&#xff0c;输出中有2位错误数字。 例1. 计算 . 若可能&#xff0c;保留10位有效数字。 下面是一…

用vscode+ollama自定义Cursor AI编辑的效果

在vscode上搜索Continue 添加大语言模型 选择对应的本地模型版本 效果

基于微信小程序的汽车销售系统的设计与实现springboot+论文源码调试讲解

第4章 系统设计 一个成功设计的系统在内容上必定是丰富的&#xff0c;在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值&#xff0c;吸引更多的访问者访问系统&#xff0c;以及让来访用户可以花费更多时间停留在系统上&#xff0c;则表明该系统设计得比较专…

ANSYS Fluent学习笔记(七)求解器四部分

16.亚松弛因子 Controls面板里面设置&#xff0c;它能够稳定计算的过程。如果采用常规的迭代算法可能结果就会发生振荡的情况。采用亚松驰因子可以有助于残差的稳定。 他的取值范围是0-1&#xff0c;0代表没有亚松驰&#xff0c;1表示物理量变化很快&#xff0c;一般情况下取…

【MySQL数据库】基础总结

目录 前言 一、概述 二、 SQL 1. SQL通用语法 2. SQL分类 3. DDL 3.1 数据库操作 3.2 表操作 4. DML 5. DQL 5.1 基础查询 5.2 条件查询 5.3 聚合函数 5.4 分组查询 5.5 排序查询 5.6 分页查询 6. DCL 6.1 管理用户 6.2 权限控制 三、数据类型 1. 数值类…

【数学】概率论与数理统计(五)

文章目录 [toc] 二维随机向量及其分布随机向量离散型随机向量的概率分布律性质示例问题解答 连续型随机向量的概率密度函数随机向量的分布函数性质连续型随机向量均匀分布 边缘分布边缘概率分布律边缘概率密度函数二维正态分布示例问题解答 边缘分布函数 二维随机向量及其分布 …

mysql中创建计算字段

目录 1、计算字段 2、拼接字段 3、去除空格和使用别名 &#xff08;1&#xff09;去除空格 &#xff08;2&#xff09;使用别名&#xff1a;AS 4、执行算术计算 5、小结 博主用的是mysql8 DBMS&#xff0c;附上示例资料&#xff1a; 百度网盘链接: https://pan.baidu.co…

uniapp 之 uni-forms校验提示【提交的字段[‘xxx‘]在数据库中并不存在】解决方案

目录 场景问题代码结果问题剖析解决方案 场景 uni-forms官方组件地址 使用uniapp官方提供的组件&#xff0c;某个表单需求&#xff0c;单位性质字段如果是高校&#xff0c;那么工作单位则是高校的下拉选择格式&#xff0c;单位性质如果是其他的类型&#xff0c;工作单位则是手动…

【SH】Xiaomi9刷Windows10系统研发记录 、手机刷Windows系统教程、小米9重装win10系统

文章目录 参考资料云盘资料软硬件环境手机解锁刷机驱动绑定账号和设备解锁手机 Mindows工具箱安装工具箱和修复下载下载安卓和woa资源包第三方Recovery 一键安装Windows准备工作创建分区安装系统 效果展示Windows和Android一键互换Win切换安卓安卓切换Win 删除分区 参考资料 解…

苹果电脑怎么清理后台,提升苹果电脑运行速度

苹果电脑以其流畅的系统和高效的性能备受用户青睐&#xff0c;但即使是性能强大的Mac&#xff0c;随着使用时间的增长&#xff0c;也会遇到运行变慢、卡顿的问题。造成这种现象的一个主要原因是后台运行的程序和进程过多&#xff0c;占用了系统资源。那么&#xff0c;苹果电脑怎…

qt 快捷功能 快速生成 setter getter 构造函数 父类虚函数重写 成员函数实现 代码框架 查看父类及父类中的虚函数

qt 快速生成 setter getter 构造函数 父类虚函数重写 成员函数实现 代码框架 1、找到要实现的头文件 2、鼠标移动到在头文件中的类定义的类名上&#xff0c;右键进行选择。 这是插入父类虚函数(父类虚函数重写) 选项弹出来的结果。可以查看到所有父类及父类中的所有的虚函数

2_CSS3 背景 --[CSS3 进阶之路]

CSS3 中的背景属性提供了许多强大的功能来增强网页设计&#xff0c;包括但不限于多背景图像、渐变、背景大小控制等。以下是一些关键的 CSS3 背景属性及其用法示例。 1. 多重背景图像 CSS3 允许你为一个元素设置多个背景图像。这些图像按照它们在 background-image 属性中定义…