Java自定义一个线程池

线程池图解 

线程池与主线程之间通过一个阻塞队列来平衡任务分配,阻塞队列中既可以满足线程等待,又要接收主线程的任务。

线程池实现

使用一个双向链表实现任务队列

 创建任务队列

//阻塞队列
public class BlockingQueue<T> {
    //双线链表
    private Deque<T> queue = new ArrayDeque();
    //锁
    private ReentrantLock lock =new ReentrantLock();
    //生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    //消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    //容器容量大小
    private int capacity;

    //阻塞获取
    public T pull(long timeOut, TimeUnit unit){
        lock.lock();
        //判断链表中是否存在任务待处理
        try {
            //将尝试时间转化为纳秒
            long nanos = unit.toNanos(timeOut);
            while (queue.isEmpty()){
                try {
                    if (nanos<0){
                        return null;
                    }
                    //awaitNanos返回结果是最大等待时间减去睡眠时间的剩余时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    //阻塞添加
    public void put(T element){
        lock.lock();
        try{
            while(queue.size()==capacity){
                //说明满了,暂时无法添加新的任务
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    //获取队列任务数量
    public int size(){
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }
}

创建线程池 

public class ThreadPool {
    //任务队列
    private BlockingQueue<Runnable> blockingQueue;
    //线程集合
    private HashSet<Worker> workers = new HashSet();
    //核心线程数
    private int coreNum;
    //超时时间
    private long timeOut;
    private TimeUnit unit;

    public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity) {
        System.out.println("初始化线程池");
        this.coreNum = coreNum;
        this.timeOut = timeOut;
        this.unit = unit;
        this.blockingQueue = new BlockingQueue<>(queueCapacity);
    }

    //线程执行任务
    public void execute(Runnable task) {
        //当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueue
        synchronized (workers) {
            if (workers.size() < coreNum) {
                Worker worker = new Worker(task);
                System.out.println("新增worker"+worker);
                workers.add(worker);
                worker.start();
            } else {
                System.out.println("从消息队列中获取task");
                blockingQueue.put(task);
            }
        }
    }

    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = blockingQueue.pull(timeOut, unit)) != null) {
                try {
                    System.out.println("Worker执行任务");
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers){
                System.out.println("Worker执行完毕"+this);
                workers.remove(this);
            }
        }
    }
}

测试 

public class Test {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,3000, TimeUnit.MILLISECONDS,5);
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产任务:"+j);
            });
        }
    }
}

初始化线程池

新增workerThread[Thread-0,5,main]

新增workerThread[Thread-1,5,main]

新增workerThread[Thread-2,5,main]

Worker执行任务

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@7ba4f24f

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@3b9a45b3

加入任务队列TheadPool.Test$$Lambda$1/1078694789@7699a589

加入任务队列TheadPool.Test$$Lambda$1/1078694789@58372a00

加入任务队列TheadPool.Test$$Lambda$1/1078694789@4dd8dc3

等待加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736

生产任务:2

生产任务:1

生产任务:0

Worker执行任务

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@378bf509

生产任务:3

生产任务:4

生产任务:5

Worker执行任务

Worker执行任务

Worker执行任务

生产任务:6

生产任务:8

生产任务:7

Worker执行任务

Worker执行完毕Thread[Thread-1,5,main]

Worker执行完毕Thread[Thread-0,5,main]

生产任务:9

Worker执行完毕Thread[Thread-2,5,main]

添加拒绝策略

上面测试中,有一点不友好的是,当任务队列满了之后,再向其中添加任务时,主线程会死等任务添加成功。

对此我们可以选择多种解决方案

  • 死等
  • 添加超时时间
  • 让调用者方式执行
  • 让调用者抛出异常
  • 让调用者自己执行

创建拒绝策略

@FunctionalInterface
public interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue,T task);
}

