第三阶段第一章——PySpark实战

学习了这么多python的知识,是时候来搞点真玩意儿了~~

春风得意马蹄疾,一日看尽长安花

o(* ̄︶ ̄*)o


 1.前言介绍

(1)什么是spark

        Apache Spark是一个开源的分布式计算框架,用于处理大规模数据集的计算任务。它提供了一种高性能、通用、易用的计算引擎,支持数据并行处理、内存计算、迭代计算等多种计算模式,并提供了丰富的API,比如Spark SQL、Spark Streaming、Mlib和Graphx等。Spark的基本单元是弹性分布式数据集(RDD),它是一种可分区、可并行计算的数据结构,可以在多个节点上进行操作。Spark可以运行在多种集群管理器上,包括Hadoop YARN、Apache Mesos和Standalone等。Spark的特点是速度快、易用、灵活、可扩展,已经成为了数据处理和数据科学领域的一个重要工具。

(2)什么是pyspark

        PySpark是指Spark的Python API,它是Spark的一部分,可以通过Python语言进行Spark编程。PySpark提供了Python与Spark之间的交互,使Python开发人员能够使用Python语言对Spark进行编程和操作。PySpark可以使用Python进行数据预处理和分析,同时扩展了Python语言的功能,能够同时使用Spark的分布式计算能力,加快了数据处理和分析的速度。与其他编程语言相比,Python在数据处理和科学计算方面有广泛的应用和支持,因此PySpark特别适合处理Python与大规模数据集交互的数据科学项目。

 


2.基础准备

(1)先下载  pyspark软件包

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ pyspark

(2)SparkContext入口对象 

初体验代码:

"""
    演示
"""
# 导包
from pyspark import SparkConf, SparkContext

# 创建sparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

(3)PySpark编程模型

详细过程 ==>>


3.数据输入

(1)RDD对象

数据输入完成之后都会得到一个RDD对象

(2)数据容器转RDD对象

"""
    演示数据输入
"""
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
# 查看rdd里面的内容需要使用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

sc.stop()

(3)读取文件转化为RDD对象

代码示例


# 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd_file = sc.textFile("D:\\IOText\\b.txt")
print(rdd_file.collect())

sc.stop()


4.数据计算

pyspark是通过RDD对象内丰富的:成员方法(算子)

(1)map方法

功能:将RDD的数据一条条处理(处理的逻辑 基于map算子中接收的处理函数),返回新的RDD

语法:

代码示例

"""
    演示map方法
"""
from pyspark import SparkConf, SparkContext
import os
#配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("map_test")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])


# 通过map方法将全部数据都乘以10
# (T) -> U
def func(data):
    return data * 10


rdd2 = rdd.map(func)
print(rdd2.collect())

# 链式调用,快速解决
rdd3 = rdd.map(lambda e: e * 100).map(lambda e: e + 6)
print(rdd3.collect())

(2)flatMap方法

和map差不多,多了一个消除嵌套的效果

功能:对rdd执行map操作,然后进行解除嵌套

代码示例

"""
    演示 flatmap 方法
"""
from pyspark import SparkConf, SparkContext
import os

# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("flatmap_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize(["a b c", "e f g", "uuu ggg"])
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

(3)reduceByKey方法

功能:自动分组,组内聚合

代码示例

"""
    演示 reduceByKey 方法
"""
from pyspark import SparkConf, SparkContext
import os

# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("reduceByKey_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和呃女生两个组的成绩之和
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd1.collect())


(4)练习案例1

目标,统计文件内出现单词的数量.

代码示例:

"""
    读取文件,统计文件内,单词出现的数量
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_1")
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")

# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
print(words.collect())
# 将其转化为map类型,value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)

print(res.collect())

我的文件数据是

word1
word2 word2
word3 word3 word3
word4 word4 word4 word4

(5)filter方法

功能:过滤想要的数据进行保留

filter返回值为TRUE则保留,不然会被过滤

语法:

代码示例:

"""
    filter方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# filter返回值为TRUE则保留,不然会被过滤
rdd2 = rdd.filter(lambda e: e % 2 == 0)
print(rdd2.collect())

