kafka: 基础概念回顾(生产者客户端和机架感知相关内容)

一、kafka生产者客户端

在kafka体系结构中有如下几个重要的概念:

  • Producer:生产者,负责生产消息并投递到kafka broker的某个的分区中
  • Consumer:消费者,负责消费kafka若干个分区中的消息
  • Broker:kafka服务节点
1、整体架构:数据发送流程

在这里插入图片描述
(1)生产者

  • 拦截器
    生产者的拦截器可以在消息发送前做一些拦截工作对数据进行相应的处理,比如:消息过滤、消息内容修改等。
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {
		//在将消息序列化和计算分区之前会调⽤该⽅法,⽤来对消息进⾏相应的定制化操作,如修改消息内容
		public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
		//在消息被应答之前或者消息发送失败时调⽤该⽅法,优先于⽤⼾设定的Callback之前执⾏,如统计消息发送成功或失败的次数
		public void onAcknowledgement(RecordMetadata metadata, Exception exception);
		public void close();
}
  • 序列化器
  • 分区器

二、kafka数据可靠性保证

1、LEO和HW
2、工作流程
3、Leader Epoch

三、粘性分区策略

四、机架感知

1、概念
2、机架感知分区分配策略
3、验证

(1)验证目标

  • 机架感知特性将同⼀分区的副本分散到不同的机架上
  • rack机制消费者可以消费到follower副本中的数据

(2)参数配置
broker端配置:

  • 配置名:broker.rack=my-rack-id
    • 解释:broker属于的rack
  • 配置名:replica.selector.class
    • 解释:ReplicaSelector实现类的全名,包括路径 (⽐如 RackAwareReplicaSelector 即按 rack id 指定消费)

Client端配置:
client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的 broker.rack 相同,表⽰去哪个rack中获取数据。
  • 默认:null

(3)环境准备:kafka集群

  • kafka实例数: 4
  • 两个kafka实例broker.rack配置为0,另外两个kafka实例broker.rack配置为了2,broker端配置如下:
server1:
broker.id=0

broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server2:
broker.id=1
broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server3
broker.id=2
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server4
broker.id=3
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

启动kafka集群,服务端⽇志信息:
在这里插入图片描述
在这里插入图片描述
验证一:机架感知特性将同一分区的副本分散到不同的机架上
在这里插入图片描述
创建topic rack02,副本被分配到了broker1和2
在这里插入图片描述
创建topic rack03 副本被分配到了0和3
在这里插入图片描述
在这里插入图片描述

验证二:客⼾端(消费者)验证:rack机制消费者可以消费到follower副本中的数据

验证代码如下:

