Spark Structured Streaming使用教程

文章目录

    • 1、输入数据源
    • 2、输出模式
    • 3、sink输出结果
    • 4、时间窗口
      • 4.1、时间窗口
      • 4.2、时间水印(Watermarking)
    • 5、使用例子

Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。
Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表示为在静态表上的标准批处理查询,Spark将其作为无界输入表上的增量查询运行。

1、输入数据源

  • File source - 以数据流的形式读取写入目录中的文件。文件将按照文件修改时间的先后顺序进行处理。如果设置了latestFirst,则顺序将相反。支持的文件格式为text, CSV, JSON, ORC, Parquet。请参阅DataStreamReader接口的文档,了解最新的列表,以及每种文件格式支持的选项。注意,监视目录的文件改变,只能是原子性的改变,比如把文件放入该目录,而不是持续写入该目录中的某个文件。
  • Kafka source - 从Kafka读取数据。它兼容Kafka代理版本0.10.0或更高版本。查看Kafka集成指南了解更多细节。
  • Socket source (用于测试) - 从套接字连接读取UTF8文本数据。监听服务器套接字位于驱动程序。请注意,这应该仅用于测试,因为它不提供端到端的容错保证。
  • Rate source (用于测试) - 以每秒指定的行数生成数据,每个输出行包含一个时间戳和值。其中timestamp为包含消息发送时间的timestamp类型,value为包含消息计数的Long类型,从0开始作为第一行。该源代码用于测试和基准测试。

2、输出模式

  • 我们可以定义每次结果表中的数据更新时,以何种方式,将哪些数据写入外部存储。有3种模式:
  • complete mode:所有数据都会被写入外部存储。具体如何写入,是根据不同的外部存储自身来决定的。
  • append mode:只有新的数据,以追加的方式写入外部存储。只有当我们确定,result table中已有的数据是肯定不会被改变时,才应该使用append mode。
  • update mode:只有被更新的数据,包括增加的和修改的,会被写入外部存储中。
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

3、sink输出结果

  • File sink - 将输出存储到一个目录。
    输出模式支持Append
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
  • Kafka sink - 将输出存储到Kafka中的一个或多个主题。
    输出模式支持Append, Update, Complete
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
  • Console sink (用于测试) - 每次有触发器时,将输出打印到控制台/标准输出。支持两种输出模式:Append和Complete。这应该用于在低数据量上进行调试,因为在每次触发后都会收集整个输出并将其存储在驱动程序的内存中。
    输出模式支持Append, Update, Complete
writeStream
    .format("console")
    .start()
  • Memory sink (用于测试) - 输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
    输出模式支持Append, Complete
输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
  • 自定义输出Foreach和ForeachBatch - Foreach:针对每条数据的输出;ForeachBatch:针对每批次的数据输出。
    输出模式支持Append, Update, Complete
// Foreach
streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

// ForeachBatch
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }    
  }
).start();

4、时间窗口

4.1、时间窗口

在业务场景中,经常会遇到按时间段进行聚合操作,Spark提供了基于滑动窗口的事件时间集合操作,每个时间段作为一个分组,并对每个组内的每行数据进行聚合操作。
在这里插入图片描述

可以使用groupBy()和window()操作来表示窗口聚合。

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

4.2、时间水印(Watermarking)

WaterMarking的作用主要是为了解决:延迟到达的数据是否丢弃,系统可以删除过期的数据。
在这里插入图片描述

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();

5、使用例子

package com.penngo.spark;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.art.Art;