(6)distinct方法

功能:对RDD数据进行去重,返回新的RDD

语法:

rdd.distinct( )   # 这里无需传参

代码示例:

"""
    distinct方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 2, 3, 4, 5])
# filter返回值为TRUE则保留,不然会被过滤
rdd2 = rdd.distinct()
print(rdd2.collect())

(7)sortBy方法

功能:对RDD数据进行排序,基于你指定的排序依据

语法:

"""
    sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")

# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
# print(words.collect())
# 将其转化为map类型,value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)
# print(res.collect())
# 按照出现次数从小到大排序
final_rdd = res.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
print(final_rdd.collect())


(8)练习案例2

需要的数据已经上传到博客

三个需求:

# TODO 需求一:城市销售额排名
# TODO 需求2:全部城市有哪些商品类别在售卖
# TODO 需求3:北京市有哪些商品类别在售卖
"""
    sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import json
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# TODO 需求一:城市销售额排名
# 1.1 读取文件
file_rdd = sc.textFile("D:\\IOText\\orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 取出城市和销售额数据
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# print(dict_rdd.collect())
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result_rdd_1 = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result_rdd_1.collect())

# TODO 需求2:全部城市有哪些商品类别在售卖
# 同时记得去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())

# TODO 需求3:北京市有哪些商品类别在售卖
# 3.1 只要北京市的数据
rdd_beijing = dict_rdd.filter(lambda x: x["areaName"] == "北京")
# 3.2 取出全部商品类别并且去重
result_rdd_3 = rdd_beijing.map(lambda x: x['category']).distinct()
print("需求3的结果:", result_rdd_3.collect())


5.数据输出

(1)输出为Python对象:collect算子

功能:将RDD各个分区内的数据,统一手机到Driver中,形成一个List对象

用法:

rdd.collect( )   # 返回值是一个list

(2)输出为Python对象:reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

(3)输出为Python对象:take算子

功能:取RDD的前N个元素,组合成list返回给你

语法:

(4)输出为Python对象:count算子

功能:计算RDD有多少条数据,返回值是一个数字

语法:

(5)上述1-4算子代码示例

"""
    collect算子
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(type(rdd_list))
print(rdd_list)

# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)

# take算子,取出RDD前N个元素,组成list返回
take_list_3 = rdd.take(3)
print(take_list_3)

# count算子,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"有{num_count}个元素")

 

(6)输出到文件中:saveAsTextFile算子

功能:将RDD的数据写入文本文件中

支持  本地写出,hdfs等文件系统

语法:

注意事项:

 代码示例:

"""
    saveAsTextFile
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# 准备RDD2
rdd2 = sc.parallelize([("hello", 3), ("spark", 5), ("java", 7)])
# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])

# 输出到文件之中
rdd1.saveAsTextFile("D:\\IOText\\saveAsTextFile测试1")
rdd2.saveAsTextFile("D:\\IOText\\saveAsTextFile测试2")
rdd3.saveAsTextFile("D:\\IOText\\saveAsTextFile测试3")

上述方法会使文件内有对应你CPU核心数量的分区

修改RDD分区为1个

方式一:SparkConf对象设置属性为全局并行度为1

 

方式二:创建RDD的时候设置(parallelize方法传入numSlices的参数为1)


6.综合案例

搜索引擎日志分析

实现代码:
 

"""
    综合案例
