学习笔记 | Kafka

一、概述

定义

1、Kafka传统定义:Kafka 是一个分布式的基于 发布/订阅模式 的消息队列(Message Queue) ,主要应用与大数据实时处理领域。

2、发布/订阅:消息的发送者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受自己感兴趣的消息。

3、Kafka 最新定义:Kafka是一个开源的 分布式事件流平台 (Event Streaming Platfrom),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列的应用场景

传统的消息队列主要应用场景包括: 缓存/削峰、解耦和异步通信。

缓存/削峰

所有数据可以全部缓存到消息队列,服务器可以根据自己处理的性能按一定的频率去消息队列中取。

解耦

减少服务之间的直接调用,由消息队列充当中间者。

异步通信

一个业务可以将优化体验(发短信)的动作放到消息队列中,由专门的服务去处理,达到快速响应上游。

消息队列的俩种模式

1)点对点模式

消费者主动拉取数据,消息收到后清除数据。

2)发布/订阅模式

  • 一个队列可以有多个topic主题。(topic对消息进行分类,消费者可以自己需求拿消息)
  • 消费者消费数据之后,不删除数据。
  • 每个消费者相互独立,都可以拿到消费数据。

Kafka的基础架构

1、为方便扩展,并提高吞吐量,一个 Topic 分为多个 partition(分区)

2、配合分区的设计,提出了消费者组的概念,组内每个消费者并行消费,一个分区只能让一个消费者消费。

3、为了提高可用性,为每个 partition 增加诺干副本进行备份(分为leader 和 follower)消费者只找learder,当leader挂掉的时候,follower符合条件时会变成leader。

4、zookerper存储节点信息,有哪些副本。

二、入门

Kafka的基本命令

Topic命令

  • 查看有多少主题
 kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --list
  • 新增主题
 kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --create --partitions 1 --replication-factor 3
  • 查看主题详情
kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --describe 
  • 修改主题

只能加不能减

kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --alter --partitions 3 

命令行操作

  • 创建一个生产者
 kafka-console-producer.sh --bootstrap-server 192.168.204.10:9092 --topic second

  • 创建一个消费者
kafka-console-consumer.sh --bootstrap-server 192.168.204.10:9092 --topic second

可以查看到历史数据

kafka-console-consumer.sh --bootstrap-server 192.168.204.10:9092 --topic second --from-beginning

三、生产者

原理

在消息发送的过程中,涉及到了俩个线程 -- main 和 Sender。在main线程中创建了 一个双端队列 RecordAccumulator 。main线程将消息发送给RecordAccumulator ,Sender 线程不断从RecordAccumulator 中拉取消息发送给Kafka Broker。

异步发送

当main线程发送到RecordAccumulator之后就结束了,不管接下去的操作。

示例代码:

//配置参数
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//创建KafkaProducer
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.send(new ProducerRecord<>("second","hello"));

//释放资源
kafkaProducer.close();

回调异步发送

相对于异步发送,就是多了一个发送成功之后处理的函数。

示例代码:

//配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//创客KafkaProducer
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.send(new ProducerRecord<>("second", "hello"), (recordMetadata, e) -> {
    System.out.println(recordMetadata.toString());
    System.out.println("send success");
});

//释放资源
kafkaProducer.close();

同步发送

同步发送就是main线程需要等sender线程将双端队列中的数据发送出去才能继续往下面操作。

示例代码:

//配置参数
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//创建KafkaProducer
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<>(properties);
try {
    kafkaProducer.send(new ProducerRecord<>("second","hello")).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

//释放资源
kafkaProducer.close();

分区

Kafka分区好处

1、便于合理使用存储资源,每个Partition 在一个Broker上存储,可以把海量数据按照分区切割成一块一块存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

2、提高并行度,生产者可以以分区为单位发送数;消费者可以以分区为单位进行消费数据。

分区策略

自定义分区器

1、定义自己的分区器

package cn.swj.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @Author suweijie
 * @Date 2023/8/30 21:40
 * @Description: TODO
 * @Version 1.0
 */
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        String msg = o1.toString();

        if(msg.contains("suweijie")) {
            return 1;
        }

        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

2、添加配置

//配置参数
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName())

