kafka第三课-可视化工具、生产环境问题总结以及性能优化

一、可视化工具

https://pan.baidu.com/s/1qYifoa4 密码:el4o

  1. 下载解压之后,编辑该文件,修改zookeeper地址,也就是kafka注册的zookeeper的地址,如果是zookeeper集群,以逗号分开

vi conf/application.conf

在这里插入图片描述

  1. 启动命令
bin/kafka-manager
  1. 默认9000端口,直接访问,添加一个名字,名字只是方便开发者使用,随便起,地址是zookeeper的地址和端口
    在这里插入图片描述
    在这里插入图片描述
  2. 点进去可以看到对应的主体以及kafka的集群节点信息

在这里插入图片描述

二、线上环境规划

在这里插入图片描述
JVM参数设置
kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题
修改bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,可以如下设置:

export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"
这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G。

这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G。

三、线上问题及优化

1、消息丢失情况:

消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。

消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。

2、消息重复消费

消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
消息消费端:
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理
一般消费端都是要做消费幂等处理的。

3、消息乱序

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了
所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息发送的有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

4、消息积压

1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。
此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。

2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

5、延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。
2)订单完成1小时后通知用户进行评价。
实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

6、消息回溯
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。

当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
        latest(默认) :只消费自己启动之后发送到主题的消息
        earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
        */
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

7、分区数越多吞吐量越高吗

可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量

# 往test里发送一百万消息,每条设置1KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,
#当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
#一条消息1024个字节,也就是1kb
#acks=1 参考上满
bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.85.200:9092 acks=1

在这里插入图片描述
平均值是1秒多,最大发送时间是1.9秒,不过这个时间都是一批消息的时间,百万数据kafka是分批发的,具体参考上一个文档,看他的缓存区和bash区
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。
当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。
异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535

8、消息传递保障

  • at most once(消费者最多收到一次消息,0–1次):acks = 0 可以实现。
  • at least once(消费者至少收到一次消息,1–多次):ack = all 可以实现。
  • exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实现。

kafka生产者的幂等性:因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID。
Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

9、kafka的事务(了解)

Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
 //初始化事务
 producer.initTransactions();

 try {
     //开启事务
     producer.beginTransaction();
     for (int i = 0; i < 100; i++){
         //发到不同的主题的不同分区
         producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
         producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
         producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
     }
     //提交事务
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     //回滚事务
     producer.abortTransaction();
 }
 producer.close();

10、kafka高性能的原因

  • 磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
  • 数据传输的零拷贝
  • 读写数据的批量batch处理以及压缩传输

数据传输零拷贝原理:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/42484.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端vue uni-app仿美团下拉框下拉筛选组件

在前端Web开发中&#xff0c;下拉筛选功能是一种非常常见的交互方式&#xff0c;它可以帮助用户快速选择所需的选项。本文将介绍如何利用Vue.js和uni-app框架来实现一个高效的下拉筛选功能。通过使用这两个强大的前端框架&#xff0c;我们可以轻松地创建具有响应式用户操作的下…

Spring Cloud+Uniapp+企业工程管理系统源码之提高工程项目管理软件的效率

高效的工程项目管理软件不仅能够提高效率还应可以帮你节省成本提升利润 在工程行业中&#xff0c;管理不畅以及不良的项目执行&#xff0c;往往会导致项目延期、成本上升、回款拖后&#xff0c;最终导致项目整体盈利下降。企企管理云业财一体化的项目管理系统&#xff0c;确保…

车道线检测|利用边缘检测的原理对车道线图片进行识别

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 这两个都是博主在学习Linux操作系统过程中的记录&#xff0c;希望对大家的学习有帮助&#xff01; 操作系统Operating Syshttps://blog.csdn.net/yu_cblog/category_12165502.html?spm1001.2014.3001.5482Linux S…

Python应用实例(二)数据可视化(三)

数据可视化&#xff08;三&#xff09; 1.使用Plotly模拟掷骰子1.1 安装Plotly1.2 创建Die类1.3 掷骰子1.4 分析结果1.5 绘制直方图1.6 同时掷两个骰子1.7 同时掷两个面数不同的骰子 1.使用Plotly模拟掷骰子 本节将使用Python包Plotly来生成交互式图表。需要创建在浏览器中显示…

【计算机视觉 | 目标检测 | 图像分割】arxiv 计算机视觉关于目标检测和图像分割的学术速递(7 月 17 日论文合集)

文章目录 一、检测相关(5篇)1.1 TALL: Thumbnail Layout for Deepfake Video Detection1.2 Cloud Detection in Multispectral Satellite Images Using Support Vector Machines With Quantum Kernels1.3 Multimodal Motion Conditioned Diffusion Model for Skeleton-based Vi…

LiveGBS流媒体平台GB/T28181功能-海康NVR摄像机自带物联网卡摄像头注册GB/T28181国标平台看不到设备的时候如何抓包及排查

海康大华宇视华为等硬件NVR摄像机注册到LiveGBS国标平台看不到设备的时候如何抓包及排查 1、设备注册后查看不到1.1、是否是自带物联网卡的摄像头1.2、关闭萤石云1.3、防火墙排查1.4、端口排查1.5、IP地址排查1.6、设备TCP/IP配置排查1.7、设备多网卡排查1.8、设备接入配置参数…

