【并发编程】手写线程池阻塞队列

       📝个人主页:五敷有你      
 🔥系列专栏:并发编程
⛺️稳重求进,晒太阳

示意图 

步骤1:自定义任务队列

变量定义

  1. 用Deque双端队列来承接任务
  2. 用ReentrantLock 来做锁
  3. 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
  4. 最后定义容量 capcity

方法:

  1. 添加任务
    1. 注意点:
      1. 任务容量慢了 用await
      2. 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
      3. 考虑万一死等的情况,加入时间的判断
  2. 取出任务
    1. 注意点:
      1. 任务空了 用await
      2. 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
      3. 考虑超时的情况,加入时间的判断
public class MyBlockQueue<T> {
    //1.任务队列
    private Deque<T> deque=new ArrayDeque();
    //2.锁
    private ReentrantLock lock=new ReentrantLock();

    //3.生产者条件变量
    private Condition fullWaitSet=lock.newCondition();
    //4.消费者条件变量
    private Condition emptyWaitSet=lock.newCondition();

    //5.容量
    private int capcity;

    public MyBlockQueue(int capcity) {
        this.capcity = capcity;
    }

    //带超时的阻塞获取
    public T poll(long timeOut, TimeUnit unit){

        lock.lock();
        try {
            //将timeOUt转换成统一转换为ns
            long nanos = unit.toNanos(timeOut);

            while (deque.isEmpty()) {
                try {
                    //返回值=等待时间-经过的时间
                    if(nanos<=0){
                        return null;
                    }
                   nanos= emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }



    }

    //6. 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
         T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }




    }





    //阻塞添加
    public void put(T element){
        lock.lock();
        try {
            while (deque.size()==capcity){
                try {
                    fullWaitSet.await();
                }catch (Exception e){

                }
             }
            deque.addLast(element);
            emptyWaitSet.signalAll();

        } finally {
            lock.unlock();
        }
    }
    public int size(){
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }
    }

步骤2:自定义线程池

  1. 定义变量:
    1. 任务队列 taskQueue
    2. 队列的容量
    3. 线程的集合
    4. 核心线程数
    5. 获取任务的超时时间
    6. 时间单位
  2. 方法
    1. 构造方法 初始化一些核心的参数
    2. 执行方法 execute(task) 里面处理任务
      1. 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
      2. 当任务数量>核心数量时,就加入到阻塞队列中
  3. 自定义的类worker
    1. 继承Thread 重写Run方法
      1. 执行传递的任务,每次任务执行完毕,不回收,
      2. 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
package com.aqiuo.juc;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    //任务队列
    private MyBlockQueue<Runnable> taskQueue;
    //队列容量
    int queueCapcity;
    //线程集合
    private HashSet<Worker> workers=new HashSet();

    //线程池的核心线程
    private int coreSize;

    //获取任务的超时时间
    private long timeOut;

    //时间单位
    private TimeUnit timeUnit;

    public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {
        this.coreSize = coreSize;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        taskQueue=new MyBlockQueue<>(queueCapcity);
    }

    public void exectue(Runnable task){
            //当任务数没有超过coreSize时,直接交给work对象执行
            //如果任务超过coreSize时,加入任务队列
      synchronized (workers){
          if(workers.size()<coreSize){
              Worker worker=new Worker(task);
              System.out.println("新增worker");
              workers.add(worker);
              worker.start();
              //任务数超过了核心数
          }else{
              System.out.println(task+"加入任务队列");
              taskQueue.put(task);
          }
      }


    }

    class Worker extends Thread{

        private Runnable task;
        public Worker(Runnable task){
            this.task=task;
        }

        @Override
        public void run() {
            //执行任务
            //1)当task不为空,执行任务
            //2)当task执行完毕,再接着从任务队列中获取任务

            while (task!=null||(task=taskQueue.take())!=null){
                try {
                    System.out.println("正在执行worker"+this);
                    sleep(10000);
                    task.run();
                } catch (Exception e) {

                }finally {
                    task=null;
                }

            }

            //执行完任务后销毁线程
            synchronized (workers){
                workers.remove(this);
            }

        }
    }

    


}

