Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。

1 时间轮

Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。

1.1 Timer 和 DelayQueue

它们都使用了一个优先级队列(通常基于堆实现)来管理任务。

1.1.1 Timer

用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:

  1. 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
  2. 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {

    public static void main(String[] args) {
        Timer timer = new Timer();

        Date startTime = new Date();

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                long dis = (new Date().getTime() - startTime.getTime()) / 1_000;
                System.out.println("task1执行,距离开始时间:" + dis + "s");
            }
        };

        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                long dis = (new Date().getTime() - startTime.getTime()) / 1_000;
                System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("task2休眠结束");
            }
        };

        TimerTask task3 = new TimerTask() {
            @Override
            public void run() {
                long dis = (new Date().getTime() - startTime.getTime()) / 1_000;
                System.out.println("task3执行,距离开始时间:" + dis + "s");
            }
        };

        timer.schedule(task1,1000); // 1s后执行
        timer.schedule(task2,2000); // 2s后执行
        timer.schedule(task3,3000); // 3s后执行
    }
//    执行结果:
//    task1执行,距离开始时间:1s
//    task2执行,距离开始时间:2s,休眠5s。
//    task2休眠结束
//    task3执行,距离开始时间:7s
}

1.1.2 DelayQueue

是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:

  1. 线程安全,可以在多线程环境中使用。
  2. 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
  3. 延迟精度依赖与系统时钟。
public class DelayQueueTest {

    private static class DelayQueueTask implements Delayed {

        private final String taskName;
        private final long delayTime;

        private DelayQueueTask(String taskName, long delayTime) {
            this.taskName = taskName;
            this.delayTime = delayTime + System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) { // 返回剩余延迟
            long diff = delayTime - System.currentTimeMillis();
            return unit.convert(diff,TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.delayTime < ((DelayQueueTask) o).delayTime) {
                return -1;
            }
            if (this.delayTime > ((DelayQueueTask) o).delayTime) {
                return 1;
            }
            return 0;
        }

