【shell-10】shell实现的各种kafka脚本

kafka-shell工具

    • 背景
    • 日志 log
    • 一.启动kafka->(start-kafka)
    • 二.停止kafka->(stop-kafka)
    • 三.创建topic->(create-topic)
    • 四.删除topic->(delete-topic)
    • 五.获取topic列表->(list-topic)
    • 六. 将文件数据 录入到kafka->(file-to-kafka)
    • 七.将kafka数据 下载到文件->(kafka-to-file)
    • 八. 查看topic的groupID消费情况->(list-group)

背景

注意:我用的kafka版本是 3.2.1 其他版本kafka提供的 命令行可能有细微区别。
因为经常要用kafka环境参与测试,所以写了不少脚本。在很多时候可以大大提高测试的效率。
主要包含如下功能:
topic的管理【创建,删除】
topic信息查看【topic列表,topic groupid 消费情况】
topic数据传输【file数据录入到topic,topic数据下载到本地文件】
脚本中做了各种检查,日志的输出做了颜色区分,用起来没啥问题。

日志 log

此文件是个额外的日志文件主要用于打印日志,该文件会被下面的shell文件引用

#!/bin/bash
#日志级别 debug-1, info-2, warn-3, error-4, always-5
LOG_LEVEL=2

#调试日志
function log_debug(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [DEBUG]: $@"
  [ $LOG_LEVEL -le 1  ] && echo -e "\033[32m"  ${content}  "\033[0m"
}
#信息日志
function log_info(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [INGO]: $@"
  [ $LOG_LEVEL -le 2  ] && echo -e "\033[32m"  ${content} "\033[0m"
}
#警告日志
function log_warn(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [WARN] $@"
  [ $LOG_LEVEL -le 3  ] && echo -e "\033[33m" ${content} "\033[0m"
}
#错误日志
function log_err(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [ERROR]: $@"
  [ $LOG_LEVEL -le 4  ] && echo -e "\033[31m" ${content} "\033[0m"
}
~  

