手写ThreadPoolExecutor线程池

很多人不推荐造轮子,我偏不。我造轮子又不是为了上生产环境,而是为了加深理解,有何不可?私以为造轮子几乎是最好的学习方式,甚至没有之一。因为造轮子需要至少做足以下两点:

  • 了解设计思想(设计层面)
  • 大略看过源码(代码层面)

不了解设计,就无法把握整体。没看过代码,就无法完善细节。

另外,从创作者的角度来说,直接分析源码有时太困难了,代码太多,抽象层次太深。如果可以通过造轮子,把抽象层次减少一些,采用平铺直叙的方式呈现,那么读者理解起来也就更容易些。

既然造轮子这么好,那就,拿来吧你。今天带大家造一个线程池。

线程池设计思想

池化技术

大家多少听过所谓的“池化技术”,比如数据库连接池、常量池、线程池、对象池等等。池化技术是计算机世界里比较常用的、行之有效的优化手段。那么我想问你,线程池中的“池”,到底指代什么?

抛开无足轻重的小虾米,线程池中最主要的就两个:我们向Executor提交的任务、Executor自己维护的Thread。其中,线程“池”显然指代的是Thread的集合。

但和数据库连接池等一般的池化技术不同的是,ThreadPool的作用不单单是“池化”,它更重要的职责其实是“做功”,也即是执行任务。举个例子,平时我们使用数据库连接池,其实都是从池中取出一个Connection,执行完SQL后会调用重写过的close()归还Connection。但你可曾见过有人向ThreadPool讨要Thread的?它会给你吗?ThreadPool的做法是:

想要从池中拿Thread?没门儿!你不知道自己多线程知识多菜啊?小心玩火自焚。要执行任务的话,你自己把Task丢进来,哥罩着你。

也就是说,ThreadPool从一开始就没想过让你们把Thread拿走!但你们又要返回结果咋整?我返回一个FutureTask,需要结果的话,自己FutureTask#get()。但主动权还是在ThreadPool这,能不能拿到结果、是否要阻塞都是它说了算!

大部分人觉得线程池难,并非搞不清楚线程“池”,而是不了解它是如何“做功”的,也就是说:线程池是如何执行任务的呢?这就涉及到线程池和一般池化技术最大的不同:内化执行操作,而且是通过生产消费的模式执行任务。

生产消费模型

如果往线程池不断提交任务,大致会经历4个阶段:

  • 核心线程处理任务
  • 任务进入任务队列等待
  • 非核心线程处理任务
  • 拒绝策略

特别是第二个阶段,来不及处理的任务会被暂存入workQueue(任务队列),于是典型的生产消费模型就出现了。

调用者投递Task ====> ThreadPool.workQueue ====> workerThread阻塞获取Task执行

几个重要概念

线程池如何复用线程?

有时候,要解决一个问题,从反方向入手可能更简单写。我们暂且先不管如何复用线程,我就问大家:如何回收/销毁线程?(知道线程什么情况会被销毁,那么只要避免销毁,也就可以复用)

“线程”这个词,其实有两个层次的指代:Thread对象、JVM线程资源(本质还是操作系统线程)。Thread对象与线程资源之间是绑定关系,一个线程资源被分配后,会找到Thread#run()作为代码的执行入口。

线程什么时候销毁呢?正常来说,new Thread(tartget).start()后,操作系统就会分配线程资源。等到线程执行完Thread#run()中的代码,就会自然消亡。至于Thread对象,如果没有引用,也会被GC回收。

看到这,我想大家应该明白了:只要任务永远不结束,线程就永远死不了。任务如何才能永远不结束呀?要么循环做任务、要么阻塞。

线程池本质也是Thread,只是单体和集合的区别。既然Thread“跑完任务就销毁”的特性是天生的、注定的,线程池也无法改变这一点。所以,线程池要想让内部线程一直存活着,就要keeps threads busy working,也就是让它们一直干活。实在没活干怎么办?那就阻塞着呗(可以用阻塞队列)!总之,不能让你“执行完毕”,否则就销毁了。

