zookeperkafka学习

1、why kafka

优点   缺点
kafka
  • 吞吐量高,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。
延迟也会高,不适合电商场景。
RabbitMQ
  • 如果有大量消息堆积在队列中,性能会急剧下降
  • 每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
性能RocketMQ低
RocketMQ
  • 性能比RabbitMQ高一个数量级,适合电商场景。
  • RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。
  • 每秒处理几十万的消息,同时响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。

2、Broker:

缓存代理(可以把Broker理解为Kafka的服务器),Kafka 集群中的一台或多台服务器统称为 broker。kafka中支持消息持久化的,生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,持久化是保存在kafka的日志文件中。 

3、分区:

一个消费者可以对应多个分区,一个分区只能对应一个消费者。

topic分区有leader和follower。 Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

4、消费者组 :

topic实现JMS模型中消费者组中只有一个消费者,这种情况下topic的消费的offset是无序的。当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

kafka为什么读写快?

利用零拷贝和页面缓存技术,零拷贝技术读取文件数据并发送到网络的步骤如下:

  • 将磁盘文件的数据复制到页面缓存。
  • 将数据从页面缓存直接发送到网卡从而发到网络中。

rebalance

主要是对partition的个数和group当中的consumer个数重新统计,再重新对应consumer和partition的关系。一个消费者可以对应多个分区。一个分区只能对应一个消费者。

kafka producer API

生产者的分区由key决定

我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的,当不指定key时默认为null。消息的key有两个重要的作用:1)提供描述消息的额外信息;2)用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中。

如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。

如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。

案例:

Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384); //默认是16kB, 每个Batch要存放batch.size大小的数据后,才可以发送出去。
 props.put("linger.ms", 1); //一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。
 props.put("buffer.memory", 33554432); //默认是32MB,KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
 
 producer.close();

kafka consumer API

案例一:手动同步提交

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

案例二:每个partition手动同步提交

try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                //拿到这个partition下面的所有数据
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                //通过这个partition的list获取最后一个数据的offset
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

Kafka文件存储:

知道通过分片和索引机制找到offset的就行了。index和log文件以当前的第一条消息的offset命名。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/159935.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

接口测试 —— 接口测试的意义

1、接口测试的意义&#xff08;优势&#xff09; &#xff08;1&#xff09;更早的发现问题&#xff1a; 不少的测试资料中强调&#xff0c;测试应该更早的介入到项目开发中&#xff0c;因为越早的发现bug&#xff0c;修复的成本越低。 然而功能测试必须要等到系统提供可测试…

Pytorch torch.norm函数详解用法

torch.norm参数定义 torch版本1.6 def norm(input, p"fro", dimNone, keepdimFalse, outNone, dtypeNone)input input (Tensor): the input tensor 输入为tensorp p (int, float, inf, -inf, fro, nuc, optional): the order of norm. Default: froThe following …

【计算思维】蓝桥杯STEMA 科技素养考试真题及解析 5

1、要把下面4张图片重新排列成蜗牛的画像&#xff0c;该如何排列这些图片 A、 B、 C、 D、 答案&#xff1a;A 2、将下图的绳子沿虚线剪开后&#xff0c;绳子被分成了()部分 A、6 B、7 C、8 D、9 答案&#xff1a;C 3、下面的立体图形&#xff0c;沿箭头方向看去&#…

LAST论文翻译

《Read Ten Lines at One Glance: Line-Aware Semi-Autoregressive Transformer for Multi-Line Handwritten Mathematical Expression Recognition》论文翻译 文章目录 《Read Ten Lines at One Glance: Line-Aware Semi-Autoregressive Transformer for Multi-Line Handwritt…

python→函数曲线

CSDN中公式一栏&#xff0c;亦可以插入Latex函数。 以函数 为例 也可以用Latex写如下代码&#xff1a; \documentclass{article} \usepackage{amsmath} \begin{document} \[ y \frac{n}{n30} \] \end{document} 如下&#xff1a; 那么&#xff0c;该函数图像如何呢&#xf…

spring-boot-maven-plugin插件 —— 默认打包配置

创建 Spring Boot 应用&#xff0c;默认会添加 Maven 插件&#xff1a;spring-boot-maven-plugin。如果项目结构比较简单&#xff0c;可以不用额外配置&#xff0c;使用默认的编译打包就可以。 执行 maven 打包命令时会自动触发 spring-boot-maven-plugin 插件的 repackage 目…

JVM判断对象是否存活之引用计数法、可达性分析

目录 前言 引用计数法 概念 优点 缺点 可达性分析 概念 缺点&#xff1a; 扩展&#xff1a; 1.GC Roots 概念 2.STW (Stop the world) 前言 JVM有两种算法来判断对象是否存活&#xff0c;分别是引用计数法和可达性分析算法&#xff0c;针对可达性分析算法STW时间长、…

ChatGpt3.5已经应用了一段时间,分享一些自己的使用心得.

