Java自定义IO密集型和CPU密集型线程池

文章目录

  • 前言
  • 线程池各类场景描述
  • 常见场景案例设计思路
    • 公共类
      • 自定义工厂类-MyThreadFactory
      • 自定义拒绝策略-RejectedExecutionHandlerFactory
      • 自定义阻塞队列-TaskQueue(实现 核心线程->最大线程数->队列)
    • 场景1:CPU密集型场景
      • 思路&计算公式
      • 实现代码
    • 场景2:IO密集型场景
      • 思路&计算公式
      • 实现代码
  • 其他部分组成
    • 拒绝策略兜底方案
      • 思路设计及思考
      • 设计1:数据库持久化方案
      • 设计2:Netty两种拒绝策略实现(根据场景来进行是否重试入队 + 失败抛异常)
      • 设计3:ActiveMQ(有效时间内尝试入队+入队失败抛出异常)
      • 设计4:dubbo设计思路(dump文件+抛出异常)
      • 设计5: 自定义设计-阻塞入队
  • 参考文章

稿定智能设计202502031721

前言

本章节配套源码:

  • gitee:https://gitee.com/changluJava/demo-exer/tree/master/JUC/src/main/java/demo10

线程池各类场景描述

**类型场景:**不同的场景设置参数也各不相同

  • 第一种:CPU密集型:最大线程数应该等于CPU核数+1,这样最大限度提高效率。
// 通过该代码获取当前运行环境的cpu核数
Runtime.getRuntime().availableProcessors();
  • **第二种:**IO密集型:主要是进行IO操作,执行IO操作的时间较长,这时cpu出于空闲状态,导致cpu的利用率不高。线程数为2倍CPU核数。当其中的线程在IO操作的时候,其他线程可以继续用cpu,提高了cpu的利用率。

  • 第三种:混合型:如果CPU密集型和IO密集型执行时间相差不大那么可以拆分;如果两种执行时间相差很大,就没必要拆分了。

  • **第四种(了解):**在IO优化中,线程等待时间所占比越高,需要线程数越多;线程cpu时间占比越高,需要越少线程数。

线程池初始化所有参数:

corePoolSize : 核心线程数,当线程池中的线程数量为 corePoolSize 时,即使这些线程处于空闲状态,也不会销毁(除非设置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大线程数,线程池中允许的线程数量的最大值。
keepAliveTime : 线程空闲时间,当线程池中的线程数大于 corePoolSize 时,多余的空闲线程将在销毁之前等待新任务的最长时间。
workQueue : 任务队列
unit : 线程空闲时间的单位。
threadFactory : 线程工厂,线程池创建线程时使用的工厂。
handler : 拒绝策略,因达到线程边界和任务队列满时,针对新任务的处理方法。
	CallerRunsPolicy:由提交任务的线程直接执行任务,避免任务丢失。适合任务量波动较大的场景。
	AbortPolicy:直接抛出 RejectedExecutionException 异常。适合任务量可控的场景。
	DiscardPolicy:静默丢弃任务,不抛出异常。适合对任务丢失不敏感的场景。
	DiscardOldestPolicy:丢弃队列中最旧的任务,然后重新尝试提交当前任务。适合对任务时效性要求较高的场景。

核心线程池execute逻辑代码:

public void execute(Runnable command) {
		//任务判空
        if (command == null)
            throw new NullPointerException();
       	//查看当前运行的线程数量
        int c = ctl.get();
    	//若小于核心线程则直接添加一个工作线程并执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程数等于核心线程数则尝试将任务入队
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //入队失败,调用addWorker参数为false,尝试创建应急线程处理突发任务
        else if (!addWorker(command, false))
        	//如果创建应急线程失败,说明当前线程数已经大于最大线程数,这个任务只能拒绝了
            reject(command);
    }

image-20250201004416120


常见场景案例设计思路

公共类

image-20250202175504266

自定义工厂类-MyThreadFactory

MyThreadFactory.java:自定义了线程池工厂类,可以自行进行命名

package demo10;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义线程池工厂类
 */
public class MyThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public MyThreadFactory(String factoryName) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = factoryName + "-pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

自定义拒绝策略-RejectedExecutionHandlerFactory

RejectedExecutionHandlerFactory.java:包含有多种拒绝策略,其中包含本次需要使用的阻塞入队拒绝策略

package demo10;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 拒绝策略工厂类
 *
 */
@Slf4j
public class RejectedExecutionHandlerFactory {

    private static final AtomicLong COUNTER = new AtomicLong();

    /**
     * 拒绝执行,抛出 RejectedExecutionException
     * @param source name for log
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newAbort(String source) {
        return (r, e) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " + source);
        };
    }

    /**
     * 直接丢弃该任务
     * @param source log name
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newDiscard(String source) {
        return (r, p) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r);
        };
    }

    /**
     * 调用线程运行
     * @param source log name
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newCallerRun(String source) {
        System.out.println("thread =>" + Thread.currentThread().getName() + "触发阻塞中...");
        return (r, p) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r);
            if (!p.isShutdown()) {
                r.run();
            }
        };
    }

    /**
     * 新线程运行
     * @param source log name
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newThreadRun(String source) {
        return (r, p) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);
            if (!p.isShutdown()) {
                String threadName = source + "-T-" + COUNTER.getAndIncrement();
                log.info("[{}] create new thread[{}] to run job", source, threadName);
                new Thread(r, threadName).start();
            }
        };
    }

    /**
     * 依据阻塞队列put 阻塞添加到队列中
     * @return 拒绝策略执行器
     */
    public static RejectedExecutionHandler blockCallerPolicy(String source) {
        return new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, e, r);
                if (!e.isShutdown()) {
                    try {
                        // 阻塞入队操作,阻塞方为调用方执行submitjob的线程
                        e.getQueue().put(r);
                    } catch (InterruptedException ex) {
                        log.error("reject put queue error", ex);
                    }
                }
            }
        };
    }


}

