Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️本系列源码仓库:多线程并发编程学习的多个代码片段(github)

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

目录

1.前言

1.1.内容回顾

1.2.本节任务

2.实现思路

2.1 线程工厂实现思路

2.2 核心线程与非核心线程实现思路

3.代码实现

3.1.线程池工厂实现

3.2核心线程与非核心线程逻辑

4.总结


✨️本系列源码均已上传仓库 1321928757/Concurrent-MulThread-Demo(github.com)✨️ 

(本章节可参考liushijie-240329-core分支)

1.前言

1.1.内容回顾

往期文章传送门:
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列-CSDN博客

Java多线程实战-从零手搓一个简易线程池(二)线程池与拒绝策略实现-CSDN博客

在上一节我们实现了线程池内部的基本运转逻辑,池化了线程资源进行任务处理,细心的同学可以发现,我们上章没有划分核心线程与非核心线程的概念,在JDK官方的提供的线程池中,线程池中的线程从概念上分为核心线程和非核心线程,核心线程是线程池中长久存在的线程,默认不会被回收,而非核心线程在空闲时间超过设置的最大空闲时间时会被回收,当然,我们也可以通过设置一个属性来运行核心线程被回收。

1.2.本节任务

本章节的任务如下:

  1. 实现线程工厂
  2. 实现核心线程与非核心线程

2.实现思路

2.1 线程工厂实现思路

线程工厂是运用了工厂设计模式,可以帮助我们隐藏创建线程的一些细节。我们可以通过线程工厂在创建线程数时定义线程的一些属性,如线程名称、线程组等。实现线程工厂一般有以下步骤:

  1. 定义一个线程工厂接口或抽象类,提供创建新线程的方法。
  2. 实现该接口或继承该抽象类,重写创建线程的方法逻辑。
  3. 在线程池的构造函数中,传入自定义的线程工厂实例。

整体实现还是比较简单,主要就是要注意编码规范

2.2 核心线程与非核心线程实现思路

这里首先要清楚一个概念,JDK线程池源码中没有显式的区别核心线程和非核心线程,他只是线程池在处理线程池不同情况下的线程的一种概念。我们接下来从源码分析(JDK1.8)是如何实现核心线程和非核心线程的管理的。

JDK官方线程池中的runWorker方法作用是用来执行worker线程

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // 线程执行任务流程,省流
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

