Spark-Streaming集成Kafka

 Spark Streaming集成Kafka是生产上最多的方式,其中集成Kafka 0.10是较为简单的,即:Kafka分区和Spark分区之间是1:1的对应关系,以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整,下面我们一起来看看吧。

一、创建一个Direct Stream

导入相关maven依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.5.3</version>
</dependency>

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

class KafkaDriectStream {
  def main(args: Array[String]): Unit = {

    // 创建一个具有2个线程和1秒批处理间隔的本地StreamingContext。
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDriectStream")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "cdh1:9092,cdh2:9092,cdh3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topicA", "topicB")
    val inputDStream :InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    inputDStream.map(record => (record.key, record.value))
  }
}

如果Spark批处理持续时间大于默认的Kafka心跳会话超时时间(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批处理,这将需要更改代理上的group.max.session.timeout.ms。

二、executor选择适合分区处理

新的Kafka Consumer API会将消息预取到缓冲区中。因此,出于性能原因,Spark集成Kafka时最好将缓存的Consumer 保留在executor上(而不是为每个批次重新创建它们)。

在大多数情况下,应该使用LocationStrategies.PreferConsistent。这将在可用的executor之间均匀地分配分区。如果executor与Kafka 的broker位于相同的主机上,则使用PreferBrokers,这将在该分区的Kafka leader上安排分区。最后,如果分区之间的负载严重偏差,请使用PreferFixed。这允许指定分区到主机的显式映射(任何未指定的分区都将使用一致的位置)。

Consumer 缓存的默认最大大小为64。如果处理超过(64个executor数量)的Kafka分区,可以通过更改spark.streaming.kafka.consumer.cache.maxCapacity设置。

如果想禁用Consumer 的缓存,可以将spark.streaming.kafka.consumer.cache.enabled 设置成false

缓存由topic分区和group.id控制,因此对createDirectStream的每次调用使用单独的 group.id

三、根据topic、partition、offset创建RDD

// 导入依赖关系并创建kafka-params,例如第一步:创建Direct Stream

val offsetRanges = Array(
  // topic, partition, 包含起始offset, 不包含结束offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

//根据kafka TopicPartition 中的一段数据来创建一个RDD,这是不是为了实现微批来提供支持呢
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

请注意,这里不能指定broker来消费,因为spark streaming的Driver Consumer 可以自动查找broker的元数据。如果要指定broker,需要将其与元数据绑定到一起。

四、获取offset

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

请注意,HasOffsetRanges的类型转换只有在createDirectStream结果调用的第一个方法中完成时才会成功,而不是在后面的方法链中完成。因为一旦发生shuffle和重分区,RDD分区和Kafka分区之间的一对一关系就会遭到破坏。

五、存储offset

在kafka中为了实现精确一次的语义,必须把结果处理和offset放到一个事务中去处理,在与spark streaming集成时也不例外。必须在幂等输出之后存储offset,或者将offset与输出一起存储在原子事务中。

offset可以存储在spark的checkpoint中,也可以存储在kafka自身的内部topic中。将offset存储到kafka的好处是,无论应用程序代码发生什么变化,Kafka都是一个持久的存储。但是,Kafka不是事务性的,程序的输出必须仍然是幂等的。注意,在流式计算中我们一般会将enable.auto.commit置为false。采用手动提交的方式。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // 一段时间后,在输出完成之后,提交offset
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与HasOffsetRanges一样,只有在createDirectStream的结果上调用时,才能成功得到CanCommitOffsets ,而不是在转换之后。获取到CanCommitOffsets 一般要等这批数据处理完再进行提交。

// 从提交到数据库的偏移量开始
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // 开启事务

  // 更新结果
  // 更新offset

  // 结束事务
}

六、官方例子

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <groupId> is a consumer group name to consume from topics
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, groupId, topics) = args

    // 以2秒的批处理间隔创建上下文
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    //指定kafka、topic信息创建direct kafka stream
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // 获取一行数据并进行分割、统计、打印
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    //启动计算
    ssc.start()
    ssc.awaitTermination()
  }
}

该例子消费Kafka中一个或多个topic的消息并进行单词统计,需要三个参数:1、Kafka broker的列表,2、消费者组,3、以逗号分隔的topic列表

