SparkStreaming之04:调优

SparkStreaming调优

一 、要点

4.1 SparkStreaming运行原理

在这里插入图片描述

深入理解

在这里插入图片描述

4.2 调优策略

4.2.1 调整BlockReceiver的数量

在这里插入图片描述

案例演示:

object MultiReceiverNetworkWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    //创建多个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理
    val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)

    val lines = lines1.union(lines2)

    lines.repartition(100)

    //处理的逻辑,就是简单的进行word count
    val words = lines.repartition(100).flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))

    //将结果输出到控制台
    wordCounts.print()

    //启动Streaming处理流
    ssc.start()

    //等待Streaming程序终止
    ssc.awaitTermination()

    ssc.stop(false)
  }
}
⭐️4.2.2 调整Block的数量

batchInterval : 触发批处理的时间间隔
blockInterval :将接收到的数据生成Block的时间间隔,spark.streaming.blockInterval(默认是200ms),那么,BlockRDD的分区数 = batchInterval / blockInterval,即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10,如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval,blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了

4.2.3 调整Receiver的接受速率

pps:permits per second 每秒允许接受的数据量(QPS -> queries per second)
Spark Streaming默认的PPS是没有限制的,可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue

⭐️4.2.3 调整数据处理的并行度

BlockRDD的分区数

a. 通过Receiver接受数据的特点决定

b. 也可以自己通过repartition设置

ShuffleRDD的分区数

a. 默认的分区数为spark.default.parallelism(core的大小)

b. 通过我们自己设置决定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 数据的序列化

SparkStreaming两种需要序列化的数据:
a. 输入的数据:默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中
b. 缓存的数据:默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
使用Kryo序列化机制,比Java序列化机制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 内存调优
(1)需要内存大小

和transformation的类型有关,如果使用的是updateStateByKey,Window这样的算子,那么内存就要设置得偏大

(2)数据存储级别

如果把接收到的数据设置的存储级别是MEMORY_DISK这种级别,也就是说如果内存不够可以把数据存储到磁盘上,其实性能还是不好的,性能最好的就是所有的数据都在内存里面,所以如果在资源允许的情况下,把内存调大一点,让所有的数据都存在内存里面。

4.2.6 Outout性能
(1)MySQL,HBase

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(2)Kafka(0.8版本)

虽然现在的Kafka的版本已经到2.x版本了,但是很多公司因为历史遗留的原因,公司里面还是会有0.8x的Kafka。比如本人公司里面有两个Kafka集群,一个是0.8x的kafka,一个是1.x的Kafka。开发的时候有时候需要我们使用SparkStreaming做实时的ETL,然后再把数据打回Kafka,0.8版本的kafka默认是没有批量提交的功能的。本人公司里面一个真实的案例,一位同学写的SparkStreaming程序将数据处理完了以后通过ForeachRDD把数据写回到0.8Kafka。但是数据处理得很慢,经常会收到延时告警。最终发现他把数据写到Kafka的时候是一条数据一条数据提交的性能很差。最终手动实现了批量提交的功能。从此再也没有收到过告警。

4.2.7 Backpressure(压力反馈)

在这里插入图片描述
在这里插入图片描述

Feedback Loop : 动态使得Streaming app从unstable状态回到stable状态

在这里插入图片描述

从Spark1.5版本开始:spark.streaming.backpressure.enabled = true

4.2.8 Elastic Scaling(资源动态分配)

动态分配资源:

批处理动态的决定这个application中需要多少个Executors:

  1. 当一个Executor空闲的时候,将这个Executor杀掉
  2. 当task太多的时候,动态的启动Executors

Streaming分配Executor的原则是比对 process time / batchInterval 的比率

在这里插入图片描述

如果延迟了,那么就自动增加资源

在这里插入图片描述

在这里插入图片描述

从Spark2.0有这个功能:: spark.streaming.dynamicAllocation.enabled = true

⭐️4.2.8 数据倾斜调优(重要)

因为SparkStreaming的底层就是RDD,之前SparkCore的所有的数据倾斜的调优策略(见Spark之数据倾斜调优)都适合于SparkStreaming,需要灵活掌握,在实际开发的工作当中用得频率较高。

二 、总结

面试问题:你在工作当中有SparkStreaming调优过项目吗?怎么调优的?效果怎么样?

  1. 比如举foreachRDD的例子
  2. 比如举个数据倾斜的例子
  3. 用Xmind整理调优的策略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/981300.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Jenkins 删除历史构建记录

中文:系统管理 > 脚本命令行: 英文:Manage Jenkins > Script Console def jobName "Wens-Web" //删除的项目名称 def maxNumber 105 // 保留的最小编号,意味着小于该编号的构建都将被删除 Jenkins.instance.getItemByFullName(jobName).build…

全国青少年航天创新大赛各项目对比分析

全国青少年航天创新大赛各项目对比分析 一、比赛场地对比 项目名称场地尺寸场地特点组别差异筑梦天宫虚拟三维场景动态布局,小学组3停泊处,初高中组6停泊处;涉及传送带、机械臂、传感器等虚拟设备。初中/高中组任务复杂度更高,运…

探秘 Linux 系统编程:进程地址空间的奇妙世界

亲爱的读者朋友们😃,此文开启知识盛宴与思想碰撞🎉。 快来参与讨论💬,点赞👍、收藏⭐、分享📤,共创活力社区。 在 Linux 系统编程的领域里,进程地址空间可是个相当重要的…

vue videojs使用canvas截取视频画面