首先ChatGpt3.5的文本生成功能十分强大&#xff0c;但是chatgpt有一些使用规范大家需要注意&#xff0c;既然chat是一种工具&#xff0c;我们就需要学会它的使用说明&#xff0c;学会chatgpt的引用语句&#xff0c;会极大的方便我们的使用。我们需要做以下的准备。 明确任务和目…

*ST富吉-688272 三季报分析(20231117)

*ST富吉-688272 基本情况 公司名称&#xff1a;北京富吉瑞光电科技股份有限公司 A股简称&#xff1a;*ST富吉 成立日期&#xff1a;2011-01-20 上市日期&#xff1a;2021-10-18 所属行业&#xff1a;计算机、通信和其他电子设备制造业 周期性&#xff1a;1 主营业务&#xff1a…

机器学习第8天:线性SVM分类

文章目录 介绍 特征缩放 示例代码 硬间隔与软间隔分类 主要代码 代码解释 结语 介绍 作用&#xff1a;判别种类 原理&#xff1a;找出一个决策边界&#xff0c;判断数据所处区域来识别种类 简单介绍一下SVM分类的思想&#xff0c;我们看下面这张图&#xff0c;两种分类都…

Spring接入Metric+Graphite+Grafana搭建监控系统

环境搭建 Metric 主要是记录操作记录&#xff0c;把数据传给Graphite&#xff0c;这个只需要引入依赖就可以了 日志收集系统&#xff0c;可以支持很多的监控系统一般在Spring项目中用其收集数据&#xff0c;可以发送到Graphite等监控系统中一般使用Merter和Timer分别记录成功…

【SpringBoot】 环境准备

一.SpringBoot准备 1.下载idea 社区版 2021.1 - 2022.1.4 专业版 无要求 2.Maven 是一个工具,和Java没有关系 . 主要功能是项目构建和依赖管理. 项目构建 上述对应的都是maven命令 . 依赖管理 添加坐标之后,点击刷新,右侧就会载入依赖. Maven还有依赖传递和依赖排除功…

滚雪球学Java(09-3):Java中的逻辑运算符,你真的掌握了吗?

咦咦咦&#xff0c;各位小可爱&#xff0c;我是你们的好伙伴——bug菌&#xff0c;今天又来给大家普及Java SE相关知识点了&#xff0c;别躲起来啊&#xff0c;听我讲干货还不快点赞&#xff0c;赞多了我就有动力讲得更嗨啦&#xff01;所以呀&#xff0c;养成先点赞后阅读的好…

C# 实现腾讯云多路直播流的云端混合录制

目录 应用场景 腾讯云直播和云点播 产品架构 混流显示示例 关键代码 API实现 小结 应用场景 在云考试或视频面试中&#xff0c;除了对考生、考官的实时音视频监控以防止作弊行为的发生以外&#xff0c;对直播流的音视频录制也尤为重要&#xff0c;可做为后期证据材料进…

Spring对事务的实现

Spring对事务的支持 事务概述事务的四个处理过程事务的四个特性 引入事务场景Spring实现事务的两种方式声明式事务之注解实现方式 事务概述 在一个业务流程当中&#xff0c;通常需要多条DML&#xff08;insert delete update&#xff09;语句共同联合才能完成&#xff0c;这多…

RT-Thread STM32F407 PWM

为了展示PWM效果&#xff0c;这里用ADC来采集PWM输出通道的电平变化 第一步&#xff0c;进入RT-Thread Settings配置PWM驱动 第二步&#xff0c;进入board.h&#xff0c;打开PWM宏 第三步&#xff0c;进入STM32CubeMX&#xff0c;配置时钟及PWM 第四步&#xff0c;回到R…

鸿蒙4.0开发笔记之DevEco Studio如何使用Previewer窗口预览器(一)

一、预览器作用 DevEco Studio预览器概况在HarmonyOS应用开发过程中&#xff0c;通过使用预览器&#xff0c;可以查看应用的UI效果&#xff0c;方便开发者实时查看应用的运行效果&#xff0c;随时调整代码。 二、打开Previewer预览器 1、正常启动 打开预览器的位置在DevEco…

三十分钟学会zookeeper

zookeeper 一、前提知识 集群与分布式 ​ 集群&#xff1a;将一个任务部署在多个服务器&#xff0c;每个服务器都能独立完成该任务。 ​ 分布式&#xff1a;将一个任务拆分成若干个子任务&#xff0c;由若干个服务器分别完成这些子任务&#xff0c;每个服务器只能完成某个特…

有趣的按钮分享

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 广告打完&#xff0c;我们进入正题&#xff0c;先看效果&#xff1a; 废话不多&#xff0c;上源码&#xff1a; <button class&quo…

【STM32】RTC(实时时钟)

1.RTC简介 本质&#xff1a;计数器 RTC中断是外部中断&#xff08;EXTI&#xff09; 当VDD掉电的时候&#xff0c;Vbat可以通过电源--->实时计时 STM32的RTC外设&#xff08;Real Time Clock&#xff09;&#xff0c;实质是一个 掉电 后还继续运行的定时器。从定时器的角度…