"""

from pyspark import SparkConf, SparkContext
import os
import json

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
conf.set("spark.default.parallelism", "1")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# 读取文件转换成RDD
file_rdd = sc.textFile("D:\\IOText\\search_log.txt")

# TODO 需求一:热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并且转换为小时
# 1.2 转换为(小时,1)的二元组
# 1.3 key分组聚合value
# 1.4 排序(降序)
# 1.5 取前三
res1 = (file_rdd
        .map(lambda x: (x.split("\t")[0][:2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(3)
        )
print(f"需求一的结果:{res1}")

# TODO 需求二:热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词,1)二元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 top3
res2 = (file_rdd
        .map(lambda x: (x.split("\t")[2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(3)
        )
print(f"需求二的结果:{res2}")

# TODO 需求三:统计黑马程序员关键字在什么时间段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时,1)的二元组
# 3.3 分组聚合
# 3.4 排序
# 3.5 取前一
res3 = (file_rdd
        .map(lambda x: x.split("\t"))
        .filter(lambda x: x[2] == "黑马程序员")
        .map(lambda x: (x[0][:2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(1)
        )
print(f"需求三的结果:{res3}")

# TODO 需求四:将数据转化为JSON格式,写出到文件之中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
(file_rdd
 .map(lambda x: x.split("\t"))
 .map(lambda x: {
    "time": x[0],
    "user_id": x[1],
    "key_word": x[2],
    "rank1": x[3],
    "rank2": x[4],
    "url": x[5]}
      )
 .saveAsTextFile("D:\\IOText\\综合案例")
 )

结语

有啥好说的。呃呃呃呃呃呃呃没啥好说的

下班

┏(^0^)┛

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/121764.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

从哪些方面做好电商系统的网站建设?

电子商务的迅猛发展,建设一款成功的电商系统网站成为企业取得竞争优势的重要一环。下面将从用户体验、网站设计、安全性和性能优化等方面,介绍如何打造一款优秀的电商系统网站。 一、用户体验 一款成功的电商系统网站必须注重用户体验,确保用…

康耐视深度学习ViDi-ViDi四大工具之一蓝色定位工具/Locate

目录 工具介绍使用步骤说明调整工具ROI添加特征标签生成定位姿态训练并审核模型编辑器参数说明蓝色定位工具/Locate工具 工具介绍 蓝色定位工具用于识别和定位图像中的特定特征或特征组。该工具的输出可用于为其他ViDi 工具提供位置数据。使用该工具时,您提供图像训练集,然后…

MySQL中的datetime和timestamp有什么区别

相同点: 存储格式相同 datetime和timestamp两者的时间格式都是YYYY-MM-DD HH:MM:SS 不同点: 存储范围不同. datetime的范围是1000-01-01到9999-12-31. 而timestamp是从1970-01-01到2038-01-19, 即后者的时间范围很小. 与时区关系. datetime是存储服务器当前的时区. 而timesta…

电脑怎么做图片二维码?在线制作二维码的方法

图片制作二维码是现在经常被使用的一个功能,比如产品照片、自拍、海报等等不同格式或者类型的文件都可以生成二维码。那么想要快速完成二维码制作,使用图片二维码生成器就可以快速完成制作,本文将给大家分享一下在电脑上制作图片二维码的操作…

Presentation Prompter 5.4.2(mac屏幕提词器)

Presentation Prompter是一款演讲辅助屏幕提词器软件,旨在帮助演讲者在公共演讲、主持活动或录制视频时更加流畅地进行演讲。以下是Presentation Prompter的一些特色功能: 提供滚动或分页显示:可以将演讲稿以滚动或分页的形式显示在屏幕上&a…

管理视频推广工作:新媒体团队的成功策略

目前的新媒体团队,在视频管理时呈现出多、杂、散的特点,如何有效管理视频素材是当下许多新媒体团队的管理痛点,也是管理要点。高效的视频推广管理是新媒体团队提升产出效率的关键。 那么新媒体行业该如何管理视频推广工作? 数据…

【修车案例】一波形一案例(10)

故障车型: 2005 teana 2.0日产 维修厂: 建兴汽车保养厂示波器诊断: 通道A – ABS霍尔传感器信号测量故障分析: 诊断计算机报错左后轮胎轮速异常, 速度与其他车轮差较大。 通过示波器量测ABS信号, 2线式霍尔传感器, 信道A正极接信号线, 负极接地线, 干扰较严重就不建议从蓄电池…

windows环境下PHP7.4多线程设置

windows环境下的PHP设置多线程时有一定的难度,难点主要是PHP版本的选择,多线程扩展的选择,以及相关的设置等。 环境 windows 10php-7.4.33-Win32-vc15-x64php_parallel-1.1.4-7.4-ts-vc15-x64phpstudy 8.1.1.2 为了快速的部署PHP环境&…

Wireshark学习 与 TCP/IP协议分析

Wireshark简介和工具应用 如何开始抓包? 打开wireshark,显示如下网络连接。选择你正在使用的,(比如我正在使用无线网上网),双击 可以先看下自己的ip地址和网关ip地址(看抓包数据时候会用到&…

Mysql--高级(自定义函数、存储过程、视图、事务、索引)

自定义函数 语法 delimiter $$ create function 函数名称(参数列表) returns 返回类型 begin sql语句 end $$ delimiter ; 说明: delimiter用于设置分割符,默认为分号,主要用于命令行,在“sql语句”部分编写的语句需要以分号结尾,此时回车会…

Qt插件开发_入门教程

文章目录 前言插件的好处具体流程1. 第一,我们先创建一个主框架应用(**第一个工程**)2. GUI 设计 ![在这里插入图片描述](https://img-blog.csdnimg.cn/f215270ccfac4e038e7261c4b4891ec1.png)3. 创建动态库项目(**第2个工程**)4. 给插件项目添加qt界面类5.在插件工程添加一个头…

Unix环境高级编程-学习-02-进程环境之进程终止、命令行参数、环境表、C程序的存储空间布局

目录 一、环境信息 二、声明 三、进程终止 1、情况分类 2、退出函数 3、退出实验 (1)main声明int和调用return值 (2)main声明int和不调用return (3)main声明不int和不调用return 4、atexit 5、at…

SpringBoot加载测试类属性和配置说明

一、项目准备 1.创建项目 2.配置yml文件 test:name: FOREVERlove: sing二、测试类属性 1.Value 说明:读取yml中的数据。 package com.forever;import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Value; import org.spr…

Linux中固定ip端口和修改ip地址

一,更改虚拟网络编辑器 1,首先启动VMware,选择自己要更改ip或固定ip的虚拟机,并找到虚拟网络配编辑器,点击进入 2,进入之后需要点击右下角获取管理员权限后才能修改,有管理员权限之后图片如下 …

TSINGSEE青犀车辆违停AI算法在园区道路管控场景中的应用方案

一、背景与需求 园区作为企业办公、生产制造的重要场所,主要道路车辆违停等违规行为会对园区的安全造成隐患,并且在上下班高峰期内,由于发现不及时,车辆违停行为会造成出入口拥堵现象,这也成为园区管理的棘手问题。 …

C++入门(二)

前言 我们上一期介绍了什么是C,命名空间、输入输出、以及缺省参数。本期我们来继续介绍C的入门知识! 本期内容介绍 函数重载 引用 内联函数 auto关键字 范围for 指针空值nullptr 目录 前言 本期内容介绍 一、函数重载 什么是函数重载? …

Apple :苹果将在明年年底推出自己的 AI,预计将随 iOS 18 一起推出

本心、输入输出、结果 文章目录 Apple :苹果将在明年年底推出自己的 AI,预计将随 iOS 18 一起推出前言三星声称库克相关图片弘扬爱国精神 Apple :苹果将在明年年底推出自己的 AI,预计将随 iOS 18 一起推出 编辑:简简单…

Java关于由子类构造器生成的父类对象的反射问题

Java关于由子类构造器生成的父类对象的反射问题 问题概括一、案例准备二、问题描述 问题概括 提示:这里我就不绕圈子直接描述: Java中由子类构造器生成的父类的getclass.getName不是父类的类名而是子类的类名,因此不可以用子类构造器生成的…

2023年【安全员-B证】新版试题及安全员-B证免费试题

题库来源:安全生产模拟考试一点通公众号小程序 安全员-B证新版试题参考答案及安全员-B证考试试题解析是安全生产模拟考试一点通题库老师及安全员-B证操作证已考过的学员汇总,相对有效帮助安全员-B证免费试题学员顺利通过考试。 1、【多选题】下列哪些属…

HTML表格学习

HTML学习笔记二 HTML表格: HTML 表格由 标签来定义。 HTML 表格是一种用于展示结构化数据的标记语言元素。 tr:表示表格的一行。td:表示表格的数据单元格。th:表示表格的表头单元格。 数据单元格可以包含文本、图片、列表、段…