【大数据测试spark+kafka-详细教程(附带实例)】

大数据测试:Spark + Kafka 实时数据处理与窗口计算教程

  • 1. 概述
    • 1.1 大数据技术概述
    • 1.2 Apache Kafka 与 Spark 的结合
  • 2. 技术原理与流程
    • 2.1 Kafka 简介
    • 2.2 Spark Streaming 简介
    • 2.3 数据流动与处理流程
  • 3. 环境配置
    • 3.1 安装依赖项
  • 4. 实例:实时数据处理与窗口计算
    • 4.1 Kafka 生产者代码
    • 4.2 Spark Streaming 消费者代码
    • 4.3 解释与操作
  • 5. 运行与测试
    • 5.1 创建 Kafka Topic
    • 5.2 启动 Kafka 生产者
    • 5.3 启动 Spark Streaming 程序
    • 5.4 输出结果
  • 6. 总结

1. 概述

1.1 大数据技术概述

大数据(Big Data)指的是无法用传统数据库技术和工具进行处理和分析的超大规模数据集合。在大数据技术中,实时数据流的处理尤为重要,尤其是如何高效地对海量的实时数据进行采集、存储、处理与分析。

在这方面,Apache KafkaApache Spark 是两个关键技术。Kafka 作为分布式流处理平台,可以高效地进行实时数据流的生产和消费,而 Spark 提供了强大的分布式计算能力,尤其是其扩展的流式计算模块 Spark Streaming,非常适合处理实时数据流。

1.2 Apache Kafka 与 Spark 的结合

  • Kafka 是一个分布式消息队列,可以处理高吞吐量、低延迟的实时数据流。Kafka 被广泛用于日志收集、监控系统、实时数据传输等场景。
  • Spark 是一个统一的分析引擎,支持批量处理、流式处理和图计算。Spark Streaming 是 Spark 的一个流式处理组件,用于实时处理流数据。

通过结合 Kafka 和 Spark,我们可以实现大规模数据的实时处理、聚合和窗口计算。Spark 可以从 Kafka 消费数据流,并进行实时计算与分析,适用于诸如实时日志分析、用户行为分析、实时推荐等场景。


2. 技术原理与流程

2.1 Kafka 简介

Kafka 是一个分布式的消息队列系统,能够实现高吞吐量、可扩展性、容错性。它的基本组成包括:

  • Producer(生产者):负责向 Kafka 发送数据。
  • Consumer(消费者):从 Kafka 中消费数据。
  • Broker(代理):Kafka 的节点,每个节点负责存储消息。
  • Topic(主题):消息被组织在 Topic 中,生产者向 Topic 发送数据,消费者从 Topic 中读取数据。
  • Partition(分区):Kafka 支持水平分区,使得数据可以分布在多个 Broker 上。

2.2 Spark Streaming 简介

Spark Streaming 是 Spark 的流处理模块,它以 DStream(离散流)为基本数据结构,能够实时地处理数据流。DStream 是一个连续的 RDD(弹性分布式数据集),Spark Streaming 将实时流数据划分成一个个小的批次,使用批处理模型对这些小批次进行处理。

2.3 数据流动与处理流程

  1. Kafka Producer:将数据发送到 Kafka Topic。
  2. Kafka Broker:Kafka 集群负责存储和转发数据。
  3. Spark Streaming:通过 Kafka 的消费者接口从 Topic 中消费数据。
  4. 数据处理与计算:在 Spark Streaming 中进行数据聚合、过滤、窗口计算等操作。
  5. 输出结果:将处理后的数据输出到外部系统,如 HDFS、数据库或控制台。

3. 环境配置

3.1 安装依赖项

  1. 安装 Java:确保安装了 Java 8 或更高版本。

    检查版本:

    java -version
    
  2. 安装 Apache Spark:从 Apache Spark 官网 下载并安装 Spark。

  3. 安装 Apache Kafka:从 Kafka 官网 下载并安装 Kafka。

  4. Maven 配置:在 Java 项目中使用 Maven 作为构建工具,添加必要的 Spark 和 Kafka 依赖。

pom.xml 文件中添加 Spark 和 Kafka 的 Maven 依赖:

<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.1</version>
    </dependency>

    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.3.1</version>
    </dependency>

    <!-- Spark Streaming Kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.3.1</version>
    </dependency>

    <!-- Kafka Consumer -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

4. 实例:实时数据处理与窗口计算

4.1 Kafka 生产者代码

以下是一个简单的 Kafka 生产者,用于生成模拟的用户行为日志(如点击事件)并发送到 Kafka Topic logs

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 模拟用户点击日志数据
        String[] actions = {"click", "view", "scroll"};
        String[] users = {"user1", "user2", "user3"};
        
        // 向 Kafka 发送模拟数据
        for (int i = 0; i < 100; i++) {
            String user = users[i % 3];
            String action = actions[i % 3];
            String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
            String value = user + "," + action + "," + timestamp;
            producer.send(new ProducerRecord<>("logs", null, value));
            try {
                Thread.sleep(1000); // 每秒发送一条数据
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        producer.close();
    }
}

4.2 Spark Streaming 消费者代码

以下是一个 Spark Streaming 程序,它从 Kafka Topic logs 中消费数据并进行窗口计算,统计每个用户在过去 10 秒内的点击次数。

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.List;

public class SparkKafkaWindowExample {

    public static void main(String[] args) throws InterruptedException {
        // 初始化 Spark StreamingContext
        JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "SparkKafkaWindowExample", new Duration(2000));

        // Kafka 配置参数
        String brokers = "localhost:9092";
        String groupId = "spark-consumer-group";
        String topic = "logs";

        // Kafka 参数设置
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", "false");

        List<String> topics = Arrays.asList(topic);

        // 从 Kafka 获取数据流
        JavaReceiverInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.Subscribe(topics, kafkaParams)
                );

        // 处理每条记录:解析用户、动作和时间戳
        JavaPairRDD<String, String> userActions = stream
                .mapToPair(record -> {
                    String[] fields = record.value().split(",");
                    return new Tuple2<>(fields[0], fields[1]); // userId, action
                });

        // 定义窗口大小为 10 秒,滑动间隔为 5 秒
        JavaPairRDD<String, Integer> userClickCounts = userActions
                .window(new Duration(10000), new Duration(5000)) // 滑动窗口
                .reduceByKeyAndWindow(
                        (Function2<Integer, Integer, Integer>) Integer::sum,
                        new Duration(10000), // 窗口大小:10秒
                        new Duration(5000)   // 滑动间隔5);

        // 输出每个窗口的用户点击次数
        userClickCounts.foreachRDD(rdd -> {
            rdd.collect().forEach(record -> {
                System.out.println("User: " + record._1() + ", Click Count: " + record._2());
            });
        });

        // 启动流式处理
        jssc.start();
        jssc.awaitTermination();
    }
}

4.3 解释与操作

  • Kafka 配置:配置 Kafka 参数,连接到 Kafka 服务,订阅 Topic logs
  • 数据解析:从 Kafka 消费数据后,解析每条日志(如 user1,click,1609459200)。
  • 窗口计算:使用 window() 定义一个窗口,窗口大小为 10 秒,滑动间隔为 5 秒。使用 reduceByKeyAndWindow() 聚合每个窗口内的用户点击次数。
  • 输出结果:每 5 秒统计一次过去 10 秒内的用户点击次数,输出到控制台。

5. 运行与测试

5.1 创建 Kafka Topic

在 Kafka 中创建 Topic logs

kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5.2 启动 Kafka 生产者

运行 Kafka 生产者代码,模拟数据发送到 Kafka:

java KafkaProducerExample

5.3 启动 Spark Streaming 程序

运行 Spark Streaming 程序,消费 Kafka 数据并执行窗口计算:

java SparkKafkaWindowExample

5.4 输出结果

每隔 5 秒输出用户的点击次数,如:

User: user1, Click Count: 3
User: user2, Click Count: 5

6. 总结

通过结合使用 Apache KafkaApache Spark,我们可以高效地处理大规模的实时数据流。Kafka 负责消息的可靠传输,而 Spark Streaming 负责实时计算和分析。使用窗口计算(如 window()reduceByKeyAndWindow()),我们可以在不同时间段内对数据进行聚合,适用于实时监控、推荐系统、用户行为分析等场景。

