单机环境下定时任务的基本原理和常见解决方案(二)之时间轮原理分析

单机环境下定时任务的基本原理和常见解决方案之时间轮原理分析

  • 时间轮
    • Netty时间轮使用
    • Netty 时间轮 HashedWheelTimer 源码分析
      • 向时间轮里添加task
      • WorkerThread线程执行时间轮任务
  • 多层时间轮
  • 总结

时间轮

生活中的时钟想必大家都不陌生,而时间轮的设计思想就是来源于生活中的时钟,这个从其命名就可以看出。

时间轮是一种环形的数据结构,我们可以将其想象成时钟的样子,时间轮上有许多格子(bucket),每个格子代表一段时间,时间轮的精度取决于一个格子的代表的时间,比如时间轮的格子是一秒跳一次,那么其调度任务的精度就是1秒,小于一秒的任务无法被时间轮调度。

时间轮上的bucket数量是有限的,而任务的数量是可以无限大的(理论上),所以时间轮使用一个链表来存储放在某个格子上的定时任务。

如下图所示 :
假设一个格子是1秒,整个时间轮有10个格子,那走一圈就表示10s,假如当前指针指向1,此时需要调度一个12s后执的任务,应该等指针走完一圈+2格再执行,因此应放入序号为3的格子,同时将round(1)保存到任务中。

指针随着时间一格格转动,走到每个格子,则检查格子中是否有可以执行的任务。此时时间轮指将链表里round=0的任务取出来执行,其他任务的round都减1。
在这里插入图片描述

简单总结一下,时间轮通过数组+链表的形式来存储定时任务,每个任务存放的bucket的计算公式:
(预计时间-时间轮开始时间)/(每一格的时间*时间轮的bucket数) 对应的商就是round,余数就是bucket的下标(本质还是取模)

Netty 需要管理大量的连接,每一个连接都会有很多检测超时任务,比如发送超时、心跳检测间隔等。
它提供了工具类 HashedWheelTimer 来实现延迟任务。该工具类就是采用时间轮原理来实现的。

Netty时间轮使用

后续的源码分析都是基于4.1.80版本的源码

  <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-common</artifactId>
     <version>4.1.80.Final</version>
  </dependency>
    public static void main(String[] args) {
        //创建一个HashedWheelTimer时间轮,有16个格的轮子,每一秒走一个格子
        Timer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 16);
        System.out.println(Calendar.getInstance().getTime() + "开始执行任务...");
        //添加任务到时间轮中
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) {
                System.out.println(Calendar.getInstance().getTime() + ":执行任务1");
            }
        }, 5, TimeUnit.SECONDS);
        timer.newTimeout(timeout ->
                        System.out.println(Calendar.getInstance().getTime() + ":执行任务2"), 8,
                TimeUnit.SECONDS);
    }

在这里插入图片描述

Netty 时间轮 HashedWheelTimer 源码分析

构造方法的三个参数分别代表

  • tickDuration 每一tick的时间,走一格是多久
  • timeUnit tickDuration的时间单位
  • ticksPerWheel 时间轮一共有多个格子,即一圈表示多少个tick。
  public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
        this.worker = new Worker();
        //一个CountDownLatch
        this.startTimeInitialized = new CountDownLatch(1);
        //mpsc队列
        this.timeouts = PlatformDependent.newMpscQueue();
        //mpsc队列
        this.cancelledTimeouts = PlatformDependent.newMpscQueue();
        this.pendingTimeouts = new AtomicLong(0L);
        ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        ObjectUtil.checkNotNull(unit, "unit");
        ObjectUtil.checkPositive(tickDuration, "tickDuration");
        ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");
        //创建时间轮,默认创建512个轮 就是创建一个长度为512的HashedWheelBucket数组
        this.wheel = createWheel(ticksPerWheel);
        this.mask = this.wheel.length - 1;
        //默认tickDuration=100ms 
        long duration = unit.toNanos(tickDuration);
        if (duration >= Long.MAX_VALUE / (long)this.wheel.length) {
            throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / (long)this.wheel.length));
        } else {
            if (duration < MILLISECOND_NANOS) {
                logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
                this.tickDuration = MILLISECOND_NANOS;
            } else {
                this.tickDuration = duration;
            }
            //创建一个线程workerThread,此时未启动(延迟启动,当有任务添加后再启动)
            this.workerThread = threadFactory.newThread(this.worker);
            this.leak = !leakDetection && this.workerThread.isDaemon() ? null : leakDetector.track(this);
            this.maxPendingTimeouts = maxPendingTimeouts;
            if (INSTANCE_COUNTER.incrementAndGet() > 64 && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
                reportTooManyInstances();
            }

        }
    }

