kafka服务介绍

kafka

    • 安装使用
    • 管理 Kafka

Apache Kafka 是一个开源的分布式事件流平台,主要用于实时数据传输和流处理。它最初由 LinkedIn 开发,并在 2011 年成为 Apache 基金会的顶级项目。Kafka 设计的目标是处理大规模的数据流,同时提供高吞吐量、低延迟和高容错性

Kafka 的工作原理可以从几个关键方面来理解:

    1. 消息的生产
    • Producer:生产者是发送消息的客户端,向 Kafka 发送消息。生产者将消息发布到特定的主题(Topic)中,主题内部可以有多个分区(Partition)。
    • Partitioning:消息会被分配到不同的分区。分区是主题的逻辑分片,有助于提高并发处理能力。消息的分配通常基于某种策略,比如消息键(key)或者轮询。
    1. 消息的存储
    • Broker:Kafka 的服务器组件,负责存储和管理消息。每个 Kafka 实例都是一个 Broker。多个 Brokers 组成 Kafka 集群。
    • Log Segments:在每个分区中,消息被追加到日志(log)中。日志是一个有序的、不可变的消息序列。为了管理日志文件的大小,Kafka 会定期将日志分段成多个文件。
    1. 消息的消费
    • Consumer:消费者从 Kafka 中读取消息。消费者订阅一个或多个主题,然后拉取(fetch)数据。
    • Consumer Group:消费者可以组成一个消费组(Consumer Group)。同一消费组中的消费者会分担读取同一个主题的不同分区的任务,从而实现负载均衡。每个消息只会被消费组中的一个消费者读取。
    1. 消息的存储与复制
    • Replication:为了提高数据的可靠性,Kafka 使用副本(replica)。每个分区有一个主副本(leader)和若干个从副本(follower)。所有的读写请求都由主副本处理,从副本则从主副本同步数据。
    • Leader-Follower:在分区中,领导者负责所有的读写请求,从副本负责从领导者同步数据。领导者失败时,从副本会选举新的领导者,确保高可用性。
    1. 协调与管理
    • Zookeeper:早期 Kafka 使用 Zookeeper 来管理集群的元数据和协调集群中的 Broker 和分区状态。Zookeeper 负责领导者的选举、配置管理和状态监控。
    • Kafka 2.8.0 及以后:Kafka 开始逐渐减少对 Zookeeper 的依赖,尝试用 Kafka 自身的协议来管理集群的元数据,这种模式称为 KRaft 模式(Kafka Raft Metadata Mode)。

数据流示例

  • 生产:生产者将消息发送到一个主题,主题有多个分区。每条消息附带一个时间戳。
  • 存储:消息被追加到分区的日志中。日志分为多个段,Kafka 按顺序存储消息。
  • 消费:消费者从主题的分区中读取消息。消费者可以跟踪已处理的消息位置,以实现断点续传。
  • 复制:数据在主副本和从副本之间同步,保证数据在 Broker 失败时不会丢失。

通过这些机制,Kafka 能够实现高吞吐量、低延迟和高可靠性的消息传递和数据流处理。

安装使用

Kafka 依赖于 Java 运行环境,因此首先需要安装 Java 11 或更高版本

apt install -y openjdk-11-jdk
root@huhy:~# java --version
openjdk 11.0.23 2024-04-16
OpenJDK Runtime Environment (build 11.0.23+9-post-Ubuntu-1ubuntu1)
OpenJDK 64-Bit Server VM (build 11.0.23+9-post-Ubuntu-1ubuntu1, mixed mode, sharing)

官网下载;https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz

tar -xf kafka_2.13-3.7.1.tgz
cd kafka_2.13-3.7.1/

