Spark---DataFrame存储、Spark UDF函数、UDAF函数

四、DataFrame存储+Spark UDF函数

1、储存DataFrame

1)、将DataFrame存储为parquet文件

2)、将DataFrame存储到JDBC数据库

3)、将DataFrame存储到Hive表

2、UDF:用户自定义函数

可以自定义类实现UDFX接口

java:

SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Row call(String s) throws Exception {
return RowFactory.create(s);
	}
});

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));

StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");

/**
 * 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx
 */
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Integer call(String t1) throws Exception {
             return t1.length();
	}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();

//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
//	/**
//	 * 
//	 */
//	private static final long serialVersionUID = 1L;
//
//	@Override
//	public Integer call(String t1, Integer t2) throws Exception {
//return t1.length()+t2;
//	}
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();

sc.stop();	

scala:

1.val spark = SparkSession.builder().master("local").appName("UDF").getOrCreate()
2.val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")
3.import spark.implicits._
4.val nameDF: DataFrame = nameList.toDF("name")
5.nameDF.createOrReplaceTempView("students")
6.nameDF.show()
7.
8.spark.udf.register("STRLEN",(name:String)=>{
9.name.length
10.})
11.spark.sql("select name ,STRLEN(name) as length from students order by length desc").show(100)

五、UDAF函数

1、UDAF:用户自定义聚合函数

1)、实现UDAF函数如果要自定义类要继承

UserDefinedAggregateFunction类

2)、UDAF原理图

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Row call(String s) throws Exception {
              return RowFactory.create(s);
	}
});

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
 * 注册一个UDAF函数,实现统计相同值得个数
 * 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的
 */
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
	
   /**
    * 
    */
   private static final long serialVersionUID = 1L;
   /**
    * 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
    * buffer.getInt(0)获取的是上一次聚合后的值
    * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 
    * 大聚和发生在reduce端.
    * 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
    */
   @Override
   public void update(MutableAggregationBuffer buffer, Row arg1) {
         buffer.update(0, buffer.getInt(0)+1);

   }
   /**
    * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
    * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
    * buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值       
    * buffer2.getInt(0) : 这次计算传入进来的update的结果
    * 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
    */
   @Override
   public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
     buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
   }
   /**
    * 指定输入字段的字段及类型
    */
   @Override
   public StructType inputSchema() {
     return DataTypes.createStructType(
      Arrays.asList(DataTypes.createStructField("name", 
          DataTypes.StringType, true)));
   }
   /**
    * 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
    */
   @Override
   public void initialize(MutableAggregationBuffer buffer) {
         buffer.update(0, 0);
   }
   /**
    * 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果
    */
   @Override
   public Object evaluate(Row row) {
      return row.getInt(0);
   }
   
   @Override
   public boolean deterministic() {
     //设置为true
     return true;
   }
   /**
    * 指定UDAF函数计算后返回的结果类型
    */
   @Override
   public DataType dataType() {
      return DataTypes.IntegerType;
   }
   /**
    * 在进行聚合操作的时候所要处理的数据的结果的类型
    */
   @Override
   public StructType bufferSchema() {
       return 
       DataTypes.createStructType(
   Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, 
            true)));
   }
   
});

sqlContext.sql("select name ,StringCount(name) from user group by name").show();

sc.stop();

scala:

1.class MyCount extends UserDefinedAggregateFunction{
2.  //输入数据的类型
3.  override def inputSchema: StructType =    StructType(List[StructField](StructField("xx",StringType,true)))
4.
5.  //在聚合过程中处理的数据类型
6.  override def bufferSchema: StructType =   StructType(List[StructField](StructField("xx",IntegerType,true)))
7.
8.  //最终返回值的类型,与evaluate返回的值保持一致
9.  override def dataType: DataType = IntegerType
10.
11.  //多次运行数据是否一致
12.  override def deterministic: Boolean = true
13.
14.  //每个分区中每组key 对应的初始值
15.  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
16.
17.  //每个分区中,每个分组内进行聚合操作
18.  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
19.    buffer.update(0,buffer.getInt(0) + 1)
20.  }
21.
22.  //不同的分区中相同的key的数据进行聚合
23.  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
24.    buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
25.  }
26.
27.  //聚合之后,每个分组最终返回的值,类型要和dataType 一致
28.  override def evaluate(buffer: Row): Any = buffer.getInt(0)
29.}
30.
31.object Test {
32.  def main(args: Array[String]): Unit = {
33.    val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
34.    val list = List[String]("zhangsan","lisi","wangwu","zhangsan","lisi","zhangsan")
35.
36.    import session.implicits._
37.    val frame = list.toDF("name")
38.    frame.createTempView("mytable")
39.
40.    session.udf.register("MyCount",new MyCount())
41.
42.    val result = session.sql("select name,MyCount(name) from mytable group by name")
43.    result.show()
44.
45.  }
46.}
47.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/228551.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Nginx+Promtail+Loki+Grafana 升级ELK强大工具

