【Java】如何优雅的关闭线程池

文章目录

  • 背景
  • 一、线程中断 interrupt
  • 二、线程池的关闭 shutdown 方法
    • 2.1、第一步:advanceRunState(SHUTDOWN) 把线程池置为 SHUTDOWN
    • 2.2、第二步:interruptIdleWorkers() 把空闲的工作线程置为中断
    • 2.3、 第三步:onShutdown() 一个空实现,暂不用关注
    • 2.4、 小结
  • 三、线程池的关闭 shutdownNow 方式
    • 3.1、第一步:advanceRunState() 把线程池设置为STOP
    • 3.2、 第二步:interruptWorkers() 中断工作线程
    • 3.3、第三步:drainQueue() 把线程池中的任务都 drain 出来
    • 3.4、小结
  • 四、实战,与 JVM 钩子配合
  • 五、总结

背景

前几天在和同事聊一个需求,说是有个数据查询的功能,因为涉及到多个第三方接口调用,想用线程池并行来做。

很正常的一个方案,但是上线后发现,每次服务发布的时候,这个数据查询的功能就会挂掉,后来发现是线程池没有做好关闭,这里总结一下。

关键字:线程池、shutdown、shutdownNow、interrupt

一、线程中断 interrupt

先补一补基础的知识:线程中断。
线程中断的含义,并不是强制把运行中的线程给“咔嚓”中断,而是把线程的中断标志位置为true,这样等线程之后阻塞(wait、join、sleep)的时候,就会抛出 InterruptedException,程序通过捕获 InterruptedException 来做一定的善后处理,然后让线程退出。

来看个例子,下面这段代码是起一个线程,打印一百行文本,打印过程中,会把线程的中断标志位置为true

public static void test02() throws InterruptedException {

    Thread t = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted());
    }
    });
    t.start();
    Thread.sleep(1);
    t.interrupt();
}

看看控制台的输出,发现在打印到 57 的时候,中断标志位已经成功置为true了,但是线程任然在打印,说明只是设置了中断标志位,而不是直接粗暴的把线程中断。

...
process i=55,interrupted:false
process i=56,interrupted:false
process i=57,interrupted:true
process i=58,interrupted:true
process i=59,interrupted:true
...

再看看这个示例,同样是打印一百行文本,打印过程中会判断中断标志位,如果中断就自行退出。

public static void test02() throws InterruptedException {

    Thread t = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        if (Thread.interrupted()) {
            System.out.println("线程已中断,退出执行");
            break;
        }
        System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted());
    }
    });
    t.start();
    Thread.sleep(1);
    t.interrupt();
}

控制台输出如下,:

process i=49,interrupted:false
process i=50,interrupted:false
process i=51,interrupted:false

线程已中断,退出执行

二、线程池的关闭 shutdown 方法

了解完线程中断,再来看看线程池的关闭方法。

关闭线程池有两个方法 shutdown() 和 shutdownNow(),具体有什么区别?我们先来看看 shutdown() 方法

 /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); // 1. 把线程池的状态设置为 SHUTDOWN
            interruptIdleWorkers(); // 2. 把空闲的工作线程置为中断
            onShutdown(); // 3. 一个空实现,暂不用关注
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

看源码先看注释,翻译下:

启动有序关闭会执行以前提交的任务,但不接受任何新任务。
如果已经关闭,则调用不会产生额外的影响。
此方法不等待活动执行的任务终止。如果需要,可使用 awaitTermination() 做到这一点。

2.1、第一步:advanceRunState(SHUTDOWN) 把线程池置为 SHUTDOWN

线程池状态流转如下。调用 shutdown() 方法会把线程池的状态置为 SHUTDOWN,后续再往线程池提交任务就会被拒绝(execute() 方法中做了判断)。

在这里插入图片描述

2.2、第二步:interruptIdleWorkers() 把空闲的工作线程置为中断

