2023_Spark_实验三十二:消费Kafka数据并保存到MySQL中

实验目的:掌握Scala开发工具消费Kafka数据,并将结果保存到关系型数据库中

实验方法:消费Kafka数据保存到MySQL中

实验步骤:

一、创建Job_ClickData_Process

代码如下:

package exams

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, DriverManager, PreparedStatement}
import scala.collection.mutable

/**
 * @projectName sparkGNU2023  
 * @package exams  
 * @className exams.Job_ClickData_Process  
 * @description ${description}  
 * @author pblh123
 * @date 2023/12/20 15:42
 * @version 1.0
 *
 */
    
object Job_ClickData_Process {

  def main(args: Array[String]): Unit = {
    //  1. 创建spark,sc,sparkstreaming对象
    if (args.length != 3) {
      println("您需要输入三个参数")
      System.exit(5)
    }
    val musrl: String = args(0)
    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster(musrl)
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ckeckpointdir: String = args(1)
    val ssc = new StreamingContext(sc, Seconds(5)) //连续流批次处理的大小

    //  2. 代码主体
//    设置ckeckpoint目录
    ssc.checkpoint(ckeckpointdir)

    //准备kafka的连接参数
    val kfkbst: String = args(2)
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> kfkbst,
      "group.id" -> "SparkKafka",
      //latest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最新/或最后的位置开始消费
      //earliest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最开始/最早的位置开始消费
      //none示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,则报错
      "auto.offset.reset" -> "latest", //偏移量的重置位置
      "enable.auto.commit" -> (false: java.lang.Boolean), //是否自动提交偏移量
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val topics: Array[String] = Array("RealDataTopic")

    //从mysql中查询出offsets:Map[TopicPartition, Long]
    val offsetsMap: mutable.Map[TopicPartition, Long] = OffsetUtils.getOffsetMap("SparkKafka", "RealDataTopic")
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if (offsetsMap.size > 0) {
      println("MySql记录了offset信息,从offset处开始消费")
      //连接kafka的消息
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetsMap)
      )
    } else {
      println("MySql没有记录了offset信息,从latest处开始消费")
      //连接kafka的消息
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    }

    //实时处理数据并手动维护offset
    val valueDS = kafkaDS.map(_.value()) //_表示从kafka中消费出来的每一条数据
    valueDS.print()
    kafkaDS.map(_.value())
    valueDS.foreachRDD(rdd => {
      rdd.foreachPartition(lines => {
        //将处理分析的结果存入mysql
        /*
        DROP TABLE IF EXISTS `job_real_time`;
        CREATE TABLE `job_real_time` (
        `datetime` varchar(8) DEFAULT NULL COMMENT '日期',
        `job_type` int(2) DEFAULT NULL COMMENT '1代表新招聘岗位,0代表找工作的人',
        `job_id` int(8) DEFAULT NULL COMMENT '岗位ID,匹配岗位名称',
        `count` int(8) DEFAULT NULL COMMENT '企业新增岗位数和找工作的人数'
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

        */
        //1.开启连接
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.137.110:3306/bigdata19?characterEncoding=UTF-8&serverTimezone=UTC", "lh", "Lh123456!")
        //2.编写sql并获取ps
        val sql: String = "replace into job_real_time(datetime,job_type,job_id,count) values(?,?,?,?)"
        val ps: PreparedStatement = conn.prepareStatement(sql)
        //3.设置参数并执行
        for (line <- lines) {
          var item = line.split(" ")
          ps.setString(1, item(0).toString)
          ps.setInt(2, item(1).toInt)
          ps.setInt(3, item(2).toInt)
          ps.setInt(4, item(3).toInt)
          ps.executeUpdate()
        }
        //4.关闭资源
        ps.close()
        conn.close()
      })
    })

    //手动提交偏移量
    kafkaDS.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        //获取偏移量
        val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        OffsetUtils.saveOffsets(groupId = "SparkKafka", offsets)
      }
    })

    //开启sparkstreaming任务并等待结束,关闭ssc,sc
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    sc.stop()
  }

}

二、编写模拟点击量并消费Kafka数据

