Spark实时(五):InputSource数据源案例演示

文章目录

InputSource数据源案例演示

一、​​​​​​​File Source

1、读取text文件

2、读取csv文件

3、读取json文件

二、Socket Source 

三、Rate Source


InputSource数据源案例演示

在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。

Structured Streaming的数据源目前支持File Source 、Socket Source 、Rate Source、Kafka Source ,与Kafka的整合在后续整理,这里对其他三种数据源分别演示。

一、​​​​​​​​​​​​​​File Source

Sturctured Streaming可以读取写入目录的文件作为数据流,文件将按照文件修改时间的顺序进行处理,文件必须原子性的存入到监控目录中,支持的格式有text、csv、json、orc、parquet。

1、读取text文件

Scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  *  Structured Streaming监控目录 text格式数据
  */
object SSReadTextData {
  def main(args: Array[String]): Unit = {

    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadTextData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import  spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2.监控目录
    val ds: Dataset[String] = spark.readStream.textFile("./data/")

    val result: DataFrame = ds.map(line => {
      val arr: Array[String] = line.split("-")
      (arr(0).toInt, arr(1), arr(2).toInt)
    }).toDF("id", "name", "age")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }

}

 结果:

Java代码如下:

package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;

public class SSReadTextData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadSocketData01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        Dataset<String> ds = spark.readStream().textFile("./data/");

        Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
            @Override
            public Tuple3<Integer, String, Integer> call(String line) throws Exception {
                String[] arr = line.split("-");

                return new Tuple3<>(Integer.valueOf(arr[0]), arr[1],Integer.valueOf(arr[2]) );
            }
        }, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));

        Dataset<Row> result = ds2.toDF("id", "name", "age");

        result.writeStream()
                .format("console")
                .start()
                .awaitTermination();

    }
}

 结果:

以上代码编写完成之后,向监控的目录“./data”中不断写入含有以下内容的文件,可以看到控制台有对应的流数据输出,这里一定是原子性的将文件复制到对应目录下。文件内容如下:

1-zhangsan-18
2-lisi-19
3-ww-20

2、读取csv文件

Scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
  * Structured Streaming 读取CSV数据
  */
object SSReadCsvData {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadCsvData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import  spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2.创建CSV数据schema
    val userSchema: StructType = new StructType().add("id", "integer")
      .add("name", "string")
      .add("gender", "string")
      .add("age", "integer")


    val result: DataFrame = spark.readStream
      .option("sep", ",")
      .schema(userSchema)
      .csv("./data/")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }

}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

/**
 * Structured Streaming 读取CSV数据
 */

public class SSReadCsvData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadCsvData")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        StructType userSchema = new StructType()
                .add("id", "integer")
                .add("name", "string")
                .add("gender", "string")
                .add("age", "integer");
        Dataset<Row> result = spark.readStream()
                .option("sep", ",")
                .schema(userSchema)
                .csv("./data/");

        result.writeStream()
                .format("console")
                .start()
                .awaitTermination();

    }
}

 结果:

以上代码运行之后向对应监控的目录下原子性写入含有数据的csv文件,在控制台可以看到实时监控内容。文件内容如下:

1,zhangsan,一班,100
2,lisi,二班,200
3,wangwu,一班,300
4,maliu,二班,100
5,tianqi,三班,100
6,gaoba,三班,50
7,zs2,四班,50

3、读取json文件

Scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
  *  Structured Streaming 监控Json格式数据
  */
object SSReadJsonData {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadCsvData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import  spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2.创建 json 数据schema
    val userSchema: StructType = new StructType().add("id", "integer")
      .add("name", "string")
      .add("age", "integer")



    val result: DataFrame = spark.readStream
      .schema(userSchema)
      .json("./data/")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }

}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;


import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;

/**
 * Structured Streaming实时监控目录中json文件作为数据流
 */
public class SSReadJsonData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
        SparkSession spark = SparkSession.builder().appName("File Source test")
            .master("local")
            .getOrCreate();

        //2.设置日志
        spark.sparkContext().setLogLevel("Error");

        //3.设置Schema
        StructType userSchema = new StructType().add("id", "integer")
            .add("name", "string")
            .add("age", "integer");

        //4.指定监控目录读取数据json数据
        Dataset<Row> ds = spark.readStream()
            .option("sep", ",")
            .schema(userSchema)
            .json("./data/");

        //5.打印数据到控制台
        StreamingQuery query =ds.writeStream()
            .format("console")
            .start();

        query.awaitTermination();

    }
}

结果:

以上代码启动之后,向监控的目录“./data”下原子写入含有以下内容的json文件,在控制台可以看到实时监控内容。json文件内容如下:

{"id":1,"name":"zs","age":18}
{"id":2,"name":"ls","age":19}
{"id":3,"name":"ww","age":20}
{"id":4,"name":"ml","age":21}

