【并发编程】ThreadPoolExecutor类

       📝个人主页:五敷有你      
 🔥系列专栏:并发编程
⛺️稳重求进,晒太阳

ThreadPoolExecutor

1) 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名

高三位

接受新任务

处理阻塞队列任务

说明

RUNNING

111

Y

Y

SHUTDOWN

000

N

Y

不会接受新任务,但会处理阻塞队列剩余任务

STOP

001

N

N

会中断正在执行的任务,并抛弃阻塞队列任务

TIDYING

010

-

-

任务全执行完毕,任务线程为0即进入终结

TERMINATED

011

-

-

终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一(赋值合二为一),这样就可以用一次 cas 原子操作进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

2) 构造方法

public ThreadPoolExecutor(int corePoolSize, 
                            int maximumPoolSize, 
                            long keepAliveTime, 
                            TimeUnit unit, 
                            BlockingQueue workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

工作方式:

流程

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的救急线程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
    • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题Netty 的实现,是创建一个新线程来执行任务
    • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池

3) newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) { 
            return new ThreadPoolExecutor(nThreads, nThreads, 
                                        0L, TimeUnit.MILLISECONDS, 
                                        new LinkedBlockingQueue());
    }

特点

核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间

阻塞队列是无界的,可以放任意数量的任务

评价:

适用于任务量已知,相对耗时的任务。

线程工厂的使用

ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactory() {
    private AtomicInteger t=new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {

        return new Thread(r,"t"+t.getAndIncrement());

    }
});

4) newCachedThreadPool

