一.SparkStreaming
163.SparkStreaming概述
Spark Streaming is an extension of the core Spark API that
enables scalable, high-throughput, fault-tolerant stream
processing of live data streams.
Spark Streaming
是核心
Spark API
的扩展,支持实时数据流的
可扩展、高吞吐量、容错流处理。
Spark Streaming
用于流式数据的处理。
Spark Streaming
支持
的数据输入源很多,例如:
Kafka
、
Flume
、
HDFS
、
Kinesis
和
TCP
套接字等等。数据输入后可以用
Spark
的高级函数(如
map
、
reduce
、
join
和
window
等进行运算。而结果也能保存在很多地方,
如
HDFS
,数据库和实时仪表板等。还可以可以在数据流上应用
Spark
的机器学习和图形处理算法。
Spark Streaming
接收实时输入数据流,并将数据分为多个批
次,然后由
Spark
引擎进行处理,以批量生成最终结果流。在内部,
它的工作原理如下:
164.SparkStreaming_架构
背压机制
(
了解
)
Spark 1.5
以前版本,用户如果要限制
Receiver
的数据接收速
率,可以通过设置静态配制参数
“spark.streaming.receiver.maxRate”
的值来实现,此举虽然可以通
过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会
引入其它问题。比如:
producer
数据生产高于
maxRate
,当前集群
处理能力也高于
maxRate
,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,
1.5
版本开始
Spark Streaming
可以动态控制数据接收速率来适配集群数据处理
能力。背压机制(即
Spark Streaming Backpressure
)
:
根据
JobScheduler
反馈作业的执行信息来动态调整
Receiver
数据接收
率。
通过属性
“spark.streaming.backpressure.enabled”
来控制是
否启用
backpressure
机制,默认值
false
,即不启用。
165.SparkStreaming_创建项目
<dependency>
<groupId>
org.apache.spark
</groupId>
<artifactId>
spark-core_2.12
</artifactId>
<version>
3.2.1
</version>
</dependency>
<dependency>
<groupId>
org.apache.spark
</groupId>
<artifactId>
spark
streaming_2.12
</artifactId>
<version>
3.2.1
</version>
</dependency>
![](https://i-blog.csdnimg.cn/direct/7172791580ef46de8243cc14bf490184.png)
166.SparkStreaming_WORDCOUNT
package
com
.
itbaizhan
.
streaming
import
org
.
apache
.
spark
.
SparkConf
import
org
.
apache
.
spark
.
streaming
.
dstream
.
{
DStream
,
ReceiverInputDStream
}
import
org
.
apache
.
spark
.
streaming
.{
Seconds
,
StreamingContext
}
object
StreamingWordCount
{
def
main
(
args
:
Array
[
String
]):
Unit
=
{
//1.
初始化
SparkConf
类的对象
val
conf
:
SparkConf
=
new
SparkConf
()
.
setMaster
(
"local[*]"
)
.
setAppName
(
"StreamingWordCount"
)
//2.
创建
StreamingContext
对象
val
ssc
=
new
StreamingContext
(
conf
,
Seconds
(
5
))
//3.
通过监控
node1
的
9999
端口创建
DStream
对象
val
lines
:
ReceiverInputDStream
[
String
]
=
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
7
测试
1
在
node1
上
2
在
IDEA
中运行程序
3
在
node1
上
4
查看
IDEA
控制台
ssc
.
socketTextStream
(
"node1"
,
9999
)
//4.
将每一行数据做切分,形成一个个单词
val
wordsDS
:
DStream
[
String
]
=
lines
.
flatMap
(
_
.
split
(
" "
))
//5.word=>(word,1)
val
wordOne
:
DStream
[(
String
,
Int
)]
=
wordsDS
.
map
((
_
,
1
))
//6.
将相同的
key
的
value
做聚合加
val
wordCount
:
DStream
[(
String
,
Int
)]
=
wordOne
.
reduceByKey
(
_
+
_
)
//7.
打印输出
wordCount
.
print
()
//8.
启动
ssc
.
start
()
//9.
等待执行停止
ssc
.
awaitTermination
()
}
}
167.SparkStreaming_数据抽象
168.SparkStreaming_RDD队列创建DSTREAM
169.SparkStreaming_自定义数据源一
需求:自定义数据源,实现监控指定的端口号,获取该端口号
内容。
需要继承
Receiver
,并实现
onStart
、
onStop
方法来自定义数据源采集。
package
com
.
itbaizhan
.
streaming
import
org
.
apache
.
spark
.
storage
.
StorageLevel
import
org
.
apache
.
spark
.
streaming
.
receiver
.
Receiver
import
java
.
io
.{
BufferedReader
,
InputStreamReader
}
import
java
.
net
.
Socket
import
java
.
nio
.
charset
.
StandardCharsets
1
2
3
4
5
6
7
8
9
13
class
ReceiverCustomer
(
host
:
String
,
port
:
Int
)
extends
Receiver
[
String
]
(
StorageLevel
.
MEMORY_ONLY
) {
//
最初启动的时候,调用该方法
//
作用:读数据并将数据发送给
Spark
override def
onStart
():
Unit
=
{
new
Thread
(
"Socket Receiver"
) {
override def
run
() {
receive
()
}
}.
start
()
}
override def
onStop
():
Unit
=
{}
//
读数据并将数据发送给
Spark
def
receive
():
Unit
=
{
//
创建一个
Socket
var
socket
:
Socket
=
new
Socket
(
host
,
port
)
//
定义一个变量,用来接收端口传过来的数据
var
input
:
String
=
null
//
创建一个
BufferedReader
用于读取端口传来的数
据
val
reader
=
new
BufferedReader
(
new
InputStreamReader
(
socket
.
getInputStream
,
StandardCharsets
.
UTF_8
))
//
读取数据
input
=
reader
.
readLine
()
//
当
receiver
没有关闭并且输入数据不为空,则循环
发送数据给
Spark
while
(
!
isStopped
()
&&
input
!=
null
) {
store
(
input
)
input
=
reader
.
readLine
()
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
14
使用自定义的数据源采集数据
}
//
跳出循环则关闭资源
reader
.
close
()
socket
.
close
()
//
重启任务
restart
(
"restart"
)
}
}
![](https://i-blog.csdnimg.cn/direct/99b0994b774b440095c4e0fb22a526df.png)
170.SparkStreaming_自定义数据源二
package
com
.
itbaizhan
.
streaming
import
org
.
apache
.
spark
.
SparkConf
import
org
.
apache
.
spark
.
streaming
.{
Seconds
,
StreamingContext
}
object
CustomerSource
{
def
main
(
args
:
Array
[
String
]):
Unit
=
{
//1.
初始化
Spark
配置信息
val
sparkConf
=
new
SparkConf
()
.
setMaster
(
"local[*]"
)
.
setAppName
(
"CustomerSource"
)
//2.
初始化
val
ssc
=
new
StreamingContext
(
sparkConf
,
Seconds
(
5
))
//3.
创建自定义
receiver
的
Streaming
val
lines
=
ssc
.
receiverStream
(
new
ReceiverCustomer
(
"node1"
,
9999
))
lines
.
print
()
//4.
启动
ssc
.
start
()
ssc
.
awaitTermination
()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
15
测试
1
在
node1
上
2
在
IDEA
中运行程序
3
在
node1
上
4
查看
IDEA
控制台
实时效果反馈
1.
关于
SparkStreaming
接收器自定义数据源的描述,错误的
是:
A
需要继承
Receiver
,并实现
onStart
、
onStop
方法来自定义
数据源采集。
B
Xxx extends Receiver[String](StorageLevel.MEMORY_ONLY)
接收到数据仅保存在
内存中。
C
onStart()
最初启动的时候,调用该方法;作用是读数据并将数
据发给
Spark
。
D
onStop()
不能空实现。
答案:
1=>D
可以空实现
SparkStreaming_DStream
无状态转换
}
}
20
21
[root@node1 ~]
# nc -lk 9999
1
[root@node1 ~]
# nc -lk 9999
aa
bb
cc
![](https://i-blog.csdnimg.cn/direct/db6d083da6b44b2a9f2128e0aa11fa90.png)
171.SparkStreaming_DSTREAM无状态转换
172.SparkStreaming_DSTREAM无状态转换transform
173.SparkStreaming_DSTREAM有状态转换
174.SparkStreaming_窗口操作reducebykeyandwidow概述
//reduceFunc–
结合和交换
reduce
函数
//windowDuration–
窗口长度;必须是此数据流批处理间
隔的倍数
//slideDuration–
窗口的滑动间隔
,
即新数据流生成
RDD
的间隔
def
reduceByKeyAndWindow
(
reduceFunc
: (
V
,
V
)
=>
V
,
windowDuration
:
Duration
,
slideDuration
:
Duration
):
DStream
[(
K
,
V
)]
=
ssc
.
withScope
{
//partitioner–
用于控制新数据流中每个
RDD
分区的
分区器
reduceByKeyAndWindow
(
reduceFunc
,
windowDuration
,
slideDuration
,
defaultPartitioner
())
}
175.SparkStreaming_窗口操作reducebykeyandwidow实战
176.SparkStreaming_窗口操作reducebykeyandwidow优化
177.SparkStreaming_窗口操作WINDOW
178.SparkStreaming_输出
179.SparkStreaming_优雅关闭一
流式任务需要
7*24
小时执行,但是有时涉及到升级代码需要主
动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所
以配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内
部程序关闭。
package
com
.
itbaizhan
.
streaming
import
org
.
apache
.
spark
.
SparkConf
import
org
.
apache
.
spark
.
streaming
.
dstream
.
ReceiverI
nputDStream
import
org
.
apache
.
spark
.
streaming
.{
Seconds
,
StreamingContext
}
object
StreamingStopDemo
{
def
createSSC
():
StreamingContext
=
{
val
sparkConf
:
SparkConf
=
new
SparkConf
().
setMaster
(
"local[*]"
).
setAppName
(
"StreamingStop"
)
//
设置优雅的关闭
sparkConf
.
set
(
"spark.streaming.stopGraceful
lyOnShutdown"
,
"true"
)
1
2
3
4
5
6
7
8
9
34
val
ssc
=
new
StreamingContext
(
sparkConf
,
Seconds
(
5
))
ssc
.
checkpoint
(
"./ckp"
)
ssc
}
def
main
(
args
:
Array
[
String
]):
Unit
=
{
val
ssc
:
StreamingContext
=
StreamingContext
.
getActiveOrCreate
(
"./ckp"
,
()
=>
createSSC
())
new
Thread
(
new
StreamingStop
(
ssc
)).
start
()
val
line
:
ReceiverInputDStream
[
String
]
=
ssc
.
socketTextStream
(
"node1"
,
9999
)
line
.
print
()
ssc
.
start
()
ssc
.
awaitTermination
()
}
}
10
11
12
13
14
15
16
17
18
19
20
21
22
package
com
.
itbaizhan
.
streaming
import
org
.
apache
.
hadoop
.
conf
.
Configuration
import
org
.
apache
.
hadoop
.
fs
.{
FileSystem
,
Path
}
import
org
.
apache
.
spark
.
streaming
.
{
StreamingContext
,
StreamingContextState
}
import
java
.
net
.
URI
class
StreamingStop
(
ssc
:
StreamingContext
)
extends
Runnable
{
override def
run
():
Unit
=
{
val
fs
:
FileSystem
=
FileSystem
.
get
(
new
URI
(
"hdfs://node2:9820"
),
new
Configuration
(),
"root"
)
1
2
3
4
5
6
7
8
9
35
测试
1
启动
hadoop
集群
2
在
node1
上:
nc -lk 9999
3
运行程序
4
在
node2
:
5
在
node1
上:
while
(
true
) {
try
Thread
.
sleep
(
5000
)
catch
{
case
e
:
InterruptedException
=>
e
.
printStackTrace
()
}
val
state
:
StreamingContextState
=
ssc
.
getState
if
(
state
==
StreamingContextState
.
ACTIVE
) {
val
bool
:
Boolean
=
fs
.
exists
(
new
Path
(
"hdfs://node2:9820/stopSpark"
))
if
(
bool
) {
ssc
.
stop
(
stopSparkContext
=
true
,
stopGracefully
=
true
)
System
.
exit
(
0
)
}
}
}
}
}
![](https://i-blog.csdnimg.cn/direct/7d07345891f94a51be637443735b340a.png)
180.SparkStreaming_优雅关闭二
181.SparkStreaming_优雅关闭测试
182.SparkStreaming_整合KAFKA模式
183.SparkStreaming_整合kafka开发一
导入依赖:
代码编写:
<dependency>
<groupId>
org.apache.spark
</groupId>
<artifactId>
spark-streaming-kafka-0-
10_2.12
</artifactId>
<version>
3.2.1
</version>
</dependency>
<dependency>
<groupId>
com.fasterxml.jackson.core
</groupI
d>
<artifactId>
jackson-core
</artifactId>
<version>
2.12.7
</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
package
com
.
itbaizhan
.
streaming
import
org
.
apache
.
kafka
.
clients
.
consumer
.
{
ConsumerConfig
,
ConsumerRecord
}
import
org
.
apache
.
spark
.
SparkConf
import
org
.
apache
.
spark
.
streaming
.
dstream
.
{
DStream
,
InputDStream
}
1
2
3
4
40
import
org
.
apache
.
spark
.
streaming
.
kafka010
.
{
ConsumerStrategies
,
KafkaUtils
,
LocationStrategies
}
import
org
.
apache
.
spark
.
streaming
.{
Seconds
,
StreamingContext
}
object
DirectAPIDemo
{
def
main
(
args
:
Array
[
String
]):
Unit
=
{
//1.
创建
SparkConf
val
sparkConf
:
SparkConf
=
new
SparkConf
()
.
setMaster
(
"local[*]"
)
.
setAppName
(
"DirectAPIDemo"
)
//2.
创建
StreamingContext
val
ssc
=
new
StreamingContext
(
sparkConf
,
Seconds
(
3
))
//3.
定义
Kafka
参数
val
kafkaPara
:
Map
[
String
,
Object
]
=
Map
[
String
,
Object
](
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
->
"node2:9092,node3:9092,node4:9092"
,
ConsumerConfig
.
GROUP_ID_CONFIG
->
"itbaizhan"
,
"key.deserializer"
->
"org.apache.kafka.common.serialization.Strin
gDeserializer"
,
"value.deserializer"
->
"org.apache.kafka.common.serialization.Strin
gDeserializer"
)
//4.
读取
Kafka
数据创建
DStream
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
41
SparkStreaming_
整合
Kafka
测试
val
kafkaDStream
:
InputDStream
[
ConsumerRecord
[
String
,
String
]]
=
KafkaUtils
.
createDirectStream
[
String
,
String
](
ssc
,
//
由框架自动选择位置匹配
LocationStrategies
.
PreferConsistent
,
//
消费者策略
主题:
topicKafka,kafka
参
数:
kafkaPara
ConsumerStrategies
.
Subscribe
[
String
,
String
](
Set
(
"topicKafka"
),
kafkaPara
))
//5.
将每条消息的
KV
取出
//val valueDStream: DStream[String] =
kafkaDStream.map(record => record.value())
val
valueDStream
:
DStream
[
String
]
=
kafkaDStream
.
map
(
_
.
value
())
//6.
计算
WordCount
valueDStream
.
flatMap
(
_
.
split
(
" "
))
.
map
((
_
,
1
))
.
reduceByKey
(
_
+
_
)
.
print
()
//7.
开启任务
ssc
.
start
()
ssc
.
awaitTermination
()
}
}
![](https://i-blog.csdnimg.cn/direct/41f2e286a62640dd9853abd9452e8ed4.png)
184.SparkStreaming_整合kafka开发二
185.SparkStreaming_整合kafka测试
二.PB级百战出行网约车项目一
1.百战出行
项目需求分析
数据采集平台搭建
1
订单数据实时分析计算乘车人数和订单数
2
虚拟车站
3
订单交易数据统计分析
4
司机数据统计分析
5
用户数据统计分析
6
1
名称
框架
数据采集传输
MaxWell
、
Kafka
数据存储
Hbase
、
MySQL
、
Redis
数据计算
Spark
数据库可视化
Echarts
项目技术点
掌握数据从终端
(APP)
的产生到数据中台处理再到大数据后台处理的整个链路技术。
1
Spark
自定义数据源实现
HBase
数据进行剪枝分析计算。
2
基于
Phoenix
实战海量数据秒查询。
3
平台新用户和留存用户分析。
4
空间索引算法
Uber h3
分析与蜂窝六边形区域订单分析。