大数据分析与应用实验任务十一

大数据分析与应用实验任务十一

实验目的

  • 通过实验掌握spark Streaming相关对象的创建方法;

  • 熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法;

  • 熟悉spark Streaming的转换操作,包括无状态和有状态转换。

  • 熟悉spark Streaming输出编程操作。

实验任务

一、DStream 操作概述
  1. 创建 StreamingContext 对象

    登录 Linux 系统后,启动 pyspark。进入 pyspark 以后,就已经获得了一个默认的 SparkConext 对象,也就是 sc。因此,可以采用如下方式来创建 StreamingContext 对象:

    from pyspark.streaming import StreamingContext 
    sscluozhongye = StreamingContext(sc, 1)
    

    image-20231207112253827

    如果是编写一个独立的 Spark Streaming 程序,而不是在 pyspark 中运行,则需要在代码文件中通过类似如下的方式创建 StreamingContext 对象:

    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 1)
    print("创建成功,lzy防伪")
    

    image-20231207112652285

二、基本输入源
  1. 文件流
  • 在 pyspark 中创建文件流

    首先,在 Linux 系统中打开第 1 个终端(为了便于区分多个终端,这里记作“数据源终端”),创建一个 logfile 目录,命令如下:

    cd /root/Desktop/luozhongye/
    mkdir streaming 
    cd streaming 
    mkdir logfile
    

    image-20231207112923323

    其次,在 Linux 系统中打开第二个终端(记作“流计算终端”),启动进入 pyspark,然后,依次输入如下语句:

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    

image-20231207113305405

  • 采用独立应用程序方式创建文件流

    #!/usr/bin/env python3 
    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    print("2023年12月7日lzy")
    

    保存该文件,并执行以下命令:

    cd /root/Desktop/luozhongye/streaming/logfile/ 
    spark-submit FileStreaming.py
    

image-20231207114014647

  1. 套接字流
  • 使用套接字流作为数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”,在NetworkWordCount.py 中输入如下内容:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
    	if len(sys.argv) != 3:
    		print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
    		exit(-1)
    	sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    	ssc = StreamingContext(sc, 1)
    	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    	counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    	counts.pprint()
    	ssc.start()
    	ssc.awaitTermination()
    

    使用如下 nc 命令生成一个 Socket 服务器端:

    nc -lk 9999
    

    新建一个终端(记作“流计算终端”),执行如下代码启动流计算:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208002212790

  • 使用 Socket 编程实现自定义数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”,在 DataSourceSocket.py 中输入如下代码:

    #!/usr/bin/env python3 
    import socket
    
    # 生成 socket 对象
    server = socket.socket()
    # 绑定 ip 和端口
    server.bind(('localhost', 9999))
    # 监听绑定的端口
    server.listen(1)
    while 1:
    	# 为了方便识别,打印一个“I’m waiting the connect...”
    	print("I'm waiting the connect...")
    	# 这里用两个值接收,因为连接上之后使用的是客户端发来请求的这个实例
    	# 所以下面的传输要使用 conn 实例操作
    	conn, addr = server.accept()
    	# 打印连接成功
    	print("Connect success! Connection is from %s " % addr[0])
    	# 打印正在发送数据
    	print('Sending data...')
    	conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())
    	conn.close()
    	print('Connection is broken.')
    print("2023年12月7日lzy")
    

    执行如下命令启动 Socket 服务器端:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit DataSourceSocket.py
    

    新建一个终端(记作“流计算终端”),输入以下命令启动 NetworkWordCount 程序:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208003303167

  1. RDD 队列流

    Linux 系统中打开一个终端,新建一个代码文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”,输入以下代码:

    #!/usr/bin/env python3 
    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
        print("")
    	sc = SparkContext(appName="PythonStreamingQueueStream")
    	ssc = StreamingContext(sc, 2)
    	# 创建一个队列,通过该队列可以把 RDD 推给一个 RDD 队列流
    	rddQueue = []
    	for i in range(5):
    		rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
    	time.sleep(1)
    	# 创建一个 RDD 队列流
    	inputStream = ssc.queueStream(rddQueue)
    	mappedStream = inputStream.map(lambda x: (x % 10, 1))
    	reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    	reducedStream.pprint()
    	ssc.start()
    	ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    下面执行如下命令运行该程序:

    cd /root/Desktop/luozhongye/streaming/rddqueue 
    /usr/local/spark/bin/spark-submit RDDQueueStream.py
    

