【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2

文章目录

  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
  • 前言
  • 一、背景说明
  • 二、使用步骤
    • 1.引入库
    • 2.开发代码
    • 运行测试
  • 总结


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
在这里插入图片描述

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.11"

lazy val root = (project in file("."))
  .settings(
    name := "SparkLearning",
    idePackagePrefix := Some("cn.lh.spark"),
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",
    libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",
    libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  
  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  
  
object NetworkWordCountStatefultoMysql {  
  
  def main(args: Array[String]): Unit = {  
    //    定义状态更新函数  
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {  
      val currentCount = values.foldLeft(0)(_ + _)  
      val previousCount = state.getOrElse(0)  
      Some(currentCount + previousCount)  
    }  
  
    //    设置log4j日志级别  
    StreamingExamples.setStreamingLogLevels()  
  
    val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  
    val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  
  
    //    设置检查点,具有容错机制  
    scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  
  
    val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  
    val words: DStream[String] = lines.flatMap(_.split(" "))  
    val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  
    val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  
    // 打印出状态  
    stateDstream.print()  
    // 将统计结果保存到MySQL中  
    stateDstream.foreachRDD(rdd =>{  
      val repartitionedRDD = rdd.repartition(3)  
      repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  
    })  
  
    scc.start()  
    scc.awaitTermination()  
  
    scc.stop()  
  }  
  
  
}

StreamingSaveMySQL8.scala

package cn.lh.spark  
  
import java.sql.DriverManager  
  
object StreamingSaveMySQL8 {  
  
  // 定义写入 MySQL 的函数  
  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  
    // 保存到MySQL  
    val ip = "192.168.137.110"  
    val port = "3306"  
    val db = "sparklearning"  
    val username = "lh"  
    val pwd = "Lh123456!"  
    val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  
    val conn = DriverManager.getConnection(jdbcurl, username, pwd)  
    val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  
  
    try {  
      // 写入数据  
      iter.foreach { wc =>  
        statement.setString(1, wc._1.trim)  
        statement.setInt(2, wc._2.toInt)  
        statement.executeUpdate()  
      }  
    } catch {  
      case e:Exception => e.printStackTrace()  
    } finally {  
      if(statement != null){  
        statement.close()  
      }  
      if(conn!=null){  
        conn.close()  
      }  
    }  
  }  
  
}

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    在这里插入图片描述

  2. 启动nc -lk 9999
    在这里插入图片描述

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]在这里插入图片描述

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

在这里插入图片描述


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/59488.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

某东详情页h5st 算法分析

文章目录 声明目标地址h5st 算法四大入参分析1. z值生成2. v值生成3. b值生成4. r值生成风控浅谈往期逆向文章推荐声明 本文章中所有内容仅供学习交流,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请私信我立即删除! 目标地址 aHR0cHM6Ly…

IO进程线程day7(2023.8.4)

一、Xmind整理: 二、课上练习: 练习1:创建两个线程:其中一个线程拷贝前半部分,另一个线程拷贝后半部分。 只允许开一份资源,且用互斥锁方式实现。 提示:找临界区--->找临界资源。 #includ…

单例模式和工厂模式

目录 今日良言:关关难过关关过,步步难行步步行 一、单例模式 1.饿汉模式 2.懒汉模式 二、工厂模式 今日良言:关关难过关关过,步步难行步步行 一、单例模式 首先来解释一下,什么是单例模式。 单例模式也就是单个…

linux 文件的权限

修改文件的权限 我这里有一个test.txt 文件,我们ll 查看一下该文件相应的属性信息 其中,权限的位置是相对固定的即: 第一个位置是r 权限,代表可读权限。 第二个位置是w权限,代表可修改权限。 第三个位置是x权限&…

一百四十一、Kettle——kettle8.2在Windows本地开启carte服务以及配置子服务器

一、目的 在kettle建好共享资源库后,为了给在服务器上部署kettle的carte服务躺雷,先在Windows本地测试一下怎么玩carte服务 二、Kettle版本以及在Windows本地安装路径 kettle版本是8.2 pdi-ce-8.2.0.0-342 kettle本地安装路径是D:\j…

linuxARM裸机学习笔记(2)----汇编LED灯实验

MX6ULL 的 IO IO的复用功能 这里的只使用了低五位,用来配置io口,其中bit0~bit3(MUX_MODE)就是设置 GPIO1_IO00 的复用功能的,GPIO1_IO00 一共可以复用为 9种功能 IO,分别对应 ALT0~ALT8。每种对应了不同的功能 io的属性配置 HY…

拦截器在SpringBoot中使用,HandlerInterceptor,WebMvcConfigurer

