Structured-Streaming初识

一、概览

Structured Streaming是一个基于SparkSQL引擎构建的可扩展且容错的流处理引擎。可以像在静态数据上表达批量计算一样表达流计算。SparkSQL引擎将负责以增量方式连续运行它,并在流数据继续到达时更新最终结果。可以使用Scala、Java、Python或R中的Dataset/DataFrame API来进行流聚合、事件时间窗口、流到批处理连接等的开发。它基于SparkSQL引擎优化而执行。最后,系统通过 checkpoint 和预写日志确保端到端的精确一次容错保证。简而言之,Structured Streaming提供快速、可扩展、容错、端到端的精确一次流处理,而无需用户对流进行推理。

默认, Structured Streaming使用微批次处理作业引擎进行处理,该引擎将数据流作为一系列小批次作业进行处理,从而实现低至100毫秒的端到端延迟和精确一次的容错保证。自Spark 2.3以来,又引入了一种新的低延迟处理模式,称为连续处理,它可以实现低至1毫秒的端到端延迟,并提供至少保证一次的语义。

二、官方示例

我们还是以从socket接收文本数据进行word count统计来了解它的工作原理。代码如下:

object StructuredNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    // 创建表示从连接到主机端口的输入行流的DataFrame
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // 将一行文本按空格分割
    val words = lines.as[String].flatMap(_.split(" "))

    // 分组统计
    val wordCounts = words.groupBy("value").count()

    // 启动运行并将结果输出到控制台
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

可以看到里面用到了spark sql 来进行流式处理,我们看下该官方示例在源码中的位置也就明白了

三、编程模型

Structured Streaming 的关键思想是将实时数据流视为不断追加的表。它是一种类似Spark Streaming(微批处理模型)的新流式处理模型。这使我们可以像查询表一样来处理流式数据。

1、基本概念

将输入数据流视为“输入表”。到达流中的每个数据项就像对输入表做追加操作。每个微小的间隔都会最加到输入表中。

在输入表中进行查询会得到结果表。随着输入表的增加,结果表也会增加或改变。最终将其写入外部系统或输出。

输出模式有以下三种:

  1. Complete模式:将结果表整体写入外部系统
  2. Append模式:只将处理完的当下微批次结果写入外部系统
  3. Update模式:只将处理完的当下微批次且有更新的数据结果写入外部系统

我们用第二部分的示例代码来说明下:

linesDataFrame是输入表

wordCountsDataFrame是结果表

对流式linesDataFrame生成wordCounts的查询与静态DataFrame完全相同。但是,当这个查询启动时,Spark将不断检查来自套接字连接的新数据。如果有新数据,Spark将运行一个“增量”查询,将以前的运行计数与新数据结合起来计算更新的计数,如下所示:

请注意,Structured Streaming 不会物化整个表。它从流数据源中读取最新的可用数据,增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如前面示例中的中间计数)。

2、处理事件时间和延迟数据

从示例中可以发现Structured Streaming 并没有指出每个批次的间隔时间,

因为Structured Streaming 使用的是嵌入在数据本身中的时间。对于许多应用程序,您可能希望对这个事件时间进行操作。例如,如果想获取IoT设备每分钟生成的事件数量,那么可能希望使用数据生成的时间(即数据中的事件时间),而不是Spark接收它们的时间。这个事件时间在这个模型中表达得非常自然,因为把它作为了数据的一部分,也就是表中的一列。

这使得基于窗口的聚合(例如每分钟的事件数量)只是事件时间列上的一种特殊类型的分组和聚合——每个时间窗口是一个组,每行可以属于多个窗口/组。因此,这种event-time-window-based聚合查询可以在静态数据集和数据流上一致定义。

因此,该模型可以处理预期晚到达的数据。由于Spark正在更新结果表,它可以完全控制在有延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。从Spark 2.1开始,我们支持水印,允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。

四、运行官方示例

运行Netcat

nc -lk 9999

新建一个窗口运行官方示例

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/

bin/run-example sql.streaming.StructuredNetworkWordCount cdh1 9999

 

 他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/944197.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Gradio全解系列——Additional Features:附加功能(上)