docker-compose自建RustDesk远程控制服务器

github&#xff1a; rustdesk/rustdesk-server: RustDesk Server Program (github.com) 一、创建 docker-compose.yml 文件&#xff0c;复制以下 docker-compose 配置文件内容到文件 version: 3networks:rustdesk-net:external: falseservices:hbbs:container_name: hbbspor…

卷积神经网络识别人脸项目—使用百度飞桨ai计算

卷积神经网络识别人脸项目的详细过程 整个项目需要的准备文件&#xff1a; 下载链接&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1WEndfi14EhVh-8Vvt62I_w 提取码&#xff1a;7777 链接&#xff1a;https://pan.baidu.com/s/10weqx3r_zbS5gNEq-xGrzg 提取码&#x…

【论文笔记】KDD2019 | KGAT: Knowledge Graph Attention Network for Recommendation

Abstract 为了更好的推荐&#xff0c;不仅要对user-item交互进行建模&#xff0c;还要将关系信息考虑进来 传统方法因子分解机将每个交互都当作一个独立的实例&#xff0c;但是忽略了item之间的关系&#xff08;eg&#xff1a;一部电影的导演也是另一部电影的演员&#xff09…

idea-实现热部署

idea-实现热部署 今天在进行idea 开发时突然发现热部署失败了&#xff0c;每次修改内容都要去restart server一次 这样比较麻烦&#xff0c;故而总结一下idea实现热部署的方法&#xff1a; 步骤一&#xff1a; 选择edit configuration 然后跳出server 的配置&#xff0c;方框…

第一性原理COHP计算在材料科学领域的应用

第一性原理COHP计算在材料科学领域的应用 第一性原理COHP计算是一种基于密度泛函理论&#xff08;DFT&#xff09;的计算方法&#xff0c;用于研究固体材料中的化学键和电子结构相互作用。通过COHP计算&#xff0c;我们可以获得许多有用的数据&#xff0c;并且这些数据在材料科…

NFS 存储(二十八)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 二、应用场景 三、安装 四、启动 五、目录结构 六、命令解析 七、配置 八、客户端访问 总结 前言 今天学习的是NFS 存储&#xff0c;主要是讲 nfs 的概述…

1.Lee Code HTML面试题

如何理解HTML语义化 HTML语义化是指在编写HTML代码时,使用合适的标签和元素来表达文档结构和含义,使得页面内容对搜索引擎和开发者更加友好,并增加代码的可读性。语义化的HTML使得网页在没有样式或样式加载失败时仍然能够保持良好的结构和意义。 问题 2

Kubernetes 使用 helm 部署 NFS Provisioner

文章目录 1. 介绍2. 预备条件3. 部署 nfs4. 部署 NFS subdir external provisioner4.1 集群配置 containerd 代理4.2 配置代理堡垒机通过 kubeconfig 部署 部署 MinIO添加仓库修改可配置项 访问nodepotingress 1. 介绍 NFS subdir external provisioner 使用现有且已配置的NFS…

高级ACL列表应用实验

实验拓扑图&#xff1a; 实验要求&#xff1a; PC1可以telnet R1&#xff0c;但不能ping R1&#xff1b;PC1可以ping R2但不能telnet R2&#xff1b;PC2和PC1相反 1、配置IP让整个网络互通 [PC1]ip route-static 0.0.0.0 0.0.0.0 192.168.1.254 [PC2]ip route-static 0.0.0.…

【VB6|第20期】遍历Excel单元格的四种方法

日期&#xff1a;2023年7月19日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xf…

Linux系统编程(信号处理机制)

文章目录 前言一、中断&#xff0c;异常&#xff0c;信号的区别二、信号在Linux中的标识三、信号处理相关函数四、代码实验总结 前言 本篇文章我们来讲解信号的处理机制&#xff0c;信号处理在Linux操作系统中必不可少&#xff0c;这一点值得大家注意&#xff0c;信号又会与中…

DOM事件

文章目录 1.注册事件&#xff08;绑定事件&#xff09;1.1 注册事件概述1.2 addEventListener 事件监听方式1.3 attachEvent 事件监听方式1.4 注册事件兼容性解决方案 2.删除事件&#xff08;解绑事件&#xff09;2.1 删除事件的方式2.2 删除事件兼容性解决方案 3.DOM 事件流4.…

机器学习实践(2.1)LightGBM分类任务

前言 LightGBM也属于Boosting集成学习模型(还有前面文章的XGBoost)&#xff0c;LightGBM和XGBoost同为机器学习的集大成者。相比越来越流行的深度神经网络&#xff0c;LightGBM和XGBoost能更好的处理表格数据&#xff0c;并具有更强的可解释性&#xff0c;还具有易于调参、输入…

Kafka的基本概念及其关键原理

Apache Kafka是一种分布式事件存储和流处理平台。该项目旨在提供一个统一的、高吞吐量、低延迟的平台&#xff0c;用于处理实时数据流。 •Kafka可以通过Kafka Connect连接到外部系统&#xff08;用于数据导入/导出&#xff09;&#xff0c;并提供Kafka Streams库用于流处理应用…