1、创建2个topic

kafka-topics --create --topic spark-streaming-wc1 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
kafka-topics --create --topic spark-streaming-wc2 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.DirectKafkaWordCount cdh1:9092,cdh2:9092 direct-kafka-wc-group spark-streaming-wc1,spark-streaming-wc2

3、向topic推送数据

kafka-console-producer --topic spark-streaming-wc1 --broker-list cdh1:9092,cdh2:9092,cdh3:9092
kafka-console-producer --topic spark-streaming-wc2 --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、查看结果


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/941414.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Python】pandas库---数据分析

大学毕业那年&#xff0c;你成了社会底层群众里&#xff0c;受教育程度最高的一批人。 前言 这是我自己学习Python的第四篇博客总结。后期我会继续把Python学习笔记开源至博客上。 上一期笔记有关Python的NumPy数据分析&#xff0c;没看过的同学可以去看看&#xff1a;【Pyt…

数字IC后端设计实现篇之TSMC 12nm TCD cell(Dummy TCD Cell)应该怎么加?

TSMC 12nm A72项目我们需要按照foundary的要求提前在floorplan阶段加好TCD Cell。这个cell是用来做工艺校准的。这个dummy TCD Cell也可以等后续Calibre 插dummy自动插。但咱们项目要求提前在floorplan阶段就先预先规划好位置。 TSCM12nm 1P9M的metal stack结构图如下图所示。…

LiteFlow决策系统的策略模式,顺序、最坏、投票、权重

个人博客&#xff1a;无奈何杨&#xff08;wnhyang&#xff09; 个人语雀&#xff1a;wnhyang 共享语雀&#xff1a;在线知识共享 Github&#xff1a;wnhyang - Overview 想必大家都有听过或做过职业和性格测试吧&#xff0c;尤其是现在的毕业生&#xff0c;在投了简历之后经…

YashanDB 23.2 YAC -单库多实例架构多活共享集群安装部署指南

一、概述 1.1 文档目标 ​ 本说明旨在指导技术人员在 CentOS 7 x86_64 操作系统上完成崖山数据库企业版 23.2 的共享集群安装与部署。通过系统架构、集群拓扑及部署需求的精确描述&#xff0c;帮助读者在开始安装前对崖山数据库的架构形成清晰认识。本文以高效、稳定、安全为…

某科技局国产服务器PVE虚拟化技术文档

环境介绍 硬件配置 服务器品牌&#xff1a;黄河 型号&#xff1a;Huanghe 2280 V2 Cpu型号&#xff1a;kunpeng-920 磁盘信息 :480SSD * 2 ,4T*4 网卡&#xff1a;板载四口千兆 如下表 四台服务器同等型号配置&#xff0c;均做单节点虚拟化&#xff0c;数据保护采用底层r…

Gin-vue-admin(4):项目创建前端一级页面和二级页面

目录 创建一级页面创建二级页面 创建一级页面 view目录下新建一个my&#xff0c;Index.vue <template></template><script> export default {name:My, } </script><script setup> import {ref} from vue const myNameref("name") &…

Ubuntu下的tcl/tk编程快速入门

一、Tcl/Tk 简介 接口介绍 https://www.tutorialspoint.com/tcl-tk/tcl_tk_quick_guide.htm GUI Canvas接口 https://www.tutorialspoint.com/tcl-tk/tk_canvas_widgets.htm tcl语言 https://www.tcl-lang.org/ 官方教程 https://www.tcl.tk/man/tcl8.5/tutorial/tcltutoria…

数字化审计咨询服务,企业转型数字化审计的必要条件

人工智能、云计算、大数据、物联网等新兴技术的快速发展&#xff0c;为企业的数字化转型提供了强大的技术支持。这些技术逐渐被应用到企业运营管理的方方面面&#xff0c;推动了企业内部审计工作的变革。随着数字化转型的深化和信息技术的不断发展&#xff0c;数字化审计将成为…

【QT常用技术讲解】发送POST包(两种方式:阻塞方式及非阻塞方式)

