JUC:手写实现一个简易的线程池(Java)

目录

​编辑

先上完整代码:

解析:

任务队列:

线程池类:

拒绝策略:


先上完整代码:

public class MyThreadPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {
            // 1.死等
            queue.put(task);
            // 2.带超时时间等待加入等待队列
            // queue.offer(task, 500, TimeUnit.MICROSECONDS);
            // 3.放弃任务
            // 队列满了,没做人任何事情
            // 4.抛出异常
            // throw new RuntimeException("任务执行失败" + task);
            // 5.让调用者自己执行
            // task.run();
        });
        for (int i = 0; i < 15; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue queue, T task) ;
}
class ThreadPool {
    // 任务队列
    private BlockQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet();

    // 线程数
    private int coreSize;

    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;
    // 构造方法
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueSize);
        this.rejectPolicy = rejectPolicy;
    }

    public void execute(Runnable task) {
        // 当任务数没有超过核心数时,直接交给woker对象执行
        // 如果超过,放入任务队列中存起来
        synchronized (workers) { // workers不安全,把他锁起来
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("新增worker");
                workers.add(worker); // 加入线程集合
                worker.start();
            } else {
                // taskQueue.put(task); // 任务添加进入
                // 1.死等
                // 2.带超时时间等待
                // 3.放弃任务
                // 4.抛出异常
                // 5.让调用者自己执行
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 当task任务不为空,执行
            // 当任务为空,去任务队列中去取
            //  while (task != null || (task = taskQueue.take()) != null) 一直等待获取
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在执行" + task);
                    task.run();
                } catch (Exception e) {

                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("worker被移除" + this);
                workers.remove(this); // 移除当前集合对象
            }
        }
    }
}

// 阻塞队列
class BlockQueue<T> {
    // 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 锁
    private ReentrantLock lock = new ReentrantLock();

    // 满了等待,生产者
    private Condition fullWaitSet = lock.newCondition();

    // 空的等待,消费者
    private Condition emptyWaitSet = lock.newCondition();