注意:实时监控json格式数据时,创建的Schema 中的字段需要与Json中的属性保持一致,否则在映射成表时,Schema中含有但在Json中没有的属性的字段对应的数据会为null。

二、Socket Source 

读取Socket方式需要指定对应的host和port,读取Socket数据源多用于测试场景,这里不再演示。

可以参考案例:

Spark实时(三):Structured Streaming入门案例-CSDN博客

三、Rate Source

Rate Source是以每秒指定的行数生成数据,每个输出行包含一个timestamp和value,其中timestamp是一个Timestamp含有信息分配的时间类型,value是从0开始的Long类型的数据,Rate Source式多用于测试。

scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * SSRateSource
  */
object SSRateSource {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("rate test")
//      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val result: DataFrame = spark.readStream
      .format("rate")
      // 配置每秒生成多少行数据,默认1行
      .option("rowsPerSecond", "10")
      .option("numPartitions", 5)
      .load()
    result.writeStream
      .format("console")
      .option("numRows","100")
      .option("truncate","false")
      .start()
      .awaitTermination()

  }

}

结果:

Java代码如下:

package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class ssratesource01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
       SparkSession spark = SparkSession.builder().master("local")
                .appName("rate test")
                .getOrCreate();
       spark.sparkContext().setLogLevel("Error");

        Dataset<Row> result = spark.readStream()
                .format("rate")
                // 配置每秒生成多少行数据,默认1行
                .option("rowsPerSecond", "10")
                .option("numPartitions", 5)
                .load();

        result.writeStream()
                .format("console")
                .option("numRows","100")
                .option("truncate","false")
                .start()
                .awaitTermination();
    }
}

结果: 

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/869869.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【RT摩拳擦掌】RT600 4路音频同步输入1路TDM输出方案

【RT摩拳擦掌】RT600 4路音频同步输入1路TDM输出方案 一&#xff0c; 文章简介二&#xff0c;硬件平台构建2.1 音频源板2.2 音频收发板2.3 双板硬件连接 三&#xff0c;软件方案与软件实现3.1 方案实现3.2 软件代码实现3.2.1 4路I2S接收3.2.2 I2S DMA pingpong配置3.2.3 音频数…

Spring源码学习笔记之@Async源码

文章目录 一、简介二、异步任务Async的使用方法2.1、第一步、配置类上加EnableAsync注解2.2、第二步、自定义线程池2.2.1、方法一、不配置自定义线程池使用默认线程池2.2.2、方法二、使用AsyncConfigurer指定线程池2.2.3、方法三、使用自定义的线程池Excutor2.2.4、方法四、使用…

算法-----递归~~搜索~~回溯(宏观认识)

目录 1.什么是递归 1.1二叉树的遍历 1.2快速排序 1.3归并排序 2.为什么会用到递归 3.如何理解递归 4.如何写好一个递归 5.什么是搜索 5.1深度&#xff08;dfs&#xff09;优先遍历&优先搜索 5.2宽度&#xff08;bfs&#xff09;优先遍历&优先搜索 6.回溯 1.什…

《0基础》学习Python——第二十三讲__网络爬虫/<6>爬取哔哩哔哩视频

一、在B站上爬取一段视频&#xff08;B站视频有音频和视频两个部分&#xff09; 1、获取URL 注意&#xff1a;很多平台都有反爬取的机制&#xff0c;B站也不例外 首先按下F12找到第一条复制URL 2、UA伪装&#xff0c;下列图片中&#xff08;注意代码书写格式&#xff09; 3、Co…

redis的使用场景和持久化方式

redis的使用场景 热点数据的缓存。热点&#xff1a;频繁读取的数据。限时任务的操作&#xff1a;短信验证码。完成session共享的问题完成分布式锁。 redis的持久化方式 什么是持久化&#xff1a;把内存中的数据存储到磁盘的过程&#xff0c;同时也可以把磁盘中的数据加载到内存…

react开发-配置开发时候@指向SRC目录

这里写目录标题 配置开发时候指向SRC目录VScode编辑器给出提示总体1.配置react的 2.配置Vscode的1.配置react的2,配置VSCode的提示支持 配置开发时候指向SRC目录VScode编辑器给出提示 总体1.配置react的 2.配置Vscode的 1.配置react的 1. 我么需要下载一个webpack的插件 这样…

河南萌新联赛2024第(二)场:南阳理工学院

文章目录 链接 A. 国际旅行Ⅰ题意与思路代码 D.A*BBBB题意与思路代码 F.水灵灵的小学弟题意与思路代码 H.狼狼的备忘录题意与思路代码 I.重生之zbk要拿回属于他的一切题意与思路代码 J.这是签到题意与思路代码总结 链接 链接 A. 国际旅行Ⅰ 题意与思路 这是一个签到题&…

[红明谷CTF 2021]write_shell 1

