线程池 ThreadPool

文章目录

  • 线程池
  • 一、线程池概述
    • 1、什么是线程池?
    • 2、为什么需要线程池?
    • 3、线程池的优势
    • 4、基本原理
  • 二、线程池相关接口与方法
    • 1、Executor
    • 2、ExecutorService
    • 3、ScheduledExecutorService
    • 4、Runnable & Callable
    • 5、Future & FutureTask
    • 6、execute & submit
    • 7、shutdown & shutdownNow
    • 8、awaitTermination
    • 9、isShutDown & isTerminated
  • 三、ThreadPoolExecutor
    • 1、构造方法与核心参数
    • 2、拒接策略
    • 3、线程池的 5 种状态
    • 4、执行原理
    • 5、源码分析
      • 1)execute
      • 2)addWorker
      • 3)runWorker
      • 4)getTask
    • 6、使用示例
  • 四、Executors 工具类(了解)
    • 1、newFixedThreadPool
    • 2、newSingleThreadExecutor
    • 3、newCachedThreadPool
    • 4、newScheduledThreadPool
  • 五、线程池最佳实践
    • 1、正确声明线程池
    • 2、线程数量 怎么配置
    • 3、阻塞队列 怎么选择
    • 4、线程工厂 怎么配置
      • 1)默认的线程工厂
      • 2)自定义线程工厂
    • 5、拒绝策略 怎么配置
    • 6、Spring 线程池
    • 8、不同业务使用不同的线程池
    • 7、线程池 和 ThreadLocal 共用的坑

线程池

一、线程池概述

1、什么是线程池?

线程池其实是一种池化的技术实现,池化技术的核心思想就是实现资源的复用,避免资源的重复创建和销毁带来的性能开销。线程池可以管理一堆线程,让线程执行完任务之后不进行销毁,而是继续去处理其它线程已经提交的任务。

2、为什么需要线程池?

程序启动一个新线程成本是比较高的,因为它涉及到要与操作系统进行交互。

  1. 启动线程的时候,会在内存中开辟一块空间,消耗系统资源。
  2. 销毁线程的时候,首先要把和线程相关东西进行销毁,还要把资源还给操作系统。

因此,如果并发的线程数量很多,这样频繁创建单个线程就会大大降低系统的效率。

但是,如果一个任务的时间非常长,就没必要用线程池了,因为不能控制线程池中线程的开始、挂起、和中止。(不是不能,是不宜)

3、线程池的优势

合理利用线程池能够带来三个好处:

  1. 降低资源消耗

    减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

  2. 提高响应速度

    当任务到达时,不需要等待线程创建,可以立即执行。

  3. 提高线程的可管理性

    使用线程池可以进行统一分配、调优和监控。根据系统的承受能力,调整线程池中工作线程的数目。防止消耗过多的系统资源,降低系统的稳定性。(线程开的越多,消耗的内存也就越大,最后导致死机或宕机)

4、基本原理

先了解几个核心概念:

  • ThreadPool(线程池):
    • 线程池是整个线程管理的核心,负责创建、管理和调度线程的执行。
    • 线程池通常包含一组线程,这些线程被称为工作线程(Worker)。
    • 线程池维护着一个任务队列(workQueue),用于存储待执行的任务。
  • Worker(工作线程):
    • Worker 是线程池中的工作线程,它负责执行任务队列中的任务。
    • 每个 Worker 线程从任务队列中获取任务,并执行任务的 run() 方法。
    • 当没有任务可执行时,Worker 线程会阻塞等待,直到任务队列中有新的任务被提交。
  • workQueue(任务队列):
    • 任务队列是用于存储待执行的任务的缓冲区。
    • 当有任务被提交到线程池时,线程池会将任务放入任务队列中等待执行。
  • Task(任务):
    • 任务是需要线程池执行的工作单元。
    • 任务通常实现了 Runnable 接口,表示没有返回值的任务;或者实现了 Callable 接口,表示有返回值的任务。
    • 每个任务被执行时,会调用其 run() 方法或 call() 方法来执行具体的逻辑。

线程池的整体流程如下:

  1. 当有任务被提交到线程池时,线程池会将任务放入任务队列中。
  2. Worker 线程从任务队列中获取任务,并执行任务的 run() 方法或 call() 方法。
  3. 执行完任务后,Worker 线程继续循环从任务队列中获取下一个任务执行。
  4. 如果任务队列为空,Worker 线程会阻塞等待,直到有新的任务被提交到任务队列中。
  5. 如果任务队列已满(对于有界队列),新提交的任务可能会被拒绝或者阻塞,直到队列有空闲位置。