向时间轮里添加task

  public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(task, "task");
        ObjectUtil.checkNotNull(unit, "unit");
        long pendingTimeoutsCount = this.pendingTimeouts.incrementAndGet();
        //如果maxPendingTimeouts>0,则表示对于存储的任务有上限,默认无限制
        if (this.maxPendingTimeouts > 0L && pendingTimeoutsCount > this.maxPendingTimeouts) {
            this.pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending timeouts (" + this.maxPendingTimeouts + ")");
        } else {
            //启动workerThread线程
            this.start();
            //判断当前任务还要多长时间执行(这里的startTime就是workerThread的启动时间,执行到这的时候startTime一定有值,否则this.start()会阻塞)
            long deadline = System.nanoTime() + unit.toNanos(delay) - this.startTime;
            if (delay > 0L && deadline < 0L) {
                deadline = Long.MAX_VALUE;
            }
            //封装成HashedWheelTimeout,并将其加入到MpscQueue(timeouts队列)
            //MPSC: Multi producer, Single consumer FIFO
            HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
            this.timeouts.add(timeout);
            return timeout;
        }
    }

workerThread线程延迟启动

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case 0:
                //通过CAS保证线程安全,workThread线程只会启动一次
                if (WORKER_STATE_UPDATER.compareAndSet(this, 0, 1)) {
                    this.workerThread.start();
                }
            case 1:
                break;
            case 2:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }
        //当startTime==0L,表示workThread线程还没有启动,通过CountDownLatch阻塞在这,直到workThread线程启动
        while(this.startTime == 0L) {
            try {
                this.startTimeInitialized.await();
            } catch (InterruptedException var2) {
            }
        }

    }

WorkerThread线程执行时间轮任务

  public void run() {
            //设置startTime
            HashedWheelTimer.this.startTime = System.nanoTime();
            if (HashedWheelTimer.this.startTime == 0L) {
                HashedWheelTimer.this.startTime = 1L;
            }
            HashedWheelTimer.this.startTimeInitialized.countDown();

            int idx;
            HashedWheelBucket bucket;
            //自旋直到wheelTimer被关闭
            do {
                //计算时间轮走到下一个tick的时间点(如果没有到时间则通过sleep休眠等待),这里返回的deadline是当前时间距离时间轮启动经过的时间(deadline小于0说明数据异常,不执行操作)
                long deadline = this.waitForNextTick();
                if (deadline > 0L) {
                    //根据tick与轮的大小取模 得到当前tick所在的bucket的下标
                    idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
                    //处理已经取消的task(将取消队列里的任务从bucket丢弃,如果已经放入到bucket里的话)
                    this.processCancelledTasks();
                    bucket = HashedWheelTimer.this.wheel[idx];
                    //将timeouts队列中缓存的任务取出加入到时间轮中
                    this.transferTimeoutsToBuckets();
                    //处理当前bucket所有的到期任务
                    bucket.expireTimeouts(deadline);
                    ++this.tick;
                }
            } while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);
            //wheelTimer被关闭后的处理,取出每一个bucket里还没被执行的任务,放到unprocessedTimeouts中
            HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
            int var2 = var5.length;

            for(idx = 0; idx < var2; ++idx) {
                bucket = var5[idx];
                bucket.clearTimeouts(this.unprocessedTimeouts);
            }

            while(true) {
                HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
                //处理所有被取消的任务
                if (timeout == null) {
                    this.processCancelledTasks();
                    return;
                }
                //将加到timeouts队列里的任务添加到把未处理的unprocessedTimeouts队列中
                if (!timeout.isCancelled()) {
                    this.unprocessedTimeouts.add(timeout);
                }
            }
        }

主要流程:

1.如果HashedWheelTimer未关闭,则等待到达下一个tick的时间(未到则sleep)
2.到达下一tick时间后

  • 1)将已取消的任务丢弃
  • 2)然后将timeouts队列里的任务迁移到bucket对应的位置上
  • 3)获取当前tick对应的bucket,执行其中已经到达执行时间的任务

