45.自定义线程池(三)-拒绝策略

 拒绝策略采用函数式接口参数传入,策略模式

@FunctionalInterface
public interface RejectPolicy<T> {

    void reject(BlockingQueue<T> queue, T task);
}
package com.xkj.thread.pool;


import com.aspose.words.Run;
import lombok.extern.slf4j.Slf4j;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool {

    //任务队列
    private BlockingQueue<Runnable> taskQueue;
    //线程集合
    private HashSet<Worker> workers = new HashSet<>();
    //核心线程数
    private int coreSize;
    //获取任务的超时时间
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize,
                      int queueCapcity,
                      long timeout,
                      TimeUnit timeUnit,
                      RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread {
        private Runnable task;
        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1.当task不为空执行任务
            // 2.当task执行完毕,再接着从任务队列获取任务并执行
            while(task != null || (task = taskQueue.take()) != null) {
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                }catch (Exception e) {

                }finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }

    //执行任务
    public void execute(Runnable task) {
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增worker{},{}", worker, task);
                // 当任务数没有超过coreSize时,直接交给worker对象执行
                workers.add(worker);
                worker.start();
            } else {
                // 当任务数超过coreSize时,加入任务队列暂存
                // 1) 死等
//                taskQueue.put(task);
                // 2)超时等待
                // 3)放弃任务执行
                // 4)抛出异常
                // 5)让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }

    }
}

在原来的基础上添加了 带超时时间的阻塞添加方法,offer方法

package com.xkj.thread.pool;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.blockingQueue")
public class BlockingQueue<T> {

    //1.任务队列
    private Deque<T> queue = new ArrayDeque<>();
    //2.锁
    private Lock lock = new ReentrantLock();
    //3.生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    //4.消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    //5.容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    /**
     * 带超时的获取元素
     * @param timeout
     * @param unit
     * @return
     */
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            //将timeout统一转化成纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) { //判断队列是否为空
                try {
                    if(nanos <= 0) {
                        return null;
                    }
                    //阻塞等待,当被唤醒后,队列不会空,不满足while条件,程序继续向下执行
                    //返回的是timeout - 已经等待的时间 = 剩余的时间
                    //防止虚假唤醒
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //获取队列头部的元素返回,获取元素后应该从队列中移除
            T t = queue.removeFirst();
            //唤醒生产者,继续添加元素
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }


