SparkSql---用户自定义函数UDFUDAF

文章目录

  • 1.UDF
  • 2.UDAF
    • 2.1 UDF函数实现原理
    • 2.2需求:计算用户平均年龄
      • 2.2.1 使用RDD实现
      • 2.2.2 使用UDAF弱类型实现
      • 2.2.3 使用UDAF强类型实现

1.UDF

用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。

如:实现需求在用户name前加上"Name:"字符串,并打印在控制台

  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")

    //创建 SparkSession 对象
    val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sc.implicits._

    //创建DataFrame
    val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24)))
    val dataframe = dataRDD.toDF("name","age")
    //注册udf函数
    sc.udf.register("addName",(x:String)=>
      "Name:"+x
    )
    //创建临时视图
    dataframe.createOrReplaceTempView("people")
    //对临时视图使用udf函数
    sc.sql("select addName(name) from people").show()


    sc.stop()
  }

在这里插入图片描述

2.UDAF

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。

2.1 UDF函数实现原理

在这里插入图片描述
在Spark中,UDF(用户自定义函数)在对表中的数据进行处理时,通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率,特别是对于大数据集。

2.2需求:计算用户平均年龄

2.2.1 使用RDD实现

    val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24),("wangwu",26)))
    val reduceResult: (Int, Int) = dataRDD.map({
      case (name, age) => {
        (age, 1)
      }
    }).reduce((t1, t2) => {
      (t1._1 + t2._1, t1._2 + t2._2)
    })
    println(reduceResult._1/reduceResult._2)

在这里插入图片描述

2.2.2 使用UDAF弱类型实现

需要用户自定义类实现UserDefinedAggregateFunction,并重写其中的方法,当前已不推荐使用。

package bigdata.wordcount.udf

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2

/**
 * 用户自定义函数
 */
object UDF_Demo02 {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")

    //创建 SparkSession 对象
    val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sc.implicits._

    val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))
    val dataFrame: DataFrame = dataRDD.toDF("name","age")
    dataFrame.createOrReplaceTempView("user")

    //创建聚合函数
    var myAvg=new MyAverageUDAF()
    //在Spark中注册自定义的聚合函数
    sc.udf.register("avgMy",myAvg)

    sc.sql("select avgMy(age) from user").show()
    sc.stop()
  }

  case class User(var name:String,var age:Int)

}

class MyAverageUDAF extends UserDefinedAggregateFunction{
  //输入的要进行聚合的参数的类型
  override def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))

  //聚合函数缓冲区中的值的数据类型
  override def bufferSchema: StructType = StructType(Array(StructField("sum",LongType),StructField("count",LongType)))

  //函数返回的值的数据类型
  override def dataType: DataType = DoubleType

  //判断函数的稳定性
  //对于相同类型的输入是否有相同类型的输出
  override def deterministic: Boolean = true

  //聚合函数缓冲区中值的初始化
  //因为数据是弱类型的,函数缓冲区中是根据索引来找到对应的变量
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //年龄的总和
    buffer(0)=0L
    //年龄的个数
    buffer(1)=0L
  }

  //更新缓冲区中的数据(执行操作步骤)
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
    //第0个索引值是否为空
    if(!input.isNullAt(0)) {
      //更新年龄sum的值
      buffer(0)=buffer.getLong(0)+input.getInt(0)
      //更新年龄个数
      buffer(1)=buffer.getLong(1)+1;
    }

  }

  //合并缓冲区
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
    buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
  }

  //计算最终结果
  override def evaluate(buffer: Row): Double = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}

在这里插入图片描述

2.2.3 使用UDAF强类型实现

Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction

package bigdata.wordcount.udf

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2

/**
 * 用户自定义函数
 */
object UDF_Demo03 {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")

    //创建 SparkSession 对象
    val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sc.implicits._

    val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))
    val dataFrame: DataFrame = dataRDD.toDF("name","age")
    val dataset: Dataset[User01] = dataFrame.as[User01]
    //创建聚合函数
    var myAvg=new MyAverageUDAF01()
    //将聚合函数转换为查询的列
    val col: TypedColumn[User01, Double] = myAvg.toColumn
    //执行查询操作
    dataset.select(col).show()

    sc.stop()
  }
  case class User(var name:String,var age:Int)

}