3.如果HashedWheelTimer已关闭,则将bucket里还没被执行的任务和timeouts队列里未取消的任务,统一放到unprocessedTimeouts队列中。
然后统一处理取消队列里的任务(processCancelledTasks) 也就是说已取消的任务在取消操作时只是放入到取消队列里,并没有从timeouts队列或者bucket里移除

  private long waitForNextTick() {
            //计算下一个tick的时间点,该时间是相对时间轮启动时间的相对时间
            long deadline = HashedWheelTimer.this.tickDuration * (this.tick + 1L);
            //自旋
            while(true) {
                //计算时间轮启动后经过的时间
                long currentTime = System.nanoTime() - HashedWheelTimer.this.startTime;
                //判断需要休眠的时间
                long sleepTimeMs = (deadline - currentTime + 999999L) / 1000000L;
                if (sleepTimeMs <= 0L) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -9223372036854775807L;
                    }
                    //如果当前时间大于下一个tick的时间,则直接返回(说明到执行任务的时间了),否则sleep
                    return currentTime;
                }

                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10L * 10L;
                    if (sleepTimeMs == 0L) {
                        sleepTimeMs = 1L;
                    }
                }

                try {
                    //休眠对应的时间
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException var8) {
                    if (HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 2) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }
 private void transferTimeoutsToBuckets() {
     //从timeouts队列中获取任务,每次最多只能获取10万个任务
            for(int i = 0; i < 100000; ++i) {
                HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
                //timeout==null说明timeouts队列已经空了
                if (timeout == null) {
                    break;
                }
                //计算执行该任务需要放到哪个bucket下并且对应的round为多少
                if (timeout.state() != 1) {
                    //计算任务的执行时间
                    //这里的deadline是任务执行时间相对时间轮开始时间的时间,也就是计算从时间轮的开始时间算起,需要经过多少次tick
                    long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
                    timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
                    //calculated和tick去较大者,就是说如果当前任务的执行时间已过期,则让其在当前tick执行
                    long ticks = Math.max(calculated, this.tick);
                    //计算该任务要在哪个bucket下执行
                    int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
                    HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
                    //HashedWheelBucket底层是一个HashedWheelTimeout的链表 
                    bucket.addTimeout(timeout);
                }
            }

        }

处理已到执行时间的任务

