Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

一、案例说明

现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“\t”键分割,数据内容及数据格式如下:
在这里插入图片描述

二、前置准备工作

项目环境说明
Linux Ubuntu 16.04

jdk-7u75-linux-x64

scala-2.10.4

kafka_2.10-0.8.2.2

spark-1.6.0-bin-hadoop2.6

开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。

/apps/zookeeper/bin/zkServer.sh start 
cd /apps/kafka  
bin/kafka-server-start.sh config/server.properties &  
cd /apps/kafka  
bin/kafka-topics.sh \  
--create \  
--zookeeper localhost:2181 \  
--replication-factor 1 \  
--topic kafkasendspark \  
--partitions 1

三、编写程序代码创建kafka的producer

1、新创一个文件folder命名为lib,并将jar包添加进来。(可以从我的博客主页资源里面下载)
2、进入以下界面,移除Scala Library。

在这里插入图片描述

3、操作完成后,再点击Add Library选项

在这里插入图片描述

4、进入以下界面

在这里插入图片描述

5、点击完成即可
6、最后创建如下项目结构的文件

在这里插入图片描述

四、编写代码,运行程序

编写生产者代码

package my.kafka;  
import java.io.BufferedReader;  
import java.io.File;  
import java.io.FileNotFoundException;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.Properties;  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
public class KafkaSend {  
    private final Producer<String, String> producer;  
  
    public final static String TOPIC = "kafkasendspark";  
  
    public KafkaSend(){  
        Properties props = new Properties();  
        // 此处配置的是kafka的端口  
        props.put("metadata.broker.list", "localhost:9092");  
        // 配置value的序列化类  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        // 配置key的序列化类  
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");  
        props.put("request.required.acks", "-1");  
        producer = new Producer<String, String>(new ProducerConfig(props));  
    }  
  
    void produce() {  
        int lineNo = 1;  
        File file = new File("/data/case6/buyer_favorite1");  
        BufferedReader reader = null;  
        try {  
            reader = new BufferedReader(new FileReader(file));  
            String tempString = null;  
  
            while ( (tempString = reader.readLine()) != null ) {  
                String key = String.valueOf(lineNo);  
                String data = tempString;  
                producer.send(new KeyedMessage<String, String>(TOPIC, key, data));  
                System.out.println(data);  
                lineNo++;  
  
                Thread.sleep(100);  
  
            }  
        } catch (FileNotFoundException e) {  
            System.err.println(e.getMessage());  
        } catch (IOException e) {  
            System.err.println(e.getMessage());  
        } catch (InterruptedException e) {  
            System.err.println(e.getMessage());  
        }  
    }  
    public static void main(String[] args) {  
        System.out.println("start");  
        new KafkaSend().produce();  
        System.out.println("finish");  
    }  
}  

编写消费者代码

package my.scala  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.StreamingContext  
import org.apache.spark.streaming.Seconds  
import scala.collection.immutable.Map  
import org.apache.spark.streaming.kafka.KafkaUtils  
import kafka.serializer.StringDecoder  
import kafka.serializer.StringDecoder  
object SparkReceive {  
  def main(args: Array[String]) {  
  
    val sparkConf = new SparkConf().setAppName("countuser").setMaster("local")  
    val ssc = new StreamingContext(sparkConf, Seconds(2))  
    ssc.checkpoint("checkpoint")  
    val topics = Set("kafkasendspark")  
    val brokers = "localhost:9092"  
    val zkQuorum = "localhost:2181"  
  
    val kafkaParams = Map[String, String](  
        "metadata.broker.list" -> brokers,  
        "serializer.class" -> "kafka.serializer.StringEncoder"  
    )  
  
  
    val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)  
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {  
      //通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和  
      val currentCount = currValues.sum  
      // 已累加的值  
      val previousCount = prevValueState.getOrElse(0)  
      // 返回累加后的结果,是一个Option[Int]类型  
      Some(currentCount + previousCount)  
    }  
    val result=lines.map(line => (line._2.split("\t")) ).map( row => (row(0),1) ).updateStateByKey[Int](addFunc).print()  
  
