一、Java线程池
1.ThreadPoolExecutor基础使用
Java线程池ThreadPoolExecutor基础使用
2.Java自定义多队列线程池原理
(1) 基本原理
- 创建一组Thread然后执行,不断从阻塞队列BlockingQueue中取出任务进行处理
(2) 简单示例
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
/**
 * Minimal multi-queue thread pool: every worker thread owns a private queue
 * and runs the tasks placed on that queue one after another.
 */
public class CustomThreadPool {
    private final WorkerThread[] threads;

    /**
     * Creates and starts {@code poolSize} workers, each with its own unbounded queue.
     *
     * @param poolSize number of worker threads; must be positive
     * @throws IllegalArgumentException if {@code poolSize} is not positive
     */
    public CustomThreadPool(int poolSize) {
        // Fail fast: a pool without workers could never run a task and
        // execute() would index into an empty array.
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive: " + poolSize);
        }
        threads = new WorkerThread[poolSize];
        for (int i = 0; i < poolSize; i++) {
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
            threads[i] = new WorkerThread(workQueue);
            threads[i].start();
        }
    }

    /**
     * Hands {@code task} to one of the workers.
     *
     * @param task the task to run; must not be null
     * @throws NullPointerException if {@code task} is null
     */
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        // The target worker is chosen at random (this is NOT round-robin).
        int index = (int) (Math.random() * threads.length);
        threads[index].getQueue().add(task);
    }

    /** Worker thread that drains its own queue until it is interrupted. */
    private static class WorkerThread extends Thread {
        private final BlockingQueue<Runnable> queue;

        public WorkerThread(BlockingQueue<Runnable> queue) {
            this.queue = queue;
        }

        public BlockingQueue<Runnable> getQueue() {
            return queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Runnable task = queue.take(); // blocks until a task is available
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        // A failing task must not kill the worker: nobody else
                        // consumes this queue, so later tasks would be stranded.
                        // Report the failure through the normal handler and go on.
                        Thread current = Thread.currentThread();
                        current.getUncaughtExceptionHandler().uncaughtException(current, e);
                    }
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal; restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }
    }
}
(3)多队列线程池
- 创建任务提交接口 MultiQueueExecutor,通过key值offer到指定队列
package com.zzc.design.juc.concurrent;
/**
 * Task-submission entry point of a multi-queue executor: every task is
 * submitted together with a key.
 */
public interface MultiQueueExecutor {
/**
 * Submits {@code command} for execution under {@code key}.
 *
 * @param key     routing key supplied by the caller
 * @param command the task to execute
 */
void execute(Object key, Runnable command);
}
- 拓展任务提交接口MultiQueueExecutorService,实现线程池相关的状态管理
package com.zzc.design.juc.concurrent;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Extends {@link MultiQueueExecutor} with the life-cycle operations of a pool:
 * shutdown, state queries and waiting for termination.
 */
public interface MultiQueueExecutorService extends MultiQueueExecutor {
/**
 * Starts shutting the pool down.
 */
void shutdown();
/**
 * Shuts the pool down immediately.
 * @return a list of tasks (presumably those that never started — confirm against the implementation)
 */
List<Runnable> shutdownNow();
/**
 * Reports whether the pool has been shut down.
 * @return {@code true} if the pool has been shut down
 */
boolean isShutdown();
/**
 * Reports whether the pool has completely terminated and released all resources.
 */
boolean isTerminated();
/**
 * Waits for the pool to terminate, for at most the given time.
 * @param timeout maximum time to wait
 * @param unit time unit of {@code timeout}
 * @return presumably {@code true} if the pool terminated within the timeout — confirm against the implementation
 * @throws InterruptedException if the waiting thread is interrupted
 */
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}
- 抽象实现多队列线程池接口,后续实现通用功能可在此处添加
package com.zzc.design.juc.concurrent;
/**
 * Abstract base for multi-queue pools. It currently adds nothing to
 * {@link MultiQueueExecutorService}; shared functionality can be placed here later.
 */
public abstract class AbstractMultiQueueExecutor implements MultiQueueExecutorService {
/** Creates the base executor; no state is initialised here. */
public AbstractMultiQueueExecutor() {}
}
- 定义多队列线程池拒绝策略接口MultiQueueRejectedExecutionHandler
package com.zzc.design.juc.concurrent;
/**
 * Rejection strategy for tasks that a {@link MultiQueueThreadPoolExecutor} cannot accept.
 */
