Kafka-服务端-DelayedOperationPurgatory

DelayedOperationPurgatory是一个相对独立的组件,它的主要功能是管理延迟操作。

DelayedOperationPurgatory的底层依赖于Kafka提供的时间轮实现。

我们可以使用JDK本身提供的java.util.Timer或是DelayQueue轻松实现定时任务的功能,为什么Kafka还要专门开发DelayedOperationPurgatory组件呢?这主要是因为像Kafka这种分布式系统的请求量巨大,性能要求也很高,JDK提供java.util.Timer和DelayedQueue底层实现使用的是堆这种数据结构,存取操作的复杂度都是O(nlog(n)),无法支持大量的定时任务。

在高性能的框架中,为了将定时任务的存取操作以及取消操作的时间复杂度降为O(1),一般会使用其他方式实现定时任务组件,例如,使用时间轮的方式。这种做法还是比较多见的,例如,ZooKeeper中使用“时间桶”的方式处理Session过期,Netty也提供了HashedWheelTimer这种时间轮的实现,Quartz框架中也有时间轮的身影。

TimingWheel

Kafka的时间轮实现是TimingWheel,它是一个存储定时任务的环形队列,底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。

TimerTaskList是环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask。

TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。

TimerTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务。TimeTask中的delayMs记录了任务的延迟时间,timerTaskEntry字段记录了对应的TimerTaskEntry对象。

这三个对象是TimingWheel实现的基础。
TimingWheel提供了层级时间轮的概念,如图所示。

在这里插入图片描述

第一层时间轮的时间跨度比较小,而第二层时间轮的时间跨度比较大。存放在同一TimerTaskList中的TimerTask到期时间可能不同,但是都由一个时间格覆盖。

现假设图中第二层时间轮中编号为2的时间格保存的TimerTaskList到期时间为t,其中保存的任务的到期时间只能是[t~t+20ms]这个范围,例如T1的过期时间可以为t+10ms,任务T2的过期时间可以为t+15ms,T3的过期时间可以为t+12ms。

如果任务到期时间不在[t~t+20ms]这个时间段,则只能放到其他的时间格对应的TimerTaskList中保存。

当任务到期时间超出了当前时间轮所表示的时间范围时,会尝试添加到上层时间轮。依然以图为例,其中第一层时间轮的每个时间格是1ms,整个时间轮的跨度是20ms,其表针currentTime当前表示的时间ct,则该时间轮的跨度为[ct~ct+20ms],只有到期时间在这段范围内的任务才能添加到该时间轮中等待到期。

到期时间超出[ct~ct+20ms]这个时间范围的任务会尝试添加到其上级时间轮中,通过逐层向上尝试,最终找到合适的时间轮层级。

整个时间轮表示的时间跨度是不变的,随着表针currentTime的后移,当前时间轮能处理时间段也在不断后移,新来的TimerTaskEntry会复用原来已经到期的TimerTaskList。

如图所示,第一层时间轮的时间跨度始终为20ms,表针currentTime表示的时间随着时间的流逝不断后移,指向了第三个时间格,此时表针currentTime表示的时间为ct+3ms,整个时间轮表示的时间段是[ct+3ms~ct+23ms],但是该时间轮的时间跨度依然是20ms。

此时该时间轮中编号为2的时间格表示的时间范围不再是[ct+1ms ~ ct+2ms],而是[ct+22ms ~ ct+23ms]。

最后用一个示例详细介绍时间轮降级场景,如图所示。

在这里插入图片描述

现在有一个任务是在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每个时间格的跨度为1ms,整个时间轮的跨度为20ms,跨度不够。

第二层时间轮时间格的跨度为20ms,整个时间轮的跨度为400ms,跨度依然不够。

第三层时间轮时间格的跨度为400ms,整个时间轮的跨度为8000ms,跨度足够,此任务存放在第三层时间轮的第一个时间格对应的TimerTaskList中等待执行,此TimerTaskList到期时间是400ms。

随着时间的流逝,当此TimerTaskList到期时,距离该任务的到期时间还有45ms,不能执行任务。