Kafka 的配置文件位于 config 目录中。主要的配置文件包括:

  • server.properties:Kafka 的服务器配置文件

  • zookeeper.properties:Zookeeper 的配置文件(如果使用 Zookeeper)

    • broker.id:描述:Kafka Broker 的唯一标识符。每个 Broker 必须有一个唯一的 broker.id。
      默认值:无
      示例:broker.id=0

    • listeners:描述:Kafka Broker 监听的地址和端口。指定了 Kafka 接收客户端请求的地址。
      默认值:PLAINTEXT://:9092
      示例:listeners=PLAINTEXT://localhost:9092

    • advertised.listeners:描述:Kafka 向客户端公开的地址。客户端会通过此地址与 Broker 进行通信。
      默认值:无
      示例:advertised.listeners=PLAINTEXT://your-hostname:9092

    • log.dirs:
      描述:Kafka 存储日志文件的目录。可以设置多个目录,Kafka 会将数据分散存储。
      默认值:/tmp/kafka-logs
      示例:log.dirs=/var/lib/kafka/logs

    • log.retention.hours:
      描述:日志文件的保留时间,单位是小时。超过这个时间的数据会被删除。
      默认值:168(7 天)
      示例:log.retention.hours=168

    • log.segment.bytes:
      描述:每个日志段的大小,单位是字节。日志文件会被分段存储。
      默认值:1073741824(1 GB)
      示例:log.segment.bytes=536870912(512 MB)

    • num.partitions:
      描述:默认创建的主题的分区数量。
      默认值:1
      示例:num.partitions=3

    • replication.factor:
      描述:主题的副本因子,表示每个分区有多少副本。提高副本因子可以增加数据的可靠性。
      默认值:无(主题创建时指定)
      示例:replication.factor=2

    • message.max.bytes:
      描述:Kafka 允许的最大消息大小,单位是字节。
      默认值:1000012(1 MB)
      示例:message.max.bytes=2097152(2 MB)

    • log.retention.bytes:
      描述:每个分区的日志文件最大保留大小,超过这个大小的日志会被删除。
      默认值:-1(不限制)
      示例:log.retention.bytes=1073741824(1 GB)

    • log.cleaner.enable:
      描述:启用日志清理器,用于压缩日志中的重复数据。
      默认值:false
      示例:log.cleaner.enable=true

    • security.inter.broker.protocol:
      描述:Kafka Broker 之间的通信协议,支持 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
      默认值:PLAINTEXT
      示例:security.inter.broker.protocol=SSL

    • ssl.keystore.location:
      描述:SSL 密钥库的位置,用于 SSL/TLS 加密。
      默认值:无
      示例:ssl.keystore.location=/path/to/keystore.jks

    • zookeeper.connect:
      描述:Zookeeper 的连接字符串,包括 Zookeeper 的主机名和端口号。
      默认值:localhost:2181
      示例:zookeeper.connect=localhost:2181

    • zookeeper.connection.timeout.ms:
      描述:Zookeeper 连接超时设置,单位是毫秒。
      默认值:6000
      示例:zookeeper.connection.timeout.ms=10000

    • auto.create.topics.enable:
      描述:是否自动创建主题。如果设置为 true,当客户端向不存在的主题发送消息时,Kafka 会自动创建该主题。
      默认值:true
      示例:auto.create.topics.enable=false

    • delete.topic.enable:
      描述:是否允许删除主题。如果设置为 true,可以通过 Kafka 提供的脚本删除主题。
      默认值:false
      示例:delete.topic.enable=true

通常情况下,默认配置就可以开始使用。如果需要自定义配置,可以编辑这些文件

启动 Zookeeper;Kafka 需要 Zookeeper 来管理集群的元数据。Kafka 附带了一个简单的 Zookeeper 实例,开启后另起一个终端

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 Kafka Broker;在另一个终端中,启动 Kafka Broker

bin/kafka-server-start.sh config/server.properties

另起一个终端3;Kafka 使用主题来组织消息。可以使用 Kafka 提供的脚本创建主题。例如,创建一个名为 test-topic 的主题:

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic test-topic.

生产消息:可以使用 Kafka 提供的生产者工具向主题中发送消息。打开一个终端并运行,然后在终端4中输入消息并按回车键发送消息

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
>huhy
>

消费消息;在另一个终端3中,可以运行消费者工具来读取消息,只有最开始两个终端是不能终端,后两个有交互界面可以直接用

root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
huhy

获取信息如下

