摸鱼大数据——Kafka——kafka tools工具使用

可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作

注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!

 连接配置

创建主题

删除主题

主题下的数据查看

数据显示问题说明

修改工具的数据显示类型

发送消息数据到kafka

Kafka的Python API的操作

模块安装

纯Python的方式操作Kafka。

准备工作:在node1的节点上安装一个python用于操作Kafka的库

安装kafka-python 模模块 ,模块中提供了操作kafka的方法

在线安装

在node1上安装就可以,需要保证服务器能够连接网络

 安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

离线安装

将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装

 安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl

模块使用

API使用的参考文档: Usage — kafka-python 2.0.2-dev documentation

模块中封装了两个类,

一个是生成者类KafkaProducer,提供了向kafka写数据的方法

另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法

完成生产者代码

生成者类KafkaProducer,提供了向kafka写数据的方法

 send(topic,valu)方法: 发送消息
 topic参数:指定向哪个主题发送消息
 value参数:指定发送的消息数据 ,数据类型要求是bytes类型

示例:

 # 导包
 from kafka import KafkaProducer
 ​
 # 编写代码
 if __name__ == '__main__':
     # 创建生产者对象并指定对应服务器
     producer = KafkaProducer(bootstrap_servers=['node1:9092'])
     # 发送消息
     for i in range(1,101):
         future = producer.send('kafka', f'hi_kafka_{i}'.encode())
         # 获取元数据
         record_metadata = future.get()
         # 从元数据中获取主题,分区,偏移
         print(record_metadata.topic)
         print(record_metadata.partition)
         print(record_metadata.offset)

完成消费者代码

消费者类KafkaConsumer,提供了读取kafka数据的方法

 KafkaConsumer(topic,bootstrap_servers)
 第一个参数:指定消费者连接的主题,
 第二个参数:指定消费者连接的kafka服务器

示例:

 # 导包
 from kafka import KafkaConsumer
 ​
 # 编写代码
 if __name__ == '__main__':
 ​
     # 创建消费者对象
     consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092'])
     # 遍历对象
     for message in consumer:
 ​
         # 格式化打印,设置相关参数
         # 因为value是二进制,需要decode解码
         print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s"
                % (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))
 ​

可能遇到的错误:

 原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
 解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
 python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/800663.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Laravel :如何将Excel文件导入数据库

文章目录 一、前提二、使用2.1、新建一个导入文件2.2、新建一个控制器和方法,调用导入文件2.3、 新建一个页面,支持文件上传 一、前提 想要将excel内容入库,laravel有扩展可以使用,常用的扩展是maatwebsite/excel,安装步骤参考上一篇&#x…

校园工会体育报名小程序的设计

管理员账户功能包括:系统首页,个人中心,赛事公告管理,球员管理,球队信息管理,比赛信息,比赛报名管理 微信端账号功能包括:系统首页,比赛信息,比赛报名&#…

7月考研数学的保底进度,警惕三个误区!

误区1. 不恰当的课程选择和学习计划 尤其25张宇36讲大改版,一些同学感到焦虑,担心自己的课程选择不适合自己。 或者担心学习计划不够高效,影响最终的成绩。 课程选择,看3方面: 1. 覆盖是否全面? 2. 是否…

element-ui dialog 嵌套

dialog 内部嵌套 dialog,内层的 dialog 层级显示会遮罩在内容的 dialog 内容区域之上,内层 dialog 添加 append-to-body 属性即可,如官方文档:

网安小贴士(11)VPN类型

前言 VPN(Virtual Private Network,虚拟专用网络)类型多样,主要根据其使用的协议、应用场景以及实现方式等因素进行分类。以下是对VPN类型的详细概述: 一、按协议分类 根据使用的隧道协议,VPN可以分为以下几…

java设计模式(十五)命令模式(Command Pattern)

1、模式介绍: 命令模式(Command Pattern)是一种行为设计模式,其主要目的是将请求封装成一个对象,从而允许使用不同的请求、队列或者日志来参数化其他对象。这种模式使得命令的请求者和实现者解耦。 2、应用场景&…

c++树笔记

