使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。
1、环境设置:
首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark_project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12.12</scala.version>
        <spark.version>3.2.0</spark.version>
        <kafka.version>2.8.1</kafka.version>
    </properties>

    <dependencies>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>

        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>   
    </dependencies>
</project>

mysql中表结构:
在这里插入图片描述

2、从MySQL读取数据到Kafka:
我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfs

import org.apache.spark.sql.SparkSession

import java.util.Properties

object Mysql2Kafka {

  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("MySQLToKafka")
      .master("local[*]")
      .getOrCreate()

    // 设置 MySQL 连接属性
    val mysqlProps = new Properties()
    mysqlProps.setProperty("user", "root")
    mysqlProps.setProperty("password", "12345678")
    mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver")

    // 从 MySQL 数据库中读取数据
    val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps)

    // 将 DataFrame 转换为 JSON 字符串
    val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value")


    // 将数据写入 Kafka
    jsonDF.show()
    jsonDF
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "comment")
      .save()

    // 停止 SparkSession
    spark.stop()
  }

}

以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。

从Kafka消费数据并写入HDFS:
接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfs

import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Comment(author_name:String,
                   fans:String,
                   comment_text:String,
                   comment_time:String,
                   location:String,
                   user_gender:String)

object kafka2Hdfs {
  def main(args: Array[String]): Unit = {
    // 设置 SparkConf
    val sparkConf = new SparkConf()
      .setAppName("KafkaToHDFS")
      .setMaster("local[*]")

    // 创建 StreamingContext,每秒处理一次
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // 设置 Kafka 相关参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092", // Kafka broker 地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-consumer-group", // Spark 消费者组
      "auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费
      "enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量
    )

    // 设置要订阅的 Kafka 主题
    val topics = Array("comment")

    // 创建 Kafka Direct Stream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )


    // 从 Kafka 中读取消息,然后将其写入 HDFS
    stream.map({rdd=>
      val comment = JSON.parseObject(rdd.toString(), classOf[Comment])
      comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender
    }).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        println(rdd)
        rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/")
      }
    }

    // 启动 Spark Streaming
    ssc.start()
    ssc.awaitTermination()
  }

}

以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。
在这里插入图片描述

结论:
通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。

**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/623150.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】调试器-gdb使用

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解Linux的编译器-gcc/g&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 1. 背景(A) 看大小(B) 查看ELF格式的文件 2.使用(A) 进入gdb(B) quit/q&#xff…

flink优化案例

文章目录 一、flink join维表案例二、flink 双流join案例三、总结 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考(适用于flink1.13) 一、flink join维表案例 背景:flink sql join 维表。job业务不复杂&#xff0c;job写入性能比较差。维表数据大约每天…

想半天憋不出几个字?试试AI扩写

大家在写文章时是否也经常这样&#xff1f;想了半天&#xff0c;结果只能写出几个字&#xff0c;但是要求往往又是几百多个字&#xff0c;那么有没有啥工具可以帮我们在原文的基础上扩写一下文章字数&#xff0c;让我们达到字数要求呢&#xff1f; 下面给大家介绍一下如何扩写文…

Microsoft Office for Mac 2024 (Office 365) 16.84 Universal 预览版

Microsoft Office for Mac 2024 (Office 365) 16.84 Universal 预览版 Office LTSC 2024 for Mac 请访问原文链接&#xff1a;Microsoft Office for Mac 2024 (Office 365) 16.84 Universal 预览版&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&a…

FullCalendar日历组件集成实战(2)

背景 有一些应用系统或应用功能&#xff0c;如日程管理、任务管理需要使用到日历组件。虽然Element Plus也提供了日历组件&#xff0c;但功能比较简单&#xff0c;用来做数据展现勉强可用。但如果需要进行复杂的数据展示&#xff0c;以及互动操作如通过点击添加事件&#xff0…

Ubuntu安装cmake

在软件开发的世界中&#xff0c;构建系统扮演着至关重要的角色&#xff0c;它们确保代码能够正确、高效地编译和链接。CMake就是这样一个强大的跨平台自动化构建系统&#xff0c;它被广泛用于各种大型项目中。对于Ubuntu用户来说&#xff0c;安装CMake非常简单&#xff0c;而且…

kubeadm 在vubuntu22.04.4 server 上安装kubernetes 1.28.9

一、基础安装&#xff08;所有节点执行&#xff09;---------------------------------------- 时间同步 关闭防火墙 sudo ufw disable sudo ufw status关闭交换内存 临时关闭 sudo swapoff -a free -m永久关闭 sudo vim /etc/fstab 注释掉交换内存 转发 IPv4 并让 iptab…

鸿蒙ArkUI开发:常用布局【交叉轴】