//创建KafkaProducer
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<>(properties);
try {
    kafkaProducer.send(new ProducerRecord<>("second","hello")).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

//释放资源
kafkaProducer.close();

提高生产者的吞吐量

batch.size: 批次的大小默认是16k(16384b) ,但是这个参数要跟linger.ms 配合才有用

linger.ms: 等待时间,修改为 5-100ms ,修改这个会造成数据的延迟。

RecordAccumulator: 双端队列的缓存区大小,修改为64m (33554432b)

compression.type : 压缩snappy, none(默认)、gzip、snappy(用的比较多)、lz4、zstd

最佳实践:

batch.size = 32768
linger.ms = 5
buffer.memory = 33554432
compression.type = snappy

数据可靠性

应答ACKS

  • 0: 生产者发过来的数据,不需要等待数据落盘应答。
  • 1: 生产者发过来的数据,需要等待Leader收到之后应答。
  • -1(all): 生产者发过来的数据,需要等Leader+ 和 isr 队列里面所有的节点收齐数据后应答。-1 和 all等价。
spring:
  kafka:
    bootstrap-servers: 192.168.204.10:9092,192.168.204.10:9093,192.168.204.10:9094
    consumer:
      group-id: 1
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer

    producer:
      acks: -1  #ack机制  0 1 -1
      batch-size: 32768  #批次大小
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: snappy  #数据压缩
      retries: 5  #重试次数
      buffer-memory: 33554432  #双端队列的缓冲区大小
      linger-ms: 5  # sender 等待时间

数据重复

幂等性特性

配置:

enable:
	idempotence: true  #开启幂等性  默认开启

但是Kafka挂掉之后会重新生成一个PID,所以也是有可能会产生重复数据。

生产者事务

开启事务、必须得开启幂等性

示例代码:

private void transaction() {
        //配置参数
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.LINGER_MS_CONFIG,5); //sender 发送的等待时间 ,当达到这个时间的时候Sender 会直接发
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);   //开启幂等性,默认开启
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //设置双端队列的大小  64m
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768);  //批次的大小  32k ,当批次达到这个大小的时候,Sender会直接发送
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");  //数据的压缩方式
        properties.put(ProducerConfig.RETRIES_CONFIG,5);   //发送失败的重试次数
        properties.put(ProducerConfig.ACKS_CONFIG,-1); // acks的方式 -1 当leader 收到并且和isr 队列里面所有的节点同步才应答。
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"123");  //事务唯一id
        //自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

        //创建KafkaProducer
        KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();  //初始化事务

        kafkaProducer.beginTransaction();  //开启事务

        try {
            kafkaProducer.send(new ProducerRecord<>("second","hello"));
            kafkaProducer.commitTransaction();  //事务
        } catch (Exception e) {
            e.printStackTrace();
            kafkaProducer.abortTransaction();
        }

        //释放资源
        kafkaProducer.close();
    }

数据有序

同分区内消费者可以实现数据的有序消费,不同分区内消费者如何实现有序消费?TODO

数据乱序问题

产生的原因:

1、默认 broker 最多缓存5个请求

2、当sender一直在发送数据的时候,当有一条数据发送失败需要返回双端队列进行重发,就会产生数据乱序的问题。

解决方案:

1) kafka 在 1.x 版本之前确保单分区下数据有序需要增加以下配置:

max.in.flight.requests.per.connection = 1

1) kafka在 1.x 以及之后的版本确保单分区下的额数据有序,条件如下:

(1) 未开启幂等性

max.in.flight.requests.per.connection 设置为1

(2)开启幂等性

max.in.flight.requests.per.connection 设置小于5

原理:在kafka1.x 版本以后,启用幂等性后,kafka broker 会缓存producer 发来的最近5个request 的元数据,如果数据乱序会将乱序的数据保存在内存中,重新排序之后在落盘。

四、Broker

ZK存储

启动zkCli.sh:

docker exec -it zookeeper-server bash
#进入之后启动zkCli.sh
bin/zkCli.sh
ls /brokers/ids
get /brokers/topics/second/partitions/0/state 
get /controller

/brokes/ids : 记录有哪些节点

/brokers/topics/主题/patitions/0/state : 记录着leader、isr队列

/controller : 辅助选举leader

Broker工作原理

AR: kafka 分区中所有的副本统称

工作流程:

1) broker 启动会在zk中注册

2) controller 谁注册,谁说了算

3) 由选举出来的controller 监听 brokers 节点变化

4) Controller 决定 Leader 的选举

选举规则:

在isr队列中存活为前提,安装ARa中排在最前面的优先。例如 ar[1,0,2]、isr[1,0,2],那么leader 就会按照1,0,2的顺序轮询。

5) 主broker的Controller,会将所有节点的信息上传到zk

6) 其他节点的controller 会去从zk同步相关信息下来。

7) 假设broker挂了

