Kafka入门及生产者详解

1. Kafka定义

传统定义:分布式的、基于发布/订阅模式消息队列,主要用于大数据实时处理领域。发布/订阅模式中,发布者不会直接将消息发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息。

官网最新定义:开源的分布式事件流平台(Event Streaming Platform),用于高性能数据管道、流分析、数据集成。

2. 消息队列的应用场景及模式

传统的消息队列的主要应用场景包括:缓冲/消峰、解耦、异步通信

缓冲/消峰:有助于控制和优化数据流的速度,解决生产消息和消费消息的速度不一致的问题。

解耦:允许独立修改和扩展两边的处理过程,只需确保他们遵守相同的接口约束。

此时消息队列类似于一个超时,数据源是商品生产厂商,目的地是消费者,消费者无需跟各大厂商来往,而是去超市购物。

异步通信:允许用户把一个消息放入队列,不立即处理,再在需要的时候去处理。(比如发送验证码)

消息队列的两种模式:

1)点对点:消费者主动拉取数据,消息收到后清除消息

2)发布/订阅模式:有多个Topic主题;消费者消费完数据后,不删除数据,数据仍可以被其他消费者消费;每个消费者相互独立

3. Kafka基础架构

1)一个topic可以有多个分区,broker为服务器,即一份数据分为多个分区放在多个服务器

2)数据分为多块,消费者也有多个,组成一个消费者组,组内每个成员并行消费不同的分区

3)分区也有副本,不过分HDFS的副本有区别,HDFS的副本是相等的,而Kafka里的副本只有Leader的才能起作用,Follower的副本不能消费(除非Leader挂了,Follower成为Leader)

4)ZK里保存了Kafka的服务器id信息,以及每个topic的各个分区的Leader是哪个服务器,以及isr队列

4. Kafka命令行操作快速入门

针对Kafka基础架构的三大部分,分别有不同的脚本命令来操作。

生产者:kafka-console-producer.sh;

集群:kafka-topics.sh;

消费者:kafka-console-consumer.sh

1)kafka-topics.sh的命令参数如下

创建topic,1个分区,3个副本,并查看:

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --create --partitions 1 --replication-factor 3

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --list

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe

结果如下:

 修改分区数(分区数只能改大,不能改小)为3

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --partitions 3

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe

结果如下: 

另外副本数也不能通过命令行修改

2) kafka-console-producer.sh

向指定分区发送数据

bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first

3) kafka-console-consumer.sh

消费者消费指定分区的数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first

再在生产端发送数据,消费者端可以收到数据,但不能收到历史数据(即生产者在消费者起来之前发送的数据),要想消费历史数据,加上参数:--from-beginning

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --from-beginning

5. 生产者异步发送与同步发送

添加依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

创建Kafka生产者对象:

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRP_SERVERS_CONFIG, "hadoop102:9092");

// 指定key和value的序列化类型
// StringSerializer.class.getName()相当于StringSerializer的全路径名称
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

发送数据:

// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value"));

ProducerRecord的多个构造函数:

关闭资源:

kafkaProducer.close();

发送数据也可以带回调函数,返回主题、分区等信息:

kafkaProducer.send(new ProducerRecord<>("first", "value"), new CallBack() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition());
        }
    }
});

同步发送只需在send方法之后加上get方法:

// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value")).get();

6. 生产者分区策略 

生产者的默认分区器:DefaultPatitioner,即如果指定分区,就发送到指定分区;如果没指定分区,指定了key,则将key的哈希值对分区数取模得到分区;如果也没指定key,选择粘性分区(sticky partition),即随机选取一个分区,本批次数据满了或者linger.ms时间到了,再次选择另一个分区。

自定义分区,主要是实现Partitioner接口,重写其中的partition方法:

@Overrride
public int partition(String topic, Object key, byte[] keybytes, Object value, byte[] valuebytes, Cluster cluster) {
    String valueStr = value.toString();
    if (valueStr.contains("xxx")) {
        return 0;
    } else if (valueStr.contains("zzz")) {
        return 1;
    } else {
        return 2;
    }
}

配置关联自定义分区器:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

 7. 如何提高吞吐量

batch.size:每批次数据大小,数据量达到这个值,就开始发送,默认为16K

linger.ms:等待时间,如果到了这个时间,无论数据量多大,立即发送,默认为0

如果linger.ms设置为0,意味着一旦有数据来就立马发送,这样效率并不高,所以适当提高linger.ms有利于提高吞吐量,但是不能太大,这样会造成较大的数据延迟。

也可以发送数据的过程中采用数据压缩(snappy)的方式,来提高实际发送的数据量。

还可以修改缓冲区大小RecordAccumulator

设置缓冲区大小:

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32M

设置批次大小:

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16K