如何保证只销毁“非核心线程”

大家都听过一些八股文的口诀,比如“在空闲时间,如果非核心线程空闲超过keepAliveTime就会被回收”,这是怎么实现的呢?

首先,有一个常见的误区是,很多人以为线程池创建线程时会给每一个Thread做标记,比如给核心线程标记为coreThread,非核心线程标记为nonCoreThread,然后空闲时间回收nonCoreThread。

然而JDK的Doug Lea可不这么想,人家采用的方案更加简单粗暴:

  • 当前线程数 <= corePoolSize,那么所有线程都是核心线程,不回收
  • 当前线程数 > corePoolSize,那么适当回收部分线程

看吧,“部分”线程,甚至压根不管你是谁。

OK,罗里吧嗦讲了一大堆,开始coding吧。

山寨线程池Demo

建议大家拷贝代码到本地debug调试,代码已经很精简了,比较容易追踪。

public class ThreadPool {

    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 工作线程
     */
    private final List<Worker> workers = new ArrayList<>();
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> workQueue;
    /**
     * 核心线程数
     */
    private final int corePoolSize;
    /**
     * 最大线程数
     */
    private final int maximumPoolSize;
    /**
     * 非核心线程最大空闲时间(否则销毁线程)
     */
    private final long keepAliveTime;

    public ThreadPool(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit timeUnit,
                      BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
    }

    public void execute(Runnable task) {
        Assert.notNull(task, "task is null");

        // 创建核心线程处理任务
        if (workers.size() < corePoolSize) {
            this.addWorker(task, true);
            return;
        }

        // 尝试加入任务队列
        boolean enqueued = workQueue.offer(task);
        if (enqueued) {
            return;
        }

        // 创建非核心线程处理任务
        if (!this.addWorker(task, false)) {
            // 非核心线程数达到上限,触发拒绝策略
            throw new RuntimeException("拒绝策略");
        }
    }