    ssc.start();  
    ssc.awaitTermination()  
  }  
}  

五、运行程序

在Eclipse的SparkReceive类中右键并点击==>Run As==>Scala Application选项。

然后在KafkaSend类中:右键点击==>Run As==>Jave Application选项。

即可在控制窗口Console中查看输出结果为:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/37604.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++的switch函数用法

一个 switch 语句允许测试一个变量等于多个值时的情况。每个值称为一个 case&#xff0c;且被测试的变量会对每个 switch case 进行检查。 语法 C 中 switch 语句的语法&#xff1a; switch(expression){ case constant-expression : statement(s); break; // 可选的 case c…

解决MAC IDEA终端每次都要source ~/.zshrc

安装nvm之后&#xff0c;发现每隔一段时间&#xff08;不清楚是新打开一个终端还是会定时刷新&#xff09;就要重新执行source ~/zshrc&#xff0c;才能执行nvm命令。找了一圈发现idea默认使用的shell是bash&#xff0c;将默认的shell改成zsh就可以&#xff0c;更改位置&#x…

多模态系列论文--CoCa 详细解析

论文地址&#xff1a;CoCa: Contrastive Captioners are Image-Text Foundation Models 代码地址&#xff1a;CoCa CoCa 1 摘要2 网络结构3 损失函数4 实验结果5 总结 1 摘要 CoCa代表Contrastive Captioners的缩写&#xff0c;代表模型用两个目标函数训练出来的&#xff0c;一…

selenium怎么使用代理IP

什么是selenium Selenium 是一个自动化测试框架&#xff0c;用于测试 Web 应用程序的功能性。它支持多个编程语言&#xff08;如Java&#xff0c;Python&#xff0c;C#等&#xff09;并且可以在操作系统和不同浏览器上运行测试。Selenium 可以模拟用户在浏览器中的操作&#x…

PyTorch从零开始实现Transformer

文章目录 自注意力Transformer块编码器解码器块解码器整个Transformer参考来源全部代码&#xff08;可直接运行&#xff09; 自注意力 计算公式 代码实现 class SelfAttention(nn.Module):def __init__(self, embed_size, heads):super(SelfAttention, self).__init__()self.e…

RDS-Tools RDS-Knight Crack

RDS 高级安全性 利用全面的网络安全工具箱中有史以来最强大的安全功能集来保护您的 RDS 基础架构。 全方位 360 保护 无与伦比的功能集 无与伦比的物有所值 企业远程桌面安全。现代工作空间的智能解决方案。 办公室正在权力下放。远程办公室和移动员工数量创历史新高。随…

机器学习技术(四)——特征工程与模型评估

机器学习技术&#xff08;四&#xff09;——特征工程与模型评估(1️⃣) 文章目录 机器学习技术&#xff08;四&#xff09;——特征工程与模型评估(:one:)一、特征工程1、标准化2、特征缩放3、缩放有离群值的数据4、非线性转换5、样本归一化6、特征二值化7、标称特征编码(one-…

设计模式——命令模式

命令模式 定义 将一个请求封装成一个对象&#xff0c;从而让你使用不同的请求吧客户端参数化&#xff0c;对请求排队或者记录请求日志&#xff0c;可以提供命令的撤销和恢复功能。 命令模式是一个高内聚的模式。 优缺点、应用场景 优点 类间解耦。调用者与接收者之间没有任…

Linux系统使用(超详细)

目录 Linux操作系统简介 Linux和windows区别 Linux常见命令 Linux目录结构 Linux命令提示符 常用命令 ls cd pwd touch cat echo mkdir rm cp mv vim vim的基本使用 grep netstat Linux面试题 Linux操作系统简介 Linux操作系统是和windows操作系统是并列…

Github Pages使用自定义域名

