PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

import json
from pyspark.sql import SparkSession

def load_config(config_path):
    with open(config_path, 'r') as f:
        return json.load(f)

def main(config_path, base_s3_path):
    # 初始化SparkSession,配置S3和Excel支持
    spark = SparkSession.builder \
        .appName("DataExportJob") \
        .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \
        .getOrCreate()

    # 配置S3访问(根据实际环境配置)
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")

    config = load_config(config_path)

    for template in config['templates']:
        label = template['label']
        sql_template = template['sql_template']
        parameters_list = template['parameters']

        for params in parameters_list:
            # 验证参数数量是否匹配
            placeholders = sql_template.count('{')
            if len(params) != placeholders:
                raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")

            # 替换SQL中的占位符
            formatted_sql = sql_template.format(*params)
            df = spark.sql(formatted_sql)

            # 生成文件名参数部分
            param_str = "_".join(params)
            base_filename = f"{label}_{param_str}"

            # 定义输出路径
            output_paths = {
                'parquet': f"{base_s3_path}/parquet/{base_filename}",
                'csv': f"{base_s3_path}/csv/{base_filename}",
                'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"
            }

            # 写入Parquet
            df.write.mode('overwrite').parquet(output_paths['parquet'])

            # 写入CSV(自动生成header)
            df.write.mode('overwrite') \
                .option("header", "true") \
                .csv(output_paths['csv'])

            # 写入Excel(使用spark-excel包)
            df.write.format("com.crealytics.spark.excel") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .mode("overwrite") \
                .save(output_paths['excel'])

    spark.stop()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')
    parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')
    args = parser.parse_args()

    main(args.config, args.s3_path)

配置文件示例(config.json)