Gradio全解系列——Additional Features&#xff1a;附加功能&#xff08;上&#xff09; 前言本篇摘要10. Additional Features&#xff1a;附加功能10.1 队列10.1.1 使用方法10.1.2 配置队列 10.2 流输入输出10.2.1 流输出1. 生成器yield2. 流媒体 10.2.2 流输入1. 流事件2. …

TestMAX/DFT Compiler:时序单元的类型、连接顺序和后DFT优化

相关阅读 TestMAX/DFT Compilerhttps://blog.csdn.net/weixin_45791458/category_12865937.html?spm1001.2014.3001.5482 时序单元的状态 未映射的时序单元(Unmapped Sequential Cell) 在Design Compiler读取了一个RTL设计后&#xff0c;Design Compiler内置的HDL Compiler工…

Cocos Creator 3.8.5 正式发布,更小更快更多平台!

在 Cocos Creator 3.8.5 版本中&#xff0c;我们做了新一轮的优化。 在加载速度、代码裁剪、平台增强等多方面做了优化&#xff0c;提升了开发者体验和游戏性能。 希望能够助 Cocos 开发者们的产品更上一层楼。 一、加载速度优化 1、WASM 模块延迟加载 在早期版本中&#xff0c…

跨语言数据格式标准化在 HarmonyOS 开发中的实践

文章目录 前言数据格式标准化的意义数据传递中的痛点标准化的优势 JSON 与 Protocol Buffers 的比较JSONProtocol Buffers HarmonyOS 跨语言数据传递示例示例代码&#xff1a;定义 Protocol Buffers 消息格式生成 Java 和 C 代码示例代码&#xff1a;Java 端序列化与传递数据C …

【有作图代码】多尺度动力学模型:像“显微镜与望远镜的结合”,揭示微观分子运动与宏观流体流动的奥秘

【有作图代码】多尺度动力学模型&#xff1a;像“显微镜与望远镜的结合”&#xff0c;揭示微观分子运动与宏观流体流动的奥秘 具体实例与推演 假设我们有一个流体系统&#xff0c;其中微观尺度上分子间的相互作用可以通过分子动力学方程描述&#xff0c;而宏观尺度上流体的流…

工具变量笔记

补充知识 简单介绍工具变量 假设 Y i α β D i ϵ i Y_i\alpha\beta D_i\epsilon_i Yi​αβDi​ϵi​, where E ( ϵ i ∣ D i ) 0 E(\epsilon_i\mid D_i)0 E(ϵi​∣Di​)0. 但是通常这个条件不满足。于是假如有这样一个工具变量 Z i Z_i Zi​存在的话&#xff0c;满…

通过 Ansys Electronics Desktop 中的高级仿真优化 IC 设计

半导体行业继续通过日益复杂的集成电路 (IC) 设计突破技术界限。随着工艺节点缩小和电路密度达到前所未有的水平&#xff0c;电磁效应对设备性能和可靠性变得越来越重要。现代 IC 设计面临着来自复杂的布局相关耦合机制、信号完整性问题和功率分布问题的挑战&#xff0c;这些问…

Yocto 项目中的交叉编译:原理与实例

Yocto 项目是一个强大的工具集&#xff0c;它专注于为嵌入式系统生成定制的 Linux 发行版。交叉编译在 Yocto 项目中扮演着核心角色&#xff0c;它使得开发者能够在功能强大的宿主机上构建适用于资源受限目标设备的软件系统。这篇文章将从运行原理、实际案例和工具链组成等角度…

WPF 绘制过顶点的圆滑曲线(样条,贝塞尔)

项目中要用到样条曲线&#xff0c;必须过顶点&#xff0c;圆滑后还不能太走样&#xff0c;捣鼓一番&#xff0c;发现里面颇有玄机&#xff0c;于是把我多方抄来改造的方法发出来&#xff0c;方便新手&#xff1a; 如上图&#xff0c;看代码吧&#xff1a; -------------------…

谷粒商城-高级篇-秒杀业务

1、后台添加秒杀商品 1、配置网关 - id: coupon_routeuri: lb://gulimall-couponpredicates:- Path/api/coupon/**filters:- RewritePath/api/(?<segment>.*),/$\{segment} 2、每日秒杀关联商品功能实现 点击关联商品后&#xff0c;应该查询当前场次的所有商品 点击关…

JuOne核心模块揭秘:从智能硬件到Web3生态的完美连接