自定义阻塞队列-TaskQueue(实现 核心线程->最大线程数->队列)

TaskQueue.java:线程池中实现先使用核心线程数

package demo10;

import java.util.concurrent.*;

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private transient ThreadPoolExecutor parent;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(ThreadPoolExecutor parent) {
        this.parent = parent;
    }

    /**
     * 核心线程 -> 最大核心线程数 -> 队列
     * @param runnable the element to add
     * @return
     */
    @Override
    public boolean offer(Runnable runnable) {
        // 如果没有线程池父类,则直接尝试入队
        if (parent == null) return super.offer(runnable);
        // 若是工作线程数 < 最大线程数,则优先创建线程跑任务
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;
        // 工作线程数 >= 最大线程数,入队
        return super.offer(runnable);
    }
}

场景1:CPU密集型场景

思路&计算公式

**场景:**具体是指那种包含大量运算、在持有的 CPU 分配的时间片上一直在执行任务、几乎不需要依赖或等待其他任何东西。处理起来其实没有多少优化空间,因为处理时几乎没有等待时间,所以一直占有 CPU 进行执行,才是最好的方式。

**可优化的点:**就是当单个线程累计较多任务时,其他线程能进行分担,类似fork/join框架的概念。

设置参数:设置线程数时,针对单台机器,最好就是有几个 CPU ,就创建几个线程,然后每个线程都在执行这种任务,永不停歇。

Nthreads=Ncpu+1 
w/c =0 
理解也是正确的,+1 主要是防止因为系统上下文切换,让系统资源跑满!

实现代码

image-20250202184351200

这里核心+最大线程数使用的是CPU核心数+1:

package demo10.cpu;

import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * cpu密集型场景任务提交
 *  自定义队列:核心线程 -> 最大线程 -> 队列
 *  自定义拒绝策略:自定义采用执行阻塞队列的put操作来实现任务阻塞入队,而非直接使用调用者线程来直接跑任务
 *  非影响主线程执行流程:批次1000个任务统一在一个线程中去进行处理,与主流程main线程隔离
 *
 */
@Slf4j
public class CPUThreadPoolExample {