{
  "templates": [
    {
      "label": "sales_report",
      "sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'",
      "parameters": [
        ["202301", "north"],
        ["202302", "south"]
      ]
    },
    {
      "label": "user_activity",
      "sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id",
      "parameters": [
        ["2023-01-01"],
        ["2023-01-02"]
      ]
    }
  ]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel
    ├── sales_report_202301_north.xlsx
    ├── sales_report_202302_south.xlsx
    └── user_activity_2023-01-01.xlsx

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/963823.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW无人机航线控制系统

介绍了一种无人机航线控制系统,该系统利用LabVIEW软件与MPU6050九轴传感器相结合,实现无人机飞行高度、速度、俯仰角和滚动角的实时监控。系统通过虚拟仪器技术,有效实现了数据的采集、处理及回放,极大提高了无人机航线的控制精度…

最新功能发布!AllData数据中台核心菜单汇总

🔥🔥 AllData大数据产品是可定义数据中台,以数据平台为底座,以数据中台为桥梁,以机器学习平台为中层框架,以大模型应用为上游产品,提供全链路数字化解决方案。 ✨奥零数据科技官网:http://www.aolingdata.com ✨AllData开源项目:https://github.com/alldatacenter/…

【memgpt】letta 课程1/2:从头实现一个自我编辑、记忆和多步骤推理的代理

llms-as-operating-systems-agent-memory llms-as-operating-systems-agent-memory内存 操作系统的内存管理

HarmonyOS:给您的应用添加通知

一、通知介绍 通知旨在让用户以合适的方式及时获得有用的新消息,帮助用户高效地处理任务。应用可以通过通知接口发送通知消息,用户可以通过通知栏查看通知内容,也可以点击通知来打开应用,通知主要有以下使用场景: 显示…

leetcode——二叉树的最近公共祖先(java)

给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为:“对于有根树 T 的两个节点 p、q,最近公共祖先表示为一个节点 x,满足 x 是 p、q 的祖先且 x 的深度尽可能大(一个节点也可以是它自己的…

【FreeRTOS 教程 六】二进制信号量与计数信号量

目录 一、FreeRTOS 二进制信号量: (1)二进制信号量作用: (2)二进制信号量与互斥锁的区别: (3)信号量阻塞时间: (4)信号量的获取与…

python学opencv|读取图像(五十五)使用cv2.medianBlur()函数实现图像像素中值滤波处理

【1】引言 在前述学习过程中,已经探索了取平均值的形式进行图像滤波处理。 均值滤波的具体的执行对象是一个nXn的像素核,对这个像素核内所有像素点的BGR值取平均值,然后把这个平均的BGR值直接赋给像素核中心位置的核心像素点,由…

OpenAI 正式推出Deep Research

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

多模态论文笔记——NaViT

大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细解读多模态论文NaViT(Native Resolution ViT),将来自不同图像的多个patches打包成一个单一序列——称为Patch n’ Pack—…

VLAN 基础 | 不同 VLAN 间通信实验

注:本文为 “ Vlan 间通信” 相关文章合辑。 英文引文,机翻未校。 图片清晰度限于原文图源状态。 未整理去重。 How to Establish Communications between VLANs? 如何在 VLAN 之间建立通信? Posted on November 20, 2015 by RouterSwi…

使用Pygame制作“吃豆人”游戏

本篇博客展示如何使用 Python Pygame 编写一个简易版的“吃豆人(Pac-Man)” 风格游戏。这里我们暂且命名为 Py-Man。玩家需要控制主角在一个网格地图里移动、吃掉散布在各处的豆子,并躲避在地图中巡逻的幽灵。此示例可帮助你理解网格地图、角…

springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。 整个项目结构如下&#xff1a; 1.maven依赖 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version> </dependency>application.y…

安卓(android)订餐菜单【Android移动开发基础案例教程(第2版)黑马程序员】

一、实验目的&#xff08;如果代码有错漏&#xff0c;可查看源码&#xff09; 1.掌握Activity生命周的每个方法。 2.掌握Activity的创建、配置、启动和关闭。 3.掌握Intent和IntentFilter的使用。 4.掌握Activity之间的跳转方式、任务栈和四种启动模式。 5.掌握在Activity中添加…

RabbitMQ快速上手及入门

概念 概念&#xff1a; publisher&#xff1a;生产者&#xff0c;也就是发送消息的一方 consumer&#xff1a;消费者&#xff0c;也就是消费消息的一方 queue&#xff1a;队列&#xff0c;存储消息。生产者投递的消息会暂存在消息队列中&#xff0c;等待消费者处理 exchang…

java命令详解

这里以jdk8为例子&#xff0c;查看默认的垃圾回收器 java -XX:PrintCommandLineFlags -version-XX:UseParallelGC : Parallel Scavenge 和 Parallel Old 组合 -XX:InitialHeapSize268435456 : 初始化堆大小&#xff08;字节&#xff09; -XX:MaxHeapSize4294967296 : 最大堆大…

自主Shell命令行解释器

什么是命令行 我们一直使用的"ls","cd","pwd","mkdir"等命令&#xff0c;都是在命令行上输入的&#xff0c;我们之前对于命令行的理解&#xff1a; 命令行是干啥的&#xff1f;是为我们做命令行解释的。 命令行这个东西实际上是我们…

分析哲学:从 语言解剖到 思想澄清的哲学探险

分析哲学&#xff1a;从 语言解剖 到 思想澄清 的哲学探险 第一节&#xff1a;分析哲学的基本概念与公式解释 【通俗讲解&#xff0c;打比方来讲解&#xff01;】 分析哲学&#xff0c;就像一位 “语言侦探”&#xff0c;专注于 “解剖语言”&#xff0c;揭示我们日常使用的语…

自定义数据集 使用paddlepaddle框架实现逻辑回归

导入必要的库 import numpy as np import paddle import paddle.nn as nn 数据准备&#xff1a; seed1 paddle.seed(seed)# 1.散点输入 定义输入数据 data [[-0.5, 7.7], [1.8, 98.5], [0.9, 57.8], [0.4, 39.2], [-1.4, -15.7], [-1.4, -37.3], [-1.8, -49.1], [1.5, 75.6…

QtCreator在配置Compilers时,有一个叫ABI的选项,那么什么是ABI?

问题提出 QtCreator在配置Compilers时,有一个叫ABI的选项,那么什么是ABI&#xff1f; ABI&#xff08;Application Binary Interface&#xff09;介绍 ABI&#xff08;Application Binary Interface&#xff0c;应用二进制接口&#xff09;是指应用程序与操作系统或其他程序…

[STM32 标准库]EXTI应用场景 功能框图 寄存器

一、EXTI 外部中断在嵌入式系统中有广泛的应用场景&#xff0c;如按钮开关控制&#xff0c;传感器触发&#xff0c;通信接口中断等。其原理都差不多&#xff0c;STM32会对外部中断引脚的边沿进行检测&#xff0c;若检测到相应的边沿会触发中断&#xff0c;在中断中做出相应的处…