    private boolean addWorker(Runnable task, boolean core) {
        int wc = workers.size();
        if (wc >= (core ? corePoolSize : maximumPoolSize)) {
            return false;
        }

        boolean workerStarted = false;
        try {
            Worker worker = new Worker(task);
            final Thread thread = worker.getThread();
            if (thread != null) {
                mainLock.lock();
                workers.add(worker);
                thread.start();
                workerStarted = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mainLock.unlock();
        }

        return workerStarted;
    }

    private void runWorker(Worker worker) {
        Runnable task = worker.getTask();

        try {
            // 循环处理任务
            while (task != null || (task = getTask()) != null) {
                task.run();
                task = null;
            }
        } finally {
            // 从循环退出来,意味着当前线程是非核心线程,而且需要被销毁
            // Java的线程,既可以指代Thread对象,也可以指代JVM线程,一个Thread对象绑定一个JVM线程
            // 因此,线程的销毁分为两个维度:1.把Thread对象从workers移除 2.JVM线程执行完当前任务,会自然销毁
            workers.remove(worker); // TODO 最好加锁
        }
    }


    private Runnable getTask() {
        boolean timedOut = false;

        // 循环获取任务
        for (; ; ) {

            // 是否需要检测超时:当前线程数超过核心线程
            boolean timed = workers.size() > corePoolSize;

            // 需要检测超时 && 已经超时了
            if (timed && timedOut) {
                return null;
            }

            try {
                // 是否需要检测超时
                // 1.需要:poll阻塞获取,等待keepAliveTime,等待结束就返回,不管有没有获取到任务
                // 2.不需要:take持续阻塞,直到获取结果
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    @Getter
    @Setter
    private class Worker implements Runnable {
        private Thread thread;
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
            thread = new Thread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }

}

代码示意图(虚线框内由Thread异步执行):

这个图有瑕疵,实际上线程池并不区分coreThread和nonCoreThread,仅看当前线程数是否大于corePoolSize

测试案例

@Slf4j
public class ThreadPoolTest {

    public static void main(String[] args) {

        // 创建线程池,核心线程1,最大线程2
        // 提交4个任务:第1个任务交给核心线程、第2个任务入队、第3个任务交给非核心线程、第4个任务被拒绝
        ThreadPool threadPoolExecutor = new ThreadPool(
                1,
                2,
                1,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1)
        );

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第1个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第2个任务...", Thread.currentThread().getName());
            sleep(10);

        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第3个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第4个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        log.info("main结束");
    }

    private static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

大家可以把测试案例中的线程池换成JDK的ThreadPoolExecutor,执行效果很类似:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/144040.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java反射机制

java反射机制 方法四要素使用反射机制获取方法并调用方法 方法四要素 不使用反射机制调用一个方法需要几个要素的参与&#xff1f; 例&#xff1a; SystemService.java package com.w.spring6.reflection;public class SystemService {public void logout(){System.out.prin…

质量管理工作难做,为什么还有那么多人还继续做?

理解质量管理的挑战 在当今商业环境中&#xff0c;质量管理工作是一项充满挑战的使命。然而&#xff0c;尽管面对种种困难&#xff0c;却有着越来越多的人愿意踏上这条坎坷之路。为何质量管理工作如此艰难&#xff0c;却依旧吸引无数人投身其中呢&#xff1f; 内外动因交融 内…

【23真题】坑挖的不错,题目也有质量!

今天分享的是23年西安石油大学810的信号与系统试题及解析。 本套试卷难度分析&#xff1a;22年西安石油810考研真题&#xff0c;我也发布过&#xff0c;若有需要&#xff0c;戳这里自取!本套试题内容难度中等偏下&#xff0c;题量较少&#xff0c;没有考察选填题&#xff0c;通…

刚刚!奥特曼终于透露了GPT-5的最新消息!

原 创作者 | Tscom、王二狗 大爆料&#xff01;OpenAI被实锤正在研发GPT-5&#xff01; 还是OpenAI的CEO Sam Altman 亲口证实的。 今日&#xff0c;奥特曼接受《金融时报》的采访&#xff0c;透露了很多OpenAI的下一步计划&#xff0c;二狗帮大家整理成以下10个要点&#x…

搬家快递服务预约小程序的作用是什么

无论家庭还是企业办公&#xff0c;不少人都有搬家快递服务需求&#xff0c;尤其是近些年类似服务市场需求规模增长迅速。而在实际经营中&#xff0c;行业商家从业者也面临一些经营难题&#xff1a; 搬家公司的服务一般主要针对同省用户&#xff0c;同城需求较高&#xff0c;然…

实现定时巡检接口,测试不通过时自动发邮件

背景是这样的&#xff1a;最近组织架构调整&#xff0c;我们这个团队部分人员调入到了另外的业务组&#xff0c;因此她之前负责的业务需要交接给我们。 其中一个是接口每日监测&#xff0c;之前这个同事的做法是每天去手动点下按钮来跑接口测试&#xff0c;然后看一眼接口测试…

GZ038 物联网应用开发赛题第7套

2023年全国职业院校技能大赛 高职组 物联网应用开发 任 务 书 &#xff08;第7套卷&#xff09; 工位号&#xff1a;______________ 第一部分 竞赛须知 一、竞赛要求 1、正确使用工具&#xff0c;操作安全规范&#xff1b; 2、竞赛过程中如有异议&#xff0c;可向现场考评…

YOLOv8任务

介绍 YOLOv8是一个支持多个计算机视觉任务的人工智能框架。该框架可用于执行检测、分割、分类和姿态估计。每个任务都有不同的目标和用例。 检测 检测是YOLOv8支持的主要任务。它包括检测图像或视频帧中的对象&#xff0c;并在它们周围绘制边界框。检测到的对象根据其特征被分类…

全网火爆,Python接口自动化测试Mock服务详细总结(实战场景)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、Mock实现原理与…

SparkSQL声明式

简单案例 import org.apache.spark.sql.SparkSession import org.junit.Testcase class Person(id:Int,name:String,sex:String,age:Int) class DataSetCreate {val spark SparkSession.builder().appName("test").master("local[4]").getOrCreate()impo…

2023年【汽车驾驶员(中级)】免费试题及汽车驾驶员(中级)考试试卷

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年【汽车驾驶员&#xff08;中级&#xff09;】免费试题及汽车驾驶员&#xff08;中级&#xff09;考试试卷&#xff0c;包含汽车驾驶员&#xff08;中级&#xff09;免费试题答案和解析及汽车驾驶员&#xff08;…

人工智能基础_机器学习029_Lasso回归的使用_代码实现_稀疏性提现---人工智能工作笔记0069

然后我们再来看lasso回归,其实也是前面我们说的套索回归,我们说了 套索回归,具有稀松性,就是有一部分w会变成0对吧 我们先看一下套索回归的公式 公式我们可以去官网去看 可以看到这里上面有个写法是L1 = ||w||1这里两个竖,就是矩阵的写法,表示矩阵,然后 后面的部分|wi|绝对…

c++ 经典服务器开源项目Tinywebserver如何运行

第一次直接按作者的指示&#xff0c;运行sh ./build.sh,再运行./server&#xff0c;发现不起作用&#xff0c;localhost:9006也是拒绝访问的状态&#xff0c;后来摸索成功了发现&#xff0c;运行./server之后&#xff0c;应该是启动状态&#xff0c;就是不会退出&#xff0c;而…

【广州华锐互动】消防科普VR实训展馆增强群众学习兴趣和沉浸感

在现代社会&#xff0c;科技的发展已经深入到我们生活的各个角落&#xff0c;其中包括教育和信息传播领域。3D技术的引入为科普教育提供了全新的可能性。特别是在消防安全教育中&#xff0c;消防科普VR实训展馆的应用&#xff0c;不仅可以提高公众的消防安全意识&#xff0c;还…

分享一下微信公众号怎么增加分销的功能

在当今的数字化时代&#xff0c;微信公众号已成为企业和个人开展营销活动的重要平台。然而&#xff0c;仅仅依靠发布文章和推送消息&#xff0c;已经不能满足商家对深度营销的需求。为了进一步拓展商业价值&#xff0c;微信公众号需要增加分销功能。本文将详细介绍如何为微信公…

ARM PMU

PMU单元概览 ARM PMU概要 PMU作为一个扩展功能&#xff0c;是一种非侵入式的调试组件。 对PMU寄存器的访问可以通过CP15协处理器指令和Memory-Mapped地址。 基于PMUv2架构&#xff0c;A7处理器在运行时可以收集关于处理器和内存的各种统计信息。对于处理器来说这些统计信息中…

Moto edge s pro手机 WIFI和蓝牙连接不上 解决方法分享

2021年12月入手一台Moto Edge S Pro 12256版&#xff0c;看着性价比很高&#xff0c;越用越垃圾。屏幕显示没有vivo亮丽/APP图标很丑/屏幕上一点点水就失灵/拍照片边缘是模糊的/系统几乎不更新。 以上都可以忍受&#xff0c;但是&#xff1a; 用一年不到&#xff0c;蓝牙不能…

【数据结构】深入了解栈

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《Linux》《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 文章目录 一、栈的基本概念二、栈实现方法的分析与选择2.1 引入2.2 顺序表实现2.3 链表实现2.3.1…

全国犯罪人数大数据可视化平台【可视化项目案例-08】

🎉🎊🎉 你的技术旅程将在这里启航! 🚀🚀 本文选自专栏:可视化技术专栏100例 可视化技术专栏100例,包括但不限于大屏可视化、图表可视化等等。订阅专栏用户在文章底部可下载对应案例源码以供大家深入的学习研究。 🎓 每一个案例都会提供完整代码和详细的讲解,不…

将铜互连扩展到2nm的研究

晶体管尺寸在3nm时达到临界点&#xff0c;纳米片FET可能会取代finFET来满足性能、功耗、面积和成本目标。同样&#xff0c;正在评估2nm铜互连的重大架构变化&#xff0c;此举将重新配置向晶体管传输电力的方式。 芯片制造商也可能会在2nm节点开始用钌或钼在一定程度上取代铜。…