前言 http/https(应用层)协议是广泛使用的网络通信协议。在很多与第三方API对接的场景中&#xff0c;通常是通过http/https协议完成&#xff0c;比如API对接时&#xff0c;通常要通过POST包获取access_token进行鉴权&#xff0c;然后再进行数据交互&#xff08;本篇也包含有对接…

重撸设计模式--代理模式

文章目录 定义UML图代理模式主要有以下几种常见类型&#xff1a;代理模式涉及的主要角色有&#xff1a;C 代码示例 定义 代理模式&#xff08;Proxy Pattern&#xff09;属于结构型设计模式&#xff0c;它为其他对象提供一种代理以控制对这个对象的访问。 通过引入代理对象&am…

【Steel Code】 10.5 COMPOSITE COLUMNS

10.5 COMPOSITE COLUMNS 组合柱 10.5.1 General 总则 (1) This clause applies for the design of composite columns and composite compression members with fully encased H sections, partially encased H sections, and infilled rectangular and circular hollow sect…

11.vector的介绍及模拟实现

1.vector的介绍 记得之前我们用C语言实现过顺序表&#xff0c;vector本质上也是顺序表&#xff0c;一个能够动态增长的数组。 vector 的底层实现机制 - 动态数组&#xff1a;vector 的底层实现是动态数组。它在内存中连续存储元素&#xff0c;就像一个可以自动调整大小的数…

封装(2)

大家好&#xff0c;今天我们来介绍一下包的概念&#xff0c;知道包的作用可以更好的面对今后的开发&#xff0c;那么我们就来看看包是什么东西吧。 6.3封装扩展之包 6.3.1包的概念 在面向对象体系中,提出了一个软件包的概念,即:为了更好的管理类,把多个类收集在一起成为一组…

go官方日志库带色彩格式化

go默认的 log 输出的日志样式比较难看&#xff0c;所以通过以下方式进行了美化和格式化&#xff0c;而且加入了 unicode 的ascii码&#xff0c;进行色彩渲染。 package mainimport ("fmt""log""os""runtime""strings""…

0基础学前端系列 -- 深入理解 HTML 布局

在现代网页设计中&#xff0c;布局是至关重要的一环。良好的布局不仅能提升用户体验&#xff0c;还能使内容更具可读性和美观性。HTML&#xff08;超文本标记语言&#xff09;结合 CSS&#xff08;层叠样式表&#xff09;为我们提供了多种布局方式。本文将详细介绍流式布局、Fl…

Windows开启IIS后依然出现http error 503.the service is unavailable

问题背景 已启用IIS服务&#xff0c;配置步骤可以参考Windows10 IIS Web服务器安装配置 问题描述 在这一步浏览网站时&#xff0c;并没有出现默认首页&#xff0c;而是 http error 503 the service is unavailable 问题解决 参考 成功解决http error 503.the service is un…

BuildCTF 公开赛web部分wp

文章目录 LovePopChainRedFlagWhy_so_serials?babyuploadeazyl0ginez!httpez_md5find-the-idsubtflock刮刮乐我写的网站被rce了&#xff1f; LovePopChain payload: <?php class MyObject{public $NoLove"Do_You_Want_Fl4g?";public $Forgzy;public functi…

diff 算法实现的几种方法和前端中的应用

diff 算法原理和几种实现方法 diff 是什么 diff 算法就是比较两个数据的差异&#xff0c;例如字符串的差异&#xff0c;对象的差异。 常用于版本管理&#xff08;git&#xff09;例如下面的实际案例。 github 上某个 commit&#xff0c;旧代码和新代码之间的不同 diff 展示…

Nacos源码搭建

拉取并配置代码 仓库地址 https://github.com/alibaba/nacos找到config 模块中找到 \resources\META-INF\mysql-schema.sql&#xff0c;在本地mysql中创建数据库nacos-config&#xff0c;将该脚本导入执行创建表。 找到console模块下的配置文件application.properties&#x…

C# Winfrom chart图 实例练习

代码太多了我就不展示了&#xff0c;贴一些比较有代表性的 成品效果展示&#xff1a; Excel转Chart示例 简单说一下我的思路 \ 先把Excel数据展示在dataGridView控件上 XLIST 为 X轴的数据 XLIST 为 Y轴的数据 ZLIST 为 展示的数据进行数据处理点击展示即可 // 将Excel数…