拦截器在Controller之前执行。 用于权限校验,日志记录,性能监控 在SpringBoot中使用 创建拦截器类:首先,创建一个Java类来实现拦截器逻辑。拦截器类应该实现Spring提供的HandlerInterceptor接口。实现拦截器方法:拦…

Unity数字可视化学校_昼夜(二)

1、时间设置: 2、新建夜晚 3、新建侧置球(BOX),测试灯光强度 降低亮度 色调:冷色调 4、自发光 新建shader 灯光控制 道路线: 建筑: 夜晚加灯光: 玻璃: 加大灯光数量: 边缘…

uni-ajax网络请求库使用

uni-ajax网络请求库使用 uni-ajax是什么 uni-ajax是基于 Promise 的轻量级 uni-app 网络请求库,具有开箱即用、轻量高效、灵活开发 特点。 下面是安装和使用教程 安装该请求库到项目中 npm install uni-ajax编辑工具类request.js // ajax.js// 引入 uni-ajax 模块 import ajax…

服务端高并发分布式结构演进之路

目录 一、常见概念 1.1基本概念 二、架构演进 2.1单机架构 2.2应用数据分离架构 2.3应用服务集群架构 2.4读写分离 / 主从分离架构 2.5引入缓存 —— 冷热分离架构 2.6垂直分库 2.7业务拆分 —— 微服务 一、常见概念 1.1基本概念 应用(Application&am…

Grafana集成prometheus(1.Prometheus安装)

下载docker镜像 docker pull prom/prometheus docker pull prom/node-exporter启动 node-exporter 该程序用以采集机器内存等数据 启动脚本 docker run -d -p 9100:9100 prom/node-exporter ss -anptl | grep 9100启动截图 prometheus 启动脚本 # 3b907f5313b7 为镜像i…

C++数据结构之平衡二叉搜索树(一)——AVL的实现(zig-zag/左右双旋/3+4重构)

目录 00.BBST——平衡二叉搜索树01.AVL树02.AVL的插入2.1单旋——zig 与 zag2.2插入节点后的单旋实例2.3手玩小样例2.4双旋实例2.5小结 03.AVL的删除3.1单旋删除3.2双旋删除3.3小结 04.34重构05.综合评价AVL5.1优点5.2缺点 00.BBST——平衡二叉搜索树 本文是介绍众多平衡二叉搜…

上海亚商投顾:沪指震荡微涨 金融、地产午后大幅走强

上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。 市场情绪 三大指数早盘震荡,午后集体拉升反弹,创业板指涨超1%。券商等大金融板块午后再度走强&#…

【LNMP】LNMP

LNMP:是目前成熟的企业网站的应用模式之一,指的是一套协同工作的系统和相关软件;能够提供静态页面服务,也可以提供动态web服务 L Linux系统,操作系统N Nginx网站服务,前端,提供前端的静态…

抽象类的顶级理解

目录 1.抽象类的介绍 2. 抽象类语法 3.模板设计模式 1.抽象类的介绍 在面向对象的概念中,所有的对象都是通过类来描绘的,但是反过来,并不是所有的类都是用来描绘对象的,如果 一个类中没有包含足够的信息来描绘一个具体的对象&…

24. 两两交换链表中的节点

头结点dummyHead 定义结点temp用来暂存node2 让node1和node2位置互换:head(temp)->node1->node2->node3->node4 然后让temp等于交换后node1位置:head->node2->node1(temp)->node3->node4 class Solution { public:ListNode*…

学生信息管理系统springboot学校学籍专业数据java jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目,Java EE JSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。 一、项目描述 学生信息管理系统springboot 系统3权限:超…

数据结构--线性表2-2

目录 一、线性表例题: 二、分配动态内存: 1.动态创建一个空顺序表的算法: 2.动态顺序表的插入算法: 3.动态顺序表的删除 三、线性表的链式表示和实现 例题1:创建链表并插入26个字母 例题2:在链表中取…

MGRE综合

实验 一、实验思路 1.先按照上图配置IP地址及环回 2.写缺省使公网可通 3.让R1、R4、R5每台路由器均成为中心站点形成全连网状结构拓扑 4.让R1成为中心站点R2R3为分支站点 5.分区域宣告ospf之后更改ospf在虚拟接口Tunnel工作方式为broadcast及让R1 当选DR 二、上虚拟机操作…

【已解决】vagrant up下载box速度太慢的解决方法

一、问题背景 本菜鸟在学习雷神(尚硅谷雷丰阳)的这个教程Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目的时候,按照视频教程的步骤,正准备用Vagrant工具给VirtualBox安装并启动Centos7的Linux操作系统,当在Windows命令提示符窗体…