线程池负责管理线程的创建和销毁、工作线程的执行、任务队列的调度等工作,从而实现了任务的并发执行和资源的有效利用。

二、线程池相关接口与方法

从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,这是异步执行任务的线程池框架。

在这里插入图片描述

关键类或接口含义
Executor线程池的顶级接口,提供 execute (Runnable command) 方法
ExecutorServiceExecutor的子接口,提供 submit、invoke、shutdown 等方法
ScheduledExecutorServiceExecutorService的子接口,提供延迟/定期指定任务的方法,比Timer更灵活
ThreadPoolExecutor线程池的核心实现类,用来创建线程池执行被提交的任务
ForkJoinPool通过分而治之的思想来并行执行任务
ScheduledThreadPoolExecutor可以执行延迟任务和周期性任务,是一个定时执行任务的线程池。
相关类或接口含义
Runnable接口 和 Callable接口 及它们的实现类线程要执行的任务,Runnable无返回值,Callable有返回值
Future 和 FutureTask代表异步计算的结果

1、Executor

Executor 是线程池的顶级接口,定义了一个单一方法 execute(Runnable command),用于执行给定的任务。

public interface Executor {
    void execute(Runnable command);
}

2、ExecutorService

ExecutorServiceExecutor 的子接口,扩展了 Executor,提供 submitinvokeshutdown 等方法

public interface ExecutorService extends Executor {
    // 平缓关闭线程池
    void shutdown();
    // 立即关闭线程池
    List<Runnable> shutdownNow();

    // 判断线程池是否已经调用了 shutdown() 方法进行关闭。
    boolean isShutdown();

    // 判断线程池中的所有任务是否都已经执行完毕,并且线程池已经完全终止。
    boolean isTerminated();

    // 阻塞一段时间。返回值:是否在指定时间内 线程池中的所有任务都已经执行完毕,并且线程池已经完全终止。
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 提交单个任务(支持Runnable和Callable两种任务)
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    // 提交多个Callable任务
    // 将所有任务提交到线程池中并等待它们执行完毕。
    // 返回所有任务的执行结果。当所有任务都执行完毕后,调用线程将继续执行。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException;

    // 提交多个Callable任务
    // 将所有任务提交到线程池中并等待任意一个任务执行完毕。
	// 返回第一个执行完毕的任务的执行结果,并且取消其他任务的执行
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

3、ScheduledExecutorService

ScheduledExecutorServiceExecutorService 的子接口,它提供了延迟或周期性执行任务的功能。

public interface ScheduledExecutorService extends ExecutorService {
    
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

4、Runnable & Callable

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     *
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

5、Future & FutureTask

在这里插入图片描述

FutureTaskRunnableFuture的实现类

public class FutureTask<V> implements RunnableFuture<V> {

    public FutureTask(Callable<V> callable) {}
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
}

RunnableFutureRunnableFuture的子接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

Future接口:用来记录线程任务执行完毕后产生的结果。

public interface Future<V> {

    // 获取结果(一般获取Callable任务的结果)
    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    
    boolean cancel(boolean mayInterruptIfRunning);
    
    boolean isCancelled();
    
    boolean isDone();
}

6、execute & submit

  • execute是Executor接口的方法;submit是ExecutorService的方法,ExecutorService接口继承了Executor接口
  • execute只能提交Runnable类型的任务;submit既能提交Runnable类型任务也能提交Callable类型任务。
  • execute没有返回值;submit会返回Future对象,通过Future的get方法获取返回值。
  • execute会直接抛出任务执行时的异常;submit 会存下异常,只有当调用Future的get方法才会将任务执行时的异常重新抛出。

看一下 AbstractExecutorServicesubmit 的实现:

public abstract class AbstractExecutorService implements ExecutorService {
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

可以看到:

  • submit方法中,首先会判断入参是否为null,如果是就抛出空指针异常,显然线程池不允许提交null任务。
  • submit会将RunnableCallable入参包装成RunnableFuture对象,调用execute方法执行并返回。

7、shutdown & shutdownNow

ExecutorService 接口提供了两个方法,用于关闭线程池。

  • shutdown:平缓关闭线程池,线程池的状态变为 SHUTDOWN

    线程池允许已经提交的任务(包括正在执行和待执行的任务)执行完毕,然后关闭线程池。

  • shutdownNow:立即关闭线程池,线程的状态变为 STOP

