sparksql

1.1什么是sparksql

 Spark SQL 是 Spark 用于处理结构化数据的模块。

一、主要特点

  1. 统一的数据处理

    • Spark SQL 提供了一个统一的编程接口,可以处理多种数据源,包括 Hive 表、Parquet 文件、JSON 文件、关系型数据库等。这使得用户可以在一个统一的环境中进行数据处理,无需切换不同的工具和技术。
    • 例如,可以使用相同的代码读取 Hive 表和 Parquet 文件,并进行联合查询和分析。
  2. 支持 SQL 查询

    • 用户可以使用标准的 SQL 语言进行数据查询和分析。Spark SQL 支持大多数 SQL 语法和功能,包括 SELECT、JOIN、GROUP BY、WHERE 等语句,以及窗口函数、子查询等高级功能。
    • 这使得熟悉 SQL 的用户可以轻松地使用 Spark SQL 进行数据分析,无需学习新的编程语言。
  3. 与 Spark 生态系统集成

    • Spark SQL 与 Spark 的其他模块(如 Spark Core、Spark Streaming、MLlib、GraphX)紧密集成,可以在大规模数据处理和分析中发挥重要作用。
    • 例如,可以将 Spark Streaming 接收的实时数据流存储到 Spark SQL 的表中,然后使用 SQL 进行实时分析;或者将 Spark SQL 的查询结果作为输入,用于机器学习算法的训练和预测。
  4. 性能优化

    • Spark SQL 采用了多种性能优化技术,以提高数据处理的效率和速度。例如,它使用了基于成本的优化器(CBO)来自动优化 SQL 查询计划,选择最优的执行方式;同时,它还支持内存列式存储和向量化执行,以提高数据的读取和处理速度。
    • 此外,Spark SQL 还可以利用 Spark 的分布式计算能力,将数据处理任务并行化到多个节点上,提高处理大规模数据的能力。

二、使用场景

  1. 数据仓库和数据分析

    • Spark SQL 可以作为一个数据仓库解决方案,用于存储和分析大规模的结构化数据。它支持与 Hive 兼容的 metastore,可以直接读取和写入 Hive 表,方便与现有的数据仓库集成。
    • 同时,Spark SQL 提供了丰富的数据分析功能,如 SQL 查询、数据透视表、聚合函数等,可以满足各种数据分析需求。
  2. 实时数据分析

    • 结合 Spark Streaming,Spark SQL 可以实现实时数据分析。可以将实时数据流存储到 Spark SQL 的表中,然后使用 SQL 进行实时查询和分析,以满足对实时数据的监控和决策需求。
    • 例如,在金融交易、网络监控、社交媒体分析等领域,可以使用 Spark SQL 进行实时风险监控、异常检测和趋势分析。
  3. 机器学习和数据挖掘

    • Spark SQL 可以与 Spark MLlib 集成,用于机器学习和数据挖掘任务。可以使用 SQL 查询从大规模数据集中提取特征,然后将这些特征输入到机器学习算法中进行训练和预测。
    • 例如,在推荐系统、客户细分、欺诈检测等领域,可以使用 Spark SQL 和 MLlib 进行数据预处理、特征工程和模型训练。
  4. 数据可视化

    • Spark SQL 的查询结果可以与各种数据可视化工具集成,如 Tableau、PowerBI 等,以实现数据的可视化展示。通过 SQL 查询,可以从大规模数据集中提取关键信息,并以直观的图表和图形展示出来,帮助用户更好地理解和分析数据。

 1.2创建datafram

# 导入行类Row
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *

# 创建行数据
r1 = Row(id=1, name='张三', age=20)
r2 = Row(id=2, name='李四', age=22)
# 创建元数据
schema = (StructType().add('id', IntegerType(), nullable=False)
                      .add('name', StringType(), nullable=False)
                      .add('age',IntegerType(),nullable=False)
                      .add('gender', StringType(), nullable=False))
print(schema)

# 创建dataframe
# 生成sparksession对象  按照固定写法创建
ss = SparkSession.builder.getOrCreate()
# 使用sparksession对象方法创建df
# createDataFrame 第一参数是一个列表数据,将每行数据放入列表
# 第二个参数指定表元数据信息
# df是一个dataframe类型的对象
df = ss.createDataFrame([r1, r2], schema=schema)