同我们上节运行线程一样,他会通过while (task != null || (task = getTask()) != null)来重复获取任务,如果task == null,也就是没获取到,会进入到processWorkerExit函数中,线程会被回收。也就是说,只要getTask方法返回为null,就代表了当前线程需要回收,所以我们接下来重点查看getTask方法的源码:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        // 1.方法内部使用了一个无限循环for (;;),这意味着线程会一直尝试获取任务,直到成功获取到任务或者满足退出条件。
        for (;;) {
            // 2.获取到目前线程池的线程数,最大核心线程,最大总线程数等信息
            int c = ctl.get();
            int rs = runStateOf(c);

            // 3.如果线程池的运行状态至少为SHUTDOWN(在此状态以上的状态,都不会接受新任务了,所以我们直接返回null)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);//获取线程池当前线程数量

            // 4.根据当前线程数动态判断是否要回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask方法主要负责从workQueue队列中获取任务,如果获取到了就返回任务,如果没有获取到就返回null。他会根据线程池的当前状态,当前线程数,来动态的选择是否从workQueue中拿取任务,以及拿取操作是否是超时操作。这里的设计特别巧妙,建议阅读源码仔细体会

如果 当前线程数 > 最大核心线程数,我们就判定存在非核心线程,可以进行回收判断

如果 当前线程数 < 最大线程数,我们就判定不存在核心线程

所以核心线程和非核心线程他们都是一类线程,只是在线程池不同情况下划分的概念而已

3.代码实现

3.1.线程池工厂实现

3.1.1.线程工厂接口
/**
 * @author Luckysj @刘仕杰
 * @description 线程工厂接口
 * @create 2024/03/28 20:40:18
 */
public interface ThreadFactory {
    /**
    * @description
    * @param 
    * @return 创建的线程对象
    * @date 2024/03/28 21:01:35
    */
    Thread newThread(Runnable r);
}
3.1.2.默认线程工厂实现类

默认线程工厂实现类主要是设置新建线程的线程组,线程名前缀等等信息,更加规范,方便后续日志排查错误

/**
 * @author Luckysj @刘仕杰
 * @description 默认线程工厂,我们这里仿照源码写法,为每个线程分配线程组(默认会自动分配),并为每个线程组
 * @create 2024/03/28 21:27:10
 */
public class DefaultThreadFactory implements ThreadFactory{
    /** 原子序号类,我们可以通过该类为线程工厂来获取一个随机序号,主要是为了区分不同线程池实例*/
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    /** 线程组,每个线程都需要属于一个线程组(平常使用未指定线程组会默认分配)*/
    private final ThreadGroup group;
    /** 原子序号类,我们可以通过该类为每个线程来获取一个随机序号*/
    private static final AtomicInteger threadNumber = new AtomicInteger(1);
    /** 线程名前缀,以便于在日志、监控等场景下识别和管理线程。*/
    private final String namePrefix;

    public DefaultThreadFactory() {
        // 获取管理安全策略的类,通过这个类我们可以获取对应名称的线程组,SecurityManager 和 group 的存在是为了更好地控制线程的安全性和权限
        SecurityManager s = System.getSecurityManager();
        // 存在 SecurityManager实例,则通过 s.getThreadGroup() 获取一个受限制的线程组。
        // 如果不存在 SecurityManager 实例,则使用当前线程所在的线程组 Thread.currentThread().getThreadGroup()。
        this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        // 生成前缀
        this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        // 将线程设置为用户线程
        if(thread.isDaemon()){
            thread.setDaemon(false);
        }
        // 为线程设置默认优先级
        if(thread.getPriority() != Thread.NORM_PRIORITY){
            thread.setPriority(Thread.NORM_PRIORITY);
        }
        return thread;
    }
}
3.1.3.使用线程工厂

在Worker工作线程构造函数中使用工厂创建线程

    class Worker implements Runnable{
        private Runnable firstTask;

        private Thread thread;

        public Worker(Runnable task) {

            this.firstTask = task;
            this.thread = threadFactory.newThread(this);
        }
        
        // 省略
    }

3.2核心线程与非核心线程逻辑

3.2.1.编写getTask方法

getTask方法会根据线程池情况动态从任务队列中获取任务

    /**
    * @description 从等待队列中获取任务
    * @return Runnable 待执行的任务,没有获取到会返回null
    * @date 2024/04/02 10:46:37
    */
public Runnable getTask(){
        //我们使用一个变量来记录上次循环获取任务是否超时
        boolean preIsTimeOut = false;
        // 内部使用一个while循环,线程会一直尝试获取任务,直到成功获取到任务或者满足退出条件
        while(true){
            // 获取线程池当前线程数量
            int wc = threadTotalNums.get();
            // 1.是否要进行核心线程回收操作,当allowCoreThreadTimeOut为true,或者当前线程池数大于核心线程数时,我们需要进行回收判断
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 2.根据情况动态调整线程数,以下情况需要直接返回null(返回null就会回收线程):
            // (1)当前线程大于最大线程数(就是超过规定大小了),且任务队列为空且存在工作线程
            // (2)timed为true,上次任务超时了(preIsTimeOut = true),且任务队列为空且存在工作
            if ( (wc > maximumPoolSize || (timed && preIsTimeOut)) && (wc > 1 || workQueue.isEmpty()) ) {
                return null;
            }

            // 3.根据timed这个条件来选择是超时堵塞
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            // 获取任务超时了,将preIsTimeOut设为true,下次可以执行回收
            preIsTimeOut = true;
        }
    }
  •  timed 变量决定了线程从等待队列中拿取任务的方式,如果当前线程数大于最大核心线程数,或者开启了允许核心线程回收(allowCoreThreadTimeOut = true),我们就超时拿取,这样如果拿取任务超时就会返回null,线程就会被回收
3.2.2.调整Worker工作线程的run方法

将原来直接从任务队列中获取任务改为通过getTask方法获取

 @Override
        public void run() {
            log.info("工作线程====》工作线程{}开始运行", Thread.currentThread());

            // 1。首先消费当前任务,消费完再去任务队列取,while循环实现线程复用
            while(firstTask != null || (firstTask = getTask()) != null){
                try {
                    firstTask.run();
                }catch (Exception e){
                    throw new RuntimeException(e);
                }finally {
                    // 执行完后清除任务
                    firstTask = null;
                }
            }

            // 2.跳出循环,说明取任务超过了最大等待时间,线程歇菜休息吧
            synchronized (workerSet){
                workerSet.remove(this);
                threadTotalNums.decrementAndGet(); //计数扣减
            }
            log.info("工作线程====》线程{}已被回收,当前线程数:{}", Thread.currentThread(), threadTotalNums.get());

        }
 3.2.3.编写addWorker方法
/**
    * @description 添加工作线程
    * @param firstTask 线程第一次执行的任务
    * @param isCore 是否为核心线程
    * @return Boolean 线程是否添加成功
    * @date 2024/04/02 10:42:43
    */
    public Boolean addWorker(Runnable firstTask, Boolean isCore){
        if(firstTask == null) {
            throw new NullPointerException();
        }
        // TODO 1.我们在添加线程时,首先可以进行一些与线程池生命周期相关的校验,比如在一些状态下,不允许再添加任务

        // 2.根据当前线程池和isCore条件判断是否需要创建
        int wc = threadTotalNums.get();
        if (wc >= (isCore ? corePoolSize : maximumPoolSize))
            return false;
        // 3.创建线程,并添加到线程集合中
        Worker worker = new Worker(firstTask);
        Thread t = worker.thread;
        if(t != null){
            synchronized (workerSet){
                workerSet.add(worker);
                threadTotalNums.getAndIncrement();
            }
            t.start();
            return true;
        }
        return false;
    }
3.2.4.完善excute方法

流程如下:

1.如果当前线程数小于核心线程,直接创建核心线程去运行

2.线程数大于核心线程,我们就将任务加入等待队列

3.队列满了,尝试创建非核心线程,如果失败就触发拒绝策略

public void execute(Runnable task){
        if(task == null){
            throw new NullPointerException("传递的Runnable任务为Null");
        }
        // 1.如果当前线程数小于核心线程,直接创建线程去运行
        if(threadTotalNums.get() < corePoolSize){
            if(addWorker(task, true)) return;
        }

        // 2.线程数大于核心线程,我们就将任务加入等待队列
        if(workQueue.offer(task)){
            return;
        }
        // 3.队列满了,尝试创建非核心线程,如果失败就触发拒绝策略
        else if(!addWorker(task, false)){
            reject(task);
        }

    }

4.测试

编写如下测试代码,我们会创建一个核心线程数为2,最大线程数为5,等待队列长度为5的线程池,并添加15个任务到线程池中,按照预期会有五个任务触发拒绝策略,在任务执行完成后只保留两个核心线程

@Slf4j
public class MainTest {
    public static void main(String[] args) {

        ThreadPool threadPool = new ThreadPool(new WorkQueue<>(5), 2, 5,5L, TimeUnit.SECONDS,
                (queue, task) -> {
                    log.info("拒绝策略====》拒绝策略触发,直接丢弃当前任务");
                }, new DefaultThreadFactory());
        threadPool.setAllowCoreThreadTimeOut(false); //不回收核心线程
        for (int i = 0; i < 15; i++) {
            threadPool.execute(() -> {
                System.out.println("执行任务------->当前执行线程为" + Thread.currentThread().toString());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        // ExecutorService executorService = Executors.newFixedThreadPool(2);

    }
}

运行结果如下:

可以看到运行结果符合预期,任务也被正常消费 

我们设置AllowCoreThreadTimeOut的属性为true,再次进行测试,

threadPool.setAllowCoreThreadTimeOut(true); //回收核心线程

结果输出:

可以看到,核心线程也会被回收,符合预期。

5.总结

在本章节中我们通过学习JDK线程池源码中的部分代码,实现了一个简易版带有核心线程与非核心线程处理逻辑的线程池,我们可以通过指定AllowCoreThreadTimeOut属性来设置是否允许核心线程的回收,默认只会回收非核心线程。线程池的官方源码还是写得相当巧妙的,阅读难度也不高,推荐小伙伴学习~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/512406.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SWM341系列应用(上位机应用)

SWM341系列之上位机应用 1、分级图像和PNG、JPG的应用 现象&#xff1a;客户使用SWM34SVET6HMI_0.4.1版本上位机进行UI界面布局&#xff0c;反馈在模拟运行时&#xff08;PC端&#xff09;流畅&#xff0c;在Demo平台&#xff08;设备端&#xff09;运行卡顿。 分析及解决&…

基于SpringBoot+微信小程序的图书借阅管理系统(包运行调试)

介绍 系统介绍 是一套图书借阅管理系统&#xff0c;包括用户小程序以及后台管理系统。 前台商城系统包含用户注册登录、首页门户、图书查询、在线借阅、个人中心、我的信息、我的借阅、押金充值。 后台管理系统包含统计分析、用户管理、分类管理、图书管理、借阅管理、管理员…

Unknown redis exception; event execu tor terminated;解决

最近查看服务器日记是不是报发现有台服务器报错&#xff1a; rocessing failed; nested exception is org.springframework.data.redis.RedisSystemException: Unknown redis exception; nested exception is java.util.concurrent.RejectedExecutionException: event execu …

升降梯人数识别摄像机

升降梯人数识别摄像机是一种智能监测设备&#xff0c;主要用于实时识别和计算升降梯内乘客的数量。通过搭载先进的图像识别技术和人工智能算法&#xff0c;该设备可以准确监测乘客进出数量&#xff0c;提供重要数据支持和信息反馈&#xff0c;帮助管理人员有效管理升降梯运行&a…

经久耐用耐强腐蚀PFA材质气体洗涤瓶全氟烷氧基树脂尾气吸收瓶

PFA洗气瓶是一种常用于净化和干燥各种气体的实验室器皿&#xff0c;以去除其中的水分、油脂、颗粒物等杂质&#xff0c;从而使需要用到的气体满足实验要求。 PFA气体吸收瓶 PFA洗气瓶的工作原理&#xff1a; 主要是通过液体吸收、溶解或发生化学反应来去除气体中的杂质。在洗气…

【软件工程】详细设计(二)

这里是详细设计文档的第二部分。前一部分点这里 4. 学生端模块详细设计 学生端模块主要由几个组件构成&#xff1a;学生登录界面&#xff0c;成绩查询界面等界面。因为学生端的功能相对来说比较单一&#xff0c;因此这里只给出两个最重要的功能。 图4.1 学生端模块流程图 4.…

牛客网BC-71 三角形判断(操作符注意事项)

例题如下 这道题的编程很容易实现&#xff0c;但恰恰因为太简单导致容易忘记注意事项 代码如下 #include<stdio.h> int main() {int a 0,b 0,c 0;while(scanf("%d%d%d",&a,&b,&c)!EOF){if(ab>c&&ac>b&&bc>a){ //三…

零基础如何自学人工智能?推荐优秀的学习路径及方法

人工智能&#xff08;AI&#xff09;是一个广泛且复杂的领域&#xff0c;自学AI可能是一项艰巨的任务&#xff0c;但只要有兴趣和决心&#xff0c;这绝对是可能的。以下是一个零基础自学人工智能的学习路径&#xff0c;旨在帮助那些只有兴趣&#xff0c;但缺乏背景知识的人。 *…

C语言第三十九弹---预处理(上)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 预处理 1、预定义符号 2、#define定义常量 3、#define定义宏 4、带有副作用的宏参数 5、宏替换的规则 6、宏和函数的对比 总结 在C语言中&#xff0c;预处…

宝塔面板docker管理器安装后,返回docker菜单页,提示当前未安装docker或docker-compose 未安装,再次安装后,依然提示未安装。

宝塔面板docker管理器安装后&#xff0c;返回docker菜单页&#xff0c;提示当前未安装docker或docker-compose 未安装&#xff0c;再次安装后&#xff0c;依然提示未安装。 OS: debian 11 BT: 7.9.8 解答&#xff1a; 您好&#xff0c;服务器终端执行以下命令截图看一下命令…

企业微信企业主体变更认证介绍

企业微信变更主体有什么作用&#xff1f; 说一个自己亲身经历的事情&#xff0c;当时我在一家教育公司做运营&#xff0c;公司所有客户都是通过企业微信对接的。后来行业整顿&#xff0c;公司不得不注销&#xff0c;换了营业执照打算做技能培训&#xff0c;但发现注销后原来的企…

前段之JavaScript——网页的血液!!

目录 一、JavaScript简介 二、JavaScript引入 三、声明变量 四、数据类型 五、运算符 六、函数 七、常用数据操作方法 1、字符串 2、数组 3、对象 八、BOM 九、DOM 一、JavaScript简介 JavaScript是一种用于为网页添加交互功能的脚本语言。它是一种轻量级的、解释…

氟化氢冷凝装置配套PFA烧瓶PFA冷凝管PFA接收瓶等

一、装置清单及说明&#xff1a; 1. PFA烧瓶 材质为PFA&#xff0c;半透明&#xff0c;耐受强酸强碱&#xff0c;常用500ml 1000ml&#xff0c;其他规格等可自行选择&#xff0c;若需要3颈及以上建议选择500ml以上规格&#xff0c;可根据要求选择有液位计&#xff0c;可看出瓶…

3D人脸扫描技术与数字人深度定制服务:赋能打造超写实3D数字分身

在数字时代&#xff0c;3D数字分身有着广泛的应用场景&#xff0c;在动画视频、广告宣传片、大型活动主持人、AI交互数字人等领域&#xff0c;发挥着重要的商业价值。其中&#xff0c;3D人脸扫描技术&#xff0c;推动了超写实3D数字分身的诞生。 公司案例 2023海心沙元宇宙音乐…

酒吧酒馆微信小程序设计基于Java,SpringBoot,Vue和UniApp

摘要 该设计目标是创建一个集成了Java, SpringBoot, Vue和UniApp技术的酒吧微信小程序&#xff0c;为用户提供一个功能全面、操作便捷的服务体验。通过利用SpringBoot的高效微服务架构&#xff0c;后端能够快速处理用户请求&#xff0c;实现酒品浏览、订单管理等核心功能&…

大话设计模式之外观模式

外观模式&#xff08;Facade Pattern&#xff09;是一种软件设计模式&#xff0c;旨在提供一个简单的接口&#xff0c;隐藏系统复杂性&#xff0c;使得客户端能够更容易地使用系统。这种模式属于结构型模式&#xff0c;它通过为多个子系统提供一个统一的接口&#xff0c;简化了…

华为OD机试 - 最大社交距离(Java 2024 C卷 100分)

华为OD机试 2024C卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测试…

央视315推荐的护眼灯有哪些?护眼灯十大品牌推荐

台灯作为家居类不可或缺的一种照明灯具&#xff0c;在我们的日常生活中发挥着重要作用&#xff0c;尤其是对于经常需要在夜晚长时间用眼学习的孩子而言&#xff0c;能够提供充足、明亮的照明&#xff0c;对学习帮助是非常大的。然而台灯的选择也是有讲究的&#xff0c;市面上很…

MongoDB 6.1 及以上版本使用配置文件的方式启动报错 Unrecognized option: storage.journal.enabled

如果你使用的 MongoDB 的版本大于等于 6.1&#xff0c;并且在 MongoDB 的配置文件中编写了如下内容 storage:journal:# 启用或禁用持久性日志以确保数据文件保持有效和可恢复# true 启用&#xff1b;false 不启用# 64 位系统默认启用&#xff0c;启用后 MongoDB 可以在宕机后根…

黄金票据的复现

实验环境以及工具 服务器&#xff1a;Windows server 2003 用户&#xff1a;Windows 7旗舰版 工具&#xff1a;mimikatz 搭建服务器环境 参考&#xff1a;内网横向——域渗透之黄金票据复现-CSDN博客 创建用户 使用gpupdate刷新策略&#xff1b; 搭建win7环境 设置ip ‘…