【Kafka】Kafka 架构深入

Kafka 工作流程及文件存储机制

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号

例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名

“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址


数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。


数据一致性问题

LEO:指的是每个副本最大的 offset; 
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO

1)follower 故障

follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

2)leader 故障

leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 


ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
●0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。

●1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。

●-1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。


Filebeat+Kafka+ELK

环境准备
node1:192.168.67.11        elasticsearch  kibana
node2:192.168.67.12        elasticsearch
apache:192.168.67.10               logstash  apache/nginx/mysql
Filebeat节点:filebeat/192.168.67.13           Filebeat
zk-kfk01:192.168.67.21                zookeeper、kafka
zk-kfk02:192.168.67.22                zookeeper、kafka
zk-kfk03:192.168.67.23                zookeeper、kafka

systemctl stop firewalld
systemctl enable firewalld
setenforce 0

1、部署 Zookeeper+Kafka 集群

2、部署 Filebeat 

cd /usr/local/filebeat

vim filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log
  tags: ["access"]
  
- type: log
  enabled: true
  paths:
    - /var/log/httpd/error_log
  tags: ["error"]
  
......
#添加输出到 Kafka 的配置
output.kafka:
  enabled: true
  #指定 Kafka 集群配置
  hosts: ["192.168.67.21:9092","192.168.67.22:9092","192.168.67.23:9092"]
  #指定 Kafka 的 topic
  topic: "httpd"

 注释掉logstash 出口,留下kafka出口;出口只能有一个

 
启动 filebeat
systemctl restart filebeat.service
systemctl status filebeat.service

# ./filebeat -e -c filebeat.yml

报错:服务起不来;查看日志;

原因:是filebeat.yml中将日志同时输出到了kafka和logstash

解决:注释掉logstash即可

3、部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

cd /etc/logstash/conf.d/

vim kafka.conf
input {
    kafka {
        #kafka集群地址
        bootstrap_servers => "192.168.67.21:9092,192.168.67.22:9092,192.168.67.23:9092"
        #拉取的kafka的指定topic
        topics  => "httpd"
        #指定 type 字段
        type => "httpd_kafka"
        #解析json格式的日志数据
        codec => "json"
        #拉取最近数据,earliest为从头开始拉取
        auto_offset_reset => "latest"
        #传递给elasticsearch的数据额外增加kafka的属性数据
        decorate_events => true
    }
}

output {
  if "access" in [tags] {
    elasticsearch {
      hosts => ["192.168.67.11:9200"]
      index => "httpd_access-%{+YYYY.MM.dd}"
    }
  }
  
  if "error" in [tags] {
    elasticsearch {
      hosts => ["192.168.67.11:9200"]
      index => "httpd_error-%{+YYYY.MM.dd}"
    }
  }
  
  stdout { codec => rubydebug }
}

启动 logstash
logstash -f kafka.conf

报错:路径重复

解决:指定一个新的路径

logstash -f kafka.conf --path.data=/opt

报错:配置文件有错

解决:配置文件删了重写

注:生产黑屏操作es时查看所有的索引:

curl -X GET "192.168.67.11:9200/_cat/indices?v"

4、浏览器访问

http://192.168.67.11:5601/

浏览器访问 http://192.168.67.11:5601 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat_test-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/542510.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

认识异常(2)

❤️❤️前言~🥳🎉🎉🎉 hellohello~,大家好💕💕,这里是E绵绵呀✋✋ ,如果觉得这篇文章还不错的话还请点赞❤️❤️收藏💞 💞 关注💥&a…

gemini1.5 API调用

https://ai.google.dev/pricing?hlzh-cn 查询可用的model https://generativelanguage.googleapis.com/v1beta/models?keyxxx 使用postman调用 https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent?keyxxx https://ai.google…

JavaSE——常用API进阶二(3/8)-Date、SimpleDateFormat(构造器、常用的方法、用法示例、时间格式的常见符号)

目录 Date 构造器、常用的方法 用法示例 SimpleDateFormat 构造器、格式化时间的方法 时间格式的常见符号 用法示例 解析字符串时间成为日期对象 接下来会学习JDK8以前传统的日期、时间,以及JDK8开始新增的日期、时间;有部分项目还是有在使用JDK…

【C++学习】深入理解C++异常处理机制:异常类型,捕获和处理策略

文章目录 ♫一.异常的提出♫二.异常的概念♫三.异常的使用♫3.1 异常的抛出和捕获♫3.2.异常的重新抛出♫3.3异常安全♫3.4 异常规范 ♫4.自定义异常体系♫5.C标准库的异常体系♫6.异常的优缺点 ♫一.异常的提出 之前: C语言传统的处理错误的方式与带来的弊端&…

【位运算】Leetcode 两整数之和

题目解析 371. 两整数之和 算法讲解 异或的本质就是无进位相加,但是我们需要处理进位,就需要知道哪一位上有进位,再让无进位相加的结果 进位即可,在重复这个过程,当进位等于0的时候,说明相加的过程已经结…

