【头歌实训】PySpark Streaming 入门

文章目录

  • 第1关:SparkStreaming 基础 与 套接字流
    • 任务描述
    • 相关知识
      • Spark Streaming 简介
      • Python 与 Spark Streaming
      • Python Spark Streaming API
      • Spark Streaming 初体验(套接字流)
    • 编程要求
    • 测试说明
    • 答案代码
  • 第2关:文件流
    • 任务描述
    • 相关知识
      • 文件流概述
      • Python 与 Spark Streaming 文件流
      • Spark Streaming 文件流初体验
    • 编程要求
    • 测试说明
    • 答案代码
  • 第3关:RDD 队列流
    • 任务描述
    • 相关知识
      • 队列流概述
      • Python 与 Spark Streaming 队列流
      • Spark Streaming 队列流初体验
    • 编程要求
    • 测试说明
    • 答案代码

第1关:SparkStreaming 基础 与 套接字流

任务描述

本关任务:使用 Spark Streaming 实现词频统计。

相关知识

为了完成本关任务,你需要掌握:

  1. Spark Streaming 简介;
  2. Python 与 Spark Streaming;
  3. Python Spark Streaming API;
  4. Spark Streaming 初体验(套接字流)。

Spark Streaming 简介

Spark Streaming 是 Spark 的核心组件之一,为 Spark 提供了可拓展、高吞吐、容错的流计算能力。如下图所示,Spark Streaming 可整合多种输入数据源,如 Kafka、Flume、HDFS,甚至是普通的 TCP 套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。

img

Spark Streaming 的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经 Spark 引擎以类似批处理的方式处理每个时间片数据,执行流程如下图所示。

img

Spark Streaming 最主要的抽象是 DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming 的输入数据按照时间片(如 1 秒)分成一段一段的 DStream,每一段数据转换为 Spark 中的 RDD,并且对 DStream 的操作都最终转变为对相应的 RDD 的操作。例如,下图展示了进行单词统计时,每个时间片的数据(存储句子的 RDD)经 flatMap 操作,生成了存储单词的 RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。

Python 与 Spark Streaming

在 Python 中使用 Spark Streaming 只需要下载 pyspark 扩展库即可,命令如下:

pip install pyspark

键入命令后,等待下载完成。

img

出现如上图所示,则表示安装完成。

创建 Spark Streaming 的上下文对象:

方式一

# 导入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# 通过 SparkConf() 设置配置参数
sparkConf = SparkConf()
sparkConf.setAppName('demo')
sparkConf.setMaster('local[*]')
# 创建 spark 上下文对象
sc = SparkContext(conf=sparkConf)
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 5)

方式二

# 导入包
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 spark 上下文对象
sc = SparkContext("local[*]", "demo")
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 5)

Python Spark Streaming API

pyspark 库中有很多丰富的 API 提供使用,下面将介绍常用的一些 API。

Spark Streaming 核心 API

名称释义
StreamingContext(sparkContext[, …])Spark Streaming 功能的主要入口点。
DStream(jdstream、ssc、jrdd_deserializer)离散流 (DStream) 是 Spark Streaming 中的基本抽象,是表示连续数据流的 RDD 的连续序列(相同类型)。

Spark Streaming 操作 API

名称释义
StreamingContext.addStreamingListener(…)添加一个 [[org.apache.spark.streaming.scheduler.StreamingListener]] 对象,用于接收与流相关的系统事件。
StreamingContext.awaitTermination([timeout])等待执行停止。
StreamingContext.awaitTerminationOrTimeout([timeout])等待执行停止。
StreamingContext.checkpoint(directory)设置上下文以定期检查 DStream 操作以实现主控容错。
StreamingContext.getActive()返回当前活动的 StreamingContext 或无。
StreamingContext.getActiveOrCreate(……)要么返回活动的 StreamingContext(即当前已启动但未停止),要么从检查点数据重新创建 StreamingContext 或使用提供的 setupFunc 函数创建新的 StreamingContext。
StreamingContext.remember(duration)在此上下文中设置每个 DStreams 以记住它在最后给定持续时间内生成的 RDD。
StreamingContext.sparkContext返回与此 StreamingContext 关联的 SparkContext。
StreamingContext.start()开始执行流。
StreamingContext.stop([stopSparkContext,…])停止流的执行,可选择确保所有接收到的数据都已处理。
StreamingContext.transform(dstreams,……)创建一个新的 DStream,其中每个 RDD 都是通过在 DStream 的 RDD 上应用函数来生成的。
StreamingContext.union(*dstreams)从多个相同类型和相同幻灯片时长的 DStream 创建一个统一的 DStream。

