KafkaQ - 好用的 Kafka Linux 命令行可视化工具

鉴于并没有在网上找到比较好的linux平台的kafka可视化工具,今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ

虽然简陋,主要可以实现下面的这些功能:

1)查看当前topic的分片数量和副本数量

2)查看当前topic下面每个分片的最大offset

3)查看当前topic某个分片下面指定offset范围的数据

4)搜索当前topic指定关键词的message

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

KafkaQ分为普通版本和搜索版本:

* 普通版本支持上述3种查询

* 搜索版本支持上述3种查询之外,增加关键词搜索,即在分片中搜索指定关键词的message

一、普通版 KafkaQ.sh

使用方法:

Usage: KafkaQ.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]

--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)

显示的效果如下,十分简洁,分片数据里面左边一列是消息入库的时间,右边是message内容:

KafkaQ 源码如下:

#!/bin/bash

# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}

# 检查参数
if [ -z "$1" ]; then
    echo "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]"
    exit 1
fi

TOPIC="$1"

# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
    echo "Kafka not found at /usr/local/kafka/bin/"
    exit 1
fi

# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"

# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')

echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"

# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"

# 获取分片数据
if [ "$LIMIT" -gt 0 ]; then
    echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
    MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
    
    # 格式化输出消息
    echo "$MESSAGES" | awk -F'\t' 'BEGIN {
        print "* 分片数据:"
    }
    {
        if ($3 != "null") {
            timestamp = substr($1, 12) / 1000 # 从第10个字符开始提取时间戳,并除以1000以转换为秒级时间戳
            value = $3
            printf "%s %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value
        }
    }'
fi

二、搜索版 KafkaQ-Search.sh

使用方法:

Usage: KafkaQ-Search.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]

--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)
--search 搜索字符串

显示效果如下:

KafkaQ-Search.sh 源码如下:

#!/bin/bash

# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}
SEARCH=${5:-""}

# 检查参数
if [ -z "$1" ]; then
    echo "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]"
    exit 1
fi

while [[ $# -gt 0 ]]; do
    case "$1" in
        --topic)
            TOPIC="$2"
            shift 2
            ;;
        --partition)
            PARTITION="$2"
            shift 2
            ;;
        --offset)
            OFFSET="$2"
            shift 2
            ;;
        --limit)
            LIMIT="$2"
            shift 2
            ;;
        --search)
            SEARCH="$2"
            shift 2
            ;;
        *)
            echo "Unknown parameter: $1"
            exit 1
            ;;
    esac
done

# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
    echo "Kafka not found at /usr/local/kafka/bin/"
    exit 1
fi

# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"

# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')

echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"

# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"


# 获取分片数据
if [ "$LIMIT" -gt 0 ]; then
    echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
    MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
    
    # 搜索关键词并输出结果
    if [[ ! -z $SEARCH ]]; then
        echo -e "\033[0;32m* 搜索条件:$SEARCH\033[0m"
        echo "  搜索结果:"
        echo "$MESSAGES" | grep --color=never "$SEARCH" | awk -F'\t' '{
            timestamp = substr($1, 12) / 1000 # 从第12个字符开始提取时间戳,并除以1000以转换为秒级时间戳
            value = $3
            printf "%s  %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value
        }'
    fi
fi

 * (附注)参考的shell如下

1、获取kafka的topic 分区数量

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic>

2、获取kafka每个分片最大的offset

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic>

3、获取kafka分片指定offset范围的具体信息

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic> --partition <partition> --offset <offset> --max-messages <max-message> --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/704886.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

实战计算机网络02——物理层

实战计算机网络02——物理层 1、物理层实现的功能2、数据与信号2.1 数据通信模型2.2 通信领域常用术语2.3 模拟信号和数字信号 3、信道和调制3.1 信道3.2 单工通信、半双工通信、全双工通信3.3 调制3.4 奈式准则3.5 香农定律 4、传输媒体4.1 导向传输媒体4.2 非导向传输媒体 5、…