树的定义 树(Tree)是n(n≥0)个结点的有限集。n0时称为空树。在任意一颗非空树中:①有且仅有一个特定的称为根(Root)的结点;②当n>1时,其余结点可分为m(m&g…

基于 jenkins 部署接口自动化测试项目!

引言 在现代软件开发过程中,自动化测试是保证代码质量的关键环节。通过自动化测试,可以快速发现和修复代码中的问题,从而提高开发效率和产品质量。而 Jenkins 作为一款开源的持续集成工具,可以帮助我们实现自动化测试的自动化部署…

itextpdf字体选择

itextpdf 版本7.2.5 itextpdf-html2pdf 版本4.0.5 这里讲的是通过html转pdf,在html2pdf中是通过html中font-family样式来确定字体的,那已知font-family的情况,怎么确定pdf中实际用的字体,大致分为两步: 1、通过font…

ollama 模型国内加速下载,制作自定义Modelfile模型文件

参考: https://www.zhihu.com/question/640579563/answer/3562899008 https://github.com/ollama/ollama/blob/main/docs/modelfile.md gguf格式介绍: https://www.datalearner.com/blog/1051705718835586 1、ollama 模型国内加速下载 ollama主要的模型文件格式是gguf,可…

LabVIEW扬尘控制系统

设计了一套基于LabVIEW的扬尘控制系统,通过监测TsP(总悬浮颗粒物)浓度、风向和摄像头视频,实现对环境的综合监控和扬尘控制。系统可以自动判断扬尘位置,并驱动抑尘设备进行抑尘。硬件选用NI cDAQ-9178数据采集模块、Om…

甄选范文“论基于构件的软件开发方法及其应用”,软考高级论文,系统架构设计师论文

论文真题 基于构作的软件开发 (Component-Based Software Development,CBSD) 是一种基于分布对象技术、强调通过可复用构件设计与构造软件系统的软件复用途径。基于构件的软件系统中的构件可以是COTS (Commercial-Off-the-Shelf)构件,也可以是通过其它途径获得的构件(如自…

Android Stuido Gradle build编译报错原因排查

事情是这样的,在更新了支付宝sdk的aar文件后,运行项目,报错了。如下图: 但是没有给出更多错误信息。想尝试通过gradlew compileDebug --stacktrace来输出更多build时的信息,但没有得到更多有效信息。 接下来&#xff…

JVM(Java虚拟机)详解(JVM 内存模型、堆、GC、直接内存、性能调优)

JVM(Java虚拟机) JVM 内存模型 结构图 jdk1.8 结构图(极简) jdk1.8 结构图(简单) JVM(Java虚拟机): 是一个抽象的计算模型。如同一台真实的机器,它有自己…

OrangePi Aipro Ai计算测试

开发板配置 http://www.orangepi.cn/html/hardWare/computerAndMicrocontrollers/details/Orange-Pi-AIpro.html CPU4核64位处理器 AI处理器GPU集成图形处理器AI算力8-12TOPS算力内存LPDDR4X:8GB/16GB(可选),速率:3200…

AI算法16-贝叶斯线性回归算法Bayesian Linear Regression | BLR

贝叶斯线性回归算法简介 频率主义线性回归概述 线性回归的频率主义观点可能你已经学过了:该模型假定因变量(y)是权重乘以一组自变量(x)的线性组合。完整的公式还包含一个误差项以解释随机采样噪声。如有两个自变量时…

使用NIFI连接瀚高数据库_并从RestFul的HTTP接口中获取数据局_同步到瀚高数据库中---大数据之Nifi工作笔记0067

首先来看一下如何,使用NIFI 去连接瀚高数据库. 其实,只要配置好了链接的,连接字符串,和驱动,任何支持JDBC的数据库都可以连接的. 首先我们用一个ListDatabaseTables处理器,来连接瀚高DB 主要是看这里,连接地址,以及驱动,还有驱动的位置 这个是数据连接的配置 jdbc:highgo://…

MagicClothing: 给人物照片换装的ComfyUI工作流(干货满满)

前言 在试验了各种ComfyUI 工作流,换了3台电脑,失败了无数次之后,终于又一次跑通了ComfyUI。 接下来会分享跑成功的各种ComfyUI工作流。 今天就拿给人物换装的新出来的这个做一个样本。 上一次文章提到给人物换装的模型[OOTDiffusion: 给人…

【OTA】-【汽车远程升级】- 技术背景及现状分析

一、OTA技术背景 在21世纪,计算机技术、物联网技术、移动通讯技术取得了巨大的成就,这些技术的发展使智能网联汽车越来越多。智能网联汽车在各行各业,例如:交通调度、工业现场、物联网设备、智慧城市、军队武器装备等,…

软件工程课设——成绩管理系统

软件工程课设——成绩管理系统 该文档是软件工程课程设计,成绩管理子系统的开发模块仓库。 功能分析 从面向的用户分,成绩管理子系统主要面向三类用户,即至少需要满足这三类用户的需求: 学生:学生是成绩管理系统的…