Spark 7:Spark SQL 函数定义

SparkSQL 定义UDF函数

8e9d7ddcc2f04a4fa3d0b7e78921b4f8.png

21cc4f7f05aa4c4d8e9db15c1d271cb1.png

方式1语法:
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
方式2语法:
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:
def add(x, y): x + y
add就是将要被注册成UDF的方法名

8dee25a8da254eec809eb0c8a5d95baf.png  

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(["num"])

    # TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10
    # 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格
    # 参数2: UDF的处理逻辑, 是一个单独的方法
    # 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致
    # 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法
    # 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格
    udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())

    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)
    # select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算
    df.selectExpr("udf1(num)").show()

    # DSL 风格中使用
    # 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象
    df.select(udf2(df['num'])).show()

    # TODO 2: 方式2注册, 仅能用于DSL风格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

    df.selectExpr("udf3(num)").show()

2562d46c19ee47c3b20f47fea545fb36.png

af9b0504fc474fd8896cb20f93094922.png 

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注册UDF, UDF的执行函数定义
    def split_line(data):
        return data.split(" ")  # 返回值是一个Array对象
    # TODO1 方式1 构建UDF
    udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

    # DLS风格
    df.select(udf2(df['line'])).show()
    # SQL风格
    df.createTempView("lines")
    spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)

    # TODO 2 方式2的形式构建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

0e2c268c421848cfbc639a80fc067838.png

fe0861021c4d4be5a8b0566697bf5905.png 

# coding:utf8
import string
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
    # 比如传入1 我们返回 {"num":1, "letters": "a"}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])

    # 注册UDF
    def process(data):
        return {"num": data, "letters": string.ascii_letters[data]}

    """
    UDF的返回值是字典的话, 需要用StructType来接收
    """
    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
                              add("letters", StringType(), nullable=True))

    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)

2034e0d567664867a1c76c153620c99e.png  

c5a05c06e0c24009abe4b20cdc81ed7e.png

 SparkSQL 使用窗口函数

f3cf9802e8084fc4bf2518765b500397.png