interruptIdleWorkers() 方法遍历所有的工作线程,如果 tryLock() 成功,就把线程置为中断。
这里,如果 tryLock() 成功,说明对应的 woker 是一个空闲的,没有在执行任务的线程,如果没成功,说明对应的 worker 正在执行任务。也就是说,这里的中断,对正在执行中的任务并没有影响。

 private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

2.3、 第三步:onShutdown() 一个空实现,暂不用关注

这个没啥,就是个留空的方法。
在这里插入图片描述

2.4、 小结

shutdown() 方法干两件事:

  • 把线程池状态置为 SHUTDOWN 状态
  • 中断空闲线程

我们来看个例子,加深下印象。

public static void test01() throws InterruptedException {

        // corePoolSize 是 2,maximumPoolSize 是 2
        ThreadPoolExecutor es = new ThreadPoolExecutor(2, 2,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.prestartAllCoreThreads(); // 启动所有 worker
        es.execute(new Task()); // Task是一个访问某网站的 HTTP 请求,跑的慢,后面会贴出来完整代码,这里把他当做一个跑的慢的异步任务就行
        es.shutdown();
        es.execute(new Task()); // 在线程池 shutdown() 后 继续添加任务,这里预期是抛出异常
    }

这个例子我们主要观察两个现象。

  • 一个是线程池会有两个woker( prestartAllCoreThreads() 方法的调用使得已启动就有两个 worker),其中一个正在执行,一个处于空闲。所以当调用shutdown() 方法,走进 interruptIdleWorkers() 的时候,只有那个空闲的线程会调用 t.interrupt()。
    在这里插入图片描述

  • 第二个是调用 shutdown() 方法后,再调用 execute() 时,会抛出异常,因为线程池的状态已经置为 SHUTDOWN,不再接受新的任务添加进来。

三、线程池的关闭 shutdownNow 方式

 /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); // 1:把线程池设置为STOP
            interruptWorkers(); // 2.中断工作线程
            tasks = drainQueue(); // 3.把线程池中的任务都 drain 出来
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

注释的意思是:

尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回等待执行的任务列表。从该方法返回时,这些任务将从任务队列中清空(移除)。
此方法不等待活动执行的任务终止。如果需要,可使用 awaitTermination() 做到这一点。
除了尽最大努力尝试停止处理主动执行的任务之外,没有其他保证。
此实现通过 Thread.Interrupt() 取消任务,因此任何无法响应中断的任务都可能永远不会终止。

3.1、第一步:advanceRunState() 把线程池设置为STOP

和 shutdown() 方法不同的是,shutdownNow() 方法会把线程池的状态设置为 STOP。

3.2、 第二步:interruptWorkers() 中断工作线程

interruptWorkers() 如下,可以看到,和 shutdown() 方法不同的是,所有的工作线程都调用了 interrupt() 方法

  /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

3.3、第三步:drainQueue() 把线程池中的任务都 drain 出来

drainQueue() 方法如下,把阻塞队列里面等待的任务都拿出来,并返回。关闭线程池的时候,可以基于这个特性,把返回的任务都打印出来,做个记录。

  /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

3.4、小结

shutdownNow() 方法干三件事:

  • 把线程池状态置为 STOP 状态
  • 中断工作线程
  • 把线程池中的任务都 drain 出来并返回

我们来看个例子,代码合刚才的一样,只是关闭线程用的是shutdownNow()

public static void test01() throws InterruptedException {

        // corePoolSize 是 1,maximumPoolSize 是 1,无限容量
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.prestartAllCoreThreads(); // 启动所有 worker
        es.execute(new Task()); // Task是一个访问某网站的 HTTP 请求,跑的慢,后面会贴出来完整代码,这里把他当做一个跑的慢的异步任务就行
        es.execute(new Task());
        List<Runnable> result = es.shutdownNow();
        System.out.println(result);
        es.execute(new Task()); // 在线程池 shutdownNow() 后 继续添加任务,这里预期是抛出异常
    }

这个例子我们主要观察三个现象。
一个是线程池有两个woker,所以当调用shutdownNow() 方法,走进 interruptWorkers() 的时候,所有的 woker 都会调用 t.interrupt()。
在这里插入图片描述

