一文理解Kafka

概述
        Kafka是一个基于Zookeeper的分布式消息中间件,支持消息分区,提供发布和订阅功能。使用Scala编写,主要特点是可水平扩展,高吞吐率以及高并发。

        常见的使用场景:

  • 企业级别活动数据和运营数据的消息传递,活动数据一般包括页面的访问,搜索。运营数据包括服务器上CPU,IO,用户活跃度等数据。
  • 日志收集,收集的日志对接hadoop,Hbase,Elasticsearch等系统。
  • 流式处理,支持spark streaming和storm。

 
 

基本架构以及概念
        Kafka的主要工作原理是多个Producer发送Topic消息体到Kafka集群上,消息首先会存放在不同Broker对应的Leader分区上,Follower分区拉取Leader分区消息并写入日志,Consumer客户端同时也拉取Leader分区消息,完成消息消费。

         上图中,Kafka集群中有3台Broker,Kafka集群在启动的时候会将自身信息注册到Zookeeper集群中,保证信息的一致性。Producer有3个,分别发送Topic为A,B,C的消息体道Kafka集群中。Kafka集群中Topic A的Partition数为2,Replication数为3,Topic B的Partition数为1,Replication数为3,Topic C的Partition数为1,Replication数为2.每个Partition有主从之分,主Partition会接收Producer消息并共Consumer消费,从Partition只会从主Partition接收数据,不会和Producer以及Cosumer有直接联系。多个Consumer可以组成一个Group,同一group下不同的Consumer只能消费同一Topic下不同Partition的消息。例如Consumer Group A下的Consumer0和Consumer1只能分别消费Topic A中Partition0和Partition1的消息。

        以下是Kafka部分概念解析

  •  Producer:消息生产者。
  •  Consumer:消息消费者。
  •  Consumer Group: 消费者群组,包含多个消费者,同组消费者消费同一个Topic下不同的分区的消息。
  •  Broker: Kafka实例,可以理解为不同的kafka服务器,每个都有一个唯一的编号。
  •  Message: 生产者传递给消费者的消息体。
  •  Topic: 消息主题,Broker上有不同的Topic, Message发送到不同的Topic供消费者消费。
  •  Partition: 相当于将消息进行了分发,一个Topic可以分为多个分区,消费者群组里面的消费者可以同时消费不同分区里面的消息,提高了吞吐量。
  •  Replication: 分区副本,默认最大为10个,不能大于Broker的数量,当分区的Leader挂掉之后,Follower继续工作,提供可靠性保证。
  • Offset:消息持久化中消息的位置偏移信息。
  •  zookeeper: 保存Kafka集群的信息的Metadata,同样提供了可靠性保证

具体工作流程

  1. Producer发送数据到Broker

        同一Topic下的消息在集群中有多个分区,Producer发送数据的时候总会发送给Leader分区,Leader分区再将数据同步给其他Follower分区,等待所有的Follower同步完成之后向Leader分区返回ack消息,Leader分区接收到所有的Follower分区ack之后向Producer发送ack,确认消息接收完成。

        Leader分区的选择是首先所有Broker选取出一个Controller,由Controller指定分区的leader。

        其中ACK应答机制是有参数可以设置的,值为0,1,all;来确定kafka是否有接收到数据,这3个参数的含义如下:

  • 0:Producer发送完数据后直接返回,不会等待集群的ack消息
  • 1:Producer只要leader分区应答ack即可,不用其他follower应答ack
  • all: Producer要等待集群中所有分区都回复ack才会继续发送下一条数据,否则发送失败

        ACK应答机制能够确保消息的可靠性。但是可靠性和消息交互速率是一对矛盾体。消息越可靠,相对传输速率就会降低。

        同样,Producer发送消息到broker,到底发送到了那一个分区,通常遵循以下规则:

  • Producer在发送时指定
  • 如果没有指定但是设置了数据key, 就会对数据key进行hash,根据hash之后的值选定分区
  • 如果上述两者都没有设定,则轮询选择分区

     2. Broker保存数据

        Kafka的数据是保存在磁盘的,之所以采用文件追加的方式进行存放,实际是采用了顺序IO的方式,避免随机IO造成大量的耗时。一个Topic有多个Partition,每个partition相当于一个有序的队列。每个parition以文件夹的形式存储在Broker上。

       a) Partition存储结构

         Partiion采用分段(segment)存储的方式,每段有3个文件:.log, .index, .timeindex。

