kafka操作命令详解

目录

1、集群运维命令

1.1、集群启停命令

1.3、集群迁移命令

1.4、权限管理命令

1.4.1、权限参数介绍

1.4.2、增加权限命令

1.4.3、移出权限命令

1.4.4、查看所有topic权限命令

1.4.5、查看某个topic权限命令

2、生产者命令

2.1、创建topic命令

2.2、删除topic命令

2.3、修改topic命令

2.3.1、修改配置命令

2.3.2、删除配置命令

2.4、查询topic命令

2.4.1、查询topic列表命令

2.4.2、查看某个topic详情命令

 2.4.2、查看某个topic配置命令

2.5、发送消息命令

3、消费者命令

3.1、消费者组命令

3.1.1、查看消费者组列表命令

3.1.2、查看消费者组成员命令

3.1.3、查看某个消费组详情命令

3.1.4、查看某个消费者组状态命令

3.3、消费消息命令

4、性能测试命令

5、类执行命令


1、集群运维命令

1.1、集群启停命令

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-stop.sh

1.3、集群迁移命令

  • consumer.config <file>: 指定消费者的配置文件。该文件包含了消费源集群的相关配置,如连接信息、组ID等。
  • producer.config <file>: 指定生产者的配置文件。该文件包含了生产者写入目标集群的相关配置,如连接信息、压缩方式等。
  • whitelist <topic>: 指定需要镜像的主题列表,可以使用逗号分隔多个主题名。
  • blacklist <topic>: 指定需要排除镜像的主题列表,可以使用逗号分隔多个主题名。与--whitelist互斥。
  • num.streams <number>: 指定消费者线程数量,默认是1。
  • message.handler <class>: 自定义消息处理类,可以在消息写入目标集群前对其进行处理。
  • message.handler.args <args>: 传递给自定义消息处理类的参数。
  • abort.on.send.failure <true|false>: 是否在发送失败时终止镜像过程,默认为true。
  • offset.commit.interval.ms <number>: 定期提交消费者偏移量的间隔时间,默认是60000ms(1分钟)
#将指定topic按照消费者配置调用生产者发送到指定集群
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist "topic1,topic2"

 

1.4、权限管理命令

1.4.1、权限参数介绍

  • principal: 代表用户或服务主体,例如 User:Alice。
  • host: 允许或拒绝访问的主机地址,* 表示任意主机。
  • operation: 允许或拒绝的操作类型,例如 READ, WRITE, ALL。
  • permissionType: 权限类型,可以是 ALLOW 或 DENY。
  • resourceType: 资源类型,例如 TOPIC 或 GROUP。
  • name: 资源名称,例如 example-topic。
  • patternType: 资源模式类型,LITERAL 表示字面模式。

1.4.2、增加权限命令

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --remove --allow-principal User:user1 --operation Write --topic my-topic

1.4.3、移出权限命令

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --remove --allow-principal User:user1 --operation Write --topic my-topic

1.4.4、查看所有topic权限命令