    线程池会终止正在执行的任务,然后关闭线程池,返回队列中未执行的任务列表。

注意:调用shutdown()shutdownNow()之后,都不再接受新的任务

  • 此时调用 execute() 方法提交新任务会抛出 RejectedExecutionException 异常。

8、awaitTermination

awaitTermination会阻塞调用线程,直到满足以下任一条件:

  1. 线程池终止,并且所有任务都已经执行完毕(返回true)
  2. 超过了指定的等待时间(返回false)
  3. 当前线程被中断

如果在指定的等待时间内线程池终止,则 awaitTermination() 方法返回 true,否则返回 false

public interface ExecutorService extends Executor {
    // 阻塞一段时间。返回值:是否在指定时间内 线程池中的所有任务都已经执行完毕,并且线程池已经完全终止。
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
}

一般 shutdown 会配合 awaitTermination 来使用

public class ShutdownDemo {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        executor.submit(() -> {
            // 执行任务
            for (int i = 0; i < 5; i++) {
                try {
                    TimeUnit.SECONDS.sleep(i);
                    System.out.println(i + "执行完毕");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        executor.shutdown(); // 关闭线程池
        System.out.println("execute shutdown");

        try {
            if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
                // 超时处理
                System.out.println("线程池未在指定时间内终止");
            }
        } catch (InterruptedException e) {
            // Todo Handle Exception
        }
    }

}

9、isShutDown & isTerminated

ExecutorService 接口提供了两个方法,用于获取线程池是否关闭。

  • isShutDown:当调用 shutdown() 方法后返回为 true。
  • isTerminated:当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

三、ThreadPoolExecutor

ThreadPoolExecutor 是线程池的核心实现类,Java 主要是通过构建 ThreadPoolExecutor 来创建线程池的。

1、构造方法与核心参数

先看一下 ThreadPoolExecutor 的构造方法,其中有7个核心参数

public ThreadPoolExecutor(
    int corePoolSize,	 				// 核心线程数
    int maximumPoolSize, 				// 最大线程数 
    long keepAliveTime,  				// 空闲线程存活时间
    TimeUnit unit,		 				// 存活时间的单位
    BlockingQueue<Runnable> workQueue,  // 阻塞队列 - 任务队列
    ThreadFactory threadFactory,		// 线程工厂
    RejectedExecutionHandler handler)   // 拒绝策略
{
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
1. 核心线程数 - corePoolSize
	    线程池中常驻的核心线程数。
2. 最大线程数 - maximumPoolSize
		线程池允许创建的最大线程数,可以大于核心线程数。
3. 空闲线程存活时间 - keepAliveTime
		任务队列没有任务了 且 工作线程数 > 核心线程数 时,
		会销毁空闲时间达到keepAliveTime的临时线程,直到只剩下 corePoolSize 个线程为止。
4. 存活时间的单位 - unit
		keepAliveTime的时间单位
5. 任务队列 - workQueue
		阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
6. 线程工厂 - threadFactory
		生成线程池中工作线程的线程工厂(一般使用默认,即不配置)
7. 拒绝策略 - handler
		当线程池关闭或饱和时,如何来拒绝请求执行的runnable的策略handler,
            关闭:执行shutdown()方法关闭线程池
            饱和:线程数达到maximumPoolSize 且 任务队列已满

2、拒接策略

当 线程数已达到最大线程数 并且 等待队列也已经排满了,我们就需要拒绝策略机制合理的处理这个时候过来的请求。

jdk自带的四种拒接策略:(ThreadPoolExecutor中的静态内部类)

  1. AbortPolicy 中止策略(默认)

    直接抛出 RejectedExecutionException 异常,阻止系统正常运行。(比较关键的业务,可以及时发现问题)

  2. DiscardPolicy 丢弃策略

    默默地丢弃无法处理的任务,不触发任何动作,就是一个空实现。(允许任务丢失时,可以使用)

  3. DiscardOldestPolicy 丢弃最旧策略

    丢弃队列中等待最久的任务,然后把当前任务加人队列中,尝试再次提交当前任务。(喜新厌旧)

  4. CallerRunsPolicy 调用者运行策略

    一种调节机制,既不会抛出异常,也不会丢弃任务,而是将任务退回给调用线程处理(如main主线程)。

    一般在不允许失败的、对性能要求不高、并发量较小的场景下使用。(性能和效率较低)

可以实现 RejectedExecutionHandle 接口,自定义拒绝策略

  • 自定义拒绝策略可以根据具体的业务需求,例如记录日志、将任务放入消息队列/缓存/数据库等。

3、线程池的 5 种状态

线程池内部有 5 个常量来代表线程池的五种状态

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态具体是存在 ctl 成员变量中的,ctl 中不仅存储了线程池的状态还存储了当前线程池中线程数的大小

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }

在这里插入图片描述

  • RUNNING:能够接收新任务,以及对已添加的任务进行处理。
  • SHUTDOWN:线程池不再接收新任务,但是允许已经提交的任务(包括正在执行和待执行的任务)执行完毕。
  • STOP:线程池不再接收新任务,也不再处理队列中已经添加的任务,并且会尝试中断正在处理的任务的线程。
  • TIDYINGSHUTDOWNSTOP的收尾工作完成,执行完 terminated() 方法就会转变为 TERMINATED 状态。
  • TERMINATED:线程池彻底终止。

4、执行原理

在这里插入图片描述

5、源码分析

1)execute

看一下 ThreadPoolExecutorexecute 方法的实现

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        // 首先检查提交的任务是否为null,是的话则抛出NullPointerException。
        if (command == null)
            throw new NullPointerException();