输入与输出 API

名称释义
StreamingContext.binaryRecordsStream(……)创建一个输入流,用于监控与 Hadoop 兼容的文件系统中的新文件,并将它们作为具有固定长度记录的平面二进制文件读取。
StreamingContext.queueStream(rdds[, …])从 RDD 或列表的队列中创建输入流。
StreamingContext.socketTextStream(hostname, port)从 TCP 源主机名创建输入:端口。
StreamingContext.textFileStream(directory)创建一个输入流,用于监视与 Hadoop 兼容的文件系统中的新文件并将它们作为文本文件读取。
DStream.pprint([num])打印此 DStream 中生成的每个 RDD 的前 num 个元素。
DStream.saveAsTextFiles(prefix[, suffix])将此 DStream 中的每个 RDD 保存为文本文件,使用元素的字符串表示。

常用的转换与操作 API

名称释义
DStream.count()返回一个新的 DStream,其中每个 RDD 都有一个元素,该元素是通过计算此 DStream 的每个 RDD 生成的。
DStream.countByValue()返回一个新的 DStream,其中每个 RDD 包含此 DStream 的每个 RDD 中每个不同值的计数。
DStream.filter(F)返回一个新的 DStream,仅包含满足条件的元素。
DStream.flatMap(f[,preservesPartitioning])通过对该 DStream 的所有元素应用一个函数,然后将结果展平,返回一个新的 DStream。
DStream.flatMapValues(F)通过将 flatmap 函数应用于此 DStream 中每个键值对的值而不更改键,返回一个新的 DStream。
DStream.foreachRDD(func)对这个 DStream 中的每个 RDD 应用一个函数。
DStream.groupByKey([numPartitions])通过在每个 RDD 上应用 groupByKey 返回一个新的 DStream。
DStream.join(other[,numPartitions])通过在这个 DStream 和其他DStream 的 RDD 之间应用 ‘join’ 返回一个新的DStream。
DStream.map(f[,preservesPartitioning])通过对 DStream 的每个元素应用一个函数来返回一个新的 DStream。
DStream.mapValues(F)通过对该 DStream 中每个键值对的值应用映射函数返回一个新的 DStream,而不更改键。
DStream.reduce(func)返回一个新的 DStream,其中每个 RDD 具有通过减少此 DStream 的每个 RDD 生成的单个元素。
DStream.reduceByKey(func[,numPartitions])通过对每个 RDD 应用 reduceByKey 来返回一个新的 DStream。
DStream.updateStateByKey(updateFunc[, …])返回一个新的“状态” DStream,其中每个键的状态通过对键的先前状态和键的新值应用给定函数来更新。

Spark Streaming 初体验(套接字流)

下面让我们快速了解一下简单的 Spark Streaming 程序是什么样的,假设我们要计算从侦听 TCP 套接字的数据服务器接收到的文本数据中的字数,实现一个流式的 WordCount 计算程序。

第一步,导入包

打开右侧命令行窗口,等待连接后,在主目录下创建文件 test.py,导入 Spark Streaming 所需要的包。

touch test.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

第二步,创建上下文对象

首先,我们导入StreamingContext,它是所有流功能的主要入口点。创建一个具有多个执行线程的本地 StreamingContext,批处理间隔为 20 秒。

sc = SparkContext("local[*]", "demo")
# 每 20 秒读取一次
ssc = StreamingContext(sc, 20) 

第三步,指定数据流

使用这个上下文,我们可以创建一个表示来自 TCP 源的流数据的 DStream,指定为主机名(例如:localhost)和端口(例如:7777)。

lines = ssc.socketTextStream("localhost", 7777)

