Kafka 安装教程和基本操作

一、简介

Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等,Linkedin于2010年12月贡献给了 Apache基金会 并成为顶级开源项目。

应用特性

  • 分布式存储:数据被自动分区并分布在集群的节点中。
  • 消息有序性Kafka 能确保从生产者传到消费者的记录都是有序的。
  • 高容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  • 高吞吐量Kafka 支持单机每秒至少处理10万以上消息,通常可以达到数百万条消息。
  • 易扩展性:支持集群热扩展。
  • 高并发:支持数千个客户端同时读写。
  • 持久性:支持消息数据持久化到本地磁盘 并支持数据备份和灵活配置数据的持久化时间。
  • 实时处理/低延迟:在数据写入的同时对进行处理,消息延迟最低只有几毫秒。

应用场景

Kafka 本质是 支持分布式的消息系统/消息中间件 。分析 Kafka 的应用场景等同于分析 消息中级件 的应用场景。通常,使用 消息系统 的 发布/订阅模型 功能来连接 生产者消费者。实现以下三大功能:

  • 生产者和消费者的解耦
  • 消息持久化 / 消息冗余
  • 消息缓冲 / 流量消峰

具体应用场景有:

  • 日志收集或数据管道:作为日志收集系统或数据处理管道的一部分,以处理大量的日志数据或实时数据流。
  • 负载均衡:如果系统收到大量请求或数据流,可以使用消息队列把这些任务平均分配给多个处理器或服务,从而实现负载均衡。
  • 系统解耦:消息队列经常用作不同服务间的通信机制,以解耦系统的不同部分。
  • 分布式事务:如果一个事务需要跨多个服务进行,可以使用消息队列来协调不同服务之间的通信,确保事务的原子性。
  • 实时流数据处理:比如实时日志分析或者实时数据报警。Kafka 能接收实时数据流并保证它的可靠性和持久性,这样就可以在上游源源不断生产数据的同时,下游可以实时地进行分析。
  • 通知和实时更新:消息队列可以用作通知的中介,比如告知用户完成某个任务,或者在后端数据更新时实时通知前端。

设计目标

  • 高性能:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 消息系统:支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 横向扩展:支持在线水平热扩展

二、kafka安装和配置

1. zookeeper安装配置

需要说明一下, 为了支持 Kafka 的集群功能, Zookeeper 必须使用集群模式部署。
本文以部署 3 个Zookeeper 实例的伪集群为例。具体安装步骤参阅之前的文章:Zookeeper 安装教程和使用指南

2. kafka安装配置

下载链接:Kafka Downloads

下载页面中包含两种下载方式

  • : kafka-[version]-src.tgz:包含 Kafka 源码和API源码,需要自己编译

a) 安装

[root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
[root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
[root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka

b) 配置实例

配置第一个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9091端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9091
# 相当于listeners=PLAINTEXT://0.0.0.0:9091
# 消息日志存放地址
log.dirs=/usr/local/kafka/logs
# ZooKeeper 地址,多个用,分隔   /kafka指定在zk上的目录
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

配置第二个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9092端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9092
# 消息日志存放地址
log.dirs=/opt/kafka/logs
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

注:两个客户端的listeners中的port不能一样

4) 服务管理

# 启动服务 -daemon 表示后台启动
$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties


# 查看服务
jps -l
	43330 org.apache.zookeeper.server.quorum.QuorumPeerMain
	14356 org.elasticsearch.bootstrap.Elasticsearch
	14583 org.logstash.Logstash
	45976 kafka.Kafka  # kafka服务进程
	
netstat -anlpt | grep 9091
	tcp6       0      0 :::9091                 :::*                    LISTEN      45976/java
	tcp6       0      0 192.168.18.128:9091     192.168.18.128:49356    TIME_WAIT   -

# 关闭服务
$KAFKA_HOME/bin/kafka-server-stop.sh

3. 常用操作

1) 创建topic

 #两条命令效果一样
bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu 

在kafka1上创建一个topic,会自动同步到其他客户端

  • --create表示创建操作
  • --zookeeper 指定了 Kafka 连接的 ZooKeeper
  • --partitions 表示每个主题4个分区
  • --replication-factor 表示创建每个分区创建2个副本(副本因子)
  • --topic 表示主题名称。
    注:副本因子不能超过存活的broker数量,否则报错:Replication factor: 20 larger than available brokers: xxx.