测试

开启15个线程测试

public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
        for (int i=0;i<15;i++){
            int j=i;
            threadPool.exectue(()->{
                System.out.println(j);
            });
        }
    }

        执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,

        同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。

这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet

改进

获取任务的超时结束

获取任务take的增强 超时

  //带超时的阻塞获取
    public T poll(long timeOut, TimeUnit unit){

        lock.lock();
        try {
            //将timeOUt转换成统一转换为ns
            long nanos = unit.toNanos(timeOut);

            while (deque.isEmpty()) {
                try {
                    //返回值=等待时间-经过的时间
                    if(nanos<=0){
                        return null;
                    }
                   nanos= emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }



    }

修改worker的run函数

      public void run() {
            //执行任务
            //1)当task不为空,执行任务
            //2)当task执行完毕,再接着从任务队列中获取任务
//            while (task!=null||(task=taskQueue.take())!=null){
            //修改如下
            while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){
                try {
                    System.out.println("正在执行worker"+this);
                    sleep(1000);
                    task.run();
                } catch (Exception e) {

                }finally {
                    task=null;
                }

            }

正常结束了

放入任务的超时结束offer()

那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入

//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){
    lock.lock();
    long nanos = unit.toNanos(timeOut);
    try {

        while (deque.size()==capcity){
            try {
                long l = fullWaitSet.awaitNanos(nanos);
                if(l<=0){
                    return false;
                }

            }catch (Exception e){

            }
        }
        deque.addLast(element);
        emptyWaitSet.signalAll();
        return true;
    } finally {
        lock.unlock();
    }
}

拒绝策略

函数式接口

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
    void reject(MyBlockQueue<T> queue, T task);
}

代码改进

如下部分代码是存入任务的部分

public void exectue(Runnable task){
            //当任务数没有超过coreSize时,直接交给work对象执行
            //如果任务超过coreSize时,加入任务队列
      synchronized (workers){
          if(workers.size()<coreSize){
              Worker worker=new Worker(task);
              System.out.println("新增worker");
              workers.add(worker);
              worker.start();
              //任务数超过了核心数
          }else{
              //存入任务
              //taskQueue.put(task);
              //当队列满了之后 执行的策略
              //1) 死等
              //2)带有超时的等待
              //3)当调用者放弃任务执行
              //4)让调用者抛出异常
              //5)让调用者自己执行任务...
              //为了增加灵活性,这里不写死,交给调用者
              //重新写了一个放入任务的方法
              taskQueue.tryPut(rejectPolicy,task);
          }
      }

    }

阻塞队列里的tryPut

public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {

    lock.lock();

    try {
        //如果队列容量满了,就开始执行拒绝策略
        if(capcity>= deque.size()){
            rejectPolicy.reject(this,task);

        }else{
            //不满就正常加入到队列中
            System.out.println(task+"正常加入到队列");
            deque.addLast(task);
        }
    }finally {
        lock.unlock();
    }

}

//1) 死等

//2)带有超时的等待

//3)当调用者放弃任务执行

//4)让调用者抛出异常

//5)让调用者自己执行任务...

谁调用方法,谁写拒绝策略

为了传入策略,就再构造函数里面加入一个方法的参数传入

//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;

public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
    this.coreSize = coreSize;
    this.timeOut = timeOut;
    this.timeUnit = timeUnit;
    taskQueue=new MyBlockQueue<>(queueCapcity);
    this.rejectPolicy=rejectPolicy;
}

主函数编写拒绝的策略,就lamda表达式会把...

public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{
            //死等
//            queue.put(task);
            //超时添加
//            System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));
            //放弃执行
//            System.out.print("我放弃");
            //调用者抛出异常
//            throw new RuntimeException("任务执行失败");
            //调用者执行
//            task.run();


        });
        for (int i=0;i<5;i++){
            int j=i;
            threadPool.exectue(()->{
                System.out.println(j);
            });
        }
    }