第四步,分词统计与输出

接下来,我们要按空格(根据数据流的情况来)将行拆分为单词。

words = lines.flatMap(lambda line: line.split(" "))

在这种情况下,每一行将被拆分为多个单词,单词流表示为 wordsDStream。接下来,我们要计算这些单词,输出结果到屏幕。

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

第五步,启动与停止

请注意,当执行这些行时,Spark Streaming 仅设置它在启动时将执行的计算,并且尚未开始真正的处理。在所有转换设置完成后才开始处理,所以我们最后调用。

# 开始执行流
ssc.start()
# 等待计算终止
ssc.awaitTermination()

启动前我们需要先新开一个命令行窗口用于创建数据流服务器发送端。点击右侧 + 号,新增一个命令行窗口,启动数据流服务器。

nc -l -p 7777

必须先启动数据流服务器,然后再开始执行程序。

回到刚刚的代码窗口,启动程序,开始监听。启动后,我们切换到数据流服务器窗口,输入如下单词:

hello python
hello spark
hello spark streaming

代码窗口界面结果输出如下:

,

当我们在数据流服务器窗口再次输入和上面一样的单词时,发现结果没有进行累加,如下所示:

,

这是由于我们并没有实现更新的操作,我们需要使用 updateStateByKey(func) 方法对其进行累加统计,其参数为一个函数,也就是根据传入的这个函数来实现状态更新功能。

当我们使用累加器时还需借助 checkpoint() 方法设置检查点,告知累加器其检查区域,其参数为一个字符串,指定为保存检查点的目录,如果指定目录未存在,则会自动创建。

具体实现方式如下代码所示:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)
sc = SparkContext("local[*]", "demo")
# 设置输入日志等级
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 20)
# 设置检查点
ssc.checkpoint("file:///usr/local/word_log")
# 指定监听端口
lines = ssc.socketTextStream("localhost", 7777)
# 进行词频统计
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# 调用累加器
wordCounts = pairs.updateStateByKey(updateFunction)
# 输出到屏幕
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

**运行程序,注意,先启动数据流服务器。**输入如下数据两次,请在第一次数据输出到屏幕上后再输入第二次:

hello python
hello spark
hello spark streaming

第一次结果如下:

,

第二次结果如下:

,

从结果中可以看出,我们已经实现了从套接字流中读取数据并完成词频统计。

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,执行程序,读取套接字流数据,按空格进行分词,完成词频统计。补充代码,将词频统计的输出内容存储到 /data/workspace/myshixun/project/step1/result 文件中。

代码文件目录: /data/workspace/myshixun/project/step1/work.py

套接字流相关信息:

  • 地址:localhost
  • 端口:8888
  • 输入数据:

程序启动后(5s),请在 60 秒内写入数据,如果需要调整时间,你可以通过修改代码文件中 ssc.awaitTermination(timeout=60) timeout 指定时间。

It is believed that the computer is bringing the world into a brand new era. 
At the time the computer was invented, scientists, marveling at its calculating speed, 
felt that they had created a miracle.
Nowadays, the function of the computer is no longer confined to calculation; 
It permeates peoples daily lives and has become an inseparable part of human society.

输入内容后,注意按回车。

检查点存放本地目录:/root/mylog/

请在程序运行完成后再点击评测,否则会影响评测结果。

小贴士:

  • pprint() 方法中可以设置数据输出显示的数量。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

先写入代码

#!/usr/local/bin/python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


sc = SparkContext("local[*]", "work")

ssc = StreamingContext(sc, 10)

###################### Begin ######################
# 设置检查点
ssc.checkpoint("/root/mylog/")
# 指定监听端口
lines = ssc.socketTextStream("localhost", 8888)
# 进行词频统计
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# 调用累加器
wordCounts = pairs.updateStateByKey(updateFunction)
# 输出到屏幕
wordCounts.pprint()

# 保存输出内容到指定文件中
wordCounts.saveAsTextFiles("/data/workspace/myshixun/project/step1/result","txt")

###################### End ######################

ssc.start()

ssc.awaitTermination(timeout=60)

 

在第一个命令行窗口执行,启动数据流服务器

