将pyspark中的UDF提升6倍

本文亮点

      调用jar中的UDF,减少python与JVM的交互,简单banchmark下对于54亿条数据集进行udf计算,从3小时的执行时间缩短至16分钟。
牺牲UDF部分的开发时间,尽量提高性能。
以接近纯python的开发成本,获得逼近纯scala的性能。兼顾性能和开发效率。

前提

       当遇到sql无法直接处理的数据时(比如加密解密、thrift解析操作二进制),我们需要自定义函数(UDF)来进行处理。出于开发效率的考虑,我们一般会选择airflow,使用pyspark脚本。

优化后的代码

from datetime import datetime
from pyspark.sql import SparkSession
import sys

if __name__ == "__main__":
    # 创建 Spark 会话
    spark = SparkSession.builder.appName("xxx").enableHiveSupport().getOrCreate()

    DT = sys.argv[1]
    HOUR = sys.argv[2]
    F_DT = datetime.strptime(DT, "%Y-%m-%d").strftime("%Y%m%d")
    tmp_tbl_name = f"""temp_xxx_pre_0_{F_DT}_{HOUR}"""

    print('''注册java函数''')
    spark.udf.registerJavaFunction("get_area_complex_value", "com.xxx.utf.block.AreaComplexValue")
    spark.udf.registerJavaFunction("most_common_element", "com.xxx.utf.common.MosCommonElement")
    spark.sql('''set hive.exec.dynamic.partition.mode=nonstrict;''')
    exec_sql = '''
  insert overwrite table xxx.xxx --你的目标表
			select
			a.distinct_id device_id -- l临时
			,app_id
			,install_datetime
			,os
			,ip
			,country
			,city
			,uuid
			,a.distinct_id
			,event_name
			,event_timestamp
			,event_datetime
			,game_id
			,game_type
			,round_id
			,travel_id
			,travel_lv
			,matrix
			,position
			,concat('[', concat_ws(',', clean), ']') as clean
			,block_id
			,index_id
			,rec_strategy
			,rec_strategy_fact
			,combo_cnt
			,gain_score
			,gain_item
			,block_list
			,lag_event_timestamp
			,(event_timestamp - lag_event_timestamp) / 1000 AS time_diff_in_seconds
			,most_common_element( replace( replace(replace(rec_strategy_fact, '[', ''), ']', ''), '"', '' )) as rec_strategy_fact_most
			,lag_matrix
			,is_clear_screen
			,is_blast
			,blast_row_col_cnt
			,CASE 
                WHEN size(clean) > 0 THEN
                    CASE 
                        WHEN (IF(size(lag_clean_3) > 0, 1, 0) + IF(size(lag_clean_2) > 0, 1, 0) + IF(size(lag_clean_1) > 0, 1, 0)) > 0 THEN TRUE
                        WHEN (IF(size(lag_clean_3) > 0, 1, 0) + IF(size(lag_clean_2) > 0, 1, 0) + IF(size(lag_clean_1) > 0, 1, 0)) = 0
                        AND combo_cnt = '-1'
                        AND (IF(size(lead_clean_3) > 0, 1, 0) + IF(size(lead_clean_2) > 0, 1, 0) + IF(size(lead_clean_1) > 0, 1, 0)) > 0 THEN TRUE
                        ELSE FALSE
                    END
                ELSE
                    CASE 
                        WHEN (IF(size(lag_clean_2) > 0, 1, 0) + IF(size(lag_clean_1) > 0, 1, 0)) = 0 THEN FALSE
                        WHEN (IF(size(lag_clean_2) > 0, 1, 0) + IF(size(lag_clean_1) > 0, 1, 0)) = 2 THEN TRUE
                        WHEN size(lag_clean_1) > 0
                            AND lag_combo_cnt_1 = -1
                            AND (IF(size(lead_clean_1) > 0, 1, 0) + IF(size(lead_clean_2) > 0, 1, 0)) >= 1 THEN TRUE
                        WHEN size(lag_clean_1) > 0
                            AND lag_combo_cnt_1 > -1
                            AND (IF(size(lag_clean_3) > 0, 1, 0) + IF(size(lag_clean_4) > 0, 1, 0)) >= 1 THEN TRUE
                        WHEN size(lag_clean_2) > 0
                            AND lag_combo_cnt_2 = -1
                            AND size(lead_clean_1) > 0 THEN TRUE
                        WHEN size(lag_clean_2) > 0
                            AND lag_combo_cnt_2 > -1
                            AND (IF(size(lag_clean_3) > 0, 1, 0) + IF(size(lag_clean_4) > 0, 1, 0) + IF(size(lag_clean_5) > 0, 1, 0)) >= 1 THEN TRUE
                        ELSE FALSE
                    END
                END AS is_combo_status
			,common_block_cnt
			,step_score
			,block_index_id
			,cast(get_area_complex_value(lag_matrix) as int) matrix_complex_value
			,-0.1 as block_line_percent
			,-0.1 as corner_outside_percent
			,-0.1 as corner_inside_percent
			,sum((event_timestamp - lag_event_timestamp) / 1000) over(partition by game_id,game_type,distinct_id) time_accumulate_seconds --  double COMMENT '当前块距离本局游戏的开始的时长,秒级别的'
			,max(cast(round_id as Integer)) over(partition by game_id,game_type,distinct_id ) max_round_id -- 最大轮数2024-12-11数据开始准确
			,case when round_id=max(cast(round_id as Integer)) over(partition by game_id,game_type,distinct_id ) then true else false end  is_final_round -- boolean COMMENT '此轮是不是最后一轮' 最后一轮 2024-12-11数据开始准确
			,case when round_id=max(cast(round_id as Integer)) over(partition by game_id,game_type,distinct_id ) and block_index_id=max(block_index_id) over(partition by game_id,game_type,distinct_id,round_id )  then true 			else false end is_lethal_block -- boolean COMMENT '是不是致死块' 最后一轮里面的最后一个放块 2024-12-11数据开始准确
			,sum(step_score) over(partition by game_id,game_type,distinct_id) - step_score lag_accumulate_score -- double COMMENT '出此块前的累计分数'
			,sum(step_score) over(partition by game_id,game_type,distinct_id) accumulate_score -- double COMMENT '出此块后的累计分数'
			,cast((event_timestamp-last_click_time) / 1000 as int) as time_action_in_seconds -- 落块动作的时间
			,(event_timestamp - lag_event_timestamp) / 1000	- (event_timestamp-last_click_time) / 1000 as time_think_in_seconds --落块-思考时间
			,0 as last_click_time
			,cast(`gain_score_per_done` as Integer) as gain_score_per_done
			,cast(`is_clean_screen` as Integer) as is_clean_screen
			,cast(`weight` as float) as weight
			,cast(`put_rate` as float) as put_rate
			,0 as userwaynum
			,clean_times
			,clean_cnt
			,sum(clean_times) over(partition by game_id,game_type,distinct_id)   as accumulate_clean_times
			,sum(clean_cnt) over(partition by game_id,game_type,distinct_id)     as accumulate_clean_cnt
			,app_version
			,ram
			,disk
			,cast(get_area_complex_value(matrix) as int) cur_matrix_complex_value
			,block_down_color
			,design_position
			,1 as is_sdk_sample
			,network_type
			,session_id
			,block_shape_list
			,block_shape
			,design_postion_upleft
			,fps
			,-1 as fact_line
		    ,case when block_id in (1)  then 4  
                  when block_id in (2,3) then 6  
                  when block_id in (4 ,5 ,6 ,9 ,15 ,27 ,28 ,37 ,38 ) then 8 
                  when block_id in  (7 ,8 ,10 ,14 ,16 ,17 ,18 ,19 ,20 ,25 ,26 ,29 ,30 ,31 ,32 ,33 ,34 ,35 ,36 ,42)   then 10 
                  when block_id in (11, 12 ,13 ,21 ,22 ,23 ,24 ,39 ,40 ,41)   then 12 
             end as total_line
			,dt
			from xxx.xxx a --你的源表
			where dt='{DT}' and event_name = 'game_touchend_block_done' and (event_timestamp - lag_event_timestamp)>0;
     '''.format(DT=DT, tmp_tbl_name=tmp_tbl_name)
    print(exec_sql)
    spark.sql(exec_sql)

    # 关闭 Spark 会话
    spark.stop()

