Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)

一、安装Kafka

1.执行以下命令完成Kafka的安装:
cd ~  //默认压缩包放在根目录
sudo tar -zxf  kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka-2.6.0
sudo chown -R qiangzi ./kafka-2.6.0

二、启动Kafaka

1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd  /usr/local/kafka-2.6.0
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!

2.打开第二个终端输入下面命令启动Kafka服务:
cd  /usr/local/kafka-2.6.0
./bin/kafka-server-start.sh  config/server.properties

//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh  config/server.properties  &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。

三、创建Topic

1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.6.0
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.6.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender

注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

四、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。

cd ~  .jar文件默认放在根目录
sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源

1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala

它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
   // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString)
          .mkString(" ")
                    print(str)
                    println()
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
     Thread.sleep(1000)
    }
  }
}
2.编写消费者(Consumer)程序

在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
 
object KafkaWordCount{
  def main(args:Array[String]){
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc,Seconds(10))
    ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("wordsender")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      val lines = maped.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val pair = words.map(x => (x,1))
      val wordCounts = pair.reduceByKey(_+_)
      wordCounts.foreach(println)
    })
    ssc.start
    ssc.awaitTermination
  }
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala


/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}                                                                                 /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }                                                                                                                     }                                                                                                                     } 
5.编译打包程序

现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:

然后,执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt  package

打包成功界面

6. 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd  /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender  3  5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/616014.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Github上 5 个好玩儿的开源项目

1. 在你的 Windows 养小猫 2. 把你的图片生成 ASCII 3. 中国制霸生成器 4. 像素风格代码字体 5. 梦回 QQ 空间 01 在你的 Windows 养小猫 在MacBook的触摸板上&#xff0c;你可以抚养一只小宠物&#xff0c;并与它互动、喂食&#xff0c;这样非常有趣。 我向你推荐了一个…

【Qt 学习笔记】Qt常用控件 | 容器类控件 | Group Box的使用及说明

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt常用控件 | 容器类控件 | Group Box的使用及说明 文章编号&#xff…

外汇crm系统是什么

外汇CRM系统是一种专门为外汇交易市场设计的客户关系管理系统。它结合了外汇交易的特点和客户管理的需求&#xff0c;为外汇交易商提供了全面的解决方案。它的出现&#xff0c;极大地促进了外汇交易行业的发展&#xff0c;为交易商提供了更高效、更智能的客户管理方式。 一、外…

力扣每日一题124:二叉树中的最大路径和

题目 困难 二叉树中的 路径 被定义为一条节点序列&#xff0c;序列中每对相邻节点之间都存在一条边。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root…

08.3.grafana自定义图形

grafana自定义图形 找插件里面的zabbix 点击update 数据源—zabbix数据源,添加zabbix数据源 选择zabbix类型 我这里配置的是本地&#xff0c;所以URL直接localhost 这里配置zabbix登录账号密码Admin/zabbix 然后点击保存并测试&#xff0c;会直接显示版本 导入模板&…

电影网站|基于SSM+vue的电影网站系统(源码+数据库+文档)

电影网站 目录 基于SSMvue的电影网站系统 一、前言 二、系统设计 三、系统功能设计 1 系统功能模块 2 管理员功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍&#xff1a;✌️大厂码农|毕设布道…

虚拟机CentOS密码重置

1&#xff0c;reboot重启 在出现下面的界面1按e 如果有选项就选择“CentOS Linux &#xff08;3.10.0-327.e17.x86_64&#xff09;7 &#xff08;Core&#xff09;”【我的电脑没有直接显示界面2】 界面1 界面2 2&#xff0c;在上述界面2中继续按e进入编辑模式 找到“ro cr…

vue2和vue3区别: 探索关键差异

vue2和vue3区别&#xff1a; 探索关键差异 Vue.js 作为流行的前端框架&#xff0c;其版本 3 带来了许多令人兴奋的改进和新功能。虽然 Vue 3 保持了与 Vue 2 的相似性&#xff0c;但也存在一些关键差异需要开发者注意。本文将通过表格形式&#xff0c;清晰地展现 Vue 2 和 Vue …

文献阅读——LPPLS(2)

A study on the bursting point of Bitcoin based on the BSADF and LPPLS methods 文献来源[2] Yao, Can-Zhong, and Hong-Yu Li. “A study on the bursting point of Bitcoin based on the BSADF and LPPLS methods.” The North American Journal of Economics and Financ…

