利用Spark将Kafka数据流写入HDFS

利用Spark将Kafka数据流写入HDFS

在当今的大数据时代,实时数据处理和分析变得越来越重要。Apache Kafka作为一个分布式流处理平台,已经成为处理实时数据的事实标准。而Apache Spark则是一个强大的大数据处理框架,它提供了对数据进行复杂处理的能力。
本篇博客将介绍如何使用Spark来读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。
环境准备
在开始之前,确保你的开发环境中已经安装了以下软件:

Apache Kafka

#启动zookeeper
zkServer start
#启动kafka服务
kafka-server-start /opt/homebrew/etc/kafka/server.properties

Apache Spark

<properties>
      <scala.version>2.12.17</scala.version>
      <spark.version>3.0.0</spark.version>
 <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Kafka Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

Hadoop HDFS

#启动hdfs
start-dfs.sh

Java开发环境
此外,你需要在项目中包含Spark和Kafka的依赖库。

代码实现
首先,我们定义一个Scala case class Job 来表示从Kafka读取的每条记录的数据结构。

case class Job(
  Position: String,
  Company: String,
  Salary: String,
  Location: String,
  Experience: String,
  Education: String,
  Detail: String
)

接下来,我们编写一个Kafka2Hdfs对象,并在其中实现main方法。这个方法将创建一个SparkSession,配置Kafka读取选项,并从Kafka中读取数据流。

object Kafka2Hdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Kafka2Hdfs")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaOptions = Map[String, String](
      "kafka.bootstrap.servers" -> "127.0.0.1:9092",
      "subscribe" -> "flume",
      "startingOffsets" -> "earliest"
    )

    val stream = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()

我们使用subscribe选项指定Kafka中的topic名称,这里我们使用的是flume。startingOffsets选项设置为earliest,意味着我们从最早的记录开始读取数据。

接下来,我们将Kafka中的数据转换成DataFrame。我们首先将每条记录的value字段转换为字符串,然后使用map函数将每条记录解析为Job对象。

val jobDs = stream.selectExpr("CAST(value AS STRING)")
  .as[String]
  .map(line => {
    val fields = line.split(",")
    Job(
      Position = fields(0),
      Company = fields(1).trim,
      Salary = fields(2).trim,
      Location = fields(3).trim,
      Experience = fields(4).trim,
      Education = fields(5).trim,
      Detail = fields(6).trim
    )
  }).toDF()

现在,我们已经有了一个包含Job对象的DataFrame。接下来,我们将这个DataFrame以CSV格式写入到HDFS中。我们使用writeStream方法,并设置format为csv,同时指定输出路径和检查点位置。

val query: StreamingQuery = jobDs.writeStream
  .format("csv")
  .option("header", "false")
  .option("path", "/")
  .option("checkpointLocation", "/ck")
  .start()

注意,我们在这里将header选项设置为false,因为我们不打算在CSV文件中包含列名。path选项指定了输出文件的存储路径,而checkpointLocation选项指定了检查点的存储路径,这对于流处理的可靠性非常重要。

最后,我们调用awaitTermination方法来等待流处理的结束。在实际的生产环境中,你可能希望将这个流处理任务部署到一个集群上,并让它持续运行。

query.awaitTermination()

总结
在这篇博客中,我们介绍了如何使用Spark读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。这种方法可以用于各种实时数据处理场景,例如日志分析、事件监控等。通过这种方式,我们可以将实时数据转换为静态数据,以便进行更深入的分析和处理。

完整代码:

package com.lhy.sparkkafka2hdfs

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Row, SparkSession}



case class Job(Position:String,Company:String,Salary:String,Location:String,Experience:String,Education:String,Detail:String)
object Kafka2Hdfs{
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Kafka2Hdfs")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaOptions = Map[String, String](
      "kafka.bootstrap.servers" -> "127.0.0.1:9092",
      "subscribe" -> "flume",
      "startingOffsets" -> "earliest"
    )

    val stream = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()


    val jobDs = stream.selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(line => {
        val fields = line.split(",")
        Job(
          Position = fields(0),
          Company = fields(1).trim,
          Salary = fields(2).trim,
          Location = fields(3).trim,
          Experience = fields(4).trim,
          Education = fields(5).trim,
          Detail = fields(6).trim
        )
      }).toDF()