public static ExecutorService newCachedThreadPool() { 
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                            60L, TimeUnit.SECONDS, 
                                            new SynchronousQueue<Runnable>());
          }

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
    try {
        log.debug("putting {} ", 1);
        integers.put(1);
        log.debug("{} putted...", 1);
        log.debug("putting...{} ", 2);
        integers.put(2);
        log.debug("{} putted...", 2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"t1").start();
sleep(1);
new Thread(() -> {
    try {
        log.debug("taking {}", 1);
        integers.take();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"t2").start();
sleep(1);
new Thread(() -> {
    try {
        log.debug("taking {}", 2);
        integers.take();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"t3").start();

输出

评价

整个线程池表现为线程数会根据任务量不断增加,没有上限,当任务执行完毕,空闲一分钟后释放线程。

适合任务数比较密集,但每个任务执行时间比较短的情况

5)newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(){
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

使用场景

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

6)提交任务

// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果 
<T> Future submit(Callable task);

// 提交 tasks 中所有任务 
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间 
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
 
 // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

使用:

List<Future<Object>> futures = executorService.invokeAll(Arrays.asList(
        () -> {
            return "徐烁大美女";
        },
        () -> {
            return "赵菁大美女";
        }


));
for (Future<Object> future : futures) {
    System.out.println(future.get());
}
/*********************/
Object o = executorService.invokeAny(Arrays.asList(
        () -> {
            return "徐烁大美女";
        },
        () -> {
            return "赵菁大美女";
        }


));
System.out.println(o);

7) 关闭线程池

shutdown

/*线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行:就是主线程调用后,会继续向下执行,不会等待其他线程执行完*/

void shutdown();
public void shutdown() { 
    final ReentrantLock mainLock = this.mainLock; 
    mainLock.lock(); 
    try { 
        checkShutdownAccess(); 
        // 修改线程池状态 
        advanceRunState(SHUTDOWN); 
        // 仅会打断空闲线程 
        interruptIdleWorkers(); 
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor 
        } finally { 
        mainLock.unlock(); 
        } 
        // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) 
        tryTerminate();
   }

shutdownNow

/*线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务*/
List shutdownNow();
public List shutdownNow() {
    List tasks; 
    final ReentrantLock mainLock = this.mainLock; 
    mainLock.lock(); 
    try { 
    checkShutdownAccess(); 
    // 修改线程池状态 
    advanceRunState(STOP); 
    // 打断所有线程 
    interruptWorkers(); 
    // 获取队列中剩余任务 
    tasks = drainQueue(); 
    } finally {
         mainLock.unlock(); 
  } 
      // 尝试终结 
      tryTerminate();
       return tasks;
   }

其它方法

// 不在 RUNNING 状态的线程池,此方法就返回 
trueboolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) 
                        throws InterruptedException;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/387954.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络安全威胁,如何解决缓冲区溢出攻击

目录 一、什么是网络安全 二、什么是缓冲区 三、缓冲区溢出 四、缓冲区溢出攻击的类型 一、什么是网络安全 网络安全&#xff08;Network Security&#xff09;指的是保护计算机网络及其相关设备、系统和数据免受未经授权访问、破坏、篡改、窃取或滥用的威胁和攻击。随着网…

【C语言必刷题】1.打印1~100之间的奇数

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;小杨水平有…

[职场] 应聘销售的简历怎么写 #职场发展#笔记

应聘销售的简历怎么写 应聘销售的简历怎么写1 基本信息 姓名&#xff1a;吴x 性别&#xff1a;女 毕业院校&#xff1a;徐州师范大学计算机科学院 学历&#xff1a;大专 联系电话&#xff1a;电子邮件&#xff1a; 工作经验&#xff1a;4年 求职意向 期望从事职业&#xff1a;销…

前端秘法基础式(HTML)(第二卷)

目录 一.表单标签 1.表单域 2.表单控件 2.1input标签 2.2label/select/textarea标签 2.3无语义标签 三.特殊字符 一.表单标签 用来完成与用户的交互,例如登录系统 1.表单域 <form>通过action属性,将用户填写的数据转交给服务器 2.表单控件 2.1input标签 type…

(03)Hive的相关概念——分区表、分桶表

目录 一、Hive分区表 1.1 分区表的概念 1.2 分区表的创建 1.3 分区表数据加载及查询 1.3.1 静态分区 1.3.2 动态分区 1.4 分区表的本质及使用 1.5 分区表的注意事项 1.6 多重分区表 二、Hive分桶表 2.1 分桶表的概念 2.2 分桶表的创建 2.3 分桶表的数据加载 2.4 …

UART通信中的奇偶校验

UART通信中的奇偶校验&#xff1a;提升数据传输可靠性的简单方法 在微控制器&#xff08;MCU&#xff09;和各种电子设备之间的数据通信领域&#xff0c;UART&#xff08;Universal Asynchronous Receiver/Transmitter&#xff0c;通用异步收发传输器&#xff09;协议是一种广泛…

23款奔驰S400商务版没有后排电动座椅那改装一套跟选装有区别吗

改装的后排电动座椅通常提供以下功能&#xff1a; 电动调节&#xff1a;座椅可以通过按钮或控制面板进行前后调节&#xff0c;以适应乘客的腿部空间需求。 靠背角度调节&#xff1a;乘客可以通过电动调节功能来调整座椅的靠背角度&#xff0c;以获得更舒适的坐姿。 座椅倾斜调…

投资银行在网络安全生态中的作用

文章目录 一、投资银行的含义(一)并购买方。(二)并购卖方。(三)IPO辅助。(四)投资银行业务的另一方面是帮助这些交易融资。二、从投资银行角度看网络安全产业(一)行业的短期前景三、复杂的网络安全并购(一)行业知识对投资银行业务很重要(二)在网络安全领域,技术…

嵌入式C语言学习——基于Linux与GCC(二)

系列文章目录 一.C语言常用关键字及运算符操作 文章目录 系列文章目录内存四区指针指针概述指针 修饰符constvoliatiletypedef 指针运算符多级指针 数组数组空间字符空间及地址 结构体、共用体定义、字节对齐位域 内存分布图段错误分析 内存四区 C/C语言的内存四区&#xff…

MySQL 基础知识(六)之数据查询(一)

目录 1 基本查询 1.1 查询相关列 (select * / 列名) 1.2 别名 (as) 1.3 去重 (distinct) 1.4 对列中的数据进行运算 (、-、*、/) 2 条件查询 (where) 2.1 等值查询 () 2.2 非等值查询 (>、<、>、<、!、><) 2.3 逻辑判断 (and、or、not) 2.4 区间判…

Shell 学习笔记(一)-Shell脚本编程简介

一 什么是shell&#xff1f; shell是一个用 C 语言编写的程序&#xff0c;它是用户使用 Linux 的桥梁。Shell 既是一种命令语言&#xff0c;又是一种程序设计语言。 Shell 是指一种应用程序&#xff0c;这个应用程序提供了一个界面&#xff0c;用户通过这个界面访问操作系统内…

rocketMQ下载、安装及配置

topic主题 - 里边存在多个队列&#xff08;队列是真实存在的&#xff09; rocketMQ安装及配置 一、官网下载 windows和linux系统版本都一样。Binary 下载 下载 | RocketMQ (apache.org) 二、修改运行内存及broker.conf、配置环境变量 1、修改根目录->bin目录下runserve…

ubuntu22.04@laptop OpenCV Get Started: 010_blob_detection

ubuntu22.04laptop OpenCV Get Started: 010_blob_detection 1. 源由2. blob应用Demo2.1 C应用Demo2.2 Python应用Demo 3. 重点分析3.1 Threshold3.2 Area3.3 Circularity3.4 Convexity3.5 Inertia Ratio 4. 总结5. 参考资料6. 补充 1. 源由 Blob是图像中的一组连接像素&#…

猫头虎分享已解决Bug || 代码部署失败(Code Deployment Failure):DeploymentError, FailedRelease

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

点云旋转处理

实现代码为&#xff1a; //以中心化点进行旋转double theta atan(maindirection.a);//计算的是弧度单位for (int i 0; i < origipts.size(); i){pcl::PointXYZ tempone;tempone.x aftercenerlizepts[i].x*cos(theta) aftercenerlizepts[i].y*sin(theta) center.x;temp…

SPFA最短路

文章目录 从Bellman-Ford开始核心思想模拟算法执行过程时间复杂度模板 spfaspfa优化的思想模板 从Bellman-Ford开始 对于所有边权都大于等于0的图&#xff0c;任意两个顶点之间的最短路&#xff0c;显然不会经过重复的顶点或者边。也就是说任意一条最短路经过的定点数不会超过…

动态内存管理:new和delete的底层探索

之前我们在C语言上是学过malloc和calloc还要realloc等函数来在堆上获取相应的内存&#xff0c;但是这些函数是存在缺陷的&#xff0c;今天引入对new和delete的学习&#xff0c;来了解new和delete的底层实现。 首先就是在C中我们为什么要对内存进行区域的分块&#xff1f; 答案…

MyBatisPlus基础操作之增删改查

目录 一、基本使用 1.1 插入数据 1.2 删除操作 1.3 更新操作 二、条件构造器Wrapper 2.1 常用AbstractWrapper方法 2.1.1 示例一 2.2.2 示例二 2.2.3 示例三 2.2 常用QueryWrapper方法 2.2.1 示例一 2.2.2 示例二 2.2.3 示例三&#xff08;常用&#xff09; 2.3 常…

攻防演练后的一点随记

攻防演练 攻防演练算是告一段落了&#xff0c;各位红队和蓝队的兄弟们都辛苦了&#xff0c;写一点随记&#xff0c;供大家参考。 记得第一次参加攻防演练是在2018年&#xff0c;当时被派到北京&#xff0c;在某个政企单位做攻防演练支撑工作&#xff0c;然后2020年又被紧急派到…

【STM32 CubeMX】学STM必会的数据结构——环形缓冲区

文章目录 前言一、环形缓冲区是什么二、实现环形缓冲区实现分析2.1 环形缓冲区初始化2.2 写buf2.3 读buf2.4 测试 三、代码总况总结 前言 在嵌入式系统开发中&#xff0c;经常需要处理数据的缓存和传输&#xff0c;而环形缓冲区是一种常见且有效的数据结构&#xff0c;特别适用…