bin/kafka-acls.sh --list --bootstrap-server localhost:9092
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=example-topic, patternType=LITERAL)`:
  (principal=User:Alice, host=*, operation=READ, permissionType=ALLOW)
  (principal=User:Bob, host=192.168.1.100, operation=WRITE, permissionType=ALLOW)
  (principal=User:Carol, host=*, operation=READ, permissionType=DENY)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=another-topic, patternType=LITERAL)`:
  (principal=User:David, host=192.168.1.101, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=example-group, patternType=LITERAL)`:
  (principal=User:Admin, host=*, operation=ALL, permissionType=ALLOW)

1.4.5、查看某个topic权限命令

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --list --topic my-topic

2、生产者命令

2.1、创建topic命令

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

2.2、删除topic命令

bin/kafka-topics.sh --delete --topic my_topic --bootstrap-server localhost:9092

2.3、修改topic命令

2.3.1、修改配置命令

kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic topicName --config flush.messages=1

2.3.2、删除配置命令

kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic topicName --delete-config flush.messages

2.4、查询topic命令

2.4.1、查询topic列表命令

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

2.4.2、查看某个topic详情命令

bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092

 2.4.2、查看某个topic配置命令

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=3600000

Configs for topic 'my_topic' are:
  cleanup.policy=delete
  compression.type=producer
  delete.retention.ms=86400000
  file.delete.delay.ms=60000
  flush.messages=9223372036854775807
  flush.ms=9223372036854775807
  follower.replication.throttled.replicas=
  index.interval.bytes=4096
  leader.replication.throttled.replicas=
  max.message.bytes=1000012
  message.downconversion.enable=true
  message.format.version=2.7-IV0
  message.timestamp.difference.max.ms=9223372036854775807
  message.timestamp.type=CreateTime
  min.cleanable.dirty.ratio=0.5
  min.compaction.lag.ms=0
  min.insync.replicas=1
  preallocate=false
  retention.bytes=-1
  retention.ms=604800000
  segment.bytes=1073741824
  segment.index.bytes=10485760
  segment.jitter.ms=0
  segment.ms=604800000
  unclean.leader.election.enable=false

2.5、发送消息命令

#默认发送消息
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

#指定分区发送命令
kafka-console-producer.sh --topic my-topic 2 --broker-list localhost:9092

3、消费者命令

3.1、消费者组命令

3.1.1、查看消费者组列表命令

#查看消费组列表命令
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

group1
group2
group3
my_consumer_group
another_consumer_group

3.1.2、查看消费者组成员命令

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group --members

CONSUMER-ID                                     HOST            CLIENT-ID       #PARTITIONS     
consumer-1-2d7d59c3-64c9-4526-905b-4e8920db3e84 /192.168.1.36   consumer-1      2

3.1.3、查看某个消费组详情命令

bin/kafka-consumer-groups.sh --describe --group my_group --bootstrap-server localhost:9092

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID         HOST            CLIENT-ID
my_group        my_topic        0          12345           12350           5               consumer-1-1        /192.168.1.1    consumer-1
my_group        my_topic        1          23456           23460           4               consumer-1-2        /192.168.1.1    consumer-1
my_group        my_topic        2          34567           34567           0               consumer-1-3        /192.168.1.1    consumer-1

3.1.4、查看某个消费者组状态命令

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state

COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
192.168.1.36:9092 (1)     range                     Stable               1

3.3、消费消息命令

#从头消费命令
bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning

#指定分区消费命令
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --partition 1

#指定消费者组id消费命令
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --consumer-property group.id=test

4、性能测试命令

4.1、生产者测试命令

bin/kafka-producer-perf-test.sh --topic perf-test --num-records 1000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=127.0.0.1:9092 compression.type=lz4

1000 records sent, 3424.657534 records/sec (3.34 MB/sec), 13.61 ms avg latency, 255.00 ms max latency, 13 ms 50th, 20 ms 95th, 255 ms 99th.

-h, --help	显示使用帮助并退出
--topic	指定生产的消息发往的 topic
--num-records	指定生产的消息总数
--payload-delimeter	如果通过 --payload-file 指定了从文件中获取消息内容,那么这个参数的意义是指定文件的消息分隔符,默认值为 \n,即文件的每一行视为一条消息;如果未指定 --payload-file 则此参数不生效
--throughput	限制每秒发送的最大的消息数,设为 -1 表示不限制
--producer-props	直接指定 Producer 配置,格式为 NAME=VALUE,例如 bootstrap.server=127.0.0.1:9092,通过此种方式指定的配置优先级高于 --producer.config
--producer-config	指定 Producer 的配置文件,格式参照官方的 config/producer.properties
--print-metrics	在测试结束后打印更详尽的指标,默认为 false
--transactional-id	指定事务 ID,测试并发事务的性能时需要,只有在 --transaction-duration-ms > 0 时生效,默认值为 performance-producer-default-transactional-id
--transactional-duration-ms	指定事务持续的最长时间,超过这段时间后就会调用 commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0
--record-size	指定每条消息的大小,单位是字节,和 --payload-file 两个中必须指定一个,但不能同时指定
--payload-file	指定消息的来源文件,只支持 UTF-8 编码的文本文件,文件的消息分隔符通过 --payload-delimeter 指定,和 --record-size 两个中必须指定一个,但不能同时指定

4.2、消费者测试命令

bin/kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092 --topic perf_test --messages 1000000 --threads 8 --reporting-interval 1000 --show-detailed

time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-03-25 15:57:59:426, 0, 657.2275, 657.2275, 673001, 673001.0000, 1616659078690, -1616659077690, 0.0000, 0.0000
...

***************************输入参数解释***************************
--bootstrap-server	指定 broker 地址,必选,除非用 --broker-list 代替(不建议)
--topic	指定消费的 topic,必选
--version	输出 Kafka 版本
--consumer.config	指定 Consumer 配置文件
--date-format	指定用于格式化 *.time 的规则,默认为 yyyy-MM-dd HH:mm:ss:SSS
--fetch-size	指定一次请求消费的大小,默认为 1048576 即 1 MB
--from-latest	如果 Consumer 没有已经建立的 offset,则指定从 log 中最新的位点开始消费,而不是从最早的位点开始消费
--group	指定 ConsumerGroup ID,默认为 perf-consumer-40924
--help	显示使用帮助并退出
--hide-header	指定后不输出 header 信息
--messages	指定消费的消息数量,必选
--num-fetch-threads	指定 fetcher 线程的数量
--print-metrics	指定打印 metrics 信息
--reporting-interval	指定打印进度信息的时间间隔,默认为 5000 即 5 秒
--show-detailed-stats	指定每隔一段时间(由 --reporting-interval 指定)输出显示详细的状态信息
--socket-buffer-size	指定 TCP 的 RECV 大小,默认为 2097152 即 2 MB
--threads	指定消费的线程数,默认为 10
--timeout	指定允许的最大超时时间,即每条消息返回的最大时间间隔,默认为 10000 即 10 秒


***************************输出参数解释***************************
time:当前时间,格式由 --date-format 指定
threadId:线程 ID
data.consumed.in.MB:消费到的数据总大小,单位为 MB
MB.sec:消费 TPS,即每秒消费的消息大小
data.consumed.in.nMsg:消费到的总消息数
nMsg.sec:消费 TPS,即每秒消费的消息条数
rebalance.time.ms:消费者组重平衡的耗时,单位为 ms,0 表示没有发生重平衡
fetch.time.ms:fetch 线程的总耗时,单位为 ms
fetch.MB.sec:fetch 线程每秒钟获取到的消息大小
fetch.nMsg.sec:fetch 线程每秒钟获取到的消息数量
#若没有指定 --show-detailed,则输出信息中的前两项会有所不同,如下:
start.time, end.time, data.consumed.in.MB, MB.sec, ...
start.time:消费开始的时间,格式由 --date-format 指定
end.time:消费结束的时间,格式由 --date-format 指定

5、类执行命令

通过kafka-run-class命令执行指定类

#查看消费者消费进度
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic

my-topic:0:12345
my-topic:1:23456
my-topic:2:34567

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/731349.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

同时使用磁吸充电器和Lightning时,iPhone充电速度会变快吗?

在智能手机的世界里&#xff0c;续航能力一直是用户关注的焦点。苹果公司以其创新的MagSafe技术和传统的Lightning接口&#xff0c;为iPhone用户提供了多样化的充电解决方案。 然而&#xff0c;当这两种技术同时使用时&#xff0c;它们能否带来更快的充电速度&#xff1f;本文…

“用友审批+民生付款”,YonSuite让企业发薪更准时

随着现代企业经营模式的不断创新和市场竞争的加剧&#xff0c;企业薪资管理和发放的效率、准确性和及时性已成为企业管理的重要一环。然而&#xff0c;在实际操作中&#xff0c;许多企业面临着薪资管理复杂、发放流程繁琐、数据不准确等难点和痛点。为了解决这些问题&#xff0…

【Java】已解决java.net.UnknownHostException异常

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决java.net.UnknownHostException异常 在Java的网络编程中&#xff0c;java.net.UnknownHostException是一个常见的异常&#xff0c;它通常表明在尝试解析主机名时出现了问题。…

全网首测!文生软件平台码上飞CodeFlying,效果炸裂!

前言&#xff1a; 提到AIGC&#xff0c;在大家的印象中应该就是让AI自己生成文字&#xff0c;图片等内容吧。随着今年Sora&#xff0c;Suno的爆火&#xff0c;将AIGC的应用场景又拉到了一个新的高度&#xff0c;为人们带来了更多的遐想。在未来&#xff0c;或许可以用AI来生成…

人声分离的5个方法分享,从入门到精通,伴奏提取手拿把捏!

人声分离通常是音乐制作、混音和卡拉OK中常用的重要技术之一。它的核心是将乐器伴奏从原始音轨中分离出来&#xff0c;使得用户可以单独处理或重混音频&#xff0c;创造出清晰干净的伴奏轨道。若缺乏强大的音频剪辑软件或专业人声分离工具&#xff0c;这一过程往往会比较困难。…

车辆轨迹预测系列 (二):常见数据集介绍

车辆轨迹预测系列 (二)&#xff1a;常见数据集介绍 文章目录 车辆轨迹预测系列 (二)&#xff1a;常见数据集介绍1、NuScenes (2020)&#xff1a;1、下载2、说明 2、Waymo Open Dataset (2020)&#xff1a;1、介绍2、概述3、下载4、教程5、参考 3、Lyft Level 5 (2020)&#xff…

智慧办公新篇章:可视化技术引领园区管理革命

随着科技的飞速发展&#xff0c;办公方式也在经历着前所未有的变革。在这个信息爆炸的时代&#xff0c;如何高效、智能地管理办公空间&#xff0c;成为了每个企业和园区管理者面临的重要课题。 智慧办公园区作为未来办公的新趋势&#xff0c;以其高效、便捷、智能的特点&#x…

鸿蒙NEXT实战开发: 依据前端对http请求进行二次简单封装

一、为什么要对http请求进行封装&#xff1f; 在我看来二次封装有一下几点好处 代码封装之后&#xff0c;开发人员只用关注业务层面的东西&#xff0c;不用去过多浪费时间在接口请求数据处理上。封装之后代码更加简洁&#xff0c;通俗易懂&#xff0c;方便后期维护&#xff0…

数据库讲解---(数据库保护)【上】

目录 一.事务 1.1事务的概念【重要】 1.2事务的特性【重要】 1.2.1原子性(Atomicity) 1.2.2一致性(Consistency) 1.2.3隔离性(Isolation) 1.2.4持久性(Durability) 二.数据库恢复 2.1数据库系统的故障 2.1.1事务内部故障 2.1.2系统故障 2.1.3介质故障 2.1.4计算机…

甘肃的千层烤馍:传统面点的魅力绽放

千层烤馍&#xff0c;作为甘肃美食文化的重要象征&#xff0c;以其独特的外形和丰富的口感&#xff0c;吸引着众多食客。它的外观犹如一件精美的艺术品&#xff0c;层层叠叠&#xff0c;金黄酥脆&#xff0c;散发着诱人的香气。 在甘肃平凉地区制作千层烤馍&#xff0c…

详解|什么样的SSL证书能助力企业通过等保与密评?

企业在过等级保护&#xff08;简称“等保”&#xff09;与密码评测&#xff08;简称“密评”&#xff09;的时候&#xff0c;SSL证书作为网络安全的基础组件之一&#xff0c;其选择与部署对于企业顺利通过等保测评与密评至关重要。那什么样的SSL证书能够有效助力企业达成这一目…

gbase8s之Encoding or code set not supported

如图发生以下错误&#xff1a; 解决办法&#xff1a;在url里加上ifx_use_strenctrue 就可以了 参数解释&#xff1a;

镜像发布至dockerHub

1、login 没有账号的话去注册一个 https://hub.docker.com docker login 输入账号密码和账号2、修改镜像名格式 可以直接招我的修改 格式为你的 hub名/镜像名 3、推送

与大模型交手近 1500 天,智源仍在坚持原始创新

前言 2024 上半年&#xff0c; OpenAI 的成果从世界模拟器 Sora&#xff0c;到首个实现多模态 in 到多模态 out 的 GPT-4o &#xff0c;仍在强势推进着迈向 AGI 的节奏。面对技术上的差距&#xff0c;追赶 OpenAI ——是这场人工智能革命浪潮发展至今&#xff0c; AI 界仍在追…

密码CTF(5)

一、[安洵杯 2020]密码学&#xff1f;爆破就行了——sha256掩码爆破 1.题目&#xff1a; #!/usr/bin/python2 import hashlib from secret import SECRET from broken_flag import BROKEN_FLAGflag d0g3{ hashlib.md5(SECRET).hexdigest() } broken_flag d0g3{71b2b5616…

解决virtualbox虚拟机与主机之间复制粘贴

1、在VirtualBox管理器中设置共享粘贴板和拖放方向为双向 2、在存储中设置使用主机输入输出&#xff08;I/O&#xff09;缓存。 3、在存储→控制器&#xff1a;SATA→***.vdi下勾选固态驱动器 4、在虚拟机→设备→安装增强功能 如果上述操作重启虚拟机后&#xff0c;还不行&am…

揭秘Xinstall如何助力App推广,提升用户量与转化率双指标!

在移动互联网时代&#xff0c;App的推广与运营成为了每个开发者必须面对的重要课题。然而&#xff0c;推广效果的评估和优化往往令众多开发者头疼不已。今天&#xff0c;我们将为您揭秘一款能够解决这一痛点的利器——Xinstall&#xff0c;带您一起探讨它如何助力App推广&#…

深度神经网络一

文章目录 深度神经网络 (DNN)1. 概述2. 基本概念3. 网络结构 深度神经网络的层次结构详细讲解1. 输入层&#xff08;Input Layer&#xff09;2. 隐藏层&#xff08;Hidden Layers&#xff09;3. 输出层&#xff08;Output Layer&#xff09;整体流程深度神经网络的优点深度神经…

项目实践---Windows11中安装Zookeeper/Hadoop/Hive的部分问题解决

一.Hadoop与Hive兼容版本选择 正常来说&#xff0c;Hadoop与Hive版本不兼容会出现很多问题导致hive安装失败&#xff0c;可以先确定HIve的版本&#xff0c;比如&#xff1a;要用Hive3.1.2版本&#xff0c;该如何确定使用Hadoop的版本呢&#xff0c;需要我们在hive源码中找到对…

C盘满了怎么清理?一招让你远离C盘空间不足的烦恼

C盘满了怎么清理&#xff1f;一招让你远离C盘空间不足的烦恼&#xff0c;当C盘空间满了时&#xff0c;会给我们来一系列烦恼和潜在问题。比如&#xff1a;系统运行缓慢、程序崩溃或无法安装、启动时间变长、系统不稳定、文件管理困难、游戏卡顿、电脑卡顿、系统故障等问题&…