设置linger.ms:

properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

设置压缩格式:

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

8. 数据可靠性

生产者发送给Kakfa集群,会收到如下几种应答:

1)ack = 0:不需要等待数据落盘,可靠性最差,存在丢数风险,一般不会用这种模式

2)ack = 1:需要Leader收到数据并进行落盘,也有丢数风险,比如Leader刚应答完就挂了,还没来得及同步数据给Follower

3)ack = -1/all,需要Leader和isr队列中所有节点收到数据并进行落盘,可靠性最好,但是数据可能会重复。

所谓ISR队列,就是和Leader保持同步的Leader+Follower的集合,例如:leader:0; isr: 0,1。如果某个Follower长时间未与Leader通信,该Follower就会被提出isr队列,这样就不会出现Leader长期等待某个故障Follower节点的问题。

ack = -1,如果分区副本数为1,或者isr队列里只有一个节点,则与ack=1效果一样,仍有丢数风险。

数据完全可靠 = (ACK = -1)+ (分区副本数 >= 2) + (ISR队列里节点数 >= 2)

代码配置:

properties.put(ProducerConfig.ACK_CONFIG, -1);
//重试次数,默认为int最大值
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

为解决ack = -1时的数据重复性问题,kafka引入了幂等性和事务的概念。所谓幂等性,就是Producer无论向broker发送多少次重复数据,broker都只会持久化一条。

精确依次(Exactly Once) = 幂等性 + 数据完全可靠

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,broker只会持久化一条。PID是每次Kafka重启时会分配一个新的,Partition表示分区号,SeqNumber是单调递增的。所以能保证单分区单次会话数据不重复。

开启幂等性,只需将enable.idempodence设为true即可(默认就是true)。

Kafka事务原理:

 使用事务发送数据:

properties.put(ProducerConfig.TRANSACTION_ID_CONFIG, "01");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

kafkaProducer.initTransactions();
kafkaProducer.beginTransactions();
try {
    kafkaProducer.send(new ProducerRecord<>("first", "value"));
    kafkaProducer.commitTransactions();
} catch (Exception e) {
    kafkaProducer.abortTransactions();
} finnaly {
    kafkaProducer.close();
}

9. 数据乱序

Kafka生产者发送数据给broker,每个broker默认缓存5个请求,如果其中一个请求发送失败,不影响后面请求发送,加入失败的请求后来又重试成功了,那么broker收到的数据会是乱序的。只需将max.in.flight.requests.per.connection设置小于等于5,broker就会自动排序。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/441768.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

html--心动

代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>html</title><style>*{padding: 0;margin: 0;}body{background-color: pink;}#frame{position: relative;width: 400px;height: 300…

【项目】Boost 搜索引擎

文章目录 1.背景2.宏观原理3.相关技术与开发环境4. 实现原理1.下载2.加载与解析文件2.1获取指定目录下的所有网页文件2.2. 获取网页文件中的关键信息2.3. 对读取文件进行保存 3.索引3.1正排与倒排3.2获取正排和倒排索引3.3建立索引3.3.1正排索引3.3.2倒排索引 4.搜索4.1 初始化…

练习3-softmax分类(李沐函数简要解析)与d2l.train_ch3缺失的简单解决方式

环境为:练习1的环境 网址为:https://www.bilibili.com/video/BV1K64y1Q7wu/?spm_id_from333.1007.top_right_bar_window_history.content.click 代码简要解析 导入模块 导入PyTorch 导入Torch中的nn模块 导入d2l中torch模块 并命名为d2l import torch from torch import nn…

pytorch CV入门3-预训练模型与迁移学习.md

专栏链接&#xff1a;https://blog.csdn.net/qq_33345365/category_12578430.html 初次编辑&#xff1a;2024/3/7&#xff1b;最后编辑&#xff1a;2024/3/8 参考网站-微软教程&#xff1a;https://learn.microsoft.com/en-us/training/modules/intro-computer-vision-pytorc…

【Linux】文件周边003之文件系统

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 1.磁盘引入 2.文件系统 …

构建留学平台技术架构:从设计到实现

随着全球化进程的加速和人们对国际教育的需求不断增长&#xff0c;留学行业也迎来了快速发展的机遇。作为留学服务的重要组成部分&#xff0c;留学平台的技术架构设计至关重要。本文将探讨留学平台技术架构的设计和实现过程&#xff0c;以及相关的技术选择、挑战和解决方案。 …

NodeJS实现堆排序算法

NodeJS实现堆排序算法 以下是使用 Node.js 实现堆排序算法的示例代码&#xff1a; // 堆排序函数 function heapSort(arr) {// 构建最大堆buildMaxHeap(arr);// 依次取出最大堆的根节点&#xff08;最大值&#xff09;&#xff0c;并调整堆结构for (let i arr.length - 1; i…

