用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错

首先看一下我们的示例代码

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
"""
------------------------------------------
  Description : TODO:
  SourceFile : etl_stream_kafka
  Author  : zxx
  Date  : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'D:/bigdata/03-java/java-8/jdk'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/bigdata/04-Hadoop/hadoop/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'
    spark = SparkSession.builder.master("local[2]").appName("etl_stream_kafka").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()
    # 连接kafka
    readDF = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "bigdata01:9092") \
    .option("subscribe", "topicA") \
    .load()

    # 使用DSL语句
    etlDF = readDF.selectExpr("cast(value as STRING)").filter(F.col("value").contains("success"))

    etlDF.writeStream \
    .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("topic", "etlTopic") \
        .option("checkpointLocation", "../../datas/kafka_stream") \
        .start().awaitTermination()
    # 关闭
    spark.stop()

运行发现报错

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "D:\bigdata\18-python\pyspark_project\pythonProject1\main\streamingkafka\etl_stream_kafka.py", line 22, in <module>
    readDF = spark.readStream.format("kafka") \
  File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\streaming.py", line 482, in load
    return self._df(self._jreader.load())
  File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark

下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar

 进入网站(已打包放入文章末尾)

找到对应有关spark 和kafka的模块

找到对应的版本 ,这里我用的kafka是3.0版本,下载的是3.1.2版本

 点进去,下载jar包

 再次运行会发现仍然报错,这是因为jar包之间的依赖关系,从刚才下载的界面下面再下载有关的jar包

 

 

 

 再次运行即可

 jar包下载链接

【免费】用pyspark把数据从kafka的一个主题用流处理后再导入kafka的另一个主题的有关报错资源-CSDN文库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918058.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

单片机_day3_GPIO

目录 1. 灯如何才能亮 1.1原理图 1.2 二极管 1.3 换了一个灯和原理图 ​编辑 1.4 三极管 1.4.1 NPN型三极管 1.4.2 PNP型三极管 2. 基本概念 3. 输入 3.1 浮空输入 3.2 上拉输入 3.3 下拉输入 3.4 模拟输入 4. 输出 4.1 推挽输出 4.2 开漏输出 如何让开漏输出…

基于视觉智能的时间序列基础模型

GitHub链接&#xff1a;ViTime: A Visual Intelligence-Based Foundation Model for Time Series Forecasting 论文链接&#xff1a;https://github.com/IkeYang/ViTime 前言 作者是来自西安理工大学&#xff0c;西北工业大学&#xff0c;以色列理工大学以及香港城市大学的研…

java项目-jenkins任务的创建和执行

参考内容: jenkins的安装部署以及全局配置 1.编译任务的general 2.源码管理 3.构建里编译打包然后copy复制jar包到运行服务器的路径 clean install -DskipTests -Pdev 中的-Pdev这个参数用于激活 Maven 项目中的特定构建配置&#xff08;Profile&#xff09; 在 pom.xml 文件…

Qt按钮类-->day09

按钮基类 QAbstractButton 标题与图标 // 参数text的内容显示到按钮上 void QAbstractButton::setText(const QString &text); // 得到按钮上显示的文本内容, 函数的返回就是 QString QAbstractButton::text() const;// 得到按钮设置的图标 QIcon icon() const; // 给按钮…

论文6—《基于YOLOv5s的深度学习在自然场景苹果花朵检测中的应用》文献阅读分析报告

论文报告&#xff1a;基于YOLOv5s的深度学习在自然场景苹果花朵检测中的应用 基于YOLOv5s的深度学习在自然场景苹果花朵检测中的应用 摘要国内外研究现状1. 疏花技术研究2. 目标检测算法研究 研究目的研究问题使用的研究方法试验研究结果文献结论创新点和对现有研究的贡献1. Y…

「人眼视觉不再是视频消费的唯一形式」丨智能编解码和 AI 视频生成专场回顾@RTE2024

你是否想过&#xff0c;未来你看到的电影预告片、广告&#xff0c;甚至新闻报道&#xff0c;都可能完全由 AI 生成&#xff1f; 在人工智能迅猛发展的今天&#xff0c;视频技术正经历着一场前所未有的变革。从智能编解码到虚拟数字人&#xff0c;再到 AI 驱动的视频生成&#…

计算机毕业设计Python美食推荐系统 美团爬虫 美食可视化 机器学习 深度学习 混合神经网络推荐算法 Hadoop Spark 人工智能 大数据毕业设计

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

GPU分布式通信技术-PCle、NVLink、NVSwitch深度解析

GPU分布式通信技术-PCle、NVLink、NVSwitch 大模型时代已到来&#xff0c;成为AI核心驱动力。然而&#xff0c;训练大模型却面临巨大挑战&#xff1a;庞大的GPU资源需求和漫长的学习过程。 要实现跨多个 GPU 的模型训练&#xff0c;需要使用分布式通信和 NVLink。此外&#xf…

MySQL:联合查询(2)

首先写一个三个表的联合查询 查询所有同学的每门课成绩&#xff0c;及同学的个人信息 1.我们首先要确定使用哪些表 学生表&#xff0c;课程表&#xff0c;成绩表 2.取笛卡尔积 select * from score,student,course; 3. 确定表与表之间的联合条件 select * from score,stud…

【leetcode】704. 二分查找

注意一般mid left (right-left)/2; 不要用mid (right - left)/2 中间值的计算需要考虑到整型溢出的问题。 如果使用 mid (right - left) / 2 的方式计算中间值&#xff0c;那么在 right 和 left 的值接近极限值的情况下&#xff0c;可能会导致计算出的中间值发生整型溢出&…

RHCE的练习(12)

写一个脚本&#xff0c;完成以下要求&#xff1a; 给定一个用户&#xff1a; 如果其UID为0&#xff0c;就显示此为管理员&#xff1b;否则&#xff0c;就显示其为普通用户&#xff1b; #!/bin/bash ​ # 使用read命令获取用户名 read -p "请输入用户名: " username ​…

WPF-控件的属性值的类型转化

控件的属性值需要转成int、double进行运算的&#xff0c;可以使用一下方法 页面代码 <StackPanel Margin"4,0,0,0" Style"{StaticResource Form-StackPanel}"> <Label Content"替换后材料增加金额&#xff…

【从零开始的LeetCode-算法】3270. 求出数字答案

给你三个 正 整数 num1 &#xff0c;num2 和 num3 。 数字 num1 &#xff0c;num2 和 num3 的数字答案 key 是一个四位数&#xff0c;定义如下&#xff1a; 一开始&#xff0c;如果有数字 少于 四位数&#xff0c;给它补 前导 0 。答案 key 的第 i 个数位&#xff08;1 < …

iMetaOmics | 刘永鑫/陈同-用于食物微生物组成和时间序列研究的微生物组数据库FoodMicroDB...

点击蓝字 关注我们 FoodMicroDB&#xff1a;用于食物微生物组成和时间序列研究的微生物组数据库 iMeta主页&#xff1a;http://www.imeta.science 研究论文 ● 原文链接DOI: https://doi.org/10.1002/imo2.40 ● 2024年11月1日&#xff0c;中国农业科学院深圳农业基因组研究所刘…

视觉slam十四讲 ch8 光流法和直接法

之前的都是单层光流 转载至Blibli 视觉SLAM十四讲_7视觉里程计1_计算相机运动_哔哩哔哩_bilibili

QSS 设置bug

问题描述&#xff1a; 在QWidget上add 一个QLabel&#xff0c;但是死活不生效 原因&#xff1a; c 主程序如下&#xff1a; QWidget* LOGO new QWidget(logo_wnd);LOGO->setFixedSize(logo_width, 41);LOGO->setObjectName("TittltLogo");QVBoxLayout* tit…

Linux运维篇-iscsi存储搭建

目录 概念实验介绍环境准备存储端软件安装使用targetcli来管理iSCSI共享存储 客户端软件安装连接存储 概念 iSCSI是一种在Internet协议上&#xff0c;特别是以太网上进行数据块传输的标准&#xff0c;它是一种基于IP Storage理论的存储技术&#xff0c;该技术是将存储行业广泛…

WSL--无需安装虚拟机和docker可以直接在Windows操作系统上使用Linux操作系统

安装WSL命令 管理员打开PowerShell或Windows命令提示符&#xff0c;输入wsl --install&#xff0c;然后回车 注意&#xff1a;此命令将启用运行 WSL 和安装 Linux 的 Ubuntu 发行版所需的功能。 注意&#xff1a;默认安装最新的Ubuntu发行版。 注意&#xff1a;默认安装路径是…

【学习心得】算力云平台上的大模型部署并实现远程调用

以AutoDL算力云平台为例&#xff0c;部署国产开源ChatGLM3b模型。 一、准备工作 &#xff08;1&#xff09;准备一台算力服务器 首先&#xff0c;进入AutoDL官网的算力时长选择算力服务器资源。 创建好后会自动跳转控制台的“容器实例”界面&#xff0c;稍等片刻后选择“快捷…

Vue 中的透传,插槽,依赖注入

1. 透传attributes 在组件上使用透传attribute&#xff1a; 当你在父组件中使用子组件时&#xff0c;你可以添加一些attribute到子组件上&#xff0c;即使这些attribute没有在子组件的props中声明。 父组件&#xff1a; <!-- 父组件&#xff0c;例如 ParentComponent.vue…