[Java线程池]ExecutorService|CompletionService的区别与选择

这段时间对业务系统做了个性能测试,其中使用了较多线程池的技术,故此做一个技术总结。

这次总结的内容比较多,主要是四个:

  1. ExecutorService
  2. CompletionService
  3. Runnable
  4. Callable

前两个是线程池相关接口,后两个是多线程相关接口。在最后,我会说明什么情况下使用哪个接口,这两类接口如何搭配使用。

Tips:个人拙见,如有不对,请多多指正。

一、ExecutorService

ExecutorService是一个接口,继承自Executor。ExecutorService提供了一些常用操作和方法,但是ExecutorService是一个接口,无法实例化。
不过,Java提供了一个帮助类Executors,可以快速获取一个ExecutorService对象,并使用ExecutorService接口的一些方法。
ExecutorService
Executors帮助类提供了多个构造线程池的方法,常用的分为两类:

  1. 直接执行的
    • newCachedThreadPool
    • newFixedThreadPool
    • newSingleThreadExecutor
  2. 延迟或定时执行的
    • newScheduledThreadPool
    • newSingleThreadScheduledExecutor

Executors为每种方法提供了一个线程工厂重载。

(一)newCachedThreadPool

创建一个默认的线程池对象,里面的线程和重用,且在第一次使用的时候才创建。可以理解为线程优先模式,来一个创一个线程,直到线程处理完成后,再处理其他的任务。
Code:

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyExecutorService {
    public static void main(String[] args) {
        // 1. 使用帮助类
//        ExecutorService executorService = Executors.newCachedThreadPool();

        // 2. 提交任务
/*        for (int i = 0; i < 20; i++) {
            executorService.submit(new MyRunnable(i));
        }*/

        // 3. 重载方法测试
        test();
    }

    private static void test() {
        // 1. 使用帮助类
        ExecutorService executorService = Executors.newCachedThreadPool(
                new ThreadFactory() {
                    int n = 1;

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "线程正在执行 --->" + n++);
                    }
                }
        );

        // 2. 提交任务
        for (int i = 0; i < 20; i++) {
            executorService.submit(new MyRunnable(i));
        }
    }
}

/**
 * 1. 线程类
 */
class MyRunnable implements Runnable {
    private int id;

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "正在执行..." + "--->" + id);
    }
}

输出:几乎是一下子就执行了,newCachedThreadPool会创建和任务数同等匹配的线程,直到处理完成任务的线程可以处理新增的任务。

(二)newFixedThreadPool

Code:创建一个可重用固定线程数量的线程池

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * 创建一个可固定重用次数的线程池
 */
public class MyNewFixedThreadPool {
    public static void main(String[] args) {
/*        // nThreads:线程数量
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }*/
        test();
    }

    private static void test() {
        ExecutorService es = Executors.newFixedThreadPool(5, new ThreadFactory() {
            int n = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程" + n++);
            }
        });
        // 提交任务
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }
    }
}

(三)newSingleThreadExecutor

只有一个线程(线程安全)

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyNewSingleThreadExecutor {
    public static void main(String[] args) throws InterruptedException {
/*        ExecutorService es = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }*/
        test();
    }

    private static void test() throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
            int n = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程" + n++);
            }
        });
        for (int i = 0; i < 10; i++) {
            Thread.sleep(100);
            es.submit(new MyRunnable(i));
        }
    }
}

(四)newScheduledThreadPool

怎么理解这个线程池的延迟时间?很简单,第一次执行的开始时间,加上延迟的时间,就是第二次执行的时间。

package com.macro.boot.ScheduledExecutorService;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MyScheduledExecutor {
    public static void main(String[] args) {
        ScheduledExecutorService sec = Executors.newScheduledThreadPool(4);
        for (int i = 0; i < 10; i++) {
            sec.schedule(new MyRunnable(i), 1, TimeUnit.SECONDS);
        }
        System.out.println("开始执行。。。");
        sec.shutdown();
    }
}

class MyRunnable implements Runnable {
    private int id;

    @Override
    public String toString() {
        return "MyRunnable{" +
                "id=" + id +
                '}';
    }

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "执行了任务" + id);
    }
}

(五)newSingleThreadScheduledExecutor