    public static void main(String[] args) {
        // 获取 CPU 核心数
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // 自定义线程池参数
        int corePoolSize = cpuCores + 1; // 核心线程数 cpu核心数+1
        int maximumPoolSize = corePoolSize; // 最大线程数 cpu核心数+1
        long keepAliveTime = 60L; // 空闲线程存活时间
        TimeUnit unit = TimeUnit.SECONDS; // 时间单位
        // 自定义任务队列 核心线程 -> 最大核心线程数 -> 队列
        TaskQueue<Runnable> taskQueue = new TaskQueue<>(500); // 队列容量为核心线程数的 2 倍
        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                taskQueue,
                new MyThreadFactory("IOIntensiveThreadPool"), // 默认线程工厂 Executors.defaultThreadFactory() | 自定义工厂支持自定义线程池名字
                RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool")
        );
        // 将线程池对象设置到任务队列中
        taskQueue.setExecutor(executor);

        // 统计任务的执行数量
        int jobNums = 1000000;
        final AtomicInteger count = new AtomicInteger(0);

        // 记录任务开始时间
        long startTime = System.currentTimeMillis();
        // 单独开一个线程(后续可改为线程池 核心、最大就1个场景)去完成整个任务提交处理
        // 如果submitjob阻塞,仅仅只会影响该thread线程
        new Thread(() -> {
            CountDownLatch latch = new CountDownLatch(jobNums);
            // 模拟1000个任务 (可改造为queue队列形式去在这个线程中去消费)
            for (int i = 0; i < jobNums; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    // CPU计算
                    int sum = 0;
                    for (int j = 0; j < 100000; j++) {
                        sum += j;
                    }
                    System.out.println(Thread.currentThread().getName() + " 任务 " + taskId + " 完成!sum = " + sum);
                    count.incrementAndGet(); // 原子操作,+1 并返回新值
                    latch.countDown();
                });
            }
            System.out.println("所有任务提交完成!");
            // 关闭线程池,等待任务全部执行完毕
            try {
                latch.await();
                System.out.println("所有任务执行结束!");
                // 记录任务结束时间
                long endTime = System.currentTimeMillis();
                // 计算任务执行时间
                long duration = endTime - startTime;
                System.out.println("任务执行总耗时: " + duration + " 毫秒");
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }finally {
                executor.shutdown();
            }
        }).start();

        try {
            // 等待所有任务完成
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 强制关闭
            }
            System.out.println("执行完任务数统计:" + count.get());
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

效果:

image-20250202184444336


场景2:IO密集型场景

思路&计算公式

**场景:**其消耗的主要资源就是 IO 了,所接触到的 IO ,大致可以分成两种:磁盘 IO网络 IO。IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到缓冲区写满。

  • 磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。
  • 网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

设置参数:

# 如果存在IO,那么肯定w/c>1(阻塞耗时一般都是计算耗时的很多倍),但是需要考虑系统内存有限(每开启一个线程都需要内存空间),这里需要上服务器测试具体多少个线程数适合(CPU占比、线程数、总耗时、内存消耗)。如果不想去测试,保守点取1即,Nthreads=Ncpu*(1+1)=2Ncpu。这样设置一般都OK
# 通用就是2倍的CPU核心数(如果要效率最大化,需要测算当前系统环境每个线程任务的阻塞等待时间与实际计算时间)
Nthreads=Ncpu*(1+w/c)
公式中 W/C 为系统 阻塞率  w:等待时间 c:计算时间

实现代码

image-20250202183217126

IOIntensiveThreadPoolExample2.java:这里最终实现的Example2类来进行测试

package demo10.io;

import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * io密集型场景任务提交
 * demo3:基于demo2自定义拒绝策略
 *  自定义队列:核心线程 -> 最大线程 -> 队列
 *  自定义拒绝策略:自定义采用执行阻塞队列的put操作来实现任务阻塞入队,而非直接使用调用者线程来直接跑任务
 *  非影响主线程执行流程:批次1000个任务统一在一个线程中去进行处理,与主流程main线程隔离
 *
 */
@Slf4j
public class IOIntensiveThreadPoolExample2 {