import java.io.Serializable;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public class SparkStructStream {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static class DataTxt implements Serializable {
        private String text;
        private Timestamp time;

        public DataTxt(String text, LocalDateTime time) {
            this.text = text;
            this.time = Timestamp.valueOf(time);
        }
        public String getText() {
            return text;
        }
        public void setText(String text) {
            this.text = text;
        }
        public Timestamp getTime() {
            return time;
        }
        public void setTime(Timestamp time) {
            this.time = time;
        }
    }
    
    public static void socket(SparkSession spark) throws Exception{
        // 运行:nc -lk 9999
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();
        Dataset<DataTxt> words = lines
                .as(Encoders.STRING())
                .map((MapFunction<String, DataTxt>) x -> {
                    String[] strs = x.split(",");

                    LocalDateTime date = LocalDateTime.parse(strs[1],formatter);
                    Arrays.asList(x.split(",")).iterator();
                    DataTxt data = new DataTxt(strs[0], date);
                    return data;
                }, Encoders.bean(DataTxt.class));

        Dataset<Row> wordCounts = words.toDF()
                .withWatermark("time", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃
                .groupBy(
                        window(col("time"), "10 minutes", "5 minutes"),
                        col("text"))
                .count();


        wordCounts.writeStream().outputMode("append")
                .foreach(new ForeachWriter<Row>() {
                    @Override public boolean open(long partitionId, long version) {
//                        System.out.println("open==========partitionId:" + partitionId + ",version:" + version);
                        return true;
                    }

                    @Override public void process(Row record) {
                        // Write string to connection
                        System.out.println("recordxxxxxxxxxxxxxxxxxx:======" + record);

                    }

                    @Override public void close(Throwable errorOrNull) {
                        // Close the connection
//                        System.out.println("close==========errorOrNull:" + errorOrNull);
                    }
                })
//                .format("console")
                .start().awaitTermination();
    }

    public static void kafka(SparkSession spark) throws Exception{
        // Subscribe to 1 topic
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "192.168.245.1:9092")
                .option("subscribe", "topic-news")
                .option("startingOffsets","latest")
                .option("maxOffsetsPerTrigger",1000)
                .load();
        
        df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
        df.printSchema();
        df.writeStream().outputMode("append")
                .format("console")
                .start().awaitTermination();
    }
    public static void main(String[] args) throws Exception{
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF);
        Logger.getLogger("org.apache.kafka").setLevel(Level.WARN);

        System.setProperty("hadoop.home.dir", "/usr/local/hadoop-3.3.6");
        System.setProperty("HADOOP_USER_NAME", "root");

        SparkSession spark = SparkSession
                .builder()
                .appName("SparkStructStream")
                .master("local[*]")
                .getOrCreate();

//        socket(spark);
        kafka(spark);
    }
}

参考自官方文档:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/227059.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

从主从复制到哨兵模式(含Redis.config配置模板)

文章目录 前言一、主从复制1.概述2.作用3.模拟实践搭建场景模拟实践 二、哨兵模式1.概述2.配置使用3.优缺点4.sentinel.conf完整配置 总结 前言 从主从复制到哨兵模式。 一、主从复制 1.概述 主从复制&#xff0c;是指将一台 Redis 服务器的数据&#xff0c;复制到其他的 Red…

Smart Link和Monitor Link

Smart Link和Monitor Link简介 Smart Link&#xff0c;又叫做备份链路。一个Smart Link由两个接口组成&#xff0c;其中一个接口作为另一个的备份。Smart Link常用于双上行组网&#xff0c;提供可靠高效的备份和快速的切换机制。 Monitor Link是一种接口联动方案&#xff0c;它…

AMEYA360分析兆易创新GD32A490系列车规级MCU

兆易创新GigaDevice今日宣布&#xff0c;正式推出全新GD32A490系列高性能车规级MCU&#xff0c;以高主频、大容量、高集成和高可靠等优势特性紧贴汽车电子开发需求&#xff0c;适用于车窗、雨刷、智能车锁、电动座椅等BCM车身控制系统&#xff0c;以及仪表盘、娱乐影音、中控导…

Google Bard vs. ChatGPT 4.0:文献检索、文献推荐功能对比

在这篇博客中&#xff0c;我们将探讨和比较四个不同的人工智能模型——ChatGPT 3.5、ChatGPT 4.0、ChatGPT 4.0插件和Google Bard。我们将通过三个问题的测试结果来评估它们在处理特定任务时的效能和响应速度。 导航 问题 1: 统计自Vehicle Routing Problem (VRP)第一篇文章发…

Android 11.0 MTK Camera2 设置默认拍照尺寸功能实现

1.前言 在11.0的系统rom定制化开发中,在mtk平台的camera2关于拍照的一些功能修改中,在一些平台默认需要设置最大的分辨率 来作为拍照的分辨率,所以就需要了解拍照尺寸设置流程,然后来实现相关的功能 如图: 2.MTK Camera2 设置默认拍照尺寸功能实现的核心类 \vendor\me…

[数据集][目标检测]拉横幅识别横幅检测数据集VOC+yolo格式1962张1类别

数据集格式&#xff1a;Pascal VOC格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1962 标注数量(xml文件个数)&#xff1a;1962 标注数量(txt文件个数)&#xff1a;1962 标注类别数&a…

一天一个设计模式---原型模式

基本概念 原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;其主要目的是通过复制现有对象来创建新对象&#xff0c;而不是通过实例化类。原型模式允许在运行时动态创建对象&#xff0c;同时避免了耦合与子类化。 在原型模式中&#xff0…

Django模板,Django中间件,ORM操作(pymysql + SQL语句),连接池,session和cookie, 缓存

day04 django进阶-知识点 今日概要&#xff1a; 模板中间件ORM操作&#xff08;pymysql SQL语句&#xff09;session和cookie缓存&#xff08;很多种方式&#xff09; 内容回顾 请求周期 路由系统 最基本路由关系动态路由&#xff08;含正则&#xff09;路由分发不同的app中…

视频相似度对比 python opencv sift flann