在这里插入图片描述

管理 Kafka

查看主题:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic

查看主题详情:

bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
Topic: test-topic       TopicId: rXHPQIqJRkO5lQDOsco3NQ PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: test-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0

删除主题;

bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092

验证查看

root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets

停止 Kafka Broker

bin/kafka-server-stop.sh

停止 Zookeeper:

bin/zookeeper-server-stop.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/870207.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

shopee虾皮 java后端 一面面经 整体感觉不难

面试总结:总体不难,算法题脑抽了只过了一半,面试官点出了问题说时间到了,反问一点点,感觉五五开,许愿一个二面 1.Java中的锁机制,什么是可重入锁 Java中的机制主要包括 synchronized关键字 Loc…

SpringBoot整合elasticsearch-java

一、依赖 系统使用的是ElasticSearch8.2.0 <dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.1.0</version> </dependency> 二、配置 1、yml文件配置 elastics…

SpringBoot入门:如何新建SpringBoot项目(保姆级教程)

在本文中&#xff0c;我们将演示如何新建一个基本的 Spring Boot 项目。写这篇文章的时候我还是很惊讶的&#xff0c;因为我发现有些java的初学者&#xff0c;甚至工作10年的老员工居然并不会新建一个SpringBoot项目&#xff0c;所以特别出了一篇文章来教大家新建一个SpringBoo…

关于pycharm上push项目到gitee失败原因

版权声明&#xff1a;本文为博主原创文章&#xff0c;如需转载请贴上原博文链接&#xff1a;https://blog.csdn.net/u011628215/article/details/140577821?spm1001.2014.3001.5502 前言&#xff1a;最近新建项目push上gitee都没有问题&#xff0c;但是当在gitee网站进行了一个…

基于 HTML+ECharts 实现监控平台数据可视化大屏(含源码)

构建监控平台数据可视化大屏&#xff1a;基于 HTML 和 ECharts 的实现 监控平台的数据可视化对于实时掌握系统状态、快速响应问题至关重要。通过直观的数据展示&#xff0c;运维团队可以迅速发现异常&#xff0c;优化资源配置。本文将详细介绍如何利用 HTML 和 ECharts 实现一个…

CefSharp音视频编译与免费下载

注&#xff1a;Cefharp 音频和视频播放编译&#xff0c;生成相应的dll文件&#xff0c;从而支持项目开发。 建议编译至少 16G 的 RAM和至少 250G 的 SSD。该脚本以 E 盘为例&#xff0c;您需要在 E 盘上手动创建 cef 文件夹。禁止在转载后通过发布其他平台向用户收取下载费用。…

【Qt 】JSON 数据格式详解

文章目录 1. JSON 有什么作用?2. JSON 的特点3. JSON 的两种数据格式3.1 JSON 数组3.2 JSON 对象 4. Qt 中如何使用 JSON 呢&#xff1f;4.1 QJsonObject4.2 QJsonArray4.3 QJsonValue4.4 QJsonDocument 5. 构建 JSON 字符串6. 解析 JSON 字符串 1. JSON 有什么作用? &#x…

VIN解析汽车详情|阿里云实现调用API接口

介绍&#xff1a; 本次解析通过阿里云云市场的云服务来实现通过17位车架号来自动识别车型的详细信息&#xff0c;首先需要准备选择一家可以提供查询的商品。 https://market.aliyun.com/apimarket/detail/cmapi00065864#skuyuncode5986400001 步骤1: 选择商品 如图点击免费…

鸿蒙界面开发

界面开发 //构建 → 界面 build() {//行Row(){//列Column(){//文本 函数名(参数) 对象.方法名&#xff08;参数&#xff09; 枚举名.变量名Text(this.message).fontSize(40)//设置文本大小.fontWeight(FontWeight.Bold)//设置文本粗细.fontColor(#ff2152)//设置文本颜色}.widt…

四、GD32 MCU 常见外设介绍 (5) TIMER 模块介绍