    public static void main(String[] args) {
        // 获取 CPU 核心数
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // 自定义线程池参数
        int corePoolSize = cpuCores * 2; // 核心线程数(IO 密集型任务可以设置较大)
        int maximumPoolSize = cpuCores * 4; // 最大线程数
        long keepAliveTime = 60L; // 空闲线程存活时间
        TimeUnit unit = TimeUnit.SECONDS; // 时间单位
        // 自定义任务队列 核心线程 -> 最大核心线程数 -> 队列
        TaskQueue<Runnable> taskQueue = new TaskQueue<>(corePoolSize * 2); // 队列容量为核心线程数的 2 倍
        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                taskQueue,
                new MyThreadFactory("IOIntensiveThreadPool"), // 默认线程工厂 Executors.defaultThreadFactory() | 自定义工厂支持自定义线程池名字
                RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool")
        );
        // 将线程池对象设置到任务队列中
        taskQueue.setExecutor(executor);

        // 统计任务的执行数量
        int jobNums = 1000;
        final AtomicInteger count = new AtomicInteger(0);

        // 记录任务开始时间
        long startTime = System.currentTimeMillis();
        // 单独开一个线程(后续可改为线程池 核心、最大就1个场景)去完成整个任务提交处理
        // 如果submitjob阻塞,仅仅只会影响该thread线程
        new Thread(() -> {
            CountDownLatch latch = new CountDownLatch(jobNums);
            // 模拟1000个任务 (可改造为queue队列形式去在这个线程中去消费)
            for (int i = 0; i < jobNums; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId + "...");
                    try {
                        Thread.sleep(500); // 模拟 IO 操作(如网络请求或文件读写)10s
                        // xxxio类耗时操作
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }finally {
                        System.out.println(Thread.currentThread().getName() + " 任务 " + taskId + " 完成!");
                        count.incrementAndGet(); // 原子操作,+1 并返回新值
                        latch.countDown();
                    }
                });
            }
            System.out.println("所有任务提交完成!");
            // 关闭线程池,等待任务全部执行完毕
            try {
                latch.await();
                System.out.println("所有任务执行结束!");
                // 记录任务结束时间
                long endTime = System.currentTimeMillis();
                // 计算任务执行时间
                long duration = endTime - startTime;
                System.out.println("任务执行总耗时: " + duration + " 毫秒");
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }finally {
                executor.shutdown();
            }
        }).start();

        try {
            // 等待所有任务完成
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 强制关闭
            }
            System.out.println("执行完任务数统计:" + count.get());
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

一个任务耗时0.5s,1000个任务执行如下:

image-20250202183457306

说明:经过测试验证,如果IO阻塞时间特别长,调大最大核心线程数效果更好。


其他部分组成

拒绝策略兜底方案

思路设计及思考

如果核心线程、最大线程、队列都满了的情况下该如何处理?如果本身就是单台机器资源打满,就需要在设计策略上改变线程池的调度方案,如果我的目的是任何一个任务都不丢弃,同时在服务器上有余力及时处理?

方案1:持久化数据库设计

  • 如:设计一张任务表间任务存储到 MySQL 数据库中;redis缓存;任务提交到中间件来缓冲。

设计思路可以如下:参考https://zhuanlan.zhihu.com/p/700719289

image-20250201084054264

方案2:Netty 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控。

  • 后续通过翻阅源码发现一种在拒绝策略场景带退避的重试策略

方案3:ActiveMQ 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付

**方案4:**dubbo设计思路(dump文件+抛出异常)

方案5:线程阻塞队列

思路:队列采用阻塞队列,在拒绝策略方法中使用put方法实现阻塞效果。

可能情况:阻塞主线程任务执行。


设计1:数据库持久化方案

设计思路:自定义拒绝策略,在拒绝策略情况下进行数据库持久化;自定义实现队列,在poll的时候优先从db获取任务,接着再从队列中获取。

**详细具体实现可见:**某大厂线程池拒绝策略连环问 https://blog.csdn.net/shark_chili3007/article/details/137042400


设计2:Netty两种拒绝策略实现(根据场景来进行是否重试入队 + 失败抛异常)

实现思路1:创建新线程执行任务

说明:为了保证任务的实时处理,这种做法需要良好的硬件设备且临时创建的线程无法做到准确的监控

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
    NewThreadRunsPolicy() {
        super();
    }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            //创建一个临时线程处理任务
            final Thread t = new Thread(r, "Temporary task executor");
            t.start();
        } catch (Throwable e) {
            throw new RejectedExecutionException(
                    "Failed to start a new thread", e);
        }
    }
}

