kafka集成spark

1.新建Scala项目

具体教程可见在idea中创建Scala项目教程-CSDN博客

1.1右键项目名-添加框架支持-勾选scala

1.2main目录下新建scala目录-右键Scala目录-将目录标记为-勾选源代码根目录

1.3创建包com.ljr.spark

1.4引入依赖(pox.xml)

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
           <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

1.5把spark conf/目录下的log4j.properties 复制到项目的resources目录

2.集成spark生产者

新建SparkKafkaProducer (注意选择的是object而不是class)

package com.ljr.spark
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties

object SparkKafkaProducer {

  def main(args: Array[String]): Unit = {
    //1 属性配置
    val pros = new Properties()
    pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092")
    pros.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])
    pros.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])

    //2 创建生产者
    val producer = new KafkaProducer[String, String](pros)

    //3 发送数据
    for (i <- 1 to 5) {
      producer.send(new ProducerRecord[String,String]("customers","Lili" + i))
    }
    //4 关闭资源
    producer.close()
  }
}

运行,开启Kafka 消费者消费数据

kafka-console-consumer.sh --bootstrap-server node1:9092 --topic customers

能接收到信息,可见spark作为生产者集成Kafka成功

3.集成spark消费者

package com.ljr.spark

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkKafkaConsumer {
      def main(args: Array[String]): Unit = {
       //1 初始化上下文环境
       val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
        val sc = new StreamingContext(conf, Seconds(3))

        //2 消费数据
        val kafkapara = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"node1:9092,node2:9092",
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
          ConsumerConfig.GROUP_ID_CONFIG->"KFK-SP"
        )
        val kafkaDstream = KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("customers"), kafkapara))
        val valueDstream = kafkaDstream.map(record => record.value())
        valueDstream.print()
        //3 执行代码并阻塞
          sc.start()
        sc.awaitTermination()

      }
}

运行,

开启Kafka 生产者生产数据

kafka-console-producer.sh.sh --bootstrap-server node1:9092 --topic customers

控制台可以消费到数据,可见spark作为消费者集成Kafka成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/698201.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

3D Web轻量化引擎HOOPS Communicator核心功能特点及应用优势

随着3D技术的发展&#xff0c;越来越多的行业开始在其应用中集成三维模型和可视化技术。特别是在工程、建筑和制造业&#xff0c;3D模型不仅用于设计和展示&#xff0c;还用于分析和优化。然而&#xff0c;传统的3D渲染引擎往往需要高性能的硬件支持&#xff0c;难以在网络环境…

探索GPT-4V在学术领域的应用——无需编程即可阅读和理解科学论文

1. 概述 论文地址&#xff1a;https://arxiv.org/pdf/2312.05468.pdf 随着人工智能潜力的不断扩大&#xff0c;人工智能&#xff08;AI&#xff09;在化学领域的应用也在迅速发展。特别是大规模语言模型的出现&#xff0c;极大地扩展了人工智能在化学研究中的作用。由于这些模…

Vue 路由:一级路由,嵌套路由

1、安装路由插件,因为用的是vue2 所以路由版本要和vue2对应上&#xff0c;所有有3 yarn add vue-router3 2、在main.js里引入 import VueRouter from vue-router Vue.use(VueRouter) 3、新建文件夹 router,创建index.js 4、引入路由插件&#xff0c;并且暴露出来这个路由 5、在…

一文入门vim

先来波快问快答。 第一个问题&#xff0c;vim是什么&#xff1f; vim就是一文本编辑器。 第二个问题&#xff0c;我们为什么要使用vim&#xff1f; 好像在终端中可选择使用的文本编辑器也不多&#xff08;其他有&#xff0c;但是相对而言vim用的比较广泛&#xff09; 第三…

PNAS | 工作记忆中大脑节律的因果功能图

摘要 工作记忆是一个涉及大脑中多个功能解剖节点的关键认知过程。尽管有大量与工作记忆结构相关的神经影像学证据&#xff0c;但我们对控制整体表现的关键中枢的理解并不完整。因果解释需要在对特定功能解剖节点进行安全、暂时和可控的神经调节后进行认知测试。随着经颅交流电…

适用于 macOS 的最佳免费数据恢复软件

升级到 macOS 后&#xff0c;它可以帮助您从 HDD、SSD、存储卡、USB 闪存驱动器、数码相机或其他存储介质设备中完全恢复已删除、格式化或无法访问的数据。 当 macOS Monterey 用户寻找数据恢复解决方案时&#xff0c;免费数据恢复软件始终是一个不错的选择。实际上&#xff0…

通过引用得到变量的值

编写程序&#xff1a; 运行结果&#xff1a; 程序分析&#xff1a; a的值开始为10,b是a的引用&#xff0c;它的值当然也应该是10&#xff0c;当a的值变为100(a*a的值&#xff09;时&#xff0c;b的值也随之变为100。在输出a和b的值后&#xff0c;b的值变为20&#xff0c;显然a的…

30 张Java 的思维导图,全面梳理构建 Java 的知识体系分享