newSingleThreadScheduledExecutor和newScheduledThreadPool的区别是,newSingleThreadScheduledExecutor的第二次执行时间,等于第一次开始执行的时间,加上执行线程所耗费的时间,再加上延迟时间,即等于第二次执行的时间。

二、CompletionService

CompletionService是一个接口。
当我们使用ExecutorService启动多个Callable时,每个Callable返回一个Future,而当我们执行Future的get方法获取结果时,会阻塞线程直到获取结果。
而CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
Code:

package com.macro.boot.completions;

import java.util.concurrent.*;

public class CompletionBoot {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 实例化线程池
        ExecutorService es = Executors.newCachedThreadPool();
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(es);

        for (int i = 0, j = 3; i < 20; i++) {
            ecs.submit(new CallableExample(i, j));
        }
        for (int i = 0; i < 20; i++) {
            // take:阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会阻塞,直到有任务完成返回结果。
            Integer integer = ecs.take().get();
            // 从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞。
            // Integer integer = ecs.poll().get();
            System.out.println(integer);
        }
        // 不要忘记关闭线程池
        es.shutdown();
    }
}
class CallableExample implements Callable<Integer> {
    /**
     * 使用构造方法获取变量
     * */
    private int a;
    private int b;

    public CallableExample(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        return a + b;
    }

    @Override
    public String toString() {
        return "CallableExample{" +
                "a=" + a +
                ", b=" + b +
                '}';
    }
}

三、Runnable

Runnable和Callable两者都是接口,但是也有区别:

  1. 实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;(重点)
  2. Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;

Code:

class MyRunnable02 implements Runnable {
    private int i;

    public MyRunnable02(int i) {
        this.i = i;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "执行了... ---> " + i);
    }

    @Override
    public String toString() {
        return "MyRunnable{" +
                "i=" + i +
                '}';
    }
}

四、Callable

Code:

class CallableExample implements Callable<Integer> {
    /**
     * 使用构造方法获取变量
     * */
    private int a;
    private int b;

    public CallableExample(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        return a + b;
    }

    @Override
    public String toString() {
        return "CallableExample{" +
                "a=" + a +
                ", b=" + b +
                '}';
    }
}

五、Example

本次Demo:使用线程池,循环查询数据库500次。
在最开始的时候,是使用ExecutorServer + Future.get(因为查询数据库肯定需要获取结果,所以必须要用Callable,并且get到结果集)。但是get的阻塞操作,实在是太影响速度了,虽然考虑了两种手段去解决,但是都不了了之。
Code:(只贴线程池的代码,线程类和获取连接的类就不放了)

private void executorServerStart() throws SQLException, ClassNotFoundException, ExecutionException, InterruptedException {
        // get con
        TDConUtils tdConUtils = new TDConUtils();
        Connection con = tdConUtils.getCon();
        Statement statement = con.createStatement();

        // SQL
        String sql = "select last_row(value_double) from db1.tb1;";

        // ThreadPool
        ExecutorService es = Executors.newCachedThreadPool();

        // for each
        int count = 500;
        for (int i = 0; i < count; i++) {
            Future<ResultSet> submit = es.submit(new MyThread(i, con, sql));
            ResultSet resultSet = submit.get();
            // print
            while (resultSet.next()) {
                System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
                        , resultSet.getDouble(2));
            }
        }
        es.shutdown();

        // close resources
        tdConUtils.close(con, statement);
    }

运行时间:8000ms +
改CompletionService:
Code:

private void completionServerStart() throws SQLException, ClassNotFoundException, InterruptedException, ExecutionException {
        // get con
        TDConUtils tdConUtils = new TDConUtils();
        Connection con = tdConUtils.getCon();
        Statement statement = con.createStatement();

        // SQL
        String sql = "select last_row(value_double) from db1.tb1;";

        // ThreadPool
        ExecutorService es = Executors.newCachedThreadPool();

        //构建ExecutorCompletionService,与线程池关联
        ExecutorCompletionService<ResultSet> ecs = new ExecutorCompletionService<ResultSet>(es);
        // for each
        int count = 500;

        for (int i = 0; i < count; i++) {
            ecs.submit(new MyThread(i, con, sql));
        }
        for (int i = 0; i < count; i++) {
            // 通过take获取Future结果,此方法会阻塞
            ResultSet resultSet = ecs.take().get();
            while (resultSet.next()) {
                System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
                        , resultSet.getDouble(2));
            }
        }

        es.shutdown();
        tdConUtils.close(con, statement);
    }