JEPaaS 低代码平台 j_spring_security_check SQL注入漏洞复现

0x01 产品简介 JEPaaS是一款优秀的软件平台产品,可视化开发环境,低代码拖拽式配置开发,操作极其简单,可以帮助解决Java项目80%的重复工作,让开发更多关注业务逻辑,大大提高开发效率,能帮助公司大幅节省人力成本和时间成本,同时又不失灵活性。适用于搭建 OA、ERP、CRM、…

ONNX2NCNN工具

最近部署很多onnx转ncnn的操作&#xff0c;发现还是需要有页面操作会比较好&#xff0c;而且需要查询onnx的图&#xff0c;所以写了一个工具来搭配使用 建议搭配Netron 来使用 打开模型 选择打开-》选择onnx模型 显示基础信息 查询onnx模型图 展示信息 点击“展示信息”&…

喜讯!云起无垠入选《2024中国AI大模型产业图谱1.0版》

近日&#xff0c;数据猿与上海大数据联盟联合策划并启动了“2024全年度三大策划活动”&#xff0c;经过数月的精心筹备和严格筛选&#xff0c;通过直接申报交流、深入访谈调研、外部咨询评价以及匿名访谈等多维度交叉验证的方式&#xff0c;最终完成了《2024中国AI大模型产业图…

不同进制数之间的相互转换(全面解析版)

目录 前言 1.不同进制的表示方法 2.不同进制之间的对照 3.二进制数转换为其他进制数 3.1二进制数转换为八进制数 3.2任意进制数转换为十进制数 3.3二进制数转换为十六进制数 4.其他进制数转换为二进制数 4.1八进制数转换为二进制数 4.2十进制数转换为任意进制数 4.3十…

ESP-IDF OTA升级过程中遇到的“esp_transport_read returned:-1 and errno:128”问题(1)