//输入数据类型
case class User01(var name:String,var age:Int)
//缓存中的数据类型
case class AgeBuffer(var sum:Long,var count:Long)

class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{
  //设置初始值
  override def zero: AgeBuffer = {
    AgeBuffer(0L,0L)
  }

  //缓冲区实现聚合
  override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {
    b.sum = b.sum + a.age
    b.count = b.count + 1
    b
  }

  //合并缓冲区
  override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
    b1.sum+=b2.sum
    b1.count+=b2.count
    b1
  }

  //计算最终结果
  override def finish(buff: AgeBuffer): Double = {
    buff.sum.toDouble/buff.count
  }

  //设置编码器和解码器
  //自定义类型就是 product 自带类型根据类型选择
  override def bufferEncoder: Encoder[AgeBuffer] = {
    Encoders.product
  }

  override def outputEncoder: Encoder[Double] = {

    Encoders.scalaDouble
  }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/355108.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【QT+QGIS跨平台编译】之十四:【webp+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

文章目录 一、webp介绍二、webp下载三、文件分析四、pro文件五、编译实践一、webp介绍 WebP 是一种现代的图像格式,由 Google 开发。它能够提供更好的压缩率和图像质量,相比于传统的 JPEG 和 PNG 格式。WebP 图像通常具有更小的文件大小,同时保持高质量的图像细节,这使得它…

总结6(循环(for))

循环 定义: 某些代码会被重复执行 分类: for 1.格式 for(1; 2; 3) 语句A; 2.执行的流程(1,2,A,3 2,A,3 2,A,3..........) 单个for循环的使用 多个for循环的嵌套使用 1). for(1; 2; 3) for&#xff0…

【数据分析】numpy基础第二天

文章目录 前言数组的形状变换reshape的基本介绍使用reshapereshape([10, 1])运行结果reshape自动判断形状reshape([-1, 1])运行结果 合并数组使用vstack和hstackvstack和hstack的运行结果使用concatenateconcatenate运行结果 分割数组array_split运行结果 数组的条件筛选条件筛…

qemu单步调试arm64 linux kernel

一、背景和目的 qemu搭建arm64 linux kernel环境-CSDN博客 之前介绍了qemu启动kernel的配置步骤和方法,现在开始我们的调试,这篇文章主要讲解如何单步调试内核,所有的实验还是基于ARM64; 二、环境准备 需要准备hostx86 target…

VirtualBox:安装提示缺少python core和win32 api

最近升级了Ubuntu22.04,查了一下,VirtualBox的增强功能,居然用时5分钟。 5min 18ms vboxadd.service 据说升级VirtualBox的增强功能能解决这个问题,于是先升级VirtualBox,但是安装VirtualBox却报缺少python core及win3…

Android App开发-简单控件(4)——按钮触控和图像显示

3.4 按钮触控 本节介绍了按钮控件的常见用法,包括:如何设置大小写属性与点击属性,如何响应按钮的点击事件和长按事件,如何禁用按钮又该如何启用按钮,等等。 3.4.1 按钮控件Button 除了文本视图之外,按钮…

【GAMES101】Lecture 11 贝塞尔曲线

曲线这部分基本上就单讲了贝塞尔曲线 目录 贝塞尔曲线(Bezier curves) De Casteljau’s algorithm B-splines 贝塞尔曲线(Bezier curves) 很早之前说过的这种矢量图是不会随着放大而失真的,像这种字体&#xff0c…

Mysql第一天

数据库概述 1. 为什么要使用数据库 持久化(persistence):把数据保存到可掉电式存储设备中以供之后使用。(可掉电:内存 使用高电压和低电压来区别0和1进行数据的一个存储但是一旦断电了电压都没了 0和1也就没有了)大多数情况下,特别是企 业级应用&#…

CSS 星空按钮

<template><button class="btn" type="button"><strong>星空按钮</strong><div id="container-stars"><div id="stars"></div></div><div id="glow"><div class=…

HCIA-HarmonyOS设备开发认证-3.内核基础