18、电源管理入门之Power Domain管理

目录 1. 框架介绍 2. 如何使用power domain 3. provider 4. Consumer 参考: SoC中通常有很多IP,按逻辑可以把几个相关功能的IP划为一个电源域。一个电源域内的IP,通常按相同的方式由同一个硬件模块PMIC供电,电压一样并且电源管理例如休眠唤醒一致。 为什么有设备电源管…

HTML5+CSS3+JS小实例:暗紫色Tabbar

实例:暗紫色Tabbar 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8" /><meta name="viewport" content="width=device-width, initial-scal…

Java项目:基于SSM框架实现的二手车交易平台【源码+开题报告+任务书+毕业论文+答辩ppt】

一、项目简介 本项目是一套基于SSM框架实现的二手车交易平台 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操作简单、功能齐…

746. 使用最小花费爬楼梯 (Swift版本)

题目 给你一个整数数组 cost&#xff0c;其中 cost[i] 是从楼梯第 i 个台阶向上爬需要支付的费用。一旦你支付此费用&#xff0c;即可选择向上爬一个或者两个台阶。 你可以选择从下标为 0 或下标为 1 的台阶开始爬楼梯。 请你计算并返回达到楼梯顶部的最低花费。 限制条件 2…

智能合约语言(eDSL)—— proc_macro实现合约init函数

我们通过属性宏来实现合约的init函数&#xff0c;call函数其实和init是类似的&#xff1b; GitHub - XuHugo/xwasm 构建属性宏&#xff0c;要在cargo.toml里面设置一些参数&#xff0c;这是必须的。一般来说&#xff0c;过程宏必须是一个库&#xff0c;或者作为工程的子库&…

【Git】项目源码迁移到另一个gitlab(保留原来提交历史记录)

目录 前情提要迁移方案IDEA远程仓库管理团队其他成员切换gitgit命令操作界面 前情提要 公司原来是自己私有部署的gitlab。有了研发云后就希望将代码推送到研发云的代码仓库上。这时候需要迁移并保留原来提交的历史记录。 迁移方案 登录新的gitlab(代码仓库)新建空白项目获取…

DEAP:利用生理信号进行情绪分析的数据库【DEAP数据集】

文章目录 摘要引言刺激选择实验环境参与者步骤参与者自我评估 主观评价分析EEG频率与参与者评分之间的相关性单次试验分类结果 结论 点击下载原文 摘要 ● DEAP&#xff1a;用于分析人类情感状态的多模态数据集。 ● 32名参与者观看了40个一分钟长的音乐视频。 ● 参与者根据唤…

Postman(注册,使用,作用)【详解】

目录 一、Postman 1. Postman介绍 2. 安装Postman 3. 注册帐号再使用(可保存测试记录) 4. 创建workspace 5. 测试并保存测试记录 一、Postman postman工具可以发送不同方式的请求,浏览器只能发送get请求(所有用这个工具) 在前后端分离开发模式下&#xff0c;前端技术人员…

简历–工作经历–通用

文章目录 底层逻辑导图要做到&#xff1a;避免出现&#xff1a;爽文模版&#xff1a;逆境努力逆袭&#xff1a;娱乐 底层逻辑 写作底层逻辑&#xff1a; 简历是给面试者/老师看的&#xff0c;要让人家看起来轻松。 工作经历方面&#xff0c;时间一般是倒着写的&#xff08;考官…

基于SSM的党务政务服务热线平台(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的党务政务服务热线平台&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spri…

机器学习笔记 计算机视觉中的测距任务常见技术路线

一、计算机视觉中的测距任务 测距是计算机视觉中的一项关键任务,涉及测量物体和相机之间的距离。这些信息可用于多种应用,包括机器人、自动驾驶汽车和增强现实。测距技术有很多种,包括主动式和被动式,每种技术都有自己的优点和局限性。主动测距技术,例如飞行时间、结构光和…

推荐一款go语言的开源物联网框架-opengw

推荐一款go语言的开源物联网框架&#xff0c;设计思想不错&#xff0c;值的学习。 技术交流 QQ群1028704210 官网及驱动下载 http://www.opengw.cn http://www.opengw.cn/col.jsp?id104 可执行文件下载 https://gitee.com/my_iot/goAdapter/releases 码云地址 https:/…

大语言模型如何充分理解人类自然语言指令

经过海量数据预训练后的语言模型虽然具备了大量的知识&#xff0c;但是由于其训练的目标仅是进行下一个词的预测&#xff0c;此时的模型还不能够理解并遵循人类自然语言的指令。指令微调(Instruction Tuning)&#xff0c;是指在已经训练好的语言模型的基础上&#xff0c;通过使…