image-20231208004439462

三、转换操作
  1. 滑动窗口转换操作

    对“套接字流”中的代码 NetworkWordCount.py 进行一个小的修改,得到新的代码文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”,其内容如下:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
    	if len(sys.argv) != 3:
    		print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
    		exit(-1)
    	sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
    	ssc = StreamingContext(sc, 10)
    	ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/socket/checkpoint")
    	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    	counts = lines.flatMap(lambda line: line.split(" ")) \
    		.map(lambda word: (word, 1)) \
    		.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
    	counts.pprint()
    	ssc.start()
    	ssc.awaitTermination()
    

为了测试程序的运行效果,首先新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

   cd /root/Desktop/luozhongye/streaming/socket/ 
   nc -lk 9999

然后,再新建一个终端(记作“流计算终端”),运行客户端程序 WindowedNetworkWordCount.py,命令如下:

   cd /root/Desktop/luozhongye/streaming/socket/ 
   /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999

在数据源终端内,连续输入 10 个“hadoop”,每个 hadoop 单独占一行(即每输入一个 hadoop就按回车键),再连续输入 10 个“spark”,每个 spark 单独占一行。这时,可以查看流计算终端内显示的词频动态统计结果,可以看到,随着时间的流逝,词频统计结果会发生动态变化。

image-20231208005821701

  1. updateStateByKey 操作

    在“/root/Desktop/luozhongye/streaming/stateful/”目录下新建一个代码文件 NetworkWordCountStateful.py,输入以下代码:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
    	if len(sys.argv) != 3:
    		print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
    		exit(-1)
    	sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    	ssc = StreamingContext(sc, 1)
    	ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")
    	# RDD with initial state (key, value) pairs
    	initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
    
    
    	def updateFunc(new_values, last_sum):
    		return sum(new_values) + (last_sum or 0)
    
    
    	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    	running_counts = lines.flatMap(lambda line: line.split(" ")) \
    		.map(lambda word: (word, 1)) \
    		.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    	running_counts.pprint()
    	ssc.start()
    	ssc.awaitTermination()
    

    新建一个终端(记作“数据源终端”),执行如下命令启动 nc 程序:

    nc -lk 9999
    

    新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

    cd /root/Desktop/luozhongye/streaming/stateful 
    /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999
    

image-20231208010814959

四、把 DStream 输出到文本文件中

下面对之前已经得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代码进行简单的修改,把生成的词频统计结果写入文本文件中。

修改后得到的新代码文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的内容如下:

#!/usr/bin/env python3 
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
	if len(sys.argv) != 3:
		print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=
		sys.stderr)
		exit(-1)
	sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
	ssc = StreamingContext(sc, 1)
	ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")
	# RDD with initial state (key, value) pairs 
	initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])


	def updateFunc(new_values, last_sum):
		return sum(new_values) + (last_sum or 0)


	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
	running_counts = lines.flatMap(lambda line: line.split(" ")) \
		.map(lambda word: (word, 1)) \
		.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
	running_counts.saveAsTextFiles("file:///root/Desktop/luozhongye/streaming/stateful/output")
	running_counts.pprint()
	ssc.start()
	ssc.awaitTermination()

新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

cd /root/Desktop/luozhongye/streaming/socket/ 
nc -lk 9999

新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

cd /root/Desktop/luozhongye/streaming/stateful 
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999

image-20231208012123002

实验心得