此架构适用于需要处理大数据、实时响应的应用程序,并能满足高吞吐量、低延迟的要求。


推荐阅读:《大数据 ETL + Flume 数据清洗》,《大数据测试 Elasticsearch》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/914843.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

005_ipc概述及信号量

【信号量】 在Linux系统中&#xff0c;信号量主要用于进程间的同步。Linux提供了两种类型的信号量&#xff1a;POSIX信号量和System V信号量&#xff0c;信号量&#xff08;Semaphore&#xff09;是一种同步机制&#xff0c;用于多线程或进程间的同步和互斥。信号量可以控制对…

SSA-CNN-LSTM-MATT多特征分类预测

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【基于CNN-RNN的影像报告生成】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…

有手就会:java 环境变量配置 - 包含windows、macos、linux和vscode的详细配置步骤

java 环境变量配置 本文旨在帮助用户完成Java环境变量的配置&#xff0c;涵盖Windows、Linux和macOS三大操作系统。对于每个系统&#xff0c;不仅提供了通过命令行设置环境变量的方法&#xff0c;还介绍了如何在VSCode中进行相应配置以启动Java项目&#xff0c;确保开发者能够…

Error response from daemon:

指出在尝试解析 auth.docker.io&#xff08;Docker Hub 的一个域名&#xff0c;用于身份验证和镜像拉取&#xff09;时&#xff0c;DNS 查询超时了。这通常意味着你的 Docker 客户端无法通过配置的 DNS 服务器&#xff08;在这个案例中是 &#xff09;来解析域名 解决方案&…

127.WEB渗透测试-信息收集-ARL(18)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a; 这一行是对应使用的指纹 这个界面是springbot 如果存在漏洞&#xff0c;他的信息里面可…

字节、快手、Vidu“打野”升级,AI视频小步快跑

文&#xff5c;白 鸽 编&#xff5c;王一粟 继9月份版本更新之后&#xff0c;光锥智能从生数科技联合创始人兼CEO唐家渝朋友圈获悉&#xff0c;Vidu大模型将于本周再次进行版本升级&#xff0c;Vidu-1.5版本即将上线。 此版本更新方向仍是重点延伸大模型的泛化能力和主体…

分享 pdf 转 word 的免费平台

背景 找了很多 pdf 转 word 的平台都骗进去要会员&#xff0c;终于找到一个真正免费的&#xff0c;遂分享。 网址 PDF转Word转换器 - 100%免费市面上最优质的PDF转Word转换器 - 免费且易于使用。无附加水印 - 快速将PDF转成Word。https://smallpdf.com/cn/pdf-to-word

CentOS下如何安装Nginx

1、下载nginx 官方网站 http://nginx.org 下载链接&#xff1a;http://nginx.org/download/ 下载完成后的安装包&#xff1a; 2、使用解压命令进行解压 tar -zxvf nginx-1.13.7.tar.gz3、在安装所需的安装环境 安装gcc环境 yum install gcc-c安装第三方开发包 - PCRE(P…

Springboot 不同版本的配置文件怎么知道差异

起因 今天配置一个 Springboot-3.3.5 的 redis-starter&#xff0c;结果一直提示链接不上 redis java.net.ConnectException: Connection refused我反复对比了新项目和老项目的 redis 配置文件格式&#xff0c;是一模一样的&#xff01; Debug 过程 配置中增加了如下配置 …

Diffusion Policy——斯坦福机器人UMI所用的扩散策略:从原理到其编码实现(含Diff-Control、ControlNet详解)

前言 本文一开始是属于此文《UMI——斯坦福刷盘机器人&#xff1a;从手持夹持器到动作预测Diffusion Policy(含代码解读)》的第三部分&#xff0c;考虑后Diffusion Policy的重要性很高&#xff0c;加之后续还有一系列基于其的改进工作 故独立成本文&#xff0c;且写的过程中 …

计算机新手练级攻略——写博客