# dataframe数据的操作
# 查看df数据
df.show()  # 查看所有数据,超过20行时,默认只显示20行
# 查看元信息
df.printSchema()

1.3rdd与df之间的转化 

# RDD和DF之间的转换
# 导入SparkSession
from pyspark.sql import SparkSession

# 创建对象
ss = SparkSession.builder.getOrCreate()

# 使用sparksession获取sparkcontext
sc = ss.sparkContext # 不要括号,可以直接获取到sparkcontext对象

# 生成rdd数据
# rdd转df时,要求数据是二维嵌套列表
data = [[1,'张三',20,'男'],[2,'小红',19,'女']]
rdd = sc.parallelize(data)

# rdd转df
df = rdd.toDF(schema='id int,name string,age int,gender string')

# 查看df数据
df.show()

# 查看表结构
df.printSchema()


# 将df转为rdd
rdd2 = df.rdd
# 查看rdd中数据
res = rdd2.collect() # [Row(),Row()]
# 转化后的rdd中每个元素是有个Row类对象
print(res)
print(res[0])
print(res[0]['name'])

1.4pandas和spark之间转化

pandas的df转为spark的df  

# Pandas和spark之间的转化
import pandas as pd
from pyspark.sql import SparkSession
# 创建pd的df
pd_df = pd.DataFrame(
    {
        'id':[1,2,3,4],
        'name':['a','b','c','d'],
        'age':[20,21,22,24],
        'gender':['男','女','男','男']
    }
)
# 查看数据
print(pd_df)

# 将pd_df 转为spark的df
ss = SparkSession.builder.getOrCreate()
spark_df = ss.createDataFrame(pd_df)

# 查看数据
spark_df.show()

spark的df转为pandas的df

# 将spark_df转为pd_df
pd_df2 = spark_df.toPandas()
print(pd_df2)

1.5读取文件数据转为df

# 读取数据转化为df
from pyspark.sql import SparkSession

# 创建sparksession
ss = SparkSession.builder.getOrCreate()

# 读取不同数据源
# header=True 是否需要获取表头
# sep 指定数据字段按照什么字符分割
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
# schema当没有表头时,可以自己指定字段
df2 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')

df3 = ss.read.json('hdfs://node1:8020/data/employees.json')
df4 = ss.read.orc('hdfs://node1:8020/data/users.orc')
df5 = ss.read.parquet('hdfs://node1:8020/data/users.parquet')


# 查看
# show中可以指定显示多少行,默认是20行
df.show(100)

df2.show()

 2.DataFrame基本使用

2.1SQL语句

# 使用sql方式开发
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取数据得到df数据
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')

# 对df数据进行sql操作
# 需要给df指定一个表名
df.createTempView('tb_user')

# 编写sql语句执行
# sql执行后的结果被保存新的df中
new_df =  ss.sql('select gender,avg(age) as avg_data from tb_user group by gender')
new_df.show()

 2.2DSL方法

# 使用DSL方式开发
from pyspark.sql import  SparkSession

ss = SparkSession.builder.getOrCreate()

# 生成df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')
# 查看df数据
df.show()

print('---------------select方法----------------------')
# 使用select方法指定输出展示的数据字段
# 方式一指定字段
df_select = df.select('id','name')
# 方式二
df_select2 = df.select(df.age,df.gender)
# 方式三
df_select3 = df.select(df['id'],df['cls'])

df_select.show()
df_select2.show()
df_select3.show()

print('---------------alias方法----------------------')
# 字段名称修改,需要配合select中使用
df_alias = df.select(df.id.alias('user_id'),df.name.alias('username'))
df_alias.show()

print('---------------cast方法----------------------')
# 修改字段的数据类型
df.printSchema()
df_cast = df.select(df.id.cast('int'),df.name,df.age)
df_cast.printSchema()

print('---------------where方法----------------------')
# 数据过滤,where方法内部是调用了filter方法
# 方式1
df_where = df.where('age > 20')
df_where.show()
#方式2
df_where2 = df.where(df.age > 20)
df_where2.show()

# 与或非多条件 只能使用方式1  条件的书写和在sql中的where书写内容一样
df_where3 = df.where('age > 20 and gender = "男" ')
df_where3.show()

print('---------------groupby方法----------------------')
# 分组计算,可以配和聚合方法一起使用  使用该方式聚合一次只能计算一个聚合数据 ,可以使用内置函数配合agg方法
# groupby指定分组字段,可以指定多个
# avg 聚合方法  指定聚合字段  sum  count  avg  max  min
df_groupby = df.groupby('gender').avg('age')
df_groupby.show()