mkdir -p /root/mylog/
cd /root/mylog/
nc -l -p 8888

启动程序,开始监听后,打开另一个命令行窗口执行

cd /data/workspace/myshixun/project/step1/
chmod 777 work.py
python work.py # 现在开始运行代码文件,请在 60 秒内写入下面数据

回到第一个命令行窗口下把下面数据粘贴上去,再打一个回车

It is believed that the computer is bringing the world into a brand new era. 
At the time the computer was invented, scientists, marveling at its calculating speed, 
felt that they had created a miracle.
Nowadays, the function of the computer is no longer confined to calculation; 
It permeates peoples daily lives and has become an inseparable part of human society.

再去另一个命令行窗口就可以看到正在统计词频了

第2关:文件流

任务描述

本关任务:使用 Spark Streaming 实现文件目录监听,完成词频统计。

相关知识

为了完成本关任务,你需要掌握:

  1. 文件流概述;
  2. Python 与 Spark Streaming 文件流;
  3. Spark Streaming 文件流初体验。

文件流概述

文件流就是数据从一个地方流到另一个地方,像一块大蛋糕一样把一个大的文件分成一块一块的流过去就叫文件流。其中流分为输入流与输出流,输入流指从外界向我们的程序中移动的方向,因此是用来获取数据的流,作用就是读操作。输出流与之相反,从程序向外界移动的方向,用来输出数据的流,作用就是写操作。流是单向的,输入用来读,输出用来写。

img

那么我们为什么需要流呢?

  • 当外部设备与内存中的数据规模不一致,内存小,外部设备大,如果内存大小只有 1G ,但从磁盘读 2G,不能一次读完,这时就需要流。
  • 当外部设备与内存处理数据的能力不一致,内存处理数据快,外部设备慢,内存给磁盘写了 1G ,磁盘可能需要 5 秒去处理写数据,其他事件就会受到影响,这时就需要流。
  • 当读取或者写入大文件时数据会推挤在内存中,导致效率低(内存数据多,导致执行时间变长),这时就需要流。

Python 与 Spark Streaming 文件流

Spark 支持从兼容 HDFS API 的文件系统中读取数据,创建数据流。在 Python 中使用 Spark Streaming 文件流十分简单,通过 textFileStream() 方法就可以对创建文件流。

在 Python 中创建 Spark Streaming 文件流:

# 导入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# 通过 SparkConf() 配置参数
sparkConf = SparkConf()
sparkConf.setAppName('demo')
sparkConf.setMaster('local[*]')
# 创建 spark 上下文对象
sc = SparkContext(conf=sparkConf)
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 10)
# 指定目录或文件,创建文件流
ssc.textFileStream("xxxxxx")

Spark Streaming 文件流初体验

通过对文件流及其创建方法的了解,我们现在通过实际的文件流案例来学习 Spark Streaming 读取文件流的具体实现。

打开右侧命令行窗口,创建一个目录 test,并在里面创建两个子文件 log1.txtlog2.txt,用于模拟数据。

cd /root
mkdir test
cd /root/test
touch log1.txt log2.txt

创建完成后,我们在新建的两个文件 log1.txtlog2.txt 中任意写入一些数据。

echo -e "hello python \nhello spark streaming \nI love big data!" > /root/test/log1.txt
echo -e "hello python \nhello spark streaming \nI love big data!" > /root/test/log2.txt

下面我们就进入 python shell 界面,创建文件流。

python

进入后,出现如下界面:

,

第一步,指定监听目录 /root/test,创建 Spark Streaming 文件流

# 导入包
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 spark 上下文对象
sc = SparkContext("local[*]","demo")
# 创建 Spark Streaming 上下文对象
ssc = StreamingContext(sc, 10)
# 指定 /root/test 目录,创建文件流
lines = ssc.textFileStream("/root/test")

第二步,数据处理

完成对文件流中相关数据的处理。

lines.pprint()

第三步,启动与停止

ssc.start()
ssc.awaitTermination()

运行后发现,并没有输出我们之前写入到文件 log1.txtlog2.txt中的内容。

,