.log数据存储文件,存放位置position和消息对应关系
.indexoffset索引文件,存放offset和position对应关系,offset代表消息顺序,position代表消息在磁盘中的位置
.timeindex时间索引文件, 存放时间戳和offset的对应关系

        以下是Partition存放文件夹对应示意图。

       b) Message存储结构

        Message在.log文件中存放,具体字段和含义如下

字节描述
8Position
4消息体大小
4CRC32校验值
1kafka版本号
1attributes
4key的长度
mkey的内容
4payload长度
npayload内容

        c) 两个概念LEO和HW

         LEO(Log End Offset):  表示每个Partiotion log中最后一条message的offset位置。
         HW(High Water Mark): 是统一Partiotion中各个Replicas数据同步一直的offset位置,该位置前的数据consumer可见,该位置之后不可见。

        d)通过索引定位消息

       以下是一个例子: 找出offset为7的消息内容

        1)首先通过offset值7确定文件在哪个segment中,显然在00000000000000000.index,这一步是offset值和index文件名进行比对。

        2)index文件索引采用的是稀疏索引进行存储,有可能恰好没有对应的offset值,所以这里是利用二分查找找到小于等于offset值的那条记录,这里找到offset=6,取出Message在log文件中的位置为9807。

        3) 在log文件中从position为9807的位置顺序检索,首先找到的是offset为6的数据,然后加上消息体大小,定位出offset为7的数据位置,然后读取该message数据。

    d) 数据清理策略

        清理策略:时间和大小阈值(时间默认超过7天或者大小超过1G,清除日志)

 #清理超过指定时间的消息,默认是168小时,7天,
 #还有log.retention.ms, log.retention.minutes, log.retention.hours,优先级高到低
 log.retention.hours=168​
 #超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没限制
 log.retention.bytes=1073741824


    3. Consumer消费数据:
        消费者通常会有一个消费者群组,同一消费组中的消费者可以消费一个Topic不同分区的数据。不会有两个同组消费者消费同一topic下同一分区的消息。
   

        消费者记录消费消息的信息在早期版本会记录在zookeeper中,后边的版本统一记录在_consumer_offsets topic下。

集群搭建
        本文采用docker-compose部署kafka集群以及UI页面,docker版本:18.06.3-ce  docker-compose版本:1.24.1。下图中的10.232.112.13为宿主机的IP,注意需要替换

version: "3"

services:
  zookeeper:
    image: 'bitnami/zookeeper:3.6'
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - ./zookeeper:/bitnami/zookeeper
    restart: always

  kafka1:
    image: 'bitnami/kafka:3.0'
    container_name: kafka1
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.232.112.13:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    # restart: always
    depends_on:
      - zookeeper
  kafka2:
    image: 'bitnami/kafka:3.0'
    container_name: kafka2
    ports:
      - '9093:9093'
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.232.112.13:9093
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    # restart: always
    depends_on:
      - zookeeper

  kafka3:
    image: 'bitnami/kafka:3.0'
    container_name: kafka3
    ports:
      - '9094:9094'
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.232.112.13:9094
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    # restart: always
    depends_on:
      - zookeeper
  kafka-ui:
    image: 'provectuslabs/kafka-ui'
    container_name: kafka-ui
    ports:
      - "18080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=CLUUSTER001
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.232.112.13:9092
    # restart: always
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3

Demo代码

        Producer代码

public class KProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","10.232.112.13:9192");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer",StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<String, String>("Test",3,"testKey","hello");
        Future<RecordMetadata> send = kafkaProducer.send(stringStringProducerRecord);
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);

    }
}

        Consumer代码

public class KConsumer {
    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","10.232.112.13:9192");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer",StringDeserializer.class.getName());
        properties.setProperty("group.id","1111");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("Test"));

        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(500);
            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                System.out.println("**********" + stringStringConsumerRecord.key() + stringStringConsumerRecord.value());
            }

        }   

    }
}