Istio 使用 Apache SkyWalking 进行服务链路追踪、链路监控告警

一、Istio 使用 Apache SkyWalking 链路追踪和告警 SkyWalking是一个开源的观测平台&#xff0c;用于从服务和云原生等基础设施中收集、分析、聚合以及可视化数据&#xff0c;SkyWalking 提供了一种简便的方式来清晰地观测分布式系统&#xff0c;甚至可以观测横跨不同云的系统…

PyCharm粘贴失灵?一文教你快速恢复!(如何解决Pycharm无法粘贴的问题)

文章目录 💢 问题 💢🏡 演示环境 🏡💯 解决方案 💯⚓️ 相关链接 ⚓️💢 问题 💢 "为什么你的代码编辑器突然变得不听话了?"最近在使用pycharm的时候遇到了一个问题,就是在pycharm中无法使用粘贴功能,后面经过一番折腾得到了解决,现在将我的解决…

03、SpringBoot 源码分析 - SpringApplication启动流程三

SpringBoot 源码分析 - SpringApplication启动流程三 初始化基本流程SpringApplication的setListeners设置监听器deduceMainApplicationClass对端主启动类rungetRunListeners获取SpringApplicationRunListener监听器EventPublishingRunListener的构造方法SimpleApplicationEven…

第十二篇:数据库系统导论 - 探索数据管理的基石

数据库系统导论 - 探索数据管理的基石 1 引言 数据的力量&#xff1a;揭秘数据库系统的核心 在信息时代&#xff0c;数据无处不在&#xff0c;它们成为了企业和社会运作的基础。我们如何储存、检索、更新和维护这些数据&#xff0c;决定了我们能否从这些数据中获得力量。数据…

C语言深入理解指针(4)--指针笔试题解析

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 C语言深入理解指针(4) 收录于专栏【C语言学习】 本专栏旨在分享学习C语言学习的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 1. size…

[嵌入式系统-75]:RT-Thread-快速上手:正点原子探索者 STM32F407示例

目录 正点原子探索者 STM32F407 上手指南 1. 简介 2. 准备工作 3. 运行第一个示例程序 3.1 编译下载 3.2 运行 继续学习 正点原子探索者 STM32F407 上手指南 1. 简介 探索者 STM32F407 是正点原子推出的一款基于 ARM Cortex-M4 内核的开发板&#xff0c;最高主频为 16…

启用dell服务器的iDRAC

插网线 观察到 dell服务器背板左侧有一个网口&#xff0c;标有iDRAC字样&#xff0c;使用网线将该网口和网段所在的交换机连接起来。 网络配置 重启计算机&#xff0c;依照屏幕显示按F2进入SystemSetup。选择iDRACsettings – Network&#xff0c;需要改动的如下&#xff08;现…

WPS表格:使用vlookup函数解决乱序数据对应问题

我们常常会遇到两个表格的内容相同&#xff0c;但是顺序不一致的情况。并且这种顺序无关于简单的排序&#xff0c;而是一种业务性很强的复杂排序规则。下面我举个例子&#xff0c;使用VLOOKUP复制数据。 假设太阳系行星举办了一次卖萌比赛&#xff0c;由太阳妈妈决定谁是最萌的…

Java | Leetcode Java题解之第84题柱状图中最大的矩形

题目&#xff1a; 题解&#xff1a; class Solution {public int largestRectangleArea(int[] heights) {int n heights.length;int[] left new int[n];int[] right new int[n];Arrays.fill(right, n);Deque<Integer> mono_stack new ArrayDeque<Integer>();f…

【git】通过JetBrains IDE对git的操作

应该适用于所有jetbrains产品。 一、拉取(pull)代码 上方工具栏-Git-克隆。然后填写git地址与本地存放地址。 二、搁置 修改代码后搁置代码&#xff08;不提交&#xff0c;但是也不撤销已修改的代码&#xff0c;把它暂存起来&#xff09;。 界面的左上角。1->2->3。…

[算法][差分数组][leetcode]1094. 拼车

地址&#xff1a; https://leetcode.cn/problems/car-pooling/description/ 解法一&#xff1a;暴力解法 class Solution {public boolean carPooling(int[][] trips, int capacity) {//特殊条件判断if(nulltrips||capacity<0){return false;}int [] d new int[1001];//暴…