原因是,程序启动后只会只监听 /root/test 目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件,即使你对其进行更新操作。

现在我们点击 + 号新增一个命令行窗口,验证是否真的如此。打开新窗口后,切换到监听目录中,创建一个新文件 log3.txt,任意写入一些数据。

cd /root/test
vi log3.txt
hello python
hello spark

此时我们返回程序运行窗口,稍作等待,查看输出内容,发现刚刚创建的文件 log3.txt 其中的内容输出到了屏幕上。

,

现在我们来测试更新操作是否会被读取到,对程序运行前创建的文件 log2.txt 进行更新操作,任意追加一些内容。

cd /root/test
vi log2.txt
I like ping!

,

此时我们返回程序运行窗口,稍作等待,查看输出内容,发现对 log2.txt 文件追加的内容并没有输出到屏幕上。

,

说明程序运行前监听目录下的文件并不会被识别。

文件流的扩展知识:

  • 可以提供 POSIX glob 模式,例如:hdfs://namenode:8040/logs/2017/*,在这里,DStream 将包含与该模式匹配的目录中的所有文件。也就是说:它是目录的模式,而不是目录中的文件。
  • 所有文件必须采用相同的数据格式。
  • 文件根据其修改时间而非创建时间被视为时间段的一部分。
  • 一旦处理完毕,在当前窗口中对文件的更改不会导致文件被重新读取,即:更新被忽略。
  • 目录下的文件越多,扫描更改所需的时间就越长——即使没有文件被修改。
  • 如果使用通配符来标识目录,例如:hdfs://namenode:8040/logs/2016-*,重命名整个目录以匹配路径,则会将该目录添加到受监视目录列表中。只有目录中修改时间在当前窗口内的文件才会包含在流中。
  • 调用FileSystem.setTimes() 修复时间戳是一种在以后的窗口中拾取文件的方法,即使它的内容没有改变。

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,执行程序,读取文件流数据,按空格进行分词,完成词频统计。补充代码,将词频统计的输出内容存储到 /data/workspace/myshixun/project/step2/result 文件中。

代码文件目录: /data/workspace/myshixun/project/step2/work.py

文件流相关信息:

  • 监听目录:/root/file_stream (需要自行创建)
  • 文件名称:words.txt (需要自行创建)
  • 文件内的数据:

程序启动后(5s),请在 60 秒内创建文件并写入数据,如果需要调整时间,你可以通过修改代码文件中 ssc.awaitTermination(timeout=60) timeout 指定时间。

Hiding behind the loose dusty curtain, a teenager packed up his overcoat into the suitcase.
He planned to leave home at dusk though there was thunder and lightning outdoors.
As a result, his score in each exam never added up to over 60, his name is LiMing.

输入内容后,注意保存退出。

检查点存放本地目录:/root/mylog2/

小贴士:

  • pprint() 方法中可以设置数据输出显示的数量。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

#!/usr/local/bin/python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


sc = SparkContext("local[*]", "work")

ssc = StreamingContext(sc, 10)

###################### Begin ######################
# 设置检查点
ssc.checkpoint("/root/mylog2/")
# 指定监听端口
lines = ssc.textFileStream("/root/test")
# 进行词频统计
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# 调用累加器
wordCounts = pairs.updateStateByKey(updateFunction)
# 输出到屏幕
wordCounts.pprint()

# 保存输出内容到指定文件中
wordCounts.saveAsTextFiles("/data/workspace/myshixun/project/step2/result","txt")

###################### End ######################

ssc.start()

ssc.awaitTermination(timeout=60)

 

在第一个命令行窗口执行

mkdir -p /root/test/
mkdir -p /root/mylog2/
cd /data/workspace/myshixun/project/step2/
chmod 777 work.py
python work.py # 现在开始运行代码文件,请在 60 秒内创建文件并写入下面数据

再打开一个命令行窗口创建文件并写入下面数据

vim /root/test/words.txt

把下面数据粘贴上去

Hiding behind the loose dusty curtain, a teenager packed up his overcoat into the suitcase.
He planned to leave home at dusk though there was thunder and lightning outdoors.
As a result, his score in each exam never added up to over 60, his name is LiMing.

