Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

目录

0. 相关文章链接

1. 输出的选项

2. 输出模式(output mode)

2.1. Append 模式(默认)

2.2. Complete 模式

2.3. Update 模式

2.4. 输出模式总结

3. 输出接收器(output sink)

3.1. file sink

3.2. kafka sink

3.2.1. 以 Streaming 方式输出数据

3.2.2. 以 batch 方式输出数据

3.3. console sink

3.4. memory sink

3.5. foreach sink

3.6. ForeachBatch Sink


0. 相关文章链接

 Spark文章汇总 

1. 输出的选项

一旦定义了最终结果DataFrame / Dataset,剩下的就是开始流式计算。为此,必须使用返回的 DataStreamWriter Dataset.writeStream()。

需要指定一下选项:

  • 输出接收器的详细信息:数据格式,位置等。
  • 输出模式:指定写入输出接收器的内容。
  • 查询名称:可选,指定查询的唯一名称以进行标识。
  • 触发间隔:可选择指定触发间隔。如果未指定,则系统将在前一处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成而错过了触发时间,则系统将立即触发处理。
  • 检查点位置:对于可以保证端到端容错的某些输出接收器,请指定系统写入所有检查点信息的位置。这应该是与HDFS兼容的容错文件系统中的目录。

2. 输出模式(output mode)

2.1. Append 模式(默认)

        默认输出模式, 仅仅添加到结果表的新行才会输出。采用这种输出模式, 可以保证每行数据仅输出一次。在查询过程中, 如果没有使用 watermask 机制, 则不能使用聚合操作。 如果使用了 watermask 机制, 则只能使用基于 event-time 的聚合操作。watermask 用于高速 append 模式如何输出不会再发生变动的数据。 即只有过期的聚合结果才会在 Append 模式中被“有且仅有一次”的输出。

2.2. Complete 模式

每次触发, 整个结果表的数据都会被输出。 仅仅聚合操作才支持。同时该模式使用 watermask 无效。

2.3. Update 模式

        该模式在 从 spark 2.1.1 可用. 在处理完数据之后, 该模式只输出相比上个批次变动的内容(新增或修改)。如果没有聚合操作, 则该模式与 append 模式一样。如果有聚合操作, 则可以基于 watermast 清理过期的状态。

2.4. 输出模式总结

不同的查询支持不同的输出模式

3. 输出接收器(output sink)

spark 提供了几个内置的 output-sink,不同 output sink 所适用的 output mode 不尽相同:

SinkSupported Output ModesOptionsFault-tolerantNotes
File SinkAppendpath: path to the output directory, must be specified. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for “parquet” format options see DataFrameWriter.parquet()Yes (exactly-once)Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka SinkAppend, Update, CompleteSee the Kafka Integration GuideYes (at-least-once)More details in the Kafka Integration Guide
Foreach SinkAppend, Update, CompleteNoneDepends on ForeachWriter implementationMore details in the next section
ForeachBatch SinkAppend, Update, CompleteNoneDepends on the implementationMore details in the next section
Console SinkAppend, Update, CompletenumRows: Number of rows to print every trigger (default: 20) truncate: Whether to truncate the output if too long (default: true)No
Memory SinkAppend, CompleteNoneNo. But in Complete Mode, restarted query will recreate the full table.Table name is the query name.

3.1. file sink

存储输出到目录中 仅仅支持 append 模式

需求: 把单词和单词的反转组成 json 格式写入到目录中。

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 数据计算
        val words: DataFrame = lines
            .as[String]
            .flatMap((line: String) => {
                line
                    .split("\\W+")
                    .map((word: String) => {
                        (word, word.reverse)
                    })
            })
            .toDF("原单词", "反转单词")

        // 结果输出
        words
            .writeStream
            .outputMode("append")
            .format("json") // 支持 "orc", "json", "csv"
            .option("path", "./filesink") // 输出目录
            .option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录
            .start
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

输出的数据:

{"原单词":"abc","反转单词":"cba"}

3.2. kafka sink

将 wordcount 结果写入到 kafka

写入到 kafka 的时候应该包含如下列:

ColumnType
key (optional)string or binary
value (required)string or binary
topic (optional)string