5.1.TIMER 基础知识 TIMER分高级定时器&#xff0c;通用定时器L0&#xff0c;L1&#xff0c;L2和基本定时器。 5.2.硬件连接说明 TIMER 属于片内外设&#xff0c;对于外部硬件设计&#xff0c;只需要单独IO口外接信号线即可。 5.3.GD32 TIMER 外设原理简介&#xff08;以 G…

Mysql或MariaDB数据库的用户与授权操作——实操保姆级教程

一、问题描述 在日常的工作中,我们需要给不同角色的人员创建不同的账号,他们各自可访问的数据库或权限不一样,这时就需要创建用户和赋予不同的权限内容了。 二、问题分析 1、创建不同的角色账号; 2、给这些账号授予各自可访问数据库的权限。 三、实现方法 Centos8安装…

(7) cmake 编译C++程序(二)

文章目录 概要整体代码结构整体代码小结 概要 在ubuntu下&#xff0c;通过cmake编译一个稍微复杂的管理程序 整体代码结构 整体代码 boss.cpp #include "boss.h"Boss::Boss(int id, string name, int dId) {this->Id id;this->Name name;this->DeptId …

最简单的typora+gitee+picgo配置图床

typoragiteepicgo图床 你是否因为管理图片而感到头大&#xff1f;是时候了解一下 Typora、Gitee 和 PicGo 这个超级三剑客了&#xff0c;它们可以帮你轻松打造自己的图床&#xff0c;让你的博客图片管理变得简单又有趣。让我们开始这场神奇的图床之旅吧&#xff01; Typora …

Elasticsearch概念及ELK安装

1、Elasticsearch是什么 它是elastic技术栈中的一部分。完整的技术栈包括&#xff1a; Elasticsearch&#xff1a;用于数据存储、计算和搜索 Logstash/Beats&#xff1a;用于数据收集 Kibana&#xff1a;用于数据可视化 整套技术栈被称为ELK&#xff0c;经常用来做日志收集…

大语言模型-GPT-Generative Pre-Training

一、背景信息&#xff1a; GPT是2018 年 6 月由OpenAI 提出的预训练语言模型。 GPT可以应用于复杂的NLP任务中&#xff0c;例如文章生成&#xff0c;代码生成&#xff0c;机器翻译&#xff0c;问答对话等。 GPT也采用两阶段的训练过程&#xff0c;第一阶段是无监督的方式来预训…

HarmonyOS 请求相应HTTPS概览

1.HTTP概述 请求和响应 2.HTTP请求开发步骤 2.1.module.json5中添加 ohos.permission.INTERNET 2.2.导入http模块 2.3.创建htppt请求 2.4.发起请求 2.5.处理响应 2.6.销毁http对象 3.几个基本概念&#xff1a; 3.1.Webview&#xff1a;提供We…

git 版本回退-idea

1、选中项目&#xff0c;右键&#xff0c;打开 git历史提交记录 2、选中想要回退的版本&#xff0c;选择 hard&#xff08;不保留版本记录&#xff09; 3、最终选择强制提交&#xff08;必须强制&#xff09; OK&#xff0c;搞定

被问到MQ消息已丢失,该如何处理?

在分布式系统中&#xff0c;消息中间件&#xff08;如 RabbitMQ、RocketMQ、Kafka、Pulsar 等&#xff09;扮演着关键角色&#xff0c;用于解耦生产者和消费者&#xff0c;并确保数据传输的可靠性和顺序性。尽管我们通常会采取多种措施来防止消息丢失&#xff0c;如消息持久化、…

【Docker】CentOS7环境下的安装

环境展示 安装 配置仓库 sudo yum install -y yum-utils # docker官方key文件下载 sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo # 建议使用阿里云key文件下载 sudo yum-config-manager --add-repo https://mirrors.aliyun.…

Jacoco 单元测试配置

前言 编写单元测试是开发健壮程序的有效途径&#xff0c;单元测试写的好不好可以从多个指标考量&#xff0c;其中一个就是单元测试的覆盖率。单元测试覆盖率可以看到我们的单元测试覆盖了多少代码行、类、分支等。查看单元测试覆盖率可以使用一些工具帮助我们计算&#xff0c;…