【企业级分布式系统】 Kafka集群

文章目录

  • Kafka
    • Kafka 概述
      • 使用消息队列的好处
    • Kafka 的特性
    • Kafka 系统架构
      • Kafka 的应用场景
      • Kafka 的优缺点
  • Kafka 集群部署
    • 下载安装包
    • 安装 Kafka
    • Kafka 命令行操作
    • Kafka 架构深入
  • Filebeat+Kafka+ELK 部署指南~
    • 部署 Zookeeper+Kafka 集群
    • 部署 Filebeat
    • 部署 ELK(Logstash 配置)
    • Kibana 配置与查看日志

Kafka

Kafka 概述

Kafka 是一个分布式、基于发布/订阅模式的消息队列系统,由 Linkedin 开发并贡献给 Apache 基金会,现已成为顶级开源项目。它主要应用于大数据领域的实时计算以及日志收集,具有高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性和高并发的特性。

使用消息队列的好处

  1. 解耦:允许独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  2. 可恢复性:系统的一部分组件失效时,不会影响到整个系统。
  3. 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
  4. 灵活性 & 峰值处理能力:使用消息队列能够使关键组件顶住突发的访问压力。
  5. 异步通信:允许用户把一个消息放入队列,但并不立即处理它。

Kafka 的特性

  • 高吞吐量、低延迟:每秒可以处理几十万条消息,延迟最低只有几毫秒。
  • 可扩展性:Kafka 集群支持热扩展。
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  • 容错性:允许集群中节点失败。
  • 高并发:支持数千个客户端同时读写。

Kafka 系统架构

  1. Broker
    • 一台 Kafka 服务器就是一个 broker。
    • 一个集群由多个 broker 组成。
    • 一个 broker 可以容纳多个 topic。
  2. Topic
    • 可以理解为一个队列,生产者和消费者面向的都是一个 topic。
    • 类似于数据库的表名或者 ES 的 index。
    • 物理上不同 topic 的消息分开存储。
  3. Partition
    • 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker 上。
    • 一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。
    • Kafka 只保证 partition 内的记录是有序的。
    • 数据路由规则:指定了 partition 则直接使用;未指定但指定 key,通过对 key 的 value 进行 hash 取模选出一个 partition;都未指定,使用轮询选出一个 partition。
    • 每个 partition 中的数据使用多个 segment 文件存储。
  4. Replica
    • 副本机制,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作。
    • 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  5. Leader
    • 当前负责数据的读写的 partition。
  6. Follower
    • 跟随 Leader,所有写请求都通过 Leader 路由。
    • 数据变更会广播给所有 Follower,与 Leader 保持数据同步。
    • 只负责备份,不负责数据的读写。
    • 如果 Leader 故障,则从 Follower 中选举出一个新的 Leader。
  7. Producer
    • 数据的发布者,将消息 push 发布到 Kafka 的 topic 中。
  8. Consumer
    • 从 broker 中 pull 拉取数据。
    • 可以消费多个 topic 中的数据。
  9. Consumer Group(CG)
    • 由多个 consumer 组成。
    • 所有的消费者都属于某个消费者组。
    • 消费者组内每个消费者负责消费不同分区的数据,防止数据被重复读取。
    • 消费者组之间互不影响。
  10. Offset 偏移量
    • 唯一标识一条消息。
    • 决定读取数据的位置。
    • 消费者通过偏移量来决定下次读取的消息。
    • 消息被消费之后,并不被马上删除。
    • 某一个业务也可以通过修改偏移量达到重新读取消息的目的。
    • 消息默认生命周期为 1 周(7*24小时)。
  11. Zookeeper
    • 在 Kafka 中,ZooKeeper 负责维护 Kafka 集群的一些元数据和 leader 选举等协调工作。
    • 元数据存储:存储主题、分区、Broker 节点等信息。
    • Leader 选举:参与领导者选举的过程。
    • 健康监控:进行集群的健康监控。
    • 消费者组协调:协调和追踪消费者的位置信息。