通过本次实验,我深入理解了Spark Streaming,包括创建StreamingContext、DStream等对象。同时,我了解了Spark Streaming对不同类型数据流的处理方式,如文件流、套接字流和RDD队列流。此外,我还熟悉了Spark Streaming的转换操作和输出编程操作,并掌握了map、flatMap、filter等方法。最后,我能够自定义输出方式和格式。总之,这次实验让我全面了解了Spark Streaming,对未来的学习和工作有很大的帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/227465.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode—190.颠倒二进制位【简单】

2023每日刷题&#xff08;五十二&#xff09; Leetcode—190.颠倒二进制位 算法思路 实现代码 class Solution { public:uint32_t reverseBits(uint32_t n) {uint32_t res 0;for(int i 0; i < 32 && n > 0; i) {res | (n & 1) << (31 - i);n >&…

SpringDataJPA基础

简介 Spring Data为数据访问层提供了熟悉且一致的Spring编程模版&#xff0c;对于每种持久性存储&#xff0c;业务代码通常需要提供不同存储库提供对不同CURD持久化操作。Spring Data为这些持久性存储以及特定实现提供了通用的接口和模版。其目的是统一简化对不同类型持久性存储…

Verilog学习 | 用initial语句写出固定的波形

initial beginia 0;ib 1;clk 0;#10ia 1; #20ib 0;#20ia 0; endalways #5 clk ~clk; 或者 initial clk 0;initial beginia 0;#10ia 1; #40ia 0; endinitial beginib 1;#30 ib 0; endalways #5 clk ~clk;

Linux本地部署1Panel服务器运维管理面板并实现公网访问

文章目录 前言1. Linux 安装1Panel2. 安装cpolar内网穿透3. 配置1Panel公网访问地址4. 公网远程访问1Panel管理界面5. 固定1Panel公网地址 前言 1Panel 是一个现代化、开源的 Linux 服务器运维管理面板。高效管理,通过 Web 端轻松管理 Linux 服务器&#xff0c;包括主机监控、…

一些系统日常运维命令和语句

一、前言 记录一些日常系统运维的命令和语句 二、linux命令与语句 1、linux查看各目录使用磁盘情况 du -h /home home为目录 du -h /home 2.查看内存使用情况 free -h 3、查看进程和CPU使用情况 top top 三、数据库语句 1、统计mysql数据库表数量 SELECT COUNT(*) A…

【Linux--基础IO】

目录 一、系统文件接口1.1 open1.2 write1.3 read1.4 close 二、文件描述符三、文件描述符的分配规则四、重定向4.1输出重定向的原理4.2dup2函数的系统调用 五、缓冲区5.1代码及现象5.2原理解释5.3C语言FILE 六、文件系统6.1磁盘的介绍6.1磁盘的分区管理 7、软硬连接7.1软连接7…

基于FPGA的温度控制系统设计(论文+源码)

1.系统设计 本次基于FPGA的智能温度控制系统&#xff0c;以FPGA为控制核心&#xff0c;采用自顶向下的设计方法&#xff0c;按照模块化设计的思路分别实现各个模块&#xff0c;再加以整合实现整个系统&#xff0c;从而达到了温度控制的目的。系统以水箱为被控对象&#xff0c;…

【Spring 源码】 深入理解 Bean 定义之 BeanDefinition

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

Python文件操作(txt + xls + json)

文章目录 简介1、使用with_open读取和保存&#xff1a;.txt .bin&#xff08;二进制文本&#xff09;1.1、with open语句详解1.1、项目实战 2、使用pandas读取和保存&#xff1a;.xls .xlsx2.1、pandas简介2.2、环境配置2.3、项目实战 3、 使用json.dump读取和保存&#xff1…

如何查询川菜食材配料的API接口

在当今的美食文化中&#xff0c;菜谱不只是一张简单的食谱&#xff0c;更是了解美食文化和饮食知识的重要途径。然而&#xff0c;若没有准确的食材配料&#xff0c;烹制出的每道菜品都将难以达到完美的味道。因此&#xff0c;为了更好地满足人们对于菜谱和食谱的需求&#xff0…

