2023_Spark_实验二十四:SparkStreaming读取Kafka数据源:使用Direct方式

SparkStreaming读取Kafka数据源:使用Direct方式

一、前提工作

  • 安装了zookeeper

  • 安装了Kafka

  • 实验环境:kafka + zookeeper + spark

  • 实验流程

二、实验内容

实验要求:实现的从kafka读取实现wordcount程序

启动zookeeper

zk.sh start

# zk.sh脚本 参考教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

启动Kafka

kf.sh start

# kf.sh 参照教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

 (测试用,实验不做)创建Kafka主题,如test,可参考:Kafka的安装与基本操作

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

--bootstrap-server  连接的Kafka Broker主机名称和端口号

--create 创建主题

--describe 查看主题详细描述

# 创建kafka主题测试
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --bootstrap-server hd1:9092 --replication-factor 3 --partitions 1 --topic gnutest2

# 再次查看first主题的详情
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server hd1:9092 --describe --topic gnutest2

启动Kafka控制台生产者,可参考:Kafka的安装与基本操作

# 创建kafka生产者
/opt/module/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --bootstrap-server hd1:9092 --topic gnutest2

创建maven项目

添加kafka依赖

       <!--- 添加streaming依赖 --->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>

       <!--- 添加streaming kafka依赖 --->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
            <version>3.4.1</version>
        </dependency>

编写程序,如下所示:

package exams

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

import java.lang

/**
 * @projectName SparkLearning2023  
 * @package exams  
 * @className exams.SparkStreamingReadKafka  
 * @description ${description}  
 * @author pblh123
 * @date 2023/12/1 15:19
 * @version 1.0
 *
 */
    
object SparkStreamingReadKafka {
  def main(args: Array[String]): Unit = {

    //  1. 创建spark,sc对象
    if (args.length != 2) {
      println("您需要输入一个参数")
      System.exit(5)
    }
    val musrl: String = args(0)
    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master(musrl)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    // 生成streamingContext对象
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    //  2. 代码主体
    val bststrapServers = args(1)
    val kafkaParms: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> bststrapServers, //kafka列表
      "key.deserializer" -> classOf[StringDeserializer], k和v 的序列化类型
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream", //消费者组
      "auto.offset.reset" -> "latest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读
      "enable.auto.commit" -> (true: java.lang.Boolean) // 消费者不自动提交偏移量
    )

    val topics = Array("gnutest2", "t100")
    // createDirectStream: 主动拉取数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParms)
    )
    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))

    //kafka 是一个key value 格式的, 默认key 为null ,一般用不上
    val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // 打印
    resultRDD.print()

    //  3. 关闭sc,spark对象
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    sc.stop()
    spark.stop()
  }
}

配置输入参数

生产者追加数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/208560.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux系统常用指令

1.使用xshell登录到云服务器的Linux系统&#xff1a; ssh 用户名公网IP&#xff0c;例如&#xff1a; ssh root111.11.111. 2.添加用户 adduser 用户名&#xff0c;例如&#xff1a; adduser user 3.为用户设置密码 passwd 用户名&#xff0c;例如&#xff1a; passwd …

更改Jupyter Notebook 默认存储路径

import osprint(os.path.abspath(.)) 然后打开cmd,输入&#xff1a; jupyter notebook --generate-config 按照路径在本地文件夹中找到那个文件。 然后找到"c.NotebookApp.notebook_dir"这条语句&#xff1a;&#xff08;直接通过"crtlf"输入关键字找阿 …

【BEV感知 LSS方案】Lift-Splat-Shoot(LSS)

前言 LSS全称是Lift-Splat-Shoot&#xff0c;它先从车辆周围的多个摄像头拍摄到的图像进行特征提取&#xff0c;在特征图中估计出每个点的深度&#xff0c;然后把这些点“提升”到3D空间中。 接着&#xff0c;这些3D信息被放置到一个网格上&#xff0c;最后将这些信息“拍扁”…

设计模式-结构型模式之桥接设计模式

文章目录 三、桥接模式 三、桥接模式 桥接模式&#xff08;Bridge&#xff09;是用于把抽象化与实现化解耦&#xff0c;使得二者可以独立变化。它通过提供抽象化和实现化之间的桥接结构&#xff0c;来实现二者的解耦。 这种模式涉及到一个作为桥接的接口&#xff0c;使得实体类…

element中el-input限制只输入正整数或保留两位小数

文章目录 一、前言二、实现2.1、HTML2.2、只输入正整数2.3、只能输入数字或小数点 三、最后 一、前言 常见的el-input可能会限制用户只能输入正整数或保留两位小数&#xff0c;达到输入金额的需求&#xff0c;点击【跳转】访问el-input的官方文档 element-ui是有el-input-numb…

新闻网站的技术 SEO:综合指南

要在 Google 上对您的内容进行排名或将目标访问者吸引到您的新闻网站或门户网站&#xff0c;需要的不仅仅是将理想的单词组合串在一起。你应该优化你的内容以获得更高的排名。由于排名高&#xff0c;可见性越高&#xff0c;新闻网站就越高。 持续不断的新内容流和独特的 Googl…

ES6知识

