弱类型和强类型自定义UDAF函数

目录

    • 使用自带的avg函数
    • 弱类型自定义UDAF函数(AVG)
    • 强类型自定义UDAF函数(AVG)

弱类型:3.x过期 2.x有
强类型:3.x 2.x没有

使用自带的avg函数

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object UserDefinedUDAF {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().appName("test").master("local[4]").getOrCreate()
    import spark.implicits._

    val list = List(
      ("zhangsan",20,"北京"),
      ("sd",30,"深圳"),
      ("asd",40,"北京"),
      ("asd",50,"深圳"),
      ("asdad",60,"深圳"),
      ("gfds",70,"北京"),
      ("dfg",60,"深圳"),
      ("erw",80,"上海"),
      ("asd",18,"广州"),
      ("sdassws",20,"广州"),
    )

    val rdd: RDD[(String, Int, String)] = spark.sparkContext.parallelize(list, 2)
    val df: DataFrame = rdd.toDF("name", "age", "region")
    df.createOrReplaceTempView("person")
    spark.sql(
      """
        |select
        |region,
        |avg(age)
        |from person group by region
        |""".stripMargin).show()
  }

}

结果
在这里插入图片描述

弱类型自定义UDAF函数(AVG)

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructType}

/**
 * 自定义弱类型UDAF函数
 *     1.创建class继承
 */
class WeakAvgUDAF extends UserDefinedAggregateFunction{


  /**
   * 指定UDAF函数的参数类型【自定义avg函数,针对的参数是age,类型是Int类型】
   * @return
   */
  override def inputSchema: StructType = {
    new StructType()
      .add("input", IntegerType)
  }

  /**
   * 指定中间变量的类型【求一组区域的平均值,需要统计总年龄和人的个数】(因为最后要年龄除以人数才是平均年龄)
   * @return
   */
  override def bufferSchema: StructType = {
    new StructType()
      .add("sum", IntegerType)
      .add("count", IntegerType)

  }

  /**
   * 指定UDAF最终计算结果类型
   * @return
   */
  override def dataType: DataType = DoubleType

  /**
   * 一致性的执行
   * @return
   */
  override def deterministic: Boolean = true

  /**
   * 指定中间变量的初始化[sum=0,count=0]
   * @param buffer
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //sum = 0
    buffer(0) = 0
    //count = 0
    buffer(1) = 0
  }

  /**
   * 类似combiner操作,针对每个组单个age值进行计算
   * @param buffer  中间变量的封装[sum,count]
   * @param input   组中一个值(age)
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + input.getAs[Int](0)
    buffer(1) = buffer.getAs[Int](1) + 1
  }

  /**
   *
   * @param buffer1 中间变量的封装[sum,count]
   * @param buffer2 combine的结果[sum,count]
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //sum = sum + combiner_sum
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
    //count = count + combiner_count
    buffer1(1) = buffer1.getAs[Int](1) + buffer2.getAs[Int](1)
  }

  /**
   * 计算最终结果
   * @param buffer [中间变量封装 sum,count]
   * @return
   */
  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0).toDouble / buffer.getAs[Int](1)
  }
}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object UserDefinedUDAF {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().appName("test").master("local[4]").getOrCreate()
    import spark.implicits._

    val list = List(
      ("zhangsan",20,"北京"),
      ("sd",30,"深圳"),
      ("asd",40,"北京"),
      ("asd",50,"深圳"),
      ("asdad",60,"深圳"),
      ("gfds",70,"北京"),
      ("dfg",60,"深圳"),
      ("erw",80,"上海"),
      ("asd",18,"广州"),
      ("sdassws",20,"广州"),
    )

    val rdd: RDD[(String, Int, String)] = spark.sparkContext.parallelize(list, 2)
    val df: DataFrame = rdd.toDF("name", "age", "region")
    df.createOrReplaceTempView("person")
    spark.udf.register("myavg",new WeakAvgUDAF)
    spark.sql(
      """
        |select
        |region,
        |myavg(age)
        |from person group by region
        |""".stripMargin).show()
  }

}

在这里插入图片描述

强类型自定义UDAF函数(AVG)

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator


/**
 * 自定义强类型UDAF函数
 * 1.定义class继承Aggregator[IN,BUFF,OUT]
 *     IN:代表UDAF函数参数类型
 *     BUFF:代表计算过程中中间变量类型
 *     OUT:最终计算结果类型
 * 2.重写重抽象方法
 * 强类型自定义UDAF函数的使用
 *   1.创建自定义UDAF对象 : val obj = new xxx
 *   2,导入转换方法 import org.apache.spark.sql.function._
 *   3.转换:val function = udaf(obj)
 *   4.注册 spark.udf.register(函数名,function)
 */
case class AvgBuff(sum:Int,count:Int)
class StrongAvgUDAF extends Aggregator[Int,AvgBuff,Double]{

  /**
   * 初始化中间变量值
   * @return
   */
  override def zero: AvgBuff = AvgBuff(0,0)