        // 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)
        int c = ctl.get();

        // 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)
        if (workerCountOf(c) < corePoolSize) {
            // 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务
            // addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程
            if (addWorker(command, true))
                return;
            // 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化
            c = ctl.get();
        }

        // 2. 尝试将任务添加到任务队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 双重检查线程池的状态
            if (!isRunning(recheck) && remove(command))  // 如果线程池已经停止,从队列中移除任务
                reject(command);
            // 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        
        // 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务
        else if (!addWorker(command, false))
            // 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务
            reject(command);
    }
}

2)addWorker

上面提到了,通过 addWorker 方法创建线程来执行任务。

public class ThreadPoolExecutor extends AbstractExecutorService {

    private boolean addWorker(Runnable firstTask, boolean core) {
        // ...
        Worker w = null;
        try {
            // 创建 Worker 对象的时候,会创建一个线程,和任务一起封装到 Worker 内部
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // ...
                if (workerAdded) {
                    // 启动线程,执行run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } // ...
    }

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

        // 创建 Worker 对象的时候,会把线程和任务一起封装到 Worker 内部
        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        // 重写了run方法,实际调用的是runWorker方法
        public void run() {
            runWorker(this);
        }
    }

}

3)runWorker

可以看到,创建 Worker 对象时,会把线程任务一起封装到 Worker 内部,然后调用 runWorker 方法来让线程执行任务

public class ThreadPoolExecutor extends AbstractExecutorService {

    final void runWorker(Worker w) {
        // 获取当前工作线程
        Thread wt = Thread.currentThread();

        // 从 Worker 中取出任务
        Runnable task = w.firstTask;
        w.firstTask = null;

        // 解锁 Worker(允许中断)
        w.unlock(); 

        boolean completedAbruptly = true;
        try {
            // 当有任务需要执行 或者 能够从任务队列中获取到任务时,工作线程就会持续运行
            while (task != null || (task = getTask()) != null) {
                // 锁定 Worker,确保在执行任务期间不会被其他线程干扰
                w.lock();

                // 如果线程池正在停止,并确保线程已经中断
                // 如果线程没有中断并且线程池已经达到停止状态,中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();

                try {
                    // 在执行任务之前,可以插入一些自定义的操作
                    beforeExecute(wt, task);

                    Throwable thrown = null;
                    try {
                        // 实际执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务后,可以插入一些自定义的操作
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 清空任务,并更新完成任务的计数
                    task = null;
                    w.completedTasks++;
                    // 解锁 Worker
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 工作线程退出的后续处理
            processWorkerExit(w, completedAbruptly);
        }
    }
}

runWorker 内部使用了 while 循环:

  • 当 Worker 内部的任务执行完毕,就会不断地通过 getTask 方法从队列中获取任务。
  • 只要能获取到任务,就会调用 run 方法继续执行任务(这就是线程能够复用的主要原因)
  • 如果从 getTask 获取不到任务的话,就会调用 finally 中的 processWorkerExit 方法,将线程退出。

这里还有一个细节: Worker 继承了 AQS,每次在执行任务之前都会调用 lock 方法,执行完任务之后,会调用 unlock 方法。

因此,如果想知道线程是否正在执行任务,只需要调用 Woker 的 tryLock 方法,根据是否加锁成功就能判断,加锁成功说明当前线程没有加锁,也就没有执行任务了。调用 shutdown 方法关闭线程池的时候,就是用这种方式来判断线程有没有在执行任务,如果没有的话,会尝试打断没有执行任务的线程。

4)getTask

前面讲到,会在 while 循环中不断地通过 getTask 方法获取任务,获取不到就退出。接下来就看一看 getTask 方法的实现。

public class ThreadPoolExecutor extends AbstractExecutorService {

    private Runnable getTask() {
        // 标志,表示最后一个poll()操作是否超时
        boolean timedOut = false;

        // 无限循环,直到获取到任务或决定工作线程应该退出
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池状态是SHUTDOWN或更高(如STOP)并且任务队列为空,那么工作线程应该减少并退出
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 检查工作线程是否应当在没有任务执行时,经过keepAliveTime之后被终止
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 工作线程数超出最大线程数 或者 超出核心线程数且上一次poll()超时
            if ((wc > maximumPoolSize || (timed && timedOut))
                // 工作线程数大于1 或 队列为空
                && (wc > 1 || workQueue.isEmpty())) {
                // CAS减少工作线程数
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 从任务队列获取任务(根据timed标志,决定使用poll还是take)
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时等待
                workQueue.take(); // 阻塞等待
                // 成功获取到任务
                if (r != null)  
                    return r;
                // 如果poll()超时,则设置timedOut标志
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果在等待任务时线程被中断,重置timedOut标志并重新尝试获取任务
                timedOut = false;
            }
        }
    }
}

整个 runWorker 及 getTask 方法以及线程超时退出的机制如图所示:

在这里插入图片描述

6、使用示例

public class ThreadPoolExecutorDemo {