修改线程池的执行方法 

	//添加属性
	private RejectPolicy rejectPolicy;
	//构造方法
	public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity, RejectPolicy rejectPolicy) {
        System.out.println("初始化线程池");
        this.coreNum = coreNum;
        this.timeOut = timeOut;
        this.unit = unit;
        this.blockingQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

	//线程执行任务
    public void execute(Runnable task) {
        //当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueue
        synchronized (workers) {
            if (workers.size() < coreNum) {
                Worker worker = new Worker(task);
                System.out.println("新增worker" + worker);
                workers.add(worker);
                worker.start();
            } else {
//                System.out.println("从消息队列中获取task");
//                blockingQueue.put(task);
                blockingQueue.tryPut(rejectPolicy,task);
            }
        }
    }

 任务队列添加方法

    public void tryPut(RejectPolicy rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                //如果满了,需要调用拒绝策略
                rejectPolicy.reject(this,task);
            } else {
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }

测试 

public class Test {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,3000,
                TimeUnit.MILLISECONDS,5,
                (queue,task)->{
                    //由调用者决定任务队列满了之后如何处理后续任务
                    queue.put(task);//死等
                    queue.offer(task,1000,TimeUnit.MILLISECONDS);//超时返回
                                    //啥也不干,直接丢弃任务
                    task.run();//调用者自己执行
                    throw new RuntimeException("任务秩序异常");//抛出异常
                });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产任务:"+j);
            });
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/194316.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Mysql数据库多表数据查询问题

1、背景 线上某个业务数据分表存储在10个子表中&#xff0c;现在需要快速按照条件&#xff08;比如时间范围&#xff09;筛选出所有的数据&#xff0c;主要是想做一个可视化的数据查询工具&#xff0c;给产研团队使用。 2、实践 注意&#xff1a;不要在线上真实数据库操作&am…

使用Docker compose方式安装Spug,并结合内网穿透实现远程访问

文章目录 前言1. Docker安装Spug2 . 本地访问测试3. Linux 安装cpolar4. 配置Spug公网访问地址5. 公网远程访问Spug管理界面6. 固定Spug公网地址 前言 Spug 面向中小型企业设计的轻量级无 Agent 的自动化运维平台&#xff0c;整合了主机管理、主机批量执行、主机在线终端、文件…

【Docker】python flask 项目如何打包成 Docker images镜像 上传至阿里云ACR私有(共有)镜像仓库 集成Drone CI

一、Python环境编译 1、处理好venv环境 要生成正常的 requirements.txt 文件&#xff0c;我们就需要先将虚拟环境处理好 创建虚拟环境&#xff08;可选&#xff09;&#xff1a; 在项目目录中&#xff0c;你可以选择使用虚拟环境&#xff0c;这样你的项目依赖将被隔离在一个…

3D点云目标检测:VoxelNex解读(带源码/未完)

VoxelNext 通用vsVoxelNext一、3D稀疏卷积模块1.1、额外的两次下采样1.2、稀疏体素删减 二、高度压缩三、稀疏池化四、head五、waymo数据集训练六、训练自己的数据集bug修改 通用vsVoxelNext 一、3D稀疏卷积模块 1.1、额外的两次下采样 使用通用的3D sparse conv&#xff0c;…

多线激光三维重建

交流联系点击&#xff1a;联系方式

人工智能学习2(python数据清洗)

编译工具&#xff1a;PyCharm 一.数据清洗 转化数据类型、处理重复数据、处理缺失数据 import pandas as pddf pd.read_csv("/data.csv") df.sample(10) # 用于随机获取数据并返回结果 df.head(10) # 查看前十条数据 df.tail(10) # 查看后十条数据 df.shape …

RK3568 android11 实现双路I2C触摸 --gt9xx

一&#xff0c;GT911 触摸屏简介 它的接口类型为 I2C &#xff0c;供电电压和通讯电压均为 3.3V 。这款电容触摸屏内置了上拉电阻&#xff0c;这意味着我们的开发板上与该触摸屏的接口处不需要设置上拉电阻。关于线序&#xff0c;同样是 GT911 &#xff0c;不同批次的器件都有…

从0开始学习JavaScript--JavaScript对象继承深度解析

JavaScript中的对象继承是构建灵活、可维护代码的关键部分。本文将深入讨论JavaScript中不同的继承方式&#xff0c;包括原型链继承、构造函数继承、组合继承等&#xff0c;并通过丰富的示例代码展示它们的应用和差异。通过详细解释&#xff0c;大家可以更全面地了解如何在Java…

vulfocus apache-cve_2021_41773 漏洞复现

vulfocus apache-cve_2021_41773 漏洞复现 名称: vulfocus/apache-cve_2021_41773 描述: Apache HTTP Server 2.4.49、2.4.50版本对路径规范化所做的更改中存在一个路径穿越漏洞&#xff0c;攻击者可利用该漏洞读取到Web目录外的其他文件&#xff0c;如系统配置文件、网站源码…