我们将其重新提交到层级时间轮中,此时第一层时间轮跨度依然不够,但是第二层时间轮的跨度足够,该任务会被放到第二层时间轮第三个时间格中等待执行。

如此往复几次,高层时间轮的任务会慢慢移动到低层时间轮上,最终任务到期执行。

介绍完了时间轮的基本概念之后,下面开始分析TimingWheel的具体实现,其核心字段的含义如下所述。

  • buckets:Array.tabulate[TimerTaskList]类型,其每一个项都对应时间轮中的一个时间格,用于保存TimerTaskList的数组。在TimingWheel中,同一个TimerTaskList中的不同定时任务的到期时间可能不同,但是相差时间在一个时间格的范围内。

  • tickMs:当前时间轮中一个时间格表示的时间跨度。

  • wheelSize:当前时间轮的格数,也是buckets数组的大小。

  • taskCounter:各层级时间轮中任务的总数。

  • startMs:当前时间轮的创建时间。

  • queue:DelayQueue类型,整个层级时间轮共用的一个任务队列,其元素类型是TimerTaskList(实现了Delayed接口)。

  • currentTime:时间轮的指针,将整个时间轮划分为到期部分和未到期部分。在初始化时,currentTime被修剪成tickMs的倍数,近似等于创建时间,但并不是严格的创建时间。

  • interval:当前时间轮的时间跨度,即tickMswheelSize。当前时间轮只能处理时间范围在currentTime~currentTime+tickMsWheelSize之间的定时任务,超过这个范围,则需要将任务添加到上层时间轮中。

  • overflowWheel:上层时间轮的引用。

在TimeWheel中提供了add、advanceClock、addOverflowWheel三个方法,这三个方法实现了时间轮的基础功能。add方法实现了向时间轮中添加定时任务的功能,它同时也会检测待添加的任务是否已经到期。

SystemTimer

SystemTimer是Kafka中的定时器实现,它在TimeWheel的基础上添加了执行到期任务、阻塞等待最近到期任务的功能。下面来分析其核心字段。

  • taskExecutor:JDK提供的固定线程数的线程池实现,由此线程池执行到期任务。
  • delayQueue:各个层级的时间轮共用的DelayQueue队列,主要作用是阻塞推进时间轮表针的线程(ExpiredOperationReaper),等待最近到期任务到期。
  • taskCounter:各个层级时间轮共用的任务个数计数器。
  • timingWheel:层级时间轮中最底层的时间轮。
  • readWriteLock:用来同步时间轮表针currentTime修改的读写锁。
    SystemTimer.add方法在添加过程中如果发现任务已经到期,则将任务提交到taskExecutor中执行;如果任务未到期,则调用TimeWheel.add方法提交到时间轮中等待到期后执行。

DelayedOperation

在前面介绍ProducerRequest和FetchRequest两种请求时提到,服务端在收到这两种请求时并不是立即返回响应,可能会等待一段时间后才返回。

对于ProducerRequest来说,其中的acks字段设置为-1表示ProducerRequest发送到Leader副本之后,需要ISR集合中所有副本都同步该请求中的消息(或超时)后,才能返回响应给客户端。

ISR集合中的副本分布在不同Broker上,与Leader副本进行同步时就涉及网络通信,一般情况下我们认为网络传输是不可靠的而且是一个较慢的过程,通常采用异步的方式处理来避免线程长时间等待。

当FetchRequest发送给Leader副本后,会积累一定量的消息后才返回给消费者或Follower副本,并不是Leader副本的HW后移一条消息就立即将其返回给消费者,这是为了实现批量发送消息,提高有效负载。

Kafka利用前面介绍的SystemTimer来定期检测请求是否超时,但是这些请求真正的目的并不是为了超时执行,而是为了满足其他条件后执行,例如ProducerRequest的响应条件ISR集合中所有副本都同步了请求中的消息,所以仅使用SystemTimer就无法满足需求了。Kafka使用DelayedOperation抽象类表示延迟操作,它对TimeTask进行了扩展,除了有定时执行的功能,还提供了检测其他执行条件的功能。我们可以认为DelayedOperation是一个延迟的、异步的操作。

