Spark读取kafka(流式和批数据)

spark读取kafka(批数据处理)

在这里插入图片描述

在这里插入图片描述

# 按照偏移量读取kafka数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# spark读取kafka
options = {
    # 写kafka配置信息
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主题
    'subscribe': 'itcast',# 读取的主题不存在会自动创建
    # todo 注意一:连接的配置
    #       主题名称 ,分区编号,偏移量
    # 指定起始偏移量   {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}
    'startingOffsets':""" {"itcast":{"0":0,"1":1}} """,
    # 指定结束偏移量  {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}
    'endingOffsets':""" {"itcast":{"0":3,"1":2}}  """
    # 注意点  : 偏移量的区间是左闭右开 ,结束偏移的指定按照最大偏移量加一 ,所有分区都要指定
}
# 读取
# format 指定读取kafka
df = ss.read.load(format='kafka',**options)
# todo 注意二:这一步的数据处理(将value转化为字符串类型)是必须做的,不然你看不懂数据。
#       可以用df.的方式,那我后来怎么都没怎么见过了0
df_select = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp','timestampType')
# 查看df数据
# todo 注意三:这里使用.show()的方式的,是因为它是有界表
df_select.show()

在这里插入图片描述

spark读取kafka(流数据处理)

在这里插入图片描述

# 流式读取kafka数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
# todo 注意一:定义kafka的连接配置
options={
    # 写kafka配置信息
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主题
    'subscribe': 'itheima'  # 读取的主题不存在会自动创建
}
df = ss.readStream.load(format='kafka',**options)
# todo 注意二:必须将value转化为string类型

# 计算
df_res = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp')

# 输出
# todo 注意三:输出不是df_res.show,
df_res.writeStream.start(format='console',outputMode='append').awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/335814.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

无法访问云服务器上部署的Docker容器

说明:记录一次无法访问云服务器上部署的Docker容器的问题。 问题描述 某次,我在云服务器上,使用Docker运行了一个Nginx容器,用公网IP怎么也访问不到。这种情况博主也算有经验,可以从以下几个方面去排查: …

舵机使用总结

