4. Python大数据编程入门
- 4.1 Python操作MySQL
- 4.2 Spark与PySpark
- 4.2.1 PySpark基础
- 4.2.2 数据输入
- 4.2.2.1 Python数据容器转换为RDD对象
- 4.2.2.2 读取文本文件得到RDD对象
- 4.2.3 数据计算
- 4.2.3.1 map算子
- 4.2.3.2 flatMap算子
- 4.2.3.3 reduceByKey算子
- 4.2.3.4 案例:单词计数
- 4.2.3.5 filter算子
- 4.2.3.6 distinct算子
- 4.2.3.7 sortBy算子
- 4.2.3.8 案例:货物售卖统计
- 4.2.4 数据输出
- 4.2.4.1 collect算子
- 4.2.4.2 reduce算子
- 4.2.4.3 take算子
- 4.2.4.4 count算子
- 4.2.4.5 输出到文件(saveAsTextFile算子)
- 4.2.5 案例:搜索引擎日志分析
4.1 Python操作MySQL
首先需要安装pymysql外部包
from pymysql import Connection
# 构建到MySQL数据库的连接
connection = Connection(
host="localhost",
port=3306,
user="root",
password="123456"
)
# 获取游标
cunsor = connection.cursor()
# 选择数据库
connection.select_db("test")
# 执行sql
cunsor.execute("select * from people")
# 获取查询结果
results : tuple = cunsor.fetchall()
for result in results:
print(result)
# 关闭连接
connection.close()
在插入数据时,需要手动提交事务
connection.commit()
或者在Connection构造方法中传参
autocommit=True
4.2 Spark与PySpark
4.2.1 PySpark基础
-
Spark是Apache基金会旗下的顶级开源项目,用于对海量数据进行大规模分布式计算。
-
PySpark是Spark的Python实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发
-
PySpark不仅可以作为Python第三方库使用,也可以将程序提交的Spark集群环境中,调度大规模集群进行执行。
-
国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
-
PySpark的编程,主要分为三个步骤
想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。
PySpark的执行环境入口对象是:类 SparkContext 的类对象
SparkContext类对象,是PySpark编程中一切功能的入口。
from pyspark import SparkConf, SparkContext
# 创建SparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext运行(停止PySpark程序)
sc.stop()
4.2.2 数据输入
-
PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
-
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)
-
RDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体,它可以:
- 提供数据存储
- 提供数据计算的各类方法
- 数据计算的方法,返回值依旧是RDD(RDD迭代计算)
4.2.2.1 Python数据容器转换为RDD对象
from pyspark import SparkConf, SparkContext
# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 通过SparkContext的parallelize成员方法,将Python数据容器转换为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
# 如果要查看RDD中的内容,需要调用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
# 停止SparkContext运行(停止PySpark程序)
sc.stop()
4.2.2.2 读取文本文件得到RDD对象
from pyspark import SparkConf, SparkContext
# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 通过SparkContext的textFile成员方法,读取文本文件得到RDD对象
rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\test.txt")
print(rdd.collect())
# 停止SparkContext运行(停止PySpark程序)
sc.stop()
4.2.3 数据计算
4.2.3.1 map算子
- map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以十,链式调用
rdd2 = rdd.map(lambda data: data * 10).map(lambda data: data + 5)
print(rdd2.collect())
# 停止SparkContext运行(停止PySpark程序)
sc.stop()
4.2.3.2 flatMap算子
- 对rdd执行map操作,然后进行解除嵌套操作。
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize(["this is", "python 310", "flatmap", "rdd spark"])
# 通过map方法将全部数据都乘以十,链式调用
rdd2 = rdd.flatMap(lambda data: data.split(" "))
print(rdd2.collect())
# 停止SparkContext运行(停止PySpark程序)
sc.stop()
4.2.3.3 reduceByKey算子
- 针对KV型RDD,自动按照key分组**,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([('boy', 99), ('boy', 88), ('boy', 77), ('girl', 99), ('girl', 66)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()
4.2.3.4 案例:单词计数
提前设置好一个等待读取的txt文档:
spark python java zhang rdd python rdd
python java cpp c cpp spark pyspark zhang
rdd c python py pyspark python pp rdd
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取数据文件
rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\test.txt")
# 取出全部单词
word_rdd = rdd.flatMap(lambda data: data.split(" "))
# 将单词转换为二元元组,单词为key,value设置为1
word_one_rdd = word_rdd.map(lambda word: (word, 1))
# 分组求和
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)
# 打印结果
print(result_rdd.collect())
sc.stop()
4.2.3.5 filter算子
-
接受一个处理函数,可用lambda快速编写
-
函数对RDD数据逐个处理,得到True的保留至返回值的RDD中
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 取出列表中的奇数
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())
sc.stop()
4.2.3.6 distinct算子
- 去重,返回新RDD对象
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 1, 2, 6, 7, 3])
rdd2 = rdd.distinct()
print(rdd2.collect())
sc.stop()
4.2.3.7 sortBy算子
- 对RDD数据进行排序,基于指定的排序依据
- 对单词统计案例中的结果进行排序
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\test.txt")
word_rdd = rdd.flatMap(lambda data: data.split(" "))
word_one_rdd = word_rdd.map(lambda word: (word, 1))
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)
result_rdd_sortBy = result_rdd.sortBy(lambda t: t[1], ascending=False, numPartitions=1)
print(result_rdd_sortBy.collect())
sc.stop()
4.2.3.8 案例:货物售卖统计
需求:把下述内容写入txt文件,使用Spark读取文件进行计算
-
各个城市销售额排名,从大到小
-
全部城市,有哪些商品类别在售卖
-
北京市有哪些商品类别在售卖
{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}|{"id":2,"timestamp":"2019-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}|{"id":3,"timestamp":"2019-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":"2019-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}|{"id":5,"timestamp":"2019-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}|{"id":6,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"杭州","money":"1550"}
{"id":7,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"北京","money":"5611"}|{"id":8,"timestamp":"2019-05-08T03:01.00Z","category":"家电","areaName":"北京","money":"4410"}|{"id":9,"timestamp":"2019-05-08T01:03.00Z","category":"家具","areaName":"郑州","money":"1120"}
{"id":10,"timestamp":"2019-05-08T01:01.00Z","category":"家具","areaName":"北京","money":"6661"}|{"id":11,"timestamp":"2019-05-08T05:03.00Z","category":"家具","areaName":"杭州","money":"1230"}|{"id":12,"timestamp":"2019-05-08T01:01.00Z","category":"书籍","areaName":"北京","money":"5550"}
{"id":13,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"5550"}|{"id":14,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"1261"}|{"id":15,"timestamp":"2019-05-08T03:03.00Z","category":"电脑","areaName":"杭州","money":"6660"}
{"id":16,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"天津","money":"6660"}|{"id":17,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"9000"}|{"id":18,"timestamp":"2019-05-08T05:01.00Z","category":"书籍","areaName":"北京","money":"1230"}
{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"杭州","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"2450"}
{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"6650"}
{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"天津","money":"5600"}
{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":"服饰","areaName":"北京","money":"9000"}
{"id":27,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"5600"}|{"id":28,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"8000"}|{"id":29,"timestamp":"2019-05-08T02:03.00Z","category":"服饰","areaName":"杭州","money":"7000"}
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 公共操作
# 打开文件并获取每个json字符串
file_rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\data.txt")
json_str_rdd = file_rdd.flatMap(lambda data: data.split("|"))
# 通过json类的类方法转化为字典对象
dict_rdd = json_str_rdd.map(lambda data: json.loads(data))
# TODO 需求1:城市销售额排名
# 清洗数据,保留字典中的有用信息
clear_data_rdd = dict_rdd.map(lambda data: (data['areaName'], int(data['money'])))
# 按照地名,聚合数据,并进行排序
aggregation_rdd = clear_data_rdd.reduceByKey(lambda a, b: a + b)
aggregation_sort_rdd = aggregation_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
# 打印结果
print(f"城市销售额排名为:{aggregation_sort_rdd.collect()}")
# TODO 需求2:全部城市中有哪些商品在售卖
# 清洗数据,保留字典中的有用信息
clear_data_rdd = dict_rdd.map(lambda data: (data['category']))
# 去重
category_rdd = clear_data_rdd.distinct()
# 打印结果
print(f"所有正在售卖的商品有:{category_rdd.collect()}")
# TODO 需求3:北京有哪些商品在售卖
# 过滤数据,保留满足地名为北京的json数据
bj_rdd = dict_rdd.filter(lambda data: data['areaName'] == '北京')
# 清洗数据,保留字典中的有用信息
clear_data_rdd = bj_rdd.map(lambda data: (data['category']))
# 去重
category_rdd = clear_data_rdd.distinct()
# 打印结果
print(f"北京正在售卖的商品有:{category_rdd.collect()}")
# 公共操作
sc.stop()
4.2.4 数据输出
-
collect:将RDD内容转换为list
-
reduce:对RDD内容进行自定义聚合
-
take:取出RDD的前N个元素组成list
-
count:统计RDD元素个数
4.2.4.1 collect算子
- 将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
print(type(rdd.collect()))
sc.stop()
4.2.4.2 reduce算子
- 对RDD数据集按照你传入的逻辑进行聚合
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])
num = rdd.reduce(lambda a, b: a + b)
print(num)
sc.stop()
4.2.4.3 take算子
- 取RDD的前N个元素,组合成list返回
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.take(3)
print(rdd2)
sc.stop()
4.2.4.4 count算子
- 计算RDD有多少条数据,返回值是一个数字
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.count()
print(rdd2)
sc.stop()
4.2.4.5 输出到文件(saveAsTextFile算子)
-
将RDD的数据写入文本文件中
-
支持本地写出,hdfs等文件系统
-
调用保存文件的算子,需要配置Hadoop依赖
-
下载Hadoop安装包
- http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
-
解压到电脑任意位置
-
在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’
-
下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
- https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
-
下载hadoop.dll,并放入:C:/Windows/System32 文件夹内
- https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
-
-
修改rdd分区为1个
-
case1:SparkConf对象设置属性全局并行度为1:
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") conf.set("spark.default.parallelism", "1") sc = SparkContext(conf=conf)
-
case2:创建RDD的时候设置(parallelize方法传入numSlices参数为1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1) # 或者 rdd2 = sc.parallelize([1, 2, 3, 4, 5], 1) rdd1.saveAsTextFile("rdd1") rdd2.saveAsTextFile("rdd2")
-
4.2.5 案例:搜索引擎日志分析
- 读取文件转换成RDD,并完成:
- 打印输出:热门搜索时间段(小时精度)Top3
- 打印输出:热门搜索词Top3
- 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
- 将数据转换为JSON格式,写出为文件
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\MyStudy\\Environment\\Hadoop\\spark-3.3.1-bin-hadoop3"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
# 读取文件转化为RDD
file_rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\search_log.txt")
# TODO 需求1:热门搜索时间段(小时精度)Top3
# 将rdd返回的list中的每个元素按\t划分,列表嵌套
# 每行(内层列表)第0个元素即为时间,如:00:00:00,且前两位为小时
# 将小时组装为元组,并按照小时进行聚合
# 按照出现总次数进行降序排序,取前三位即为TOP3
result1 = file_rdd.map(lambda x: x.split("\t")). \
map(lambda x: x[0][:2]). \
map(lambda x: (x + "点", 1)). \
reduceByKey(lambda a, b: a + b). \
sortBy(lambda x: x[1], ascending=False, numPartitions=1). \
take(3)
print(f"热门搜索时间段(小时精度)Top3:{result1}")
# TODO 需求2:热门搜索词Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)). \
reduceByKey(lambda a, b: a + b). \
sortBy(lambda x: x[1], ascending=False, numPartitions=1). \
take(3)
print(f"热门搜索词Top3:{result2}")
# TODO 需求3:统计黑马程序员关键字在哪个时段被搜索最多
result3 = file_rdd.map(lambda x: x.split("\t")). \
filter(lambda x: x[2] == "黑马程序员"). \
map(lambda x: (x[0][:2], 1)). \
reduceByKey(lambda a, b: a + b). \
sortBy(lambda x: x[1], ascending=False, numPartitions=1). \
take(1)
print(f"统计黑马程序员关键字在哪个时段被搜索最多:{result3}")
# TODO 需求4:将数据转换为JSON格式,写出为文件
file_rdd.map(lambda x: x.split("\t")). \
map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}). \
saveAsTextFile("output_json")
print("文件已写出")
sc.stop()