低层实现原理

如上图所示,pyspark并没有像dpark一样用python重新实现一个计算引擎,依旧是复用了scala的jvm计算底层,只是用py4j架设了一条python进程和jvm互相调用的桥梁。
driver: pyspark脚本和sparkContext的jvm使用py4j相互调用;
executor: 由于driver帮忙把spark算子封装好了,执行计划也生成了字节码,一般情况下不需要python进程参与,仅当需要运行UDF(含lambda表达式形式)时,将它委托给python进程处理(DAG图中的BatchEvalPython步骤),此时JVM和python进程使用socket通信。

上述使用简单UDF时的pyspark由于需要使用UDF,因此DAG图中有BatchEvalPython步骤:

BatchEvalPython过程

参考源码:spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala at master · apache/spark · GitHub
可以看到和这个名字一样直白,它就是每次取100条数据让python进程帮忙处理一下:

// 第58行:
// Input iterator to Python: input rows are grouped so we send them in batches to Python.
    // For each row, add it to the queue.
    val inputIterator = iter.map { row =>
      if (needConversion) {
        EvaluatePython.toJava(row, schema)
      } else {
        // fast path for these types that does not need conversion in Python
        val fields = new Array[Any](row.numFields)
        var i = 0
        while (i < row.numFields) {
          val dt = dataTypes(i)
          fields(i) = EvaluatePython.toJava(row.get(i, dt), dt)
          i += 1
        }
        fields
      }
    }.grouped(100).map(x => pickle.dumps(x.toArray))