注意:

  • 如果没有添加 topic option 则 topic 列必须有.
  • kafka sink 三种输出模式都支持

3.2.1. 以 Streaming 方式输出数据

这种方式使用流的方式源源不断的向 kafka 写入数据:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 数据计算
        val words = lines
            .as[String]
            .flatMap((_: String).split("\\W+"))
            .groupBy("value")
            .count()
            .map((row: Row) => row.getString(0) + "," + row.getLong(1))
            .toDF("value") // 写入数据时候, 必须有一列 "value"

        words.writeStream
            .outputMode("update")
            .format("kafka")
            .trigger(Trigger.ProcessingTime(0))
            .option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置
            .option("topic", "update") // kafka 主题
            .option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录
            .start
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

3.2.2. 以 batch 方式输出数据

这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理. 处理完毕后程序退出:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        val wordCount: DataFrame = spark
            .sparkContext
            .parallelize(Array("hello hello abc", "abc, hello"))
            .toDF("word")
            .groupBy("word")
            .count()
            .map(row => row.getString(0) + "," + row.getLong(1))
            .toDF("value") // 写入数据时候, 必须有一列 "value"

        wordCount.write // batch 方式
            .format("kafka")
            .option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置
            .option("topic", "update") // kafka 主题
            .save()

        // 关闭执行环境
        spark.stop()

    }
}

3.3. console sink

主要用于测试数据输出

3.4. memory sink

该 sink 也是用于测试, 将其统计结果全部输入内中指定的表中, 然后可以通过 sql 与从表中查询数据。

如果数据量非常大, 可能会发送内存溢出:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp
import java.util.{Timer, TimerTask}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        val words: DataFrame = lines
            .as[String]
            .flatMap((_: String).split("\\W+"))
            .groupBy("value")
            .count()

        val query: StreamingQuery = words
            .writeStream
            .outputMode("complete")
            .format("memory") // memory sink
            .queryName("word_count") // 内存临时表名
            .start

        // 测试使用定时器执行查询表
        val timer: Timer = new Timer(true)
        val task: TimerTask = new TimerTask {
            override def run(): Unit = spark.sql("select * from word_count").show
        }
        timer.scheduleAtFixedRate(task, 0, 2000)

        query.awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

3.5. foreach sink

foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出;把 wordcount 数据写入到 mysql。

注意(需要在依赖中添加MySQL的驱动依赖):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

建表语句如下所示:

create database ss;
use ss;
create table word_count
(
    word  varchar(255) primary key not null,
    count bigint                   not null
);

代码示例如下:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Timer, TimerTask}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        val wordCount: DataFrame = lines
            .as[String]
            .flatMap((_: String).split("\\W+"))
            .groupBy("value")
            .count()

        val query: StreamingQuery = wordCount
            .writeStream
            .outputMode("update")
            // 使用 foreach 的时候, 需要传递ForeachWriter实例, 三个抽象方法需要实现. 每个批次的所有分区都会创建 ForeeachWriter 实例
            .foreach(new ForeachWriter[Row] {

                var conn: Connection = _
                var ps: PreparedStatement = _
                var batchCount = 0

                // 一般用于 打开链接. 返回 false 表示跳过该分区的数据,
                override def open(partitionId: Long, epochId: Long): Boolean = {
                    println("open ..." + partitionId + "  " + epochId)
                    Class.forName("com.mysql.jdbc.Driver")
                    conn = DriverManager.getConnection("jdbc:mysql://hadoop201:3306/ss", "root", "aaa")
                    // 插入数据, 当有重复的 key 的时候更新
                    val sql = "insert into word_count values(?, ?) on duplicate key update word=?, count=?"
                    ps = conn.prepareStatement(sql)

                    conn != null && !conn.isClosed && ps != null
                }

                // 把数据写入到连接
                override def process(value: Row): Unit = {
                    println("process ...." + value)
                    val word: String = value.getString(0)
                    val count: Long = value.getLong(1)
                    ps.setString(1, word)
                    ps.setLong(2, count)
                    ps.setString(3, word)
                    ps.setLong(4, count)
                    ps.execute()
                }

                // 用户关闭连接
                override def close(errorOrNull: Throwable): Unit = {
                    println("close...")
                    ps.close()
                    conn.close()
                }
            })
            .start

        query.awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