作用域 局部作用域 局部作用域分为函数作用域和块作用域 函数作用域 在函数内部声明的变量只能在函数内部被访问&#xff0c;外部无法直接访问。函数的参数也是函数内部的局部变量。不同函数内部声明的变量无法互相访问。函数执行完毕后&#xff0c;函数内部的变量实际被清空…

容器安全是什么

容器安全是当前面临的重要挑战之一&#xff0c;但通过采取有效的应对策略&#xff0c;我们可以有效地保护容器的安全。在应对容器安全挑战时&#xff0c;我们需要综合考虑镜像安全、网络安全和数据安全等多个方面&#xff0c;并采取相应的措施来确保容器的安全性。 德迅蜂巢原…

Ubuntu 2204 安装libimobiledevice

libimobiledevice是一个开源的软件&#xff0c;它可以直接使用系统原生协议和IOS设备进行通信&#xff0c;类似iMazing&#xff0c;iTunes&#xff0c;libimobiledevice不依赖IOS的私有库&#xff0c;并且连接IOS设备时用的都是原生协议&#xff0c;IOS无需越狱就能实现设备信息…

常见基础指令【Linux】

目录 一、Linux基本指令1. ls2. pwd3. cd4. touch5. mkdir6. rm和rmdir7. man8. cp9. mv10. cat11. tac12. more13. less14. head15. tail16. date17. cal18. find19. grep20. zip/unzip21. echo22. wc23. tree24. which25. alias26. whoami27. stat28. tar29. uname30. shutdo…

SQL-分页查询offset的用法

今天在做一道关于查询一张表中第二高工资的问题时发现没有思路&#xff0c;经过一番搜索发现需要用到offset偏移量来解决这个问题。 OFFSET关键字用于指定从结果集的哪一行开始返回数据。通常&#xff0c;它与LIMIT一起使用&#xff0c;以实现分页效果。其语法如下&#xff1a…

北邮22级信通院数电:Verilog-FPGA(12)第十二周实验(1)设计一个汽车尾灯自动控制系统

北邮22信通一枚~ 跟随课程进度更新北邮信通院数字系统设计的笔记、代码和文章 持续关注作者 迎接数电实验学习~ 获取更多文章&#xff0c;请访问专栏&#xff1a; 北邮22级信通院数电实验_青山如墨雨如画的博客-CSDN博客 目录 一.题目要求 二.代码部分 2.1 car_system.…

【redis】[windows]redis安装以及配置等相关

前言&#xff1a;下载安装配置密码、远程访问等等 目录 一、下载 二、配置文件说明 1、bind 1.1 这个参数默认值是127.0.0.1&#xff0c;也就是只允许redis所在机器访问redis。 1.2 如果我们的应用服务和redis服务不在一个机器我们就需要修改这个参数为0.0.0.0&#xff0c…

使用C语言创建高性能爬虫ip网络

之前写的python和GO语言的爬虫ip池的文章引起很大反响&#xff0c;这次我将以C语言来创建爬虫IP池&#xff0c;但是因为其复杂性&#xff0c;可能代码并非完美。但是最终也达到的想要的效果。 因为在C语言中创建代理IP池可能会比较复杂&#xff0c;且C语言并没有像Python那样的…

Java开发分析 JProfiler 14中文 for Mac

JProfiler Mac版新增功能 已添加用于传出请求 的HTTP探测。同步和异步调用都是测量的。支持的HTTP客户端是&#xff1a; java.net.URLConnection中 Java HTTP客户端&#xff08;Java 11&#xff09; Apache HttpClient 4.x Apache Async HttpClient 4.x OkHttp 3.9 Jersey&am…

SAS聚类分析介绍

1 聚类分析介绍 1.1基本概念 聚类就是一种寻找数据之间一种内在结构的技术。聚类把全体数据实例组织成一些相似组&#xff0c;而这些相似组被称作聚类。处于相同聚类中的数据实例彼此相同&#xff0c;处于不同聚类中的实例彼此不同。聚类技术通常又被称为无监督学习&#xff0…

centos上安装并持久化配置LVS

1 实验背景 1&#xff09;系统版本&#xff1a;centos7.8 2&#xff09;虚拟机&#xff1a;3个centos虚拟机&#xff0c;&#xff08;其中一个做Director Server,另外两个做Real Server) 3) LVS大致有NAT ,DR ,Tun这三种模式&#xff0c;这里搭建一个典型的DR模式的LVS Direc…

C/C++ 通过HTTP实现文件上传下载

WinInet&#xff08;Windows Internet&#xff09;是 Microsoft Windows 操作系统中的一个 API 集&#xff0c;用于提供对 Internet 相关功能的支持。它包括了一系列的函数&#xff0c;使得 Windows 应用程序能够进行网络通信、处理 HTTP 请求、FTP 操作等。WinInet 提供了一套…

DynamicDataSource

DynamicDataSource 多数据源&#xff0c;读写分离&#xff0c;主从数据库

7、单片机与W25Q128(FLASH)的通讯(SPI)实验(STM32F407)

SPI接口简介 SPI 是英语Serial Peripheral interface的缩写&#xff0c;顾名思义就是串行外围设备接口。是Motorola首先在其MC68HCXX系列处理器上定义的。 SPI&#xff0c;是一种高速的&#xff0c;全双工&#xff0c;同步的通信总线&#xff0c;并且在芯片的管脚上只占用四根…