    public static void main(String[] args) {
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 5, 10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            new ThreadPoolExecutor.AbortPolicy()
        );

        for (int i = 0; i < 20; i++) {
            int index = i + 1;
            try {
                // 提交任务
                executor.execute(() -> {
                    System.out.println("任务" + index + "正在运行");
                    while (true) {
                        // 让线程一直执行
                    }
                });
            } catch (RejectedExecutionException e) {
                System.out.println("丢弃任务: " + index);
            }
        }

        System.out.println("线程池阻塞队列中的任务数:" + executor.getQueue().size());
    }

}

四、Executors 工具类(了解)

Executors线程池工具类,提供四种快捷创建线程池的静态方法,可以创建多种类型的 ThreadPoolExecutor

1、newFixedThreadPool

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
}

newFixedThreadPool 创建一个固定线程数的线程池

  • 最大线程数 = 核心线程数 = 指定数量
  • 无界阻塞队列 LinkedBlockingQueue

2、newSingleThreadExecutor

public class Executors {
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}

newSingleThreadExecutor 创建一个只有一个线程的线程池

  • 最大线程数 = 核心线程数 = 1
  • 无界阻塞队列 LinkedBlockingQueue

3、newCachedThreadPool

public class Executors {
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
}

newCachedThreadPool 创建一个动态调整线程数的线程池

  • 最大线程数 = Integer.MAX_VALUE(约21亿,可能导致OOM)
  • 同步阻塞队列 SynchronousQueue(没有容量)

4、newScheduledThreadPool

public class Executors {
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 
              0, NANOSECONDS, 
              new DelayedWorkQueue());
    }
}

newScheduledThreadPool 创建一个支持延迟任务和周期性任务的线程池 ScheduledThreadPoolExecutor

  • 最大线程数 = Integer.MAX_VALUE(约21亿,可能导致OOM)
  • 延迟阻塞队列 DelayedWorkQueue

五、线程池最佳实践

1、正确声明线程池

《阿里巴巴 Java 开发手册》强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式

首先,使用 Executors 创建线程池有很多弊端:

  • CachedThreadPoolScheduledThreadPool

    允许创建的最大线程数为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

  • FixedThreadPoolSingleThreadExecutor

    使用默认的 LinkedBlockingQueue,队列长度默认为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

除了避免 OOM 的原因之外,不推荐使用 Executors 快捷创建线程池的原因还有:

  • 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
  • 应该显示地给我们的线程池命名,这样有助于我们定位问题。

因此,正确声明线程池,应该通过 ThreadPoolExecutor 构造函数,结合生产环境,具体问题具体分析。

2、线程数量 怎么配置

线程数的设置主要取决于业务是 IO 密集型还是 CPU 密集型

CPU密集型
    定义:任务需要大量的运算,并且没有阻塞,CPU一直在全速运行
    配置:最大线程数 = CPU核数 + 1
    分析:线程开太多也没有处理器执行,+1是为了防止线程偶发的缺页中断