3.6. ForeachBatch Sink

ForeachBatch Sink 是 spark 2.4 才新增的功能, 该功能只能用于输出批处理的数据。将统计结果同时输出到本地文件和 mysql 中。

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Properties, Timer, TimerTask}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        val wordCount: DataFrame = lines
            .as[String]
            .flatMap(_.split("\\W+"))
            .groupBy("value")
            .count()

        val props: Properties = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "aaa")
        val query: StreamingQuery = wordCount
            .writeStream
            .outputMode("complete")
            .foreachBatch((df: Dataset[Row], batchId: Long) => { // 当前分区id, 当前批次id
                if (df.count() != 0) {
                    df.cache()
                    df.write.json(s"./$batchId")
                    df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop201:3306/ss", "word_count", props)
                }
            })
            .start()

        query.awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/74946.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Https、CA证书、数字签名

Https Http协议 Http协议是目前应用比较多应用层协议&#xff0c;浏览器对于Http协议已经实现。Http协议基本的构成部分有 请求行 &#xff1a; 请求报文的第一行请求头 &#xff1a; 从第二行开始为请求头内容的开始部分。每一个请求头都是由K-V键值对组成。请求体&#xf…

微服务05-Sentinel流量防卫兵

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以 流量 为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 S…

06-2_Qt 5.9 C++开发指南_自定义对话框及其调用

本篇介绍到的对话框及其调用实例较为复杂但十分详细&#xff0c;如果做了解可以先参考&#xff1a;QT从入门到实战x篇_13_模态和非模态对话框创建。 文章目录 1. 对话框的不同调用方式2. 对话框QWDialogSize 的创建和使用2.1 创建对话框QWDialogSize2.2 对话框的调用和返回值 …

C++入门基础(万字详解!!!)

文章目录 前言1.C关键字2.命名空间3.C的输入输出4.缺省参数4.1 全缺省4.2 半缺省 5.函数重载6. 引用6.1 引用的特性6.2 引用的使用场景6.3 引用和指针 7.内联函数7.1 特性 8.auto关键字8.1 注意事项 9. 基于范围的for循环9.1 使用条件 10.指针控制nullptr10.1 注意事项 11.总结…

LVGL学习笔记 28 - 键盘keyboard

目录 1. 设置关联文本框 2. 设置模式 2.1 LV_KEYBOARD_MODE_TEXT_LOWER 2.2 LV_KEYBOARD_MODE_TEXT_UPPER 2.3 LV_KEYBOARD_MODE_SPECIAL 2.4 LV_KEYBOARD_MODE_NUMBER 2.5 LV_KEYBOARD_MODE_USER_1 ~ LV_KEYBOARD_MODE_USER_4 3. 使能弹窗模式 4. 更改按键布局 5. 事…

P12-Retentive NetWork-RetNet挑战Transformer

论文地址:https://arxiv.org/abs/2307.08621 目录 Abstract 一.Introduction 二.Retentive Networks 2.1Retention 2.2Gated Multi-Scale Retention 2.3Overall Architecture of Retention Networks 2.4Relation to and Differences from Previous Methods 三.Experime…

Vue 安装开发者工具

1.下载开发者工具&#xff0c;下载地址&#xff1a;http://book.wiyp.top/App/Vue3开发者工具-谷歌/Vue3.crx 2.打开谷歌浏览器&#xff0c;点击扩展&#xff0c;点击管理扩展程序。 3.开启开发者模式&#xff0c;将 Vue3 开发者工具文件拖拽到浏览器中进行安装。 注&#xff…

Pytorch源码搜索与分析

PyTorch的的代码主要由C10、ATen、torch三大部分组成的。其中&#xff1a; C10 C10&#xff0c;来自于Caffe Tensor Library的缩写。这里存放的都是最基础的Tensor库的代码&#xff0c;可以运行在服务端和移动端。PyTorch目前正在将代码从ATen/core目录下迁移到C10中。C10的代…

pdf怎么压缩?一分钟学会文件压缩方法

PDF文件过大一般主要原因就是内嵌大文件、重复的资源或者图片比较多&#xff0c;随之而来的问题就是占用存储空间、被平台限制发送等等&#xff0c;这时候我们可以通过压缩的方法缩小PDF文件大小&#xff0c;下面就一起来看看具体的操作方法吧。 方法一&#xff1a;嗨格式压缩大…