由于我们的计算任务一般耗时瓶颈在于executor端的计算而不是driver,因此应该考虑尽量减少executor端调用python代码的次数从而优化性能。

参考源码:spark/python/pyspark/java_gateway.py at master · apache/spark · GitHub

// 大概135行的地方:
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.ml.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")

pyspark可以把很多常见的运算封装到JVM中,但是显然不包括我们的UDF。
所以一个很自然的思路就是把我们的UDF也封到JVM中。

将python的自定义函数改成java

https://github.com/sunlongjiang/adx/blob/master/adx_platform_common/src/main/java/com/hungrystudio/utf/block/AreaComplexValue.java

并在任务中通过--jars 引用该jar包

 "--jars", "s3://hungry-studio-data-warehouse/user/sunlj/java_udf/adx_platform_common-4.0.0.jar",

改写后运行任务发现比之前少了两个transform,没有了BatchEvalPython,也少了一个WholeStageCodeGen

优化前该任务的执行时长为3个小时,

优化后改任务的执行时长为16分钟,效果非常明显!!!

因此在pyspark中尽量使用spark算子和spark-sql,同时尽量将UDF(含lambda表达式形式)封装到一个地方减少JVM和python脚本的交互。
由于BatchEvalPython过程每次处理100行,也可以把多行聚合成一行减少交互次数。
最后还可以把UDF部分用java重写打包成jar包,其他部分则保持python脚本以获得不用编译随时修改的灵活性,以兼顾性能和开发效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/971703.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

告别第三方云存储!用File Browser在Windows上自建云盘随时随地访问

文章目录 前言1.下载安装File Browser2.启动访问File Browser3.安装cpolar内网穿透3.1 注册账号3.2 下载cpolar客户端3.3 登录cpolar web ui管理界面3.4 创建公网地址 4.固定公网地址访问 前言 无论是个人用户还是企业团队&#xff0c;都希望能够有一个高效、安全的解决方案来…

vue2老版本 npm install 安装失败_安装卡主

vue2老版本 npm install 安装失败_安装卡主 特别说明&#xff1a;vue2老版本安装慢、运行慢&#xff0c;建议升级vue3element plus vite 解决方案1&#xff1a; 第一步、修改npm 镜像为国内镜像 使用淘宝镜像&#xff1a; npm config set registry https://registry.npmmir…

Qwen2-VL 的重大省级,Qwen 发布新旗舰视觉语言模型 Qwen2.5-VL

Qwen2.5-VL 是 Qwen 的新旗舰视觉语言模型&#xff0c;也是上一代 Qwen2-VL 的重大飞跃。 Qwen2.5-VL主要特点 视觉理解事物&#xff1a;Qwen2.5-VL不仅能够熟练识别花、鸟、鱼、昆虫等常见物体&#xff0c;而且还能够分析图像中的文本、图表、图标、图形和布局。 代理性&…