IDEA中也能用postman了?

Postman是大家最常用的API调试工具&#xff0c;那么有没有一种方法可以不用手动写入接口到Postman&#xff0c;即可进行接口调试操作&#xff1f;今天给大家推荐一款IDEA插件&#xff1a;Apipost Helper&#xff0c;写完代码就可以调试接口并一键生成接口文档&#xff01;而且还…

修改mysql的密码(每一步都图文解释哦)

当你想要连接本机数据库时&#xff0c;是不是有可能突然忘记了自己的数据库密码? 在此文中&#xff0c;我们来详细解决一下如何去修改自己的数据库密码&#xff0c;并使用Navicat来连接测试 1.停止mysql服务 打开终端&#xff0c;键入命令,将mysql服务先停止掉&#xff0c;…

Matlab论文插图绘制模板第128期—函数三维折线图(fplot3)

在之前的文章中&#xff0c;分享了Matlab函数折线图的绘制模板&#xff1a; 进一步&#xff0c;再来分享一下函数三维折线图。 先来看一下成品效果&#xff1a; 特别提示&#xff1a;本期内容『数据代码』已上传资源群中&#xff0c;加群的朋友请自行下载。有需要的朋友可以关…

Linux系统编程:文件系统总结

目录和文件 获取文件属性 获取文件属性有如下的系统调用&#xff0c;下面逐个来分析。 stat:通过文件路径获取属性&#xff0c;面对符号链接文件时获取的是所指向的目标文件的属性 从上图中可以看到stat函数接收一个文件的路径字符串&#xff08;你要获取哪个文件的属性&a…

数据结构和算法-树和二叉树的定义和基本术语和性质

文章目录 树的基本概念和相关术语相关的应用节点间的关系描述节点&#xff0c;树的属性描述有序树vs无序树树vs森林小结 树的相关性质考点1考点2考点3考点4考点5考点6小结 二叉树的相关概念和基本术语重要 &#xff08;五种状态&#xff09;特殊二叉树小结 二叉树的相关性质二叉…

SpringCloudAlibaba之Nacos的持久化和高可用——详细讲解

目录 一、Nacos持久化 1.持久化说明 2.安装mysql数据库5.6.5以上版本(略) 3.修改配置文件 二、nacos高可用 1.集群说明 2.nacos集群架构图 2.集群搭建注意事项 3.集群规划 4.搭建nacos集群 5.安装Nginx 6.配置nginx conf配置文件 7.启动nginx进行测试即可 一、Nacos持久…

【算法】NOIP2003神经网络

题目描述 人工神经网络&#xff08;Artificial Neural Network&#xff09;是一种新兴的具有自我学习能力的计算系统&#xff0c;在模式识别、函数逼近及贷款风险评估等诸多领域有广泛的应用。对神经网络的研究一直是当今的热门方向&#xff0c;兰兰同学在自学了一本神经网络的…

经验分享:JMeter控制RPS

一、前言 ​ RPS (Request Per Second)一般用来衡量服务端的吞吐量&#xff0c;相比于并发模式&#xff0c;更适合用来摸底服务端的性能。我们可以通过使用 JMeter 的常数吞吐量定时器来限制每个线程的RPS。对于RPS&#xff0c;我们可以把他理解为我们的TPS&#xff0c;我们就…

19.Oracle11g中的游标

oracle11g中的游标 一、案例引入二、什么是游标三、隐式游标1、隐式游标的属性2、创建语法3、示例 四、显示游标1、显示游标的属性2、创建语法3、示例 五、REF游标1、REF游标的属性2、创建语法3、示例 六、循环游标1、 循环游标的作用2、用for 与 loop 创建3、示例 一、案例引入…

基于可微分渲染器的相机位置优化【PyTorch3D】

在这个教程中&#xff0c;我们将使用可微渲染学习给定参考图像的相机的 [x, y, z] 位置。 我们将首先使用相机的起始位置初始化渲染器。 然后&#xff0c;我们将使用它来生成图像&#xff0c;使用参考图像计算损失&#xff0c;最后通过整个管道进行反向传播以更新相机的位置。…

【开源】基于Vue+SpringBoot的企业项目合同信息系统

项目编号&#xff1a; S 046 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S046&#xff0c;文末获取源码。} 项目编号&#xff1a;S046&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 合同审批模块2.3 合…