2) 查看topic

# 查看topic列表     #两条命令效果一样
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka 
	__consumer_offsets
	topic-demo
	yumu

# 查看topic详细信息   #两条命令效果一样
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu 
	Topic: yumu	PartitionCount: 2	ReplicationFactor: 2	Configs:
		Topic: yumu	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
		Topic: yumu	Partition: 1	Leader: 1	Replicas: 2,1	Isr: 1,2

3) 测试通信

# 窗口1,启动生产者,向yumu主题发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu

# 窗口2,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu

# 窗口3,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu

=====结果=====
# 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
>hello, kafka!
>once again.
>
# 消费者1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
hello, kafka!
once again.

# 消费者2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
hello, kafka!
once again.

# 查看所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning

# 删除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9091  --topic yumu

三、遇到的问题

1. 第一次启动kafka成功后,关闭kafka并修改配置,再次启动失败,报错如下:

[2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
[2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:235)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
[2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
[2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
[2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
[2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)

原因:
kafka启动之后会生成一些日志和配置,导致这个问题的原因是第一次启动之后生成了log/meta.properties文件

cat meta.properties
#
#Sat Nov 07 21:43:51 CST 2020
broker.id=1
version=0
cluster.id=MChFWWMBT9GJClVEriND5A

第二次改完配置后再去启动的时候生成应该会生成一个新的id,新的id和旧的ID不一致导致无法启动,删除log/meta.properties文件后重新启动即可(疑问:是不是我关闭的方法不对呢?)

推荐阅读:

  • Kafka介绍
  • ELK介绍
  • Kafka安装
  • C语言操作kafka以及安装librdkafka库

下一篇:Kafka消息系统原理

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/647972.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++之lambda函数与std::bind区别及用法实例(二百八十)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…

AI网络爬虫-从当当网批量获取图书信息

工作任务和目标:用户输入一个图书名称,然后程序自动从当当网批量获取图书信息 查看相关元素在源代码中的位置: 第一步:在deepseek中输入提示词: 你是一个Python爬虫专家,一步步的思考,完成以下…

5.26机器人基础-DH参数 正解

1.建立DH坐标系 1.确定Zi轴(关节轴) 2.确定基础坐标系 3.确定Xi方向(垂直于zi和zi1的平面) 4.完全确定各个坐标系 例子: 坐标系的布局是由个人决定的,可以有不同的选择 标准坐标系布局: …

HAL库LED点灯

一、搭建开发环境 (一)安装MDK5 具体安装请参照下面链接:如何开始一个stm32的简单程序的编译_stm32程序编译-CSDN博客 (二)安装Jdk 由于STM32CubeMX需要用到JAVA,因此需要安装jdk环境。 jdk官网下载链接…

素数判断的奥秘与编程实践

新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、素数定义的深入理解 二、非素数的例子与思考 三、素数判断的编程实现 1. 穷举法判断素…

protobuf —— 认识和安装

protobuf —— 认识和安装 什么是序列化和反序列化有哪些常见的什么是序列化和反序列化工具Protobuf安装安装依赖开始安装 连接动态库一些遗留问题 我们今天来看一个序列化和反序列化的工具:protobuf。 什么是序列化和反序列化 序列化(Serialization&a…

基于SpringBoot和Mybatis实现的留言板案例

目录 一、需求及界面展示 二、准备工作 引入依赖 .yml文件相关配置 数据库数据准备 三、编写后端代码 需求分析 代码结构 Model Mapper Service Controller 前端代码 四、测试 一、需求及界面展示 需求: 1. 输入留言信息,点击提交&…

MySQL:图文超详细教程MySQL5.7下载与安装

一、前言 MySQL 5.7 是一个重要的数据库管理系统版本,它带来了多项改进和新特性,本文将超详细的带大家手动安装一下MySQL5.7。 二、下载MySQL5.7版本 MySQL5.7安装包 链接:https://pan.baidu.com/s/1lz5rp9PwfyeHzkEfI_lW6A 提取码&#…

MyBatis框架的使用:mybatis介绍+环境搭建+基础sql的使用+如何使用Map传入多个参数+返回多个实体用List或者Map接收+特殊sql的使用

MyBatis框架的使用:mybatis介绍环境搭建基础sql的使用如何使用Map传入多个参数返回多个实体用List或者Map接收特殊sql的使用 一、MyBatis介绍1.1 特性1.2 下载地址1.3 和其它持久层技术对比 二、搭建环境2.1配置maven2.2 创建mybatis配置文件2.3 搭建测试环境 三、基…

spring状态机实战

引言 完整代码库gitee地址:代码地址 一、什么是状态机 状态机是有限状态自动机的简称,是现实事物运行规则抽象而成的一个数学模型,是一种概念性机器,它能采取某种操作来响应一个外部事件。这种操作不仅能取决于接收到的事件,还…

如何使用Rust构建Python原生库?注意,不是动态链接库!!!

参考文档:https://github.com/PyO3/pyo3 创建python虚拟环境: conda create --name pyo3 python3.11.7激活虚拟环境: conda activate pyo3安装依赖: pip install maturin初始化项目: maturin init构建项目&#x…

搜索二叉树(C++)

文章目录 1. 搜索二叉树的概念2. 搜索二叉树结构的定义3. 搜索二叉树的操作3.1 搜索二叉树的插入3.2 搜索二叉树的删除3.3 搜索二叉树的查找 4. 完整代码 1. 搜索二叉树的概念 二叉搜索树(Binary Search Tree,简称 BST),又称二叉…

Android Studio 获取 SHA1

以 debug.keystore 调试密钥库为例。 步骤1:明确 debug.keystore 位置 debug.keystore 在 .android 目录下: Windows 用户:C:\Users\用户名\.android\debug.keystore Mac 用户:/Users/用户名/.android/debug.keystore 假设我的…

在Windows中安装Redis

一、下载Redis github链接:https://github.com/redis-windows/redis-windows/releases 二、安装 解压后点击start.bat文件即可启动服务 新开一个cmd窗口进入安装了Redis的文件夹输入redis-cli.exe -h 127.0.0.1 -p 6379连接Redis,见如下结果便是成功&…

数据结构——链式二叉树知识点以及链式二叉树数据操作函数详解!!

引言:该博客将会详细的讲解二叉树的三种遍历方法:前序、中序、后序,也同时会讲到关于二叉树的数据操作函数。值得一提的是,这些函数几乎都是建立在一个函数思想——递归之上的。这次的代码其实写起来十分简单,用不了几…

【Springboot系列】SpringBoot 中的日志如何工作的,看完这一篇就够了

文章目录 强烈推荐引言Spring Boot 中的日志是怎么工作日志框架选择配置文件日志级别自定义日志配置集成第三方日志库实时监控和日志管理 Log4j2工作原理分析1. 核心组件2. 配置文件3. Logger的继承和层次结构4. 日志事件处理流程5. 异步日志 总结强烈推荐专栏集锦写在最后 强烈…

免费图片文字转换成文本,ocr文字识别软件免费版,真的太实用了!

截屏短视频上一段扎心文字,想把它发到朋友圈怎么办?这时候就需要一个OCR识别软件。 它就像一个聪明的小助手,它可以帮助电脑“看懂”书本上或者图片里的字。就像我们用眼睛看字一样,OCR软件用它的“眼睛”扫描图片,识…

C语言代码错误(一)

今天在写选择排序代码时&#xff0c;在测试数据发现不能显示结果 1、代码如下&#xff1a; #include <stdio.h>int main(void) {int i, j; // 循环变量int MinIndex; // 保存最小的值的下标int buf; // 互换数据时的临时变量int n;printf("你想输入多少个数据n:\n…

乐理学习-音及音名

1. 我觉得练习题很重要。我要得到一个反馈 所以我想没学习完书中的一节就要把练习题做下来&#xff0c;虽然慢点也可以。 2. 做个小计划。 今天计算了一下学完《基本乐理-李重光》如果每天3张。也要80天干完&#xff0c;希望能有一天可以学习7张的速度的时候。 3. 练习记录…

【除自身以外数组的乘积】python

目录 思路&#xff1a; 代码&#xff1a; 思路&#xff1a; 直接计算前缀乘积&#xff0c;后缀乘积&#xff0c;然后相乘即可 开始我还在想&#xff0c;遍历一次i&#xff0c;怎么能同时计算前缀乘积和后缀乘积&#xff0c;事实上分开计算比较方便。。 代码&#xff1a; cl…