小编这几天在网上收集了 30 张大佬制作的 Java 知识点总结的思维导图&#xff0c;整理成了这篇文章分享给大家&#xff0c;帮助大家梳理构建 Java 的知识体系。 这份思维导图包含从Java的简介、主要特性、发展历史到语法、数据类型、修饰符、运算符、类、数组、框架、面向对象…

Faster-RCNN基本思想和网络结构

简单来说&#xff0c;Faster RCNN RPN Fast RCNN RPN 是指 Region Proposal Network&#xff0c;建议区域生成网络。 Faster RCNN 中用 RPN 来代替了 Fast RCNN 中的SS算法。 算法流程&#xff1a; &#xff08;1&#xff09;将图像输入CNN网络得到相应的特征图。 &#x…

如何评估pcdn调度算法的优化效果(壹)

评估PCDN&#xff08;Peer-assisted Content Delivery Network&#xff0c;对等网络内容分发网络&#xff09;调度算法的优化效果是一个综合且系统的过程&#xff0c;涉及多个维度的考量。以下是一些建议的步骤和考量因素&#xff0c;以便全面评估优化效果&#xff1a; 一&…

WT32-ETH01作为TCP Server进行通讯

目录 模块简介WT32-ETH01作为TCP Server设置W5500作为TCP Client设置连接并进行通讯总结 模块简介 WT32-ETH01网关主要功能特点: 采用双核Xtensa⑧32-bit LX6 MCU.集成SPI flash 32Mbit\ SRAM 520KB 支持TCP Server. TCP Client, UDP Server. UDP Client工作模式 支持串口、wi…

探索乡村振兴新模式:发挥科技创新在乡村振兴中的引领作用,构建智慧农业体系,助力美丽乡村建设

随着科技的不断进步&#xff0c;乡村振兴工作正迎来前所未有的发展机遇。科技创新作为推动社会发展的重要力量&#xff0c;在乡村振兴中发挥着越来越重要的引领作用。本文旨在探讨如何发挥科技创新在乡村振兴中的引领作用&#xff0c;通过构建智慧农业体系&#xff0c;助力美丽…

【MySQL】(基础篇六) —— 过滤数据

过滤数据 本文将讲授如何使用SELECT语句的WHERE子句指定搜索条件。 WHERE子句 数据库表一般包含大量的数据&#xff0c;很少需要检索表中所有行。通常只会根据特定操作或需要提取表数据的子集。只检索所需数据需要指定搜索条件&#xff08;search criteria&#xff09;&…

IIR和FIR两种滤波器有什么区别?

概念的区分 IIR&#xff08;Infinite Impulse Response&#xff0c;无限脉冲响应&#xff09;和FIR&#xff08;Finite Impulse Response&#xff0c;有限脉冲响应&#xff09;滤波器是两种常见的数字信号处理滤波器类型&#xff0c;它们在结构、性能和用途上有显著区别&#…

大数据快速使用Kerberos认证集群

一、创建安全集群并登录其Manager 创建安全集群&#xff0c;开启“Kerberos认证“参数开关&#xff0c;并配置“密码“、“确认密码“参数。该密码用于登录Manager&#xff0c;请妥善保管。 登录MRS管理控制台页面。 单击“集群列表“&#xff0c;在“现有集群“列表&#xf…

【web性能】什么是图层?图层创建的条件?

CSS图层 浏览器在渲染一个页面时&#xff0c;会将页面分为很多个图层&#xff0c;图层有大有小&#xff0c;每个图层上有一个或多个节点。在渲染DOM的时候&#xff0c;浏览器所做的工作实际上是&#xff1a; 获取DOM后分割为多个图层&#xff1b;对每个图层的节点计算样式结果…

thinkphp6.0版本下子查询sql处理

目录 一&#xff1a;背景 二&#xff1a;查询实例 三&#xff1a;总结 一&#xff1a;背景 我们在实际业务的开发过程中&#xff0c;经常会碰到这样的场景&#xff0c;查询某些部门的客户信息&#xff0c;查询下过订单的客户信息。这里查询客户信息实际上就用到了子查询&…

打造智慧校园信息系统,提升学校科技实力

在如今数字化的时代&#xff0c;打造智慧校园信息系统已成为提升学校科技实力的关键。随着科技的迅猛发展&#xff0c;学校需要跟上时代步伐&#xff0c;利用先进技术建设一个高效、智能的信息系统&#xff0c;为学生、教师和管理人员提供更好的学习和工作环境。 智慧校园信息系…

专家解读 | NIST网络安全框架(3):层级配置

NIST CSF在核心部分提供了六个类别的关键功能和子功能&#xff0c;并围绕CSF的使用提供了层级&#xff08;Tier&#xff09;和配置&#xff08;Profile&#xff09;两种工具&#xff0c;使不同组织和用户更方便有效地使用CSF&#xff0c;本文将深入探讨CSF层级和配置的主要内容…

01、Linux网络设置

目录 1.1 查看及测试网络 1.1.1 查看网络配置 1、查看网络接口地址 2、查看主机状态 3、查看路由表条目 4、查看网络连接qing 1.1.2 测试网络连接 1.测试网络连接 2.跟踪数据包的路由路径 3.测试DNS域名解析 1.2 设置网络地址参数 1.2.1 使用网络配置命令 1.修改网卡…