Kafka源码分析(五) - Server端 - 基于时间轮的延时组件

系列文章目录

Kafka源码分析-目录

一. 背景

Kafka内部涉及大量的"延时"操作,比如收到PRODUCE请求后可为副本等待一个timeout的时间后再响应客户端。

那我们讨论一个问题:Kafka为什么自己实现了一个延时任务组件,而不直接使用java的java.util.concurrent.DelayQueue?

可以按如下两点来分析这个事:

  1. DelayQueue提供了哪些能力?

  2. Kafka所面对的场景是怎样的,为什么DelayQueue不适用?

1.1 DelayQueue的能力

DelayQueue相关的接口/类如下图:
在这里插入图片描述
对应地,DelayQueue所提供的能力如下:

  1. 为任务指定延时时间
    DelayQueue所维护的任务都要实现Delayed接口;其中的getDelay方法返回了距离该任务所设定执行时间有多远;

  2. 任务存取的时间复杂度为O(log n)
    DelayQueue内部使用"优先级队列"来保存任务,该数据结构存取的时间复杂度为对数复杂度;

1.2 Kafka的业务场景

Kafka的业务背景有如下特点:

  1. 延时任务不一定非得等到超时后才执行,有一些事件会触发其提前执行;
    比如PRODUCE请求处理过程中,若副本的响应比较快,那收到了预期数量的副本响应后就可以开始给客户端发响应,不一定非得等满一个timeout的时间;

  2. 延时任务的"入队"操作QPS很高;
    Kafka就是为高QPS而生,内部会产生大量的延时任务;

对应地,Kafka对延时任务组件有如下两点要求:

  1. 要求有提前执行任务的能力;

  2. 要求尽可能降低延时任务入队操作的时间复杂度;
    (借助一个名为"时间轮"的数据结构,Kafka将时间复杂度实际降到了O(1))

这两点要求都无法通过直接应用DelayQueue的方式得到满足。

二. 组件接口

我们来看看Kafka的延时任务组件对外提供的接口,进而搞清楚其提供的能力和使用方式。

如下图:
在这里插入图片描述
左边两个类定义了"延时操作":

  1. TimerTask
    描述了一个最基本的延时Task;定义了超时时间(delayMs属性)、提供了一个取消操作(cancel方法);

  2. DelayedOperation
    描述了一个可被外部事件提前触发的TimerTask;其在TimerTask基础上提供了检查是否满足提前触发条件的方法(tryComplete)、并定义被外部事件提前触发后的行为(onComplete)和无事件触发直到超时后的行为(onExpiration);这几个方法都需要DelayedOperation的子类进行实现;

右边的DelayedOperationPurgatory类定义了一个维护DelayOperaton的容器,其核心操作如下:

  1. tryCompleteElseWatch
    添加延时任务;该方法有两个入参:operation和watchkeys;前者是要添加的延时任务,后者是该任务要监听的事件类型(可以同时监听多个事件类型);可以在kafka.server.DelayedOperationKey所在scala文件中查看目前可选的watchkeys类型;一个watchkey对应一个延时任务队列;

  2. checkAndComplete
    检查各任务是否以满足提前触发的条件;该方法在某个事件发生之后,逻辑是遍历watchkey对应的任务队列,逐个判断是否满足了触发条件;

  3. cancelForKey
    取消watchkey下的所有延时任务;

三. 实现

下文主要关注"延时"的实现方式。

3.1 业务模型

时间轮延时组件的思路如下:

  1. 仍然使用java的DelayQueue存储延时任务;
    但为了降低延时任务入队的事件复杂度,Kafka并不直接将单个延时任务直接存储于DelayQueue,而是先将延时任务列表(TimerTaskList)加入DelayQueue,然后再向对应的列表中添加延时任务 (TimerTaskList也有超时时间的概念,等于其中最快超时的任务的超时时间)。这样任务的入队就由向优先级队列添加元素变为了向TimerTaskList中添加元素;前者时间复杂度为O(logn),后者时间复杂度为O(1)。

  2. 如何判断一个新的延时任务应该加到哪个TimerTaskList中?时间轮(TimingWheel)!
    delayQueue中存储了多个TimerTaskList,当拿到一个新的延时任务时,Kafka会使用TimingWheel来计算该延时任务应插入到哪个TimerTaskList。其实,TimingWheel的本质就是TimerTask和TimerTaskList之间的映射函数。

接下来通过一个具体的例子来说明这种映射逻辑:
在这里插入图片描述
先关注上图中①号时间轮。圆环中每一个单元格表示一个TimerTaskList。单元格有其关联的时间跨度;下方的"1s x 12"表示时间轮上共12个单元格,每个单元格的时间跨度为1秒。有一个指针指向了"当前时间"所对应的单元格。顺时针方向为时间流动方向。