五种拒绝策略的结果(我不会用slog4j)

1.死等的结果

2.超时拒绝的结果(每个false都是时间到了,每加进去)

3.不作为,调用者放弃任务

4.抛出异常,停止

5.调用者线程执行了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/374492.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PDF文件格式(一):交叉引用流

在PDF-1.5版本之前&#xff0c;对象的交叉引用信息是存储在交叉引用表(cross-reference table)中的。在PDF-1.5版本之后&#xff0c;引进了交叉引用流(cross-reference stream)对象&#xff0c;可以用它来存储对象的交叉引用信息&#xff0c;就像交叉引用表的功能一样。 采用交…

鸿蒙开发系列教程(十一)--布局应用:层叠布局

层叠布局 &#xff08;Stack&#xff09; 层叠布局&#xff08;StackLayout&#xff09;用于在屏幕上预留一块区域来显示组件中的元素&#xff0c;提供元素可以重叠的布局。层叠布局通过stack容器组件实现位置的固定定位与层叠&#xff0c;容器中的子元素&#xff08;子组件&…

Python进阶----在线翻译器(Python3的百度翻译爬虫)

目录 一、此处需要安装第三方库requests: 二、抓包分析及编写Python代码 1、打开百度翻译的官网进行抓包分析。 2、编写请求模块 3、输出我们想要的消息 三、所有代码如下&#xff1a; 一、此处需要安装第三方库requests: 在Pycharm平台终端或者命令提示符窗口中输入以下代…

JVM-JVM内存结构(二)

堆 堆(Heap) 通过new关键字&#xff0c;创建的对象都会使用堆内存特点&#xff1a; 他是线程共享的&#xff0c;堆中的对象需要考虑线程安全的问题有垃圾回收机制 堆内存溢出(OutOfMemoryError) 代码演示 List<String> list new ArrayList<>(); try{String …

【已解决】pt文件转onnx后再转rknn时得到推理图片出现大量锚框变花屏

前言 环境介绍&#xff1a; 1.编译环境 Ubuntu 18.04.5 LTS 2.RKNN版本 py3.8-rknn2-1.4.0 3.单板 迅为itop-3568开发板 一、现象 采用yolov5训练并将pt转换为onnx&#xff0c;再将onnx采用py3.8-rknn2-1.4.0推理转换为rknn&#xff0c;rknn模型能正常转换&#xff0c;…

阿里地址标准化相关能力

阿里云地址标准化服务入口 1地址标准化概念 阿地址标准化&#xff08;Address Purification&#xff09;是一站式闭环地址数据处理和服务平台产品&#xff0c;依托阿里云海量的地址语料库&#xff0c;针对各行业业务系统所登记的地址数据&#xff0c;进行纠错、补全、归一、结…

【QT】opcuaServer 的构建

【QT】opcuaServer 的构建 前言opcuaServer实现测试 前言 在博文【opcua】从编译文件到客户端的收发、断连、节点查询等实现 中&#xff0c;我们已经介绍了如何在QT 中创建opucaClient 。在本期的博文中&#xff0c;我们基于之前的部署环境&#xff0c;介绍一下如何构建opcuaS…

Windows 离线安装MySQL8教程

本文描述了在Windows 11 上离线安装MySQL8的方法本方法同样使用与Win10、Windows Server等本文仅供参考&#xff0c;请理解后安装和优化 一、下载文件 官方下载路径 https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-8.0.36-winx64.zip百度网盘缓存 微信小程序搜索“数字…

arcgis各种版本下载

arcgic 下载&#xff01;&#xff01;&#xff01; ArcGIS是一款地理信息系统软件&#xff0c;由美国Esri公司开发。它提供了一系列完整的GIS功能&#xff0c;包括地图制作、空间数据管理、空间分析、空间信息整合、发布与共享等。ArcGIS是一个可扩展的GIS平台&#xff0c;提供…

复杂人像背景分割解决方案

