Spark Streaming简介与代码实例

背景:

Spark Streaming是准实时流处理框架,处理响应时间一般以分钟为单位,处理实时数据的延迟时间一般是秒级别的;其他容易混淆的例如Storm实时流处理框架,处理响应是毫秒级。

在我们项目实施选择流框架时需要看具体业务场景:使用MapReduce和Spark进行大数据处理,能够解决很多生产环境下的计算问题,但是随着业务逐渐丰富,数据逐渐丰富,这种批处理在很多场景已经不能满足生产环境的需要了,体现例如①离线计算一般就会建立一个数据仓库,数据量大的情况下,计算耗时也会很长。②例如一个业务场景,需要在根绝客户访问一个网站时的浏览、点击行为,实时做出一些业务上的反馈,时延太长这个数据也流失了很多价值。③现在技术发展的需要,许多机器学习和人工智能应用需要大量的实时数据进行训练和优化。

数据是源源不断产生,计算程序也是一直存在的,即实时计算。

1.流式计算和批处理的关系

批处理和流式本来就存在某种微妙的关系,不是完全隔离的。Spark Streaming充分利用了这种微妙关系,将其发挥到极致。批量处理是Spark Streaming流式处理的一个窗口特别大的特例,实际上,如果我们定时执行某个Spark程序,或者每天执行一次,也相当于是流失计算,不过是以天为事件窗口。但是如果细加观察,Spark Streaming的每个batch又都是一个批处理,只是因为这个批处理可以足够小,看起来就像数据在真实流动一样,所以我们也称之为流式处理。

2.主流的流式计算框架

流式计算最具代表性的框架之一就是Storm。在Storm中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(Master node)分发代码,将任务分配给工作节点(Worker node)执行。一个拓扑中包括spout和bolt两种角色,其中spout发送消息,负责将数据流以tuple元组的形式发送出去;而bolt则负责转换这些数据流,在bolt中可以完成计算、过滤等操作,bolt自身也可以随机将数据发送给其他bolt。由spout发射出的tuple是不可变数组,对应着固定的键值对。

Spark Streaming是核心Spark API的一个扩展,它并不会像Storm那样一次一个地处理数据流,而是在处理前按时间间隔预先将其切分为一段一段的批处理作业。Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集);RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。

除了Spark,Flink也是类似Spark的计算框架,Flink是一个针对流数据和批数据的分布式处理引擎。它主要是由Java代码实现。对Flink而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。Flink会把所有任务当成流来处理,这也是其最大的特点。Flink可以支持本地的快速迭代,以及一些环形的迭代任务,并且Flink可以定制化内存管理。在这点,如果要对比Flink和Spark的话,Flink并没有将内存完全交给应用层。这也是为什么Spark相对于Flink,更容易出现OOM的原因(out ofmemory)。就框架本身与应用场景来说,Flink更相似与Storm。

3.自定义流式计算举例

为了更好理解流式计算思想,我们来举例一个更具体的流式计算的程序。常见的实时计算需要有数据源、消息队列、数据处理。我们的数据源来自Socket,消息队列为了保证线程安全,我们使用Java自带的BlockingQueue,而数据处理就通过一个独立线程读取消息队列的内容处理,结果我们放在ConcurrentHashMap中,保证线程安全。

Spark Streaming的基本原理是将输入数据流以时间片(秒级)为单位进行拆分,然后以类似批处理的方式处理每个时间片数据,见下图:

首先,Spark Streaming把实时输入数据流以时间片Δt(如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多个块。使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD提供的接口,如Map、Reduce、Filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。下图显示了Spark Streaming程序到Spark Job的转换:

Spark Streaming把程序中对DStream的操作转换为DStream Graph,对于每个时间片,DSteam Graph都会产生一个RDD Graph;针对每个输出操作(如Print、Foreach等),SparkStreaming都会创建一个Spark Action;对于每个Spark Action,Spark Streaming都会产生一个相应的Spark Job,Spark会调度Task到相应的Spark Executor上执行。

Spark Streaming的一些常用组件如下:

1.StreamingContext:Spark Streaming中Driver端的上下文对象,初始化的时候会构造Spark Streaming应用程序需要使用的组件,比如DStreamGraph、JobScheduler 等

2.JobGenerator:主要是从DStream产生Job,且根据指定时间执行checkpoint。它维护着一个定时器,该定时器在批处理时间到来的时候会生成作业的操作。

3.JobScheduler:主要用于调度Job。JobScheduler主要通过JobGenerator产生Job,并且通过ReceiverTracker管理流数据接收器Receiver。

4.ReceiverTracker:管理各个Executor上的Receiver的元数据。它在启动的时候,需要根据流数据接收器Receiver分发策略通知对应的Executor中的ReceiverSupervisor(接收器管理着)启动,然后再由ReceiverSupervisor来启动对应节点的Receiver。

数据源:

数据源程序,使用Java编写一个程序,使用socket来向7777端口发送数据:

Package test;

import java.io.Bufferedwriter;

import java.io.IOException;

import java.io.Outputstream;

import java.net.ServerSocket;

import java.net.socket;

import java.io.Outputstreamwriter;



public class DataGenerator{

         public static void main(string[] args) throws IOException{

                  //设置发送端口为7777

                  ServerSocket ss = new ServerSocket(7777);

                  Socket accept = ss.accept();

                  Outputstream outputstream = accept.getoutputstream();

                  Bufferedwriter writer = new Bufferedwriter(new Outputstreamhriter(outputstream));

                  //发送的字符串

                  String[] words = new String[]{"hello Hadoop\n", "hello spark\n", "world hello\n", "hello\n", "hadoop\n"};

                  while (true){

                          try{

                                   Thread.sleep(1000);

                          }catch (InterruptedException e){

                         

                          }

                          //随机发送一个字符串

                          writer.write(words[(int)(Math.random() * 5)]);

                          writer.flush();

                  }

         }

}

数据接收处理:

import org.apache.spark.streaming.streamingContext

import org.apache.spark.streaming.streamingContext

import org.apache.spark.streaming.dstream.Dstream

import org.apache.spark.streaming.Duration

import org.apache.spark.streaming.seconds

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

import org.apache.spark. storage.storageLevel



val sparkconf = new SparkConf().setApplame("NetworkWordCount").setMaster("localhost")

//设置每秒处理一次

val ssc = new streamingContext(sc, Seconds(1))

//使用socket发送数据,ip为localhost,端门为7777

val lines =ssc.socketTextstream("localhost",7777,StorageLevel.MEMORY_AND_DISK_SER)

//flatMap以空格分隔

val words = lines.flatmap(_.split(" "))

//对每一组数据各个字符串数量累加

val wordCounts = words.map(x => (x,1)).reduceBykey(_+_)

//对每一组数据各个字符串数量累加,每10秒一次,统计最近30秒的结果

val wordCounts = words.map(x => (x,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a+b),Seconds(30),Seconds(10))

//输出



wordCounts.print()

ssc.start()

ssc.awaitTermination()

运行数据源:

把刚刚的Java程序打包,用spark-submit执行,我们将打包好的程序放到某一个目录,例如/opt下,命名为hadoop-streaming.jar,使用spark-submit提交(命令制定类名、主机名、UI端口号、Jar包路径):

nohup .../你的路径/bin/spark-submit –class test.DataGenerator –master spark://localhost:9000 /opt/Hadoop-streaming.jar &

运行数据接收、处理程序:

进入spark-shell来运行上面写好的“数据接收处理”的代码,可收到结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/287746.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux引导和服务

一、Linux系统引导过程 Linux系统的引导过程可以分为以下几个主要步骤: (一)BIOS/UEFI启动 当计算机开机时,首先会执行基本输入输出系统(BIOS)或统一可扩展固件接口(UEFI)。这些固…

Java 线程池参数详解与实战

1. 引言 在上一篇中,我们讲到了Java线程的特点和使用场景,Java多线程及通信方式详解-CSDN博客 今天就对Java 线程池参数再近一步进行了解。 在Java中,线程池是一组管理和复用线程的机制,它包含了一定数量的工作线程&#xff0…

跨年烟花-Html5实现_附完整源码【可直接运行】

文章目录 🍻前言🔸目录结构⚫完整源码🔵源码分析💮注意事项 💈总结 🍻前言 随着科技的进步和互联网的普及,人们对于跨年庆祝的方式也在不断变化。传统的烟花燃放虽然美丽,但存在环境…

Vite scss 如何引入本地 字体

Vite scss 如何引入本地 字体 最近在用 Vite 改造一个旧项目 Diary,遇到了好多从 Vue 转到 Vite 的问题。 这次这个问题是: scss 里本地字体引入的问题。 一、问题描述 可以看到下面的卡片字体,本来应该是 impact 的,但现在无法…

前缀和算法 -- [模版]二维前缀和

个人主页:Lei宝啊 愿所有美好如期而遇 本题链接 【模板】二维前缀和_牛客题霸_牛客网 输入描述 n是行,m是列,q是查询次数,x1,y1,x2,y2是二维数组的下标。 输出描述 通过两对下标&#x…

PostgreSQL表全解

文章目录 一、 约束1、 主键2、 非空3、唯一4、检查5、外键6、默认值 二、触发器1、构建表信息,填充数据2、触发器函数3、触发器 三、 表空间四、 视图五、索引1、 索引的基本概念2、索引的分类3、创建索引 六、 物化视图 一、 约束 1、 主键 primary key -- 主键…

粒子群优化pso结合bp神经网络优化对csv文件预测matlab(3)

1.csv数据为密西西比数据集,获取数据集可以管我要,数据集内容形式如下图: 2.代码 这里参考的是b站的一位博主。 数据集导入教程在我的另一篇文章bp写过,需要的话可以去看一下 psobp.m close all clc%读取数据 inputX; outputY;…

无边界支付:数字货币如何改变跨境电商?

在全球数字化的浪潮中,数字货币的崛起成为跨境电商领域的一场革命。本文将深入探讨数字货币如何重新定义支付体系,对跨境电商带来的影响以及未来可能的发展方向。 数字货币的崛起 随着比特币等数字货币的逐渐走俏,传统支付体系的边界逐渐被打…

c语言结构体学习上篇

文章目录 前言一、结构体的声明1,什么叫结构体?2,结构体的类型3,结构体变量的创建和初始化4,结构体的类型5,结构体的初始化 二、结构体的访问1,结构体成员的点操作符访问2,结构体体成员的指针访问 前言 昨…

基于RetinaFace+Jetson Nano的智能门锁系统——第二篇(配置环境)

文章目录 设备一、安装远程登录终端Xshell1.1下载Xshell1.2新建回话1.3查询ip地址1.4启动连接 二、安装远程文件管理WinScp2.1下载WinScp2.2连接Jetson Nano2.3连接成功 三、安装远程桌面VNC Viewer3.1下载VNC Viewer3.2在Jetson Nano安装VNC Viewer3.3设置VINO登录选项3.4将网…

工具变量-ESG基金持股数据集(2008-2022年)

一、数据介绍 数据名称:工具变量-ESG基金持股数据 数据范围:A股上市公司 数据年份:2008-2022年 样本数量:41621条 数据来源:中国责任投资年度报告、上市公司年报 数据整理:自主整理 二、参考文献 […

C#中字母与ASCⅡ码的转换

目录 一、关于ASCⅡ及与字符互转 1.主要用到Encoding对象的GetBytes方法 2.Char显式转换为数值类型得到ASCⅡ 二、实例 三、生成效果 四、程序中的一些知识点 1.IsLetterOrDigit() 2.GetBytes() 3.TryParse(string, out int) 一、关于ASCⅡ及与字符互转 ASCⅡ(Americ…

【SpringBoot3】1.SpringBoot入门的第一个完整小项目(新手保姆版+教会打包)

目录 1 SpringBoot简单介绍1.1 SpringBoot是什么1.2 主要优点1.3 术语1.3.1 starter(场景启动器) 1.4 官方文档 2 环境说明3 实现代码3.1 新建工程与模块3.2 加入依赖3.3 主程序文件3.4 业务代码3.5 运行测试3.6 部署打包3.7 命令行运行 1 SpringBoot简单…

YoloV7改进策略:AAAI 2024 最新的轴向注意力|即插即用,改进首选|全网首发,包含数据集和代码,开箱即用!

摘要 https://arxiv.org/pdf/2312.08866.pdf 本文提出了一种名为Multi-scale Cross-axis Attention(MCA)的方法,用于解决医学图像分割中的多尺度信息和长距离依赖性问题。该方法基于高效轴向注意力,通过计算两个平行轴向注意力之间的双向交叉注意力,更好地捕获全局信息。…

Windows安装部署nginx

1、官网下载安装包: 官网地址:https://nginx.org/en/download.html 下载好后,解压即可: 在nginx的配置文件是conf目录下的nginx.conf,默认配置的nginx监听的端口为80,如果本地80端口已经被使用则修改成其…

强大的Git客户端 GitKraken 中文 for Mac

GitKraken提供了直观的图形化界面,让用户可以轻松地进行版本控制操作,而无需使用命令行界面。您可以通过可视化的工作区、分支图和提交历史,更清晰地了解代码的状态和演变。 跨平台支持:GitKraken可在多个操作系统上运行&#xf…

k8s之Pod的基础(上)

什么是pod? pod是k8s中最小的资源管理组件 pod也是最小运行容器化的应用的资源管理对象 pod是一个抽象的概念,可以理解为一个或者多个容器化应用的集合 在一个pod当中运行一个容器时最常用的方式 在一个pod当中同时运行多个容器,在一个po…

Docker之镜像上传和下载

目录 1.镜像上传 1) 先上百度搜索阿里云 点击以下图片网站 2) 进行登录/注册 3) 使用支付宝...登录 4) 登录后会跳转到首页->点击控制台 5) 点击左上角的三横杠 6) 搜索容器镜像关键词->点击箭头所指 ​ 编辑 7) 进入之后点击实例列表 8) 点击个人实例进入我们的一个…

凯越510X ADV欧洲上市,售价5.5万

凯越510X其实并不是一台新车,就是国内上市的双摇臂版本的525X,国内售价33900元,不过国外上市只有一个色,就是下方蓝黑灰的颜色,这个配色方案感觉还不错。 凯越525X作为国产中量级ADV3剑客,口碑销量一直都是…

Linux | 分布式版本控制工具Git【版本管理 + 远程仓库克隆】

文章目录 一、前言二、有关git的相关历史介绍三、Git版本管理1、感性理解 —— 大学生实验报告2、程序员与产品经理3、张三的CEO之路 —— 版本管理工具的诞生 四、如何在Linux上使用Git1、创建仓库2、将仓库克隆到本地3、git三板斧① git add② git commit③ git push 4、有关…