Kafka的优缺点

        优点:        

        1、高吞吐量:Kafka支持高吞吐量的传输,可以支持数千个客户端和每秒数百万条消息。

        2、可扩展性:Kafka支持水平扩展,可以添加更多的节点来支持多客户端和更多的消息。

        3、可靠性:Kafka支持消息的可靠传输,可以确保消息不会丢失。

        4、低延迟:Kafka支持低延迟的消息传输,可以确保消息能够及时到达消费者。

        缺点:

        1、管理复杂性:Kafka的管理比较复杂,需要对Kafka集群进行维护和监控。

         2、消息顺序:Kafka不能保证消息的顺序,因为消息可能会被分发到不同的分区中。

到这里,你了解Kafka了吗

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/21432.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux高级(shell)

文章目录 一、shell概述Linux 提供的 Shell 解析器有bash 和 sh 的关系Centos 默认的解析器是 bash 二、shell脚本入门脚本格式第一个shell脚本&#xff1a;helloworld.sh 三、变量系统预定义变量自定义变量特殊变量 四、删除变量五、运算符六、条件判断七、流程控制if判断case…

教你精通Java语法之第十四章、枚举

目录 一、背景及定义 二、使用 2.1switch语句 2.2常用方法 三、枚举优点缺点 四、枚举和反射 4.1枚举是否可以通过反射&#xff0c;拿到实例对象呢&#xff1f; 五、总结 六、面试问题 一、背景及定义 枚举是在JDK1.5以后引入的。主要用途是&#xff1a;将一组常量组织…

Vue--》深入理解 Vue 3 导航守卫,掌握前端路由的灵魂技能!

目录 vue3导航守卫讲解与使用 element-ui的安装与使用 配置路由和设置路径别名 设置登录页面并实现规则跳转 设置导航前置守卫 设置导航后置守卫 其他路由相关操作 vue3导航守卫讲解与使用 导航守卫是在 Vue Router 中提供的一种功能&#xff0c;它允许你在切换路由之前…

NineData:高效高质量的Redis可视化管理工具

Redis 是一个内存数据结构存储系统&#xff0c;它被广泛用于缓存、队列、实时分析等多种应用场景中&#xff0c;目前已经成为 Key-value 数据存储系统中的佼佼者&#xff0c;根据 DB-Engine 网站提供的最新数据&#xff0c;Redis 在 Key-value stores 类别中排名第一&#xff0…

SpringCloud-网关 Gateway