Github Pages使用自定义域名 部署好网站后默认访问地址是xxx.github.io,我们想要自定义为自己的域名 1.DNS解析 这里我使用的是腾讯云,DNS解析DNSPod 添加两条解析记录: 第一个解析记录的记录类型为A&#xff0c;主机记录为&#xff0c;记录值为ping 你的github用户名.githu…

【Java】单例模式

单例模式 设计模式概述单例模式实现思路饿汉式懒汉式饿汉式 vs 懒汉式 设计模式概述 设计模式是在大量的实践中总结和理论化之后优选的代码结构、编程风格、以及解决问题的思考方式。设计模式免去我们自己再思考和摸索。就像是经典的棋谱&#xff0c;不同的棋局&#xff0c;我…

【unity之IMGUI实践】单例模式管理面板对象【一】

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

electron globalShortcut 快捷键与系统全局快捷键冲突

用 electron 开发自己的接口测试工具&#xff08;Post Tools&#xff09;&#xff0c;在设置了 globalShortcut 快捷键后&#xff0c;发现应用中的快捷键与系统全局快捷键冲突了&#xff0c;导致系统快捷键不可正常使用。 快捷键配置 export function initGlobalShortcut(main…

【宝塔】宝塔部署ThinkPHP项目

最近搞了个培训教育的小程序&#xff0c;后端服务用的是ThinkPHP。使用的过程中&#xff0c;发现对于这种小项目用php还是很不错的选择&#xff0c;开发便捷&#xff0c;轻量级。宝塔神器也是很不错的&#xff0c;值得推荐使用。 下面介绍一下项目中用宝塔部署ThinkPHP项目&…

【菜鸟の笔记_利用Excel自动总结表格数据_自动链接word文本】

自动更新总结表格数据 1. 撰写原因2. 解决的问题3. Excel自动总结表格数据内容&#xff08;一段话&#xff09;。3.1问题引出3.2解决方式 4.Excel数据、总结内容&#xff0c;自动链接更新Word文本 1. 撰写原因 【GPT的答案】利用Excel自动总结表格数据有以下好处&#xff1a; …

redis浅析

一 什么是NoSQL&#xff1f; Nosql not only sql&#xff08;不仅仅是SQL&#xff09; 关系型数据库&#xff1a;列行&#xff0c;同一个表下数据的结构是一样的。 非关系型数据库&#xff1a;数据存储没有固定的格式&#xff0c;并且可以进行横向扩展。 NoSQL泛指非关系…

RocketMQ 为何性能高

本文主要从性能角度考虑 RocketMQ 的实现。 整体架构 这是网络上流行的 RocketMQ 的集群部署图。 RocketMQ 主要由 Broker、NameServer、Producer 和 Consumer 组成的一个集群。 **NameServer&#xff1a;整个集群的注册中心和配置中心&#xff0c;管理集群的元数据。包括 T…

解析Android VNDK/VSDK Snapshot编译框架

1.背景 背景一&#xff1a; 为解决Android版本碎片化问题&#xff0c;引入Treble架构&#xff0c;它提供了稳定的新SoC供应商接口&#xff0c;引入HAL 接口定义语言&#xff08;HIDL/Stable AIDL&#xff0c;技术栈依然是Binder)&#xff0c;它指定了 vendor HAL 和system fr…

容器化背后的魔法之Docker底层逻辑解密

Docker内部工作原理是怎样的&#xff1f; 现在我们知道了Docker是什么以及它提供了哪些好处&#xff0c;让我们逐个重要的细节来了解。 什么是容器&#xff1f;它们是如何工作的&#xff1f; 在深入研究Docker的内部机制之前&#xff0c;我们首先要了解容器的概念。简单地说…

从2023中国峰会,看亚马逊云科技的生成式AI战略

“生成式AI的发展就像一场马拉松比赛&#xff0c;当比赛刚刚开始时&#xff0c;如果只跑了三四步就断言某某会赢得这场比赛&#xff0c;显然是不合理的。我们现在还处于非常早期的阶段。” 近日&#xff0c;在2023亚马逊云科技中国峰会上&#xff0c;亚马逊云科技全球产品副总裁…