提取SIFT特征的代码&#xff0c;返回关键点kp及特征描述符des def SIFT(frame):# 创建SIFT特征提取器sift cv2.xfeatures2d.SIFT_create()# 提取SIFT特征kp, des sift.detectAndCompute(frame, None)return kp, des 这行代码是使用SIFT&#xff08;Scale-Invariant Feature…

Go--协程

协程 协程是Go语言最大的特色之一。 1、协程的概念 协程并不是Go发明的概念&#xff0c;支持协程的变成语言有很多。Go在语言层面直接提供对协程的支持称为goroutine。 1.1 基本概念 进程 进程是应用程序启动的实例&#xff0c;每个进程都有独立的内存空间&#xff0c;不同…

WEB组态编辑器(BY组态)介绍

BY组态是一款非常优秀的纯前端的【web组态插件工具】&#xff0c;可无缝嵌入到vue项目&#xff0c;react项目等&#xff0c;由于是原生js开发&#xff0c;对于前端的集成没有框架的限制。同时由于BY组态只是一个插件&#xff0c;不能独立运行&#xff0c;必须嵌入到你方软件平台…

如何在报表工具 FastReport Cloud 中使用 ClickHouse

FastReport Cloud 是一项云服务 (SaaS)&#xff0c;旨在为您的企业存储、编辑、构建和发送报告。您的整个团队可以从世界任何地方访问这些报告&#xff0c;并且无需创建自己的应用程序。 FastReport Cloud 试用&#xff08;qun&#xff1a;585577353&#xff09;https://chat8.…

高翔《自动驾驶与机器人中的SLAM技术》第九、十章载入静态地图完成点云匹配重定位

修改mapping.yaml文件中bag_path&#xff1a; 完成之后会产生一系列的点云文件以及Keyframe.txt文件&#xff1a; ./bin/run_frontend --config_yaml ./config/mapping 生成拼接的点云地图map.pcd文件 &#xff1a; ./bin/dump_map --pose_sourcelidar 。、 完成第一次优…

数据清洗、特征工程和数据可视化、数据挖掘与建模的主要内容

1.4 数据清洗、特征工程和数据可视化、数据挖掘与建模的内容 视频为《Python数据科学应用从入门到精通》张甜 杨维忠 清华大学出版社一书的随书赠送视频讲解1.4节内容。本书已正式出版上市&#xff0c;当当、京东、淘宝等平台热销中&#xff0c;搜索书名即可。内容涵盖数据科学…

JVM 虚拟机(二)类的生命周期

类的声明周期描述了一个类加载、使用和卸载的整个过程。 一个类的声明周期包括五个阶段&#xff1a;加载、连接、初始化、使用、卸载&#xff0c;其中连接部分分为验证、准备和解析阶段。 加载阶段 加载阶段是第一步是类加载器根据类的全限定名通过不同的渠道以二进制流的方式…

CAN总线协议编程实例

1. can.h #ifndef __CAN_H #define __CAN_H#include "./SYSTEM/sys/sys.h"/******************************************************************************************/ /* CAN 引脚 定义 */#define CAN_RX_GPIO_PORT GPIOA #define CAN_RX_GPI…

【Maven】未找到有效的 Maven 安装。在配置对话框中设置主目录,或者在系统上设置 M2_HOME 环境变量。

错误显示 今天导入工程&#xff0c;进行clean的时候报错&#xff1a; 解决方法 重新设置一下maven的目录即可

微机原理14

一、单项选择题(本大题共15小题,每小题3分,共45分。在每小题给出的四个备选项中选出一个正确的答案,请将选定的答案填涂在答题纸的相应位置上。) 字符’A’的 ASCI 码是&#xff08;&#xff09; A. OAH B. 41H C. 61H D. OAOH 2, 8086微处理器的地址线有(&#xff09; A. 16条…

RabbitMQ-学习笔记(初识 RabbitMQ)

本篇文章学习于 bilibili黑马 的视频 (狗头保命) 同步通讯 & 异步通讯 (RabbitMQ 的前置知识) 同步通讯&#xff1a;类似打电话&#xff0c;只有对方接受了你发起的请求,双方才能进行通讯, 同一时刻你只能跟一个人打视频电话。异步通讯&#xff1a;类似发信息&#xff0c…

根据既定数组创建数组的方法汇总 (第3讲)

根据既定数组创建数组的方法 &#xff08;第3讲&#xff09;         &#x1f379;博主 侯小啾 感谢您的支持与信赖。☀️ &#x1f339;꧔ꦿ&#x1f339;꧔ꦿ&#x1f339;꧔ꦿ&#x1f339;꧔ꦿ&#x1f339;꧔ꦿ&#x1f339;꧔ꦿ&#x1f339;꧔ꦿ&#x1f339;꧔…