SQL | 汇总数据

9-汇总数据 9.1-聚集函数 在实际开发过程中&#xff0c;可能会遇到下面这些情况&#xff1a; 确定大于某个值的有多少行数据&#xff0c;比如游戏排行榜&#xff0c;查询玩家排行多少名。 获取表中某些行的和&#xff0c;比如双十一当天&#xff0c;某个用户总订单价格是多少…

Mapbox加载天地图CGCS2000矢量瓦片地图

1.背景 最近在做天地图的项目&#xff0c;要基于MapBox添加CGCS2000矢量切片数据&#xff0c;但是 Mapbox 只支持web 墨卡托&#xff08;3857&#xff09;坐标系的数据。Github有专业用户修改了mapbox-gl的相关代码&#xff0c;支持CGCS2000的切片数据加载&#xff0c;并且修改…

一文教你学会Termux+SFTP+远程文件传输

文章目录 1. 安装openSSH2. 安装cpolar3. 远程SFTP连接配置4. 远程SFTP访问4. 配置固定远程连接地址 SFTP&#xff08;SSH File Transfer Protocol&#xff09;是一种基于SSH&#xff08;Secure Shell&#xff09;安全协议的文件传输协议。与FTP协议相比&#xff0c;SFTP使用了…

docker可视化工具Portainer

1:Portainer简介 Portainer是一个docker可视化管理工具&#xff0c;可以非常方便地管理docker镜像容器。官网地址&#xff1a;https://www.portainer.io/ 注&#xff1a;现在Portainer有BE&#xff08;收费&#xff09;和CE&#xff08;免费&#xff09;版本&#xff0c;安装的…

Mongodb:业务应用(1)

环境搭建参考&#xff1a;mongodb&#xff1a;环境搭建_Success___的博客-CSDN博客 需求&#xff1a; 在文章搜索服务中实现保存搜索记录到mongdb 并在搜索时查询出mongdb保存的数据 1、安装mongodb依赖 <dependency><groupId>org.springframework.data</groupI…

ICC2如何write_gds写出pr boundary

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球 在数模混合项目中,需要在前期确定pr boundary的尺寸,可以在virtuoso中画一个pr boundary存def给pr,当然,pr这边在前期修改尺寸也需要给负责模拟版图的同事确认,但ICC2 write gds默认是写不出pr bou…

React源码解析18(7)------ 实现事件机制(onClick事件)

摘要 在上一篇中&#xff0c;我们实现了useState的hook&#xff0c;但由于没有实现事件机制&#xff0c;所以我们只能将setState挂载在window上。 而这一篇主要就是来实现事件系统&#xff0c;从而实现通过点击事件进行setState。 而在React中&#xff0c;虽然我们是将事件绑…

jmeter获取mysql数据

JDBC Connection Configuration Database URL: jdbc:mysql:// 数据库地址 /库名 JDBC Driver class&#xff1a;com.mysql.jdbc.Driver Username&#xff1a;账号 Password&#xff1a;密码 JDBC Request 字段含义 字段含义 Variable Name Bound to Pool 数据库连接池配置…

PHP证券交易员学习网站mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP证券交易员学习网站 是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 下载地址https://download.csdn.net/download/qq_41221322/88205549 PHP证券交易员…

WPF 本地化的最佳做法

WPF 本地化的最佳做法 资源文件英文资源文件 en-US.xaml中文资源文件 zh-CN.xaml 资源使用App.xaml主界面布局cs代码 App.config辅助类语言切换操作类资源 binding 解析类 实现效果 应用程序本地化有很多种方式&#xff0c;选择合适的才是最好的。这里只讨论一种方式&#xff0…

开发者如何使用讯飞星火认知大模型API?

目录 1、申请星火API接口 2、使用星火API接口 3、测试编译效果 之前我们使用网页文本输入的方式体验了讯飞星火认知大模型的功能&#xff08;是什么让科大讯飞1个月股价翻倍&#xff1f;&#xff09;&#xff0c;本篇博文将从开发者角度来看看如何使用讯飞星火认知大模型API…