sparkstreamnig实时处理入门

1.2 SparkStreaming实时处理入门

1.2.1 工程创建

导入maven依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
1.2.2 入口类StreamingContext
SparkStreaming中的入口类,称之为StreamingContext,但是底层还是得需要依赖SparkContext。
object SparkStreamingWordCountOps {
    def main(args: Array[String]): Unit = {
        /*
            StreamingContext的初始化,需要至少两个参数,SparkConf和BatchDuration
            SparkConf不用多说
            batchDuration:提交两次作业之间的时间间隔,每次会提交一个DStream,将数据转化batch--->RDD
            所以说:sparkStreaming的计算,就是每隔多长时间计算一次数据
         */
        val conf = new SparkConf()
                    .setAppName("SparkStreamingWordCount")
                    .setMaster("local[*]")
        val duration = Seconds(2)
        val ssc = new StreamingContext(conf, duration) //批次
​
        //业务
        
        
        //为了执行的流式计算,必须要调用start来启动
        ssc.start()
        //为了不至于start启动程序结束,必须要调用awaitTermination方法等待程序业务完成之后调用stop方法结束程序,或者异常
        ssc.awaitTermination()
    }
}
1.2.3 业务编写

SparkStreaming是一个流式计算的计算引擎,那么 就模拟一个对流式数据进行单词统计

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
​
/**
 * sparkStreaming的流程序
 */
object Demo01_SparkStreaming_WC {
  def main(args: Array[String]): Unit = {
    //1、获取streamingcontext
    val conf = new SparkConf()
      .setAppName("streaming wc")
      .setMaster("local[*]")
    val sc = new StreamingContext(conf, Durations.seconds(2)) //微批次微2s
    //2、初始化数据
    val ds = sc.socketTextStream("qianfeng01", 6666)
    //3、对数据进行操作
    val sumDS = ds.flatMap(_.split(" "))
      #判断H开头 5位
      .filter(x=>x.startsWith("H") && x.length == 5)
      .map((_, 1))
      .reduceByKey(_ + _)
    //4、对数据做输出
    sumDS.print()
​
    //5、开启sc
    sc.start()
    //6、等待结束  --- 实时不能停止
    sc.awaitTermination()
  }
}

使用netcat进行测试(如果没有请先安装,有则忽略如下)

需要在任意一台节点上安装工具:

[root@qianfeng01 home]# yum install -y nc

启动监听端口:

[root@qianfeng01 home]# nc -lk 6666
hello nihao
nihao hello
hi
hello nihao

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/283356.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL表的增删查改】