8) 监听到broker节点变化

9) 获取isr

10) 选举新的leader

11) 更新leader 以及 isr

新节点的服役以及退役(没听懂)

新节点服役

docker run  -d --name kafka3 \
--network kafka-net \
-p 9095:9095 \
-e  KAFKA_BROKER_ID=3 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.204.10:9095 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9095 \
-e TZ="Asia/Shanghai" \
wurstmeister/kafka:latest

查看在新节点是否有主题信息(指定这台broker的地址,查看是否有主题信息)

kafka-topics.sh --bootstrap-server 192.168.204.10:9094 --topic first --describe 

服役新节点、正确退役旧节点

五、Kafka 副本

基本信息

1)Kafka 副本作用: 提高数据的可靠性。

2)Kafka默认的副本数为1,生产环境正常配置俩个,保证数据的可靠性;太多副本会增加磁盘的存储空间,增加网络上数据传输,降低效率。

3)Kafka 中副本分为: Leader 和 Follower。Kafka生产者只会把数据发送到Leader,然后Follower 自己去找Leader 同步。

4)Kafka 分区中的所有副本统称为AR(Assigned Replicas)。

AR = ISR + OSR

ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出ISR。该时间闽值由 replica.lagtime.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从ISR 中选举新的 Leader。

OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。

Leader的选举流程

Follower的故障

Leader的故障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/300259.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

localhost和127.0.0.1的区别是什么

今天在网上逛的时候看到一个问题&#xff0c;没想到大家讨论的很热烈&#xff0c;就是标题中这个&#xff1a; localhost和127.0.0.1的区别是什么&#xff1f; 前端同学本地调试的时候&#xff0c;应该没少和localhost打交道吧&#xff0c;只需要执行 npm run 就能在浏览器中打…

光速爱购--靠谱的SpringBoot项目

简介 这是一个靠谱的SpringBoot项目实战&#xff0c;名字叫光速爱购。从零开发项目&#xff0c;视频加文档&#xff0c;十天就能学会开发JavaWeb项目。 教程路线是&#xff1a;搭建环境> 安装软件> 创建项目> 添加依赖和配置> 通过表生成代码> 编写Java代码&g…

LeetCode-重复的子字符串(459)

题目描述&#xff1a; 给定一个非空的字符串 s &#xff0c;检查是否可以通过由它的一个子串重复多次构成。 思路一&#xff1a; 使用枚举的方法。首先因为字符串s有一个子串重复多次构成&#xff0c;那么s的长度len与子串的长度subLen应该成倍数关系&#xff0c;并且在s中索…

C++/OpenGL应用程序

图像应用程序大部分是 C 编写&#xff0c;OpenGL 调用实现与 3D 渲染相关任务将会使用一些扩展库: GLEW、GLM、GLFW、SOLL2 等。 GLFW 库包含 GLFWwindow 类&#xff0c;我们可以在其上进行 3D 场景绘制。OpenGL 也向我们提供了用于 GLSL 程序载入可编程着色阶段并对其进行编译…

算法第十三天-组合总和Ⅱ

组合总和Ⅱ 题目要求 解题思路 按顺序搜索&#xff0c;设置合理的变量&#xff0c;在搜索的过程中判断是否会出现重复集结果。重点理解对输入数组排序的作用和参考代码中 大剪枝和小剪枝 的意思 这道题域上一问的区别在于&#xff1a; 第39题&#xff1a;candidates中的数字…

Linux系统IO—探索输入输出操作的奥秘

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;HEART BEAT—YOASOBI 2:20━━━━━━️&#x1f49f;──────── 5:35 &#x1f504; ◀️ ⏸ ▶️ ☰ …

PMP过了就是中级职称?

&#x1f33b;PMP项目管理专业人士认证在全球范围内受到广泛认可&#xff0c;许多人就误以为获得PMP证书就等同于获得中级职称。但是&#xff0c;事实真的如此吗❓ 1️⃣PMP不属于职称认证 ✅PMP证书&#xff1a; 是由美国项目管理协会(PMI)颁发的专业认证&#xff0c;旨在证明…

2022年多元统计分析期末试题