第二个是 shutdownNow() 方法会返回还没来得及执行的task,并打印出来。
第三个是调用 shutdownNow() 方法后,再调用 execute() 时,会抛出异常,因为线程池的状态已经置为 STOP,不再接受新的任务添加
在这里插入图片描述

四、实战,与 JVM 钩子配合

实际工作中,我们一般是使用 shutdown() 方法,因为它比较“温和”,会等待我们把线程池中的任务都执行完,这里也已 shutdown() 方法为例。

我们回到最开头聊到的那个 case,机器重新发布,但是线程池中还有没执行完任务,机器一关,这些任务全部被kill,怎么办呢?有什么机制能够阻塞一下,等待这个任务执行完再关闭吗?

有的,用 JVM 的钩子!

实例代码如下,一个线程池,提交了三个任务去执行,执行完得半分钟。然后增加一个JVM的钩子,这个钩子可以简单理解为监听器,注册后,JVM在关闭的时候就会调用这个方法,调用完才会正式关闭JVM。

public static void test01() throws InterruptedException {

        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        es.execute(new Task());
        es.execute(new Task());
        es.execute(new Task());


        Thread shutdownHook = new Thread(() -> {
            es.shutdown();
            try {
                es.awaitTermination(3, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("等待超时,直接关闭");
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

在机器上执行,会发现,我使用 ctrl + c (注意不是ctrl + z )关闭进程,会发现进程并没有直接关闭,线程池任然执行,一直等到线程池的任务执行完,进程才会正式退出。
在这里插入图片描述

怎么样,是不是很神奇。
本文中涉及的 Task 的源码如下。这个任务是对 stackoverflow 网站发起 10 次请求,用来模拟跑的比较慢的任务,当然这不是重点,可以忽略,有兴趣动手试一下本文代码的同学可以参考下。

 public static class Task implements Runnable {

        @Override
        public void run() {
            System.out.println("task start");
            for (int i = 0; i < 10; i++) {
                httpGet();
                System.out.println("task execute " + i);
            }
            System.out.println("task finish");
        }

        private void httpGet() {

            String url = "https://stackoverflow.com/";
            String result = "";
            BufferedReader in = null;
            try {
                String urlName = url;
                URL realUrl = new URL(urlName);
                // 打开和URL之间的连接
                URLConnection conn = realUrl.openConnection();
                // 设置通用的请求属性
                conn.setRequestProperty("accept", "*/*");
                conn.setRequestProperty("connection", "Keep-Alive");
                conn.setRequestProperty("user-agent",
                        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
                // 建立实际的连接
                conn.connect();
                // 获取所有响应头字段
                Map<String, List<String>> map = conn.getHeaderFields();
//                 遍历所有的响应头字段
//                for (String key : map.keySet()) {
//                    System.out.println(key + "--->" + map.get(key));
//                }
                // 定义BufferedReader输入流来读取URL的响应
                in = new BufferedReader(
                        new InputStreamReader(conn.getInputStream()));
                String line;
                while ((line = in.readLine()) != null) {
                    result += "/n" + line;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            // 使用finally块来关闭输入流
            finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
//            System.out.print(result);
        }
    }

五、总结

想要优雅的关闭线程池,首先要理解线程中断的含义。
其次,关闭线程池有两种方式:shutdown() 和 shutdownNow(),二者最大的区别是 shutdown() 只是把空闲的 woker 置为中断,不影响正在运行的woker,并且会继续把待执行的任务给处理完。shutdonwNow() 则是把所有的 woker 都置为中断,待执行的任务全部抽出并返回,日常工作中更多是使用 shutdown()。
最后,单纯的使用 shutdown() 也不靠谱,还得使用 awaitTermination() 和 JVM 的钩子,才算优雅的关闭线程池。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/31784.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java POI (1)—— 数据读写操作快速入门

一、Excel的版本区别&#xff08;03版和07版&#xff09; 所谓“03版” 和 “07版”&#xff0c;指的是 Microsoft Excel 版本号。这些版本号代表着不同的Excel 文件格式。2003版 Excel 使用的文件格式为 .xls&#xff0c;而2007版开始使用新的文件格式 .xlsx。 . xlsx 文件格式…

【Spring 】项目创建和使用

哈喽&#xff0c;哈喽&#xff0c;大家好~ 我是你们的老朋友&#xff1a;保护小周ღ 谈起Java 圈子里的框架&#xff0c;最年长最耀眼的莫过于 Spring 框架啦&#xff0c;如今已成为最流行、最广泛使用的Java开发框架之一。不知道大家有没有在使用 Spring 框架的时候思考过这…

VulnHub靶机渗透:SKYTOWER: 1

SKYTOWER: 1 靶机环境介绍nmap扫描端口扫描服务扫描漏洞扫描总结 80端口目录爆破 3128端口获取立足点获取立足点2提权总结 靶机环境介绍 https://www.vulnhub.com/entry/skytower-1,96/ 靶机IP&#xff1a;192.168.56.101 kali IP&#xff1a;192.168.56.102 nmap扫描 端口扫…

使用mpi并行技术实现wordcount算法

【问题描述】 编写程序统计一个英文文本文件中每个单词的出现次数&#xff08;词频统计&#xff09;&#xff0c;并将统计结果按单词字典序输出到屏幕上。 注&#xff1a;在此单词为仅由字母组成的字符序列。包含大写字母的单词应将大写字母转换为小写字母后统计。 【输入形…

ChatGPT使用的SSE技术是什么?

在现代web应用程序中&#xff0c;实时通信变得越来越重要。HTTP协议的传统请求/响应模式总是需要定期进行轮询以获得最新的数据&#xff0c;这种方式效率低下并且浪费资源。因此&#xff0c;出现了一些新的通信技术&#xff0c;如WebSocket和SSE。但是&#xff0c;GPT为什么选择…

分布式数据库架构

分布式数据库架构 1、MySQL常见架构设计 对于mysql架构&#xff0c;一定会使用到读写分离&#xff0c;在此基础上有五种常见架构设计&#xff1a;一主一从或多从、主主复制、级联复制、主主与级联复制结合。 1.1、主从复制 这种架构设计是使用的最多的。在读写分离的基础上…

JS 介绍 Babel 的使用及 presets plugins 的概念

一、Babel 是什么 Bebal 可以帮助我们将新 JS 语法编译为可执行且兼容旧浏览器版本的一款编译工具。 举个例子&#xff0c;ES6&#xff08;编译前&#xff09;&#xff1a; const fn () > {};ES5&#xff08;编译后&#xff09;&#xff1a; var fn function() {}二、B…

设计模式-抽象工厂模式

抽象工厂模式 1、抽象工厂模式简介2、具体实现 1、抽象工厂模式简介 抽象工厂模式(Abstract Factory Pattern)在工厂模式尚添加了一个创建不同工厂的抽象接口(抽象类或接口实现)&#xff0c;该接口可叫做超级工厂。在使用过程中&#xff0c;我们首先通过抽象接口创建不同的工厂…

【HTML界面设计(二)】说说模块、登录界面

记录很早之前写的前端界面&#xff08;具体时间有点久远&#xff09; 一、说说模板 采用 适配器&#xff08;Adapter&#xff09;原理 来设计这款说说模板&#xff0c;首先看一下完整效果 这是demo样图&#xff0c;需要通过业务需求进行修改的部分 这一部分&#xff0c;就是dem…

Redis系列--布隆过滤器(Bloom Filter)

一、前言 在实际开发中&#xff0c;会遇到很多要判断一个元素是否在某个集合中的业务场景&#xff0c;类似于垃圾邮件的识别&#xff0c;恶意ip地址的访问&#xff0c;缓存穿透等情况。类似于缓存穿透这种情况&#xff0c;有许多的解决方法&#xff0c;如&#xff1a;redis存储…

宏景eHR SQL注入漏洞复现(CNVD-2023-08743)

0x01 产品简介 宏景eHR人力资源管理软件是一款人力资源管理与数字化应用相融合&#xff0c;满足动态化、协同化、流程化、战略化需求的软件。 0x02 漏洞概述 宏景eHR 存在SQL注入漏洞&#xff0c;未经过身份认证的远程攻击者可利用此漏洞执行任意SQL指令&#xff0c;从而窃取数…

如何在大规模服务中迁移缓存

当您启动初始服务时&#xff0c;通常会过度设计以考虑大量流量。但是&#xff0c;当您的服务达到爆炸式增长阶段&#xff0c;或者如果您的服务请求和处理大量流量时&#xff0c;您将需要重新考虑您的架构以适应它。糟糕的系统设计导致难以扩展或无法满足处理大量流量的需求&…

docker基础

文章目录 通过Vagrant安装虚拟机修改虚拟机网络配置 docker CE安装(在linux上)docker desktop安装(在MacOS上)Docker架构关于-阿里云镜像加速服务配置centos卸载docker 官网: http://www.docker.com 仓库: https://hub.docker.com Docker安装在虚拟机上&#xff0c;可以通过V…

Go语言的TCP和HTTP网络服务基础

目录 【TCP Socket 编程模型】 Socket读操作 【HTTP网络服务】 HTTP客户端 HTTP服务端 TCP/IP 网络模型实现了两种传输层协议&#xff1a;TCP 和 UDP&#xff0c;其中TCP 是面向连接的流协议&#xff0c;为通信的两端提供稳定可靠的数据传输服务&#xff1b;UDP 提供了一种…

[MySQL]不就是SQL语句

前言 本期主要的学习目标是SQl语句中的DDL和DML实现对数据库的操作和增删改功能&#xff0c;学习完本章节之后需要对SQL语句手到擒来。 1.SQL语句基本介绍 SQL&#xff08;Structured Query Language&#xff09;是一种用于管理关系型数据库的编程语言。它允许用户在数据库中存…

双因素身份验证在远程访问中的重要性

在快速发展的数字环境中&#xff0c;远程访问计算机和其他设备已成为企业运营的必要条件。无论是在家庭办公室运营的小型初创公司&#xff0c;还是团队分散在全球各地的跨国公司&#xff0c;远程访问解决方案都能保证工作效率和连接性&#xff0c;能够跨越距离和时间的阻碍。 …

7Z045 引脚功能详解

本文针对7Z045芯片&#xff0c;详细讲解硬件设计需要注意的技术点&#xff0c;可以作为设计和检查时候的参考文件。问了方便实用&#xff0c;按照Bank顺序排列&#xff0c;包含配置Bank、HR Bank、HP Bank、GTX Bank、供电引脚等。 参考文档包括&#xff1a; ds191-XC7Z030-X…

怎么计算 flex-shrink 的缩放尺寸

计算公式: 子元素的宽度 - (子元素的宽度的总和 - 父盒子的宽度) * (某个元素的flex-shrink / flex-shrink总和) 面试问题是这样的下面 left 和 right 的宽度分别是多少 * {padding: 0;margin: 0;}.container {width: 500px;height: 300px;display: flex;}.left {width: 500px…

红日靶场(一)外网到内网速通

红日靶场&#xff08;一&#xff09; 下载地址&#xff1a;http://vulnstack.qiyuanxuetang.net/vuln/detail/2/ win7:双网卡机器 win2003:域内机器 win2008域控 web阶段 访问目标机器 先进行一波信息收集&#xff0c;扫一下端口和目录 扫到phpmyadmin&#xff0c;还有一堆…

【资料分享】Xilinx Zynq-7010/7020工业核心板规格书(双核ARM Cortex-A9 + FPGA,主频766MHz)

1 核心板简介 创龙科技SOM-TLZ7x是一款基于Xilinx Zynq-7000系列XC7Z010/XC7Z020高性能低功耗处理器设计的异构多核SoC工业核心板&#xff0c;处理器集成PS端双核ARM Cortex-A9 PL端Artix-7架构28nm可编程逻辑资源&#xff0c;通过工业级B2B连接器引出千兆网口、USB、CAN、UA…