文章目录 前言1 Create1.1 单行数据 全列插入1.2 多行数据 指定列插入1.3 插入否则更新1.4 替换 2 Retrieve2.1 SELECT 列2.1.1 全列查询2.1.2 指定列查询2.1.3 查询字段为表达式2.1.4 为查询结果指定别名2.1.5 结果去重 2.2 WHERE 条件2.2.1 英语不及格的同学及英语成绩 ( &…

CocoaPods安装及‘__rvm_make -j8‘处理

CocoaPods是一个用Ruby写的、负责管理iOS项目中第三方开源库的工具&#xff0c;CocoaPods能让我们集中的、统一管理第三方开源库&#xff0c;为我们节省设置和更新第三方开源库的时间。 安装步骤 1.查看ruby版本 ruby -v 2.通过rvm来安装或升级Ruby&#xff0c;依次执行 cu…

Apache OFBiz RCE漏洞复现(CVE-2023-51467)

0x01 产品简介 Apache OFBiz是一个电子商务平台,用于构建大中型企业级、跨平台、跨数据库、跨应用服务器的多层、分布式电子商务类应用系统。 0x02 漏洞概述 漏洞成因 该系统的身份验证机制存在缺陷,可能允许未授权用户通过绕过标准登录流程来获取后台访问权限。此外,在…

【PTA-C语言】实验七-函数与指针I

如果代码存在问题&#xff0c;麻烦大家指正 ~ ~有帮助麻烦点个赞 ~ ~ 目录——实验七-函数与指针I 6-1 弹球距离&#xff08;分数 10&#xff09;6-2 使用函数输出一个整数的逆序数&#xff08;分数 10&#xff09;6-3 使用函数求最大公约数&#xff08;分数 10&#xff09;6-4…

使用Pycharm给html文件添加浏览器

1、选择菜单栏的File---->选择setting设置 2、选择Tools(工具)---> Web Browser(web 浏览器) 勾选 自己想要添加的浏览器前面 的勾选框即可 注意点击ok进行保存

《数据结构、算法与应用C++语言描述》- 平衡搜索树 -全网唯一完整详细实现插入和删除操作的模板类

平衡搜索树 完整可编译运行代码见&#xff1a;Github::Data-Structures-Algorithms-and-Applications/_34Balanced search tree 概述 本章会讲AVL、红-黑树、分裂树、B-树。 平衡搜索树的应用&#xff1f; AVL 和红-黑树和分裂树适合内部存储的应用。 B-树适合外部存储的…

github使用技巧(经验篇)

相关经验 指定代码范围并高亮显示 例如&#xff0c;指定nn_ops.py文件2612-L2686行的代码&#xff1a;https://github.com/tensorflow/tensorflow/blob/v2.14.0/tensorflow/python/ops/nn_ops.py#L2612-L2686 FAQ Q&#xff1a;github网页打不开&#xff1f; 【github加载不…

Java项目调试实战:如何高效调试Spring Boot项目中的GET请求,并通过equalsIgnoreCase()解决大小写不一致问题

Java项目调试实战&#xff1a;如何高效调试Spring Boot项目中的GET请求&#xff0c;并通过equalsIgnoreCase解决大小写不一致问题 写在最前面全部过程Java equalsIgnoreCase() 方法idea中如何调试SpringBoot项目在IntelliJ IDEA中使用内置HTTP客户端设置断点和调试 补充&#x…

PiflowX组件-WriteToUpsertKafka

WriteToUpsertKafka组件 组件说明 以upsert方式往Kafka topic中写数据。 计算引擎 flink 有界性 Streaming Upsert Mode 组件分组 kafka 端口 Inport&#xff1a;默认端口 outport&#xff1a;默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_h…

Unity坦克大战开发全流程——结束场景——通关界面

结束场景——通关界面 就照着这样来拼 写代码 hideme不要忘了 修改上一节课中的代码

旅游网站Xtrip 前端模板html推荐

一、需求分析 旅游网站的功能可以根据具体的业务需求和目标进行不同的设计和实现&#xff0c;但是以下是一些常见的旅游网站功能&#xff0c;供参考&#xff1a; 酒店预订功能&#xff1a;用户可以搜索并预订酒店&#xff0c;查看酒店的详细信息、价格、评价和照片&#xff0c…

MySQL 8.0 InnoDB Tablespaces之General Tablespaces(通用表空间/一般表空间)

文章目录 MySQL 8.0 InnoDB Tablespaces之General Tablespaces&#xff08;通用表空间/一般表空间&#xff09;General tablespaces&#xff08;通用表空间/一般表空间&#xff09;通用表空间的功能通用表空间的限制 创建通用表空间&#xff08;一般表空间&#xff09;创建语法…

Linux磁盘与文件管理

目录 一、磁盘介绍 1. 磁盘数据结构 2. 磁盘的接口类型 3. 磁盘在Linux上的表现形式 二、磁盘分区与MBR 1. 分区优缺点 2. 分区方式 3. MBR分区 4. GPT分区 三、文件系统 1. 文件系统的组成 2. 默认的文件系统 3. 文件系统的作用 4. 模拟破坏文件与修复文件 4…

项目总结报告

《项目总结报告》 1.项目概要&#xff08;项目基本信息&#xff0c;项目期间&#xff0c;项目成果&#xff0c;项目开发工具环境&#xff09; 2.项目工作分析&#xff08;需求变更&#xff0c;计划与进度实施&#xff0c;投入情况&#xff0c;收益情况&#xff0c;质量情况&…

【Linux】Linux 下基本指令 -- 详解

无论是什么命令&#xff0c;用于什么用途&#xff0c;在 Linux 中&#xff0c;命令有其通用的格式&#xff1a; command [-options] [parameter] command&#xff1a;命令本身。-options&#xff1a;[可选&#xff0c;非必填]命令的一些选项&#xff0c;可以通过选项控制命令的…

navicat premium历史版本下载及更新navicat premium15 永久(使用)有效期

1、navicat premium介绍 Navicat Premium 是一套可创建多个连接的数据库开发工具&#xff0c;让你从单一应用程序中同时连接 MySQL、Redis、MariaDB、MongoDB、SQL Server、Oracle、PostgreSQL 和 SQLite 。它与 GaussDB 、OceanBase 数据库及 Amazon RDS、Amazon Aurora、Amaz…

windows和linux操作Git(序章2)

Git 分布式版本控制系统(序章1) ## Linux 下安装 Git&#x1f53a; ​ ## Git命令大全&#x1f53a; windows和linux通用 安装完 Git 后&#xff0c;需要进行配置&#xff0c;如姓名、Email 等 git config --global user.name "你的名字"git config --global us…

解算人生--写于2023跨年之夜

最近买了一本书&#xff0c;书名叫《计算》 读了部分内容&#xff0c;虽然理解上还需要再下下功夫&#xff0c;但是直观的感觉冲击还是挺大的&#xff0c;最明显的就是表面与本质的把握。大家可能都有这样一种感觉&#xff0c;初步涉足某一领域时&#xff0c;开始我们都会被大量…

vmware部署docker+springboot+MySQL(超详细)

一、前期准备 (一)安装jdk #docker search openjdk #docker pull openjdk:8 (二)确认网络 如果局域网其他终端(如手机访问),虚拟机网络连接需要选择《桥接》模式,而且,需要使用有线连接,不能使用Wi-Fi,切忌切忌! 并且要选择实际的那个有线连接。比如我这里是“R…

Leetcode 剑指 Offer II 059. 数据流中的第 K 大元素

题目难度: 简单 原题链接 今天继续更新 Leetcode 的剑指 Offer&#xff08;专项突击版&#xff09;系列, 大家在公众号 算法精选 里回复 剑指offer2 就能看到该系列当前连载的所有文章了, 记得关注哦~ 题目描述 设计一个找到数据流中第 k 大元素的类&#xff08;class&#xf…