# groupby指定分组字段,可以指定多个
df_groupby2 = df.groupby('gender','cls').avg('age')
df_groupby2.show()

# 分组后的数据过滤
df_groupby3 = df.groupby('gender','cls').avg('age').where(' avg(age) > 19')
df_groupby3.show()

print('---------------orderby方法----------------------')
# 数据排序 内部调用sort方法
df_orderby = df.orderBy('age')
df_orderby.show()
# ascending=False 降序
df_orderby2 = df.orderBy('age',ascending=False)
df_orderby2.show()

print('---------------limit方法----------------------')
# 指定获取多条数据
df_limit = df.orderBy('age',ascending=False).limit(5)

df_limit.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/896305.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

项目实战:构建 effet.js 人脸识别交互系统的实战之路

📝个人主页🌹:Eternity._ 🌹🌹期待您的关注 🌹🌹 ❀构建 effet.js 📒1. 什么是effet.js📜2. 为什么需要使用effet.js📝3. effet.js的功能📚4. 使用…

【国产操作系统】揭秘deepin 23自定义、全盘、高级安装的奥秘,携手探索无限可能,尝鲜之旅,等你来驾驭!

简述 deepin 作为国内最好的个人桌面Linux社区发行版之一,其实受到很多人的关系,对于很多普通用户来说,其很易用,不需要怎么折腾,界面也非常友好。 针对技术型的 Linux 用户,可能对 deepin 的态度就是仁者…

文献分享: 高维ANN算法的综述