再去另一个命令行窗口就可以看到正在统计词频了

第3关:RDD 队列流

任务描述

本关任务:使用 Spark Streaming 实现队列流,完成词频统计。

相关知识

为了完成本关任务,你需要掌握:

  1. 队列流概述;
  2. Python 与 Spark Streaming 队列流;
  3. Spark Streaming 队列流初体验。

队列流概述

队列是无须的或共享的消息。使用队列消息传递,可以创建多个消费者来从点对点消息传递通道接收消息。当通道传递消息时,任何消费者都可能收到消息。消息传递系统的实现确定哪个消费者实际接收消息。Queuing 通常与无状态应用程序一起使用。无状态应用程序不关心顺序,但它们确实需要识别或删除单个消息的能力,以及尽可能扩展并行消耗的能力。

img

相比之下,流是严格有序的或独占的消息传递。使用流消息传递,始终只有一个消费者使用消息传递通道。消费者接收从通道发送的消息,其顺序与消息的写入顺序一致。Streaming 通常与有状态的应用程序一起使用。有状态应用程序关心消息顺序及其状态。消息的顺序决定有状态应用程序的状态。当发生无序消费时,排序将影响应用程序,需要处理逻辑的正确性。

Python 与 Spark Streaming 队列流

为了使用测试数据测试 Spark Streaming 应用程序,还可以基于 RDD 队列创建 DStream,使用 streamingContext.queueStream(queueOfRDDs). 每个推入队列的 RDD 都会被视为 DStream 中的一批数据,像流一样处理。

img

在 Python 中创建 Spark Streaming 队列流:

# 导入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# 通过 SparkConf() 配置参数
sparkConf = SparkConf()
sparkConf.setAppName('demo')
sparkConf.setMaster('local[*]')
# 创建 spark 上下文对象
sc = SparkContext(conf=sparkConf)
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 10)
# 创建列表(RDD)
rddQueue = ["Hello python", "Hello spark", "Hello spark streaming"]
# 创建队列流
inputStream = ssc.queueStream(rddQueue)

Spark Streaming 队列流初体验

通过对队列流及其创建方法的了解,我们现在通过一个案例来学习 Spark Streaming 读取队列流的具体实现。

打开右侧命令行窗口,等待连接后,进入 python shell 界面,创建队列流。

python

进入后,出现如下界面:

,

第一步,创建 Spark Streaming 队列流

# 导入包
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 spark 上下文对象
sc = SparkContext("local[*]","demo")
# 创建 Spark Streaming 上下文对象
ssc = StreamingContext(sc, 10)
# 创建列表(RDD)
rddQueue = ["Hello python", "Hello spark", "Hello spark streaming"]
# 创建队列流
inputStream = ssc.queueStream(rddQueue)

第二步,数据处理

完成对队列流中相关数据的处理。

inputStream.pprint()

第三步,启动与停止

ssc.start()
# 检测到没有数据流输入后就会停止
ssc.stop()

运行后发现,并没有一次输出所有的数据,而是依次的进行输出处理。

img

img

这就是 Spark Streaming 队列流的特性,我们在使用时需要注意。

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,根据所给出的 rdd 列表,创建队列流,按空格进行分词,完成词频统计,使用 pprint() 输出结果。

词频统计要求:

  • 对数据按照 26 个字母进行扁平化统计,例如:('g', 10)
  • 过滤掉所有为 '' 的值。

检查点存放本地目录:/root/mylog3/

小贴士:

  • pprint() 方法中可以设置数据输出显示的数量。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

mkdir -p /root/mylog3/
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


sc = SparkContext("local[*]", "work")

ssc = StreamingContext(sc, 5)

# rdd 列表
rdd = ["My father is a basketball fan, he watches the NBA match when he is free.",
            "Because of the effect from my father, I fell in love with basketball when I was very small.",
            " So when I go to middle school, I join the basketball team in my class",
            " I meet many friends who have the same love for basketball.",
            " We will play basketball after class or sometimes in the weekend, we will play the match with other team."]


###################### Begin ######################
# 设置检查点
ssc.checkpoint("/root/mylog3/")