    // 容量
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    // 阻塞队列中获取任务
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await(); // 进入等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞队列中添加任务
    public void put(T t) {
         lock.lock();
         try {
             while (queue.size() == capacity) { // 如果满了,进入等待
                 try {
                     System.out.println("等待加入任务队列" +  t);
                     fullWaitSet.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             System.out.println("加入任务队列" + t);
             queue.addLast(t);
             emptyWaitSet.signal(); // 唤醒
         }finally {
             lock.unlock();
         }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock(); // 就算return也会执行
        }
    }

    // 带超时时间的获取,无需永久的等待了
    public T poll (long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout); // 时间转换为ns
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) return null; // 超时了,直接返回吧
                    nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的添加, return 添加成功 or 失败
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) { // 如果满了,进入等待
                try {
                    System.out.println("等待加入任务队列" +  task);
                    if (nanos <= 0) return false;
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列" + task);
            queue.addLast(task);
            emptyWaitSet.signal(); // 唤醒
            return true;
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否已满
            if (queue.size() == capacity) { // 有空闲
                rejectPolicy.reject(this, task); // 拒绝策略
            } else { // 有空闲
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

解析:

任务队列:

// 阻塞队列
class BlockQueue<T> {
    // 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 锁
    private ReentrantLock lock = new ReentrantLock();

    // 满了等待,生产者
    private Condition fullWaitSet = lock.newCondition();

    // 空的等待,消费者
    private Condition emptyWaitSet = lock.newCondition();

    // 容量
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    // 阻塞队列中获取任务
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await(); // 进入等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞队列中添加任务
    public void put(T t) {
         lock.lock();
         try {
             while (queue.size() == capacity) { // 如果满了,进入等待
                 try {
                     System.out.println("等待加入任务队列" +  t);
                     fullWaitSet.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             System.out.println("加入任务队列" + t);
             queue.addLast(t);
             emptyWaitSet.signal(); // 唤醒
         }finally {
             lock.unlock();
         }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock(); // 就算return也会执行
        }
    }

    // 带超时时间的获取,无需永久的等待了
    public T poll (long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout); // 时间转换为ns
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) return null; // 超时了,直接返回吧
                    nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的添加, return 添加成功 or 失败
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) { // 如果满了,进入等待
                try {
                    System.out.println("等待加入任务队列" +  task);
                    if (nanos <= 0) return false;
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列" + task);
            queue.addLast(task);
            emptyWaitSet.signal(); // 唤醒
            return true;
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否已满
            if (queue.size() == capacity) { // 有空闲
                rejectPolicy.reject(this, task); // 拒绝策略
            } else { // 有空闲
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}
  1.  ArrayDeque 作为底层数据结构存储队列元素。
  2.  ReentrantLock 实现了线程安全。
  3. Condition 来实现阻塞等待机制,当队列为空时,消费者线程等待;当队列满时,生产者线程等待。
  4. 常规的入队 put()、出队 take() 操作。
  5. 带有超时的入队 offer() 和出队 poll() 操作。
  6. tryPut() 方法,该方法接受一个 RejectPolicy 接口,用于指定当队列已满时的拒绝策略

方法:

  • take(): 当队列为空时,消费者线程调用该方法将进入等待状态,直到队列中有元素可取。
  • put(T t): 当队列已满时,生产者线程调用该方法将进入等待状态,直到队列有空位可添加元素。
  • poll(long timeout, TimeUnit unit): 带有超时的出队操作,当队列为空时,会等待一段时间,如果在指定时间内仍未有元素可取,则返回 null。
  • offer(T task, long timeout, TimeUnit timeUnit): 带有超时的入队操作,当队列已满时,会等待一段时间,如果在指定时间内仍未有空位可添加元素,则返回 false。
  • tryPut(RejectPolicy<T> rejectPolicy, T task): 尝试添加元素,当队列已满时,根据拒绝策略 RejectPolicy 进行处理。

单看其实就是一个生产者消费者模式而已。

线程池类:

class ThreadPool {
    // 任务队列
    private BlockQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet();

    // 线程数
    private int coreSize;

    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;
    // 构造方法
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueSize);
        this.rejectPolicy = rejectPolicy;
    }

    public void execute(Runnable task) {
        // 当任务数没有超过核心数时,直接交给woker对象执行
        // 如果超过,放入任务队列中存起来
        synchronized (workers) { // workers不安全,把他锁起来
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("新增worker");
                workers.add(worker); // 加入线程集合
                worker.start();
            } else {
                // taskQueue.put(task); // 任务添加进入
                // 1.死等
                // 2.带超时时间等待
                // 3.放弃任务
                // 4.抛出异常
                // 5.让调用者自己执行
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 当task任务不为空,执行
            // 当任务为空,去任务队列中去取
            //  while (task != null || (task = taskQueue.take()) != null) 一直等待获取
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在执行" + task);
                    task.run();
                } catch (Exception e) {

                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("worker被移除" + this);
                workers.remove(this); // 移除当前集合对象
            }
        }
    }
}
  1. BlockQueue<Runnable> 来存储待执行的任务。
  2. HashSet<Worker> 来存储线程集合。
  3. 提供构造方法来初始化线程池的核心线程数、超时时间、任务队列大小和拒绝策略。
  4. execute(Runnable task) 方法来提交任务到线程池中执行。
  5. 内部定义了 Worker 内部类,用于执行任务的线程。

方法:

  • execute(Runnable task): 提交任务到线程池中执行。如果当前线程数小于核心线程数,则直接创建新的 Worker 线程执行任务;如果当前线程数已达到核心线程数,则尝试将任务放入任务队列中,根据拒绝策略 rejectPolicy 进行处理。
  • Worker: 内部类实现了线程执行任务的逻辑。在 run() 方法中,线程会不断从任务队列中取出任务执行,如果队列为空则会等待一段时间,超时时间由 timeouttimeUnit 决定。

拒绝策略:

函数式接口,由使用者提供实现。

// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue queue, T task) ;
}
```java
public class MyThreadPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {
            // 1.死等
            queue.put(task);
            // 2.带超时时间等待加入等待队列
            // queue.offer(task, 500, TimeUnit.MICROSECONDS);
            // 3.放弃任务
            // 队列满了,没做人任何事情
            // 4.抛出异常
            // throw new RuntimeException("任务执行失败" + task);
            // 5.让调用者自己执行
            // task.run();
        });
        for (int i = 0; i < 15; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

几种拒绝策略实现:

  1. 死等(Blocking): 当任务队列已满时,线程池会一直等待直到有空位。这里使用了 queue.put(task),该方法会阻塞当前线程直到队列有空位可用。

  2. 带超时时间等待(Timeout Blocking): 当任务队列已满时,线程池会等待一段时间,如果在指定时间内仍未有空位可用,则放弃当前任务。这里使用了 queue.offer(task, 500, TimeUnit.MICROSECONDS),该方法会在指定时间内等待,如果超时则返回 false。

  3. 放弃任务(Discard): 当任务队列已满时,线程池会放弃当前任务,不做任何处理。

  4. 抛出异常(Throw Exception): 当任务队列已满时,线程池会抛出异常,通知调用者任务执行失败。

  5. 让调用者自己执行(Caller Runs): 当任务队列已满时,不在线程池内执行任务,而是由调用者自己执行任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/524298.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式主键ID生成策略

业务系统对分布式ID的要求 唯一性&#xff1a;在分布式系统中&#xff0c;每个节点都需要生成唯一的标识符来确保数据的唯一性。传统的单点生成ID方式无法满足分布式环境下的需求&#xff0c;而分布式ID能够在整个系统中保证每个节点生成的ID都是唯一的。 顺序性&#xff1a;某…

Centos7源码方式安装Elasticsearch 7.10.2单机版

下载 任选一种方式下载 官网7.10.2版本下载地址&#xff1a; https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.2-linux-x86_64.tar.gz 网盘下载链接 链接&#xff1a;https://pan.baidu.com/s/1EJvUPGVOkosRO2PUaKibaA?pwdbnqi 提取码&#x…

Lumos学习王佩丰Excel第二讲:单元格格式设置

今天学会GIF录制了&#xff0c;分享知识会更简便一些&#xff0c;话不多说&#xff0c;开始吧~ 一、美化表格 1、设置单元格格式的路径 从菜单栏进入&#xff1a; 选中区域&#xff08;单元格&#xff09;- 右键“设置单元格格式”&#xff1a; 2、合并单元格 合并一行 批量…

大话设计模式——10.适配器模式(Adapter Pattern)

简介 将一个类的接口转换成客户希望的另外一个接口。使得原本由于接口不兼容而不能一起工作的那些类可以一起工作。 UML图&#xff1a; 应用场景 系统需要使用现有的类&#xff0c;而这些类的接口不符合系统的需要java中的JDBC、InputStreamReader 示例 翻译员的主要工作也…

解决Android Studio Loading Devices问题

目录 一、解决办法&#xff08;普通&#xff09;&#xff1a; 二、解决办法的优化 三、解决办法的进一步优化 问题&#xff1a;windows 11 电脑&#xff0c;每次开机&#xff0c;打开Android Studio,都会显示Loading Devices&#xff0c;连接不上设备。 原因&#xff1a;adb…

性能优化-如何爽玩多线程来开发

前言 多线程大家肯定都不陌生&#xff0c;理论滚瓜烂熟&#xff0c;八股天花乱坠&#xff0c;但是大家有多少在代码中实践过呢&#xff1f;很多人在实际开发中可能就用用Async&#xff0c;new Thread()。线程池也很少有人会自己去建&#xff0c;默认的随便用用。在工作中大家对…

STL--list如何实现元素的插入和删除

在C标准模板库&#xff08;STL&#xff09;中&#xff0c;std::list 是一个双向链表。由于它的双向链表特性&#xff0c;std::list 支持在任何位置高效地插入和删除元素。 元素插入&#xff1a; ●使用 push_back() 在列表尾部添加元素&#xff1b; ●使用 push_front() 在列表…

hadoop分布式计算组件

什么是计算、分布式计算&#xff1f; 计算&#xff1a;对数据进行处理&#xff0c;使用统计分析等手段得到需要的结果 分布式计算&#xff1a;多台服务器协同工作&#xff0c;共同完成一个计算任务 分布式计算常见的2种工作模式 分散->汇总(MapReduce就是这种模式)中心调…

在NBA我需要翻译--适配器模式

1.1 在NBA我需要翻译&#xff01; "你说姚明去了几年&#xff0c;英语练出来了哦&#xff0c;我看教练在那里布置战术&#xff0c;他旁边也没有翻译的&#xff0c;不住点头&#xff0c;瞧样子听懂没什么问题了。" "要知道&#xff0c;最开始&#xff0c…

(学习日记)2024.04.04:UCOSIII第三十二节:计数信号量实验

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…

激光雷达和相机的联合标定工具箱[cam_lidar_calibration]介绍

激光雷达和相机的联合标定工具箱[cam_lidar_calibration]介绍 写在前面安装过程调试过程标定成功可视化展示 写在前面 激光雷达和相机联合标定工具 论文地址&#xff1a;https://ieeexplore.ieee.org/stamp/stamp.jsp?tp&arnumber9564700 github地址: https://github.com…

设计方案:914-基于64路AD的DBF波束形成硬件

一、硬件概述 &#xff24;&#xff22;&#xff26;技术的实现全部是在数字域实现&#xff0c;然而天线阵列接收的信号经过多次混频后得到的中频信号是模拟信号&#xff0c;实现&#xff24;&#xff22;&#xff26;处理并充分发挥&#xff24;&#xff22;&…

[C++][算法基础]模拟堆(堆)

维护一个集合&#xff0c;初始时集合为空&#xff0c;支持如下几种操作&#xff1a; I x&#xff0c;插入一个数 x&#xff1b;PM&#xff0c;输出当前集合中的最小值&#xff1b;DM&#xff0c;删除当前集合中的最小值&#xff08;数据保证此时的最小值唯一&#xff09;&…

【LeetCode热题100】153. 寻找旋转排序数组中的最小值(二分)

一.题目要求 已知一个长度为 n 的数组&#xff0c;预先按照升序排列&#xff0c;经由 1 到 n 次 旋转 后&#xff0c;得到输入数组。例如&#xff0c;原数组 nums [0,1,2,4,5,6,7] 在变化后可能得到&#xff1a; 若旋转 4 次&#xff0c;则可以得到 [4,5,6,7,0,1,2]若旋转 7…

Flutter应用发布前的关键iOS设备测试策略

大家好&#xff0c;我是咕噜铁蛋&#xff01;今天我想和大家分享一下关于Flutter应用在发布前&#xff0c;如何进行关键iOS设备测试的策略。随着移动应用的普及&#xff0c;Flutter作为一种跨平台的开发框架&#xff0c;越来越受到开发者的青睐。但是&#xff0c;跨平台也意味着…

10 Python进阶:MongoDB

MongoDb介绍 MongoDB是一个基于分布式架构的文档数据库&#xff0c;它使用JSON样式的数据存储&#xff0c;支持动态查询&#xff0c;完全索引。MongoDB是NoSQL数据库的一种&#xff0c;主要用于处理大型、半结构化或无结构化的数据。以下是MongoDB数据库的一些关键特点和优势&a…

蓝桥杯 历届真题 杨辉三角形【第十二届】【省赛】【C组】

资源限制 内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 思路&#xff1a; 由于我第一写没考虑到大数据的原因&#xff0c;直接判断导致只得了40分&#xff0c;下面是我的代码&#xff1a; #…

4.7 IO day6

1&#xff1a;有一个隧道&#xff0c;全长5公里&#xff0c;有2列火车&#xff0c;全长200米&#xff0c; 火车A时速 100公里每小时 火车B时速 50公里每小时 现在要求模拟火车反复通过隧道的场景(不可能2列火车都在隧道内运行) #include <stdio.h> #include <string.…

1、java语法入门(找工作版)

文章目录 一、Java简介二、Java常量与变量1、标识符2、关键字3、变量4、类的命名规则5、数据类型6、基本数据类型字面值7、变量的定义与初始化8、ASCII码和Unicode编码9、转义字符10、类型转换11、常量 三、Java运算符1、算术运算符2、赋值运算符3、关系运算符4、逻辑运算符5、…

C#/.NET/.NET Core推荐学习书籍(24年4月更新,已分类)

前言 古人云&#xff1a;“书中自有黄金屋&#xff0c;书中自有颜如玉”&#xff0c;说明了书籍的重要性。作为程序员&#xff0c;我们需要不断学习以提升自己的核心竞争力。以下是一些优秀的C#/.NET/.NET Core相关学习书籍&#xff08;包含了C#、.NET、.NET Core、Linq、EF/E…