文章目录 0. \textbf{0. } 0. 写在前面 0.1. \textbf{0.1. } 0.1. 一些预备知识 0.2. \textbf{0.2. } 0.2. 本文的主要研究 0.3. \textbf{0.3. } 0.3. 本文一些研究限制 1. \textbf{1. } 1. 三大类 ANN \textbf{ANN} ANN算法回顾以及 DPG \textbf{DPG} DPG 1.1. \textbf{1.1. …

基于递推式最小二乘法的PMSM参数辨识MATLAB仿真模型

微❤关注“电气仔推送”获得资料(专享优惠) 模型简介 最小二乘法是一种回归估计法,适用于被辨识的参数与系统输出为线性关 系的情况。它是在一定数据量下,基于系统输出误差的平方和最小的准则对参 数进行辨识的方法。此模型通过…

案例分享-优秀蓝色系UI界面赏析

蓝色UI设计界面要提升舒适度,关键在于色彩搭配与对比度。选择柔和的蓝色调作为主色,搭配浅灰或白色作为辅助色,能营造清新、宁静的氛围。同时,确保文字与背景之间有足够的对比度,避免视觉疲劳,提升阅读体验…

CatVTON:AI 虚拟换装的卓越之选

在时尚与科技融合的时代,CatVTON 作为一款创新的 AI 虚拟换装工具,正引领着时尚界的变革。它由中山大学、美图、Pixocial 和鹏城实验室等机构联合开发,以其独特的优势和卓越的性能,为时尚爱好者、电商从业者以及设计师们带来了前所…

URL路径以及Tomcat本身引入的jar包会导致的 SpringMVC项目 404问题、Tomcat调试日志的开启及总结

一、URL路径导致的 SpringMVC项目 404问题 SpringMVC项目的各项代码都没有问题,但是在页面请求时仍然显示404,编译的时候报了下面的问题: org.apache.jasper.servlet.TldScanner.scanJars 至少有一个JAR被扫描用于TLD但尚未包含TLD。 为此记录…

Windows下搭建VUE开发环境

Windows下搭建VUE开发环境 文章目录 Windows下搭建VUE开发环境第一步 安装nodejs下载nodejs安装nodejs配置环境变量安装测试配置npm的路径配置npm的国内代理安装必要工具测试工具安装的使用 第二步 安装vscode下载vscode安装插件Chinese (Simplified) (简体中文) Language Pack…

从0到1构建Next.Js项目SSG和SSR应用

最近在探索学习前端工程化相关内容,在如今前后端分离的架构下,为了提升首屏渲染速度和 SEO 效果,兜兜转转,又回到了服务端渲染。 本文主要是讲讲如何使用 Next.js 框架实现服务端渲染,重构或优化现有前端应用的 SEO 和…

光伏工程造价单自动生成

光伏工程造价单依据光伏设计图自动生成。 一、组件 类型:光伏组件是光伏电站的核心设备,负责将太阳能转化为电能。常见的类型包括单晶硅组件、多晶硅组件、薄膜组件等。 规格型号:具体规格型号取决于电站的设计需求,例如功率、…

企业博客SEO优化:8个必备工具与资源指南

在当今数字化时代,企业博客已远远超越了传统意义上的信息展示平台。它不仅是企业展示品牌形象、传递品牌价值的重要窗口,更是吸引潜在客户、增强用户粘性、提升网站流量和搜索引擎排名的关键。通过精心策划和高质量的内容创作,企业博客能够建…

【OpenGL】创建窗口/绘制图形

需要云服务器等云产品来学习Linux可以移步/-->腾讯云<--/官网&#xff0c;轻量型云服务器低至112元/年&#xff0c;新用户首次下单享超低折扣。 目录 一、创建窗口 1、代码流程图 2、运行结果 3、代码 二、三角形 1、顶点缓冲对象&#xff1a;Vertex Buffer Object…

【Qt】控件——Qt控件的介绍、QWidget的介绍、QWidget的属性、QWidget的函数

文章目录 Qt1. 控件的概念2. QWidgetenabledgeometrywindowTitlewindowIconwindowOpacitycursorfonttoolTiptoolTipDuringstyleSheet Qt 1. 控件的概念 Widget 是 Qt 中的核心概念。英文原义是 “小部件”&#xff0c;我们此处也把它翻译为 “控件”。控件是构成一个图形化界面…

吴恩达深度学习笔记(7)

误差分析&#xff1a; 你运行一个算法代替人类计算&#xff0c;但是没有达到人类的效果&#xff0c;需要手动检查算法中的错误&#xff0c;对模型的一些部分做相应调整&#xff0c;才能更好地提升分类的精度。如果不加分析去做&#xff0c;可能几个月的努力对于提升精度并没有…

Linux文件你不知道的那些事,搞清楚磁盘空间占用的问题

在进行采集日志时&#xff0c;日志文件明明被滚动压缩并转移走了&#xff0c;但是发现磁盘空间还是在不断增长&#xff0c;统一目录下的总文件大小&#xff0c;发现与实际占用也不符&#xff0c;于是想到可能是文件句柄未释放导致的&#xff0c;本文就来记录一下文件及文件句柄…

git clone 国内镜像

比如 git clone https://github.com/HKUST-Aerial-Robotics/A-LOAM.git 改成 git clone https://gitclone.com/github.com/HKUST-Aerial-Robotics/A-LOAM.git

MySQL 查询按照更新时间排序遇到相同更新时间的会少数据

MySQL分页后出现重复数据或丢失记录的原因可能包括&#xff1a;SQL查询条件不一致、使用了不稳定的排序、LIMIT语句与ORDER BY配合问题、缓存设置不当或数据库复制配置错误。需要检查查询逻辑和系统配置以解决这些问题。 MySQL分页导致数据重复的原因&#xff1a; 1、排序算法…

补题:C. Paprika and Permutation

C. Paprika and Permutation 传送门&#xff1a;Problem - 1617C - Codeforces 题意&#xff1a; 思路&#xff1a; 首先这个题要知道这个结论&#xff1a; 当 x > a[i] 时&#xff0c;a[i] mod x a[i] 当 x < a[i] 时&#xff0c;0 < a[i] % x < ( a[i] 1 )…

【unity小技巧】Unity6 LTS版本安装和一些修改和新功能使用介绍

文章目录 前言安装新功能变化1、官方推荐使用inputsystem进行输入控制2、修复了InputSystem命名错误导致listen被遮挡的bug3、自带去除unity启动画面logo功能4、unity官方的behavior行为树插件5、linearVelocity代替过时的velocity方法待续 完结 前言 2024/10/17其实unity就已…

ChatGPT 现已登陆 Windows 平台

今天&#xff0c;OpenAI 宣布其人工智能聊天机器人平台 ChatGPT 已开始预览专用 Windows 应用程序。OpenAI 表示&#xff0c;该应用目前仅适用于 ChatGPT Plus、Team、Enterprise 和 Edu 用户&#xff0c;是一个早期版本&#xff0c;将在今年晚些时候推出"完整体验"。…