目录 前言目标一、进程与线程待续。。。 前言 对于任何一个操作系统而言&#xff0c;内核的运行机制与原理是最为关键的部分。本章内容从多角度了解HarmonyOS的内核运行机制&#xff0c;涵盖进程与线程的概念&#xff0c;内存管理机制&#xff0c;网络特性&#xff0c;文件系统…

vue3 + ts vue_3.4 setup单文件组件中的属性

props 各种类型 <script lang"ts" setup>import { ref, PropType } from vue;interface listType {addres: string;code: number;}interface ftObj {obj1: string[];obj2: {[key: string]: any;};}const props defineProps({name: {type: String as PropType&…

C 练习实例50-使用Dev-C++创建项目(圆形体体积计算器)

项目展示 项目案例&#xff1a;圆形体体积计算器 vol.h文件 #include <stdio.h> #include <math.h> #define PI 3.141592654 void cal(int sel); //函数声明 double vol_ball(void); double vol_cylind(void); double vol_cone(void); main.c文件 #include &quo…

代理IP使用指南:风险与注意事项

在当今的数字化时代&#xff0c;使用在线代理IP已经成为一种常见的网络行为。然而&#xff0c;在使用这些代理IP时&#xff0c;我们需要注意一些风险和问题&#xff0c;以确保我们的网络安全和隐本私文。将探讨使用代理IP时需要注意的几个关键问题。 1、代理IP的安全性 使用代理…

【JavaLearn】#(29)Maven引入、Maven项目类型、Maven安装与配置、Maven项目的创建和使用、pom配置文件介绍

1. Maven引入 1.1 传统方式中项目jar包资源的问题 项目中的jar包资源&#xff08;如JDBC驱动包&#xff09;需要我们自己从网上下载&#xff0c;然后手动导入到项目中使用 –> 一旦jar包资源过多&#xff0c;容易造成遗漏&#xff0c;且不好管理 如果有两个项目&#xff0…

LabVIEW动态数据交换实现数据通信

LabVIEW动态数据交换实现数据通信 介绍了LabVIEW软件在驱动一般多功能接口卡中的应用。LabVIEW作为一种图形化编程平台&#xff0c;被广泛应用于自动测量系统、工业过程自动化等领域。利用LabVIEW驱动实验室中常用的多功能接口卡&#xff0c;以实现数据采集和分析。 系统主要…

C#算法(11)—求三个点构成圆的圆心坐标和半径

前言 我们在上位机开发领域也经常会碰到根据三个点求出圆的圆心、半径等信息的场景,本文就是详细的介绍如何根据三个点使用C#代码求出三点构成的圆的圆心坐标、圆半径、三点构成的圆弧的角度。 1、3点求圆分析 A、B、C三个点都是圆上的坐标点,过向量AB做中垂线,过向量AC做…

HCIP寒假第十次作业

第一步&#xff0c;给pc配置IP192.268.1.2-192.168.1.4 第二步&#xff0c;在三个交换机上划分vlan [sw1]vlan 3 [sw1]interface e0/0/2 [sw1-Ethernet0/0/2]port link-type access [sw1-Ethernet0/0/2]port default vlan 3 [sw2]vlan 2 lsw2]interface e0/0/2 [sw2…

【c++】构造函数和析构函数

1.类的6个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类。 空类中真的什么都没有吗&#xff1f;并不是&#xff0c;任何类在什么都不写时&#xff0c;编译器会自动生成以下6个默认成员函数 默认成员函数&#xff1a;用户没有显式实现&#xff0c;编译器…

2024年第一篇博客

这是2024年的第一篇博客&#xff0c;2023年笔者经历了一连串的生活、工作、学习上的转折和调整&#xff0c;跌跌撞撞时光飞逝&#xff0c;转眼间就踏着元旦的钟声步入了2024年&#xff0c;前思后想、辗转反侧、犹豫再三不知道从哪里开始博客新的篇章&#xff0c;这个问题坦诚说…

【Spark系列3】RDD源码解析实战

本文主要讲 1、什么是RDD 2、RDD是如何从数据中构建 一、什么是RDD&#xff1f; RDD&#xff1a;弹性分布式数据集&#xff0c;Resillient Distributed Dataset的缩写。 个人理解&#xff1a;RDD是一个容错的、并行的数据结构&#xff0c;可以让用户显式的将数据存储到磁盘…