Windows环境下删除MySQL

文章目录 一、关闭MySQL服务1、winR打开运行,输入services.msc回车2、服务里找到MySQL并停止 二、卸载MySQL软件1、打开控制模板--卸载程序--卸载MySQL相关的所有组件 三、删除MySQL在物理硬盘上的所有文件1、删除MySQL的安装目录(默认在C盘下的Program …

CSS盒模型(详讲)

目录 概述: 内容区(content): 内边距(paddingj): 前言: 设置内边距: 边框(border): 前言: 示例: 外边…

飞驰云联入选金融信创生态实验室「金融信创优秀解决方案」

近日,由中国人民银行领导、中国金融电子化集团有限公司牵头组建的金融信创生态实验室发布了第三期金融信创优秀解决方案,Ftrans飞驰云联“文件数据传输解决方案”成功入选! 本次金融信创优秀解决方案遴选经方案征集、方案初审、专家评审等多环…

unity android 打包

现在使用的unity版本hub不支持导入support,只能自己下载对应的支持 找到对应的sdk,ndk

计算机组成原理【CO】Ch2 数据的表示和应用

文章目录 大纲2.1 数制与编码2.2 运算方法和运算电路2.3 浮点数的表示和运算 【※】带标志加法器OFSFZFCF计算机怎么区分有符号数无符号数? 【※】存储排列和数据类型转换数据类型大小数据类型转换 进位计数制进制转换2的次幂 各种码的基本特性无符号整数的表示和运算带符号整…

牛客研究生复试刷题(1)

KY30进制转换 1.最开始没有考虑到大数问题,可以说是没考虑完全,输入类型使用的是int64_t,只ac了一半测试用例。所以在数很大找不到合适的数据类型存储时,要考虑使用string来存放。 2.使用string存放数字的时候就要考虑:字符和数字之间的转换。字符转换成数字:str[i]-0,…

软考125-上午题-【软件工程】-传统软件的测试策略

一、传统软件的测试策略 有效的软件测试实际上分为4步进行,即:单元测试、集成测试、确认测试、系统测试。 1-1、单元测试(模块测试) 单元测试也称为模块测试,在模块编写完成且无编译错误后就可以进行。 单元测试侧重…

ChatGPT-4 Turbo 今天开放啦!附如何查询GPT-4 是否为 Turbo

2024年4月12日,OpenAI在X上宣布GPT-4 Turbo开放了!提高了写作、数学、逻辑推理和编码方面的能力。另外最重要的是,响应速度更快了!! ChatGPT4 Turbo 如何升级?解决国内无法升级GPT4 Turbo的问题&#xff0…

springboot+vue科普知识商城考试论坛交流系统网站

本系统主要是设计出新能源科普网站,基于B/S构架,后台数据库采用了Mysql,可以使数据的查询和存储变得更加有效,可以确保新能源科普网站管理的工作能够正常、高效的进行,从而提高工作的效率。总体的研究内容如下&#xf…

Nikon | NEF格式图片批量转换为jpg格式

如何将nikon相机拍的NEF格式图片转换为jpg格式呢? 这里推荐一个在线转换的网址: https://picflow.com/convert/nef-to-jpg 添加图片后,可以批量选择图片,点击转换后即可进行下载

【Python】使用OPC UA创建数据服务器

目录 准备工作服务器设置创建或获取节点设置节点值启动服务器查看服务器客户端总结 在工业自动化和物联网(IoT)领域,OPC UA(开放平台通信统一架构)已经成为一种广泛采用的数据交换标准。它提供了一种安全、可靠且独立于…

Nature Geoscience | 近十年来北方森林和温带森林是全球主要的碳汇

2023年10月2日,法国巴黎萨克雷大学、波尔多大学与丹麦哥本哈根大学等多个单位的科研小组在国际知名学术期刊《Nature Geoscience》发表了一项题为“Global Increase in Biomass Carbon Stock Dominated by Growth of Northern Young Forests over Past Decade”的文…

论文笔记:NEFTune: Noisy Embeddings Improve Instruction Finetuning

iclr 2024 reviewer 评分 5666 1 论文思路 论文的原理很简单:在finetune过程的词向量中引入一些均匀分布的噪声即可明显地提升模型的表现 2 方法评估

动态规划-简单多状态dp问题2

文章目录 1. 买卖股票的最佳时机含冷冻期(309)2. 买卖股票的最佳时机含手续费(714)3. 买卖股票的最佳时机 III(123)4. 买卖股票的最佳时机 IV(188) 1. 买卖股票的最佳时机含冷冻期&a…

【ELK】ELK企业级日志分析系统

搜集日志;日志处理器;索引平台;提供视图化界面;客户端登录 日志收集者:负责监控微服务的日志,并记录 日志存储者:接收日志,写入 日志harbor:负责去连接多个日志收集者&am…