// 这里的deadline是当前时间距离时间轮启动经过的时间
public void expireTimeouts(long deadline) {
            HashedWheelTimeout next;
            //从头遍历HashedWheelTimeout链表
            for(HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
                next = timeout.next;
                if (timeout.remainingRounds <= 0L) {
                    next = this.remove(timeout);
                    //说明当前任务的执行时间大于deadline,中间可能哪里出现故障,抛出异常
                    if (timeout.deadline > deadline) {
                        throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                    //调用timeout.expire方法,执行task,通过ImmediateExecutor线程池执行任务,实际就是调用task的run方法
                    timeout.expire();
                } else if (timeout.isCancelled()) {
                    next = this.remove(timeout);
                } else {
                    //未到执行时间,remainingRounds-1
                    --timeout.remainingRounds;
                }
            }

        }

多层时间轮

当时间跨度很大时,提升单层时间轮的 tickDuration 可以减少空转次数,但会导致时间精度变低,使用多层时间轮既可以避免精度降低,也能减少空转次数。

如果有时间跨度较长的定时任务,则可以交给多层级时间轮去调度。

假设有一个设置为5天14 小时40 分30 秒后执行的定时任务,在 tickDuration = 1s 的单层时间轮中,需要经过:5x24x60x60+14x60x60+40x60+30 数十万次tick才能被执行。
但在 wheel1 tickDuration = 1 天,wheel2 tickDuration = 1 小时,wheel3 tickDuration = 1 分,wheel4 tickDuration = 1 秒 的四层时间轮中,只需要经过 5+14+40+30 次tick就可以了。

总结

while+sleepTimerScheduledThreadPoolExecutorHashedWheelTimer
实现方式while+sleep最小堆最小堆基于时间轮
写入效率-O(logN)O(logN)类HashMap,近似O(1)
查询效率-O(1)O(1)近似O(1)
优点实现简单 O(1)可以对大量定时任务进行统一调度线程池执行,有异常捕获机制写入性能高
缺点对于大量定时任务不便于管理单线程执行;没有异常捕获机制写入效率较低,在需要大量添加定时任务的场景下会影响性能单线程执行;没有异常捕捉机制

注意下,HashedWheelTimer 的写入和查询效率都是近似O(1),由于链表的存在,如果要执行任务的存放在长链表的末尾,那他的查询性能可能会很差,HashMap通过扰动函数来将减少hash冲突,时间轮也可以通过设置合适的时间精度,来减少hash冲突

Netty对时间轮的实现是基于他的使用场景,我们可以根据不同的业务场景对时间轮进行优化

  1. 将所有的任务交给线程池执行,避免单个任务的执行耗时较长影响下一个任务的执行
  2. 可以给每个bucket设置一个线程池来执行这个bucket的任务
  3. 假设需要在同一时刻,执行大量比较耗时的任务,那么可以通过MQ解耦,然后使用消费者并发执行任务,提高性能
    。。。。

选择哪一种方式来实现定时/延迟任务取决于各自的业务场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/49247.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

智能家居是否可与ChatGPT深度融合?

​ ChatGPT自2022年面世以来&#xff0c;已为亿万网民提供智能问答服务。然而我们是否曾想到&#xff0c;这一人工智能驱动的聊天机器人&#xff0c;是否可为智能家居赋能? 要实现ChatGPT与智能家居设备之间的无缝对话&#xff0c;单单依靠一台终端是远远不够的。ChatGPT必须…

【快应用】通用方法getBoundingClientRect的使用

【关键词】 快应用开发、通用方法、getBoundingClientRect 【背景】 快应用通用方法中提供了getBoundingClientRect方法来获取当前组件的布局位置&#xff0c;之前处理的快应用问题中&#xff0c;有个cp却把这种场景误解为可以获取到文字的宽度和高度&#xff0c;这是不合理的…

会议OA项目之会议发布(一)

目录 前言&#xff1a; 会议发布的产品原型图&#xff1a; 1.会议发布 1.1实现的特色功能&#xff1a; 1.2思路&#xff1a; 使用的数据库&#xff1a; 我们要实现多功能下拉框的形式选择可以参考原文档&#xff1a;https://hnzzmsf.github.io/example/example_v4.html#down…

Java-方法的使用

目录 一、方法的概念和使用 1.1方法的含义 1.2 方法的定义 1.3方法调用的执行过程 1.4实参和形参的关系 1.5没有返回值的方法 二、方法重载 2.1方法重载的含义 2.3 方法签名 三、递归 3.1 递归的概念 3.2递归的执行过程 3.3递归练习 一、方法的概念和使用 1.1方法的含义 方…

无符号数和有符号数的“bug”

1. 起因 在实现kmp算法时&#xff0c;出现了诡异的现象&#xff0c;看下面的代码&#xff1a; int KMP (const char *s, const char *t) {int lenS strlen (s);int lenT strlen (t);int next[lenT];get_next (next, t);int i 0;int j 0;while (i < lenS && j …

watch避坑,使用computed进行处理数据

业务场景&#xff1a;在vue中监听el-input 中的字数有没有超过60&#xff0c;如果超过60字时将60后面的字变为 “>>” 符号&#xff0c;以此实现预览苹果手机推送摘要场景。 错误&#xff1a;开始的逻辑是使用watch监听&#xff0c;检查length超过60直接 加上符号&#x…

深度学习:Pytorch最全面学习率调整策略lr_scheduler

深度学习&#xff1a;Pytorch最全面学习率调整策略lr_scheduler lr_scheduler.LambdaLRlr_scheduler.MultiplicativeLRlr_scheduler.StepLRlr_scheduler.MultiStepLRlr_scheduler.ConstantLRlr_scheduler.LinearLRlr_scheduler.ExponentialLRlr_scheduler.PolynomialLRlr_sched…

【六大锁策略-各种锁的对比-Java中的Synchronized锁和ReentrantLock锁的特点分析-以及加锁的合适时机】

系列文章目录 文章目录 系列文章目录前言一、六大"有锁策略"1. 乐观锁——悲观锁2. 轻量级锁——重量级锁3. 自旋锁——挂起等待锁4. 互斥锁——读写锁5. 可重入锁——不可重入锁6. 公平锁——非公平锁 二、Synchronized——ReentrantLockSynchronized的特点&#xf…

【C语言数据结构】模拟·顺序表·总项目实现

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

leetcode 376. 摆动序列

2023.7.28 本题思路是定义一个 direct变量记录上一次摆动是上坡还是下坡 。 然后在一个for循环中循环判断当前摆动和上一次摆动是否一致&#xff0c;如果不一致则视为一次摆动。 如果前后元素值相等得话&#xff0c;直接continue进入下一次循环。 下面看代码&#xff1a; clas…

leaftjs实现全国温度降水气压风速等值面风场洋流效果

实现内容 数据爬取、地图marker聚合、鼠标移动显示pop&#xff0c;风场&#xff0c;洋流&#xff0c;温度等值面、降水等值面、气压等值面、风速等值面&#xff0c;洋流方向、洋流流速展示、风场方向、风场风速展示&#xff0c;后期扩展小时预报&#xff0c;分钟预报、7天预报…

Matplotlib_概述_绘制图象

⛳绘制基础 在使用 Matplotlib 绘制图形时&#xff0c;其中有两个最为常用的场景。一个是画点&#xff0c;一个是画线。 pyplot 基本方法的使用如下表所示 方法名说明title()设置图表的名称xlabel()设置 x 轴名称ylabel()设置 y 轴名称xticks(x, ticks, rotation)设置 x 轴的…

(el-radio)操作:Element-plus 中 Radio 单选框改成垂直排列的样式操作与使用

Ⅰ、Element-plus 提供的Radio单选框组件与想要目标情况的对比&#xff1a; 1、Element-plus 提供 Radio 组件情况&#xff1a; 其一、Element-ui 自提供的Radio代码情况为(示例的代码)&#xff1a; // Element-plus 自提供的代码&#xff1a; // 此时是使用了 ts 语言环境&a…

4.操作元素属性

4.1操作元素常用属性 ●通过 JS 设置/修改 标签元素属性&#xff0c;比如通过src更换图片 ●最常见的属性比如&#xff1a;href、 title、 src 等 ●语法: 对象.属性 值【示例】 // 1.获取元素 const pic document.querySelector( img ) // 2.操作元素 pic.src ./images/b…

商品库存管理系统设计与实现(Vue+SpringBoot+MySQL)

一、项目背景 当今&#xff0c;我国科技发展日新月异&#xff0c;各类企业迅速崛起&#xff0c;商品类型日益繁多&#xff0c;产品数量急剧增加&#xff0c;企业经营模式越来越多样&#xff0c;信息处理量不断加大&#xff0c;对库存管理提出了更高的要求。通过本系统&#xff…

LayUi 树形组件tree 实现懒加载模式,展开父节点时异步加载子节点数据

如题。 效果图&#xff1a; //lazy属性为true&#xff0c;点开时才加载 引用代码&#xff1a; <link href"~/Content/layui-new/css/layui.css" rel"stylesheet" /><form id"form" class"layui-form" style"margin-to…

数据库索引优化与查询优化——醍醐灌顶

索引优化与查询优化 哪些维度可以进行数据库调优 索引失效、没有充分利用到索引-一索引建立关联查询太多JOIN (设计缺陷或不得已的需求) --SQL优化服务器调优及各个参数设置 (缓冲、线程数等)–调整my.cnf数据过多–分库分表 关于数据库调优的知识点非常分散。不同的 DBMS&a…

YOLOv5:使用7.0版本训练自己的实例分割模型(车辆、行人、路标、车道线等实例分割)

YOLOv5&#xff1a;使用7.0版本训练自己的实例分割模型&#xff08;车辆、行人、路标、车道线等实例分割&#xff09; 前言前提条件相关介绍使用YOLOv5-7.0版本训练自己的实例分割模型YOLOv5项目官方源地址下载yolov5-7.0版源码解压目录结构 准备实例分割数据集在./data目录下&…

Rust vs Go:常用语法对比(七)

题图来自 Go vs Rust: Which will be the top pick in programming?[1] 121. UDP listen and read Listen UDP traffic on port p and read 1024 bytes into buffer b. 听端口p上的UDP流量&#xff0c;并将1024字节读入缓冲区b。 import ( "fmt" "net&qu…

vue+leaflet笔记之地图聚合

vueleaflet笔记之地图聚合 文章目录 vueleaflet笔记之地图聚合开发环境代码简介插件简介与安装使用简介 详细源码(Vue3) 本文介绍了Web端使用Leaflet开发库进行地图聚合查询的一种方法 (底图来源:中科星图)&#xff0c;结合Leaflet.markercluster插件能够快速的实现地图聚合查询…