当收到一个延迟时间在0-1s的TimerTask时,会将其追加到①号时间轮的橙色单元格中。当收到一个延时时间在3-4s的TimerTask时,会将其追加到①号时间轮的黄色单元格中。以此类推。

现在有个问题:①号时间轮能表示的最大延迟时间是12秒,那如果收到了延迟13秒的任务该怎么办?这时该用到②号时间轮了,我们称②号为①号的"溢出时间轮"。②号时间轮的特点如下:

  1. 一个单元格所标识的时间跨度为①号的总长度,即12秒;
  2. 单元格数量与①号相同;

如此,延迟时间在12-24s的TimerTask会被追加到②号的紫色单元格,延迟时间在36-48s的TimerTask会被追加到②号的绿色单元格中。③号时间轮同理。

刚刚是按①->②->③的顺序来分析时间轮的逻辑,反过来也可以得到有用的结论:想象手里有一个"放大镜",其实③号时间轮的蓝色单元格"放大"后是②号时间轮;②号时间轮的蓝色单元格"放大"后是①号时间轮;蓝色单元格并不实际存储TimerTask。

3.2 数据结构

在这里插入图片描述
DelayedOperationPurgatory有个Timer类型的timeoutTimer属性,用于维护延时任务。实际使用的是Timer的实现类:SystemTimer。该类用于维护延时任务的核心属性有两个:delayQueue和timingWheel。TimingWheel表示单个时间轮,接下来我们来看看其类图:
TimingWheel类图

各属性含义如下:

  1. buckets:TimerTaskList数组;其中一个元素对应一个时间轮单元格;

  2. tickMs:本时间轮一个单元格所表示的时间跨度(毫秒);

  3. intervel:本时间轮所能表示的时间总长度,其值为 b u c k e t s . l e n g t h ∗ t i c k M s buckets.length*tickMs buckets.lengthtickMs ;

  4. currentTime:当前时间;其是tickMs的整数倍,这样可以通过currentTime的值很方便地计算出"当前"对应的时间轮单元格;其初始值计算公式如下:
    currentTime = startMs - (startMs % tickMs );(其中startMs为时间轮创建时间)

  5. overflowWheel:本时间轮的"溢出时间轮";

3.3 算法

3.3.1 添加任务

添加任务的入口是 DelayedOperationPurgatory.tryCompleteElseWatch,其核心逻辑分如下两步:

def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
  // 1. 尝试立即指定指定的任务
  ...

  // 2. 加入到Timer
  timeoutTimer.add(operation)
  ...
}

SystemTimer.add直接调用了addTimerTaskEntry方法,后者逻辑如下:

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  // 尝试将任务加入时间轮
  if (!timingWheel.add(timerTaskEntry)) {
    // 若任务已过期且未被取消,直接提交执行
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

TimingWheel.add的逻辑也很清晰,分如下4种场景处理:

def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs

  if (timerTaskEntry.cancelled) {
    // 1.如果任务已取消
    ... ...
    false
  } else if (expiration < currentTime + tickMs) {
    // 2.若任务已过期, 即任务过期时间在"当前单元格"内
    ... ...
    false
  } else if (expiration < currentTime + interval) {
    // 3.虽然过期时间不在"当前单元格"内,但仍在当前时间轮之内
    // 将任务正常加入到当前时间轮对应的单元格内 
    ... ...
    true
  } else {
    // 4.超出了当前时间轮所能表示的范围: 将任务加入到"溢出时间轮"
    if (overflowWheel == null) 
      // 如果当前不存在上层时间轮, 那么生成一个
      addOverflowWheel()
    // 将任务加入到"溢出时间轮"
    overflowWheel.add(timerTaskEntry)
  }
}

3.3.2 尝试提前触发任务

入口是DelayedOperationPurgatory.checkAndComplete:

def checkAndComplete(key: Any): Int = {
  // 1. 根据事件key查找对应的任务队列(watchers就是一个队列的封装);
  val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }

  // 2. 遍历队列,尝试执行各任务;
  if(watchers == null)
    0
  else
    watchers.tryCompleteWatched()
}

接下来看Watchers.tryCompleteWatched方法的内容:

def tryCompleteWatched(): Int = {
  var completed = 0

  // 遍历队列
  val iter = operations.iterator()
  while (iter.hasNext) {
    // 逐个处理各任务
    val curr = iter.next()
    if (curr.isCompleted) {
      // 若任务已经处于完成态, 直接移除
      iter.remove()
    } else if (curr.maybeTryComplete()) {
      // 若任务在调用maybeTryComplete后被成功触发, 则移除
      iter.remove()
      completed += 1
    }
  }

  // 若队列已空, 移除当前队列
  if (operations.isEmpty)
    removeKeyIfEmpty(key, this)

  // 返回本次成功提前触发的任务数量
  completed
}

DelayedOperation.maybeTryComplete方法最终调用了DelayedOperation.tryComplete;

DelayedOperation的子类需要在后者中实现自己的"触发条件"检查逻辑;若满足了提前触发的条件,则调用forceComplete方法执行事件触发场景下的业务逻辑。

3.3.3 任务到期自动执行

DelayedOperationPurgatory中维护了一个expirationReaper线程,其职责就是循环调用kafka.utils.timer.SystemTimer#advanceClock来从事件轮中获取已超时的任务,并更新时间轮的"当前时间"指针。

四. 总结

才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。

另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)

感谢阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/595080.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《从Paxos到Zookeeper》——第五、六章:经典应用场景

目录 第五章 使用Zookeeper 5.1 服务端部署与运行 5.2 客户端相关 5.2.1 客户端运行 5.2.2 客户端命令 5.3 Java客户端API 5.4 开源客户端 第六章 经典应用场景 6.1 典型应用场景及实现 6.1.1 数据发布/订阅&#xff08;全局配置中心&#xff09; 6.1.2 负载均衡&#xff08;Lo…

ChatGPT Web Midjourney一键集成最新版

准备工具 服务器一台 推荐使用浪浪云服务器 稳定 安全 有保障 chatgpt api 推荐好用白嫖的api 项目演示 项目部署 浏览器访问casaos 添加软件原添加 https://gitee.com/langlangy_1/CasaOS-AppStore-LangLangy/raw/master/chatmjd.zip 安装此软件 等待安装 安装后再桌面设置…

什么是模版方法模式,有哪些应用?

模板方法模式是一种行为设计模式&#xff0c;他的主要作用就是复用代码。在很多时候&#xff0c;我们的代码中可能会有-些公共的部分并且还有一些定制的部分&#xff0c;那么公共这部分就可以定义在一个父类中&#xff0c;然后将定制的部分实现在子类中。这样子类可以根据需要扩…

TRIZ理论助力充电桩产业跨越技术瓶颈,实现产业升级!

随着新能源汽车市场的迅猛发展和电动汽车保有量的不断增加&#xff0c;充电桩作为电动汽车的“能量补给站”&#xff0c;其重要性日益凸显。然而&#xff0c;充电桩产业在发展过程中也面临着诸多技术瓶颈&#xff0c;如何突破这些瓶颈&#xff0c;推动充电桩产业升级成为行业亟…

《Video Mamba Suite》论文笔记(2)Mamba对于多模态交互的作用

原文翻译 4.2 Mamba for Cross-Modal Interaction Tasks and datasets.除了单模态任务外&#xff0c;我们还评估了 Mamba 在跨模态交互方面的性能。我们首先使用视频时间定位 (Video Temporal Grounding) 任务进行评估。所涉及的数据集包含 QvHighlight [44] 和 Charade-STA …

Vue阶段练习:初始化渲染、获取焦点、记账清单

阶段练习主要承接Vue 生命周期-CSDN博客 &#xff0c;学习完该部分内容后&#xff0c;进行自我检测&#xff0c;每个练习主要分为效果显示、需求分析、静态代码、完整代码、总结 四个部分&#xff0c;效果显示和准备代码已给出&#xff0c;我们需要完成“完整代码”部分。 练习…

交直流充电桩测试系统解决方案,你了解多少?

交直流充电桩测试系统是电动汽车充电设施的重要组成部分&#xff0c;它对充电桩的性能、安全性和可靠性进行全方位的检测和评估。随着电动汽车的普及&#xff0c;充电桩测试系统的需求也在不断增加。本文将对交直流充电桩测试系统的解决方案进行简要介绍。 1. 系统组成 交直流…

微信小程序如何使用svg矢量图标

微信小程序如何使用自定义SVG矢量图标 在微信小程序中&#xff0c;经常会用到小图标来装饰界面&#xff0c;我们常用的方法就是引用第三方的图标&#xff0c;但会存在收费或者找不到合适的图标&#xff0c;这时候我建议可以自行编写svg图标代码&#xff0c;就可以随心所欲的使…

纯干货,源代码防泄密之环境加密与文档加密的区别

