一、介绍
1. 不同的数据处理
- 从数据处理的方式:
- 流式数据处理(Streaming)
- 批量数据处理(Batch)
- 从数据处理的延迟:
- 实时数据处理(毫秒级别)
- 离线数据处理(小时或天级别)
2. 简介
- SparkStreaming 是一个准实时(秒或分钟级别)、微批量的数据处理框架
- SparkStreaming 支持的很多数据输入源,如: Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等。数据输入后可以用 Spark 的高度抽象原语,如: map、 reduce、 join、 window 等进行运算。结果能保存在很多地方,如 HDFS,数据库等
- SparkStreaming 使用离散化流 (discretized stream) 作为抽象表示,称为 DStream,它是对 RDD 在实时数据处理场景的一种封装
3. 特点
- 易用
- 容错
- 易整合到 Spark 体系
二、基本架构
1. 背压机制
- Spark 1.5 以前版本:通过设置静态配制参数
spark.streaming.receiver.maxRate
来限制 Receiver 的数据接收速率,来解决生产和消费速率不对等造成的内存溢出等问题,但当数据生产和数据消费的能力都高于 maxRate 时会造成资源利用率下降等问题 - Spark 1.5 版本及以后版本:为了动态控制数据接收速率来适配集群数据处理能力,引入了背压机制 (Spark Streaming Backpressure),即根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率
- 通过属性
spark.streaming.backpressure.enabled
来配置启用 backpressure 机制,默认值为 false,即不启用
三、入门 WordCount 案例
需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数
1. 引入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
2. 代码实现
object SparkStreamingWC {
def main(args: Array[String]): Unit = {
// 1.创建 SparkStreaming 环境对象
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
/*
创建 StreamingContext 对象需要传递两个参数
1.SparkConf:配置对象
2.Duration:批处理的周期,即数据采集周期,单位为毫秒,内置有 Seconds/Minute 等对象
*/
val ssc = new StreamingContext(conf, Seconds(3))
// 2.逻辑处理
val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words = line.flatMap(_.split(" "))
val wordAsOne = words.map((_, 1))
val wordCount: DStream[(String, Int)] = wordAsOne.reduceByKey(_ + _)
wordCount.print()
// 3.运行采集器并等待关闭
/*
采集器是一个长期运行的任务,所以不能关闭 ssc,也不能让 main 方法执行完毕
*/
ssc.start()
ssc.awaitTermination()
}
}
3. 测试
- 打开 cmd 命令窗口,执行
nc -lp 9999
命令(Linux 下为nc -lk 999
) - 运行程序 main 方法
- 在窗口中输入测试字符串(以空格分隔),观察程序命令行输出结果