Kafka 的应用场景

  1. 日志收集:Kafka 可以被用作日志收集系统,将各种应用的日志数据集中收集起来,方便后续的处理和分析。
  2. 实时计算:Kafka 可以作为实时计算系统的数据源,如 Spark Streaming、Flink 等,用于实时数据处理和分析。
  3. 消息通讯:Kafka 可以作为消息通讯系统,实现不同系统之间的数据交换和通信。
  4. 流量削峰:在高并发场景下,Kafka 可以作为流量削峰的工具,将大量的请求缓存到 Kafka 中,然后按照一定的速率进行处理,避免系统崩溃。

Kafka 的优缺点

优点

  • 高吞吐量、低延迟。
  • 可扩展性强。
  • 持久性、可靠性高。
  • 支持多副本、容错性强。
  • 社区活跃、生态丰富。

缺点

  • 依赖 Zookeeper,如果 Zookeeper 出现故障,会影响 Kafka 的正常运行。
  • 数据一致性方面,虽然 Kafka 提供了多副本机制,但是在极端情况下,仍然可能存在数据丢失的风险。
  • 消息顺序问题,如果生产者发送消息到多个分区,那么消费者消费时可能无法保证消息的顺序性。

Kafka 集群部署

下载安装包

  • 官方下载地址:Apache Kafka 下载页面
  • 步骤
    1. 切换到 /opt 目录。
    2. 使用 wget 从清华大学镜像站下载 Kafka 2.7.1 版本。
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz

安装 Kafka

  • 步骤
    1. 解压 Kafka 压缩包。
    2. 将解压后的目录移动到 /usr/local/kafka
    3. 备份并编辑 server.properties 文件,配置 Kafka。
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

cd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties
  • 关键配置项

    • broker.id:每个 Kafka 实例的唯一标识,集群中每个实例的 broker.id 必须不同。
    • listeners:指定 Kafka 监听的 IP 和端口。
    • num.network.threadsnum.io.threads:分别设置处理网络请求和磁盘 IO 的线程数。
    • log.dirs:Kafka 数据和日志的存放路径。
    • zookeeper.connect:指定 Zookeeper 集群的地址。
  • 环境变量配置

    • 将 Kafka 的 bin 目录添加到 PATH 环境变量中。
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
  • 配置启动脚本
    • 创建一个 Kafka 的启动脚本,并设置开机自启。
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
    echo "---------- Kafka 启动 ------------"
    ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
    echo "---------- Kafka 停止 ------------"
    ${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
    $0 stop
    $0 start
;;
status)
    echo "---------- Kafka 状态 ------------"
    count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

chmod +x /etc/init.d/kafka
chkconfig --add kafka
service kafka start

Kafka 命令行操作

  • 创建 topic
kafka-topics.sh --create --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --replication-factor 2 --partitions 3 --topic test
  • 查看 topic
kafka-topics.sh --list --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
kafka-topics.sh --describe --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
  • 发布和消费消息
# 生产者
kafka-console-producer.sh --broker-list 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test

# 消费者
kafka-console-consumer.sh --bootstrap-server 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test --from-beginning
  • 修改和删除 topic
# 修改分区数
kafka-topics.sh --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --alter --topic test --partitions 6

# 删除 topic
kafka-topics.sh --delete --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test

Kafka 架构深入

  • 工作流程及文件存储机制

    • Kafka 以 topic 对消息进行分类,producer 和 consumer 都是面向 topic 的。
    • Topic 是逻辑概念,partition 是物理概念,每个 partition 对应一个 log 文件。
    • 为防止 log 文件过大,Kafka 采用分片和索引机制,将每个 partition 分为多个 segment,每个 segment 包含 .index.log 文件。
  • 数据可靠性保证

    • Kafka 通过 ack 应答机制保证数据可靠性,producer 发送数据后需要等待 broker 的确认。
  • 数据一致性问题

    • LEO:每个副本的最大 offset。
    • HW:消费者能见到的最大 offset,所有副本中最小的 LEO。
    • Leader 和 follower 故障时的数据恢复和同步机制。
  • ack 应答机制

    • Kafka 提供了三种可靠性级别(acks=0, 1, -1),用户可以根据需求选择。
    • 幂等性:在 0.11 版本及以后,Kafka 引入了幂等性特性,保证 producer 发送重复数据时,server 端只持久化一条。