运行时间:300+ms

六、使用小结

分情况。
如果需要获取结果:线程使用Callable;
如果需要异步获取结果:线程池使用CompletionService。
如果不需要获取结果:线程使用Runnable;
如果需要阻塞获取结果:线程池使用ExecutorService。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/521809.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《手把手教你》系列基础篇(七十四)-java+ selenium自动化测试-框架设计基础-TestNG实现DDT - 上篇(详解教程)

1.简介 上一篇文章中宏哥简单的讲解了一下通过xml文件传递参数&#xff0c;这一篇宏哥讲解通过通过DataProvider传递参数&#xff0c;也就是我们常说的数据驱动测试。如何利用TestNG实现DDT&#xff08;数据驱动测试 Data Driver Test&#xff09;&#xff0c;什么是数据驱动测…

椋鸟数据结构笔记#8:二叉树的遍历、创建与销毁

萌新的学习笔记&#xff0c;写错了恳请斧正。 链式二叉树 这篇笔记我们讨论基于链式二叉树&#xff0c;其节点的数据结构如下&#xff1a; typedef int BTDatatype;typedef struct BTNode {BTDataType data;struct BTNode* left;struct BTNode* right; } BTNode;二叉树的遍历…

STM32CubeMX配置步骤详解六 —— 时钟及其它内部参数配置(1)

接前一篇文章&#xff1a;STM32CubeMX配置步骤详解五 —— 基础配置&#xff08;2&#xff09; 本文内容主要参考&#xff1a; STM32CUBEMX配置教程&#xff08;一&#xff09;基础配置-CSDN博客 野火STM32系列HAL库开发教程 —— 第12讲 STM32的复位和时钟控制&#xff08;第…

环形链表 - LeetCode 热题 25

大家好&#xff01;我是曾续缘&#x1f970; 今天是《LeetCode 热题 100》系列 发车第 25 天 链表第 4 题 ❤️点赞 &#x1f44d; 收藏 ⭐再看&#xff0c;养成习惯 环形链表 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可…

2-3 AUTOSAR ASW Runable可运行实体

返回总目录->返回总目录<- 目录 一、概述 二、RTE Event 一、概述 运行实体(Runnable Entity,RE)是一段可执行的代码,其包含实际实现的函数(具体的逻辑算法或者操作)。一个软件组件可以包含一个或者多个运行实体。 Runnable就是SWC中的函数,而在AutoSAR架构在被…

【云计算】云数据中心网络(一):VPC

云数据中心网络&#xff08;一&#xff09;&#xff1a;VPC 1.什么是 VPC2.VPC 的组成2.1 虚拟交换机2.2 虚拟路由器 3.VPC 网络规划3.1 VPC 数量规划3.2 交换机数量规划3.3 地址空间规划3.4 不同规模企业地址空间规划实践 4.VPC 网络高可靠设计4.1 单地域单可用区部署4.2 单地…

[StartingPoint][Tier1]Funnel