# 创建队列流
inputStream = ssc.queueStream([sc.parallelize([line]) for line in rdd])

# 按空格进行分词
words = inputStream.flatMap(lambda line: line.split(" "))

# 过滤掉空字符串
words_filter = words.filter(lambda word: word != '')

# 按字母进行扁平化统计
words_flatMap = words_filter.flatMap(lambda word: [(letter, 1) for letter in word.lower()])

# 使用 updateStateByKey 进行状态更新
wordCnt = words_flatMap.updateStateByKey(updateFunction)

# 输出结果
wordCnt.pprint()

###################### End ######################

ssc.start()

time.sleep(30)

ssc.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/274527.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Pandas有了平替Polars

Polars是一个Python数据处理库,旨在提供高性能、易用且功能丰富的数据操作和分析工具。它的设计灵感来自于Pandas,但在性能上更加出色。 Polars具有以下主要特点: 强大的数据操作功能:Polars提供了类似于Pandas的数据操作接口&am…

Xshell连接ubuntu,从github克隆项目,用Xshell克隆项目

访问不了github:https://blog.csdn.net/liu834189447/article/details/135246914 短暂解决访问问题。 ping不通虚拟机/无法连接虚拟机:https://blog.csdn.net/liu834189447/article/details/135240276 ps: Xshell、ubuntu的粘贴快捷键为 Shift Insert …

23款奔驰GLC260L升级原厂540全景影像 高清环绕的视野

嗨 今天给大家介绍一台奔驰GLC260L升级原厂360全景影像 新款GLC升级原厂360全景影像 也只需要安装前面 左右三个摄像头 后面的那个还是正常用的,不过不一样的是 升级完成之后会有多了个功能 那就是新款透明底盘,星骏汇小许 Xjh15863 左右两边只需要更换…

XV7021BB陀螺仪传感器