启动zookeeper集群

zk.sh start

启动kafka集群

kf.sh start

检查模拟的实时数据是否正常更新

不断正常更新的情况下,启动flume采集real-time-data.log的实时数据

启动flume

在mysql数据库中准备偏移表与实时数据表

启动Job_ClickData_Process方法消费kafka数据并保存到mysql中


 

检查mysql表是否存入数据

实验结果:通过scala开发spark代码实现消费kafka数据存储到MySQL中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/259703.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

阶段十-springsecurity总结

jwt认证流程 SpringSecurity 认证过程 第一步&#xff1a; 创建一个类实现UserDetailsService接口&#xff0c;重写其中的方法 通过重写 public UserDetails loadUserByUsername(String username) 方法 从数据库校验用户输入的用户名 配置SecurityConfig Bean注入 Passwor…

【C++题目速刷】二分查找

【C题目速刷】二分查找 一、二分查找1、题目链接2、解题3、代码 二、在排序数组中查找元素的第一个和最后一个位置1、题目链接2、解题3、代码4、算法模板 三、x的平方根1、解题链接2、解题3、代码 四、搜索插入位置1、题目链接2、解题3、代码 五、山脉数组的峰顶索引1、题目链接…

论文解读:On the Integration of Self-Attention and Convolution

自注意力机制与卷积结合&#xff1a;On the Integration of Self-Attention and Convolution(CVPR2022) 引言 1&#xff1a;卷积可以接受比较大的图片的&#xff0c;但自注意力机制如果图片特别大的话&#xff0c;运算规模会特别大&#xff0c;即上图中右边(卷积)会算得比较快…

虾皮电商申请:一站式开店指南

随着跨境电商的快速发展&#xff0c;越来越多的商家开始意识到东南亚市场的潜力。虾皮电商&#xff08;Shopee&#xff09;作为东南亚地区最大的电商平台之一&#xff0c;为商家提供了一个开拓市场的机会。本文将详细介绍如何在虾皮电商平台上开店&#xff0c;并给出一些建议来…

Kioptrix-1

信息收集 # nmap -sn 192.168.1.0/24 -oN live.nmap Starting Nmap 7.94 ( https://nmap.org ) at 2023-12-18 20:02 CST Nmap scan report for 192.168.1.1 (192.168.1.1) Host is up (0.00025s latency). MAC Address: 00:50:56:C0:00:08 (VMware) Nmap scan report for 0bc…

听GPT 讲Rust源代码--src/tools(17)

File: rust/src/tools/rust-analyzer/crates/profile/src/hprof.rs 在Rust源代码中&#xff0c;rust/src/tools/rust-analyzer/crates/profile/src/hprof.rs文件是rust-analyzer中的性能分析模块&#xff0c;用于代码运行时的性能统计和分析。下面将详细介绍每个结构体的作用&a…

基于DSP的IIR数字滤波器(论文+源码)

1.系统设计 在本次基于DSP的IIR数字低通滤波计中&#xff0c;拟以TMS320F28335来作为系统的主控制器&#xff0c;通过ADC0832模数转换芯片来对输入信号进行采集&#xff1b;通过TLC5615来将低通滤波后的信号进行输出&#xff1b;同时结合MATLAB仿真软件&#xff0c;对设计的II…

2023美团商家信息

2023美团商家电话、地址、经纬度、评分、均价、执照...

排序算法——快排

快速排序算法最早是由图灵奖获得者Tony Hoare设计出来的,他在形式化方法理论以 及ALGOL.60编程语言的发明中都有卓越的贡献,是20世纪最伟大的计算机科学家之—。 而这快速排序算法只是他众多贡献中的—个小发明而已。 快速排序&#xff08;Quick Sort&#xff09;的基本算法思…

SpringBoot3知识总结

SpringBoot3 1、简介 1. 前置知识 Java17Spring、SpringMVC、MyBatisMaven、IDEA 2. 环境要求 环境&工具版本&#xff08;or later&#xff09;SpringBoot3.0.5IDEA2022Java17Maven3.5 3. SpringBoot是什么 Spring Boot是Spring项目中的一个子工程&#xff0c;与我们…