2023年多元统计分析期末试题 1.试论述系统聚类、动态聚类和有序聚类的异同之处。 2、设 X {X} X~ N 3 {N_3} N3​(μ&#xff0c;Σ)&#xff0c;其中 X {X} X ~ ( X 1 {X_1} X1​, X 2 {X_2} X2​, X 3 {X_3} X3​)&#xff0c;μ (1,-2,3)‘&#xff0c;Σ [ 1 1 1 1 3 2…

leetcode——杨辉三角

https://leetcode.cn/problems/pascals-triangle/ 杨辉三角&#xff1a; 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 核心思想&#xff1a;找出杨辉三角的规律&#xff0c;发…

mybatis调用Oracle存储过程 带游标

目录 存储过程 调用测试 游标 Mapper.xml Mapper 调用测试 结果 存储过程 CREATE OR REPLACE PROCEDURE proc_test2(p_id IN NUMBER,v_cur OUT SYS_REFCURSOR,p_result_code OUT NUMBER,p_result_message OUT VARCHAR2) AS BEGINp_result_m…

阿里云服务器固定带宽实际下载速度表,不只是3M固定带宽

阿里云服务器公网带宽上传和下载速度对照表&#xff0c;1M带宽下载速度是128KB/秒&#xff0c;为什么不是1M/秒&#xff1f;阿里云服务器网aliyunfuwuqi.com分享阿里云服务器带宽1M、2M、3M、5M、6M、10M、20M、30M、50M、100M及200M等公网带宽下载速度对照表&#xff0c;附带宽…

安科瑞电力物联网系统在电力设备在线监测中的应用——安科瑞 顾烊宇

摘要&#xff1a;近年来&#xff0c;社会经济发展速度不断提升&#xff0c;对电力能源的需求大幅增加&#xff0c;为保障变电站等电力设备合理发挥功能&#xff0c;保障供电安全性和稳定性&#xff0c;应当加强对电力设备的监测和管理。而电力物联网技术是现代一种安全工器具的…

一文搞定JVM内存模型

鲁大猿&#xff0c;寻精品资料&#xff0c;帮你构建Java全栈知识体系 www.jiagoujishu.cn 运行时数据区 内存是非常重要的系统资源&#xff0c;是硬盘和 CPU 的中间仓库及桥梁&#xff0c;承载着操作系统和应用程序的实时运行。JVM 内存布局规定了 Java 在运行过程中内存申请、…

application.properties 如何改成 application.yml

Convert YAML and Properties File 右键直接转换即可 Further Reading &#xff1a; idea 常用插件

月报总结|Moonbeam 12月份大事一览

一转眼已经到年底啦。本月&#xff0c;Moonbeam基金会发布四个最新战略重点&#xff1a;跨链解决方案、游戏、真实世界资产&#xff08;RWA&#xff09;、新兴市场。其中在新兴市场方面&#xff0c;紧锣密鼓地推出与巴西公司Grupo RO的战略合作。 用户教育方面&#xff0c;为了…

详解Java中的原子操作

第1章&#xff1a;什么是原子操作 大家好&#xff0c;我是小黑&#xff0c;面试中一个经常被提起的话题就是“原子操作”。那么&#xff0c;到底什么是原子操作呢&#xff1f;在编程里&#xff0c;当咱们谈论“原子操作”时&#xff0c;其实是指那些在执行过程中不会被线程调度…

Python | 基于Mediapipe框架的手势识别系统

一、项目要求 1、题目 本题着力于解决会商演示系统中的非接触式人机交互问题&#xff0c;具体而言&#xff0c;其核心问题就是通过计算机视觉技术实现对基于视频流的手势动作进行实时检测和识别。通过摄像头采集并识别控制者连续的手势动作&#xff0c;完成包括点击、平移、缩放…

关于无人机上层控制的PID算法的思考

一、前言 背景介绍&#xff1a;PID虽然出现了很多年&#xff0c;但是目前工业界还是把PID作为主流的控制算法&#xff08;尽管学术界有很多非常时尚的控制算法&#xff0c;包括鲁邦控制&#xff0c;神经网络控制等等&#xff09;&#xff0c;PID的算法在于其不需要对系统进行复…

编程语言的生命力

一、目前主流的编程语言 目前流行的编程语言有很多种&#xff0c;可谓是百花齐放、百家争鸣。根据不同的应用场景和领域&#xff0c;有不同的编程语言被广泛使用。一些目前主流的编程语言HTML5、Python、JavaScript 、Java 、C 、PHP 、Swift 等等。 还有许多其他的编程语言&am…

leetcode算法题之递归--综合练习(二)

本章目录 1.N皇后2.有效的数独3.解数独4.单词搜索5.黄金矿工6.不同路径III 1.N皇后 N皇后 class Solution {vector<vector<string>> ret;vector<string> path;int n;bool checkCol[10],checkDig1[20],checkDig2[20]; public:vector<vector<string&g…