在笔者“ESP32-C3模组上跑通OTA升级”系列文章中,经过了一番“踩坑填坑”的过程,最终实现了OTA升级功能。每次升级都能够成功,比较稳定。 但是,当笔者添加大量业务代码(如使能蓝牙配置),使得固件的大小由之前的200~300K字节变为1.5~1.6M字节后,再次执行升级时(同样的…

x64-linux下在vscode使用vcpkg

1.使用vscode远程连接上对应的linux &#xff0c;或者直接在图形化界面上使用。 2.安装vcpkg 插件&#xff0c;然后打开插件设置。 注意&#xff1a;defalut和host的主机一定和你自己的主机一致&#xff0c;且必须符合vcpkg三元组格式&#xff0c;其中你可以选择工作台的设置&a…

编写函数isprime(int a),用来判断自变量a是否为素数,若是素数,函数返回整数1,否则返回0

int main() {int isprime(int x);int x;printf("请输入一个数\n");scanf("%d", &x);if (isprime(x)){printf("%d是素数\n",x);}else{printf("%d不是素数\n",x);} } int isprime(int a) {int i;for (i 2; i < a / 2; i){if (a%…

法考报名必看,99%高过审率证件照片电子版制作技巧

在2024年&#xff0c;法考备战已经如火如荼进行中&#xff0c;作为进入法律行业的第一步&#xff0c;参加法考的重要性不言而喻。而作为报名过程中必不可少的一环&#xff0c;报名照片要求以及证件照制作技巧更是需要我们特别重视的部分。想要在这个过程中顺利通过审核&#xf…

vue3中用setup写的数据,不能动态渲染(非响应式)解决办法

相比于2.0&#xff0c;vue3.0在新增了一个setup函数&#xff0c;我们在setup中可以写数据也可以写方法&#xff0c;就像我们以前最开始学习js一样&#xff0c;在js文件中写代码。 For instance <template><div class"person"><h2>姓名&#xff1…

必刷!!软考【系统分析师】100道高频考题(含知识点解析),轻松45+

2024上软考已经圆满结束&#xff0c;距离下半年的考试也只剩下半年不到的时间。需要备考下半年软考系分的小伙伴们可以抓紧开始准备了&#xff0c;毕竟高级科目的难度可是不低的。 今天给大家整理了——系统分析师100道高频考题 &#xff0c;都是核心重点&#xff0c;有PDF&…

音频处理1_基本概念

AI变声和音乐创作的基础 声音本质上是人类可察觉范围内的气压周期性波动, 即声波 声波是一种连续信号&#xff0c;在任意时间内的声音信号有无数个取值。对于只能读取有限长数组计算机来说&#xff0c;我们需要将连续的声音信号转换为一个离散的序列&#xff0c;即数字化表示。…

ssm情侣购物系统-计算机毕业设计源码02387

目 录 摘要 1 绪论 1.1 开发背景与意义 1.2开发意义 1.3Vue.js 主要功能 1.3论文结构与章节安排 2 情侣购物系统系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据流程 3.3.2 业务流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例分…

电脑蓝屏怎么办?7个方法为你解决问题!

“我今天开电脑时&#xff0c;不知道为什么电脑突然就蓝屏了。大家有什么方法可以解决电脑蓝屏问题吗&#xff1f;” 在现代社会的快节奏中&#xff0c;电脑已经成为了我们工作和生活的重要伙伴。然而&#xff0c;当这个“伙伴”突然展现出它的“任性”一面——蓝屏时&#xff…

idea插件开发之通过纯编码方式开发页面(不使用form ui)

写在前面 本文看线如何通过纯编码方式来定义页面。 1&#xff1a;正戏 我们首先来定义一个面板&#xff0c;需要继承抽象类&#xff1a;com.intellij.openapi.ui.SimpleToolWindowPanel&#xff0c;如下&#xff1a; public class MySelfDefinePanel extends SimpleToolWin…

【Python整蛊小代码】以及打包成.exe格式过程【踩坑,避坑】

一、项目介绍 该项目为一个Python写成的整蛊小代码&#xff0c;效果是不断弹出窗口&#xff0c;并显示图片和文字。并使用 项目大致效果&#xff1a; &#xff08;图片过于搞笑&#xff0c;不宜展示&#xff09; &#xff08;无侵犯肖像权&#xff0c;禁止商用&#xff09; …

优迅医学近10亿对赌协议今年到期,前五大客户收入波动剧烈

《港湾商业观察》廖紫雯 日前&#xff0c;优迅医学生物科技&#xff08;以下简称&#xff1a;优迅医学&#xff09;递表港交所&#xff0c;保荐机构为中金公司&#xff0c;优迅医学国内运营主体为北京优迅医学检验实验室有限公司。 作为一家以平台为基础的基因科技公司&#…

常见报错及程序框架图

程序框架图 程序流程图又称程序框图&#xff0c;是用统一规定的标准符号描述程序运行具体步骤的图形表示。程序框图的设计是在处理流程图的基础上&#xff0c;通过对输入输出数据和处理过程的详细分析&#xff0c;将计算机的主要运行步骤和内容标识出来。程序框图是进行程序设…

湖北省七普分乡、镇、街道数据,shp、excel格式 需要自取

数据名称: 湖北省七普分乡、镇、街道数据 数据格式: Shp、excel 数据几何类型: 面 数据坐标系: WGS84 数据时间&#xff1a;2020年 数据来源&#xff1a;网络公开数据 数据可视化.

1.Element的table表高度自适应vue3+js写法

解决方法 在页面table上添加id&#xff0c;动态计算每页table的最大高度 &#xff0c;将高度保存在store中&#xff0c;每次切换路由时进行计算。 文章目录 解决方法前言一、页面table使用二、store状态库1.引入库 效果 前言 提示&#xff1a;状态管理使用的是pinia,用法参考…