package person.xsc.train.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import person.xsc.train.client.KafkaConsumerClient;
import person.xsc.train.constant.KafkaConstant;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Demo {
		public static KafkaConsumer<String, String> kafkaConsumer;
		public static void main(String[] args) {
				Properties properties = new Properties();
				properties.put(KafkaConstant.BOOTSTRAP_SERVERS, "localhost:9093,localhos
				properties.put(KafkaConstant.GROUP_ID, "test01");
				properties.put(KafkaConstant.ENABLE_AUTO_COMMIT, "true");
				properties.put(KafkaConstant.AUTO_COMMIT_INTERVAL_MS, "1000");
				properties.put(KafkaConstant.KEY_DESERIALIZER, StringDeserializer.class.
				properties.put(KafkaConstant.VALUE_DESERIALIZER, StringDeserializer.clas
				properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
				properties.put(ConsumerConfig.CLIENT_RACK_CONFIG, "0");
				properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
				kafkaConsumer = KafkaConsumerClient.createKafkaClient(properties);
				
				receiveMessage("rack02");
		}
		public static void receiveMessage(String topic) {
				TopicPartition topicPartition0 = new TopicPartition(topic, 0);
				kafkaConsumer.assign(Arrays.asList(topicPartition0));
				while(true) {
						// Kafka的消费者⼀次拉取⼀批的数据
						ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll
						//System.out.println("开始打印消息!");
						// 5.将将记录(record)的offset、key、value都打印出来
						for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
								// 主题
								String topicName = consumerRecord.topic();
								int partition = consumerRecord.partition();
								// offset:这条消息处于Kafka分区中的哪个位置
								long offset = consumerRecord.offset();
								// key\value
								String key = consumerRecord.key();
								String value = consumerRecord.value();
								System.out.println(String.format("topic: %s, partition: %s, offs
						}
				}
		}
}

前置背景:
Topic rack02的partition 0分区的副本为broker2(对应的rack为2)和broker1(对应的rack为0),其中broker2为leader(在⾮rack机制下仅能消费到leader中的数据)。

在上述代码中,消费者配置中限制了rack为0,消费的分区为0,因此映射到broker1。通过测试可验证在rack机制下消费者可以消费到folloer副本中的数据,测试如下:
在这里插入图片描述

五、机架感知存在的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/307655.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

20240110-你是成年人了,你的精力要用来赚钱

丘吉尔曾经说过&#xff1a;从让一个人生气事情的大小&#xff0c;就能看出一个人的价值。我觉得这句话很有道理。为什么有的人动不动就生气&#xff1f;而有的人却对一些冒犯做到丝毫不在乎&#xff1f; 我们经常就一些鸡毛蒜皮的事情去计较&#xff0c;其实很多事情不值得我…

格雷希尔G65系列快速接头满足汽车减震器的气压、油压测试要求

当汽车经过不平路面时&#xff0c;汽车减震器可以抑制弹簧吸震后因反弹带来的震荡和来自路面的冲击&#xff0c;为乘客带来平稳舒适的行车体验。减震器在出厂之前&#xff0c;需要模拟汽车的真实行驶环境&#xff0c;在模拟当中需要对它们进行气压和油压的轮番测试。 客户的测试…

LLaMA-VID:突破视觉语言模型界限,精准捕捉图像精髓

模型概述 LLaMA-VID模型的主要目标是解决现有视觉语言模型在处理长时视频时遇到的挑战。这些挑战主要包括处理大量视觉特征所需的高计算资源以及信息的复杂性和冗余性。为了克服这些难题&#xff0c;LLaMA-VID采用了创新的方法&#xff0c;有效地减少了长时视频中无关紧要信息…

vue echarts折线图加背景颜色 值区域对应的右侧加上文本

mounted() {this.lineEcharts();},lineEcharts() {const option { tooltip: {trigger: axis,transitionDuration: 0 // 让toolltip紧跟鼠标&#xff0c;防止抖动},title: {text: 得分,left: 25,textStyle:{color: #333,fontSize: 12,fontWeight: 400}, },legend:…

Alphalens因子分析(2) - 低换手率因子秒杀98%的基金经理?

上一篇笔记&#xff0c;我们已经为因子分析准备好了数据。这一篇笔记&#xff0c;我们就进行因子分析。分析过程在 Alphalens 中非常简单&#xff0c;核心是读懂它的报告。 Alphalens 框架 Alphalens 的主要模块是 utils, tears, performance 和 plotting。 utils 主要功能是…

elementui dialog 回车时却刷新整个页面

到处都是坑&#xff0c;这个坑填完另一个坑还在等你。。。坑坑相连&#xff0c;坑坑不同。。。 使用el-dialog弹出一个表单&#xff0c;当我无意间敲到回车键时&#xff0c;整个页面被刷新了&#xff0c;又是一脸的懵逼。。。 经过查找文档发现解决方案为上述截图标记。。。 e…

SaaS先驱Salesforce发展史

Salesforce是云计算和SaaS领域的先驱&#xff0c;大致经过5个不同发展阶段 第一个阶段&#xff1a;SaaS CRM发展初期 Salesforce成立时间是1999年&#xff0c;其SaaS业务的Idea的灵感起源于IaaS巨头亚马逊。初期标榜的竞品Siebel早期投入高、很难上手、功能过于复杂、实用性不强…

C语言之详解数组【附三子棋和扫雷游戏实战】

文章目录 一、一维数组的创建和初始化1、数组的创建2、数组的初始化3、一维数组的使用4、 一维数组在内存中的存储 二、二维数组的创建和初始化1、二维数组的创建2、二维数组的初始化3、二维数组的使用4、二维数组在内存中的存储 三、数组越界边界值考虑不当导致越界访问数组大…

龍运当头--html做一个中国火龙祝大家龙年大吉

🐉效果展示 🐉HTML展示 <body> <!-- partial:index.partial.html --> <svg><defs><g id=

推荐VSCODE插件:为`package.json`添加注释信息

众所周知&#xff0c;JSON文件是不支持注释的&#xff0c;除了JSON5/JSONC之外&#xff0c;我们在开发项目特别是前端项目时&#xff0c;大量会用到JSON文件&#xff0c;特别是在编写package.json中的scripts时&#xff0c;由于缺少注释,当有大量的命令脚本时&#xff0c;就有了…

【REST2SQL】07 GO 操作 Mysql 数据库

【REST2SQL】01RDB关系型数据库REST初设计 【REST2SQL】02 GO连接Oracle数据库 【REST2SQL】03 GO读取JSON文件 【REST2SQL】04 REST2SQL第一版Oracle版实现 【REST2SQL】06 GO 跨包接口重构代码 MySQL是一个关系型数据库管理系统&#xff0c;由瑞典MySQL AB 公司开发&#xf…

【Spring Boot】SpringBoot maven 项目创建图文教程

创建一个Spring Boot项目并使用Maven进行构建是一项相对简单的任务。以下是使用IntelliJ IDEA创建Spring Boot Maven项目的详细教程&#xff1a; 步骤 1&#xff1a;安装 IntelliJ IDEA 确保你已经安装了最新版本的 IntelliJ IDEA。你可以从官方网站下载并安装。 步骤 2&am…

构建高效学习平台:企业培训系统源码深度解析

企业培训系统是组织中培养和提升员工技能的核心工具。本文将深入探讨企业培训系统的源码&#xff0c;通过关键技术代码解析&#xff0c;揭示其中的设计原理和功能实现&#xff0c;以构建更高效的学习平台。 1. 环境配置与依赖项安装 首先&#xff0c;让我们关注源码的环境配…

小测一下HCL中VSR的转发性能

正文共&#xff1a;555 字 10 图&#xff0c;预估阅读时间&#xff1a;1 分钟 上次我们在HCL中导入了NFV的自定义镜像&#xff08;如何在最新版的HCL 5.10.0中导入NFV镜像&#xff1f;&#xff09;&#xff0c;但是当时没有测试转发性能&#xff0c;最近HCL又更新了V5.10.1版本…

电脑文件mfc100u.dll丢失的解决方法分析,怎么修复mfc100u.dll靠谱

mfc100u.dll丢失了要怎么办&#xff1f;其实很多人都遇到过这样的电脑故障吧&#xff0c;说这个mfc100u.dll文件已经不见了&#xff0c;然后一些程序打不开了&#xff0c;那么这种情况我们要怎么解决呢&#xff1f;今天我们就来给大家详细的说说mfc100u.dll丢失的解决方法。 一…

虚拟机安装intel架构的银河麒麟V10(SP1)

一 背景 银河麒麟是国产操作系统之一&#xff0c;是基于Linux内核的桌面操作系统&#xff0c;有自己的应用中心&#xff0c;具有一定的生态系统。今从官网下载了V10&#xff08;SP1&#xff09;镜像文件&#xff0c;在Windowns的VMware虚拟机上安装试用。 官网&#xff1a;http…

ylov8的训练和预测使用(目标检测)

首先要配置文文件 1-配置数据集的yaml文件&#xff1a; 目录在ultralytics/cfg/datasets/下面&#xff1a; 例如我的&#xff1a; (这里面的yaml文件在/ultralytics/cfg/datasets下面有很多&#xff0c;可以找几个参考一下) path: /path/to/eye_datasets # dataset root di…

使用cURL命令在Linux中测试HTTP服务器的性能

cURL是一个强大的命令行工具&#xff0c;用于从或向服务器传输数据。它支持多种协议&#xff0c;包括HTTP、HTTPS、FTP等。在Linux系统中&#xff0c;cURL可以用于测试和评估HTTP服务器的性能。下面是一些使用cURL命令测试HTTP服务器性能的示例和说明。 1. 基本请求 要向指定…

虚幻引擎:开创视觉与创意的新纪元

先看看据说虚幻5做出来的东西吧&#xff1a; 虚幻引擎5&#xff01;&#xff01;&#xff01;4K画质PS5实机演示&#xff01; 好了&#xff0c;用文字认识一下吧&#xff1a; 虚幻引擎5.3对UE5的核心工具集作了进一步优化&#xff0c;涉及渲染、世界构建、程序化内容生成&…

怀念母校《山东海天软件工程学院》

当初就是个统招专科的分数&#xff0c;因个人喜欢英语和计算机&#xff0c;加之学校宣传学历技能培训&#xff0c;于是参加了夏令营&#xff0c;后来在海天度过了大学时光&#xff0c;有技术社团&#xff0c;有爱好社团&#xff0c;也参加了比赛&#xff0c;顺利实习&#xff0…