Java避坑案例 - “激进”的线程池扩容策略及实现

文章目录

  • 问题
  • 思路
  • 线程池的默认行为
  • 自定义线程池扩容策略
  • Code实现
  • 小结

在这里插入图片描述


问题

Java 线程池是先用工作队列来存放来不及处理的任务,满了之后再扩容线程池。当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。

那有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?

举个例子,任务执行得很慢,需要 10 秒,如果线程池可以优先扩容到 5个最大线程,那么这些任务最终都可以完成,而不会因为线程池扩容过晚导致慢任务来不及处理


思路

  1. 由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队列的 offer 方法,造成这个队列已满的假象呢?
  2. 由于我们 Hack 了队列,在达到了最大线程后势必会触发拒绝策略,那么能否实现一个自定义的拒绝策略处理程序,这个时候再把任务真正插入队列呢

线程池的默认行为

Java 的线程池默认行为如下:

  • 仅在有任务到来时才初始化核心线程。
  • 当核心线程满后,任务会被放入工作队列。
  • 如果工作队列满,线程池会扩容直到达到最大线程数。
  • 超过最大线程数的任务会根据拒绝策略处理。
  • 当线程数超过核心线程数时,超出线程会在空闲时被回收。

自定义线程池扩容策略

为了使线程池在任务到来时更激进地扩容,我们可以考虑以下两步:

  1. 重写工作队列的 offer 方法
    通过创建一个自定义工作队列,重写 offer 方法,使其在插入任务时总是返回 false,从而模拟队列已满的状态。

  2. 实现自定义拒绝策略
    在达到最大线程数后,我们需要定义一个拒绝策略,在这个策略中,再把任务插入到自定义工作队列中。


Code实现