I/O密集型
    定义:任务存在大量的I/O,导致大量的运算能力浪费在等待I/O上,CPU使用率比较低。
    配置:最大线程数 = CPU核数 * 2
    分析:线程在处理I/O时,不会占用 CPU 来处理,启用其他线程继续使用 CPU,以提高 CPU 的使用率。
// Java获取 CPU 核心数的方法
Runtime.getRuntime().availableProcessors();

控制线程创建数量,避免使用 Integer.MAX_VALUE 导致OOM。

3、阻塞队列 怎么选择

尽量使用有界队列

ArrayBlockingQueue 对比 LinkedBlockingQueue

  • ArrayBlockingQueue 使用一个单独的 ReentrantLock 来控制对队列的访问

  • LinkedBlockingQueue 使用两个锁(putLock 和 takeLock),一个用于放入操作,另一个用于取出操作。

    这可以提供更细粒度的控制,并可能减少线程之间的竞争。

但是 LinkedBlockingQueue 默认构造的长度默认为 Integer.MAX_VALUE,可能导致OOM,需要使用指定长度的构造。

4、线程工厂 怎么配置

建议通过实现 ThreadFactory 接口,自定义线程工厂(例如设置线程的名称),方便问题排查。

1)默认的线程工厂

创建线程池时,如果不指定 ThreadFactory,默认使用 Executors.defaultThreadFactory()

public class Executors {
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    static class DefaultThreadFactory implements ThreadFactory {
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
            // 默认的线程名:pool-poolNumber-thread-n
            namePrefix = "pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
        }
    }
}

2)自定义线程工厂

通过实现 ThreadFactory 接口,自定义线程工厂:

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class CustomThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String namePrefix;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public CustomThreadFactory(ThreadFactory delegate, String namePrefix) {
        this.delegate = delegate;
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(namePrefix + " [" + threadNum.incrementAndGet() + "]");
        return t;
    }

}
@Slf4j
public class CustomThreadFactoryTest {

    private static final String namePrefix = "custom-thread-";

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 5, 10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            // 指定自定义的线程工厂
            new CustomThreadFactory(Executors.defaultThreadFactory(), namePrefix),
            new ThreadPoolExecutor.AbortPolicy()
        );

        for (int i = 0; i < 20; i++) {
            int index = i + 1;
            try {
                executor.execute(() -> {
                    log.info("任务{}正在运行", index);
                    while (true) {
                        // 让线程一直执行
                    }
                });
            } catch (RejectedExecutionException e) {
                 // Todo Handle Exception
            }
        }
    }
}

运行结果如下所示:

14:35:57.995 [custom-thread- [1]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务1正在运行
14:35:57.996 [custom-thread- [3]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务13正在运行
14:35:57.995 [custom-thread- [2]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务2正在运行
14:35:58.019 [custom-thread- [4]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务14正在运行
14:35:58.020 [custom-thread- [5]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务15正在运行

5、拒绝策略 怎么配置

  • 不允许任务丢弃:CallerRunsPolicy
  • 允许任务丢弃:其他几种

可以实现 RejectedExecutionHandle 接口,自定义拒绝策略

  • 自定义拒绝策略可以根据具体的业务需求,例如记录日志、将任务放入消息队列/缓存/数据库等。
@Slf4j
public class CustomRejectPolicy implements RejectedExecutionHandler {

    public CustomRejectPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            log.error("线程池正在跑主线程,请注意!");
            r.run();
        }
    }
}

6、Spring 线程池

使用 Spring 内部线程池时,一定要手动自定义线程池,配置合理的参数,不然会出现生产问题(一个请求创建一个线程)。

@Configuration
@EnableAsync
public class ThreadPoolExecutorConfig {

    @Bean(name="threadPoolExecutor")
    public Executor threadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
        // 获取当前系统的处理器数量
        int processNum = Runtime.getRuntime().availableProcessors();
        int corePoolSize = (int) (processNum / (1 - 0.2)); 	// 核心线程数
        int maxPoolSize = (int) (processNum / (1 - 0.5)); 	// 最大线程数(I/O密集型)
        threadPoolExecutor.setCorePoolSize(corePoolSize);
        threadPoolExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolExecutor.setQueueCapacity(maxPoolSize * 1000);
        threadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY);
        threadPoolExecutor.setDaemon(false);
        threadPoolExecutor.setKeepAliveSeconds(300);
        threadPoolExecutor.setThreadNamePrefix("test-Executor-"); // 线程名字前缀
        return threadPoolExecutor;
    }
}

8、不同业务使用不同的线程池

如果多个业务使用相同的线程池,可能会造成“死锁”。下面举个例子:

@Service
@Slf4j
public class TestService {
    @Resource(name = "threadPoolExecutor")
    private ExecutorService taskExecutor;

    public void method1() {
        taskExecutor.execute(() -> method1());
    }

    public void method2() {
        method3();
        method4();
    }

    public void method3() {
        // other operation...
    }

    public void method4() {
        taskExecutor.execute(() -> System.out.println("xxxx"));
    }
}

可以看到,method1()method4() 使用了同一个线程池。

试想这样一种极端情况:

  • 线程池的核心线程数和最大线程数都为N,method1()的任务数也为N。
  • 由于method1()把线程池的资源用完了,所以 method4() 无法获取到线程资源,一直被阻塞在队列中无法正常执行。

此时 method1() 在等 method4()执行,而 method4()又在等method1()释放线程池资源,也就形成了“死锁”

7、线程池 和 ThreadLocal 共用的坑

线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值。

解决上述问题比较建议的办法是使用阿里巴巴开源的 TransmittableThreadLocal(TTL)。

TransmittableThreadLocal类继承并加强了 JDK 内置的InheritableThreadLocal类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/391893.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

永久内核映射

内存就像墙上的气球&#xff0c;32体系就好比是小孩&#xff0c;64位体系好比是大人。对于位置比较低的气球&#xff0c;抬抬脚就可以够到&#xff0c;这些气球相当于DMA内存(可用于设备直接内存访问)&#xff1b;位置再高一点的气球&#xff0c;小孩伸手可以够到&#xff0c;这…

AutoKeras(Python自动化机器学习)多模态数据和多任务

要点拓扑 AutoKeras 拓扑 要点 常规机器学习&#xff1a;scikit-learn示例探索性数据分析和数据预处理&#xff0c;线性回归&#xff0c;决策树图像分类ResNet模型示例&#xff0c;合成数据集DenseNet模型示例绘图线性回归和决策树模型使用Python工具seaborn、matplotlib、pan…

import tensorflow_hub报错

问题&#xff1a; 导入tensorflow_hub报ModuleNotFoundError: No module named ‘tensorflow.python.checkpoint’ 解决&#xff1a; tensorflow-estimator版本不对 和tensorflow&#xff08;2.6.0&#xff09;版本一致 。 pip install -U tensorflow-estimator2.6.0 验证&a…

一个收集了大量的C#/.NET/.NET Core项目宝库组织

项目宝库介绍 为.NET开发者提供一个寻找优秀C#/.NET/.NET Core项目和框架的入口&#xff0c;通过了解和对比更多的项目和框架来选择最适合我们自己学习、工作开发的一套项目或者框架。优秀的项目不应该被埋没&#xff0c;欢迎大家一起加入这个组织共同完善、发展.NET社区&…

线程和进程【并发和并行、线程上下文切换、线程的状态】

线程和进程【并发和并行、线程上下文切换、线程的状态】 什么是并发与并行&#xff1f;什么是线程上下文切换&#xff1f;线程状态&#xff1a;一个线程的一生 转自 极客时间 进程&#xff1a;是指内存中运行的一个应用程序&#xff0c;每个进程都有自己独立的内存空间&#x…

RapidMiner数据挖掘2 —— 初识RapidMiner

本节由一系列练习与问题组成&#xff0c;这些练习与问题有助于理解多个基本概念。它侧重于各种特定步骤&#xff0c;以进行直接的探索性数据分析。因此&#xff0c;其主要目标是测试一些检查初步数据特征的方法。大多数练习都是关于图表技术&#xff0c;通常用于数据挖掘。 为此…

51_蓝桥杯_蜂鸣器与继电器

一 电路 二 蜂鸣器与继电器工作原理 2.1蜂鸣器与继电器 2.2 十六进制与二进制 二进制 0000 0001 0010 0011 0100 0101 0110 0111 1000 1001 1010 1011 1100 1101 1110 1111 十六进制 0 1 2 3 4 5 6 7 8 9 A B C D E F 2.3非门 二 代码 …

C++初阶(十一) list

一、list的介绍及使用 1.1 list的介绍 list的文档介绍 1. list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 2. list的底层是双向链表结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&#xff0c;在节点…

第三节作业:基于 InternLM 和 LangChain 搭建你的知识库

参考文档&#xff1a;https://github.com/InternLM/tutorial/tree/main/langchain 基础作业&#xff1a;复现课程知识库助手搭建过程 (截图) 1.环境配置 2.知识库搭建 &#xff08;1&#xff09;数据收集 收集由上海人工智能实验室开源的一系列大模型工具开源仓库作为语料库来…

004 - Hugo, 分类

004 - Hugo, 分类content文件夹 004 - Hugo, 分类 content文件夹 ├─.obsidian ├─categories │ ├─Python │ └─Test ├─page │ ├─about │ ├─archives │ ├─links │ └─search └─post├─chinese-test├─emoji-support├─Git教程├─Hugo分类├─…

STL:优先级队列的实现

STL中优先级队列本质上就是堆。在上一篇博客中讲到过&#xff1a;堆是一种完全二叉树&#xff0c;逻辑结构上看起来像树&#xff0c;但在物理结构中是存储在线性表中。与普通线性表不同的是&#xff0c;堆中数据大小是规律排列的&#xff1a;小堆中每个节点都大于它的父节点&am…

2024免费人像摄影后期处理工具Portraiture4.1

Portraiture作为一款智能磨皮插件&#xff0c;确实为Photoshop和Lightroom用户带来了极大的便利。通过其先进的人工智能算法&#xff0c;它能够自动识别并处理照片中的人物皮肤、头发和眉毛等部位&#xff0c;实现一键式的磨皮美化效果&#xff0c;极大地简化了后期处理的过程。…

QKD安全攻击防御方案分析和分级评估研究报告

今天分享的是行业报告&#xff1a;《QKD安全攻击防御方案分析和分级评估研究报告》 &#xff08;内容出品方&#xff1a;量子信息网络产业联盟&#xff09; 报告共计&#xff1a;180页 来源&#xff1a;《见鹿报告》 前言 量子通信是量子信息科学的重要分支&#xff0c;它…

人工智能学习与实训笔记(十四):Langchain之Agent

人工智能专栏文章汇总&#xff1a;人工智能学习专栏文章汇总-CSDN博客 本篇目录 0、概要 1、Agent整体架构 2、langchain中agent实现 3、Agent业务实现逻辑 0、概要 Agent是干什么的&#xff1f; Agent的核心思想是使用语言模型&#xff08;LLM&#xff09;作为推理的大脑…

飞行路线(分层图+dijstra+堆优化)(加上题目选数复习)

飞行路线 这一题除了堆优化和dijstra算法和链式前向星除外还多考了一个考点就是&#xff0c;分层图&#xff0c;啥叫分层图呢&#xff1f;简而言之就是一个三维的图&#xff0c;按照其题意来说有几个可以免费的点就有几层&#xff0c;而且这个分层的权值为0&#xff08;这样就相…

嵌入式Qt 计算器界面设计

一.计算器界面设计 计算机界面程序分析&#xff1a; 需要用到的组件&#xff1a; 界面设计&#xff1a; 界面设计实现&#xff1a; 实验1&#xff1a;计算器界面设计 #include <QtGui/QApplication> #include <QWidget> //主窗口 #include <QLineEdit> //文…

由斐波那契数列探究递推与递归

斐波那契数列定义&#xff1a; 斐波那契数列大家都非常熟悉。它的定义是&#xff1a; 对于给定的整数 x &#xff0c;我们希望求出&#xff1a; f ( 1 ) f ( 2 ) … f ( x ) f(1)f(2)…f(x) f(1)f(2)…f(x) 的值。 有两种方法,分别是递推(迭代)与递归 具体解释如下图 备注…

Mysql知识点汇总

Mysql知识点汇总 1. Mysql基本场景的简单语句。2. Mysql的增删改查&#xff0c;统计表中的成绩最好的两个同学的名字&#xff0c;年级等。3&#xff1a;请使用多种方法查询每个学生的每门课分数>80的学生姓名4、order by&#xff0c;group by&#xff0c;子查询4.1、having和…

优化嵌入式系统电源管理以提高稳定性

&#xff08;本文为简单介绍&#xff0c;观点源于网络&#xff09; 在嵌入式系统的领域中&#xff0c;电源管理扮演着至关重要的角色&#xff0c;关乎系统稳定性与用户体验。如果电源管理做得不好&#xff0c;就可能导致系统不稳定、数据丢失&#xff0c;甚至硬件损坏。电源管…

springboot186人格障碍诊断系统

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…