如图所示,DelayedOperation有四个实现类,分别表示四类不同的延迟操作,也对应了四种不同的请求。DelayedOperation是一个抽象类,completed字段是AtomicBoolean类型,标识了此DelayedOperation是否完成,初始值为false;delayMs记录了延迟操作的延迟时长。

在这里插入图片描述
DelayedOperation可能因为到期而被提交到SystemTimertaskExecutor线程池中执行,也可能在其他线程检测其执行条件时发现已经满足执行条件,而将其执行。

在这里插入图片描述

DelayedOperationPurgatory

DelayedOperationPurgatory是一个辅助类,提供了管理DelayedOperation以及处理到期DelayedOperation的功能。DelayedOperationPurgatory依赖的组件如图所示。

在这里插入图片描述

DelayedOperationPurgatory中的watchersForKey字段用于管理DelayedOperation,它是Pool[Any,Watchers]类型,Pool的底层实现是ConcurrentHashMap。

watchersForKey集合的key表示的是Watchers中的DelayedOperation关心的对象,其value是Watchers类型的对象,Watchers是DelayedOperationPurgatory的内部类,表示一个DelayedOperation的集合,底层使用LinkedList实现。

DelayedProduce

经过前面的介绍,我们已经了解了SystemTimer和DelayedOperationPurgatory的工作原理。

在详细介绍DelayedProduce的相关实现之前,先来了解一下当ProducerRequest的acks字段为-1时,服务端的处理流程:在KafkaApis中处理ProducerRequest的方法是handleProducerRequest()方法,它会调用ReplicaManager.appendMessages方法将消息追加到Log中,生成相应的DelayedProduce对象并添加到delayedProducePurgatory处理。

delayedProducePurgatory是ReplicaManager中的字段,它是专门用来处理DelayedProduce的DelayedOperationPurgatory对象。
在这里插入图片描述

DelayedFetch

DelayedFetch是FetchRequest对应的延迟操作,它的原理与DelayedProduce类似。

我们先来粗略分析FetchRequest的处理流程:来自消费者或Follower副本的FetchRequest由KafkaApis.handleFetchRequest()方法处理,它会调用ReplicaManager.fetchMessages()方法从相应的Log中读取消息,并生成DelayedFetch添加到delayedFetchPurgatory中处理。

delayedFetchPurgatory是ReplicaManager中的字段,它是专门用来处理DelayedFetch的DelayedOperationPurgatory对象。

零拷贝

在消费者获取消息时,服务器先从硬盘读出数据到内存,然后将内存中的数据原封不动地通过Socket发送给消费者。

虽然这个操作描述非常简单,但是其中涉及的步骤非常多,效率也比较差,尤其是当数据量较大时。

按照这种设计,其底层执行步骤大致如下:

首先,应用程序调用read方法时需要从用户态切换到内核态,将数据从磁盘上读取出来保存到内核缓冲区中;

然后,内核缓冲区中的数据传输到应用程序,此时read方法调用结束,从内核态切换到用户态;

之后,应用程序执行send方法,需要从用户态切换到内核态,将数据传输给Socket Buffer;

最后,内核会将Socket Buffer中的数据发送NIC Buffer(网卡缓冲区)进行发送,此时send方法结束,从内核态切换到用户态。

如图所示,在这个过程中涉及四次上下文切换(Context switch)以及四次数据复制,并且其中有两次复制操作由CPU完成。

但是在这个过程中,数据完成没有进行变化,仅仅是从磁盘复制到了网卡缓冲区中,会浪费大量的CPU周期。

在这里插入图片描述
通过“零拷贝”技术可以去掉这些无谓的数据复制操作,同时也会减少上下文切换的次数。

大致步骤如下:首先,应用程序调用transferTo方法,DMA会将文件数据发送到内核缓冲区;

然后,Socket Buffer追加数据的描述信息;

最后,DMA将内核缓冲区的数据发送到网卡缓冲区,这样就完全解放了CPU,实现了零拷贝,如图所示。