最近客户有个新需求,就是想查看网站的访问情况,由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的页面,咱也做不到 成熟的日志解决方案,那就是ELK,还有现在比较火的Loki,(当然还有很多其…

MQTT_fx的使用——连接ONENET

这里写目录标题 前言1、ONENET云平台设备创建1.1、注册并登录ONENET云平台1.2、创建产品1.3、创建物模型1.4、创建设备 2、利用MQTT.fx连接ONENET订阅和发布消息2.1、MQTT.fx和OneNET-token计算工具的安装2.2、使用MQTT.fx连接云平台2.3、发布消息到云平台2.4、订阅云平台信息 …

C语言-文件

文件 1. 简介 文件用来存放程序、文档、音频、视频数据、图片等数据的。 文件就是存放在磁盘上的&#xff0c;一些数据的集合。 在 windows 下可以通过写字板或记事本打开文本文件对文件进行编辑保存。写字板和记事本是微软程序员写的程序&#xff0c;对文件进行打开、显示…

uc_14_IP地址_套接字_字节序转换

1 计算机网络 计算机网络&#xff0c;是指将地理位置不同的具有独立功能的多台计算机及其外部设备&#xff0c;通过通信线路连接起来&#xff0c;在网络操作系统、网络管理软件及网络通信协议的管理和协调下&#xff0c;实现资源共享和信息传递的计算机系统。 网络协议是一种特…

Redis基础系列-持久化

Redis基础系列-持久化 文章目录 Redis基础系列-持久化1. 什么是持久化2. 为什么要持久化3. 持久化的两种方式3.1 持久化方式1&#xff1a;RDB(redis默认持久化方式)3.11 配置步骤-自动触发3.12 配置步骤-手动触发3.12 优点3.13 缺点3.14 检查和修复RDB快照文件3.15 哪些情况会触…

ArcGIS Maps SDK for JS:关闭地图边框(v4.27)

1 问题描述 近期&#xff0c;将ArcGIS Api for JS v4.16更新到了ArcGIS Maps SDK for JS v4.27&#xff0c;原本去除地图的css代码失效了。v4.27需要用.esri-view-surface--touch-none::after控制边框属性。 下面为没有关闭地图边框的效果图。&#xff08;亮色版地图为黑色边…

Python AI学习资料包这一套就够!

历时两个月&#xff0c;小Mo终于为大家整理好了 Python & AI 学习资料包&#xff01; 资料包内容细分为「&#x1f393;新手」「&#x1f3a2;进阶」「&#x1f381;分享」&#xff0c;包括 ✅50 个 PDF 课程资料&#xff0c;从基础到进阶 ✅18 本专业书籍&#xff0c;从…

DevEco Studio 运行项目有时会自动出现.js和.map文件

运行的时候报错了&#xff0c;发现多了.js和.map&#xff0c;而且还不是一个&#xff0c;很多个。 通过查询&#xff0c;好像是之前已知问题了&#xff0c;给的建议是手动删除(一个一个删)&#xff0c;而且有的评论还说&#xff0c;一周出现了3次&#xff0c;太可怕了。 搜的过…

ABP vNext 扩展 CurrentUser

ABP内置Users表&#xff0c;我们可以对其字段进行扩展&#xff0c;辅助进行更详细的数据记录 ICurrentUser 是主要的服务,用于获取有关当前活动的用户信息. 以下是 ICurrentUser 接口的基本属性:1. IsAuthenticated 如果当前用户已登录(已认证),则返回 true. 如果用户尚未登录…

网站采集工具,网站自动采集发布的软件【免费】

在当今数字化时代&#xff0c;网站信息的采集对于市场调研、竞争分析以及内容创作都至关重要。本文将着深入探讨如何通过输入关键词实现全网采集&#xff0c;并支持指定任意网站的详尽数据抓取。 采集软件 147SEO采集软件作为一款强大而灵活的网站信息采集工具&#xff0c;以其…

Notepad安装

中文免安装版&#xff0c;下载解压即可。 NotepadV7.5.6 (访问密码: 1666)https://url48.ctfile.com/f/33868548-986668939-7a3316?p1666

一文帮你搞懂继承(c++笔记)

继承 继承概念继承语法格式定义继承基类成员的访问方式 基类和派生类对象赋值转换继承中的作用域派生类的默认成员函数继承和友元继承和静态成员给我实现一个不能被继承的类菱形继承菱形继承有什么问题呢&#xff1f;疑问&#xff1f;总结 继承和组合 继承概念 继承是面向对象…

105.长度最小的子数组(力扣)|滑动窗口

代码演示 class Solution { public:int minSubArrayLen(int target, vector<int>& nums) {int result INT_MAX; // 用于存储最小子数组的长度int sum 0; // 滑动窗口的长度int i 0; // 滑动窗口的起始位置int sumlength 0; // 当前子数…

docker:部署java Springboot项目

文章目录 1、打 jar 包1、创建Dockerfile3、创建镜像4、启动容器其他注意事项docker中jdk的版本命名举例&#xff1a;openjdk:11-ea-17-jre-slim举例&#xff1a;8u312-jre-nanoserver-1809 通过find找文件 1、打 jar 包 将项目打一个 jar 包&#xff0c;可以使用 IDEA 1、…

第一课【习题】给应用添加通知和提醒

构造进度条模板通知&#xff0c;name字段当前需要固定配置为downloadTemplate。 给通知设置分发时间&#xff0c;需要设置showDeliveryTime为false。 OpenHarmony提供后台代理提醒功能&#xff0c;在应用退居后台或退出后&#xff0c;计时和提醒通知功能被系统后台代理接管…

浪潮信息KeyarchOS——保卫数字未来的安全防御利器

浪潮信息KeyarchOS——保卫数字未来的安全防御利器 前言 众所周知&#xff0c;目前流行的操作系统有10余种&#xff0c;每一款操作系统都有自己的特点。作为使用者&#xff0c;我们该如何选择操作系统。如果你偏重操作系统的安全可信和稳定高效&#xff0c;我推荐你使用浪潮信…

Docker网络原理

Docker网络概述 1.桥接模式介绍 bridge模式是docker的默认网络模式。 桥接模式是一种用于连接两个不同网络段的设备&#xff0c;使它们能够共享通信的一种方式。 桥接设备工作在OSI模型的第二层&#xff0c;即数据链路层&#xff0c;通常基于MAC地址进行帧转发。 物理层连接…

2024山东健博会,济南健康展,5月中国大健康展,健康管理展

China-DJK山东健博会&#xff1a;5月黄金招商季&#xff0c;携千家参展商、万余款产品精彩亮相&#xff1b; DJK 2024第6届中国&#xff08;济南&#xff09;国际大健康产业博览会 The 2024 sixth China (Jinan) International Big Health Industry Expo 时间&#xff1a;2024…

FastAPI之Cookie参数

示例代码 from typing import Annotated, Unionfrom fastapi import Cookie, FastAPIapp FastAPI()app.get("/items/") async def read_items(ads_id: Annotated[str, Cookie()]):print(ads_id)return {"ads_id": ads_id, "code": 200}测试效果…

柔性数组详解

目录 一、柔性数组的特点&#xff1a;​ 二、使用sizeof计算包含柔性数组的结构体的大小 三、柔性数组的使用 代码1: 四、用结构体指针代替柔性数组 代码2&#xff1a; 五、柔性数组与结构体指针对比 • 在结构体中 • 最后一个成员 • 未知大小的数组 这就是柔性…