Task 1 How many TCP ports are open? (打开了多少个 TCP 端口&#xff1f;) # nmap -sS -T4 10.129.224.226 --min-rate 1000 2 Task 2 What is the name of the directory that is available on the FTP server? (FTP 服务器上可用的目录名称是什么&#xff1f;) $ n…

爬虫 新闻网站 以湖南法治报为例(含详细注释,控制台版) V3.0 升级 自定义查询关键词、时间段、粗略判断新闻是否和优化营商环境相关,避免自己再一个个判断

目标网站&#xff1a;湖南法治报 爬取目的&#xff1a;为了获取某一地区更全面的在湖南法治报的已发布的和优化营商环境相关的宣传新闻稿&#xff0c;同时也让自己的工作更便捷 环境&#xff1a;Pycharm2021&#xff0c;Python3.10&#xff0c; 安装的包&#xff1a;requests&a…

强力推荐一款具有故障保护和CAN FD 功能的隔离CAN收发器 SiLM5150S

控制器局域网总线(CAN&#xff0c;Controller Area Network)&#xff0c;是一种用于实时应用的串行通讯协议总线&#xff0c;它可以使用双绞线来传输信号&#xff0c;是目前应用最广泛的现场总线之一。CAN协议具有实时性强、可靠性高、传输距离远的特点&#xff0c;适用于各种复…

Python中定时任务调度利器APScheduler

在Python开发中&#xff0c;经常需要执行一些定时任务&#xff0c;比如定期发送邮件、定期更新数据等。APScheduler&#xff08;Advanced Python Scheduler&#xff09;是一个强大且易用的Python库&#xff0c;专门用于定时任务调度。它提供了丰富的调度接口&#xff0c;使得定…

51单片机学习笔记14 LCD1602显示屏使用

51单片机学习笔记14 LCD1602显示屏使用 一、LCD1602介绍1. 简介2. 引脚定义3. DDRAM4. 字模5. 指令&#xff08;1&#xff09;清屏指令 0x01&#xff08;2&#xff09;光标归位指令 0x02&#xff08;3&#xff09;进入模式设置指令 0x06&#xff08;4&#xff09;显示开关控制指…

短毛猫也能吃得好!揭秘宠物店推荐猫粮的秘密!

短毛猫通常毛发短而浓密&#xff0c;性格温顺&#xff0c;容易打理。那么&#xff0c;对于我们这些爱护短毛猫的朋友们来说&#xff0c;选择一款合适的猫粮就显得尤为重要了。今天&#xff0c;我要向大家推荐一款我个人非常喜欢的猫粮——福派斯三文鱼益生菌猫粮。 &#x1f41…

SAP操作教程第7期:SAP B1日期偏离允许范围解决方法

作为一种灵活的工具&#xff0c;自定义能够充分满足企业多样的需求。它允许你根据个人或团队的具体需求和情况来调整计划。通过自定义&#xff0c;你可以根据优先级、时间表、资源分配和风险管理等因素&#xff0c;制定更具体、实用的计划。 下面我们将详细探讨在SAP Business …

ARM_04

1.总结二进制信号量和计数型信号量的区别&#xff0c;以及他们的使用场景。 二进制信号量&#xff1a;只有0和1&#xff0c;一般用于资源共享时使用 计数型信号量&#xff1a;值一般是大于等于2的&#xff0c;可以实现同步互斥作用 2.使用技术型信号量完成生产者和消费者模型…

vue快速入门(九)事件绑定参数传递

注释很详细&#xff0c;直接上代码 上一篇 新增内容 事件绑定基础模板事件传参方法 源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, init…

蓝桥杯嵌入式(G431)备赛笔记——LED

目录 cubeMX配置&#xff1a; 代码模板&#xff1a; 注意&#xff1a; cubeMX配置&#xff1a; 原理图&#xff0c;其中PD2高电平使能锁存器&#xff0c;PC8-15默认给高电平&#xff0c;放置上电初始化LED亮 74HC573是八路输出锁存器 1脚是使能&#xff0c;低电平有效&#…

【JavaEE】浅谈线程(一)

线程 前言线程的由来线程是什么线程的属性线程更高效的原因举个例子&#xff08;线程便利性的体现&#xff09; 多线程代码线程并发执行的代码jconsole(观测多线程) 线程的调度问题创建线程的几种方法1&#xff09;通过继承Thread 重写run2&#xff09;使用Runnable接口 重写ru…

Ubuntu系统同时使用AMD和NVIDIA GPU出现的问题及解决

问题产生&#xff1a; Ubuntu 22.04系统同时使用了AMD W7900和NVIDIA GTX 1070Ti&#xff0c;想用1070Ti做显示&#xff0c;W7900做运算。结果Ubuntu 22.04系统不能启动了。 解决方法&#xff1a; 同时按下Ctrl和Alt键&#xff0c;并保持按住。在此过程中&#xff0c;按一下…

Windows Server 2012 R2安装远程桌面服务

文章目录 一、打开【服务器管理器】二、点击【添加角色和功能】三、点击【下一步】四、点击【下一步】五、点击【下一步】![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/05b61a830faf477e81f858ec00bbdfff.png)六、勾选【远程桌面服务】→点击【下一步】七、点击【…

ruoyi-nbcio-plus基于vue3的flowable流程设计器组件的升级修改

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://122.227.135.243:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a…