在这里插入图片描述
我们对DelayedFetch的相关处理流程做一下总结,这里以来自Follower副本为例,如图所示。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/340124.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++11手撕线程池 call_once 单例模式 Singleton / condition_variable 与其使用场景

一、call_once 单例模式 Singleton 大家可以先看这篇文章&#xff1a;https://zh.cppreference.com/w/cpp/thread/call_once /*std::call_oncevoid call_once( std::once_flag& flag, Callable&& f, Args&&... args ); */ #include <iostream> #i…

Flash读取数据库中的数据

Flash读取数据库中的数据 要读取数据库的记录&#xff0c;首先需要建立一个数据库&#xff0c;并输入一些数据。数据库建立完毕后&#xff0c;由Flash向ASP提交请求&#xff0c;ASP根据请求对数据库进行操作后将结果返回给Flash&#xff0c;Flash以某种方式把结果显示出来。 …

国内首个!亚信安全获得CCRC数据分类分级产品认证证书

亚信安全信数数据分类分级系统AISDC V1.0&#xff0c;荣获中国网络安全审查认证和市场监管大数据中心颁发的首个数据分类分级产品IT产品信息安全认证证书&#xff01;标志着亚信安全在大数据安全领域的强大技术实力以及专业研究&#xff0c;正式获得国内数据分类分级产品评定的…

如何无公网ip实现SSH远程访问本地局域网openEuler系统?

文章目录 1. 本地SSH连接测试2. openEuler安装Cpolar3. 配置 SSH公网地址4. 公网远程SSH连接5. 固定连接SSH公网地址6. SSH固定地址连接测试 欧拉操作系统(openEuler, 简称“欧拉”)是面向数字基础设施的操作系统,支持服务器、云计算、边缘openEuler是面向数字基础设施的操作系…

【Java】学习一门开发语言,从TA的Hello World开始

欢迎来到《小5讲堂》 大家好&#xff0c;我是全栈小5。 这是《Java》序列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识点的理解和掌握…

晶振术语名词中英文对照及解析|晶发电子

在电子设备和通信系统中&#xff0c;精确的频率源是至关重要的。晶振作为频率源的核心元件&#xff0c;其性能直接影响着整个系统的稳定性、可靠性和准确性。随着技术的不断发展&#xff0c;对晶振的性能要求也越来越高。晶发电子将探讨晶振的常用术语及其含义&#xff0c;帮助…

java面试——juc篇

目录 一、线程基础 1、进程与线程的区别&#xff1f;&#xff08;⭐⭐⭐&#xff09; 2、并行和并发的区别&#xff08;⭐&#xff09; 3、创建线程的方式有哪些&#xff1f;&#xff08;⭐⭐⭐⭐&#xff09; runnable和Callable的区别&#xff1a; 线程中的run()和 star…

变电所运维可以实现一些什么功能

安科瑞武陈燕acrelcy 安科瑞AcrelCloud-1000变电所运维云平台 1.概述 基于互联网&#xff0b;、大数据、移动通讯等技术开发的云端管理平台&#xff0c;满足用户或运维公司监测众多变电所回路运行状态和参数、室内环境温湿度、电缆及母线运行温度、现场设备或环境场景等需求…

GLOBALCHIP GC3909替代A3909/allegro电机驱动芯片产品参数分析,应用于摇头机,舞台灯,打印机,白色家电等

GLOBALCHIP GC3909 12V H 桥驱动器芯片替代A3909/allegro产品概述: GC3909是一款双通道12V直流电机驱动芯片&#xff0c;为摄像机、消费类产品、玩具和其他低压或者电池供电的运动控制类应用提供了集成的电机驱动解决方案。芯片一般用来驱动两个直流电机或者驱动一个步进电机。…

【JavaEE进阶】 Spring Boot⽇志

文章目录 &#x1f38b;关于日志&#x1f6a9;为什么要学习⽇志&#x1f6a9;⽇志的⽤途&#x1f6a9;日志的简单使用 &#x1f384;打印⽇志&#x1f6a9;程序中得到⽇志对象&#x1f6a9;使⽤⽇志对象打印⽇志 &#x1f38d;⽇志格式的说明&#x1f6a9;⽇志级别的作用&#…

