大数据学习之SparkStreaming、PB级百战出行网约车项目一

一.SparkStreaming

163.SparkStreaming概述

Spark Streaming is an extension of the core Spark API that
enables scalable, high-throughput, fault-tolerant stream
processing of live data streams.
Spark Streaming 是核心 Spark API 的扩展,支持实时数据流的
可扩展、高吞吐量、容错流处理。
Spark Streaming 用于流式数据的处理。 Spark Streaming 支持
的数据输入源很多,例如: Kafka Flume HDFS Kinesis TCP
套接字等等。数据输入后可以用 Spark 的高级函数(如 map
reduce join window 等进行运算。而结果也能保存在很多地方,
HDFS ,数据库和实时仪表板等。还可以可以在数据流上应用
Spark 的机器学习和图形处理算法。
Spark Streaming 接收实时输入数据流,并将数据分为多个批
次,然后由 Spark 引擎进行处理,以批量生成最终结果流。在内部,
它的工作原理如下:

164.SparkStreaming_架构

背压机制 ( 了解 ) Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速
率,可以通过设置静态配制参数
“spark.streaming.receiver.maxRate” 的值来实现,此举虽然可以通
过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会
引入其它问题。比如: producer 数据生产高于 maxRate ,当前集群
处理能力也高于 maxRate ,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力, 1.5 版本开始
Spark Streaming 可以动态控制数据接收速率来适配集群数据处理
能力。背压机制(即 Spark Streaming Backpressure : 根据
JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收
率。
通过属性 “spark.streaming.backpressure.enabled” 来控制是
否启用 backpressure 机制,默认值 false ,即不启用。

165.SparkStreaming_创建项目

<dependency>
<groupId> org.apache.spark </groupId>
<artifactId> spark-core_2.12 </artifactId>
<version> 3.2.1 </version>
</dependency>
<dependency>
   
<groupId> org.apache.spark </groupId>
   
<artifactId> spark
streaming_2.12 </artifactId>
   
<version> 3.2.1 </version>
</dependency>

166.SparkStreaming_WORDCOUNT

package com . itbaizhan . streaming
import org . apache . spark . SparkConf
import org . apache . spark . streaming . dstream .
{ DStream , ReceiverInputDStream }
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object StreamingWordCount {
def main ( args : Array [ String ]): Unit = {
   
//1. 初始化 SparkConf 类的对象
   
val conf : SparkConf = new SparkConf ()
    . setMaster ( "local[*]" )
    . setAppName ( "StreamingWordCount" )
   
//2. 创建 StreamingContext 对象
   
val ssc = new StreamingContext ( conf ,
Seconds ( 5 ))
   
//3. 通过监控 node1 9999 端口创建 DStream 对象
   
val lines : ReceiverInputDStream [ String ]
=
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
7 测试
1
node1
2
IDEA 中运行程序
3
node1
4
查看 IDEA 控制台
     
ssc . socketTextStream ( "node1" , 9999 )
   
//4. 将每一行数据做切分,形成一个个单词
   
val wordsDS : DStream [ String ] =
lines . flatMap ( _ . split ( " " ))
   
//5.word=>(word,1)
   
val wordOne : DStream [( String , Int )] =
wordsDS . map (( _ , 1 ))
   
//6. 将相同的 key value 做聚合加
   
val wordCount : DStream [( String , Int )] =
wordOne . reduceByKey ( _ + _ )
   
//7. 打印输出
   
wordCount . print ()
   
//8. 启动
   
ssc . start ()
   
//9. 等待执行停止
   
ssc . awaitTermination ()
}
}

167.SparkStreaming_数据抽象

168.SparkStreaming_RDD队列创建DSTREAM

169.SparkStreaming_自定义数据源一

需求:自定义数据源,实现监控指定的端口号,获取该端口号
内容。
需要继承 Receiver ,并实现 onStart onStop 方法来自定义数据源采集。
package com . itbaizhan . streaming
import org . apache . spark . storage . StorageLevel
import
org . apache . spark . streaming . receiver . Receiver
import java . io .{ BufferedReader ,
InputStreamReader }
import java . net . Socket
import java . nio . charset . StandardCharsets
1
2
3
4
5
6
7
8
9
13 class ReceiverCustomer ( host : String , port :
Int ) extends Receiver [ String ]
( StorageLevel . MEMORY_ONLY ) {
// 最初启动的时候,调用该方法
// 作用:读数据并将数据发送给 Spark
override def onStart (): Unit = {
   
new Thread ( "Socket Receiver" ) {
     
override def run () {
       
receive ()
    }
  }. start ()
}
override def onStop (): Unit = {}
// 读数据并将数据发送给 Spark
def receive (): Unit = {
   
// 创建一个 Socket
   
var socket : Socket = new Socket ( host ,
port )
   
// 定义一个变量,用来接收端口传过来的数据
   
var input : String = null
   
// 创建一个 BufferedReader 用于读取端口传来的数
   
val reader = new BufferedReader ( new
InputStreamReader ( socket . getInputStream ,
StandardCharsets . UTF_8 ))
   
// 读取数据
   
input = reader . readLine ()
   
// receiver 没有关闭并且输入数据不为空,则循环
发送数据给 Spark
   
while ( ! isStopped () && input != null ) {
     
store ( input )
     
input = reader . readLine ()
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
14 使用自定义的数据源采集数据
  }
   
// 跳出循环则关闭资源
   
reader . close ()
   
socket . close ()
   
// 重启任务
   
restart ( "restart" )
}
}

170.SparkStreaming_自定义数据源二

package com . itbaizhan . streaming
import org . apache . spark . SparkConf
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object CustomerSource {
def main ( args : Array [ String ]): Unit = {
   
//1. 初始化 Spark 配置信息
   
val sparkConf = new SparkConf ()
    . setMaster ( "local[*]" )
    . setAppName ( "CustomerSource" )
   
//2. 初始化
   
val ssc = new
StreamingContext ( sparkConf , Seconds ( 5 ))
   
//3. 创建自定义 receiver Streaming
   
val lines = ssc . receiverStream ( new
ReceiverCustomer ( "node1" , 9999 ))
   
lines . print ()
   
//4. 启动
   
ssc . start ()
   
ssc . awaitTermination ()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
15 测试
1
node1
2
IDEA 中运行程序
3
node1
4
查看 IDEA 控制台
实时效果反馈
1. 关于 SparkStreaming 接收器自定义数据源的描述,错误的
是:
A
需要继承 Receiver ,并实现 onStart onStop 方法来自定义
数据源采集。
B
Xxx extends Receiver[String](StorageLevel.MEMORY_ONLY)
接收到数据仅保存在
内存中。
C
onStart()
最初启动的时候,调用该方法;作用是读数据并将数
据发给 Spark
D
onStop()
不能空实现。
答案:
1=>D 可以空实现
SparkStreaming_DStream 无状态转换
}
}
20
21
[root@node1 ~] # nc -lk 9999
1
[root@node1 ~] # nc -lk 9999
aa
bb
cc

171.SparkStreaming_DSTREAM无状态转换

172.SparkStreaming_DSTREAM无状态转换transform

173.SparkStreaming_DSTREAM有状态转换

174.SparkStreaming_窗口操作reducebykeyandwidow概述

//reduceFunc– 结合和交换 reduce 函数
//windowDuration– 窗口长度;必须是此数据流批处理间
隔的倍数
//slideDuration– 窗口的滑动间隔 , 即新数据流生成 RDD
的间隔
def reduceByKeyAndWindow (
   
reduceFunc : ( V , V ) => V ,
   
windowDuration : Duration ,
   
slideDuration : Duration
): DStream [( K , V )] = ssc . withScope {
   
//partitioner– 用于控制新数据流中每个 RDD 分区的
分区器
reduceByKeyAndWindow ( reduceFunc ,
windowDuration , slideDuration ,
defaultPartitioner ())
}

175.SparkStreaming_窗口操作reducebykeyandwidow实战

176.SparkStreaming_窗口操作reducebykeyandwidow优化

177.SparkStreaming_窗口操作WINDOW

178.SparkStreaming_输出

179.SparkStreaming_优雅关闭一

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主
动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所
以配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内
部程序关闭。
package com . itbaizhan . streaming
import org . apache . spark . SparkConf
import
org . apache . spark . streaming . dstream . ReceiverI
nputDStream
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object StreamingStopDemo {
def createSSC (): StreamingContext = {
   
val sparkConf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "StreamingStop" )
   
// 设置优雅的关闭
sparkConf . set ( "spark.streaming.stopGraceful
lyOnShutdown" , "true" )
1
2
3
4
5
6
7
8
9
34    
val ssc = new
StreamingContext ( sparkConf , Seconds ( 5 ))
   
ssc . checkpoint ( "./ckp" )
   
ssc
}
def main ( args : Array [ String ]): Unit = {
   
val ssc : StreamingContext =
StreamingContext . getActiveOrCreate ( "./ckp" ,
() => createSSC ())
   
new Thread ( new
StreamingStop ( ssc )). start ()
   
val line : ReceiverInputDStream [ String ] =
ssc . socketTextStream ( "node1" , 9999 )
   
line . print ()
   
ssc . start ()
   
ssc . awaitTermination ()
}
}
10
11
12
13
14
15
16
17
18
19
20
21
22
package com . itbaizhan . streaming
import org . apache . hadoop . conf . Configuration
import org . apache . hadoop . fs .{ FileSystem ,
Path }
import org . apache . spark . streaming .
{ StreamingContext , StreamingContextState }
import java . net . URI
class StreamingStop ( ssc : StreamingContext )
extends Runnable {
override def run (): Unit = {
   
val fs : FileSystem = FileSystem . get ( new
URI ( "hdfs://node2:9820" ),
     
new Configuration (), "root" )
1
2
3
4
5
6
7
8
9
35 测试
1
启动 hadoop 集群
2
node1 上: nc -lk 9999
3
运行程序
4
node2
5
node1 上:
   
while ( true ) {
     
try
       
Thread . sleep ( 5000 )
     
catch {
       
case e : InterruptedException =>
         
e . printStackTrace ()
    }
     
val state : StreamingContextState =
ssc . getState
     
if ( state ==
StreamingContextState . ACTIVE ) {
       
val bool : Boolean = fs . exists ( new
Path ( "hdfs://node2:9820/stopSpark" ))
       
if ( bool ) {
         
ssc . stop ( stopSparkContext = true ,
stopGracefully = true )
         
System . exit ( 0 )
      }
    }
  }
}
}

180.SparkStreaming_优雅关闭二

181.SparkStreaming_优雅关闭测试

182.SparkStreaming_整合KAFKA模式

183.SparkStreaming_整合kafka开发一

导入依赖:
代码编写:
<dependency>
   
<groupId> org.apache.spark </groupId>
   
<artifactId> spark-streaming-kafka-0-
10_2.12 </artifactId>
   
<version> 3.2.1 </version>
</dependency>
<dependency>
<groupId> com.fasterxml.jackson.core </groupI
d>
   
<artifactId> jackson-core </artifactId>
   
<version> 2.12.7 </version>
</dependency>
1
2
3
4
5
6
7
8
9
10
package com . itbaizhan . streaming
import org . apache . kafka . clients . consumer .
{ ConsumerConfig , ConsumerRecord }
import org . apache . spark . SparkConf
import org . apache . spark . streaming . dstream .
{ DStream , InputDStream }
1
2
3
4
40 import org . apache . spark . streaming . kafka010 .
{ ConsumerStrategies , KafkaUtils ,
LocationStrategies }
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object DirectAPIDemo {
def main ( args : Array [ String ]): Unit = {
   
//1. 创建 SparkConf
   
val sparkConf : SparkConf = new
SparkConf ()
    . setMaster ( "local[*]" )
    . setAppName ( "DirectAPIDemo" )
   
//2. 创建 StreamingContext
   
val ssc = new
StreamingContext ( sparkConf , Seconds ( 3 ))
   
//3. 定义 Kafka 参数
   
val kafkaPara : Map [ String , Object ] =
Map [ String , Object ](
   
ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG ->
"node2:9092,node3:9092,node4:9092" ,
     
ConsumerConfig . GROUP_ID_CONFIG ->
"itbaizhan" ,
     
"key.deserializer" ->
"org.apache.kafka.common.serialization.Strin
gDeserializer" ,
     
"value.deserializer" ->
"org.apache.kafka.common.serialization.Strin
gDeserializer"
  )
   
//4. 读取 Kafka 数据创建 DStream
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
41 SparkStreaming_ 整合 Kafka 测试
   
val kafkaDStream :
InputDStream [ ConsumerRecord [ String , String ]]
=
     
KafkaUtils . createDirectStream [ String ,
String ]( ssc ,
       
// 由框架自动选择位置匹配
       
LocationStrategies . PreferConsistent ,
       
// 消费者策略 主题: topicKafka,kafka
数: kafkaPara
       
ConsumerStrategies . Subscribe [ String ,
String ]( Set ( "topicKafka" ), kafkaPara ))
   
//5. 将每条消息的 KV 取出
   
//val valueDStream: DStream[String] =
kafkaDStream.map(record => record.value())
   
val valueDStream : DStream [ String ] =
kafkaDStream . map ( _ . value ())
   
//6. 计算 WordCount
   
valueDStream . flatMap ( _ . split ( " " ))
    . map (( _ , 1 ))
    . reduceByKey ( _ + _ )
    . print ()
   
//7. 开启任务
   
ssc . start ()
   
ssc . awaitTermination ()
}
}

184.SparkStreaming_整合kafka开发二

185.SparkStreaming_整合kafka测试

二.PB级百战出行网约车项目一

1.百战出行

项目需求分析
数据采集平台搭建
1
订单数据实时分析计算乘车人数和订单数
2
虚拟车站
3
订单交易数据统计分析
4
司机数据统计分析
5
用户数据统计分析
6
1 名称
框架
数据采集传输
MaxWell Kafka
数据存储
Hbase MySQL Redis
数据计算
Spark
数据库可视化
Echarts
项目技术点
掌握数据从终端 (APP) 的产生到数据中台处理再到大数据后台处理的整个链路技术。
1
Spark 自定义数据源实现 HBase 数据进行剪枝分析计算。
2
基于 Phoenix 实战海量数据秒查询。
3
平台新用户和留存用户分析。
4
空间索引算法 Uber h3 分析与蜂窝六边形区域订单分析。

2.百战出行架构设计

3.环境搭建_HBASE安装部署

4.环境搭建_KAFKA安装部署

5.环境搭建_MYSQL安装部署

6.环境搭建_REDIS安装部署

7.构建父工程

8.订单监控_收集订单数据

9.订单监控_订单数据分析

10.订单监控_存储数据之读取数据

11.订单监控_存储数据之保持数据至MYSQL

12.订单监控_MAXWELL介绍

13.订单监控_MAXWELL安装

14.订单监控_SPARK_STREAMING整合KAFKA_上

15.订单监控_SPARK_STREAMING整合KAFKA_下

16.订单监控_实时统计订单总数之消费订单数据

17.订单监控_实时统计订单总数之构建订单解析器

18.订单监控_实时统计订单总数之解析订单JSON数据

19.订单监控_实时统计订单总数

20.订单监控_实时统计乘车人数统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/968021.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

day5QT套接字通信

Widget.cpp #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);objtimer new QTimer (this);//连接定时器的timeout信号到启动的槽函数//connect(objtimer,&…

【免费】2007-2020年各省医疗卫生支出数据

2007-2020年各省医疗卫生支出数据 1、时间&#xff1a;2007-2020年 2、来源&#xff1a;国家统计局、统计年鉴 3、指标&#xff1a;行政区划代码、地区名称、年份、医疗卫生支出 4、范围&#xff1a;31省 5、指标说明&#xff1a;地方财政医疗卫生支出是指地方ZF从其财政预…

本地基于GGUF部署的DeepSeek实现轻量级调优之二:检索增强生成(RAG)

前文&#xff0c;我们在本地windows电脑基于GGUF文件&#xff0c;部署了DeepSeek R1 1.5B模型&#xff0c;如果想在离线模式下加载本地的DeepSeek模型自行对进行训练时&#xff0c;是不能直接使用GGUF文件进行训练的&#xff0c;但是可以对模型进行微调&#xff0c;以下说的是第…

16vue3实战-----动态路由

16vue3实战-----动态路由 1.思路2.实现2.1创建所有的vue组件2.2创建所有的路由对象文件(与上述中的vue文件一一对应)2.3动态加载所有的路由对象文件2.4根据菜单动态映射正确的路由2.5解决main页面刷新的问题2.6解决main的第一个页面匹配显示的问题2.7根据path匹配menu 1.思路 …

WPS如何接入DeepSeek(通过JS宏调用)

WPS如何接入DeepSeek 一、文本扩写二、校对三、翻译 本文介绍如何通过 WPS JS宏调用 DeepSeek 大模型&#xff0c;实现自动化文本扩写、校对和翻译等功能。 一、文本扩写 1、随便打开一个word文档&#xff0c;点击工具栏“工具”。 2、点击“开发工具”。 3、点击“查看代码”…

前端快速生成接口方法

大家好&#xff0c;我是苏麟&#xff0c;今天聊一下OpenApi。 官网 &#xff1a; umijs/openapi - npm 安装命令 npm i --save-dev umijs/openapi 在根目录&#xff08;项目目录下&#xff09;创建文件 openapi.config.js import { generateService } from umijs/openapi// 自…

云消息队列 ApsaraMQ Serverless 演进:高弹性低成本、更稳定更安全、智能化免运维

如今&#xff0c;消息队列已成为分布式架构中不可或缺的关键服务&#xff0c;为电商、物联网、游戏和教育等行业&#xff0c;提供了异步解耦、集成、高性能和高可靠的核心价值。 过去一年&#xff0c;我们发布了云消息队列 ApsaraMQ 全系列产品 Serverless 化&#xff0c;面向…

Spring依赖注入方式

写在前面&#xff1a;大家好&#xff01;我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正&#xff0c;感谢大家的不吝赐教。我的唯一博客更新地址是&#xff1a;https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油&#xff0c;冲鸭&#x…

Mysql索引失效的场景

对索引列使用函数或表达式&#xff0c;或参与计算&#xff08;优化方法&#xff1a;将计算移到条件右侧&#xff1a;&#xff09;例 优化 对索引列进行隐式类型转换&#xff0c;条件中的数据类型与索引列的数据类型不匹配&#xff0c;会进行隐式类型转换 以like 通配符开头索…

CTFHub-RCE系列wp

目录标题 引言什么是RCE漏洞 eval执行文件包含文件包含php://input读取源代码远程包含 命令注入无过滤过滤cat过滤空格过滤目录分隔符过滤运算符综合过滤练习 引言 题目共有如下类型 什么是RCE漏洞 RCE漏洞&#xff0c;全称是Remote Code Execution漏洞&#xff0c;翻译成中文…

算法学习笔记之并查集

简介 问题描述&#xff1a;将编号为1-N的N个对象划分为不相交集合&#xff0c;在每个集合中&#xff0c;选择其中的某个元素代表所在集合。 常见两种操作&#xff1a; 1.合并两个集合 2.查找某元素属于哪个集合 实现方法1 用编号最小的元素标记所在集合&#xff1b; 定义…

渗透利器工具:Burp Suite 联动 XRAY 图形化工具.(主动扫描+被动扫描)

Burp Suite 联动 XRAY 图形化工具.&#xff08;主动扫描被动扫描&#xff09; Burp Suite 和 Xray 联合使用&#xff0c;能够将 Burp 的强大流量拦截与修改功能&#xff0c;与 Xray 的高效漏洞检测能力相结合&#xff0c;实现更全面、高效的网络安全测试&#xff0c;同时提升漏…

菌贝:云南鸡枞菌走向世界的第一品牌

云南&#xff0c;这片神奇的土地&#xff0c;孕育了无数珍稀的野生菌&#xff0c;而鸡枞菌无疑是其中的佼佼者。它以其独特的口感和丰富的营养价值&#xff0c;被誉为“菌中之王”。在云南鸡枞菌的品牌化进程中&#xff0c;菌贝以其卓越的品质和广泛的影响力&#xff0c;成为云…

如何恢复使用 Command+Option+Delete 删除的文件:完整指南

在日常使用 Mac 时&#xff0c;我们经常会使用 CommandOptionDelete 组合键来快速删除文件。这种删除方式会将文件直接移出废纸篓&#xff0c;而不会经过废纸篓的中间步骤&#xff0c;因此文件看似被永久删除。然而&#xff0c;即使文件被这样删除&#xff0c;仍然有几种方法可…

windows生成SSL的PFX格式证书

生成crt证书: 安装openssl winget install -e --id FireDaemon.OpenSSL 生成cert openssl req -x509 -newkey rsa:2048 -keyout private.key -out certificate.crt -days 365 -nodes -subj "/CN=localhost" 转换pfx openssl pkcs12 -export -out certificate.pfx…

用户认证综合实验

实验需求 需求一&#xff1a;根据下表&#xff0c;完成相关配置 需求二&#xff1a;配置DHCP协议&#xff0c;具体要求如下 需求三&#xff1a;防火墙安全区域配置 需求四&#xff1a;防火墙地址组信息 需求五&#xff1a;管理员 为 FW 配置一个配置管理员。要求管理员可以通…

5.7.2 进度管理

文章目录 进度管理Gantt图PERT图 进度管理 进度安排&#xff1a;通过将项目分解成多个活动&#xff0c;分析活动间的依赖关系&#xff0c;估算工作量&#xff0c;分配资源&#xff0c;制定活动时序。 Gantt图 Gantt图横坐标表示时间&#xff0c;纵坐标表示不同任务。使用一条条…

MQTT(Message Queuing Telemetry Transport)协议(二)

下面为你详细介绍如何基于 TCP 协议对 MQTT 进行封装&#xff0c;包括实现思路、代码示例及代码解释。 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h>…

b站——《【强化学习】一小时完全入门》学习笔记及代码(1-3 多臂老虎机)

问题陈述 我们有两个多臂老虎机&#xff08;Multi-Armed Bandit&#xff09;&#xff0c;分别称为左边的老虎机和右边的老虎机。每个老虎机的奖励服从不同的正态分布&#xff1a; 左边的老虎机&#xff1a;奖励服从均值为 500&#xff0c;标准差为 50 的正态分布&#xff0c;即…

Unity 接入Tripo 文生模型,图生模型

官方网站&#xff1a;https://www.tripo3d.ai/app/home自行注册账号并且登陆下载Unity插件&#xff1a;https://cdn-web.tripo3d.ai/plugin/tripo-unity.zip申请apikey&#xff1a; https://platform.tripo3d.ai/api-keys使用&#xff08;后续过程就按照第二步下载的插件里面的…