一.启动kafka->(start-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log

pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
log_info "Start checking kafka process"
if [ -z $pid ]; then
   log_info "The kafka process does not exist, startting.........................................................................................."
else
   log_warn "The kafka process exists and does not need to be started"
   exit 1
fi
nohup kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/server.properties >>/home/kafka/kafka.log 2>&1 &
# 日志的路径是安装kafka的时候指定的,也要替换成自己的路径
tail -f 20 /home/kafka/kafka.log

二.停止kafka->(stop-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "Start checking kafka process"
pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
if [ -z $pid ]; then
   log_warn "The kafka process does not exist and does not need to be stopped"
   exit 1
else
   log_info "The kafka process alive, stopping.............................................................................................................."
fi
kafka-server-stop.sh
log_info "Stop kafka success"

三.创建topic->(create-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "脚本功能: 创建topic"
log_info "脚本参数: topic名称(必选)"
if [ $# -ne 1 ]; then
  log_err "错误:请传入topic名称"
  exit 1
fi
#TOPIC名称
TOPIC_NAME=$1
#KAFKA地址
KAFKA_BROKER=ip:9092
# 检查Kafka主题是否存在, 若已存在则放弃创建
if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$TOPIC_NAME$";then
    log_warn "$TOPIC_NAME 已经存在,放弃创建"
else
    # 默认1副本,3分区
    kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor 1 --partitions 3 --topic $TOPIC_NAME
    log_info "请执行topic-list检查创建是否成功"
fi
~     

在这里插入图片描述

四.删除topic->(delete-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bash

source /home/shell/log
log_info "脚本作用:删除topic"
log_info "脚本参数: 支持多个topic,用空格分开,可以批量删除"
KAFKA_BROKER=ip:9092
function check_kafka_topic() {
    local local_topic_name=$1
    if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_topic_name$";then
        log_info "$local_topic_name存在->true"
        return 0  # 返回true  
    else
        log_warn "$local_topic_name 不存在->false"
        return 1  # return false
    fi
}

# 逐个删除topic
for topic in "$@"
do
  if ! check_kafka_topic $topic; then
     log_info "tpoic->$topic 不存在,跳过删除行为"
     continue
  else
     log_info "topic->$topic 执行删除"
     kafka-topics.sh --delete --bootstrap-server $KAFKA_BROKER --topic $topic
     log_info "topic->$topic 删除成功"
  fi
done

在这里插入图片描述

五.获取topic列表->(list-topic)

#!/bin/bash
source /home/shell/log
KAFKA_BROKER=ip:9092  
log_info "脚本作用: 列出topic信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic信息)"
if [ $# -eq 1 ]; then
    log_info "目标$1 详情如下"
    kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets" | grep $1
else
    log_info "所有topic 列表如下:"
    kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets"
fi

在这里插入图片描述

六. 将文件数据 录入到kafka->(file-to-kafka)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将文件中的数据录入指定topic"
log_info "脚本参数: 1.文件路劲(必选) 2.topic(必选)"
log_info "参数校验"
log_info "执行条件检查.........................................................................................................."
if [ $# -ne 2 ]; then
  log_err "必须传入两个参数: 1.文件路劲(必选) 2.topic(必选)"
  exit 1
fi

if ! [ -f $1 ]; then
  log_err "$1不是一个有效的数据文件"
  exit 1
fi

FILE_PATH=$1
TOPIC_NAME=$2
KAFKA_BROKER=ip:9092  


#检查topic是否存在
function check_kafka_topic() {
    local local_KAFKA_BROKER=$1
    if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_KAFKA_BROKER$";then
        return 0  # 返回true  
    else
        return 1  # return false
    fi
}

#将文件数据推送到kafka
function send_to_kafka(){
    local local_path=$1
    local count=0
    while IFS= read -r line; do  
      kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC_NAME <<< "$line"  
      count=$((count+1))
    done < "$local_path"
    echo $count
}        

if ! check_kafka_topic $TOPIC_NAME;then
  log_err "条件检查不通过, 原因: topic->$TOPIC_NAME不存在, 请先创建topic"
  exit 1
fi


log_info "参数检查通过.........................................................................................................."
start_time=`date "+%Y-%m-%d %H:%M:%S"`
start_seconds=$(date -d "$start_time" +%s)

log_info "开始录入数............................................................................................................"
count=$(send_to_kafka $FILE_PATH)

end_time=`date "+%Y-%m-%d %H:%M:%S"`
end_seconds=$(date -d "$end_time" +%s)
time_diff=$((end_seconds - start_seconds))  

log_info "录入条数: $count"
log_info "花费时间:$time_diff 秒"
log_info "录入完成.............................................................................................................."


在这里插入图片描述

七.将kafka数据 下载到文件->(kafka-to-file)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将kafka指定topic的数据消费到指定文件中"
log_info "脚本参数:1.数据文件路径(必选) 2.topic名称(必选) 3.groupID(可选->不存在则从头消费,存在则从grooupID offset 开始消费)"
log_info "group-list 脚本可以查看当前的"
# Kafka的bin目录  
KAFKA_BIN_DIR=/path/to/kafka/bin

#kafka 地址  
KAFKA_SERVER=ip:9092 
 
# Kafka的配置文件目录  
KAFKA_CONFIG_DIR=/home/kafka/kafka_2.12-3.2.1/config

# Kafka消费者配置文件  
CONSUMER_CONFIG=$KAFKA_CONFIG_DIR/consumer.properties

# 指定要消费的主题  
TOPIC_NAME=your_topic_name

# 指定要写入的文件 
FILE_PATH=$1
TOPIC_NAME=$2
GROUP_ID=$3


log_info "执行检察............................................................................................................................"

function check_kafka_topic() {
    local local_topic_name=$1
    if kafka-topics.sh --bootstrap-server $KAFKA_SERVER  --list | grep -q "^$local_topic_name$";then
        return 0  # 返回true  
    else
        return 1  # return false
    fi
}

 
if ! check_kafka_topic $TOPIC_NAME;then
  log_err "topic->$TOPIC_NAME 未找到"
  exit 1
fi
log_info "检查通过............................................................................................................................"

log_info "当前topic,所有groupID的消费情况如下>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
while IFS= read -r line; do
   if [[ $line == *"PARTITION"* ]]; then
         content="$(date '+%Y-%m-%d %H:%M:%S') [INFO] $line"
         echo -e "\033[45m" ${content} "\033[0m"
   else  
       log_info "$line"
   fi
done< <(kafka-consumer-groups.sh --bootstrap-server $KAFKA_SERVER --describe  --all-groups | grep -v '__consumer_offsets' | grep "$TOPIC_NAME\|PARTITION")

log_info "当前topic,所有groupID的消费情况输出完成>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"

 
log_info "消费进程运行中( CTRL+C 可退出消费 )................................................................................................."
# 运行消费者脚本并将输出重定向到文件  
if [ $# -eq 2 ]; then
  kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning > $FILE_PATH
fi
if [ $# -eq 3 ]; then
  kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning --group $GROUP_ID > $FILE_PATH
fi

在这里插入图片描述

八. 查看topic的groupID消费情况->(list-group)

#!/bin/bash
kafka_broker=ip:9092
source /home/shell/log
log_info "脚本功能: 查看topic的groupID信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic的groupID信息)"
function check_kafka_topic() {
    local local_topic_name=$1
    if kafka-topics.sh --bootstrap-server $kafka_broker  --list | grep -q "^$local_topic_name$";then
        log_info "$local_topic_name存在->true"
        return 0  # 返回true  
    else
        log_warn "$local_topic_name 不存在->false"
        return 1  # return false
    fi
}

if [ $# -eq 1 ]; then
  if ! check_kafka_topic $1; then
    #topic 不存在则直接退出程序
    log_warn "topic=$1, 不存在"
    exit 1
  fi
  log_info "topic_name=$1 的gruoupID信息如下:"
  kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep $1 | grep -v __consumer_offsets
else
  log_info "所有groupID信息如下:"
  kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep -v __consumer_offsets
fi

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/350808.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GPT应用程序的应用场景

GPT&#xff08;Generative Pre-trained Transformer&#xff09;应用程序具有广泛的应用场景&#xff0c;其强大的自然语言生成能力使其适用于多个领域。以下是一些常见的GPT应用场景&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发…

AI数字人-数字人视频创作数字人直播效果媲美真人

在科技的不断革新下&#xff0c;数字人技术正日益融入到人们的生活中。近年来&#xff0c;随着AI技术的进一步发展&#xff0c;数字人视频创作领域出现了一种新的创新方式——AI数字人。数字人视频通过AI算法生成虚拟主播&#xff0c;其外貌、动作、语音等方面可与真实人类媲美…

【开源】基于JAVA+Vue+SpringBoot的数据可视化的智慧河南大屏

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 数据模块 A4.2 数据模块 B4.3 数据模块 C4.4 数据模块 D4.5 数据模块 E 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的数据可视化的智慧河南大屏&#xff0c;包含了GDP、…

Python实战项目Excel拆分与合并——合并篇

在实际工作中&#xff0c;我们经常会遇到各种表格的拆分与合并的情况。如果只是少量表&#xff0c;手动操作还算可行&#xff0c;但是如果是几十上百张表&#xff0c;最好使用Python编程进行自动化处理。下面介绍两种拆分案例场景&#xff0c;如何用Pandas实现Excel文件的合并。…

Android App开发基础(1)—— App的开发特点

本文介绍基于Android系统的App开发常识&#xff0c;包括以下几个方面&#xff1a;App开发与其他软件开发有什么不一样&#xff0c;App工程是怎样的组织结构又是怎样配置的&#xff0c;App开发的前后端分离设计是如何运作实现的&#xff0c;App的活动页面是如何创建又是如何跳转…

代码随想录算法刷题训练营day16

代码随想录算法刷题训练营day16&#xff1a;LeetCode(104)二叉树的最大深度 、LeetCode(559)n叉树的最大深度、LeetCode(111)二叉树的最小深度、LeetCode(222)完全二叉树的节点个数 LeetCode(104)二叉树的最大深度 题目 代码 /*** Definition for a binary tree node.* publ…

Web3.0投票如何做到公平公正且不泄露个人隐私

在当前的数字时代&#xff0c;社交平台举办投票活动已成为了一种普遍现象。然而&#xff0c;随之而来的是一些隐私和安全方面的顾虑&#xff0c;特别是关于个人信息泄露和电话骚扰的问题。期望建立一个既公平公正又能保护个人隐私的投票系统。Web3.0的出现为实现这一目标提供了…

qt学习:实战 http请求获取qq的吉凶

目录 利用的api是 聚合数据 的qq号码测吉凶 编程步骤 配置ui界面 添加头文件&#xff0c;定义网络管理者和http响应槽函数 在界面的构造函数里创建管理者对象&#xff0c;关联http响应槽函数 实现按钮点击事件 实现槽函数 效果 利用的api是 聚合数据 的qq号码测吉凶 先…

架构整洁之道-设计原则

4 设计原则 通常来说&#xff0c;要想构建一个好的软件系统&#xff0c;应该从写整洁的代码开始做起。这就是SOLID设计原则所要解决的问题。 SOLID原则的主要作用就是告诉我们如何将数据和函数组织成为类&#xff0c;以及如何将这些类链接起来成为程序。请注意&#xff0c;这里…

C#使用RabbitMQ-1_Docker部署并在c#中实现简单模式消息代理

介绍 RabbitMQ是一个开源的消息队列系统&#xff0c;实现了高级消息队列协议&#xff08;AMQP&#xff09;。 &#x1f340;RabbitMQ起源于金融系统&#xff0c;现在广泛应用于各种分布式系统中。它的主要功能是在应用程序之间提供异步消息传递&#xff0c;实现系统间的解耦和…

nginx离线部署-aarch64架构

nginx离线部署-aarch64架构 服务器环境: 架构&#xff1a;aarch64&#xff0c; 系统&#xff1a;Red Hat &#xff08;CentOS 7&#xff09; nginx 1.24 需要准备这些&#xff1a; 可以先尝试安装 Nginx 安装NGINX 内网是没有网络的需要使用 RPM 包安装 gcc&#xff0c; g…

巨杉数据库携手广发证券入选2023大数据“星河”案例

近期&#xff0c;中国信息通信研究院、中国通信标准化协会大数据技术标准推进委员会(CCSA TC601)连续七年共同组织的大数据“星河&#xff08;Galaxy&#xff09;”案例征集活动发布公示。本次征集活动&#xff0c;旨在通过总结和推广大数据产业发展的优秀成果&#xff0c;推动…

2023年衣物清洁赛道行业数据分析(电商数据查询):总销额同比下滑21%

衣物清洁产品是日常生活中的必备消费品&#xff0c;加之消费频次较高&#xff0c;因此在我国较大的人口基数背景下&#xff0c;衣物清洁市场的整体体量也比较大。不过&#xff0c;从年度的销售走势看&#xff0c;2023年衣物清洁市场的整体销售呈现一定幅度的下滑。 根据鲸参谋…

【CSS】字体效果展示

测试时使用了Google浏览器。 1.Courier New 2.monospace 3.Franklin Gothic Medium 4.Arial Narrow 5.Arial 6.sans-serif 7.Gill Sans MT 8.Calibri 9.Trebuchet MS 10.Lucida Sans 11.Lucida Grande 12.Lucida Sans Unicode 13.Geneva 14.Verdana 15.Segoe UI 16.Tahoma 17.…

安全小记-ngnix负载均衡

目录 一.配置ngnix环境二.nginx负载均衡 一.配置ngnix环境 本次实验使用的是centos7,首先默认yum源已经配置好&#xff0c;没有配置好的自行访问阿里云镜像站 https://developer.aliyun.com/mirror/ 接着进行安装工作 1.首先创建Nginx的目录并进入&#xff1a; mkdir /soft &…

【蓝桥杯冲冲冲】旅行计划

蓝桥杯备赛 | 洛谷做题打卡day18 文章目录 蓝桥杯备赛 | 洛谷做题打卡day18旅行计划题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示题解代码我的一些话 旅行计划 题目描述 Kira酱要去一个国家旅游。这个国家有 N N N 个城市&#xff0c;编号为 1 1 1 至 N N…

Elasticsearch介绍以及基本操作

目录 一、Elasticsearch介绍 二、关于Elasticsearch的基本操作 &#xff08;1&#xff09;索引操作 &#xff08;2&#xff09;文档操作 三、域的属性 &#xff08;1&#xff09;index &#xff08;2&#xff09;type &#xff08;3&#xff09;store 一、Elasticsearc…

数据结构(队列)

一.什么是队列 1.队列定义 队列是一种特殊的线性表&#xff0c;特殊之处在于他只允许在表的前端(front)进行删除操作&#xff0c;而在表的后端(rear)进行插入操作。和栈一样&#xff0c;队列也是一种操作受限制的线性表。进行插入操作的一端称为队尾&#xff0c;进行删除操作的…

安装好IntelliJ IDEA点击无反应,如何解决配置文件不一致导致的启动问题

在我们的开发生涯中&#xff0c;遇到IDE工具出现问题是在所难免的。最令人头疼的莫过于&#xff0c;你的IDEA(IntelliJ IDEA)无法启动&#xff0c;而且没有任何错误提示。这篇文章将详细讲解如何解决IntelliJ IDEA 2023.3.3版本启动失败的问题&#xff0c;这个问题可能也适用于…

【日常学习笔记】gflags

https://mp.weixin.qq.com/s/FFdAUuQavhD5jCCY9aHBRg gflags定义的是全局变量&#xff0c;在main函数后&#xff0c;添加::gflags::ParseCommandLineFlags函数&#xff0c;就能解析命令行&#xff0c;在命令行传递定义的参数。 在程序中使用DEFINE_XXX函数定义的变量时&#x…