Avalonia中如何实现文件拖拽上传

前言 前面我们讲了在Avalonia中如何将View事件映射到ViewModel层感兴趣的读者可以看一下&#xff0c;本章我们将讲一下在Avalonia框架下如何实现文件和文字的拖拽到指定区域进行处理和上传。 先看效果 界面设计比较简单&#xff0c;还是在前一张的基础上加了一个指定区域&…

Vue使用百度地图以及实现轨迹回放 附完整代码

百度地图开放平台 https://lbs.baidu.com/index.php?title%E9%A6%96%E9%A1%B5 javaScript API https://lbs.baidu.com/index.php?titlejspopularGL 百度地图实例 https://lbsyun.baidu.com/index.php?titleopen/jsdemoVue Baidu Map文档 https://dafrok.github.io/vue-baidu…

【环境搭建】ubuntu22安装ros2

基于某种特殊需求&#xff0c;从Ubuntu16到22目前都尝试过安装ros、ros2 参考1&#xff1a;http://t.csdnimg.cn/DzvSe 参考2&#xff1a;http://t.csdnimg.cn/sOzr1 1.设置locale sudo apt update && sudo apt install locales sudo locale-gen en_US en_US.UTF-8 s…

基于ssm vue协同过滤算法的图书推荐系统源码和论文

基于ssm vue协同过滤算法的图书推荐系统源码和论文742 idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 环境&#xff1a; jdk8 tomcat8.5 开发技术 ssm 摘 要 “互联网”的战略实施后&#xff0c;很多行业的信息化水平都有了很大的提升。但是目前很多行业…

[原创][6]探究C#多线程开发细节-“ConcurrentDictionary<T,T>解决多线程的无顺序性的问题“

[简介] 常用网名: 猪头三 出生日期: 1981.XX.XX QQ联系: 643439947 个人网站: 80x86汇编小站 https://www.x86asm.org 编程生涯: 2001年~至今[共22年] 职业生涯: 20年 开发语言: C/C、80x86ASM、PHP、Perl、Objective-C、Object Pascal、C#、Python 开发工具: Visual Studio、D…

Matlab 点云曲线探测(算法不稳定,仅用于学习)

文章目录 一、简介二、实现代码三、实现效果参考文献一、简介 这是一个很有趣的曲线探测的方法,不过我没有复现出论文中那样的效果,可能是理解有误,但这个算法仍然是很有意思,故这里也对其进行记录。 按照论文中的思路,首先我们需要通过一种线性强度图来计算确定每个点的法…

学好操作系统需要的前置知识

1. 态度&#xff1a;不要等一切都准备好了再前行 如果把一切你可能会说&#xff0c;没有这些基础知识&#xff0c;我每看一篇文章&#xff0c;知识就铺天盖地席卷过来&#xff0c;仿佛每一个知识点都准确地打在了自己的盲点上&#xff0c;这该怎么办呢&#xff1f; 我非常能理…

一对多群聊

服务端 import java.io.*; import java.net.*; import java.util.ArrayList; public class Server{public static ServerSocket server_socket;public static ArrayList<Socket> socketListnew ArrayList<Socket>(); public static void main(String []args){try{…

第二十一章总结博客

网络程序设计基础 局域网与互联网 为了实现两台计算机的通信&#xff0c;必须用一个网络线路连接两台计算机。如下图所示 网络协议 1.IP协议 IP是Internet Protocol的简称&#xff0c;是一种网络协议。Internet 网络采用的协议是TCP/IP协议&#xff0c;其全称是Transmission …

Docker架构及常用的命令

一、初识Docker 1、 docker是一个快速交付应用、运行应用的技术&#xff0c;具备下列优势&#xff1a; 可以将程序及其依赖、运行环境一起打包为一个镜像&#xff0c;可以迁移到任意Linux操作系统运行时利用沙箱机制形成隔离容器&#xff0c;各个应用互不干扰启动、移除都可以…