背景:
Spark Streaming是准实时流处理框架,处理响应时间一般以分钟为单位,处理实时数据的延迟时间一般是秒级别的;其他容易混淆的例如Storm实时流处理框架,处理响应是毫秒级。
在我们项目实施选择流框架时需要看具体业务场景:使用MapReduce和Spark进行大数据处理,能够解决很多生产环境下的计算问题,但是随着业务逐渐丰富,数据逐渐丰富,这种批处理在很多场景已经不能满足生产环境的需要了,体现例如①离线计算一般就会建立一个数据仓库,数据量大的情况下,计算耗时也会很长。②例如一个业务场景,需要在根绝客户访问一个网站时的浏览、点击行为,实时做出一些业务上的反馈,时延太长这个数据也流失了很多价值。③现在技术发展的需要,许多机器学习和人工智能应用需要大量的实时数据进行训练和优化。
数据是源源不断产生,计算程序也是一直存在的,即实时计算。
1.流式计算和批处理的关系
批处理和流式本来就存在某种微妙的关系,不是完全隔离的。Spark Streaming充分利用了这种微妙关系,将其发挥到极致。批量处理是Spark Streaming流式处理的一个窗口特别大的特例,实际上,如果我们定时执行某个Spark程序,或者每天执行一次,也相当于是流失计算,不过是以天为事件窗口。但是如果细加观察,Spark Streaming的每个batch又都是一个批处理,只是因为这个批处理可以足够小,看起来就像数据在真实流动一样,所以我们也称之为流式处理。
2.主流的流式计算框架
流式计算最具代表性的框架之一就是Storm。在Storm中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(Master node)分发代码,将任务分配给工作节点(Worker node)执行。一个拓扑中包括spout和bolt两种角色,其中spout发送消息,负责将数据流以tuple元组的形式发送出去;而bolt则负责转换这些数据流,在bolt中可以完成计算、过滤等操作,bolt自身也可以随机将数据发送给其他bolt。由spout发射出的tuple是不可变数组,对应着固定的键值对。
Spark Streaming是核心Spark API的一个扩展,它并不会像Storm那样一次一个地处理数据流,而是在处理前按时间间隔预先将其切分为一段一段的批处理作业。Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集);RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。
除了Spark,Flink也是类似Spark的计算框架,Flink是一个针对流数据和批数据的分布式处理引擎。它主要是由Java代码实现。对Flink而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。Flink会把所有任务当成流来处理,这也是其最大的特点。Flink可以支持本地的快速迭代,以及一些环形的迭代任务,并且Flink可以定制化内存管理。在这点,如果要对比Flink和Spark的话,Flink并没有将内存完全交给应用层。这也是为什么Spark相对于Flink,更容易出现OOM的原因(out ofmemory)。就框架本身与应用场景来说,Flink更相似与Storm。
3.自定义流式计算举例
为了更好理解流式计算思想,我们来举例一个更具体的流式计算的程序。常见的实时计算需要有数据源、消息队列、数据处理。我们的数据源来自Socket,消息队列为了保证线程安全,我们使用Java自带的BlockingQueue,而数据处理就通过一个独立线程读取消息队列的内容处理,结果我们放在ConcurrentHashMap中,保证线程安全。
Spark Streaming的基本原理是将输入数据流以时间片(秒级)为单位进行拆分,然后以类似批处理的方式处理每个时间片数据,见下图:
首先,Spark Streaming把实时输入数据流以时间片Δt(如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多个块。使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD提供的接口,如Map、Reduce、Filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。下图显示了Spark Streaming程序到Spark Job的转换:
Spark Streaming把程序中对DStream的操作转换为DStream Graph,对于每个时间片,DSteam Graph都会产生一个RDD Graph;针对每个输出操作(如Print、Foreach等),SparkStreaming都会创建一个Spark Action;对于每个Spark Action,Spark Streaming都会产生一个相应的Spark Job,Spark会调度Task到相应的Spark Executor上执行。
Spark Streaming的一些常用组件如下:
1.StreamingContext:Spark Streaming中Driver端的上下文对象,初始化的时候会构造Spark Streaming应用程序需要使用的组件,比如DStreamGraph、JobScheduler 等
2.JobGenerator:主要是从DStream产生Job,且根据指定时间执行checkpoint。它维护着一个定时器,该定时器在批处理时间到来的时候会生成作业的操作。
3.JobScheduler:主要用于调度Job。JobScheduler主要通过JobGenerator产生Job,并且通过ReceiverTracker管理流数据接收器Receiver。
4.ReceiverTracker:管理各个Executor上的Receiver的元数据。它在启动的时候,需要根据流数据接收器Receiver分发策略通知对应的Executor中的ReceiverSupervisor(接收器管理着)启动,然后再由ReceiverSupervisor来启动对应节点的Receiver。
数据源:
数据源程序,使用Java编写一个程序,使用socket来向7777端口发送数据:
Package test;
import java.io.Bufferedwriter;
import java.io.IOException;
import java.io.Outputstream;
import java.net.ServerSocket;
import java.net.socket;
import java.io.Outputstreamwriter;
public class DataGenerator{
public static void main(string[] args) throws IOException{
//设置发送端口为7777
ServerSocket ss = new ServerSocket(7777);
Socket accept = ss.accept();
Outputstream outputstream = accept.getoutputstream();
Bufferedwriter writer = new Bufferedwriter(new Outputstreamhriter(outputstream));
//发送的字符串
String[] words = new String[]{"hello Hadoop\n", "hello spark\n", "world hello\n", "hello\n", "hadoop\n"};
while (true){
try{
Thread.sleep(1000);
}catch (InterruptedException e){
}
//随机发送一个字符串
writer.write(words[(int)(Math.random() * 5)]);
writer.flush();
}
}
}
数据接收处理:
import org.apache.spark.streaming.streamingContext
import org.apache.spark.streaming.streamingContext
import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.seconds
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark. storage.storageLevel
val sparkconf = new SparkConf().setApplame("NetworkWordCount").setMaster("localhost")
//设置每秒处理一次
val ssc = new streamingContext(sc, Seconds(1))
//使用socket发送数据,ip为localhost,端门为7777
val lines =ssc.socketTextstream("localhost",7777,StorageLevel.MEMORY_AND_DISK_SER)
//flatMap以空格分隔
val words = lines.flatmap(_.split(" "))
//对每一组数据各个字符串数量累加
val wordCounts = words.map(x => (x,1)).reduceBykey(_+_)
//对每一组数据各个字符串数量累加,每10秒一次,统计最近30秒的结果
val wordCounts = words.map(x => (x,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a+b),Seconds(30),Seconds(10))
//输出
wordCounts.print()
ssc.start()
ssc.awaitTermination()
运行数据源:
把刚刚的Java程序打包,用spark-submit执行,我们将打包好的程序放到某一个目录,例如/opt下,命名为hadoop-streaming.jar,使用spark-submit提交(命令制定类名、主机名、UI端口号、Jar包路径):
nohup .../你的路径/bin/spark-submit –class test.DataGenerator –master spark://localhost:9000 /opt/Hadoop-streaming.jar &
运行数据接收、处理程序:
进入spark-shell来运行上面写好的“数据接收处理”的代码,可收到结果。