Spark SQL函数详解:案例解析(第8天)

系列文章目录

1- Spark SQL函数定义(掌握)
2- Spark 原生自定义UDF函数案例解析(掌握)
3- Pandas自定义函数案例解析(熟悉)
4- Apache Arrow框架案例解析(熟悉)
5- spark常见面试题

文章目录

  • 系列文章目录
  • 前言
    • 一、Spark SQL函数定义(掌握)
      • 1. 窗口函数
      • 2. 自定义函数背景
        • 2.1 回顾函数分类标准
        • 2.2 自定义函数背景
    • 二、Spark原生自定义UDF函数
      • 1. 自定义函数流程
        • 1.1 自定义演示一
        • 1.2 自定义演示二
        • 1.3 自定义演示三
    • 三、Pandas的自定义函数
      • 1. Apache Arrow框架
      • 2. 基于Arrow完成Pandas和Spark的DataFrame互转
      • 3. 基于Pandas自定义函数
        • 3.1 自定义函数流程
        • 3.2 自定义UDF函数
        • 3.3 自定义UDAF函数
    • 四、Spark常见面试题
      • 1. Spark client 和Spark cluster的区别?
      • 2. Spark常用端口号
      • 3. Repartitons和Coalesce区别


前言

本文主要通过案例解析工作中常用的Spark SQL函数,以及应用场景


一、Spark SQL函数定义(掌握)

1. 窗口函数

回顾之前学习过的窗口函数:

分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

分析函数可以大致分成如下3类:
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: 排序函数 row_number() rank() dense_rank() 
3- 第三类: 其他函数 ntile()  first_value() last_value() lead() lag() 

三个排序函数的区别?
row_number(): 巧记 1234  特点: 唯一且连续
rank(): 巧记 1224 特点: 并列不连续
dense_rank(): 巧记 1223  特点: 并列且连续

在Spark SQL中使用窗口函数案例:

已知数据如下:

cookie1,2018-04-10,1
cookie1,2018-04-11,5
cookie1,2018-04-12,7
cookie1,2018-04-13,3
cookie1,2018-04-14,2
cookie1,2018-04-15,4
cookie1,2018-04-16,4
cookie2,2018-04-10,2
cookie2,2018-04-11,3
cookie2,2018-04-12,5
cookie2,2018-04-13,6
cookie2,2018-04-14,3
cookie2,2018-04-15,9
cookie2,2018-04-16,7

需求: 要求找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题

# 导包
import os
from pyspark.sql import SparkSession,functions as F,Window as W

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # 2.数据输入
    df = spark.read.csv(
        path='file:///export/data/spark_project/spark_sql/data/cookie.txt',
        sep=',',
        schema='cookie string,datestr string,pv int'
    )
    # 3.数据处理(切分,转换,分组聚合)
    # 4.数据输出
    etldf = df.dropDuplicates().dropna()
    # SQL方式
    etldf.createTempView('cookie_logs')
    spark.sql(
        """
        select cookie,datestr,pv
        from (
           select cookie,datestr,pv,
              dense_rank() over(partition by cookie order by pv desc) as rn
           from cookie_logs
        ) temp where rn <=3 
        """
    ).show()
    # DSL方式
    etldf.select(
        'cookie', 'datestr', 'pv',
        F.dense_rank().over( W.partitionBy('cookie').orderBy(F.desc('pv')) ).alias('rn')
    ).where('rn <=3').select('cookie', 'datestr', 'pv').show()


    # 5.关闭资源
    spark.stop()

运行结果截图:

在这里插入图片描述

2. 自定义函数背景

2.1 回顾函数分类标准

SQL函数,主要分为以下三大类:

  • UDF函数:普通函数
    • 特点:一对一,输入一个得到一个
    • 例如:split() …
  • UDAF函数:聚合函数
    • 特点:多对一,输入多个得到一个
    • 例如:sum() avg() count() min() max() …
  • UDTF函数:表生成函数
    • 特点:一对多,输入一个得到多个
    • 例如:explode() …

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