XV7021BB具有优越的性能特性,特别是在偏置输出稳定性和低噪声,而消耗小于1 mA的电流。爱普生通过使用爱普生的原始石英传感器元件来实现这些性能。 该传感器具有数字输出接口(SPl和l?C),兼容由独立于主电源电压(VodM&…

Redis 分布式锁总结

在一个分布式系统中,由于涉及到多个实例同时对同一个资源加锁的问题,像传统的synchronized、ReentrantLock等单进程情况加锁的api就不再适用,需要使用分布式锁来保证多服务实例之间加锁的安全性。常见的分布式锁的实现方式有zookeeper和redis…

『Linux升级路』冯诺依曼体系结构与操作系统

🔥博客主页:小王又困了 📚系列专栏:Linux 🌟人之为学,不日近则日退 ❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、冯诺依曼体系结构 📒1.1为什么要有体系结构 📒1.2…

前端文件在虚拟机,后端在本机,两个如何通信

前端文件在虚拟机,后端在本机,两个如何通信 如果前端的文件放在虚拟机里面,但是调用接口的后端在本地调试,如何做到在虚拟机中也能访问到本地的接口内容。 其实这个问题很简单,只要讲本地的IP和虚拟机中的IP结合就可…

[SWPUCTF 2021 新生赛]finalrce

[SWPUCTF 2021 新生赛]finalrce wp 注&#xff1a;本文参考了 NSSCTF Leaderchen 师傅的题解&#xff0c;并修补了其中些许不足。 此外&#xff0c;参考了 命令执行(RCE)面对各种过滤&#xff0c;骚姿势绕过总结 题目代码&#xff1a; <?php highlight_file(__FILE__); …

Spring Boot笔记1

1. SpringBoot简介 1.1. 原有Spring优缺点分析 1.1.1. Spring的优点分析 Spring是Java企业版&#xff08;Java Enterprise Edition&#xff0c;javeEE&#xff09;的轻量级代替品。无需开发重量级的Enterprise JavaBean&#xff08;EJB&#xff09;&#xff0c;Spring为企业…

C++ 内联函数inline

内联函数是C为提高程序运行速度所做的一项改进。常规函数和内联函数之间的主要区别不在于编写方式&#xff0c;面在于C编译器如何将它们组合到程序中。要了解内联函数与常规函数之间的区别&#xff0c;必须深入到程序内部。 编译过程的最终产品是可执行程序一一由一组机器语言指…

C++ 文件操作篇

C 文件操作篇 文章目录 C 文件操作篇1 简介1.1 继承关系1.2 流1.3 缓冲区输入输出流中的缓冲streambuf 2 文件操作步骤2.1 头文件2.2 创建流对象2.3 打开文件2.4 读取数据第一种&#xff1a;**按元素直接读**第二种&#xff1a;**使用getline按行读**第三种&#xff1a;**使用*…

数据结构--查找

目录 1. 查找的基本概念 2. 线性表的查找 3. 树表的查找 3.1 二叉排序树 3.1.1 定义: 3.1.2 存储结构&#xff1a; 3.1.3 二叉排序树的查找 3.1.4 二叉排序树的插入 3.1.5 二叉排序树删除 3.2 平衡二叉树&#xff08;AVL 3.2.1 为什么要有平衡二叉树 3.2.2 定义 3.3 B-树 3.3.1…

如何安装T4显卡的驱动

文章目录 一、没有驱动的报错现象二、cuda版本与驱动的版本对应关系三、安装驱动方法1&#xff1a;方法2&#xff1a; 一、没有驱动的报错现象 ERROR: Unable to find the kernel source tree for the currently running kernel. Please make sure you have installed the ker…

uniapp-android原生插件如何打aar包 (避坑指南二)

1.打开android studio项目&#xff0c;找到module项目&#xff0c;打开右侧gradle&#xff0c;找到对应的module, 点击assemble&#xff0c;会打包生成aar&#xff0c;生成的aar在 [module]/build/outputs/aar/目录下 特殊情况&#xff0c;如果右侧的gradle&#xff0c;找到mo…

生存分析序章2——生存分析之Python篇:lifelines库入门

目录 写在开头1. 介绍 lifelines 库1.1 lifelines库简介1.2 安装与环境配置 2. 数据准备2.1 数据格式与结构2.2 处理缺失数据2.3 对异常值的处理 3. Kaplan-Meier 曲线3.1 使用 lifelines 绘制生存曲线3.2 曲线解读3.3 额外补充 4. Cox 比例风险模型4.1 lifelines 中的 Cox 模型…

RabbitMq知识概述

本文来说下RabbitMq相关的知识与概念 文章目录 概述AMQP协议Exchange 消息如何保证100&#xff05;投递什么是生产端的可靠性投递可靠性投递保障方案 消息幂等性高并发的情况下如何避免消息重复消费confirm 确认消息、Return返回消息如何实现confirm确认消息return消息机制 消费…

Flask 与微信小程序对接

Flask 与微信小程序的对接 在 web/controllers/api中增建py文件&#xff0c;主要是给微信小程序使用的。 web/controllers/init.py # -*- coding: utf-8 -*- from flask import Blueprint route_api Blueprint( api_page,__name__ )route_api.route("/") def ind…

数据的价值:隐藏在数字背后的巨大财富

在当今数字化的时代&#xff0c;数据已经成为了一种宝贵的资源&#xff0c;它的价值被越来越多的人所认识。数据不仅可以帮助企业更好地了解市场和消费者&#xff0c;提高决策的准确性&#xff0c;还可以为社会带来更多的便利和创新。企业、组织和个人可以利用数据来更好地了解…

编程笔记 html5cssjs 004 我的第一个页面

编程笔记 html5&css&js 004 我的第一个页面 一、基本结构二、HTML标签三、HTML元素四、HTML属性五、编写第一个网页六、使用VSCODE小结 开始编写网页&#xff0c;并且使用第一个网页成为一个母板&#xff0c;用于完成后续内容的学习。有一个基本要求&#xff0c;显示结…

linux cat命令增加-f显示文件名功能

在使用cat命令配合grep批量搜索文件内容时&#xff0c;我仅仅能知道是否搜索到&#xff0c;不知道是在哪个文件里找到的。比如cat ./src/*.c | grep full_write,在src目录下的所有.c文件里找full_write,能匹配到所有的full_write&#xff0c;但是不知道它们分别在哪些文件里。于…