弊端:如果任务数特别多无上限场景,就会出现oom情况,导致服务挂掉。

实现思路2:拒绝策略场景带退避的重试策略

源码地址:https://github.dev/netty/netty

  • 具体代码文件:RejectedExecutionHandlers
/**
 * Tries to backoff when the task can not be added due restrictions for an configured amount of time. This
 * is only done if the task was added from outside of the event loop which means
 * {@link EventExecutor#inEventLoop()} returns {@code false}.
 */
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {
    // 检查 retries 参数是否为正数,如果不是则抛出异常
    ObjectUtil.checkPositive(retries, "retries");

    // 将退避时间转换为纳秒
    final long backOffNanos = unit.toNanos(backoffAmount);

    // 返回一个实现了 RejectedExecutionHandler 接口的匿名类
    return new RejectedExecutionHandler() {
        @Override
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            // 检查当前线程是否不是事件循环线程
            if (!executor.inEventLoop()) {
                // 进行最多 retries 次重试
                for (int i = 0; i < retries; i++) {
                    // 尝试唤醒事件循环线程,以便它能够处理任务队列中的任务
                    executor.wakeup(false);

                    // 当前线程休眠指定的退避时间
                    LockSupport.parkNanos(backOffNanos);

                    // 尝试将任务重新加入任务队列
                    if (executor.offerTask(task)) {
                        // 如果任务成功加入队列,则直接返回
                        return;
                    }
                }
            }
            // 如果当前线程是事件循环线程,或者重试次数用尽后仍然无法加入任务队列,
            // 则抛出 RejectedExecutionException 异常
            throw new RejectedExecutionException();
        }
    };
}

设计3:ActiveMQ(有效时间内尝试入队+入队失败抛出异常)

说明:尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付,超过时间内则返回false。

github地址:https://github.dev/apache/activemq

  • 对应代码:BrokerService#getExecutor
new RejectedExecutionHandler() {
      @Override
      public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
          try {
            	// 在60s内进行尝试入队,如果入队失败,则抛出异常
              if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) {
                  throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
              }
          } catch (InterruptedException e) {
              throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
          }
      }
  }

设计4:dubbo设计思路(dump文件+抛出异常)

github地址:

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    String msg = String.format(
            "Thread pool is EXHAUSTED!"
                    + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d),"
                    + " Task: %d (completed: %d),"
                    + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName,
            e.getPoolSize(),
            e.getActiveCount(),
            e.getCorePoolSize(),
            e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(),
            e.getCompletedTaskCount(),
            e.isShutdown(),
            e.isTerminated(),
            e.isTerminating(),
            url.getProtocol(),
            url.getIp(),
            url.getPort());

    // 0-1 - Thread pool is EXHAUSTED!
    logger.warn(COMMON_THREAD_POOL_EXHAUSTED, "too much client requesting provider", "", msg);

    if (Boolean.parseBoolean(url.getParameter(DUMP_ENABLE, Boolean.TRUE.toString()))) {
        // 进行dump文件
        dumpJStack();
    }
		// 指派发送消息给listener监听器
    dispatchThreadPoolExhaustedEvent(msg);

    throw new RejectedExecutionException(msg);
}

dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因

1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在

2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。

3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性


设计5: 自定义设计-阻塞入队

在线程池初始化的时候自定义拒绝策略:阻塞入队操作,阻塞方为调用方执行submitjob的线程

new RejectedExecutionHandler() {
  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", "IOIntensiveThreadPool", e, r);
      if (!e.isShutdown()) {
          try {
              // 阻塞入队操作,阻塞方为调用方执行submitjob的线程
              e.getQueue().put(r);
          } catch (InterruptedException ex) {
              log.error("reject put queue error", ex);
          }
      }
  }
}

如果要执行的任务数量过多,核心线程数、最大核心线程数占满、任务队列占满,此时让任务进行入队阻塞,等待队列中任务有空余位置。


参考文章

[1]. Java 线程池讲解——针对 IO 密集型任务:https://www.jianshu.com/p/66b6dfcf3173(提出dubbo 或者 tomcat 的线程池中自定义Queue的实现,核心线程数 -> 最大线程数 -> 队列中)