交叉轴 垂直于主轴方向的轴线。Row容器交叉轴为纵向&#xff0c;Column容器交叉轴为横向。通过alignItems属性设置子元素在交叉轴&#xff08;排列方向的垂直方向&#xff09;上的对齐方式alignSelf属性用于控制单个子元素在容器交叉轴上的对齐方式&#xff0c;其优先级高于al…

4.分支与循环

逻辑控制分为三部分&#xff1a; 1.顺序结构---》顺序执行代码 2.分支结构---》if语句和switch语句 3.循环执行---》for语句 while语句 和do while语句 顺序结构比较简单&#xff0c;按照代码书写的顺序一行一行执行 分支结构&#xff08;if、switch语句&#xff09; 也就是…

深度学习之神经网络理论基础

深度学习之神经网络理论基础 人工神经元 人工神经元&#xff1a;人类神经元中抽象出来的数学模型 MP模型 mp模型&#xff1a;1943年心理学家W.S.McCulloch和数理逻辑学家W.Pitts研究出人工神经元&#xff0c;称为M-P模型。 M-P神经元&#xff08;一个用来模拟生物行为的数学模…

Chromium 调试指南2024 Windows11篇-Visual Studio 2022启用子进程调试插件(六)

1. 前言 在Chromium项目的开发过程中&#xff0c;构建、运行和调试是至关重要的步骤。本文将介绍如何使用Visual Studio 2022打开Chromium项目、启用子进程调试插件、以及编译Chromium项目的流程。通过这些步骤&#xff0c;我们将能够更加顺利地进行Chromium项目的开发和调试工…

深度解析Nginx:高性能Web服务器的奥秘(下)

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《洞察之眼&#xff1a;ELK监控与可视化》&#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、前言 1、Nginx概述 二、Nginx核心功能 1、URL重写与重…

初识FlaskMySQL实现前后端通信 全栈开发之路——后端篇(1)

全栈开发一条龙——前端篇 第一篇&#xff1a;框架确定、ide设置与项目创建 第二篇&#xff1a;介绍项目文件意义、组件结构与导入以及setup的引入。 第三篇&#xff1a;setup语法&#xff0c;设置响应式数据。 第四篇&#xff1a;数据绑定、计算属性和watch监视 第五篇 : 组件…

拥有一台服务器可以做哪些有趣又实用的事情?

在接触云服务器这个概念你以前&#xff0c;你是不是在想&#xff1a; 可能是&#xff0c;云服务器&#xff0c;这个产品的存在&#xff0c;它可以为你做些什么实用的事情吗&#xff1f; 或者是&#xff0c;云服务器这个看似高大上的科技产品&#xff0c;其实可以为我们的生活…

AI智能体|手把手教你申请一个Kimi(Moonshot)的API KEY

大家好&#xff0c;我是无界生长。 今天分享一下如何申请一个Kimi(Moonshot)的API KEY&#xff0c;为后面Kimi(Moonshot)接入微信机器人做铺垫。学会了的话&#xff0c;欢迎分享转发&#xff01; 前提 拥有一个Kimi(Moonshot)账号 使用手机号注册即可&#xff0c;新用户可免费…

文章解读与仿真程序复现思路——中国电机工程学报EI\CSCD\北大核心《考虑协变量因素的多能微电网两阶段分布鲁棒优化调度》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

ARM架构安全特性之标准安全 API

安全之安全(security)博客目录导读 目录 一、机密计算软件 二、Arm机密计算固件架构 三、认证校验 四、Veraison项目 五、独立于语言的安全API 六、平台抽象安全项目&#xff08;Platform AbstRaction for SECurity project&#xff09; 七、可移植平台安全API 八、…

黄仁勋炉边对话:创业的超能力与英伟达的加速计算之旅

在TiECon 2024大会上&#xff0c;英伟达的创始人兼CEO黄仁勋与风投公司Mayfield的管理合伙人纳文查德哈进行了一场深入的炉边对话。黄仁勋不仅分享了英伟达的创业故事&#xff0c;还谈到了他对创业和加速计算的深刻见解。下面是我对这次对话的总结&#xff0c;希望能给正在创业…

10.nginx模板(开启监控取值页面)

nginx模板(开启监控取值页面) 1.开启监控页面 vim nginx.conflocation /nginx_status {stub_status;} systemctl restart nginx.server网页展示 导入模板&#xff0c;nginx监控模板zbx_nginx_template.xml <?xml version"1.0" encoding"UTF-8"?…

高级查询(子查询)

可以使用的范围&#xff1a; 子查询是一个嵌套在SELECT、INSERT、UPDATE、DELETE语句或其他子查询中的查询。 任何允许使用表达式的地方都可以使用子查询。 子查询也称为内部查询或内部选择&#xff0c;而包含子查询的语句也称为外部查询或外部选择 子查询的特点和优势 可以…