  /**
   * combiner计算
   * @param buff  中间结果
   * @param age   udaf参数
   * @return   返回累加之后的中间结果
   */
  override def reduce(buff: AvgBuff, age: Int): AvgBuff = AvgBuff(buff.sum+age,buff.count+1);

  /**
   * reducer聚合
   * @param b1 中间结果
   * @param b2 combiner聚合结果
   * @return  返回累加之后的中间
   */
  override def merge(b1: AvgBuff, b2: AvgBuff): AvgBuff = AvgBuff(b1.sum + b2.sum,b1.count+b2.count)

  /**
   * 计算最终结果
   * @param reduction
   * @return
   */
  override def finish(buff: AvgBuff): Double = buff.sum.toDouble / buff.count

  /**
   * 指定中间结果序列化
   * @return
   */
  override def bufferEncoder: Encoder[AvgBuff] = Encoders.product[AvgBuff]

  /**
   * 指定最终序列化类型
   * @return
   */
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object UserDefinedUDAF {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().appName("test").master("local[4]").getOrCreate()
    import spark.implicits._

    val list = List(
      ("zhangsan",20,"北京"),
      ("sd",30,"深圳"),
      ("asd",40,"北京"),
      ("asd",50,"深圳"),
      ("asdad",60,"深圳"),
      ("gfds",70,"北京"),
      ("dfg",60,"深圳"),
      ("erw",80,"上海"),
      ("asd",18,"广州"),
      ("sdassws",20,"广州"),
    )

    val rdd: RDD[(String, Int, String)] = spark.sparkContext.parallelize(list, 2)
    val df: DataFrame = rdd.toDF("name", "age", "region")
    df.createOrReplaceTempView("person")
    //TODO 弱类型的注册
    spark.udf.register("myavg",new WeakAvgUDAF)
    //TODO 强类型的注册
    import org.apache.spark.sql.functions._
    spark.udf.register("myavg2",udaf(new StrongAvgUDAF))
    spark.sql(
      """
        |select
        |region,
        |myavg2(age)
        |from person group by region
        |""".stripMargin).show()

  }

}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/150976.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GD32_ADC采样+DMA多通道扫描传输

GD32_ADC采样DMA多通道扫描传输 文章目录 GD32_ADC采样DMA多通道扫描传输前言一、资源介绍二、原理1.ADC连续扫描模式2.DMA传输3.ADC内部通道 三、配置1.ADC配置2.DMA配置3.注意事项 四、计算1.分压转换2.数据转换 前言 <1>、硬件平台&#xff1a;可运行软件程序的GD32单…

【计算思维】少儿编程蓝桥杯青少组计算思维题考试真题及解析B

STEMA考试-计算思维-U8级(样题) 1.浩浩的左⼿边是&#xff08; &#xff09;。 A.兰兰 B.⻉⻉ C.⻘⻘ D.浩浩 2.2时30分&#xff0c;钟⾯上时针和分针形成的⻆是什么⻆&#xff1f;&#xff08; &#xff09; A.钝⻆ B.锐⻆ C.直⻆ D.平⻆ 3.下⾯是⼀年级同学最喜欢的《⻄游记》…

人工智能基础_机器学习037_多项式回归升维实战4_使用随机梯度下降模型_对天猫双十一销量数据进行预测_拟合---人工智能工作笔记0077

上一节我们使用线性回归模型最终拟合了双十一天猫销量数据,升维后的数据. 我们使用SGDRegressor的时候,随机梯度下降的时候,发现有问题, 对吧,怎么都不能拟合我们看看怎么回事现在 可以看到上面是之前的代码 上面是对数据的准备 这里我们还是修改,使用 poly=PolynomialFeatur…

nodejs+vue电影在线预定与管理系统的设计与实现-微信小程序-安卓-python-PHP-计算机毕业设计

通过软件的需求分析已经获得了系统的基本功能需求&#xff0c;根据需求&#xff0c;将电影在线预定与管理系统功能模块主要分为管理员模块。 我国各行各业的发展在信息化浪潮的推动下也在不断进步&#xff0c;尤其是电影产业&#xff0c;在人们生活水平提高的同时&#xff0c;从…

旅拍摄影技巧澳大利亚、韩国旅行攻略

欢迎关注「苏南下」 在这里分享我的旅行和影像创作心得 刚刚在腾讯内部做了一场摄影分享课&#xff1a; 《旅拍摄影技巧&澳大利亚、韩国旅行攻略》 分享了早前去两个国家的一些旅行见闻和摄影心得。我发现&#xff1a;把自己学会的东西整理出来&#xff0c;再告诉给别人这件…

探索人工智能领域——每日30个名词详解【day3】

目录 前言 正文 总结 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高兴与大家相识&#xff0c;希望我的博客能对你有所帮助。 &#x1f4a1;本文由Filotimo__✍️原创&#xff0c;首发于CSDN&#x1f4da;。 &#x1f4e3;如需转载&#xff0c;请事先与我联系以…

c语言从入门到实战——数组指针与函数指针