[2]. 某大厂线程池拒绝策略连环问 https://blog.csdn.net/shark_chili3007/article/details/137042400

[3]. 线程池拒绝策略:https://blog.csdn.net/qq_40428665/article/details/121680262

[4]. Java线程池如何合理配置核心线程数:https://www.cnblogs.com/Vincent-yuan/p/16022613.html

[5]. 线程池参数配置:https://blog.csdn.net/whp404/article/details/131960756(计算公式)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/963910.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

浅谈线段树

文章同步发布于洛谷&#xff0c;建议前往洛谷查看。 前言 蒟蒻终于学会线段树&#xff08;指【模板】线段树 1 1 1&#xff09;啦&#xff01; 线段树思想 我们先来考虑 P3372&#xff08;基础线段树模板题&#xff09;给的操作&#xff1a; 区间修改&#xff08;增加&am…

linux运行级别

运行级别&#xff1a;指linux系统在启动和运行过程中所处的不同的状态。 运行级别之间的切换&#xff1a;init (级别数) 示例&#xff1a; linux的运行级别一共有7种&#xff0c;分别是&#xff1a; 运行级别0&#xff1a;停机状态 运行级别1&#xff1a;单用户模式/救援模式…

【自开发工具介绍】SQLSERVER的ImpDp和ExpDp工具03

SQLSERVER的ImpDp和ExpDp工具 1、全部的表导出&#xff08;仅表结构导出&#xff09; 2、导出的表结构&#xff0c;导入到新的数据库 导入前&#xff0c;test3数据没有任何表 导入 导入结果确认&#xff1a;表都被做成&#xff0c;但是没有数据 3、全部的表导出&#x…

商品列表及商品详情展示

前言 本文将展示一段结合 HTML、CSS 和 JavaScript 的代码&#xff0c;实现了一个简单的商品展示页面及商品详情&#xff0c;涵盖数据获取、渲染、搜索及排序等功能。 效果展示 点击不同的商品会展示对应的商品详情。 代码部分 代码总体实现 <!DOCTYPE html> <htm…

c++提取矩形区域图像的梯度并拟合直线

c提取旋转矩形区域的边缘最强梯度点&#xff0c;并拟合直线 #include <opencv2/opencv.hpp> #include <iostream> #include <vector>using namespace cv; using namespace std;int main() {// 加载图像Mat img imread("image.jpg", IMREAD_GRAYS…

独立开发浏览器插件:案例与启示

浏览器插件&#xff08;Browser Extension&#xff09;作为提升用户浏览体验的重要工具&#xff0c;近年来吸引了许多独立开发者的关注。从广告拦截到生产力工具&#xff0c;再到个性化定制功能&#xff0c;浏览器插件的开发为个人开发者提供了一个低成本、高潜力的创业机会。本…

Linux系统 环境变量

环境变量 写在前面概念查看环境变量main函数的参数argc & argvenv bash环境变量 写在前面 对于环境变量&#xff0c;本篇主要介绍基本概念及三四个环境变量 —— PATH、HOME、PWD。其中 PATH 作为 “ 敲门砖 ”&#xff0c;我们会更详细讲解&#xff1b;理解环境变量的全局…

BFS(广度优先搜索)——搜索算法

BFS&#xff0c;也就是广度&#xff08;宽度&#xff09;优先搜索&#xff0c;二叉树的层序遍历就是一个BFS的过程。而前、中、后序遍历则是DFS&#xff08;深度优先搜索&#xff09;。从字面意思也很好理解&#xff0c;DFS就是一条路走到黑&#xff0c;BFS则是一层一层地展开。…

SpringCloud基础二(完结)

HTTP客户端Feign 在SpringCloud基础一中&#xff0c;我们利用RestTemplate结合服务注册与发现来发起远程调用的代码如下&#xff1a; String url "http://userservice/user/" order.getUserId(); User user restTemplate.getForObject(url, User.class);以上代码就…

Spring Bean 容器

技术成长&#xff0c;是对场景设计细节不断的雕刻&#xff01; 你觉得自己的技术什么时候得到了快速的提高&#xff0c;是CRUD写的多了以后吗&#xff1f;想都不要想&#xff0c;绝对不可能&#xff01;CRUD写的再多也只是能满足你作为一个搬砖工具人&#xff0c;敲击少逻辑流…