网关Gateway 一、网关初识二、网关的使用1.创建项目并引入依赖2.编写网关配置3.启动服务并测试 三.查看网关路由规则列表四.路由服务的负载均衡五.断言和过滤1.断言Predicate1.1.The Path Route Predicate Factory(路径断言工厂&#xff09;1.2.The After Route Predicate Fact…

大模型训练数据多样性的重要性

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

华为云——代码托管的使用

一、打开前后端项目 登录华为云&#xff0c;点击页面右上角的用户名——点击个人设置 2.点击代码托管的HTTPS密码管理&#xff0c;设置自己的密码 3.回到代码仓库&#xff0c;复制HTTP地址 4.打开GitHubDesktop&#xff0c;点击左上角进行仓库克隆 &#xff08;我这里已经cl…

声音合成——Foley Sound——DECASE项目——多模态智能感知与应用——论文翻译

文章目录 概述论文翻译CONDITIONAL SOUND GENERATION USING NEURAL DISCRETE TIME-FREQUENCY REPRESENTATION LEARNINGAbstractSampleRNN是啥&#xff1f; Introduction个人总结&#xff08;省流&#xff09;补充个人感想 Approach2.1 Discrete time-frequency省流总结2.1.1 Mu…

分布式系统原理

高可用是指系统无中断的执行功能的能力&#xff0c;代表了系统的可用程度&#xff0c;是进行系统设计时必须要遵守的准则之一。 而高可用的实现方案&#xff0c;无外乎就是冗余&#xff0c;就存储的高可用而言&#xff0c;问题不在于如何进行数据备份&#xff0c;而在于如何规避…

【Lychee图床】本地电脑搭建私人图床,公网远程访问

文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 转发自CSDN风浪越大%鱼越贵的文章&#xff1a;Lychee图床 - 本地配置属于自己的相册管理系统并远…

国潮还能怎么玩?小红书用户画像速看!

所谓“国潮”&#xff0c;概括来说就是“国风潮流”。主要有两层含义&#xff1a;其一&#xff0c;有中国文化和传统的基因&#xff1b;其二&#xff0c;能将传统文化与时下潮流相融合&#xff0c;使产品更具时尚感。在“国潮”元年之前&#xff0c;“国潮”大多指狭义上的特定…

【开发者指南】如何在MyEclipse中编辑HTML或JSP文件?(一)

MyEclipse v2022.1.0正式版下载 如果您有HTML或JSP文件要编辑&#xff0c;这里将介绍如何编辑。查找以下信息&#xff1a; 编辑源代码大纲和属性视图参数页面 该功能在MyEclipse中是可用的。 一、HTML / JSP编辑器 要编辑HTML或JSP文件&#xff0c;请执行以下操作当中的一…

Git笔记

目录 Git概念 git配置 git的安装 远程仓库配置 忽略跟踪文件 git指令 文件跟踪指令&#xff1a; 查看提交历史 撤消操作 远程仓库的使用 标签 分支 常见错误提示及解决方法 git patch的运用 git中branch/commit/add之间关系 Windows下Git的使用 Git概念 Git 是…

接口优化技巧汇总

1.批处理 批量思想&#xff1a;批量操作数据库&#xff0c;这个很好理解&#xff0c;我们在循环插入场景的接口中&#xff0c;可以在批处理执行完成后一次性插入或更新数据库&#xff0c;避免多次IO。 //批量入库 batchInsert();2.异步处理 异步思想&#xff1a;针对耗时比较…

Nacos-04-@RefreshScope自动刷新原理

Nacos动态刷新原理 Nacos做配置中心的时候&#xff0c;配置数据的交互模式是有服务端push推送的&#xff0c;还是客户端pull拉取的&#xff1f; 短轮询 不管服务端的配置是否发生变化&#xff0c;不停发起请求去获取配置&#xff0c;比如支付订单场景中前端JS不断轮询订单支…

mathtype公式符号显示不对

文章目录 问题解决方法结果 记录攥写论文遇到的问题及解决方法 问题 使用mathtype编辑公式过后&#xff0c;发现公式显示不对&#xff0c;出现两种问题&#xff1a; 1&#xff1a;部分符号变为方框 2&#xff1a;符号大小异常 例如&#xff1a; 解决方法 第一种&#xff1a…

【Linux 之五】 Linux中使用fdisk命令实现磁盘分区

最近由于工作的需要&#xff0c;初步研究了uboot中的fastboot实现方式。研究fastboot不可避免的需要了解磁盘分区的相关知识点&#xff0c;在linux下可以使用fdisk命令实现磁盘的分区。好了&#xff0c;下面步入正题。 1. 查看帮助信息&#xff08;fdisk --help&#xff09; …

我们详细讲讲UI自动化测试最佳设计模式POM

概念 什么是POM&#xff1f; POM是PageObjectModule&#xff08;页面对象模式&#xff09;的缩写&#xff0c;其目的是为了Web UI测试创建对象库。 在这种模式下&#xff0c;应用涉及的每一个页面应该定义为一个单独的类&#xff0c;类中应该包含此页面上的页面元素对象和处…

skywalking安全认证问题

skywalking安全认证 一、问题二、步骤2.1 skywalking-aop配置文件修改2.2 agent配置文件修改 一、问题 在springboot项目使用java-agent接入skywalking时&#xff0c;为保证两者之间的数据安全传输&#xff0c;准备加个安全认证 参考文章&#xff1a; https://www.helloworld…

亚马逊云科技使用Inf2实例运行GPT-J-6B模型

在2019年的亚马逊云科技re:Invent上&#xff0c;亚马逊云科技发布了Inferentia芯片和Inf1实例这两个基础设施。Inferentia是一种高性能机器学习推理芯片&#xff0c;由亚马逊云科技定制设计&#xff0c;其目的是提供具有成本效益的大规模低延迟预测。时隔四年&#xff0c;2023年…