JuOne核心模块揭秘&#xff1a;从智能硬件到Web3生态的完美连接在全球数字经济的浪潮中&#xff0c;Web3 正以前所未有的速度重塑我们的生活方式、商业模式和价值创造体系。它不仅仅是互联网的下一阶段&#xff0c;更是一场关于未来的革命。去中心化、用户主权、价值互联&#…

Kafka高性能设计

高性能设计概述 Kafka高性能是多方面协同的结果&#xff0c;包括集群架构、分布式存储、ISR数据同步及高效利用磁盘和操作系统特性等。主要体现在消息分区、顺序读写、页缓存、零拷贝、消息压缩和分批发送六个方面。 消息分区 存储不受单台服务器限制&#xff0c;能处理更多数据…

若依框架之简历pdf文档预览功能

一、前端 &#xff08;1&#xff09;安装插件vue-pdf&#xff1a;npm install vue-pdf &#xff08;2&#xff09;引入方式&#xff1a;import pdf from "vue-pdf"; &#xff08;3&#xff09;components注入方式&#xff1a;components:{pdf} &#xff08;4&…

【社区投稿】自动特征auto trait的扩散规则

自动特征auto trait的扩散规则 公式化地概括&#xff0c;auto trait marker trait derived trait。其中&#xff0c;等号右侧的marker与derived是在Rustonomicon书中的引入的概念&#xff0c;鲜见于Rust References。所以&#xff0c;若略感生僻&#xff0c;不奇怪。 marker …

Elasticsearch检索之三:官方推荐方案search_after检索实现(golang)

Elasticsearch8.17.0在mac上的安装 Kibana8.17.0在mac上的安装 Elasticsearch检索方案之一&#xff1a;使用fromsize实现分页 快速掌握Elasticsearch检索之二&#xff1a;滚动查询(scrool)获取全量数据(golang) 1、search_after检索 在前面的文章介绍了fromsize的普通分页…

精读DeepSeek v3技术文档的心得感悟

最近宋大宝同学读完了DeepSeekv3的文档&#xff0c;心中颇多感慨&#xff0c;忍不住想在这里记录一下对这款“业界有望启示未来低精度训练走向”的开源大模型的观察与思考。DeepSeek v3的亮点绝不仅仅是“Float8”或“超长上下文”这么简单&#xff0c;而是贯穿了从数值精度、注…

WAV文件双轨PCM格式详细说明及C语言解析示例

WAV文件双轨PCM格式详细说明及C语言解析示例 一、WAV文件双轨PCM格式详细说明1. WAV文件基本结构2. PCM编码方式3. 双轨PCM格式详细说明二、C语言解析WAV文件的代码示例代码说明一、WAV文件双轨PCM格式详细说明 WAV文件是一种用于存储未压缩音频数据的文件格式,广泛应用于音频…

Day1 微服务 单体架构、微服务架构、微服务拆分、服务远程调用、服务注册和发现Nacos、OpenFeign

目录 1.导入单体架构项目 1.1 安装mysql 1.2 后端 1.3 前端 2.微服务 2.1 单体架构 2.2 微服务 2.3 SpringCloud 3.微服务拆分 3.1 服务拆分原则 3.1.1 什么时候拆 3.1.2 怎么拆 3.2 拆分购物车、商品服务 3.2.1 商品服务 3.2.2 购物车服务 3.3 服务调用 3.3.1 RestTemplate 3.…

DeepSpeed 使用 LoRA 训练后文件结构详解

DeepSpeed 使用 LoRA 训练后文件结构详解 在大语言模型&#xff08;LLM&#xff09;的训练过程中&#xff0c;DeepSpeed 提供了强大的分布式训练能力&#xff0c;而 LoRA&#xff08;Low-Rank Adaptation&#xff09;通过参数高效微调技术显著减少了资源占用。完成训练后&…

Llama 3 预训练(二)

目录 3. 预训练 3.1 预训练数据 3.1.1 网络数据筛选 PII 和安全过滤 文本提取与清理 去重&#xff08;De-duplication&#xff09; 启发式过滤&#xff08;Heuristic Filtering&#xff09; 基于模型的质量过滤 代码和数学推理数据处理 多语言数据处理 3.1.2 确定数…