public int elasticTP() throws InterruptedException {
    //这里开始是激进线程池的实现
    //创建一个容量为10的阻塞队列,用于存储待执行的任务
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10) {
        @Override
        public boolean offer(Runnable e) {
            //先返回false,造成队列满的假象,让线程池优先扩容
            return false;
        }
    };

    //创建一个线程池,核心线程数为2,最大线程数为5,空闲线程存活时间为5秒
    //使用自定义的线程工厂设置线程名称格式,拒绝执行处理器为尝试再次提交任务到队列
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            2, 5,
            5, TimeUnit.SECONDS,
            queue, new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build(), (r, executor) -> {
        try {
            //等出现拒绝后再加入队列
            //如果希望队列满了阻塞线程而不是抛出异常,那么可以注释掉下面三行代码,修改为executor.getQueue().put(r);
            if (!executor.getQueue().offer(r, 0, TimeUnit.SECONDS)) {
                throw new RejectedExecutionException("ThreadPool queue full, failed to offer " + r.toString());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    //激进线程池实现结束

    //定期打印线程池的状态信息
    printStats(threadPool);
    //每秒提交一个任务,每个任务耗时10秒执行完成,一共提交20个任务

    //任务编号计数器
    AtomicInteger atomicInteger = new AtomicInteger();

    //循环提交20个任务到线程池
    IntStream.rangeClosed(1, 20).forEach(i -> {
        try {
            //每秒提交一个任务
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //获取当前任务编号
        int id = atomicInteger.incrementAndGet();
        try {
            //提交任务到线程池执行
            threadPool.submit(() -> {
                //任务开始时的日志记录
                log.info("{} started", id);
                try {
                    //模拟任务执行时间
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    //任务被中断时不做处理
                }
                //任务结束时的日志记录
                log.info("{} finished", id);
            });
        } catch (Exception ex) {
            //提交任务到线程池执行时发生错误
            log.error("error submitting task {}", id, ex);
            //任务提交失败,编号计数器回退
            atomicInteger.decrementAndGet();
        }
    });

    //等待所有任务完成
    TimeUnit.SECONDS.sleep(60);
    //返回最终的任务编号计数
    return atomicInteger.intValue();
}



private void printStats(ThreadPoolExecutor threadPool) {
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
        log.info("=========================");
        log.info("Pool Size: {}", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());

        log.info("=========================");
    }, 0, 1, TimeUnit.SECONDS);
}


BlockingQueueputoffer 方法在行为上有一些重要的区别,主要体现在任务添加失败时的处理方式和返回值上。以下是详细的对比:

  • put:

    • 用于将元素添加到队列中,如果队列已满,put 会阻塞当前线程,直到队列有空间可以添加元素。
    • 语法:
      void put(E e) throws InterruptedException;
      
  • offer:

    • 尝试将元素添加到队列中,如果队列已满,则根据不同的实现可能会立即返回 false,或在某些情况下也可以选择阻塞(例如 offer(E e, long timeout, TimeUnit unit))。
    • 语法:
      boolean offer(E e);
      boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
      

返回值

  • put:

    • put 方法没有返回值,只在成功添加元素后返回。如果操作被中断,会抛出 InterruptedException
  • offer:

    • offer 方法返回一个布尔值:
      • 返回 true 表示成功添加了元素。
      • 返回 false 表示队列已满,未能添加元素。
    • offer(E e, long timeout, TimeUnit unit) 方法在指定时间内等待,如果在超时之前队列有空间可以添加元素,则返回 true,否则返回 false
  1. 适用场景
  • put:

    • 更适合用于需要确保任务被添加到队列中的场景,且在队列满时允许线程等待。
  • offer:

    • 更适合用于希望尽量避免阻塞的场景,尤其是在需要快速尝试添加任务但不希望等待的情况下。

示例代码

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

// 使用 put 方法
try {
    queue.put(1);  // 成功
    queue.put(2);  // 成功
    queue.put(3);  // 阻塞,直到有空间
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

// 使用 offer 方法
if (queue.offer(1)) {
    System.out.println("成功添加1");
} else {
    System.out.println("队列已满");
}

总结

  • put 方法适用于需要保证添加成功的场景,可能会导致线程阻塞。
  • offer 方法适用于希望快速检查并添加元素的场景,不会阻塞,但可能会失败。

小结

通过重写 offer 方法来让工作队列显示为已满,迫使线程池在任务到来时优先扩容,是一个很巧妙的思路,哈哈哈。

线程池的实现分析

  1. 自定义工作队列

    • 通过重写 LinkedBlockingQueueoffer 方法,总是返回 false,让线程池认为工作队列已满。这样,线程池会优先扩容到最大线程数。
  2. 自定义拒绝策略

    • 在拒绝策略中,使用 executor.getQueue().offer(r, 0, TimeUnit.SECONDS) 来将任务添加到队列。如果队列已满,会抛出 RejectedExecutionException。 或者选择 executor.getQueue().put(r)
  3. 任务提交

    • 通过 AtomicInteger 来计数已提交的任务。由于每个任务需要 10 秒才能完成,每秒提交一个任务,可以模拟任务积压的情况。

进一步的改进建议

  • 动态调整核心线程数:可以在任务数量超过某个阈值时,考虑动态增加核心线程数,以便更好地利用资源。

  • 更智能的拒绝策略:可以根据具体场景实现一个更加复杂的拒绝策略,比如优先选择某些类型的任务进入队列,或者将任务缓存到内存中。

  • 监控和反馈机制:增加监控机制,实时跟踪线程池的状态,包括活跃线程数、队列长度等,可以帮助在运行时做出更好的决策。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/904485.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OpenAI低调发布多智能体工具Swarm:让多个智能体协同工作!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;专注于分享AI全维度知识&#xff0c;包括但不限于AI科普&#xff0c;AI工…

C++设计模式结构型模式———适配器模式

文章目录 一、引言二、适配器模式三、类适配器四、总结 一、引言 适配器模式是一种结构型设计模式&#xff0c;它在日常生活中有着广泛的应用&#xff0c;比如各种转换接头和电源适配器&#xff0c;它们的主要作用是解决接口不兼容的问题。就像使用电源适配器将220V的市电转换…

【Clickhouse】客户端连接工具配置

ClickHouse 是什么 ClickHouse 是一个分布式实时分析型列式存储数据库。具备高性能&#xff0c;支撑PB级数据&#xff0c;提供实时分析&#xff0c;稳定可扩展等特性。适用于数据仓库、BI报表、监控系统、互联网用户行为分析、广告投放业务以及工业、物联网等分析和时序应用场…

巴西电商市场神仙打架,美客多多月蝉联访问量榜首,9月Temu位居巴西APP下载量榜首

巴西电商市场近年来呈现出强劲的增长趋势&#xff0c;预计2024年巴西电子商务市场的销售额将达到2043亿雷亚尔&#xff08;约合373亿美元&#xff09;&#xff0c;同比增长约10%。作为拉美地区最大的经济体&#xff0c;巴西吸引了众多电商平台和商家&#xff0c;巴西电商市场竞…

Remix中struct入参

Remix中struct入参 // SPDX-License-Identifier: MIT pragma solidity 0.8.28;contract StructDemo {struct Student {uint256 id;string name;}// 初始化一个结构体Student public student;function initStudent5(Student memory _stu) public {student _stu;} }结构体最终…

网络请求自定义header导致跨域问题

我记得我的项目之前已经解决了跨域问题。 后来在功能开发着&#xff0c;需要添加一个自定义的header&#xff0c;发现又出现跨域报错。 于是又开始一通摸索折腾。 我的项目前面端是用axios网络请求&#xff0c;通过拦截器添加header&#xff0c;代码如下&#xff1a; //添加请…

leetcode344. Reverse String

Write a function that reverses a string. The input string is given as an array of characters s. You must do this by modifying the input array in-place with O(1) extra memory. Example 1: Input: s [“h”,“e”,“l”,“l”,“o”] Output: [“o”,“l”,“l”…

郎酒不做酱香“凤尾”,白酒首富汪俊林要做兼香“鸡头”

前两天&#xff0c;《2024胡润百富榜》发布&#xff0c;郎酒集团董事长汪俊林以590亿元财富位列榜单第65位&#xff0c;虽仍是白酒行业首富&#xff0c;但排名较去年下降18位&#xff0c;财富缩水17%。 个人财富的缩水&#xff0c;或许和身后郎酒的困境息息相关。发展40年来&am…

【力扣】[Java版] 刷题笔记-104. 二叉树的最大深度

题目&#xff1a;104. 二叉树的最大深度 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 解题思路 有关二叉树的题&#xff0c;最先想到的就是利用递归方法遍历。 解题过程 分别计算左右子树的最大…

HCIP-HarmonyOS Application Developer 习题(十七)

&#xff08;判断&#xff09;1、对于用户创建的一些临时卡片在遇到卡片服务框架死亡重启&#xff0c;此时临时卡片数据在卡片管理服务中已经删除&#xff0c;且对应的卡片ID不会通知到提供方&#xff0c;所以卡片使用方需要自己负责清理长时间未刚除的临时卡片数据。 答案&…

2024下半年软考全国计算机软考高级考试,带你一文读懂软考!

一、软考是什么&#xff1f; 全国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff0c;简称“软考”&#xff0c;分为初级、中级、高级三个级别&#xff0c;国家级考试&#xff0c;证书含金量很高。 作为IT人&#xff0c;有哪些科目可以报考? 可参考202…

Vue3 学习笔记(十三)Vue组件详解

1、组件&#xff08;Component&#xff09; 介绍 组件&#xff08;Component&#xff09;是 Vue.js 最强大的功能之一。 组件可以扩展 HTML 元素&#xff0c;封装可重用的代码&#xff0c;可以帮助你将用户界面拆分成独立和可复用的部分。 每个 Vue 组件都是一个独立的 Vue 实…

快速入门kotlin编程(精简但全面版)

注&#xff1a;本文章为个人学习记录&#xff0c;如有错误&#xff0c;欢迎留言指正。 目录 1. 变量 1.1 变量声明 1.2 数据类型 2. 函数 3. 判断语句 3.1 if 3.2 when语句 4. 循环语句 4.1 while 4.2 for-in 5. 类和对象 5.1 类的创建和对象的初始化 5.2 继承 5…

性能之光 年度电竞性能旗舰iQOO 13发布

2024年10月30日&#xff0c;被定义为“性能之光”的年度电竞性能旗舰——iQOO 13正式发布&#xff0c;售价3999元起。iQOO 13作为iQOO 品牌在性能上的又一次深入探索&#xff0c;它像是一束光&#xff0c;引领行业不断拉高性能上限&#xff0c;让用户看到更多的可能性。 iQOO …

ubuntu内核更新导致显卡驱动掉的解决办法

方法1&#xff0c;DKMS指定内核版本 用第一个就行 1&#xff0c;借鉴别人博客解决方法 2&#xff0c;借鉴别人博客解决方法 方法2&#xff0c;删除多于内核的方法 系统版本&#xff1a;ubuntu20.24 这个方法是下下策&#xff0c;如果重装驱动还是不行&#xff0c;就删内核在…

端到端拥塞控制的公平性和稳定性

昨天早上环城河跑步时的两个思考&#xff0c;发了朋友圈&#xff0c;简单总结成文。 拥塞控制算法公平性度量要重新评估&#xff01;仅以带宽公平性做论断是过时且自私的&#xff0c;在全局视角&#xff0c;平衡和稳定一定以某种表现为乘积 “矩” 来保证&#xff0c;比如力矩…

Vue 组件生命周期(四)

Vue 组件生命周期 Vue3 的组件生命周期可以概括为四个阶段&#xff1a;创建、挂载、更新、销毁。每个阶段都包含了一组钩子函数&#xff0c;用于在不同阶段执行特定的操作。 生命周期各阶段对应以下 Hooks 函数&#xff1a; 一、创建阶段 setup() Vue3 引入的新生命周期函数&am…

idea main 不是模块 导致找不到或无法加载主类

问题 导入一个新项目&#xff0c;然后执行启动类&#xff0c;直接报错&#xff1a; 找不到或无法加载主类。 把编译的删除了&#xff0c;重新处理&#xff0c;也不行。 看了下main和test不是模块 正常的是&#xff1a; 处理&#xff1a; 把项目的 .gradle 和 .idae 目录删了&am…

推荐一款优秀的pdf编辑器:Ashampoo PDF Pro

Ashampoo PDF Pro是管理和编辑 PDF 文档的完整解决方案。程序拥有您创建、转换、编辑和保护文档所需的一切功能。根据需要可以创建特定大小的文档&#xff0c;跨设备可读&#xff0c;还可以保护文件。现在您还能像编辑Word文档一样编辑PDF! 软件特点 轻松处理文字 如 Microso…

在manjaro 2024里使用yay命令安装ROS2

不建议这么安装&#xff0c;研究了两天以失败告终。要不就手动编译吧。。。&#xff08;在系统环境良好的情况下&#xff0c;最好是刚装完系统就装ROS&#xff09;真的太多不适配了&#xff0c;旧有的很多yay包都会遇到一些奇怪的问题&#xff1a; 0.一开始就会遇到网络卡住的…