四、Spring IoC实践和应用(基于XML配置方式组件管理)

本章概要 基于XML配置方式组件管理 实验一&#xff1a; 组件&#xff08;Bean&#xff09;信息声明配置&#xff08;IoC&#xff09;实验二&#xff1a; 组件&#xff08;Bean&#xff09;依赖注入配置&#xff08;DI&#xff09;实验三&#xff1a; IoC 容器创建和使用实验四…

Hive参数操作和运行方式

Hive参数操作和运行方式 1、Hive参数操作 1、hive参数介绍 ​ hive当中的参数、变量都是以命名空间开头的&#xff0c;详情如下表所示&#xff1a; 命名空间读写权限含义hiveconf可读写hive-site.xml当中的各配置变量例&#xff1a;hive --hiveconf hive.cli.print.headert…

基于SpringBoot+Vue的小区物业管理系统

基于SpringBootVue的小区物业管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 房屋类型 论坛 登录界面 管理员界面 员工界面 摘要 小区物业管理系统是一个…

CTF-PWN-堆-【use after free-2】

文章目录 fheap libc 2.23 64位检查maincreatedelete 思路覆盖目标函数的指针printf内部调用覆盖的函数前调用 printf时的栈实际去的函数的地方查找当前版本对应的libc_start_main和system计算出system的libc基地址exp fheap libc 2.23 64位 检查 main 多层while&#xff0c;…

Mybatis 缓存

Mybatis 缓存 Mybatis 中有一级缓存和二级缓存&#xff0c;默认情况下一级缓存是开启的&#xff0c;而且是不能关闭的。一级缓存是指 SqlSession 级别的缓存&#xff0c;当在同一个 SqlSession 中进行相同的 SQL 语句查询时&#xff0c;第二次以后的查询不会从数据库查询&…

【Spring】10 BeanFactoryAware 接口

文章目录 1. 简介2. 作用3. 使用3.1 创建并实现接口3.2 配置 Bean 信息3.3 创建启动类3.4 启动 4. 应用场景总结 Spring 框架为开发者提供了丰富的扩展点&#xff0c;其中之一就是 Bean 生命周期中的回调接口。本文将专注于介绍一个重要的接口 BeanFactoryAware&#xff0c;探…

FA2016ASA (MHz范围晶体单元,内置热敏电阻) 汽车

FA2016ASA是爱普生推出的一款内置热敏电阻、频率范围为38.4MHz的晶振&#xff0c;确保数据的准确传输&#xff0c;同时有效避免频谱干扰的出现。可以在-40C to 125C 的温度内稳定工作。在汽车内部空间有限的情况下&#xff0c;FA2016ASA以其小型超薄的外形尺寸2.0 1.6 0.68mm…

多级缓存:亿级流量的缓存方案

文章目录 一.多级缓存的引入二.JVM进程缓存三.Lua语法入门四.多级缓存1.OpenResty2.查询Tomcat3.Redis缓存预热4.查询Redis缓存5.Nginx本地缓存6.缓存同步 一.多级缓存的引入 传统缓存的问题 传统的缓存策略一般是请求到达Tomcat后&#xff0c;先查询Redis&#xff0c;如果未…

buuctf-Misc 题目解答分解91-93

91.[SUCTF2018]followme 下载完就是一个流量包 &#xff0c;用wireshark 打开 直接导出 http对象 这里面 有很多的这样的文件 里面都是参数 直接搜索 关键字 suctf grep -r "SUCTF" 得到flag SUCTF{password_is_not_weak} 92.[MRCTF2020]CyberPunk 用notepad 打开…

论文笔记:Accurate Localization using LTE Signaling Data

1 intro 论文提出LTELoc&#xff0c;仅使用信令数据实现精准定位 信令数据已经包含在已在LTE系统中&#xff0c;因此这种方法几乎不需要数据获取成本仅使用TA&#xff08;时序提前&#xff09;和RSRP【这里单位是瓦】&#xff08;参考信号接收功率&#xff09; TA值对应于信号…