环境加密和文档加密是两种不同的数据保护方法&#xff0c;下面用SDC沙盒及文档加密系统作对比&#xff0c;它们在设计理念、管控对象、安全性、性能以及适用场景等方面有所区别&#xff1a; 来百度APP畅享高清图片 1、设计理念&#xff1a; 环境加密&#xff08;如SDC沙盒&am…

JavaScript继承的方法和优缺点

原型链继承 让一个构造函数的原型是另一个类型的实例&#xff0c;那么这个构造函数new出来的实例就具有该实例的属性。 优点&#xff1a; 写法方便简洁&#xff0c;容易理解。 缺点&#xff1a; 在父类型构造函数中定义的引用类型值的实例属性&#xff0c;会在子类型原型上…

华中科技大学雷达站部署

一&#xff1a;项目地址 GitHub - HUSTLYRM/HUST_Radar_2023: 华中科技大学狼牙战队 RoboMaster 2023赛季 雷达站 二&#xff1a;安装依赖 2.1创建虚拟环境 首先是程序是基于python3.8完成&#xff0c;所以创建虚拟环境的时候&#xff0c;选择3.8的虚拟环境 conda create -…

【算法刷题日志】吸氧羊的StarryCoding之旅 - 贡献法计算

题目链接&#xff1a;https://www.starrycoding.com/problem/3 题目描述 吸氧羊终于注册了一个StarryCoding账号&#xff01;&#xff08;她很开心&#xff09; 但是吸氧羊忘记了它的密码&#xff0c;她想起你是计算机大师&#xff0c;于是就来请教你。 她虽然不记得密码了…

nacos开启登录开关启动报错“Unable to start embedded Tomcat”

nacos 版本&#xff1a;2.3.2 2.2.2版本之前的Nacos默认控制台&#xff0c;无论服务端是否开启鉴权&#xff0c;都会存在一个登录页&#xff1b;在之后的版本关闭了默认登录页面&#xff0c;无需登录直接进入控制台操作。在这里我们可以在官网可以看到相关介绍 而我现在所用的…

中国各地级市城投债详细数据(2006年-2023年2月)

01、数据简介 城投债又称为准市政债&#xff0c;发行主体是地方ZF投资平台&#xff0c;公开发行企业债和中期票据&#xff0c;其业主一般是地方基础设施建设&#xff0c;或者公益性项目主体&#xff0c;参与债券发行环节的当地ZF发债。 数据整理中国各地级市的城投债详细数据…

opencv图片的旋转-------c++

图片的旋转 /// <summary> /// 图片的旋转 /// </summary> /// <param name"img"></param> /// <param name"angle">旋转角度:正数&#xff0c;则表示逆时针旋转;负数&#xff0c;则表示顺时针旋转</param> /// <…

【intro】图卷积神经网络(GCN)

本文为Graph Neural Networks(GNN)学习笔记-CSDN博客后续&#xff0c;内容为GCN论文阅读&#xff0c;相关博客阅读&#xff0c;kaggle上相关的数据集/文章/代码的阅读三部分&#xff0c;考虑到本人是GNN新手&#xff0c;会先从相关博客开始&#xff0c;进一步看kaggle&#xff…

考虑极端天气线路脆弱性的配电网分布式电源和储能优化配置模型

1 主要内容 程序主要参考《考虑极端天气线路脆弱性的配电网分布式电源配置优化模型-马宇帆》&#xff0c;针对极端天气严重威胁配电网安全稳定运行的问题。基于微气象、微地形对配电网的线路脆弱性进行分析&#xff0c;然后进行分布式电源接入位置与极端天气的关联性分析&…

优优嗨聚集团:法律明灯,个债处理中的法律咨询力量

在现代社会&#xff0c;个人债务问题日益突出&#xff0c;无论是因生活消费、投资失利还是其他原因&#xff0c;债务问题都可能成为个人财务的一大负担。面对复杂的债务困境&#xff0c;许多人感到迷茫和无助。此时&#xff0c;法律咨询如同一盏明灯&#xff0c;能够为个人债务…

Docker 安装部署 postgres

Docker 安装部署 postgres 1、拉取 postgres 镜像文件 [rootiZbp19a67kznq0h0rgosuxZ ~]# docker pull postgres:latest latest: Pulling from library/postgres b0a0cf830b12: Pull complete dda3d8fbd5ed: Pull complete 283a477db7bb: Pull complete 91d2729fa4d5: Pul…

自动化测试 selenium基础

前言 我们都知道测试开发工程师的任务是根据用户需求测试用例的同时,害的开发自动化工具来减轻测试压力且提高测试的效率以及质量,这一节我们就来简单谈谈开发简单的自动化工具基础 什么是自动化测试呢?就是将我们需要做的测试交给机器去做,也就是使用代码来模拟人对于机器的行…