【react+redux】 react使用redux相关内容

首先说一下&#xff0c;文章中所提及的内容都是我自己的个人理解&#xff0c;是我理逻辑的时候&#xff0c;自我说服的方式&#xff0c;如果有问题有补充欢迎在评论区指出。 一、场景描述 为什么在react里面要使用redux&#xff0c;我的理解是因为想要使组件之间的通信更便捷…

利用腾讯云cloud studio云端免费部署deepseek-R1

1. cloud studio 1.1 cloud studio介绍 Cloud Studio&#xff08;云端 IDE&#xff09;是基于浏览器的集成式开发环境&#xff0c;为开发者提供了一个稳定的云端工作站。支持CPU与GPU的访问。用户在使用 Cloud Studio 时无需安装&#xff0c;随时随地打开浏览器即可使用。Clo…

基于VMware的ubuntu与vscode建立ssh连接

1.首先安装openssh服务 sudo apt update sudo apt install openssh-server -y 2.启动并检查ssh服务状态 到这里可以按q退出 之后输入命令 &#xff1a; ip a 红色挡住的部分就是我们要的地址&#xff0c;这里就不展示了哈 3.配置vscode 打开vscode 搜索并安装&#xff1a;…

四川正熠法律咨询有限公司正规吗可信吗?

在纷繁复杂的法律环境中&#xff0c;寻找一家值得信赖的法律服务机构是每一个企业和个人不可或缺的需求。四川正熠法律咨询有限公司&#xff0c;作为西南地区备受瞩目的法律服务提供者&#xff0c;以其专注、专业和高效的法律服务&#xff0c;成为众多客户心中的首选。 正熠法…

【优先算法】专题——位运算

在讲解位运算之前我们来总结一下常见的位运算 一、常见的位运算 1.基础为运算 << &&#xff1a;有0就是0 >> |&#xff1a;有1就是1 ~ ^&#xff1a;相同为0&#xff0c;相异位1 /无进位相加 2.给一个数 n&#xff0c;确定它的二进制表示…

Android --- handler详解

handler 理解 handler 是一套Android 消息传递机制&#xff0c;主要用于线程间通信。 tips&#xff1a; binder/socket 用于进程间通信。 参考&#xff1a; Android 进程间通信-CSDN博客 handler 就是主线程在起了一个子线程&#xff0c;子线程运行并生成message &#xff0c;l…

【线程】基于阻塞队列的生产者消费者模型

文章目录 1 生产者消费者模型2 阻塞队列2.1 成员变量2.2 消费者操作2.3 生产者生产 3 总结 1 生产者消费者模型 在多线程环境中&#xff0c;生产者消费者模型是一种经典的线程同步模型&#xff0c;用于处理生产者线程与消费者线程之间的工作调度和资源共享问题。在这个模型中&a…

解决PyG安装中torch-sparse安装失败问题:详细指南

1 问题描述 最近在学习GNN&#xff0c;需要使用PyTorch Geometric&#xff08;PyG&#xff09;库。在安装PyG的过程中&#xff0c;遇到了torch-sparse安装失败的问题&#xff0c;错误提示为&#xff1a; ERROR: Failed building wheel for torch-sparse本文将详细记录问题的解…

4 [危机13小时追踪一场GitHub投毒事件]

事件概要 自北京时间 2024.12.4 晚间6点起&#xff0c; GitHub 上不断出现“幽灵仓库”&#xff0c;仓库中没有任何代码&#xff0c;只有诱导性的病毒文件。当天&#xff0c;他们成为了 GitHub 上 star 增速最快的仓库。超过 180 个虚假僵尸账户正在传播病毒&#xff0c;等待不…

【B站保姆级视频教程:Jetson配置YOLOv11环境(六)PyTorchTorchvision安装】

Jetson配置YOLOv11环境&#xff08;6&#xff09;PyTorch&Torchvision安装 文章目录 1. 安装PyTorch1.1安装依赖项1.2 下载torch wheel 安装包1.3 安装 2. 安装torchvisiion2.1 安装依赖2.2 编译安装torchvision2.2.1 Torchvisiion版本选择2.2.2 下载torchvisiion到Downloa…