数组指针与函数指针 前言1. 字符指针变量2. 数组指针变量2.1 数组指针变量是什么&#xff1f;2.2 数组指针变量怎么初始化? 3. 二维数组传参的本质4. 函数指针变量4.1 函数指针变量的创建4.2 函数指针变量的使用4.3 两段有趣的代码4.3.1 typedef关键字 5. 函数指针数组6. 转移…

【华为HCIP | 华为数通工程师】ISIS 高频题(1)

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大三在校生&#xff0c;喜欢AI编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;落798. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc;️…

机器人导航+OPENCV透视变换示例代码

透视变换又称四点变换&#xff0c;所以不能用于5边形这样的图形变换&#xff0c;不是真正的透视变换&#xff0c;但是这个方法可以把机器人看到的图像转换为俯视图&#xff0c;这样就可以建立地图&#xff0c;要不然怎么建立地图呢。 void CrelaxMyFriendDlg::OnBnClickedOk()…

【JavaSE语法】类和对象(二)

六、 封装 6.1 封装的概念 面向对象程序三大特性&#xff1a;封装、继承、多态。而类和对象阶段&#xff0c;主要研究的就是封装特性。 封装&#xff1a;将数据和操作数据的方法进行有机结合&#xff0c;隐藏对象的属性和实现细节&#xff0c;仅对外公开接口来和对象进行交互…

PCL 提取点云边界轮廓-AC方法、平面轮廓

一、概述 PCL点云边界特征检测 &#xff08;附完整代码 C&#xff09;_pcl计算点云特征值_McQueen_LT的博客-CSDN博客 在点云的边界特征检测&#xff08;网格模型的边界特征检测已经是一个确定性问题了&#xff0c;见 网格模型边界检测&#xff09;方面&#xff0c;PCL中有一…

【C++初阶】STL详解(一)string类

本专栏内容为&#xff1a;C学习专栏&#xff0c;分为初阶和进阶两部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握C。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;C &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&…

【Hello Go】Go语言基础类型

Go语言基础类型 基础类型命名变量变量声明变量初始化变量赋值匿名变量 常量字面常量常量定义iota枚举 基础数据类型分类 fmt包的标准输入输出格式说明输入类型转换类型取别名 基础类型 命名 Go语言中的命名遵循下面的几个规则 必须以字母或者是下划线开头不能使用Go语言中的…

Django——模板层、模型层

模板层 一. 模版语法 {{ }}: 变量相关 {% %}: 逻辑相关 1. 注释是代码的母亲 {# ... #} 2. 基本数据类型传值 int1 123 float1 11.11 str1 我也想奔现 bool1 True list1 [小红, 姗姗, 花花, 茹茹] tuple1 (111, 222, 333, 444) dict1 {username: jason, age: 18, i…

单片机的冷启动、热启动、复位

一文看懂STC单片机冷启动和复位有什么区别-电子发烧友网 单片机的冷启动、热启动和复位是不同的启动或重置方式&#xff0c;它们在系统状态和初始化方面有所不同&#xff1a; 1.冷启动&#xff08;Cold Start&#xff09;&#xff1a; 定义&#xff1a; 冷启动是指系统从完全关…

第14届蓝桥杯青少组python试题解析:22年10月选拔赛

选择题 T1. 执行print (5%3) 语句后&#xff0c;输出的结果是 ( ) 0 1 2 3 T2. 以下选项中&#xff0c;哪一个是乘法运算符?&#xff08;&#xff09; % // * ** T3. 已知x3&#xff0c;求x//2x**2的运算结果? 7.5 10 8 10.5 T4. 以下选项中&#xff0c;对下面程序的打印…

Unity地面交互效果目录

大家好&#xff0c;我是阿赵。   之前写了几篇关于地形交互、地面轨迹、脚印效果实现的博文。虽然写的篇数不多&#xff0c;但里面也包含了不少基础知识&#xff0c;比如局部UV采样、法线动态混合、曲面细分等知识&#xff0c;这些都是可以和别的效果组合在一起&#xff0c;做…

【刷题专栏—突破思维】142. 环形链表 II

前言&#xff1a;本篇博客将讲解三个OJ题&#xff0c;前两个作为铺垫&#xff0c;最后完成环形链表的节点的寻找 文章目录 一、160. 相交链表二、141. 环形链表三、142. 环形链表II 一、160. 相交链表 题目链接&#xff1a;LeetCode—相交链表 题目描述&#xff1a; 给你两个单…

排查线程阻塞问题

案例代码 package first;import java.util.concurrent.TimeUnit;public class DeadLock {private static volatile Object lock new Object();public static void main(String[] args) {new Thread(() -> {test1();}).start();new Thread(() -> {test2();}).start();}p…

队列的实现---超详细

队列的实现—超详细 文章目录 队列的实现---超详细一、队列的模型二、代码实现以及测试用例①队列初始化②入队③出队④输出队头⑤输出队尾⑥判断队列是否为空⑦队列的长度⑧队列的销毁⑨测试用例 一、队列的模型 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在…