前言 刚开始做的时候太多坑,导致一直报错: Uncaught (in promise) TypeError: Failed to execute ‘drawImage’ on ‘CanvasRenderingContext2D’: The provided value is not of type ‘(CSSImageValue or HTMLCanvasElement or HTMLImageElement or H…

防火墙旁挂组网双机热备负载均衡

一,二层交换网络: 使用MSTPVRRP组网形式 VLAN 2--->SW3为主,SW4 作为备份 VLAN 3--->SW4为主,SW3 作为备份 MSTP 设计 --->SW3 、 4 、 5 运行 实例 1 : VLAN 2 实例 2 : VLAN 3 SW3 是实例 1 的主根,实…

记忆化搜索与动态规划:原理、实现与比较

记忆化搜索和动态规划是解决优化问题的两种重要方法,尤其在处理具有重叠子问题和最优子结构性质的问题时非常有效。 目录 1. 记忆化搜索(Memoization) 定义: 实现步骤: 示例代码(斐波那契数列&#xff0…

《几何原本》命题I.9

《几何原本》命题I.9 一个角可以切分成两个相等的角。 设 ∠ E A F \angle EAF ∠EAF 为给定角 在 A E , A F AE,AF AE,AF 上任取两点 B , C B,C B,C 使得 A B A C ABAC ABAC 连结 B C BC BC 在 A A A 下方作等边三角形 B C D BCD BCD 则 A B A C , B D C D , A D…

docker-compose安装anythingLLM

1、anythingLLM的docker-compose文件 version: 3.8 services:anythingllm:image: mintplexlabs/anythingllm:latestcontainer_name: anythingllmports:- "23001:3001"cap_add:- SYS_ADMINenvironment:# Adjust for your environment- STORAGE_DIR/app/server/storage…

DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)示例2: 分页和排序

前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏+关注哦 💕 目录 DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)示例2: 分页和排序📚前言📚页面效果📚指令…

SQL命令详解之多表查询(连接查询)

目录 1 简介 2 内连接查询 2.1 内连接语法 2.2 内连接练习 3 外连接查询 3.1 外连接语法 3.2 外连接练习 4 总结 1 简介 连接的本质就是把各个表中的记录都取出来依次匹配的组合加入结果集并返回给用户。我们把 t1 和 t2 两个表连接起来的过程如下图所示: …

二、QT和驱动模块实现智能家居-----问题汇总1

1、文件地址改变后必须在QT下更改地址 2、指定了QT内Kits下的Sysroot头文件地址,但是还是找不到头文件: 3、提示无法执行QT程序:先干掉之前的QT程序 ps //查看程序PIDkill -9 PID 4、无法执行QT程序 1)未设置环境变量 …

CentOS7安装Mysql5.7(ARM64架构)

1.第一步:下载 arm 版本离线 mysql 5.7 安装包 arm 版本离线 mysql 5.7 安装包 2.第二步:查询并卸载 CentOS 自带的数据库 Mariadb 找到数据库 mariadb,如果有会给出一个结果,结果是 mariadb 名称 rpm -qa | grep mariadb 如果…

AI数字人口播源码开发全解析

——源码即未来:揭秘千亿级市场的技术底层逻辑 一、为什么源码开发是数字人赛道的“核武器”? 2025年全球AI数字人市场规模预计突破6402.7亿元,而源码开发能力正成为企业竞争的核心壁垒。与标准化SaaS工具相比,源码开发赋予三大…

Versal - XRT(CPP) 2024.1

目录 1.简介 2. XRT 2.1 XRT vs OpenCL 2.2 Takeways 2.3 XRT C APIs 2.4 Device and XCLBIN 2.5 Buffers 2.5.1 Buffer 创建 2.5.1.1 普通 Buffer 2.5.1.2 特殊 Buffer 2.5.1.3 用户指针 Buffer 2.5.2 Data Transfer 2.5.2.1 read/write API 2.5.2.2 map API 2…

GPPT: Graph Pre-training and Prompt Tuning to Generalize Graph Neural Networks

GPPT: Graph Pre-training and Prompt Tuning to Generalize Graph Neural Networks KDD22 推荐指数:#paper/⭐⭐#​ 动机 本文探讨了图神经网络(GNN)在迁移学习中“预训练-微调”框架的局限性及改进方向。现有方法通过预训练&#xff08…

微信小程序上如何使用图形验证码

1、php服务器生成图片验证码的代码片段如下: 注意红框部分的代码,生成的是ArrayBuffer类型的二进制图片 2、显示验证码 显示验证码,不要直接image组件加上src显示,那样拿不到cookie,没有办法做图形验证码的验证&…

MAX232数据手册:搭建电平转换桥梁,助力串口稳定通信

在现代电子设备的通信领域,串口通信因其简单可靠而被广泛应用。MAX232 芯片作为串口通信中的关键角色,发挥着不可或缺的作用。下面,我们将依据提供的资料,深入解读 MAX232 芯片的各项特性、参数以及应用要点。 一、引脚说明 MAX2…

HTTP 与 HTTPS 协议:从基础到安全强化

引言 互联网的消息是如何传递的? 是在路由器上不断进行跳转 IP的目的是在寻址 HTTP 协议:互联网的基石 定义 HTTP(英文:HyperText Transfer Protocol,缩写:HTTP),即超文本传输协…

记录linux安装mysql后链接不上的解决方法

首先确保是否安装成功 systemctl status mysql 如果没有安装的话,执行命令安装 sudo apt install mysql-server 安装完成后,执行第一步检测是否成功。 通常初始是没有密码的,直接登陆 sudo mysql -u root 登录后执行以下命令修改密码&…

精讲坐标轴系统(Axis)

续前文: 保姆级matplotlib教程:详细目录 保姆级seaborn教程:详细目录 seaborn和matplotlib怎么选,还是两个都要学? 详解Python matplotlib深度美化(第一期) 详解Python matplotlib深度美化&…