public interface MultiQueueRejectedExecutionHandler {
/**
 * Called when {@code r}, submitted under {@code key}, is rejected by {@code executor}.
 *
 * @param key      the key the task was submitted with
 * @param r        the rejected task
 * @param executor the executor that rejected the task
 */
void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor);
}
- 具体实现多队列线程池(初稿,基本测试暂时没有问题)
package com.zzc.design.juc.concurrent;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MultiQueueThreadPoolExecutor extends AbstractMultiQueueExecutor {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// Packed control word, initialised to run state RUNNING with a worker count of 0.
private static final int COUNT_BITS = Integer.SIZE - 3;// Number of low-order bits holding the worker count: an int has 32 bits (Integer.SIZE), 3 are reserved for the run state, leaving 29. If that is ever too few, ctl could be widened to long / AtomicLong.
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//00011111111111111111111111111111 Mask with the low 29 bits set; used to extract the worker count from ctl.
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;// Running: high 3 bits are 111 (two's-complement -1 shifted left, i.e. 111...000).
private static final int SHUTDOWN = 0 << COUNT_BITS;// Shutting down: no new tasks are accepted but queued tasks are still processed; high 3 bits are 000.
private static final int STOP = 1 << COUNT_BITS;// Stopped: no new tasks are accepted and queued tasks are no longer processed; high 3 bits are 001.
private static final int TIDYING = 2 << COUNT_BITS;// All work is done and the pool is about to terminate; high 3 bits are 010.
private static final int TERMINATED = 3 << COUNT_BITS;// Fully terminated; high 3 bits are 011.
// Packing and unpacking ctl
/**
 * Extracts the run state from a packed {@code ctl} value.
 * Negating {@code COUNT_MASK} (00011111111111111111111111111111) gives
 * 11100000000000000000000000000000, so the AND keeps only the three
 * high-order state bits and clears the worker count.
 *
 * @param c packed ctl value
 * @return the run-state bits of {@code c}
 */
private static int runStateOf(int c) {
    final int stateBits = ~COUNT_MASK;
    return c & stateBits;
}
/**
 * Extracts the worker count from a packed {@code ctl} value by masking off
 * the three high-order state bits; the remaining low 29 bits are the count.
 *
 * @param c packed ctl value
 * @return the worker count stored in {@code c}
 */
private static int workerCountOf(int c) {
    final int count = c & COUNT_MASK;
    return count;
}
/**
 * Packs a run state and a worker count into a single {@code ctl} value.
 * The state occupies the high 3 bits and the count the low 29 bits, so a
 * bitwise OR merges them, e.g.
 * 00011111111111111111111111111111 | 11100000000000000000000000000000
 * = 11111111111111111111111111111111.
 *
 * @param rs run-state bits
 * @param wc worker count
 * @return the combined ctl value
 */
private static int ctlOf(int rs, int wc) {
    final int packed = rs | wc;
    return packed;
}
/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
/**
 * Tests whether {@code c} compares below the state constant {@code s}.
 *
 * @param c ctl value (or bare run state) to test
 * @param s state constant to compare against
 * @return {@code true} if {@code c} is numerically smaller than {@code s}
 */
private static boolean runStateLessThan(int c, int s) {
    return Integer.compare(c, s) < 0;
}
/**
 * Tests whether {@code c} has reached the state constant {@code s}.
 *
 * @param c ctl value (or bare run state) to test
 * @param s state constant to compare against
 * @return {@code true} if {@code c} is numerically greater than or equal to {@code s}
 */
private static boolean runStateAtLeast(int c, int s) {
    return Integer.compare(c, s) >= 0;
}
/**
 * Tests whether {@code c} denotes the RUNNING state, i.e. compares below SHUTDOWN.
 *
 * @param c ctl value (or bare run state) to test
 * @return {@code true} if {@code c} is below {@code SHUTDOWN}
 */
private static boolean isRunning(int c) {
    return SHUTDOWN > c;
}
/**
 * Tries once to raise the worker count stored in ctl by one via compare-and-set.
 *
 * @param expect the ctl value the caller last observed
 * @return {@code true} if ctl still held {@code expect} and was updated
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    final int incremented = expect + 1;
    return ctl.compareAndSet(expect, incremented);
}
/**
 * Tries once to lower the worker count stored in ctl by one via compare-and-set.
 *
 * @param expect the ctl value the caller last observed
 * @return {@code true} if ctl still held {@code expect} and was updated
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    final int decremented = expect - 1;
    return ctl.compareAndSet(expect, decremented);
}
/**
 * Lowers the worker count stored in ctl by one, retrying the compare-and-set
 * with a freshly read ctl value until it succeeds.
 */
private void decrementWorkerCount() {
    boolean decremented;
    do {
        decremented = compareAndDecrementWorkerCount(ctl.get());
    } while (!decremented);
}
// One task queue per slot; the worker registered at slot i takes its tasks from workQueues[i].
private final BlockingQueue<Runnable>[] workQueues;
// Factory used by Worker to create its thread; replaceable through setThreadFactory.
private volatile ThreadFactory threadFactory;
// Strategy invoked by reject() for tasks that cannot be accepted.
private volatile MultiQueueRejectedExecutionHandler handler;
// Registered workers, indexed by Worker.index; a null entry means no worker is registered for that slot.
private final Worker[] workers;
// Held while workers are registered or removed and during shutdown / termination transitions.
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Wait condition to support awaitTermination.
*/
private final Condition termination = mainLock.newCondition();
// Not read anywhere in the visible code (reserved for a future time-out feature, per the TODOs).
private volatile boolean allowCoreThreadTimeOut;
// Number of slots: both the workers array and the queue array have this length.
private final int corePoolSize;
// Handler used by the constructors that take no explicit handler.
private static final MultiQueueRejectedExecutionHandler defaultRejectedHandler = new AbortPolicy();
// Only referenced by the commented-out checkShutdownAccess().
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
/**
 * Creates a pool over the given queues, using a new DefaultThreadFactory and
 * the default rejection handler.
 *
 * @param workQueues one queue per worker slot
 */
public MultiQueueThreadPoolExecutor(BlockingQueue<Runnable>[] workQueues) {
this(workQueues, new DefaultThreadFactory(), defaultRejectedHandler);
}
@SuppressWarnings("unchecked")
public MultiQueueThreadPoolExecutor(int corePoolSize, int capacity, Class<?> queueClass, ThreadFactory threadFactory, MultiQueueRejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.workers = new Worker[corePoolSize];
this.threadFactory = threadFactory;
this.handler = handler;
this.workQueues = (BlockingQueue<Runnable>[]) Array.newInstance(queueClass, corePoolSize);
try {
for (int i = 0; i < corePoolSize; i++) {
workQueues[i] = (BlockingQueue<Runnable>) createQueueWithCapacity(capacity, queueClass);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
 * Creates a pool with {@code corePoolSize} queues of type {@code queueClass},
 * using a new DefaultThreadFactory and the default rejection handler.
 *
 * @param corePoolSize number of queues / worker slots
 * @param capacity     capacity passed to each queue's constructor
 * @param queueClass   queue implementation class to instantiate
 */
public MultiQueueThreadPoolExecutor(int corePoolSize, int capacity, Class<?> queueClass) {
this(corePoolSize, capacity, queueClass, new DefaultThreadFactory(), defaultRejectedHandler);
}
/**
 * Creates a pool over caller-supplied queues; the number of worker slots
 * equals the number of queues.
 *
 * @param workQueues    one queue per worker slot; must be non-empty
 * @param threadFactory factory for worker threads; must not be null
 * @param handler       rejection handler; must not be null
 * @throws NullPointerException     if any argument is null
 * @throws IllegalArgumentException if {@code workQueues} is empty
 */
public MultiQueueThreadPoolExecutor(BlockingQueue<Runnable>[] workQueues, ThreadFactory threadFactory, MultiQueueRejectedExecutionHandler handler) {
    // Same null rule as setThreadFactory: reject bad arguments here instead of
    // failing later inside a worker or a rejection.
    if (workQueues == null || threadFactory == null || handler == null) {
        throw new NullPointerException();
    }
    if (workQueues.length == 0) {
        throw new IllegalArgumentException("workQueues must not be empty");
    }
    this.workQueues = workQueues;
    this.corePoolSize = workQueues.length;
    this.threadFactory = threadFactory;
    this.handler = handler;
    this.workers = new Worker[corePoolSize];
}
/**
 * Submits {@code command} to the queue selected by {@code key}.
 * <p>
 * If no worker is registered for that queue yet, one is started with
 * {@code command} as its first task. Otherwise the task is offered to the
 * queue. A task is handed to the rejection handler only when it is certain
 * that it is no longer queued, so a rejected task can never also be run by a worker.
 *
 * @param key     routing key; tasks whose keys map to the same index share a queue
 * @param command the task to run
 * @throws NullPointerException if {@code key} or {@code command} is null
 */
@Override
public void execute(Object key, Runnable command) {
    if (key == null || command == null) {
        throw new NullPointerException();
    }
    // The target queue is derived from the key's hash (not round-robin).
    final int index = index(key);
    if (!containsWorker(index) && addWorker(index, command)) {
        return; // a new worker took the task as its first task
    }
    final BlockingQueue<Runnable> queue = workQueues[index];
    if (isRunning(ctl.get()) && containsWorker(index) && queue.offer(command)) {
        final int recheck = ctl.get();
        // The pool may have shut down or lost all workers after the offer.
        // Withdraw the task first and reject only if that succeeds; if the
        // removal fails a worker already owns the task.
        if ((!isRunning(recheck) || workerCountOf(recheck) == 0) && queue.remove(command)) {
            reject(key, command);
        }
    } else {
        reject(key, command); // not running, no worker for the queue, or queue full
    }
}
/**
 * Tries to create, register and start the worker for slot {@code index}.
 * <p>
 * Fails when the pool state does not permit a new worker, when the slot is
 * already occupied, or when the worker count has reached {@code corePoolSize}.
 * Slot occupancy is re-checked under {@code mainLock}, so two racing callers
 * cannot both register a worker for the same queue.
 *
 * @param index     slot / queue index the worker will serve
 * @param firstTask task the worker runs first, or null to start by polling its queue
 * @return {@code true} if the worker thread was started
 */
private boolean addWorker(int index, Runnable firstTask) {
    retry:
    for (int c = ctl.get();;) {
        int rs = runStateOf(c);
        // After SHUTDOWN a worker may only be added to drain a non-empty queue.
        if (runStateAtLeast(rs, SHUTDOWN)
                && (runStateAtLeast(rs, STOP) || firstTask != null || workQueues[index].isEmpty())) {
            return false;
        }
        if (containsWorker(index)) {
            return false;
        }
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= (corePoolSize & COUNT_MASK)) {
                return false;
            }
            if (compareAndIncrementWorkerCount(c)) {
                break retry;
            }
            c = ctl.get();
            if (runStateAtLeast(c, SHUTDOWN)) {
                continue retry; // state changed: redo the state check
            }
            // otherwise only the count changed: retry the CAS
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker worker = null;
    try {
        worker = new Worker(index, firstTask);
        final Thread thread = worker.thread;
        if (thread != null) {
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // Re-check state and slot while holding the lock; the unlocked
                // checks above can race with another addWorker for this index.
                if ((isRunning(rs) || (rs == SHUTDOWN && firstTask == null))
                        && !containsWorker(index)) {
                    if (thread.getState() != Thread.State.NEW) {
                        throw new IllegalThreadStateException("index=" + index);
                    }
                    putWorker(worker);
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                thread.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted) {
            if (workerAdded || worker == null) {
                addWorkerFailed(worker);
            } else {
                // The worker was never registered, so the slot may belong to a
                // different, live worker: roll back the count only.
                decrementWorkerCount();
                tryTerminate(worker);
            }
        }
    }
    return workerStarted;
}
/**
 * Rolls back a failed worker creation while holding {@code mainLock}:
 * unregisters the worker (if one was created), lowers the worker count and
 * re-checks for termination.
 *
 * @param worker the worker that failed to start; may be null
 */
private void addWorkerFailed(Worker worker) {
    mainLock.lock();
    try {
        final boolean created = worker != null;
        if (created) {
            removeWorker(worker);
        }
        decrementWorkerCount();
        tryTerminate(worker);
    } finally {
        mainLock.unlock();
    }
}
/**
 * Main loop of a worker thread: runs the worker's first task (if any) and then
 * keeps taking tasks from the worker's queue until {@code getTask} returns null.
 * <p>
 * {@code completedAbruptly} stays {@code true} when a task throws, which tells
 * {@code processWorkerExit} that the worker count has not been adjusted yet.
 *
 * @param worker the worker whose thread is executing this method
 */
final void runWorker(Worker worker) {
    final Thread self = Thread.currentThread();
    Runnable current = worker.firstTask;
    worker.firstTask = null;
    worker.unlock(); // state goes from -1 to 0, so interruptIfStarted may now interrupt
    boolean completedAbruptly = true;
    try {
        for (;;) {
            if (current == null) {
                current = getTask(worker.index);
                if (current == null) {
                    break;
                }
            }
            worker.lock();
            // When the pool is stopping make sure this thread is interrupted;
            // otherwise clear a pending interrupt, re-checking the state after
            // clearing to cover a concurrent shutdownNow.
            boolean stopping = runStateAtLeast(ctl.get(), STOP);
            if (!stopping) {
                stopping = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
            }
            if (stopping && !self.isInterrupted()) {
                self.interrupt();
            }
            try {
                current.run();
            } finally {
                current = null;
                worker.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(worker, completedAbruptly);
    }
}
/**
 * Bookkeeping for a worker whose run loop has ended.
 * <p>
 * After an abrupt exit the worker count is lowered and the worker is
 * unregistered. Termination is then re-checked and, while the pool has not
 * reached STOP, a replacement worker is requested for the same slot unless the
 * exit was normal and the slot is still occupied or the pool is at capacity.
 *
 * @param worker            the exiting worker
 * @param completedAbruptly {@code true} if a task threw out of the run loop
 */
private void processWorkerExit(Worker worker, boolean completedAbruptly) {
    if (completedAbruptly) {
        // The count was not adjusted by getTask in this case.
        // TODO: replace the worker directly instead of decrementing first?
        decrementWorkerCount();
    }
    mainLock.lock();
    try {
        if (completedAbruptly) {
            removeWorker(worker);
        }
    } finally {
        mainLock.unlock();
    }
    tryTerminate(worker);
    final int snapshot = ctl.get();
    if (!runStateLessThan(snapshot, STOP)) {
        return; // STOP or later: no replacement
    }
    if (!completedAbruptly
            && (containsWorker(worker.index) || workerCountOf(snapshot) >= corePoolSize)) { // TODO
        return;
    }
    addWorker(worker.index, null);
}
/**
 * Blocks until a task is available on queue {@code index}, or returns null
 * when the worker should exit.
 * <p>
 * Null is returned (after lowering the worker count) when the pool is at STOP
 * or later, when it is at SHUTDOWN with an empty queue, or when the worker
 * count exceeds {@code corePoolSize} and the queue is empty.
 *
 * @param index queue index to take from
 * @return the next task, or null if the calling worker must terminate
 */
private Runnable getTask(int index) {
    // TODO: a time-out flag will be needed here once shrinking is implemented.
    final BlockingQueue<Runnable> queue = workQueues[index];
    for (;;) {
        final int c = ctl.get();
        final int rs = runStateOf(c);
        if (runStateAtLeast(rs, SHUTDOWN)
                && (runStateAtLeast(rs, STOP) || queue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // TODO: undecided whether timing out should depend on the total worker
        // count or on the single worker bound to this queue.
        if (workerCountOf(c) > corePoolSize && queue.isEmpty()) { // TODO: used when shrinking
            if (compareAndDecrementWorkerCount(c)) {
                return null;
            }
            continue;
        }
        try {
            return queue.take();
        } catch (InterruptedException retry) {
            // Deliberately ignored: loop again so the state checks above decide
            // whether this worker exits.
        }
    }
}
/**
 * Re-checks termination for every registered worker.
 * <p>
 * When no worker is registered at all (for example {@code shutdown()} on a
 * pool that never ran a task) no worker exists to drive the transition, so the
 * pool is moved to TERMINATED directly once it is shut down, idle and has a
 * worker count of zero.
 */
final void tryTerminate() {
    mainLock.lock();
    try {
        boolean sawWorker = false;
        for (Worker worker : workers) {
            if (worker != null) {
                sawWorker = true;
                tryTerminate(worker);
            }
        }
        if (!sawWorker) {
            terminateWithoutWorkers();
        }
    } finally {
        mainLock.unlock();
    }
}

/**
 * Moves the pool to TIDYING and then TERMINATED when it is shut down, its
 * worker count is zero and (for SHUTDOWN) every queue is empty.
 * The caller must hold {@code mainLock}.
 */
private void terminateWithoutWorkers() {
    for (;;) {
        final int c = ctl.get();
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || workerCountOf(c) != 0) {
            return;
        }
        if (runStateOf(c) == SHUTDOWN) {
            for (BlockingQueue<Runnable> queue : workQueues) {
                if (!queue.isEmpty()) {
                    return; // queued work must still be processed
                }
            }
        }
        if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
            try {
                terminated();
            } finally {
                ctl.set(ctlOf(TERMINATED, 0));
                termination.signalAll();
            }
            return;
        }
        // CAS lost a race with a concurrent ctl change: re-evaluate
    }
}
/**
 * Attempts the transition to TERMINATED on behalf of {@code worker}.
 * <p>
 * Nothing happens while the pool is running, already at TIDYING or later, or
 * at SHUTDOWN with tasks left in this worker's queue. If workers remain, the
 * given worker is interrupted when idle. Otherwise the state moves to TIDYING,
 * {@code terminated()} runs, the state becomes TERMINATED and threads waiting
 * in {@code awaitTermination} are signalled.
 *
 * @param worker the worker to act for; a null worker makes this a no-op
 */
final void tryTerminate(Worker worker) {
    if (worker == null) {
        // TODO ?
        return;
    }
    for (;;) {
        final int snapshot = ctl.get();
        if (isRunning(snapshot)) {
            return;
        }
        if (runStateAtLeast(snapshot, TIDYING)) {
            return;
        }
        if (runStateOf(snapshot) == SHUTDOWN && !workQueues[worker.index].isEmpty()) {
            return;
        }
        if (workerCountOf(snapshot) != 0) {
            interruptIdleWorkers(worker);
            return;
        }
        mainLock.lock();
        try {
            if (ctl.compareAndSet(snapshot, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // CAS failed: ctl changed concurrently, loop and re-evaluate
    }
}
/**
 * Interrupts every idle worker, visiting all slots while holding {@code mainLock}.
 */
private void interruptIdleWorkers() {
    mainLock.lock();
    try {
        for (int slot = 0; slot < workers.length; slot++) {
            interruptIdleWorkers(workers[slot]);
        }
    } finally {
        mainLock.unlock();
    }
}
/**
 * Interrupts {@code worker}'s thread if it is not yet interrupted and the
 * worker's lock can be acquired, i.e. the worker is not running a task.
 *
 * @param worker the worker to interrupt; null is ignored
 */
private void interruptIdleWorkers(Worker worker) {
    mainLock.lock();
    try {
        if (worker == null) {
            return;
        }
        final Thread target = worker.thread;
        if (target.isInterrupted() || !worker.tryLock()) {
            return; // already interrupted, or busy running a task
        }
        try {
            target.interrupt();
        } catch (SecurityException ignore) {
        } finally {
            worker.unlock();
        }
    } finally {
        mainLock.unlock();
    }
}
/**
 * Passes a task that cannot be accepted to the configured rejection handler.
 *
 * @param key     the key the task was submitted with
 * @param command the rejected task
 */
final void reject(Object key, Runnable command) {
    final MultiQueueRejectedExecutionHandler rejectionHandler = handler;
    rejectionHandler.rejectedExecution(key, command, this);
}
/**
 * Replaces the factory used to create worker threads.
 *
 * @param threadFactory the new factory
 * @throws NullPointerException if {@code threadFactory} is null
 */
public void setThreadFactory(ThreadFactory threadFactory) {
    if (threadFactory != null) {
        this.threadFactory = threadFactory;
        return;
    }
    throw new NullPointerException();
}
/**
 * Returns the factory currently used to create worker threads.
 *
 * @return the current thread factory
 * @see #setThreadFactory(ThreadFactory)
 */
public ThreadFactory getThreadFactory() {
    final ThreadFactory current = this.threadFactory;
    return current;
}
/**
 * Starts an orderly shutdown: advances the state to SHUTDOWN, interrupts idle
 * workers and then attempts termination.
 */
@Override
public void shutdown() {
    mainLock.lock();
    try {
        // The checkShutdownAccess() security-manager check is commented out in this file.
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
/*private void checkShutdownAccess() {
@SuppressWarnings("removal")
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
for (Worker w : workers)
security.checkAccess(w.thread);
}
}*/
/**
 * Moves the run state forward to {@code targetState}, keeping the worker
 * count. Returns at once if the state is already at or beyond the target;
 * otherwise retries the compare-and-set until it succeeds.
 *
 * @param targetState the state to reach
 */
private void advanceRunState(int targetState) {
    for (;;) {
        final int current = ctl.get();
        if (runStateAtLeast(current, targetState)) {
            return;
        }
        final int advanced = ctlOf(targetState, workerCountOf(current));
        if (ctl.compareAndSet(current, advanced)) {
            return;
        }
    }
}
/**
 * Stops the pool: advances the state to STOP, interrupts the workers, drains
 * every queue and then attempts termination, as {@link #shutdown()} does.
 *
 * @return the tasks that were still queued and will never be executed
 */
@Override
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Without this call termination is only ever attempted by an exiting
    // worker, never by shutdownNow itself.
    tryTerminate();
    return tasks;
}
/**
 * Reports whether the pool has left the RUNNING state.
 *
 * @return {@code true} once the state is SHUTDOWN or later
 */
@Override
public boolean isShutdown() {
    final int snapshot = ctl.get();
    return runStateAtLeast(snapshot, SHUTDOWN);
}
/**
 * Reports whether the pool has reached the TERMINATED state.
 * <p>
 * This uses the same test as {@code awaitTermination}. The previous condition
 * ({@code !isRunning && runStateLessThan(TERMINATED)}) described a pool that
 * is still terminating and returned {@code false} once termination completed.
 *
 * @return {@code true} only after the pool is fully terminated
 */
@Override
public boolean isTerminated() {
    return runStateAtLeast(ctl.get(), TERMINATED);
}
/**
 * Blocks until the pool reaches TERMINATED or the timeout elapses.
 *
 * @param timeout maximum time to wait
 * @param unit    unit of {@code timeout}
 * @return {@code true} if the pool terminated, {@code false} if the wait timed out
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    long remaining = unit.toNanos(timeout);
    mainLock.lock();
    try {
        while (!runStateAtLeast(ctl.get(), TERMINATED)) {
            if (remaining <= 0) {
                return false;
            }
            // awaitNanos returns the time still left, so spurious wake-ups
            // cannot extend the overall wait.
            remaining = termination.awaitNanos(remaining);
        }
        return true;
    } finally {
        mainLock.unlock();
    }
}
/**
 * Removes all pending tasks from every queue and returns them.
 * Elements left behind by {@code drainTo} are removed one at a time.
 *
 * @return the tasks removed from the queues
 */
private List<Runnable> drainQueue() {
    final List<Runnable> drained = new ArrayList<>();
    for (int i = 0; i < workQueues.length; i++) {
        final BlockingQueue<Runnable> queue = workQueues[i];
        queue.drainTo(drained);
        if (queue.isEmpty()) {
            continue;
        }
        final Runnable[] leftovers = queue.toArray(new Runnable[0]);
        for (Runnable leftover : leftovers) {
            if (queue.remove(leftover)) {
                drained.add(leftover);
            }
        }
    }
    return drained;
}
/**
 * Interrupts every registered worker that has started, including workers
 * that are running a task.
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker worker : workers) {
            // A slot stays null until a task for its queue has been submitted;
            // skipping it avoids a NullPointerException that would abort shutdownNow.
            if (worker != null) {
                worker.interruptIfStarted();
            }
        }
    } finally {
        mainLock.unlock();
    }
}
/**
 * Returns the queue that tasks submitted under {@code key} are routed to.
 *
 * @param key routing key
 * @return the queue at the index computed from {@code key}
 */
public BlockingQueue<Runnable> getQueue(Object key) {
    final int slot = index(key);
    return workQueues[slot];
}
/**
 * Registers {@code worker} in the slot given by its index, replacing any
 * previous entry.
 *
 * @param worker the worker to register
 */
private void putWorker(Worker worker) {
    final int slot = worker.index;
    workers[slot] = worker;
}
/**
 * Unregisters {@code worker}, but only if its slot still refers to this exact
 * instance. This method is also called for workers that were never registered
 * or were replaced, and must not then clear the slot of a live worker.
 *
 * @param worker the worker to unregister
 */
private void removeWorker(Worker worker) {
    if (workers[worker.index] == worker) {
        workers[worker.index] = null;
    }
}
/**
 * Returns the length of the worker array, i.e. the number of slots — not the
 * number of workers currently registered.
 *
 * @return the capacity of the worker array
 */
private int sizeOfWorkers() {
    final Worker[] slots = workers;
    return slots.length;
}
/**
 * Tells whether a worker is registered for the given slot.
 *
 * @param index slot index
 * @return {@code true} if a worker is registered, {@code false} otherwise
 */
final boolean containsWorker(int index) {
    final Worker occupant = workers[index];
    return occupant != null;
}
/**
 * Maps {@code key} to a queue / worker slot in {@code [0, corePoolSize)}.
 * <p>
 * {@code Math.floorMod} gives the same result as the former
 * {@code (corePoolSize - 1) & hash} mask when the pool size is a power of two,
 * and also spreads keys over every slot for other sizes, where the mask left
 * some queues unreachable (e.g. slot 1 for a size of 3).
 *
 * @param key routing key; null maps to slot 0
 * @return the slot index for {@code key}
 */
final int index(Object key) {
    return Math.floorMod(hash(key), corePoolSize);
}
/**
 * Spreads the key's hash code by XOR-ing its high 16 bits into the low 16
 * bits; a null key hashes to 0.
 *
 * @param key the key to hash; may be null
 * @return the spread hash value
 */
static final int hash(Object key) {
    if (key == null) {
        return 0;
    }
    final int raw = key.hashCode();
    return raw ^ (raw >>> 16);
}
/**
 * Hook invoked from tryTerminate after the state has been set to TIDYING and
 * before it is set to TERMINATED. This implementation does nothing.
 */
protected void terminated() { }
/**
 * A worker bound to one queue slot. It owns the thread that runs the slot's
 * tasks and doubles as a simple non-reentrant lock (state 0 = free,
 * 1 = held, -1 = not yet started) that marks the worker as busy.
 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /** Thread this worker runs in, as returned by the thread factory. */
    @SuppressWarnings("serial") // Unlikely to be serializable
    final Thread thread;
    /** Initial task to run. Possibly null. */
    @SuppressWarnings("serial") // Not statically typed as Serializable
    Runnable firstTask;
    /** Slot / queue index served by this worker. */
    final int index;

    /**
     * Creates the worker and its thread. The thread is not started here.
     *
     * @param index     slot index served by this worker
     * @param firstTask first task to run, or null
     */
    public Worker(int index, Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.index = index;
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates to the enclosing pool's run loop. */
    @Override
    public void run() {
        runWorker(this);
    }

    /**
     * @return {@code true} whenever the state is not 0, which includes the
     *         initial -1 state
     */
    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    /**
     * Acquires the lock if it is free by switching the state from 0 to 1.
     *
     * @param unused ignored
     * @return {@code true} if the lock was acquired
     */
    @Override
    protected boolean tryAcquire(int unused) {
        if (!compareAndSetState(0, 1)) {
            return false;
        }
        // Record the calling thread as the exclusive owner.
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }

    /**
     * Releases the lock unconditionally: clears the owner and resets the state to 0.
     *
     * @param unused ignored
     * @return always {@code true}
     */
    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    /** Acquires the lock, blocking until it is available. */
    public void lock() {
        acquire(1);
    }

    /** @return {@code true} if the lock was free and is now held */
    public boolean tryLock() {
        return tryAcquire(1);
    }

    /** Releases the lock. */
    public void unlock() {
        release(1);
    }

    /** @return {@code true} if the state is not 0 */
    public boolean isLocked() {
        return isHeldExclusively();
    }

    /**
     * Interrupts the worker's thread if the worker has started (state >= 0),
     * has a thread and that thread is not already interrupted.
     */
    void interruptIfStarted() {
        if (getState() < 0) {
            return;
        }
        final Thread target = thread;
        if (target == null || target.isInterrupted()) {
            return;
        }
        try {
            target.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
/**
 * Rejection policy that runs the rejected task in the thread that called
 * {@code execute}.
 */
public static class CallerRunsPolicy implements MultiQueueRejectedExecutionHandler {
    public CallerRunsPolicy() { }

    /**
     * Runs {@code r} in the caller's thread, unless the executor has been
     * shut down, in which case the task is discarded.
     *
     * @param key the key the task was submitted with (unused)
     * @param r   the rejected task
     * @param e   the executor that rejected the task
     */
    @Override
    public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor e) {
        if (e.isShutdown()) {
            return;
        }
        r.run();
    }
}
/**
 * Rejection policy that fails the submission with a
 * {@link RejectedExecutionException}.
 */
public static class AbortPolicy implements MultiQueueRejectedExecutionHandler {
    public AbortPolicy() {}

    /**
     * Always throws. The message names the task, its key and the executor;
     * the key and the task are now separated, where they used to be
     * concatenated without any delimiter.
     *
     * @param key      the key the task was submitted with
     * @param r        the rejected task
     * @param executor the executor that rejected the task
     * @throws RejectedExecutionException always
     */
    @Override
    public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor) {
        throw new RejectedExecutionException("Task " + r +
                " (key " + key + ")" +
                " rejected from " +
                executor);
    }
}
/**
 * Rejection policy that silently drops the rejected task.
 */
public static class DiscardPolicy implements MultiQueueRejectedExecutionHandler {
public DiscardPolicy() { }
/**
 * Does nothing, so the rejected task is discarded.
 */
@Override
public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor) {
}
}
/**
 * Rejection policy that drops the head of the key's queue and then submits
 * the rejected task again.
 */
public static class DiscardOldestPolicy implements MultiQueueRejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    /**
     * Polls one element from the queue for {@code key} and re-executes
     * {@code r}; does nothing if the executor has been shut down.
     *
     * @param key      the key the task was submitted with
     * @param r        the rejected task
     * @param executor the executor that rejected the task
     */
    @Override
    public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            return;
        }
        executor.getQueue(key).poll();
        executor.execute(key, r);
    }
}
/**
 * Default factory: creates non-daemon, normal-priority threads named
 * {@code pool-N-thread-M} in the thread group of the thread that created
 * the factory.
 */
private static class DefaultThreadFactory implements ThreadFactory {
    /** Sequence shared by all factories; supplies the N in the thread name. */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    /** Per-factory sequence; supplies the M in the thread name. */
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        this.group = Thread.currentThread().getThreadGroup();
        final int poolId = poolNumber.getAndIncrement();
        this.namePrefix = "pool-" + poolId + "-thread-";
    }

    /**
     * Creates a new, unstarted thread for {@code r}.
     *
     * @param r the runnable the thread will execute
     * @return a non-daemon thread with normal priority
     */
    public Thread newThread(Runnable r) {
        final String name = namePrefix + threadNumber.getAndIncrement();
        final Thread created = new Thread(group, r, name, 0);
        // Settings inherited from the creating thread are normalised here.
        if (created.isDaemon()) {
            created.setDaemon(false);
        }
        if (created.getPriority() != Thread.NORM_PRIORITY) {
            created.setPriority(Thread.NORM_PRIORITY);
        }
        return created;
    }
}
/**
 * Instantiates a queue of the given class. The no-argument constructor is
 * used when the class declares one (the capacity is then ignored); otherwise
 * creation falls back to the {@code (int capacity)} constructor.
 *
 * @param capacity   capacity used only by the fallback constructor
 * @param queueClass queue implementation class to instantiate
 * @return the new queue instance
 * @throws Exception if the instance cannot be created
 */
private static BlockingQueue<?> createQueue(int capacity, Class<? extends BlockingQueue<Runnable>> queueClass) throws Exception {
    final Constructor<? extends BlockingQueue<Runnable>> noArgConstructor;
    try {
        noArgConstructor = queueClass.getDeclaredConstructor();
    } catch (NoSuchMethodException e) {
        // No no-arg constructor: try the capacity constructor instead.
        return createQueueWithCapacity(capacity, queueClass);
    }
    return noArgConstructor.newInstance();
}
/**
 * Instantiates a queue through the public {@code (int capacity)} constructor
 * of {@code queueClass}.
 *
 * @param capacity   capacity passed to the constructor
 * @param queueClass queue implementation class to instantiate
 * @return the new queue instance
 * @throws IllegalArgumentException if {@code queueClass} is not a
 *                                  {@link BlockingQueue} or has no public
 *                                  {@code (int)} constructor
 * @throws Exception                if the constructor itself fails
 */
private static BlockingQueue<?> createQueueWithCapacity(int capacity, Class<?> queueClass) throws Exception {
    // Check the type before constructing anything, so an unrelated class is
    // reported clearly instead of failing the cast below.
    if (!BlockingQueue.class.isAssignableFrom(queueClass)) {
        throw new IllegalArgumentException("Unsupported BlockingQueue implementation: " + queueClass.getName());
    }
    try {
        Constructor<?> constructor = queueClass.getConstructor(int.class);
        return (BlockingQueue<?>) constructor.newInstance(capacity);
    } catch (NoSuchMethodException e) {
        // Keep the original exception as the cause for diagnosis.
        throw new IllegalArgumentException("Unsupported BlockingQueue implementation: " + queueClass.getName(), e);
    }
}
}
- TODO 当前线程池不支持自动扩容,后续优化为支持扩容的形式
思路:
当该线程池中某个队列的任务数量达到其容量的75%时(参考HashMap的负载因子设计),将该队列标记为需要扩容,并进行计数;
被标记为需要扩容的队列,在工作线程提取任务时,扩容后应归属新队列的任务不会在原线程执行,而是按照2的次幂规则重新计算下标后添加到对应的新队列,并启动新队列的线程;新队列在此期间新增的任务先放到临时队列中,当计数达到该队列被标记需要扩容时记录的数值后,再进行临时队列合并;
缩容的方式,当队列数量少于25%的时候,进行缩容(可设置是否进行缩容,避免性能耗费)
设计原因:
通过标记的方式,避免队列过大进行迁移的过程中阻塞过久;