7d43ea9fba3c479094ff69bb62cd99da.png

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder. \
        appName("create df"). \
        master("local[*]"). \
        config("spark.sql.shuffle.partitions", "2"). \
        getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
    ('张三', 'class_1', 99),
    ('王五', 'class_2', 35),
    ('王三', 'class_3', 57),
    ('王久', 'class_4', 12),
    ('王丽', 'class_5', 99),
    ('王娟', 'class_1', 90),
    ('王军', 'class_2', 91),
    ('王俊', 'class_3', 33),
    ('王君', 'class_4', 55),
    ('王珺', 'class_5', 66),
    ('郑颖', 'class_1', 11),
    ('郑辉', 'class_2', 33),
    ('张丽', 'class_3', 36),
    ('张张', 'class_4', 79),
    ('黄凯', 'class_5', 90),
    ('黄开', 'class_1', 90),
    ('黄恺', 'class_2', 90),
    ('王凯', 'class_3', 11),
    ('王凯杰', 'class_1', 11),
    ('王开杰', 'class_2', 3),
    ('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \
    add("class", StringType()). \
    add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函数只用于SQL风格, 所以注册表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 两个SQL的结果集进行整合而来
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 两个SQL的结果集进行整合而来
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()

SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF
UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/91312.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【设备树笔记整理6】中断系统中的设备树

1 中断概念的引入与处理流程 1.1 中断处理框图 1.2 中断程序的使用 主函数() while(1) {do_routine_task(); }中断处理函数() {handle_interrupt_task(); }如何调用中断处理函数? 1.3 ARM对异常(中断)的处理过程 (1)初始化 ① 设置中断…

银河麒麟服务器、centos7服务器mysql离线安装:通过获取临时密码进行登录修改新密码

离线安装脚本 cd /home/zenglg/mysql5.7# 判断mysql是否安装# 下面这种方法必须是rpm安装的判断才有效,不通用# IS_INSTALLED$(rpm -qa |grep mysql)# if [ $? -eq 0 ]# 下面的判断方法是查询版本号,比较通用SQL_VERSIONmysql -V | grep -i -o -P 5.7.4…

[Open-source tool] 可搭配PHP和SQL的表單開源工具_Form tools(1):簡介和建置

Form tools是一套可搭配PHP和SQL的表單開源工具,可讓開發者靈活運用,同時其有數個表單模板和應用模組供挑選,方便且彈性。Form tools已開發超過20年,為不同領域的需求者或開發者提供一個自由和開放的平台,使他們可建構…

【C++】容器适配器stack、queue以及deque容器

🏖️作者:malloc不出对象 ⛺专栏:C的学习之路 👦个人简介:一名双非本科院校大二在读的科班编程菜鸟,努力编程只为赶上各位大佬的步伐🙈🙈 目录 前言一、什么是容器适配器1.1 stack的…

面试题(三)

目录 一.Spring 1.Spring IOC & AOP 2.Spring bean (1) 作用域 (2) Spring 中的 bean ⽣命周期 (3) Spring 框架中⽤到了哪些设计模式 二.Mybatis 1.标签 2.Dao接口 3.返回与映射 4.延迟加载 三.Kafka 四.设计模式 1.IO 设计模式 2.Spring 中的设计模式详解…

银河麒麟服务器、centos7服务器一键卸载mysql脚本

脚本 # 查看mysql相关的rpm包写到rmsql.sh文件中 rpm -aq | grep -i mysql >rmsql.sh # 修改文件为卸载mysql的脚本文件 sed -i -e s/^/yum remove -y / rmsql.sh # 修改文本权限 chmod 777 rmsql.sh # 全盘查找mysql相关文件,写到my.sh脚本中 find / -name mysq…

开源游戏开发:机会与挑战

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁 🦄 博客首页——猫头虎的博客🎐 🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 &a…

深度学习9:简单理解生成对抗网络原理

目录 生成算法 生成对抗网络(GAN) “生成”部分 “对抗性”部分 GAN如何运作? 培训GAN的技巧? GAN代码示例 如何改善GAN? 结论 生成算法 您可以将生成算法分组到三个桶中的一个: 鉴于标签&#…

函数指针.

首先看一段代码&#xff1a; #include <stdio.h> void test() {printf("hehe\n"); } int main() {printf("%p\n", test);printf("%p\n", &test);return 0; } 输出结果&#xff1a; 输出的是两个地址&#xff0c;这两个地址是 test 函…

探索pytest:Python自动化测试的新境界

在当今的软件开发领域&#xff0c;测试已经不仅仅是一个简单的步骤&#xff0c;而是确保软件质量的核心环节。Python&#xff0c;作为全球最受欢迎的编程语言之一&#xff0c;拥有丰富的测试框架和工具。而在这其中&#xff0c;pytest无疑是最受欢迎和最具影响力的一个。本文将…

深度学习1.卷积神经网络-CNN

目录 卷积神经网络 – CNN CNN 解决了什么问题&#xff1f; 需要处理的数据量太大 保留图像特征 人类的视觉原理 卷积神经网络-CNN 的基本原理 卷积——提取特征 池化层&#xff08;下采样&#xff09;——数据降维&#xff0c;避免过拟合 全连接层——输出结果 CNN …

VR法治警示教育:情景式课堂增强教育效果

VR法治警示教育平台是一款基于虚拟现实技术的在线教育平台&#xff0c;旨在通过模拟真实场景和互动体验&#xff0c;向公众普及法律知识&#xff0c;提高公民的法律意识和素养。该平台采用先进的虚拟现实技术&#xff0c;将用户带入一个逼真的仿真环境&#xff0c;让用户身临其…

小程序input的placeholder不垂直居中的问题解决

input的placeholder不垂直居中&#xff0c;input设置高度后&#xff0c;使用line-height只能使输入的文字垂直居中&#xff0c;但是placeholder不会居中&#xff0c;反而会偏上。 首先placeholder样式自定义 有两种方法&#xff0c;第一种行内样式&#xff1a; <input ty…

​LeetCode解法汇总5-正则表达式匹配​

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 给你一棵…

Redis三种特殊数据类型

Redis三种特殊数据类型 geospatial 地理位置 Redis 地理空间数据类型简介 Redis 地理空间索引允许您存储坐标并搜索它们。 此数据结构可用于查找给定半径或边界框内的邻近点。 基本命令 GEOADD 将位置添加到给定的地理空间索引&#xff08;请注意&#xff0c;使用此命令&a…

常见的时序数据库

1.概念 时序数据库全称为时间序列数据库。时间序列数据库指主要用于处理带时间标签&#xff08;按照时间的顺序变化&#xff0c;即时间序列化&#xff09;的数据&#xff0c;带时间标签的数据也称为时间序列数据。 时间序列数据主要由电力行业、化工行业、气象行业、地理信息…

基于MATLAB的径向基函数插值(RBF插值)(一维、二维、三维)

基于MATLAB的径向基函数插值&#xff08;RBF插值&#xff09;&#xff08;一维、二维、三维&#xff09; 0 前言1 RBF思路2 1维RBF函数2.1 参数说明2.1.1 核函数选择2.1.2 作用半径2.1.3 多项式拟合2.1.4 误差项&#xff08;光滑项&#xff09; 3 2维RBF函数4 3维RBF函数 惯例声…

玩转git第7章节,本地git的用户名和密码的修改

一 本地git的用户名和密码 1.1 本地用户名和密码修改 1.本地用户名修改 2.凭据管理 3.进行修改密码 1.2 代码提交操作

python爬虫的js逆向入门到进阶教程文章分享汇总~持续更新

目录 一、内容介绍二 、专栏内容-持续更新1、JS逆向入门2、Js逆向进阶3、爬虫基础知识4、工具与安装5、漫星内容分享 三、星球使用四、b站up主视频推荐 一、内容介绍 二 、专栏内容-持续更新 1、JS逆向入门 2023-08-25》11.常见加密>xx音乐RSA加密 https://articles.zsxq.c…

【C++】list类的模拟实现

&#x1f3d6;️作者&#xff1a;malloc不出对象 ⛺专栏&#xff1a;C的学习之路 &#x1f466;个人简介&#xff1a;一名双非本科院校大二在读的科班编程菜鸟&#xff0c;努力编程只为赶上各位大佬的步伐&#x1f648;&#x1f648; 目录 前言一、list类的模拟实现1.1 list的…