目录 代码审计check()$_GET["action"] ?? "" 解题 代码审计 <?php error_reporting(0); highlight_file(__FILE__); function check($input){if(preg_match("/| |_|php|;|~|\\^|\\|eval|{|}/i",$input)){// if(preg_match("/| |_||p…

如何使用C#快速创建定时任务

原文链接&#xff1a;https://www.cnblogs.com/zhaotianff/p/17511040.html 使用Windows的计划任务功能可以创建定时任务。 使用schtasks.exe可以对计划任务进行管理&#xff0c;而不需要编写额外代码 这里掌握schtasks /CREATE 的几个核心参数就可以快速创建计划任务 /SC …

一些和颜色相关网站

1.中国传统色 2.网页颜色选择器 3.渐变色网站 4.多风格色卡生成 5.波浪生成 6.半透明磨砂框 7.色卡组合

OAK相机支持的图像传感器有哪些?

相机支持的传感器 在 RVC2 上&#xff0c;固件必须具有传感器配置才能支持给定的相机传感器。目前&#xff0c;我们支持下面列出的相机传感器的开箱即用&#xff08;固件中&#xff09;传感器配置。 名称 分辨率 传感器类型 尺寸 最大 帧率 IMX378 40563040 彩色 1/2.…

nginx通过nginx_upstream_check_module实现后端健康检查

1、简介说明 nginx是常用的反向代理和负载均衡服务&#xff0c;具有强大并发能力、稳定性、丰富的功能集、低资源的消耗。 nginx自身是没有针对后端节点健康检查的&#xff0c;但是可以通过默认自带的ngx_http_proxy_module 模块和ngx_http_upstream_module模块中的相关指令来完…

EmlogPro图片本地化插件修复版V2.0

Emlog图片本地化插件V2.0 全新优化升级版&#xff0c;并非emlog官方发布的收费插件&#xff0c;可以快速将文章中的远程图片链接下载到自己的服务器&#xff0c;避免远程站点图片防盗链、跑路等等问题&#xff01; 插件下载&#xff1a;img2local.zip 功能特色&#xff1a; …

Centos安装、迁移gitlab

Centos安装迁移gitlab 一、下载安装二、配置rb修改&#xff0c;起服务。三、访问web&#xff0c;个人偏好设置。四、数据迁移1、查看当前GitLab版本2、备份旧服务器的文件3、将上述备份文件拷贝到新服务器同一目录下&#xff0c;恢复GitLab4、停止新gitlab数据连接服务5、恢复备…

MySQL SQL 编程练习

目录 创建表并插入数据 查看表结构 创建触发器 创建INSERT 触发器 创建DELETE 触发器 创建更新触发器 创建存储过程 创建提取emp_new表所有员工姓名和工资的存储过程s1 创建存储过程s2&#xff0c;实现输入员工姓名后返回员工的年龄 创建一个存储过程s3&#xff0c;有2个参数&…

<数据集>AffectNet表情识别数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;29752张 标注数量(xml文件个数)&#xff1a;29752 标注数量(txt文件个数)&#xff1a;29752 标注类别数&#xff1a;7 标注类别名称&#xff1a;[anger,contempt,disgust,fear,happy,neutral,sad,surprise] 序号类…

在WPF中使用WebView2详解

Microsoft Edge WebView2 Microsoft Edge WebView2 控件允许在本机应用中嵌入 web 技术(HTML、CSS 以及 JavaScript)。 WebView2 控件使用 Microsoft Edge 作为绘制引擎&#xff0c;以在本机应用中显示 web 内容。 使用 WebView2 可以在本机应用的不同部分嵌入 Web 代码&…

【网络流】——初识(最大流)

网络流-最大流 基础信息引入一些概念基本性质 最大流定义 Ford–Fulkerson 增广Edmons−Karp算法Dinic 算法参考文献 基础信息 引入 假定现在有一个无限放水的自来水厂和一个无限收水的小区&#xff0c;他们之间有多条水管和一些节点构成。 每一条水管有三个属性&#xff1a…

【算法】单链表面试题

1.求单链表中有效节点的个数 //方法&#xff1a;获取到单链表的节点的个数(如果是带头节点的链表&#xff0c;不统计头节点)/**** param head 链表的头节点* return 返回有效节点的个数*/public static int getLength(HeroNode head) {if (head.next null) {return 0;}int le…

面试场景题系列--(2)短 URL 生成器设计:百亿短 URL 怎样做到无冲突?--xunznux

文章目录 面试场景题&#xff1a;短 URL 生成器设计&#xff1a;百亿短 URL 怎样做到无冲突&#xff1f;1. 需求分析2. 短链接生成算法2.1 自增法2.2 散列函数法2.3 预生成法 3. 部署模型3.1 其他部署方案 4. 设计4.1 重定向响应码4.2 短 URL 预生成文件及预加载4.3 用户自定义…