[matlab优化算法-18期】基于遗传算法的模糊PID控制优化

遗传算法优化模糊PID控制器&#xff1a;原理与实践 第一节&#xff1a;背景介绍 在现代控制系统中&#xff0c;PID控制器因其结构简单、参数调整方便而被广泛应用。然而&#xff0c;传统PID控制器的参数整定依赖于经验或试错法&#xff0c;难以适应复杂系统的动态变化。模糊控…

Kotlin Lambda

Kotlin Lambda 在探索Kotlin Lambda之前&#xff0c;我们先回顾下Java中的Lambda表达式&#xff0c;Java 的 Lambda 表达式是 Java 8 引入的一项强大的功能&#xff0c;它使得函数式编程风格的代码更加简洁和易于理解。Lambda 表达式允许你以一种更简洁的方式表示实现接口&…

实现pytorch注意力机制-one demo

主要组成部分&#xff1a; 1. 定义注意力层&#xff1a; 定义一个Attention_Layer类&#xff0c;接受两个参数&#xff1a;hidden_dim&#xff08;隐藏层维度&#xff09;和is_bi_rnn&#xff08;是否是双向RNN&#xff09;。 2. 定义前向传播&#xff1a; 定义了注意力层的…

【Prometheus】prometheus结合domain_exporter实现域名监控

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…

基于Java+Springboot+MySQL企业公司网站系统设计与实现

博主介绍&#xff1a;黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育、辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff…

SQL复习

SQL复习 MySQL SQL介绍 SQL SQL的全拼是什么&#xff1f; SQL全拼&#xff1a;Structured Query Language&#xff0c;也叫结构化查询语言。 SQL92和SQL99有什么区别呢&#xff1f; SQL92和SQL99分别代表了92年和99年颁布的SQL标准。 在 SQL92 中采用&#xff08;&#xff…

从入门到精通:Postman 实用指南

Postman 是一款超棒的 API 开发工具&#xff0c;能用来测试、调试和管理 API&#xff0c;大大提升开发效率。下面就给大家详细讲讲它的安装、使用方法&#xff0c;再分享些实用技巧。 一、安装 Postman 你能在 Postman 官网&#xff08;https://www.postman.com &#xff09;下…

零基础学QT、C++(一)安装QT

目录 如何快速学习QT、C呢&#xff1f; 一、编译器、项目构建工具 1、编译器&#xff08;介绍2款&#xff09; 2、项目构建工具 二、安装QT 1、下载QT安装包 2、运行安装包 3、运行QT creator 4、导入开源项目 总结 闲谈 如何快速学习QT、C呢&#xff1f; 那就是项目驱动法&…

【Zookeeper如何实现分布式锁?】

Zookeeper如何实现分布式锁? 一、ZooKeeper分布式锁的实现原理二、ZooKeeper分布式锁的实现流程三、示例代码四、总结一、ZooKeeper分布式锁的实现原理 ZooKeeper是一个开源的分布式协调服务,它提供了一个分布式文件系统的接口,可以用来存储和管理分布式系统的配置信息。 …

2D 游戏艺术、动画和光照

原文&#xff1a;https://unity.com/resources/2d-game-art-animation-lighting-for-artists-ebook 笔记 用Tilemap瓷砖大小为1单元&#xff0c;人物大小在0.5~2单元 PPU &#xff1a;单位像素 pixels per unit 2160 4K分辨率/ 正交相机size*2 完整屏幕显示像素点 有骨骼动…

Office word打开加载比较慢处理方法

1.添加safe参数 ,找到word启动项,右击word,选择属性 , 添加/safe , 应用并确定 2.取消加载项,点击文件,点击选项 ,点击加载项,点击转到,取消所有勾选,确定。

docker 运行 芋道微服务

jar包打包命令 mvn clean install package -Dmaven.test.skiptrue创建文件夹 docker-ai 文件夹下放入需要jar包的文件夹及 docker-compose.yml 文件 docker-compose.yml 内容&#xff1a;我这里的是ai服务&#xff0c;所以将原先的文件内容做了变更&#xff0c;你们需要用到什…

软件定义汽车时代的功能安全和信息安全

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 简单&#xff0c;单纯&#xff0c;喜欢独处&#xff0c;独来独往&#xff0c;不易合同频过着接地气的生活…

ZYNQ TCP Server PS端千兆网口速率低问题,要修改BSP中LWIP配置参数

用VITIS教程里面 TCP UDP应用工程例程 打算测试PS端千兆网口速率。ZYNQ核心板用黑金的&#xff0c;外部板子自画的网口电路和其它电路。TCP SERVER时 iperf测试速率 只有60~70Mbit/s&#xff1f;然后用UDP SERVER方式&#xff0c;发现能达到 950Mbit/s&#xff1f;&#xff1f;…

《深度学习》——调整学习率和保存使用最优模型

调整学习率 在使用 PyTorch 进行深度学习训练时&#xff0c;调整学习率是一个重要的技巧&#xff0c;合适的学习率调整策略可以帮助模型更好地收敛。 PyTorch 提供了多种调整学习率的方法&#xff0c;下面将详细介绍几种常见的学习率调整策略及实例代码&#xff1a; torch.opt…

RocketMQ和Kafka如何实现顺序写入和顺序消费?

0 前言 先说明kafka&#xff0c;顺序写入和消费是Kafka的重要特性&#xff0c;但需要正确的配置和使用方式才能保证。本文需要解释清楚Kafka如何通过分区来实现顺序性&#xff0c;以及生产者和消费者应该如何配合。   首先&#xff0c;顺序写入。Kafka的消息是按分区追加写入…

场外个股期权下单后多久成交?场外个股期权对投资组合的影响

对普通老板们而言&#xff0c;它如同精密手术刀——用得好可精准优化投资组合&#xff0c;用不好则可能伤及本金。记住两个关键&#xff1a;一是永远用"亏得起的钱"参与&#xff0c;二是把合约条款当"药品说明书"逐字研读。 场外个股期权下单后多久成交&am…