文章目录 1 舵机简介2 注意事项3 编写驱动程序3.1 使用STM32作为控制器3.1.1 计算高电平对应程序中的取值范围3.1.2 编写控制程序 1 舵机简介 舵机使用PWM控制,周期为20ms,通过改变高电平占空比来驱动,高电平通常为1~2ms( 或 0.5 …

RabbitMQ系列之入门级

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《RabbitMQ系列之入门级》。🎯&#x…

YOLOv8全网首发:新一代高效可形变卷积DCNv4如何做二次创新?高效结合SPPF

💡💡💡本文独家改进:DCNv4更快收敛、更高速度、更高性能,与YOLOv8 SPPF高效结合 收录 YOLOv8原创自研 https://blog.csdn.net/m0_63774211/category_12511737.html?spm=1001.2014.3001.5482 💡💡💡全网独家首发创新(原创),适合paper !!! 💡💡💡…

AOI与AVI:在视觉检测中的不同点和相似点

AOI(关注区域)和AVI(视觉感兴趣区域)是视觉检测中常用的两个概念,主要用于识别和分析图像或视频中的特定区域。虽然这两个概念都涉及到注视行为和注意力分配,但它们在定义和实际应用等方面有一些差异。 AOI…

x86-x64汇编语言、反汇编知识和IDA

x86-x64汇编语言 基础知识 x86寄存器: 通用寄存器:EAX, EBX, ECX, EDX, ESI, EDI 栈顶指针寄存器:ESP 栈底指针寄存器:EBP 指令计数器:EIP 段寄存器:CS, DS, ES, FS, GS, SS x86-64寄存器:&a…

2.【C语言】(函数指针||sizeof||笔试题)

0x01.函数指针 void test(const char* str) {printf("%s\n", str); }int main() {void (*pf)(const char*) test;//pf是函数指针变量void (*pfarr[10])(const char*);//pfarr是存放函数指针的数组void (*(*p)[10])(const char*) &pfarr;//p是指向函数指针数组…

Leetcoder Day10|栈与队列part02(栈的应用)

语言:Java/C 目录 20. 有效的括号 1047. 删除字符串中的所有相邻重复项 150. 逆波兰表达式求值 今日总结 20. 有效的括号 给定一个只包括 (,),{,},[,] 的字符串,判断字符串是否有效。 有效字…

WEB接口测试之Jmeter接口测试自动化 (三)(数据驱动测试)

接口测试与数据驱动 1简介 数据驱动测试,即是分离测试逻辑与测试数据,通过如excel表格的形式来保存测试数据,用测试脚本读取并执行测试的过程。 2 数据驱动与jmeter接口测试 我们已经简单介绍了接口测试参数录入及测试执行的过程&#xff0…

Unity3D Pico VR 手势识别物体交互 适配 MRTK3

当前Pico已经支持手势识别了,但是提供的PICO Unity Integration SDK 中是没有手势和物体交互的功能,Unity XR Interaction Toolkit提供的手势识别物体交互对 Quest适配的挺好的,Pico 当前只能用指尖点触还不能对物体进行抓握以及手势控制射线…

数据结构一:算法效率分析(时间复杂度和空间复杂度)-重点

在学习具体的数据结构和算法之前,每一位初学者都要掌握一个技能,即善于运用时间复杂度和空间复杂度来衡量一个算法的运行效率。所谓算法,即解决问题的方法。同一个问题,使用不同的算法,虽然得到的结果相同,…

开发实践8_project

要求: 使用Restful对Chaos模型作基本操作。 结果: post 3 组数据后,get 查询如下: put修改后get: delete pk3之后get: 代码: python manage.py startapp pro8_app 注册 总路由 // path(pr…

免费200万Tokens 用科大讯飞API调用星火大模型服务

简介 自ChatGPT火了之后,国内的大模型发展如雨后春笋。其中的佼佼者之一就是科大讯飞研发的星火大模型,现在大模型已经更新到V3 版本,而且对开发者也是相当友好,注册就送200万tokens,讯飞1tokens 约等于 1.5 个中文汉字 或者 0.8 个英文单词…

JVM 如何判断一个对象可以被回收

Hi, 我是 浮生。 今天分享一道一线互联网公司必问的面试题。 ”JVM 如何判断一个对象可以被回收“ 关于这个问题,来看看高手的回答。 一、问题解析 在 JVM 里面,要判断一个对象是否可以被回收,最重要的是判断这个对象是否还在被…

中仕教育:省考怎么查每个岗位报考人数?一篇文章带你搞定!

参加省考避开热门岗位能够一定程度上提高上岸几率,怎么看岗位报考人数? 1. 官方公告:每年省考发布招录公告时,会公布各个岗位的招录人数,可以关注招录信息。 2. 查询报名数据:在报名结束后,省考招录机关…

debian12.4配置

文章目录 debian12.4配置概述笔记将非root用户添加到sudo组更换国内源配置ssh的客户端访问END debian12.4配置 概述 在虚拟机中装了一个debian12.4, 想配置ssh客户端连接, 出了问题. 配置乱了, 还好长了个心眼, 做了快照. 发现2个问题: debian12.4默认安装完, 有ssh, 先检查…

Python自动化测试【selenium面试题】

一、selenium中如何判断元素是否存在? expected_conditions模块提供了16种判断方法,以下方法是判断元素存在DOM中: presence_of_element_located """ An expectation for checking that an element is present on the DOM of…

【Linux】第三十一站:管道的一些应用

文章目录 一、我们之前的|(竖划线)管道二、自定义shell三、使用管道实现一个简易的进程池1.详解2.代码3.一个小bug4.最终代码 一、我们之前的|(竖划线)管道 cat test.txt | head -10 | tail -5如上代码所示,是我们之前所用的管道 我们拿下面这个举个例子 当我们用…

【SpringBoot】—— 如何创建SpringBoot工程

SpringBoot简化了Spring应用的初始搭建和开发过程。 工程创建 新建模块 出现java: 错误: 无效的源发行版:18这样的错误, 修改pom.xml文件 出现以下信息,即运行成功 修改默认端口 创建application.yml文件 内容: server:port:…

【没学过编程语言,想要做一款游戏应该怎么做?】

*** 【没学过编程语言,想要做一款游戏应该怎么做?】 想让你的创意成为像《堡垒之夜》《原神》这样引爆式的热门游戏吗? 想制作一个能与《我的世界》《模拟城市》一决高下的畅销游戏吗? 即使你手头并没有复杂的代码能力&#xf…