    /**
     * 获取元素
     * @return
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) { //判断队列是否为空
                try {
                    //阻塞等待,当被唤醒后,队列不会空,不满足while条件,程序继续向下执行
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //获取队列头部的元素返回,获取元素后应该从队列中移除
            T t = queue.removeFirst();
            //唤醒生产者,继续添加元素
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 添加元素
     * @param element
     */
    public void put(T element) {
        lock.lock();
        try {
           while (queue.size() == capcity){
               try {
                   log.debug("等待加入任务队列{}...", element);
                   fullWaitSet.await();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           log.debug("加入任务队列{}", element);
           queue.addLast(element);
           //唤醒消费者,继续获取任务
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 带超时时间的阻塞添加
     * @param element 任务
     * @param timeout 超时时间
     * @param timeUnit 时间单位
     * @return
     */
    public boolean offer(T element, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while(queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列{}...", element);
                    if(nanos < 0) {
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列{}", element);
            queue.addLast(element);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 获取大小
     * @return
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if(queue.size() == capcity) { //队列已满
                rejectPolicy.reject(this, task);
            }else { //队列有空闲
                log.debug("加入任务队列{}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }

    }
}

场景一:拒绝策略死等 

package com.xkj.thread.pool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j(topic = "c.TestPool")
public class TestPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(
                1,
                1,
                1000,
                TimeUnit.MILLISECONDS,
                (queue, task) -> {
                    // 1.死等
                    queue.put(task);
                });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

场景二:带超时等待

@Slf4j(topic = "c.TestPool")
public class TestPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(
                1,
                1,
                1000,
                TimeUnit.MILLISECONDS,
                (queue, task) -> {
                    // 1.死等
//                    queue.put(task);
                    // 2.带超时时间的等待
                    queue.offer(task, 500, TimeUnit.MILLISECONDS);
                });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

注意:ThreadPool的区别,取任务也要调用超时的poll方法

package com.xkj.thread.pool;


import lombok.extern.slf4j.Slf4j;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool {

    //任务队列
    private BlockingQueue<Runnable> taskQueue;
    //线程集合
    private HashSet<Worker> workers = new HashSet<>();
    //核心线程数
    private int coreSize;
    //获取任务的超时时间
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize,
                      int queueCapcity,
                      long timeout,
                      TimeUnit timeUnit,
                      RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread {
        private Runnable task;
        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1.当task不为空执行任务
            // 2.当task执行完毕,再接着从任务队列获取任务并执行
            while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                }catch (Exception e) {

                }finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }

    //执行任务
    public void execute(Runnable task) {
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增worker{},{}", worker, task);
                // 当任务数没有超过coreSize时,直接交给worker对象执行
                workers.add(worker);
                worker.start();
            } else {
                // 当任务数超过coreSize时,加入任务队列暂存
                // 1) 死等
//                taskQueue.put(task);
                // 2)超时等待
                // 3)放弃任务执行
                // 4)抛出异常
                // 5)让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }

    }
}

流程分析

一共三个任务,一个核心线程,队列的容量为1。

第一个任务占用核心线程

第二个任务进入队列

因为队列已满,第三个任务无法放入队列中,只有等待,等待超时时间为500ms

第一个任务执行完成需要1s

1s后才会执行队列中的任务

但是,第三个任务等待500ms就会超时,就不会等待了,也就是不会添加到队列中了。第三个任务也不会被执行了。

这个时候还没有结束,取任务的也会超时,超时时间为1s,所以取任务等了1s后队列中没有新的任务所以也会超时。超时后,核心线程会被移除。

结果

如果第三个任务等待的超时时间变大,设置为1500ms,那么当第一个线程执行完毕花费1s时间,然后从队列中取出第二个线程执行,此时队列为空,第三个线程添加到队列还未超时,成功添加到队列中,等待执行。所以三个线程都可以得到执行。

@Slf4j(topic = "c.TestPool")
public class TestPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(
                1,
                1,
                1000,
                TimeUnit.MILLISECONDS,
                (queue, task) -> {
                    // 1.死等
//                    queue.put(task);
                    // 2.带超时时间的等待
                    queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

 场景三:放弃任务的执行

队列满了,不做任何的操作,任务就不会加入到队列中,就等于放弃任务的执行。

@Slf4j(topic = "c.TestPool")
public class TestPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(
                1,
                1,
                1000,
                TimeUnit.MILLISECONDS,
                (queue, task) -> {
                    // 1.死等
//                    queue.put(task);
                    // 2.带超时时间的等待
//                    queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                    // 3.让调用者放弃任务执行(不要添加任务到队列就是放弃)
                    log.debug("放弃{}", task);

                });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

 场景四:让调用者抛出异常

package com.xkj.thread.pool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j(topic = "c.TestPool")
public class TestPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(
                1,
                1,
                1000,
                TimeUnit.MILLISECONDS,
                (queue, task) -> {
                    // 1.死等
//                    queue.put(task);
                    // 2.带超时时间的等待
//                    queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                    // 3.让调用者放弃任务执行(不要添加任务到队列就是放弃)
//                    log.debug("放弃{}", task);
                    // 4.让调用者抛出异常
                    throw new RuntimeException("任务执行失败" + task);

                });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

第三个任务抛出异常后,如果后面还有任务也不会再执行了。

场景五:让调用者自己执行任务

package com.xkj.thread.pool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j(topic = "c.TestPool")
public class TestPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(
                1,
                1,
                1000,
                TimeUnit.MILLISECONDS,
                (queue, task) -> {
                    // 1.死等
//                    queue.put(task);
                    // 2.带超时时间的等待
//                    queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                    // 3.让调用者放弃任务执行(不要添加任务到队列就是放弃)
//                    log.debug("放弃{}", task);
                    // 4.让调用者抛出异常
//                    throw new RuntimeException("任务执行失败" + task);
                    // 5.让调用者自己执行任务
                    task.run();
                });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/676286.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

拒绝服务攻击

文章目录 拒绝服务攻击概述拒绝服务攻击简介分布式拒绝服务攻击DDoS与DoS的关系拒绝服务攻击分类 剧毒包型拒绝服务攻击WinNuke攻击泪滴(Teardrop) 攻击Land 攻击Ping of death攻击循环攻击 风暴型拒绝服务攻击风暴型DoS攻击风暴型攻击用的分组直接风暴型DDoSPING风暴攻击(直接…

03_03_初识SpringAOP和应用

一、SpringAOP的初识与原理 1、概述 AOP&#xff1a;面向切面编程OOP&#xff1a;面向对象编程面相切面编程&#xff1a;是基于OOP基础之上的新编程思想&#xff0c;OOP面向的主要是对象是类&#xff0c;而AOP面向的主要对象是切面&#xff0c;它在处理日志、安全管理、事务管…

第二十六章CSS3续~

3.CSS3渐变属性 CSS3渐变(gradients)可以在两个或多个指定的颜色之间显示平稳的过渡。 以前&#xff0c;我们必须使用图像来实现这些效果。但是&#xff0c;通过使用CSS3渐变(gradients)&#xff0c;可以减少下载的事件和宽带的使用。由于渐变(gradient)是由浏览器生成的&…

MyBatis学习(二)--MyBatis获取参数值的两种方式

1、搭建新的module:mybatis_parameter MyBatis获取参数值的两种方式&#xff1a;${}和#{} ${}的本质就是字符串拼接&#xff0c;采用sql拼接&#xff0c;无法防止sql注入 #{}的本质就是占位符赋值 &#xff0c;采用预编译 防止sql注入 不同参数使用案例 2、单个字面量类型…

深度学习-06-手动进行反向传播

深度学习-06-手动进行反向传播 本文是《深度学习入门2-自製框架》 的学习笔记&#xff0c;记录自己学习心得&#xff0c;以及对重点知识的理解。如果内容对你有帮助&#xff0c;请支持正版&#xff0c;去购买正版书籍&#xff0c;支持正版书籍不仅是尊重作者的辛勤劳动&#xf…

O2O : Finetuning Offline World Models in the Real World

CoRL 2023 Oral paper code Intro 算法基于TD-MPC&#xff0c;利用离线数据训练世界模型&#xff0c;然后在线融合基于集成Q的不确定性估计实现Planning。得到的在线数据将联合离线数据共同训练目标策略。 Method TD-MPC TD-MPC由五部分构成: 状态特征提取 z h θ ( s ) …

Amazon Q Developer 实战:从新代码生成到遗留代码优化(下)

简述 本文是使用 Amazon Q Developer 探索如何在 Visual Studio Code 集成编程环境&#xff08;IDE&#xff09;&#xff0c;从新代码生成到遗留代码优化的续集。在上一篇博客《Amazon Q Developer 实战&#xff1a;从新代码生成到遗留代码优化&#xff08;上&#xff09;》中…

java基础篇(1)

JDK是什么?有哪些内容组成?JDK是Java开发工具包 JVM虚拟机: Java程序运行的地方 核心类库: Java已经写好的东西&#xff0c;我们可以直接用开发工具: javac、java、jdb、jhat.. JRE是什么?有哪些内容组成? JRE是Java运行环境 JVM、核心类库、运行工具 JDK&#xff0c;JRE&…

Linux网络编程:传输层协议|UDP|TCP

知识引入&#xff1a; 端口号&#xff1a; 当应用层获得一个传输过来的报文时&#xff0c;这时数据包需要知道&#xff0c;自己应该送往哪一个应用层的服务&#xff0c;这时就引入了“端口号”&#xff0c;通过区分同一台主机不同应用程序的端口号&#xff0c;来保证数据传输…

Java1.8基于BS版 vue+ uniapp+ springboot专业团队自主研发的一套上门家政APP系统成品源码,支持商用(后台端介绍)

Java1.8基于BS版 vue uniapp springboot专业团队自主研发的一套上门家政APP系统成品源码&#xff0c;支持商用&#xff08;后台端介绍&#xff09; 家政服务后台端 家政服务后台端是一个专为家政服务行业设计的管理系统&#xff0c;用于处理业务运营、用户端管理、师傅端调度、…

Spring boot 随笔 1 DatasourceInitializer

0. 为啥感觉升级了 win11 之后&#xff0c;电脑像是刚买回来的&#xff0c;很快 这篇加餐完全是一个意外&#xff1a;时隔两年半&#xff0c;再看 Springboot-quartz-starter 集成实现的时候&#xff0c;不知道为啥我的h2 在应用启动的时候&#xff0c;不能自动创建quartz相关…

FL Studio怎么给钢琴加延音 FL Studio怎么用钢琴做伴奏

在使用钢琴音色进行音乐创作的时候&#xff0c;可以对钢琴进行延音处理&#xff0c;这样处理的音色给人的感觉会更加的饱满丰富&#xff0c;同时&#xff0c;给钢琴加了延音之后&#xff0c;钢琴的声音时值也会相应的变长&#xff0c;听起来更加的柔和。今天就和大家讲一讲&…

STM32使用HAL库UART接收不定长数据-1

使用STM32的HAL库实现UART串口不定长数据的接收 使用STM32的UART接收数据的时候&#xff0c;经常会遇到接收长度不固定的数据&#xff0c;比如一帧数据可能是10个字节&#xff0c;也可能是12个字节。这种数据称为不定长数据。 现有的很多通信协议是不定长的&#xff0c;比如mo…

vue3_组件间通信方式

目录 一、父子通信 1.父传子&#xff08; defineProps&#xff09; 2.父传子&#xff08;useAttrs&#xff09; 3.子传父&#xff08;ref&#xff0c;defineExpose &#xff09; 4.子传父&#xff08;defineEmits&#xff09; 5.子传父&#xff08;v-model&#xff09; …

数据库 mysql 的彻底卸载

MySQL卸载步骤如下&#xff1a; &#xff08;1&#xff09;按 winr 快捷键&#xff0c;在弹出的窗口输入 services.msc&#xff0c;打开服务列表。 &#xff08;2&#xff09;在服务列表中&#xff0c; 找到 mysql 开头的所有服务&#xff0c; 右键停止&#xff0c;终止对应的…

【问题随记】tightvnc 连接后灰屏

问题描述 刚刚入手了官方发的 OrangePi AI Pro&#xff0c;想用 tight vnc 来连接开发板&#xff0c;就不用连接屏幕那么麻烦了。结果连接后&#xff0c;没能显示 OrangePi AI Pro 桌面。 问题解决 看一下现有的桌面环境。 apt list --installed | grep desktop从中可以看到…

游戏找不到d3dcompiler43.dll怎么办,分享5种有效的解决方法

在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是找不到某个文件。其中&#xff0c;找不到d3dcompiler43.dll是一个常见的问题。这个问题通常出现在运行某些游戏或应用程序时&#xff0c;由于缺少了d3dcompiler43.dll文件&#xff0c;导致程…

【PTA】7-3 拯救007(C++)代码实现 易错点反思

题目见下: 输入样例 14 20 25 -15 -25 28 8 49 29 15 -35 -2 5 28 27 -29 -8 -28 -20 -35 -25 -20 -13 29 -30 15 -35 40 12 12 //输入上述数据后输出“Yes” AC代码如下: #include<bits/stdc++.h> using namespace std; #define sz 100 typedef struct node{int …

基于javacv ffmpeg 使用原生ffmpeg命令

基于javacv ffmpeg 使用原生ffmpeg命令 1. ffmpeg2. ffprobe 相关阅读&#xff1a; javacv ffmpeg使用笔记 测试过程中&#xff0c;发现ffmpeg-6.0-1.5.9-linux-x86_64.jar 存在问题&#xff08;ffmpeg原生命令执行失败&#xff09;&#xff0c;降级到ffmpeg-5.1.2-1.5.8-linux…

Mixly 开启WIFI AP UDP收发数据

一、开发环境 软件&#xff1a;Mixly 2.0在线版 硬件&#xff1a;ESP32-C3&#xff08;立创实战派&#xff09; 固件&#xff1a;ESP32C3 Generic(UART) 测试工工具&#xff1a;NetAssist V5.0.1 二、实现功能 ESP32开启WIFI AP&#xff0c;打印接入点IP地址&#xff0c;允许…