        @Override
        public String toString() {
            return "DelayQueueTask{" +
                    "taskName='" + taskName + '\'' +
                    ", delayTime=" + delayTime +
                    '}';
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();

        delayQueue.offer(new DelayQueueTask("task1",5000));
        delayQueue.offer(new DelayQueueTask("task2",2000));
        delayQueue.offer(new DelayQueueTask("task3",4000));

        System.out.println("开始执行delayQueue任务");
        while (!delayQueue.isEmpty()) {
            DelayQueueTask task = delayQueue.take();
            System.out.println("任务:" + task);
        }
        System.out.println("delayQueue任务 任务执行完毕");
    }
}

1.2 时间轮结构

Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构

Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。

Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。

1.2.1 时间格与时间跨度

时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。

上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。

1.2.2 时间轮层级

当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。

33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。

第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。

每一层时间轮都会有指向更高一层的引用。

1.2.3 任务处理及时间轮降级

图 时钟

时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。

假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。

1.2.4 时间推进与DelayQueue

如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。

将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/958947.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

九、CSS工程化方案

一、PostCSS介绍 二、PostCSS插件的使用 项目安装 - npm install postcss-cli 全局安装 - npm install postcss-cli -g postcss-cli地址&#xff1a;GitHub - postcss/postcss-cli: CLI for postcss postcss地址&#xff1a;GitHub - postcss/postcss: Transforming styles…

FFPlay命令全集合

FFPlay是以FFmpeg框架为基础&#xff0c;外加渲染音视频的库libSDL构建的媒体文件播放器。 ffplay工具下载并播放视频&#xff0c;可以辅助卡看流信息。 官网下载地址&#xff1a;http://ffmpeg.org/download.html#build-windows 下载build好的exe程序&#xff1a; 此处下载…

wangEditor富文本编辑器,Laravel上传图片配置和使用

文章目录 前言步骤1. 构造好前端模版2. 搭建后端存储3. 调试 前言 由于最近写项目需要使用富文本编辑器&#xff0c;使用的是VUE3.0版本所以很多不兼容&#xff0c;实际测试以后推荐使用wangEditor 步骤 构造好前端模版搭建后端存储调试 1. 构造好前端模版 安装模版 模版安…

利用 SAM2 模型探测卫星图像中的农田边界

将 Segment Anything Model Version 2 应用于卫星图像以检测和导出农业地区田地边界的分步教程 &#x1f31f; 简介 手动绘制田地边界是最耗时的任务之一&#xff0c;其准确性取决于绘制者的表现。然而&#xff0c;精确的边界检测在很多领域都有应用。例如&#xff0c;假设您…

数据库管理-第287期 Oracle DB 23.7新特性一览(20250124)

数据库管理287期 20245-01-24 数据库管理-第287期 Oracle DB 23.7新特性一览&#xff08;20250124&#xff09;1 AI向量搜索&#xff1a;算术和聚合运算2 更改Compatible至23.6.0&#xff0c;以使用23.6或更高版本中的新AI向量搜索功能3 Cloud Developer包4 DBMS_DEVELOPER.GET…

第13章 深入volatile关键字(Java高并发编程详解:多线程与系统设计)

1.并发编程的三个重要特性 并发编程有三个至关重要的特性&#xff0c;分别是原子性、有序性和可见性 1.1 原子性 所谓原子性是指在一次的操作或者多次操作中&#xff0c;要么所有的操作全部都得到了执行并 且不会受到任何因素的干扰而中断&#xff0c;要么所有的操作都不执行…

mysql-06.JDBC

目录 什么是JDBC: 为啥存在JDBC: JDBC工作原理&#xff1a; JDBC的优势&#xff1a; 下载mysql驱动包&#xff1a; 用java程序操作数据库 1.创建dataSource: 2.与服务端建立连接 3.构造sql语句 4.执行sql 5.关闭连接&#xff0c;释放资源 参考代码&#xff1a; 插…

unity 粒子系统设置触发

1、勾选Triggers选项 2、将作为触发器的物体拉入队列当中&#xff0c;物体上必须挂载collider 3、将想要触发的方式&#xff08;Inide、Outside、Enter和Exit&#xff09;选择为”Callback“&#xff0c;其他默认为”Ignore“ 4、Collider Query Mode 设置为All&#xff1a…

LMI Gocator GO_SDK VS2019引用配置

LMI SDK在VS2019中的引用是真的坑爹,总结一下经验,希望后来的人能少走弯路.大致内容如下: &#xff08;1&#xff09; 环境变量 &#xff08;2&#xff09;C/C 附加包含目录 E:\GWQ\Gocator\GO_SDK\Gocator\GoSdk E:\GWQ\Gocator\GO_SDK\Platform\kApi &#xff08;3&#…

61,【1】BUUCTF WEB BUU XSS COURSE 11

进入靶场 左边是吐槽&#xff0c;右边是登录&#xff0c;先登录试试 admin 123456 admiin# 123456 admin"# 123456 不玩了&#xff0c;先去回顾下xss 回顾完就很尴尬了&#xff0c;我居然用SQL的知识去做xss的题 重来 吐槽这里有一个输入框&#xff0c;容易出现存储型…

EchoMimicV2的部署使用

最近有一个录课的需要&#xff0c;我不想浪费人力&#xff0c;只想用技术解决。需求很简单&#xff0c;就是用别人现成的录课视频中的形象和声线&#xff0c;再结合我提供的讲稿去生成一个新的录课视频。我觉得应该有现成的技术了&#xff0c;我想要免费大批量生产。最近看到这…

五、华为 RSTP

RSTP&#xff08;Rapid Spanning Tree Protocol&#xff0c;快速生成树协议&#xff09;是 STP 的优化版本&#xff0c;能实现网络拓扑的快速收敛。 一、RSTP 原理 快速收敛机制&#xff1a;RSTP 通过引入边缘端口、P/A&#xff08;Proposal/Agreement&#xff09;机制等&…

嵌入式知识点总结 ARM体系与架构 专题提升(四)-编程

针对于嵌入式软件杂乱的知识点总结起来&#xff0c;提供给读者学习复习对下述内容的强化。 目录 1.嵌人式编程中&#xff0c;什么是大端?什么是小端 ? 2.如何判断计算机处理器是大端&#xff0c;还是小端 ? 3.如何进行大小端的转换 ? 4.如何对绝对地址0x100000赋值? 1…

Ansys Thermal Desktop 概述

介绍 Thermal Desktop 是一种用于热分析和流体分析的通用工具。它可用于组件或系统级分析。 来源&#xff1a;CRTech 历史 Thermal Desktop 由 C&R Technologies (CR Tech) 开发。它采用了 SINDA/FLUINT 求解器。SINDA/FLUINT 最初由 CR Tech 的创始人为 NASA 的约翰逊航…

Python vLLM 实战应用指南

文章目录 1. vLLM 简介2. 安装 vLLM3. 快速开始3.1 加载模型并生成文本3.2 参数说明 4. 实战应用场景4.1 构建聊天机器人示例对话&#xff1a; 4.2 文本补全输出示例&#xff1a; 4.3 自定义模型服务启动服务调用服务 5. 性能优化5.1 GPU 加速5.2 动态批处理 6. 总结 vLLM 是一…

计算机网络 (60)蜂窝移动通信网

一、定义与原理 蜂窝移动通信网是指将一个服务区分为若干蜂窝状相邻小区并采用频率空间复用技术的移动通信网。其原理在于&#xff0c;将移动通信服务区划分成许多以正六边形为基本几何图形的覆盖区域&#xff0c;称为蜂窝小区。每个小区设置一个基站&#xff0c;负责本小区内移…

从63 秒到 0.482 秒:深入剖析 MySQL 分页查询优化

在日常开发中&#xff0c;数据库查询性能问题就像潜伏的“地雷”&#xff0c;总在高并发或数据量庞大的场景下引爆。尤其是当你运行一条简单的分页查询时&#xff0c;结果却让用户苦苦等待&#xff0c;甚至拖垮了系统。这种情况你是否遇到过&#xff1f; 你可能会想&#xff1…

Word 中实现方框内点击自动打 √ ☑

注&#xff1a; 本文为 “Word 中方框内点击打 √ ☑ / 打 ☒” 相关文章合辑。 对第一篇增加了打叉部分&#xff0c;第二篇为第一篇中方法 5 “控件” 实现的详解。 在 Word 方框内打 √ 的 6 种技巧 2020-03-09 12:38 使用 Word 制作一些调查表、检查表等&#xff0c;通常…

Dockerfile另一种使用普通用户启动的方式

基础镜像的Dockerfile # 使用 Debian 11.9 的最小化版本作为基础镜像 FROM debian:11.11# 维护者信息 LABEL maintainer"caibingsen" # 复制自定义的 sources.list 文件&#xff08;如果有的话&#xff09; COPY sources.list /etc/apt/sources.list # 创建…

7-Zip高危漏洞CVE-2025-0411:解析与修复

7-Zip高危漏洞CVE-2025-0411&#xff1a;解析与修复 免责声明 本系列工具仅供安全专业人员进行已授权环境使用&#xff0c;此工具所提供的功能只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利…