笔记-python操作kafka实践

1、先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='xxxx:x')

msg_dict = {
    "sleep_time": 10,
    "db_config": {
        "database": "test_1",
        "host": "xxxx",
        "user": "root",
        "password": "root"
    },
    "table": "msg",
    "msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()
--------------------------------------------------------------------------------
下面是消费者的简单代码:
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv

--------------------------------------------------------------------------------
下面是结果:

在这里插入图片描述2、如果想要完成负载均衡,就需要知道kafka的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,kafka会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,
会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,
则会出现一个消费者消费不到数据;如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:
———————————————————————————————————

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv

然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看到,一个消费者只能消费0分区,另一个只能消费1分区:
在这里插入图片描述
在这里插入图片描述
3、kafka提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于kafka名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小;每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj")  # 获取test主题的分区信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv

因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:

在这里插入图片描述

4、有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
    msg = consumer.poll(timeout_ms=5)  # 从kafka获取消息
    print msg
    time.sleep(2)
    index += 1
    print '--------poll index is %s----------' % index

结果如下,可以看到,每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/675599.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

浅测 长亭雷池 WAF “动态防护”

本文首发于 Anyeの小站 前言 雷池 WAF 社区版的更新速度是真快啊,几乎一周一个小版本,俩月一个大版本,攻城狮们真的狠啊,没法测了。 废话不多说,前两天看到了 这篇文章,对雷池的“动态防护”功能挺感兴趣…

二人订单共享模式:新零售电商盈利新秘诀

电商江湖日新月异,竞争如火如荼,如何脱颖而出,赢得消费者?二人订单共享模式,这是一种全新的商业模式,旨在打造爆款产品,迅速吸引大量客源,并激发消费者重复购买欲望。 首先&#xf…

期权懂带你懂50etf认沽期权和认购期权有什么区别?

今天带你了解期权懂带你懂50etf认沽期权和认购期权有什么区别?在金融市场中,期权是一种允许持有者在未来某个时间以特定价格买入或卖出基础资产的金融衍生品。 50etf认沽期权和认购期权有什么区别? 50ETF认沽期权和认购期权的主要区别在于它…

从报名到领证:软考高级【网络规划设计师】报名考试全攻略

本文共计10551字,预计阅读35分钟。包括七个篇章:报名、准考证打印、备考、考试、成绩查询、证书领取及常见问题。 不想看全文的可以点击目录,找到自己想看的篇章进行阅读。 一、报名篇 报名条件要求: 1.凡遵守中华人民共和国宪…

淘宝电商接口获取商品数据,该怎么获取?

淘宝电商接口(也称为淘宝开放平台API)允许开发者通过编程方式获取淘宝平台上的商品数据、订单数据等。然而,直接获取淘宝的商品数据并不是一件简单的事情,因为淘宝对API的使用有一定的限制和要求,包括权限申请、接口调…

项目-双人五子棋对战: websocket的讲解与使用 (1)

项目介绍 接下来, 我们将制作一个关于双人五子棋的项目, 话不多说先来理清一下需求. 1.用户模块 用户的注册和登录 管理用户的天梯分数, 比赛场数, 获胜场数等信息. 2.匹配模块 依据用户的天梯积分, 实现匹配机制. 3.对战模块 把两个匹配到的玩家放到同一个游戏房间中, 双方通…

线程进阶-2 ThreadLocal

一.简单说一下ThreadLocal 1.ThreadLocal是一个线程变量,用于在并发条件下,为不同线程提供相互隔离的变量存储空间。在多线程并发的场景下,每个线程往ThreadLocal中存的变量都是相互独立的。 2.基本方法 (1)set(Obj…

wms中对屏幕进行修改wm size设置屏幕宽高原理剖析

背景: 上面是正常屏幕1440x2960的屏幕大小,如果对display进行相关的修改,可以使用如下命令: adb shell wm size 1080x1920 得出如下的画面 明显看到差异就是屏幕上下有黑边了,那么下面就来调研这个wm size是怎么做的…

软件系统测试的定义和测试内容介绍

一、什么是软件系统测试? 软件系统测试是指对软件系统的功能、性能、可靠性、稳定性等方面进行全面检查和验证的过程。其目的是发现潜在的问题、缺陷和风险,并确保软件系统的质量和稳定性。 软件系统测试可以分为多个阶段,包括单元测试、集成测试、系…

《深入浅出C语言:从基础到指针的全面指南》

1. 简介 C语言是一种通用的编程语言,广泛应用于系统编程、嵌入式系统和高性能应用程序。它由Dennis Ritchie在1972年开发,并且至今仍然非常流行。C语言以其高效、灵活和强大的功能著称,是许多现代编程语言的基础。 2. 基本语法 2.1 Hello, …

44-4 waf绕过 - CDN简介

一、CDN简介 CDN,即内容分发网络(Content Delivery Network),是建立在现有网络基础之上的智能虚拟网络。它依靠部署在各地的边缘服务器,并通过中心平台的负载均衡、内容分发和调度等功能模块,使用户可以就近获取所需内容,从而降低网络拥塞,提高用户访问响应速度和命中率…

servlet小项目与servlet续集

文章目录 servlet小项目与servlet续集,是结合上一次的案例进行升级,给项目新增了,增加员工,删除员工,查询具体员工 功能新增操作修改操作删除操作过滤器Cookie servlet小项目与servlet续集,是结合上一次的案例进行升级,给项目新增了,增加员工,删除员工,查询具体员工 功能 上一…

Spring boot集成通义千问大模型实现智能问答

Spring boot集成通义千问大模型实现智能问答 背景 我在用idea进行java开发时发现了通义灵码这款免费的智能代码补全插件,用了一段时间了,感觉很不错。就想着在自己的项目中也能集成通义千问大模型实现智能回答,毕竟对接openai需要解决网络问…

社交媒体数据恢复:Weico

一、从备份中恢复数据 云备份 希望这篇教程能帮助你恢复Weico中的聊天记录和文件。如有其他问题,请随时联系我们。 三、注意事项 在尝试恢复数据的过程中,请避免执行任何可能导致数据进一步丢失的操作。 数据恢复的效果取决于多种因素,包…

GPU的最佳拍档HBM到底是什么

在AI界,英伟达的大名无人不知,无人不晓。然而即使在AI芯片界占据绝对霸主地位的英伟达,依旧受制于人。 众所周知,算力与带宽是制衡AI应用的两大关键因素,长期以来高速发展的算力受困于有限的带宽限制了其性能的最大发…

信息系统项目管理师0142:管理新实践(9项目范围管理—9.1管理基础—9.1.2管理新实践)

点击查看专栏目录 文章目录 9.1.2 管理新实践 9.1.2 管理新实践 需求一直是项目管理的关注重点,需求管理过程结束于需求关闭,即把产品、服务或成果移交给接收方,以便长期测量、监控、实现并维持收益。随着全球项目环境变得日益复杂&#xff0…

【人工智能Ⅱ】实验8:生成对抗网络

实验8:生成对抗网络 一:实验目的 1:理解生成对抗网络的基本原理。 2:学会构建改进的生成对抗网络,如DCGAN、WGAN、WGAN-GP等。 3:学习在更为真实的数据集上应用生成对抗网络的方法。 二:实验…

Java反序列化-RMI流程分析

RMI 在反序列化里漏洞里面是很常用的,它是一个分布式的思想。 RMI概述 RMI 通常包含两个独立的程序,一个服务端 和 一个客户端。服务端通过绑定这个远程对象类,它可以封装网络操作。客户端层面上只需要传递一个名字,还有地址。 …

LNMP 环境下使用 Zstd 压缩优化网站备份脚本

网站的备份一直都是网站运营、服务器运维中很重要的一环,明月无论是在自己的服务器还是客户的代运维服务器上都是非常重视网站备份的,尤其热衷于优化网站备份这块儿,毕竟明月自己的服务器配置一直都是最低的 1H1G 呀,就这配置常年…

(函数)空格填充(C语言)

一、运行结果&#xff1b; 二、源代码&#xff1b; # define _CRT_SECURE_NO_WARNINGS # include <stdio.h>//声明空格填充函数&#xff1b; void space(char a[100]);int main() {//初始化变量值&#xff1b;char a[100] { 0 };//获取用户输入的数据&#xff1b;print…