目录 计算机新手练级攻略——写博客计算机新手写博客的好处加深知识点建立个人IP可能有额外的收入 如何写博客确定博客主题方向选择博客平台学习基础技能一定要有互动性持之以恒&#xff0c;克服惰性Just do it&#xff01;&#xff01;&#xff01; 计算机新手练级攻略——写博…

哥德巴赫猜想渐行渐远

我现在的工作&#xff0c;表明经典分析可能出了问题&#xff0c;如此则连Vinogradov的三素数定理都不成立了&#xff0c;更别说基于L-函数方程的陈氏定理“12”了。事实上即使L-函数方程成立&#xff0c;由于我指出Siegel定理不成立&#xff0c;陈景润和张益唐的工作就不成立。…

Linux探秘坊-------1.系统核心的低语:基础指令的奥秘解析(1)

1.Linux的背景介绍 Linux 操作系统的发展历程充满了激情与创新喵&#xff5e;&#x1f380; 萌芽期 (1983 - 1991)&#xff1a;Linux 的历史可追溯到 1983 年&#xff0c;理查德斯托曼 (Richard Stallman) 发起 GNU 计划&#xff0c;目标是创建一个自由软件操作系统。1987 年发…

Angular 和 Vue2.0 对比

前言 &#xff1a;“业精于勤&#xff0c;荒于嬉&#xff1b;行成于思&#xff0c;毁于随” 很久没写博客了&#xff0c;大多记录少进一步探查。 Angular 和 Vue2.0 对比&#xff1a; 一.概念 1.1 Angular 框架&#xff1a; 是一款由谷歌开发的开源web前端框架&#xff08;核…

【Python】轻松实现机器翻译:Transformers库使用教程

轻松实现机器翻译&#xff1a;Transformers库使用教程 近年来&#xff0c;机器翻译技术飞速发展&#xff0c;从传统的基于规则的翻译到统计机器翻译&#xff0c;再到如今流行的神经网络翻译模型&#xff0c;尤其是基于Transformer架构的模型&#xff0c;翻译效果已经有了质的飞…

[2024最新] macOS 发起 Bilibili 直播(不使用 OBS)

文章目录 1、B站账号 主播认证2、开启直播3、直播设置添加素材、隐私设置指定窗口添加/删除 窗口 4、其它说明官方直播帮助中心直播工具教程 目前搜到的 macOS 直播教程都比较古早&#xff0c;大部分都使用 OBS&#xff0c;一番探索下来&#xff0c;发现目前已经不需要 OBS了&a…

前端 性能优化 (图片与样式篇)

文章目录 前端能够做哪些图片优化?1、减小图片体积2、图片缓存服务工作线程 (Service Workers)缓存IndexDB缓存图片LocalStorage缓存 3、图片懒加载使用 loading"lazy" 属性 4、不同分辨率下使用不同的图片5、使用webp格式的图片6、配置图片CDN7、减少图片和动图的使…

【MySQL 保姆级教学】事务的自动提交和手动提交(重点)--上(13)

目录 1. 什么是事务&#xff1f;2. 事务的版本支持3. 事务提交的方式3.1 事务提交方式的分类3.2 演示的准备的工作3.2.1 创建表3.2.2 MySQL的服务端和客户端3.2.3 调低事务的隔离级别 4. 手动提交4.1 手动提交的命令说明4.2 示例一4.3 示例二4.4 示例三4.5 示例四 5. 自动提交5…

C++ | Leetcode C++题解之第546题移除盒子

题目&#xff1a; 题解&#xff1a; class Solution { public:int dp[100][100][100];int removeBoxes(vector<int>& boxes) {memset(dp, 0, sizeof dp);return calculatePoints(boxes, 0, boxes.size() - 1, 0);}int calculatePoints(vector<int>& boxes…

RK3588部署ppocr流程及安装环境_笔记1

前言&#xff1a; RK3588部署ppocr流程及ubuntu安装环境 目录 一、NoMachine安装使用 二、把ubuntu系统从英文修改为中文界面 三、安装conda 没有报错说明没有问题&#xff0c;如果source的时候报错&#xff0c;查看 ​编辑 报这种错&#xff1a; 5、需要添加国内镜像源…