【大家好,我是爱干饭的猿,本文重点介绍SparkSQL 定义UDF函数、SparkSQL 使用窗口函数。
后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】
上一篇文章:《【SparkSQL】DataFrame入门(重点:df代码操作、数据清洗API、通过JDBC读写数据库)》
4. SparkSQL函数定义
4.1 SparkSQL 定义UDF函数
无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.unctions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
回顾Hive中自定义函数有三种类型:
- 第一种:UDF (User-Defined-Function)函数
- 一对一的关系,输入一个值经过函数以后输出一个值;
- 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
- 第二种:UDAF(User-Defined Aggregation Function)聚合函数
- 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;
- 第三种:UDTF (User-Defined Table-Generating Functions)函数
- —对多的关系,输入一个值输出多个值(一行变为多行);
- 用户自定义生成函数,有点像flatMap;
目前来说Spark框架各个版本及各种语言对自定义函数的支持:
在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF
定义方式有2种:
-
sparksession.udf.register()
注册的UDF可以用于DSL和SQL
返回值用于DSL风格,传参内给的名字用于SQL风格 -
pyspark.sql.functions.udf
仅能用于DSL风格
-
方式1语法:
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格 -
方式2语法:
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格 -
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:
def add(x, y): x + y
add就是将要被注册成UDF的方法名
1. 构建一个Interger返回值类型的UDF
# coding:utf8
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
# 构建一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8]).map(lambda x:[x])
df = rdd.toDF(["num"])
# TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用
# UDF的处理函数
def num_ride_10(num):
return num * 10
# 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格
# 参数2: UDF的处理逻辑, 是一个单独的方法
# 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致
# 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法
# 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格
udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())
# 1.1 SQL风格中使用
# selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)
# select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算
df.selectExpr("udf1(num)").show()
# 1.2 DSL 风格中使用
# 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象
df.select(udf2(df['num'])).show()
# TODO 2: 方式2注册, 仅能用于DSL风格
udf3 = F.udf(num_ride_10, IntegerType())
df.select(udf3(df['num'])).show()
2. 构建一个ArrayType(数字\list)类型的返回值UDF
# coding:utf8
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
# 构建一个RDD
rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
df = rdd.toDF(["line"])
# 注册UDF, UDF的执行函数定义
def split_line(data):
return data.split(" ") # 返回值是一个Array对象
# TODO 1: 方式1 构建UDF
udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))
# 1. DLS风格
df.select(udf2(df['line'])).show()
# 2. SQL风格
df.createTempView("lines")
spark.sql("select udf1(line) from lines").show(truncate=False)
# TODO 2: 方式2的形式构建UDF
udf3 = F.udf(split_line, ArrayType(StringType()))
df.select(udf3(df['line'])).show(truncate=False)
3. 构建一个字典类型的返回值的UDF
# coding:utf8
import string
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
# 假设 有三个数字 1 2 3 我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
# 比如传入1 我们返回 {"num":1, "letters": "a"}
rdd = sc.parallelize([[1], [2], [3]])
df = rdd.toDF(["num"])
# 注册UDF
def process(data):
return {"num": data, "letters": string.ascii_letters[data]}
"""
UDF的返回值是字典的话, 需要用StructType来接收
"""
udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
add("letters", StringType(), nullable=True))
df.selectExpr("udf1(num)").show(truncate=False)
df.select(udf1(df['num'])).show(truncate=False)
4. 以mapPartitions API 完成UDAF构建
# coding:utf8
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
df = rdd.map(lambda x: [x]).toDF(['num'])
# 折中的方式 就是使用RDD的mapPartitions 算子来完成聚合操作
# 如果用mapPartitions API 完成UDAF聚合, 一定要单分区
single_partition_rdd = df.rdd.repartition(1)
def process(iter):
sum = 0
for row in iter:
sum += row['num']
return [sum] # 一定要嵌套list, 因为mapPartitions方法要求的返回值是list对象
print(single_partition_rdd.mapPartitions(process).collect())
注意
使用UDF两种方式的注册均可以
唯一需要注意的就是:返回值类型—定要有合适的类型来声明
- 返回int 可以用IntergerType
- 返回值小数,可以用FolatType或者DoubleType
- 返回数组list可用ArrayType描述
- 返回字典可用StructType描述
这些Spark内置的数据类型均存储在:pyspark.sql.types包中
4.2 SparkSQL 使用窗口函数
-
介绍
开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。 -
聚合函数和开窗函数
聚合函数是将多行变成一行,count,avg…开窗函数是将一行变成多行;
聚合函数如果要显示其他的列必须将列加入到group by中开窗函数可以不使用group by,直接将所有信息显示出来 -
开窗函数分类
- 聚合开窗函数
聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句。 - 排序开窗函数
排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。 - 分区类型NTILE的窗口函数
- 聚合开窗函数
代码演示:
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
("张三", 'class_1', 99),
("王五", 'class_2', 35),
("王三", 'class_3', 57)
])
schema = StructType().add("name", StringType()).\
add("class", StringType()).\
add("score", IntegerType())
df = rdd.toDF(schema)
df.createTempView("stu")
# 1. T0D0聚合窗口函数的演示
spark.sql("""
SELECT *, AVG(score) OVER() as avg_score from stu
""").show()
# 2. T0D0排序相关的窗口函数计算
# RAKN over,DENSE_RANK over ROW_NUABER over
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) As row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) As dense_rank,
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# 3. TOD0 NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()
4.3 总结
- SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF(通过rdd的mapPartitions算子模拟实现udtf:通过返回array或者dict类型来模拟实现)
- UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
- SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等