随着人工智能和图像处理技术的不断发展&#xff0c;人像处理已成为企业宣传、产品展示、线上教育等领域不可或缺的一环。然而&#xff0c;面对复杂多变的人像背景&#xff0c;如何实现精准、高效的分割&#xff0c;一直是困扰企业的技术难题。为此&#xff0c;美摄科技凭借其领…

【Java程序设计】【C00240】基于Springboot的班级综合测评管理系统(有论文)

基于Springboot的班级综合测评管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的班级综合测评管理系统 本系统分为学生功能模块、管理员功能模块以及教师功能模块。 管理员功能模块&#xff1a;管理员功能…

LeetCode、17. 电话号码的字母组合【中等,dfs回溯】

文章目录 前言LeetCode、17. 电话号码的字母组合【中等&#xff0c;dfs回溯】题目与类型思路递归回溯优化&#xff1a;StringBuilder来回溯补充代码&#xff1a;2024.1.31&#xff08;简化&#xff09; 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博…

艺术创作和生活的关系

艺术出现在生产劳作中并体现出人们生活、工作、学习中&#xff0c;使人们在不受限制随意发挥缔造发明能力的体现&#xff0c;独立的精神活动领域在它逐渐演变进步的历程中越来越明显&#xff0c;也是一个人精神思想生活中很重要的一部分。艺术随着社会发展而发展。一件完美的艺…

银行数据仓库体系实践(17)--数据应用之营销分析

营销是每个银行业务部门重要的工作任务&#xff0c;银行产品市场竞争激烈&#xff0c;没有好的营销体系是不可能有立足之地&#xff0c;特别是随着互联网金融发展,金融脱媒”已越来越普遍&#xff0c;数字化营销方兴未艾&#xff0c;银行的营销体系近些年也不断发展&#xff0c…

Pyhton专项进阶——http协议、cookie、session和认证-3

关于cookie的报文首部相关属性熟悉后&#xff0c;下面就是实际应用。 使用cookie实现用户登录验证&#xff08;初步&#xff09;&#xff1a; 思路&#xff08;一&#xff09;&#xff1a;显示登录页面&#xff0c;输入用户和密码&#xff0c;后端验证&#xff0c;如果验证通…

Docker下安装GitLab

极狐GitLab Docker 镜像 | 极狐GitLab 安装所需最小配置 内存至少4G 系统内核至少3.10以上 uname -r 命令可以查看系统内核版本 安装Docker 1.更新 yum源 yum update 2.安装依赖(如果在操作第三步的时候提示yum-config-manager 未找到命令 就安装下面依赖) yum instal…

[UI5 常用控件] 07.SplitApp,SplitContainer

文章目录 前言1. SplitApp1.1 组件结构1.2 Demo1.3 mode属性 2. SplitContainer 前言 本章节记录常用控件SplitApp&#xff0c;SplitContainer。主要功能是在左侧显示Master页面&#xff0c;右侧显示Detail页面。 Master页面和Detail页面可以由多个Page组成&#xff0c;并支持…

【51单片机】LED的三个基本项目(LED点亮&LED闪烁&LED流水灯)(3)

前言 大家好吖&#xff0c;欢迎来到 YY 滴单片机系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过单片机的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的…

ShardingSphere实现openGauss分布式架构

本文档采用openGauss结合ShardingSphere中间件的架构&#xff0c;实现openGauss数据库分布式OLAP场景的环境部署。 术语说明&#xff1a; 开源数据库引擎&#xff1a;openGauss shardingsphere Proxy&#xff1a;定位为透明化的数据库代理端&#xff0c;提供封装了数据库二进…

必看!第六版CCF(中国计算机学会)推荐B类国际学术会议!

中国计算机学会 中国计算机学会&#xff08;CCF&#xff09;是全国性、学术性、非营利的学术团体&#xff0c;由从事计算机及相关科学技术领域的个人和单位自愿组成。作为独立社团法人&#xff0c;CCF是中国科学技术协会的成员之一&#xff0c;是全国一级学会&#xff01; CCF的…