2.2 自定义函数背景

思考:有这么多的内置函数,为啥还需要自定义函数呢?

	为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数

​ 在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。

​ 在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。

1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件(Arrow,pandas...),Python可以开发UDF、UDAF函数,同时也提升效率

在这里插入图片描述

Spark SQL原生UDF函数存在的问题:大量的序列化和反序列

	虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好
	
早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可
	
目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

在这里插入图片描述

二、Spark原生自定义UDF函数

1. 自定义函数流程

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数注册到Spark SQL中
	注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
		参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
		参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数
		参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型
		udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
	
		说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】
	
	注册方式二:  udf对象 = F.udf(参数1,参数2)
		参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数
		参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型
		udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
		
		说明: 如果通过方式二来注册函数,【仅能用在DSL中】
		
	注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面
		说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】
	
		
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

1.1 自定义演示一

需求1: 请自定义一个函数,完成对 数据 统一添加一个后缀名的操作 , 例如后缀名 ‘_itheima’

效果如下:

在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType

# 绑定指定的python解释器

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()


    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三','广州'),(2,'李四','深圳')],
        schema='id int,name string,address string'
    )
    df.show()
    
    # 3.SparkSQL自定义udf函数
    # 第一步.自定义python函数
    def add_suffix(data):
        return data+'_itheima'

    # 第二步.把python函数注册到SparkSQL
    # ① spark.udf.register注册
    dsl1_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())
    # ②F.udf注册
    dsl2_add_suffix = F.udf(add_suffix, StringType())
    # ③@F.udf注册
    @F.udf( StringType())
    def candy_add_suffix(data):
        return data+'_itheima'

    # 第三步.在SparkSQL中调用自定义函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,name,sql_add_suffix(address) as new_address from temp"""
    ).show()
    
    # DSL方式
    # 调用dsl1_add_suffix
    df.select(
        'id', 'name', dsl1_add_suffix('address').alias('new_address')
    ).show()
    # 调用dsl2_add_suffix
    df.select(
        'id', 'name', dsl2_add_suffix('address').alias('new_address')
    ).show()
    # 调用candy_add_suffix
    df.select(
        'id', 'name', candy_add_suffix('address').alias('new_address')
    ).show()

    # 4.关闭资源
    spark.stop()

博主友情提醒: 可能遇到的问题如下

在这里插入图片描述

原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。
1.2 自定义演示二

需求2: 请自定义一个函数,返回值类型为复杂类型: 列表

效果如下:

在这里插入图片描述

参考代码:

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType

# 绑定指定的python解释器

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()


    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三_广州'),(2,'李四_深圳')],
        schema='id int,name_address string'
    )
    df.show()

    # 3.SparkSQL自定义udf函数
    # 第一步.自定义python函数
    def my_split(data:str):
        list1 = data.split('_')
        return list1

    # 第二步.把python函数注册到SparkSQL
    # ① spark.udf.register注册
    dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,ArrayType(StringType()))
    # ②F.udf注册
    dsl2_add_suffix = F.udf(my_split, ArrayType(StringType()))
    # ③@F.udf注册
    @F.udf(ArrayType(StringType()))
    def candy_add_suffix(data):
        list1 = data.split('_')
        return list1

    # 第三步.在SparkSQL中调用自定义函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,sql_add_suffix(name_address) as new_address from temp"""
    ).show()

    # DSL方式
    # 调用dsl1_add_suffix
    df.select(
        'id',  dsl1_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用dsl2_add_suffix
    df.select(
        'id',dsl2_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用candy_add_suffix
    df.select(
        'id',candy_add_suffix('name_address').alias('new_name_address')
    ).show()


    # 4.关闭资源
    spark.stop()
1.3 自定义演示三

需求3: 请自定义一个函数,返回值类型为复杂类型: 字典

效果如下:

在这里插入图片描述

注意: 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null补充
# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType, StructType

# 绑定指定的python解释器

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()


    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三_广州'),(2,'李四_深圳')],
        schema='id int,name_address string'
    )
    df.show()

    # 3.SparkSQL自定义udf函数
    # 第一步.自定义python函数
    def my_split(data:str):
        list1 = data.split('_')
        return {'name':list1[0],'address':list1[1]}

    # 第二步.把python函数注册到SparkSQL
    # 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null
    t = StructType().add('name',StringType()).add('address',StringType())
    # ① spark.udf.register注册
    dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,t)
    # ②F.udf注册
    dsl2_add_suffix = F.udf(my_split, t)
    # ③@F.udf注册
    @F.udf(t)
    def candy_add_suffix(data):
        list1 = data.split('_')
        return {'name':list1[0],'address':list1[1]}

    # 第三步.在SparkSQL中调用自定义函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,sql_add_suffix(name_address) as new_name_address from temp"""
    ).show()

    # DSL方式
    # 调用dsl1_add_suffix
    df.select(
        'id', dsl1_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用dsl2_add_suffix
    df.select(
        'id',dsl2_add_suffix('name_address').alias('new_name_address')
    ).show()
    # 调用candy_add_suffix
    df.select(
        'id',candy_add_suffix('name_address').alias('new_name_address')
    ).show()

    # 4.关闭资源
    spark.stop()

三、Pandas的自定义函数

1. Apache Arrow框架

​ Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

​ Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

如何安装? 三个节点建议都安装(注:集群搭建后续会更新)

检查服务器上是否有安装pyspark
pip list | grep pyspark  或者 conda list | grep pyspark

如果服务器已经安装了pyspark的库,那么仅需要执行以下内容,即可安装。例如在 node1安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]
	
如果服务器中python环境中没有安装pyspark,建议执行以下操作,即可安装。例如在 node2 和 node3安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow==16.1.0

在这里插入图片描述

Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

如何使用呢? 默认不会自动启动的, 一般建议手动配置

spark.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)

2. 基于Arrow完成Pandas和Spark的DataFrame互转

Pandas中DataFrame:

DataFrame:表示一个二维表对象,就是表示整个表

字段、列、索引;Series表示一列

在这里插入图片描述

Spark SQL中DataFrame:

在这里插入图片描述

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

示例:

# 导包
import os
from pyspark.sql import SparkSession

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # TODO: 手动开启arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三_广州'),(2,'李四_深圳')],
        schema='id int ,name_address string'
    )
    df.show()
    print(type(df))
    print('------------------------')

    # 3.数据处理(切分,转换,分组聚合)
    # 4.数据输出
    # spark->pandas
    pd_df = df.toPandas()
    print(pd_df)
    print(type(pd_df))

    print('------------------------')
    # pandas->spark
    df2 = spark.createDataFrame(pd_df)
    df2.show()
    print(type(df2))
    

    # 5.关闭资源
    spark.stop()

3. 基于Pandas自定义函数

​ 基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

​ Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

3.1 自定义函数流程
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数包装成Spark SQL的函数
	注册方式一: udf对象 = spark.udf.register(参数1, 参数2)
		参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
		参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数
		使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用
		
		
	注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)
		参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数
		参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型
		udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用
	
	注册方式三: 语法糖写法  @F.pandas_udf(returnType)  放置到对应Python的函数上面
		说明: 实际是方式二的扩展。仅能用在DSL中使用
	
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
3.2 自定义UDF函数
  • 自定义Python函数的要求:SeriesToSeries

    在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
import pandas as pd

# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # TODO: 开启Arrow的使用
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')

    # 2.数据输入
    df = spark.createDataFrame(
        data = [(1,1),(2,2),(3,3)],
        schema= 'num1 int,num2 int'
    )
    df.show()
    # 3.基于pandas自定义函数 :SeriesTOSeries
    # 第一步: 自定义python函数
    def multiply(num1:pd.Series,num2:pd.Series)->pd.Series:
        return num1*num2

    # 第二步: 把python注册为SparkSQL函数
    # ①spark.udf.register注册
    dsl1_multiply = spark.udf.register('sql_multiply',multiply)
    # ②F.pandas_udf注册
    dsl2_multiply = F.pandas_udf(multiply,IntegerType())
    # ③@F.pandas_udf注册
    @F.pandas_udf(IntegerType())
    def candy_multiply(num1: pd.Series, num2: pd.Series) -> pd.Series:
        return num1 * num2

    # 第三步: 在SparkSQL中调用注册后函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select num1,num2,sql_multiply(num1,num2) as result from temp"""
    ).show()
    # DSL方式
    #调用dsl1_multiply
    df.select(
        'num1','num2',dsl1_multiply('num1','num2').alias('result')
    ).show()
    # 调用dsl2_multiply
    df.select(
        'num1', 'num2', dsl2_multiply('num1', 'num2').alias('result')
    ).show()
    # 调用candy_multiply
    df.select(
        'num1', 'num2', candy_multiply('num1', 'num2').alias('result')
    ).show()

    # 4.关闭资源
    spark.stop()

3.3 自定义UDAF函数
  • 自定义Python函数的要求:Series To 标量

    表示:自定义函数的输入数据类型是Pandas中的Series对象,返回值数据类型是标量数据类型。也就是Python中的数据类型,例如:int、float、bool、list…

    在这里插入图片描述

基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd

# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType, FloatType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # TODO: 开启Arrow的使用
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')

    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        schema='id int,value float'
    )
    df.show()


    # 3.基于pandas自定义函数 :SeriesTOSeries
    # 第一步: 自定义python函数
    # ③@F.pandas_udf注册  注意: 理论上UDAF只能用注册方式三语法糖方式,也就意味着只能DSL使用
    @F.pandas_udf(FloatType())
    def candy_mean_v(value: pd.Series) -> float:
        return value.mean()


    # 第二步: 注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式一register注册
    # ①spark.udf.register注册
    dsl1_mean_v = spark.udf.register('sql_mean_v', candy_mean_v)

    # 第三步: 在SparkSQL中调用注册后函数
    # DSL方式
    # 调用candy_mean_v
    df.groupBy('id').agg(
        candy_mean_v('value').alias('result')
    ).show()

    # 调用dsl1_mean_v
    df.groupBy('id').agg(
        dsl1_mean_v('value').alias('result')
    ).show()

    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,sql_mean_v(value) as result from temp group by id"""
    ).show()

    # 4.关闭资源
    spark.stop()

四、Spark常见面试题

1. Spark client 和Spark cluster的区别?

区别是driver 进程在哪运行,client模式driver运行在master节点上,不在worker节点上;cluster模式
driver运行在worker集群某节点上,不在master节点上。
一般来说,如果提交任务的节点(即Master)和Worker集群在同一个网络内,此时client mode比较合
适。
如果提交任务的节点和Worker集群相隔比较远,就会采用cluster mode来最小化Driver和Executor之间
的网络延迟。
yarn client模式:driverzai当前提交任务的节点上,可以打印任务运行的日志信息。
yarn cluster模式:driver在AppMaster所有节点上,分布式分配,不能再提交任务的本机打印日志信
息。

2. Spark常用端口号

Spark-shell任务端口:4040
内部通讯端口:7077
查看任务执行情况端口:8080
历史服务器:18080
Oozie端口号:11000

3. Repartitons和Coalesce区别

  • 关系:两者都是用来改变 RDD 的 partition 数量的,repartition 底层调用的就是 coalesce 方 法:
    coalesce(numPartitions, shuffle = true)
  • 区别:repartition 一定会发生 shuffle,coalesce 根据传入的参数来判断是否发生 shuffle 一般情况
    下增大 rdd 的 partition 数量使用 repartition,减少 partition 数量时使用 coalesce。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/725650.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Centos 配置安装Mysql

linux安装配置mysql的方法主要有yum安装和配置安装两种&#xff0c;由于yum安装比较简单&#xff0c;但是会将文件分散到不同的目录结构下面&#xff0c;配置起来比较麻烦&#xff0c;这里主要研究一下配置安装mysql的方法 1、环境说明 centos 7.9 mysql 5.7.372、环境检查 …

ChatGPT Plus GPT-4o Claude 3 Opus合租拼车全新方式

无需自己搭建&#xff0c;登录即可用&#xff0c;国内直连访问&#xff0c;聚合多家最强大模型&#xff0c;随意选择使用。立即体验 datapipe.top 支持 OpenAI 最新 GPT-4o &#xff0c;获得快速高质量的对话&#xff0c;保证可用配额。支持多种大模型&#xff0c;GPT-4o &…

SerialChart上位机使用详解

SerialChart 上位机 软件分为三个区域&#xff1a;接收数据区&#xff0c;用于显示串口接收的数据。参数配置区&#xff0c;用于配置串口参数和显示参数。波形显示区&#xff0c;显示串口数据的波形。 在参数配置区写入串口号&#xff0c;波特率&#xff0c;通道波形颜色等&am…

高压电阻器支持牙科 X 射线成像的准确性

为了捕获患者牙齿和颌骨的足够图像&#xff0c;牙医依靠锥形束计算机断层扫描 &#xff08;CBCT&#xff09; 系统的先进 3D 成像。CBCT系统的输出对于准确诊断口腔健康问题和随后的治疗计划至关重要。为了确保这些图像的可靠性&#xff0c;CBCT系统制造商利用了Exxelia Ohmcra…

Jenkins+K8s实现持续集成(一)

镜像仓库的搭建 docker run -d \--restartalways \--name registry \-p 5000:5000 \-v /root/devops/registry/data:/var/lib/registry \registry安装完之后&#xff0c;执行下面命令可以看到镜像仓库已经安装成功 docker ps 然后在浏览器上输入下面地址进行访问 http://ip:…

一键简易桌签(带背景)-Word插件-大珩助手

问题整理&#xff1a; 如何Word中设计简易桌签&#xff1f;如何设置带背景图的桌签&#xff1f; Word大珩助手是一款功能丰富的Office Word插件&#xff0c;旨在提高用户在处理文档时的效率。它具有多种实用的功能&#xff0c;能够帮助用户轻松修改、优化和管理Word文件&…

Python酷库之旅-比翼双飞情侣库(17)

目录 一、xlwt库的由来 1、背景和需求 2、项目启动 3、功能特点 4、版本兼容性 5、与其他库的关系 6、示例和应用 7、发展历史 二、xlwt库优缺点 1、优点 1-1、简单易用 1-2、功能丰富 1-3、兼容旧版Excel 1-4、社区支持 1-5、稳定性 2、缺点 2-1、不支持.xls…

LVGL开发教程-Flex(弹性布局)

系列文章目录 知不足而奋进 望远山而前行 目录 系列文章目录 文章目录 前言 1.常用方法 2.代码实现 3.对齐方式 4.控件特殊的size 总结 前言 Flexbox布局在现代界面设计中扮演着重要角色&#xff0c;特别是在响应式和动态布局方面。LVGL&#xff08;LittlevGL&#x…

Dockerfile封装制作pytorch(tensorflow)深度学习框架 + jupyterlab服务 + ssh服务镜像

一&#xff1a;docker-hub官网寻找需求镜像 1.我们在https://hub.docker.com/官网找到要封装的pytorch基础镜像&#xff0c;这里我们以pytorch1.13.1版本为例 2.我们找到的这个devel版本的镜像&#xff08;我们需要cuda的编译工具&#xff09; pytorch版本是1.13.1&#xff0c;…

气体泄露隐患多,佛山工业可燃气体报警器年检校准来帮忙

在佛山这座工业发达的城市&#xff0c;可燃气体报警器的应用日益广泛&#xff0c;涉及化工、冶金、石油等多个领域。 然而&#xff0c;长时间的使用和恶劣的工业环境可能导致报警器的性能下降&#xff0c;甚至出现误报或漏报的情况。 因此&#xff0c;定期对可燃气体报警器进…

OPenCV实现把人形轮廓画在实时视频画面中

操作系统&#xff1a;ubuntu22.04OpenCV版本&#xff1a;OpenCV4.9IDE:Visual Studio Code编程语言&#xff1a;C11 1.功能描述 当你从摄像头读取实时视频时&#xff0c;如果想在视频的画面中画一个方框&#xff0c;或者是画一个圆&#xff0c;是很简单的事情&#xff0c;可是…

VMR,支持30+种编程语言的SDK版本管理器,支持Windows/MacOS/Linux。

官方文档地址&#xff1a;documents 官方项目地址&#xff1a;github 欢迎安装使用&#xff0c;分享转发&#xff0c;前往github star。 跨平台&#xff0c;支持Windows&#xff0c;Linux&#xff0c;MacOS支持多种语言和工具&#xff0c;省心受到lazygit的启发&#xff0c;拥…

LLM漫谈(七)| 使用PyTorch从零构建LLM

LLM是最流行AI聊天机器人的核心基础&#xff0c;比如ChatGPT、Gemini、MetaAI、Mistral AI等。在每一个LLM&#xff0c;有个核心架构&#xff1a;Transformer。我们将首先根据著名的论文“Attention is all you need”-https://arxiv.org/abs/1706.03762 来构建Transformer架构…

漏洞挖掘 | 记一次src挖掘-小程序敏感信息泄露

权当是一次漏洞挖掘的思路分享 闲言 就现在的一个web漏洞挖掘强度还是非常高的&#xff0c;所以我们不妨把我们的眼光投向一个之前可能未曾涉及到的区域———小程序 是的微信小程序&#xff0c;这玩意的防范能力和过滤能力其实对比web方向是要弱小很多的 进入正题 以下就是…

详细分析Element Plus的el-pagination基本知识(附Demo)

目录 前言1. 基本知识2. Demo3. 实战 前言 需求&#xff1a;从无到有做一个分页并且附带分页的导入导出增删改查等功能 前提一定是要先有分页&#xff0c;作为全栈玩家&#xff0c;先在前端部署一个分页的列表 相关后续的功能&#xff0c;是Java&#xff0c;推荐阅读&#x…

配置环境常规操作

一、看看显卡情况 1、看显卡驱动&#xff1a; nvidia-smi 2、验证cuda是否安装成功 nvcc -V 二、conda创建环境 conda create --name PatchCore_anomaly_detection python3.9 conda activate PatchCore_anomaly_detection 三、配置虚拟环境 cd C:\BaiduNetdiskDownload…

不同表格式下的小文件治理方式(开源RC file/ORC/Text非事务表、事务表、Holodesk表格式..)

友情链接&#xff1a; 小文件治理系列之为什么会出现小文件问题&#xff0c;小文件过多问题的危害以及不同阶段下的小文件治理最佳解决手段 小文件过多的解决方法&#xff08;不同阶段下的治理手段&#xff0c;SQL端、存储端以及计算端&#xff09; 概览 在前两篇博文中&am…

OceanBase v4.2 特性解析:支持并发建表,提升OMS导入效率

背景 OceanBase 4.0版本新增了单日志流架构&#xff0c;使得OBServer单机突破了原有的分区数限制&#xff0c;支持更大数量的分区。 很多业务环境为了处理单机数据量过大的问题&#xff0c;通常采取分库分表的方法&#xff0c;这一方法会导致业务需要创建数十万乃至百万级别的…

Java安全

Java安全 Java2Sec靶场搭建 靶场地址 https://github.com/bewhale/JavaSec 查看数据库配置文件&#xff0c;mysql&#xff0c;用户名密码根据自己数据库密码更改 使用小皮面板的mysql&#xff0c;新建一个数据名为javasec的数据库 运行javasec.sql文件 下载运行jar包即可 …

【五子棋】C语言教程

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习…