注释

  • Kafka 的安装和配置需要根据集群的实际环境进行调整,特别是 IP 地址和端口号。
  • 在生产环境中,通常需要配置更多的参数以优化性能和可靠性。
  • Kafka 的数据可靠性和一致性机制是其核心特性之一,理解这些机制对于保证数据的安全性和一致性至关重要。

Filebeat+Kafka+ELK 部署指南~

部署 Zookeeper+Kafka 集群

  • 目的:搭建消息队列系统,用于日志数据的传输。
  • 步骤
    1. 安装并配置 Zookeeper 集群。
    2. 安装并配置 Kafka 集群,指定 Zookeeper 集群地址。
    3. 启动 Zookeeper 和 Kafka 服务,确保集群正常运行。

部署 Filebeat

  • 目的:收集服务器上的日志数据。
  • 步骤
    1. 下载并解压 Filebeat 到指定目录(如 /usr/local/filebeat)。
    2. 编辑 filebeat.yml 配置文件:
      filebeat.prospectors:
      - type: log
        enabled: true
        paths:
          - /var/log/httpd/access_log
        tags: ["access"]
      
      - type: log
        enabled: true
        paths:
          - /var/log/httpd/error_log
        tags: ["error"]
      
      # 添加输出到 Kafka 的配置
      output.kafka:
        enabled: true
        hosts: ["192.168.80.10:9092","192.168.80.11:9092","192.168.80.12:9092"]
        topic: "httpd"
      
    3. 启动 Filebeat,开始收集日志并发送到 Kafka。

部署 ELK(Logstash 配置)

  • 目的:从 Kafka 拉取日志数据,并处理、存储到 Elasticsearch 中。
  • 步骤
    1. 在 Logstash 组件所在节点上,新建一个 Logstash 配置文件 kafka.conf
      input {
        kafka {
          bootstrap_servers => "192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092"
          topics  => "httpd"
          type => "httpd_kafka"
          codec => "json"
          auto_offset_reset => "latest"
          decorate_events => true
        }
      }
      
      output {
        if "access" in [tags] {
          elasticsearch {
            hosts => ["192.168.80.30:9200"]
            index => "httpd_access-%{+YYYY.MM.dd}"
          }
        }
      
        if "error" in [tags] {
          elasticsearch {
            hosts => ["192.168.80.30:9200"]
            index => "httpd_error-%{+YYYY.MM.dd}"
          }
        }
      
        stdout { codec => rubydebug }
      }
      
    2. 启动 Logstash,开始从 Kafka 拉取日志并存储到 Elasticsearch。