使用pysimplegui+opencv编写一个摄像头的播放器

需求 使用pysimplegui和opencv实现一个播放器&#xff0c;播放 摄像头的画面。 代码实现 import cv2 import time from typing import Iterable, NamedTuple, Optionalimport PySimpleGUI as sgclass CameraSpec(NamedTuple):name: strindex: intwidth: intheight: intfps: i…

DevEco Studio4.0/3.1预览器报错综合整理

题外话&#xff1a;额&#xff0c;这篇文章的由来&#xff0c;是在这篇文章DevEco Studio3.1报错...发布后&#xff0c;仍有人没解决预览不了的问题&#xff0c;然后就有小伙伴让我看看到底哪个地方出错了&#xff0c;为什么按照文章上的去做了&#xff0c;还是无法使用&#x…

如何使用支付宝沙箱环境本地配置模拟支付并结合内网穿透远程调试

文章目录 前言1. 下载当面付demo2. 修改配置文件3. 打包成web服务4. 局域网测试5. 内网穿透6. 测试公网访问7. 配置二级子域名8. 测试使用固定二级子域名访问 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通俗易懂&#xff…

抖音跳微信,有哪些方法是被允许导流到微信

要在抖音允许的范围内引流到微信&#xff0c;必须提前“告诉”抖音并获得它的许可。这需要我们采取一些步骤来与抖音进行沟通和合作。 首先&#xff0c;巨量星图是一个重要的平台&#xff0c;它在抖音生态中起到了桥梁的作用。对于许多抖音用户来说&#xff0c;巨量星图可能是一…

Linux部署nginx+appache动静分离

部署nginxappache动静分离 虚拟机配置到vm1网卡 地址192.168.1.100 重启网卡 关闭安全linux 关闭防火墙、 挂载磁盘 配置yum源 上传软件包 nginx和appache 配置appache服务 tar xf apr-1.6.2.tar.gz tar xf apr-util-1.6.0.tar.gz tar -xjf httpd-2.4.29.tar.bz2 mv a…

swf格式怎么快速转换成mp4?3个简单快捷方法分享

swf格式怎么快速转换成mp4&#xff1f;在日常生活中&#xff0c;将SWF格式快速转换成MP4格式是一项非常实用的技巧。首先&#xff0c;MP4格式是一种广泛使用的视频格式&#xff0c;可以在各种设备上轻松播放&#xff0c;如手机、平板电脑、电视等。其次&#xff0c;还可以提高视…

JavaEE中的监听器的作用和工作原理

在JavaEE&#xff08;Java Platform, Enterprise Edition&#xff09;中&#xff0c;监听器&#xff08;Listener&#xff09;是一种重要的组件&#xff0c;用于监听和响应Web应用程序中的事件。监听器的作用是在特定的事件发生时执行一些自定义的逻辑。常见的监听器包括Servle…

HCIA NAT练习

目录 实验拓扑 实验要求 实验步骤 1、IP分配 2、使用ACL使PC访问外网 3、缺省路由 4、边界路由器公网ip端口配置 测试 实验拓扑 实验要求 1、R2为ISP路由器&#xff0c;其上只能配置ip地址&#xff0c;不得再进行其他的任何配置 2、PC1-PC2可以ping通客户平板和DNS服…

一套高效使用的 Vue3 + Springboot 前端低代码框架

一、关于低代码 JNPF低代码平台在提供无代码&#xff08;可视化建模&#xff09;和低代码&#xff08;高度可扩展的集成工具以支持跨功能团队协同工作&#xff09;开发工具上是独一无二的。支持简单、快速地构建及不断改进Web端应用程序&#xff0c;可为整个应用程序的生命周期…

python_ACM模式《剑指offer刷题》链表1

题目&#xff1a; 面试tips&#xff1a; 询问面试官是否可以改变链表结构 思路&#xff1a; 1. 翻转链表&#xff0c;再遍历链表打印。 2. 想要实现先遍历后输出&#xff0c;即先进后出&#xff0c;因此可借助栈结构。 3. 可用隐式的栈结构&#xff0c;递归来实现。 代码…