//    val query = jobDs.writeStream.format("console").start()

    val query: StreamingQuery = jobDs.writeStream
      .format("csv")
      .option("header", "false")
      .option("path", "/")
      .option("checkpointLocation", "/ck")
      .start()

    query.awaitTermination()

  }

在这里插入图片描述
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/516531.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

可行性研究报告模板(套用)

1业务需求可行性分析 2技术可行性分析 2.1规范化原则 2.2高度的兼容性和可移植性 2.3人性化、适用性 2.4标准化统一设计原则 2.5先进安全可扩展性原则 3开发周期可行性分析 4人力资源可行性分析 5成本分析 6收益分析 7结论 所有资料获取进主页或本文末个人名片直接…

【OSTEP】并发:线程与多线程

" A flow of control within a process that consists of a PC, a register set and a stack space" 本章将介绍为单个运行进程提供的新抽象 —— 线程 (thread) 线程是 调度的一个基本单位&#xff08;basic unit of CPU scheduling&#xff09;一个单独的线程至…

RUST语言函数的定义与调用

1.定义函数 定义一个RUST函数使用fn关键字 函数定义语法: fn 函数名(参数名:参数类型,参数名:参数类型) -> 返回类型 { //函数体 } 定义一个没有参数,没有返回类型的参数 fn add() {println!("调用了add函数!"); } 定义有一个参数的函数 fn add(a:u32)…

力扣热题100_链表_21_合并两个有序链表

文章目录 题目链接解题思路解题代码 题目链接 21. 合并两个有序链表 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1&#xff1a; 输入&#xff1a;l1 [1,2,4], l2 [1,3,4] 输出&#xff1a;[1,1,2,3,4,4] 示例…

InternLM2-lesson2作业

书生浦语大模型趣味 Demo 视频连接&#xff1a;https://www.bilibili.com/video/BV1AH4y1H78d/?vd_source902e3124d4683c41b103f1d1322401fa 目录 书生浦语大模型趣味 Demo一、基础作业二、进阶作业 一、基础作业 第一次执行&#xff1a; 第二次执行&#xff1a; 第一次执…

uni-app 实现仿微信界面【我的+首页聊天列表+长按菜单功能+添加菜单功能】+ 附源码

目录 【微信首页聊天列表】界面 【我的】界面 源代码&#xff1a; 文后附完整代码&#xff0c;支持一键导入 HBuilderX 示例体验 【微信首页聊天列表】界面 仿造【微信首页聊天列表 长按菜单功能 右上角添加按钮弹窗功能】&#xff0c;使用 uni-app 开发&#xff0c; 一…

蓝桥杯真题:路径

import java.util.Scanner; // 1:无需package // 2: 类名必须Main, 不可修改public class Main {public static void main(String[] args) {int n 2022; //从下标为1开始&#xff0c;方便计算int[] q new int[n]; //存储最短路q[1] 0; //起始条件for (int i 2; i < 202…

黑马java-JavaWeb-Maven

1.Maven是专门用于管理和构建java项目的工具&#xff0c;它的主要功能有&#xff1a; 提供了一套标准化的项目结构提供了一套标准化的构建流程提供了一套依赖管理机制&#xff08;管理项目所依赖的第三方资源&#xff09; 2.Maven仓库 本地仓库&#xff1a;自己计算机上的一个目…

docker环境中宿主机防火墙添加ssh无法生效的问题分析

背景 在部署了docker容器的环境中&#xff0c;要在防火墙开通22端口&#xff0c;即ssh服务&#xff0c;以便在终端可以正常登陆。使用firewall-cmd在docker区域添加了22端口&#xff0c;但是没有起作用。后再public区域添加22端口才起作用。为什么docker区域不起作用&#xff…

单位档案室用综合档案管理系统还是馆藏档案管理系统

单位档案室应该使用综合档案管理系统。 综合档案管理系统是一种全面管理各类档案的系统&#xff0c;可以对文书档案、电子档案、纸质档案等进行统一管理和检索。综合档案管理系统可以满足单位档案室的多种需求&#xff0c;包括文书档案的归档、借阅、追溯等功能&#xff0c;同时…

进阶线段树之乘法线段树

1.乘法线段树 顾名思义&#xff0c;就是其中的区间修改为乘法&#xff0c;但是呢&#xff0c;如果只是一个乘法&#xff0c;把之前的加号变成*号&#xff0c;然后开long long即可&#xff08;因为乘法的数据超大&#xff0c;如果不在中间mod点儿东西还能直接超出64位&#xff…

一分钟了解MOS管基础知识

场效应管&#xff08;Field-Effect Transistor&#xff0c;简称FET&#xff09;是电子技术中广泛使用的一种半导体器件&#xff0c;具有高输入阻抗、噪声低和低功耗等优点。 简介 场效应管是一种电压控制器件&#xff0c;其工作原理是通过改变栅极&#xff08;Gate&#xff09;…

【前端面试3+1】11 http和https有何不同及https的加密过程、数组有哪些方法及作用、tcp三次握手四次挥手、【分发饼干】

一、http和https有何不同&#xff1f;https的加密过程 1、不同&#xff1a; HTTP和HTTPS的主要区别在于安全性。HTTP是超文本传输协议&#xff0c;是一种用于传输数据的协议&#xff0c;但是传输的数据是明文的&#xff0c;容易被窃听和篡改。而HTTPS是在HTTP基础上加入了SSL/T…

LeetCode.1379. 找出克隆二叉树中的相同节点

题目 1379. 找出克隆二叉树中的相同节点 分析 这道题目其实利用的是递归的思想&#xff0c;同时遍历两棵树即可。具体流程&#xff08;下面所讲解的流程基于的前提一定是两棵树一起遍历哦&#xff09;&#xff1a; 如果 original 为空节点&#xff0c;直接返回 null&#…

Python 爬虫基础——http请求和http响应

写本篇文章&#xff0c;我认为是能把自己所理解的内容分享出来&#xff0c;说不定就有和我一样有这样思维的共同者&#xff0c;希望本篇文章能帮助大家&#xff01;✨✨ 文章目录 一、 &#x1f308;python介绍和分析二、 &#x1f308;http请求三、 &#x1f308;http响应四、…

初识MySQL(中篇)

使用语言 MySQL 使用工具 Navicat Premium 16 代码能力快速提升小方法&#xff0c;看完代码自己敲一遍&#xff0c;十分有用 目录 1.SQL语言 1.1 SQL语言组成部分 2.MySQL数据类型 2.1 数值类型 2.2 字符串类型 2.3 日期类型 3.创建数据表 3.1 创建数据表方法1 …

00-JAVA基础-注解及反射解析注解

注解 什么是注解 Java 注解&#xff08;Annotation&#xff09;是 JDK 5.0 引入的一种元素&#xff0c;用于为 Java 代码提供元数据。元数据是关于数据的数据&#xff0c;它为代码提供附加信息&#xff0c;而这些信息并不直接参与到程序的逻辑中&#xff0c;但可以被编译器或…

如何根据黄金行情进行交易操作?

根据黄金行情进行交易操作是许多投资者关注的重要议题&#xff0c;黄金作为一种重要的避险资产和投资工具&#xff0c;其价格波动受多种因素影响&#xff0c;包括经济数据、地缘政治风险、货币政策等。为了有效地进行黄金交易操作&#xff0c;投资者需要综合考虑多方面因素&…

ST表---算法

相当于二分的思想&#xff0c;一直比较最值 ST的创建 现在创建成功&#xff0c;是应该如何查询的问题 ST表的查询 虽然这两区间有重叠&#xff0c;但是可以一个往前数&#xff0c;一个往后数&#xff0c;互不影响 时间复杂度 创建st表的复杂度为n*logn 使用时的复杂度为O(…

ROS 2边学边练(12)-- 创建一个工作空间

上一篇我们已经接触过工作空间的概念&#xff0c;并简单了解体验了一点构建包、测试包的流程&#xff0c;此篇会深入一点学习工作空间相关内容。 前言 一个工作空间是包含了ROS 2的功能包的目录&#xff08;文件夹&#xff09;&#xff0c;在使用ROS 2之前我们得激活一下目标工…