Kibana 配置与查看日志

  • 目的:通过 Kibana 可视化界面查看日志数据。
  • 步骤
    1. 在浏览器中访问 Kibana(如 http://192.168.80.30:5601)。
    2. 登录 Kibana(如果设置了登录认证)。
    3. 单击“Create Index Pattern”按钮,添加索引模式,例如 httpd_access-*httpd_error-*(注意:这里应与 Logstash 配置中的 index 名称匹配,但原笔记中的 filebeat_test-* 是不正确的)。
    4. 单击“create”按钮创建索引模式。
    5. 单击“Discover”按钮,可查看图表信息及日志信息。

注释

  • 在配置 Filebeat 和 Logstash 时,确保 Kafka 集群的地址和 topic 名称正确无误。
  • Logstash 的 auto_offset_reset 参数决定了从 Kafka 拉取数据的起始位置,latest 表示从最新的数据开始拉取,earliest 表示从头开始拉取。
  • Kibana 中的索引模式应与 Logstash 配置中的 index 名称一致,以便正确显示日志数据。
  • 在实际部署中,还需要考虑安全性、性能优化等方面的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

html 图片转svg 并使用svg路径来裁剪html元素

1.png转svg 工具地址: Vectorizer – 免费图像矢量化 打开svg图片,复制其中的path中的d标签的路径 查看生成的svg路径是否正确 在线SVG路径预览工具 - UU在线工具 2.在html中使用svg路径 <svg xmlns"http://www.w3.org/2000/svg" width"318px" height…

Solana应用开发常见技术栈

编程语言 Rust Rust是Solana开发中非常重要的编程语言。它具有高性能、内存安全的特点。在Solana智能合约开发中&#xff0c;Rust可以用于编写高效的合约代码。例如&#xff0c;Rust的所有权系统可以帮助开发者避免常见的内存错误&#xff0c;如悬空指针和数据竞争。通过合理利…

23种设计模式-访问者(Visitor)设计模式

文章目录 一.什么是访问者模式&#xff1f;二.访问者模式的结构三.访问者模式的应用场景四.访问者模式的优缺点五.访问者模式的C实现六.访问者模式的JAVA实现七.代码解释八.总结 类图&#xff1a; 访问者设计模式类图 一.什么是访问者模式&#xff1f; 访问者模式&#xff08;…

RecyclerView详解——(四)缓存复用机制

稍微看了下源码和部分文章&#xff0c;在此做个小小的总结 RecyclerView&#xff0c;意思为可回收的view&#xff0c;那么相对于listview&#xff0c;他的缓存复用肯定是一大优化。 具体而言&#xff0c;当一个列表项被移出屏幕后&#xff0c;RecyclerView并不会销毁其视图&a…

C++设计模式行为模式———迭代器模式

文章目录 一、引言二、迭代器模式三、总结 一、引言 迭代器模式是一种行为设计模式&#xff0c; 让你能在不暴露集合底层表现形式 &#xff08;列表、 栈和树等&#xff09; 的情况下遍历集合中所有的元素。C标准库中内置了很多容器并提供了合适的迭代器&#xff0c;尽管我们不…

【网络云计算】2024第48周-技能大赛-初赛篇

文章目录 1、比赛前提2、比赛题目2.1、 修改CentOS Stream系统的主机名称&#xff0c;写出至少3种方式&#xff0c;并截图带时间戳和姓名&#xff0c;精确到秒&#xff0c;否则零分2.2、 创建一个名为你的名字的拼音的缩写的新用户并设置密码&#xff0c;将用户名添加到 develo…

【汇编语言】数据处理的两个基本问题(三) —— 汇编语言的艺术:从div,dd,dup到结构化数据的访问

文章目录 前言1. div指令1.1 使用div时的注意事项1.2 使用格式1.3 多种内存单元表示方法进行举例1.4 问题一1.5 问题一的分析与求解1.5.1 分析1.5.2 程序实现 1.6 问题二1.7 问题二的分析与求解1.7.1 分析1.7.2 程序实现 2. 伪指令 dd2.1 什么是dd&#xff1f;2.2 问题三2.3 问…

R语言数据分析案例45-全国汽车销售数据分析(可视化与回归分析)

一、研究背景 随着经济的发展和人们生活水平的提高&#xff0c;汽车已经成为人们日常生活中不可或缺的交通工具之一。汽车市场的规模不断扩大&#xff0c;同时竞争也日益激烈。对于汽车制造商和经销商来说&#xff0c;深入了解汽车销售数据背后的规律和影响因素&#xff0c;对…

【算法】【优选算法】前缀和(下)

目录 一、560.和为K的⼦数组1.1 前缀和1.2 暴力枚举 二、974.和可被K整除的⼦数组2.1 前缀和2.2 暴力枚举 三、525.连续数组3.1 前缀和3.2 暴力枚举 四、1314.矩阵区域和4.1 前缀和4.2 暴力枚举 一、560.和为K的⼦数组 题目链接&#xff1a;560.和为K的⼦数组 题目描述&#x…

论文 | Learning to Transfer Prompts for Text Generation

1. 总结与提问 论文摘要总结&#xff1a; 论文提出了一种创新的PTG&#xff08;Prompt Transfer Generation&#xff09;方法&#xff0c;旨在通过迁移提示的方式解决传统预训练语言模型&#xff08;PLM&#xff09;在数据稀缺情况下微调的问题。通过将一组已在源任务中训练好…

TON商城与Telegram App:生态融合与去中心化未来的精彩碰撞

随着区块链技术的快速发展&#xff0c;去中心化应用&#xff08;DApp&#xff09;逐渐成为了数字生态的重要组成部分。而Telegram作为全球领先的即时通讯应用&#xff0c;不仅仅满足于传统的社交功能&#xff0c;更在区块链领域大胆探索&#xff0c;推出了基于其去中心化网络的…

自动驾驶系列—探索自动驾驶数据管理的核心技术与平台

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

【技术解析】Dolphinscheduler实现MapReduce任务的高效管理

MapReduce是一种编程模型&#xff0c;用于处理和生成大数据集&#xff0c;主要用于大规模数据集&#xff08;TB级数据规模&#xff09;的并行运算。本文详细介绍了Dolphinscheduler在MapReduce任务中的应用&#xff0c;包括GenericOptionsParser与args的区别、hadoop jar命令参…

数据结构哈希表-(开放地址法+二次探测法解决哈希冲突)(创建+删除+插入)+(C语言代码)

#include<stdio.h> #include<stdlib.h> #include<stdbool.h> #define M 20 #define NULLDEL -1 #define DELDEY -2typedef struct {int key;int count; }HashTable;//创建和插入 void Insert(HashTable ha[], int m, int p, int key) {int i, HO, HI;HO key…

【android USB 串口通信助手】stm32 源码demo 单片机与手机通信 Android studio 20241118

android 【OTG线】 接 下位机STM32【USB】 通过百度网盘分享的文件&#xff1a;USBToSerialPort.apk 链接&#xff1a;https://pan.baidu.com/s/122McdmBDUxEtYiEKFunFUg?pwd8888 提取码&#xff1a;8888 android 【OTG线】 接 【USB转TTL】 接 【串口(下位机 SMT32等)】 需…

大数据技术Kafka详解 ① | 消息队列(Messages Queue)

目录 1、消息队列的介绍 2、消息队列的应用场景 2.1、应用耦合 2.2、异步处理 2.3、限流削峰 2.4、消息驱动的系统 3、消息队列的两种模式 3.1、点对点模式 3.2、发布/订阅模式 4、常用的消息队列介绍 4.1、RabbitMQ 4.2、ActiveMQ 4.3、RocketMQ 4.4、Kafka 4.…

一家餐饮企业,「闯入」AI阵地

作者| 皮爷 出品|产业家 “我们需要用AI来帮助我们门店破除内卷的状态。”一位连锁餐饮品牌告诉产业家&#xff0c;“这也是我们想尽快把AI用起来的原因&#xff0c;看看能不能带来一些帮助。” 这种情况正发生在一众餐饮企业中。 与这种情况对应的一个背景是&#xff0c…

MySQL的编程语言

一、MySQL基础 使用系统的全局变量@@VERSION查看当前使用的MySQL的版本信息,SQL语句如下: select @@version; 将局部变量varl声明为char的类型,长度值为10,并为其赋值为“程菲” begin declare var1 char(10); set @var1="程菲"; end 通过局部变量查看d_eams数…

【青牛科技】电动工具直流调速专用集成电路GS069,具有电源电压范围宽、功耗小、抗干扰能力强等特性

GS069是芯谷科技推出的一款CMOS工艺、电动工具直流调速专用集成电路。具有电源电压范围宽、功耗小、抗干扰能力强等特点&#xff0c;广泛应用于各种电动工具。 产品基本参数 产品应用 1、应用图&#xff1a; 2、测试参数&#xff1a;&#xff08;VCC9V&#xff0c;RL2K&#…

PyTorch 中使用自动求导计算梯度

使用 PyTorch 进行自动求导和梯度计算 在 PyTorch 中&#xff0c;张量的 requires_grad 属性决定了是否需要计算该张量的梯度。设置为 True 的张量会在计算过程中记录操作&#xff0c;以便在调用 .backward() 方法时自动计算梯度。通过构建计算图&#xff0c;PyTorch 能够有效…