Java自定义多队列线程池

一、Java线程池

1.ThreadPoolExecutor基础使用

Java线程池ThreadPoolExecutor基础使用

2.Java自定义多队列线程池原理

(1) 基本原理

  • 创建一组Thread然后执行,不断从阻塞队列BlockingQueue中取出任务进行处理

(2) 简单示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;

public class CustomThreadPool {

    private final WorkerThread[] threads;

    public CustomThreadPool(int poolSize) {
        threads = new WorkerThread[poolSize];
        for (int i = 0; i < poolSize; i++) {
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
            threads[i] = new WorkerThread(workQueue);
            threads[i].start();
        }
    }

    public void execute(Runnable task) {
        // 分配任务到某个线程的队列中
        // 这里简单地采用轮询方式分配任务
        int index = (int) (Math.random() * threads.length);
        threads[index].getQueue().add(task);
    }

    private static class WorkerThread extends Thread {
        private final BlockingQueue<Runnable> queue;

        public WorkerThread(BlockingQueue<Runnable> queue) {
            this.queue = queue;
        }

        public BlockingQueue<Runnable> getQueue() {
            return queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Runnable task = queue.take(); // 阻塞直到获取到任务
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

(3)多队列线程池

  • 创建任务提交接口 MultiQueueExecutor,通过key值offer到指定队列
package com.zzc.design.juc.concurrent;

public interface MultiQueueExecutor {

    void execute(Object key, Runnable command);

}
  • 拓展任务提交接口MultiQueueExecutorService,实现线程池相关的状态管理
package com.zzc.design.juc.concurrent;
import java.util.List;
import java.util.concurrent.TimeUnit;

public interface MultiQueueExecutorService extends MultiQueueExecutor {

    /**
     * 执行线程池关闭操作
     */
    void shutdown();

    /**
     * 立马执行线程池关闭操作
     * @return
     */
    List<Runnable> shutdownNow();

    /**
     * 线程池是否关闭
     * @return
     */
    boolean isShutdown();

    /**
     * 线程池已经完全终止,所有资源都被释放
     */
    boolean isTerminated();

    /**
     * 等一定时间之后,进行终止
     * @param timeout 时长
     * @param unit 时间单位类型
     * @return
     * @throws InterruptedException
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

}

  • 抽象实现多队列线程池接口,后续实现通用功能可在此处添加
package com.zzc.design.juc.concurrent;

public abstract class AbstractMultiQueueExecutor implements MultiQueueExecutorService {

    public AbstractMultiQueueExecutor() {}
   
}
  • 定义多队列线程池拒绝策略接口MultiQueueRejectedExecutionHandler
package com.zzc.design.juc.concurrent;

public interface MultiQueueRejectedExecutionHandler {

    void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor);

}

  • 具体实现多队列线程池(初稿,基本测试暂时没有问题)
package com.zzc.design.juc.concurrent;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MultiQueueThreadPoolExecutor extends AbstractMultiQueueExecutor {

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始化 ctl 的值,表示线程池处于运行状态并且当前没有活动的工作线程。
    private static final int COUNT_BITS = Integer.SIZE - 3;//表示用于存储线程计数(workerCount)的位数。由于 Java 的 int 类型有 32 位(Integer.SIZE),这里减去 3 位留给状态标志,因此留下 29 位用于线程计数。如果线程数量不满足,则可以将类型改为long类型以及AtomicInteger改了AtomicLong
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//00011111111111111111111111111111 这是一个掩码,用于从 ctl 中提取出线程计数值。它由 COUNT_BITS 位全为 1 组成,即低 29 位为 1。

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;//当线程池处于运行状态时,高 3 位设置为 -1 的二进制补码形式(即 111...000)。
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//线程池正在关闭,不再接受新任务,但会继续处理队列中的任务,高 3 位为 000。
    private static final int STOP       =  1 << COUNT_BITS;//线程池已停止,不再接受新任务,也不再处理队列中的任务,高 3 位为 001。
    private static final int TIDYING    =  2 << COUNT_BITS;//所有任务都已完成,线程池即将进入终结状态,高 3 位为 010。
    private static final int TERMINATED =  3 << COUNT_BITS;//线程池已经完全终止,所有资源都被释放,高 3 位为 011。

    // Packing and unpacking ctl

    /**
     * COUNT_MASK=00011111111111111111111111111111 取反之后 11100000000000000000000000000000 既为RUNNING初始状态
     * c & ~COUNT_MASK 与运算得到线程池当前的状态;可以这样理解:当前ctl的值取二进制高三位进行比对,得到的结果就是当前ctl代表的线程池状态
     *
     */
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }

    /**
     * COUNT_MASK=00011111111111111111111111111111
     * c & COUNT_MASK 去除高三位状态,相当于重置高三位的二进制为0,后29位二进制就是当前线程数量
     */
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }

    /**
     * 或运算运算,例如 00011111111111111111111111111111 | 11100000000000000000000000000000 = 11111111111111111111111111111111
     * 这样理解,高三位表示状态|低29位表示线程数量,这个时候只要用一个int类型存储,那么将高三位的二进制和低29位的二进制合并一起为一个数,就是ctl的值
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    /**
     * 比较:检查当前线程池的 runState 是否小于给定的状态 s。
     * 返回:如果当前 runState 小于给定的状态,则返回 true;否则返回 false
     * @param c
     * @param s
     * @return
     */
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        while (!compareAndDecrementWorkerCount(ctl.get()));
    }


    private final BlockingQueue<Runnable>[] workQueues;

    private volatile ThreadFactory threadFactory;

    private volatile MultiQueueRejectedExecutionHandler handler;

    private final Worker[] workers;

    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Wait condition to support awaitTermination.
     */
    private final Condition termination = mainLock.newCondition();

    private volatile boolean allowCoreThreadTimeOut;

    private final int corePoolSize;

    private static final MultiQueueRejectedExecutionHandler defaultRejectedHandler = new AbortPolicy();

    private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");


    public MultiQueueThreadPoolExecutor(BlockingQueue<Runnable>[] workQueues) {
        this(workQueues, new DefaultThreadFactory(), defaultRejectedHandler);
    }

    @SuppressWarnings("unchecked")
    public MultiQueueThreadPoolExecutor(int corePoolSize, int capacity, Class<?> queueClass, ThreadFactory threadFactory, MultiQueueRejectedExecutionHandler handler) {
        this.corePoolSize = corePoolSize;
        this.workers = new Worker[corePoolSize];
        this.threadFactory = threadFactory;
        this.handler = handler;
        this.workQueues = (BlockingQueue<Runnable>[]) Array.newInstance(queueClass, corePoolSize);
        try {
            for (int i = 0; i < corePoolSize; i++) {
                workQueues[i] = (BlockingQueue<Runnable>) createQueueWithCapacity(capacity, queueClass);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public MultiQueueThreadPoolExecutor(int corePoolSize, int capacity, Class<?> queueClass) {
        this(corePoolSize, capacity, queueClass, new DefaultThreadFactory(), defaultRejectedHandler);
    }

    public MultiQueueThreadPoolExecutor(BlockingQueue<Runnable>[] workQueues, ThreadFactory threadFactory, MultiQueueRejectedExecutionHandler handler) {
        this.workQueues = workQueues;
        this.corePoolSize = workQueues.length;
        this.threadFactory = threadFactory;
        this.handler = handler;
        this.workers = new Worker[corePoolSize];
    }

    @Override
    public void execute(Object key, Runnable command) {
        // 分配任务到某个线程的队列中
        // 这里简单地采用轮询方式分配任务
        if (key == null || command == null) {
            throw new NullPointerException();
        }
        int index = index(key);
        if (!containsWorker(index)) {//如果不存在线程,则进行添加
            if (addWorker(index, command)) {
                return;
            }
        }

        int c = ctl.get();
        if (isRunning(c) && containsWorker(index) && workQueues[index(key)].offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && workQueues[index].remove(command)) {
                reject(key, command);
            } else if (workerCountOf(recheck) == 0) {
                reject(key, command);//队列数量和线程数量固定的,无法创建新的线程去处理
            }
        } else {
            reject(key, command);//异常情况的,只能拒绝
        }
    }

    private boolean addWorker(int index, Runnable firstTask) {
        retry:
        for (int c = ctl.get();;) {
            int rs = runStateOf(c);
            if (runStateAtLeast(rs, SHUTDOWN) &&
                    (runStateAtLeast(rs, STOP) || firstTask != null || workQueues[index].isEmpty())) {//直接判断当前是否存在线程,因为在线程取出队列任务运行失败之后,会进行线程清理,该处就已经为空了
                return false;
            }

            if (containsWorker(index)) {
                return false;
            }

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= (corePoolSize & COUNT_MASK)) {
                    return false;
                }
                if (compareAndIncrementWorkerCount(c)) {
                    break retry;
                }
                c = ctl.get();
                if (runStateAtLeast(c, SHUTDOWN)) {
                    continue retry;
                }
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker worker = null;
        try {
            worker = new Worker(index, firstTask);
            final Thread thread = worker.thread;
            if (thread != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (isRunning(rs)
                            || (rs == SHUTDOWN && firstTask == null)) {
                        if (thread.getState() != Thread.State.NEW) {
                            throw new IllegalThreadStateException("index=" + index);
                        }
                        if (sizeOfWorkers() > corePoolSize) {
                            //TODO
                            throw new RuntimeException("The number of workgroups exceeds the number of core threads. size:" + sizeOfWorkers());
                        }
                        putWorker(worker);
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    thread.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted) {
                addWorkerFailed(worker);
            }
        }
        return workerStarted;
    }

    private void addWorkerFailed(Worker worker) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (worker != null) {
                removeWorker(worker);
            }
            decrementWorkerCount();
            tryTerminate(worker);
        } finally {
            mainLock.unlock();
        }
    }

    final void runWorker(Worker worker) {
        Thread workerThread = Thread.currentThread();
        Runnable task = worker.firstTask;
        worker.firstTask = null;
        worker.unlock();
        //如果 completedAbruptly 为 true,则减少 workerCount,因为它意味着线程在未预期的情况下退出。
        //如果 completedAbruptly 为 false,则假设 workerCount 已经被调整过了。
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask(worker.index)) != null) {
                worker.lock();
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !workerThread.isInterrupted()) {
                    workerThread.interrupt();
                }
                try {
                    task.run();
                } catch (Throwable ex) {
                    throw ex;
                } finally {
                    task = null;
                    worker.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(worker, completedAbruptly);
        }
    }

    private void processWorkerExit(Worker worker, boolean completedAbruptly) {
        if (completedAbruptly) {//异常
            //线程未在预期的情况下退出,可能发生异常
            decrementWorkerCount();//TODO 优化成直接创建新的线程进行替换?
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (completedAbruptly) {//异常情况下
                removeWorker(worker);
            }
        } finally {
            mainLock.unlock();
        }

        tryTerminate(worker);

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {//是否早与stop
            if (!completedAbruptly) {//非异常情况下
                if (containsWorker(worker.index)) {//存在worker
                    return;
                }
                if (workerCountOf(c) >= corePoolSize) {//TODO
                    return;
                }
            }
            addWorker(worker.index, null);
        }
    }

    private Runnable getTask(int index) {
        boolean timedOut = false;//TODO 后续扩容需要用到,当超时的时候,可以开始处理缩容的事情
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //
            if (runStateAtLeast(rs, SHUTDOWN)
                    && (runStateAtLeast(rs, STOP) || workQueues[index].isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //boolean timed = allowCoreThreadTimeOut || workerCount > corePoolSize;//TODO 不确定是否根据总线程数量还是单个队列对应的一个线程作为判断

            if (wc > corePoolSize && workQueues[index].isEmpty()) {//TODO 缩容的时候使用
                if (compareAndDecrementWorkerCount(c)) {
                    return null;
                }
                continue;
            }

            try {
                return workQueues[index].take();
            } catch (InterruptedException retry) {

            }
        }
    }

    final void tryTerminate() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker worker : workers) {
                tryTerminate(worker);
            }
        } finally {
            mainLock.unlock();
        }
    }

    final void tryTerminate(Worker worker) {
        if (worker == null) {
            //TODO ?
            return;
        }
        for (;;) {
            int c = ctl.get();
            if (isRunning(c)
                    || runStateAtLeast(c, TIDYING)
                    || (runStateOf(c) == SHUTDOWN && !workQueues[worker.index].isEmpty())) {
                return;
            }
            if (workerCountOf(c) != 0) {
                interruptIdleWorkers(worker);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

    private void interruptIdleWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker worker : workers) {
                interruptIdleWorkers(worker);
            }
        } finally {
            mainLock.unlock();
        }
    }

    private void interruptIdleWorkers(Worker worker) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (worker != null) {
                Thread thread = worker.thread;
                if (!thread.isInterrupted() && worker.tryLock()) {
                    try {
                        thread.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        worker.unlock();
                    }
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    final void reject(Object key, Runnable command) {
        handler.rejectedExecution(key, command, this);
    }

    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory(ThreadFactory)
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }


    @Override
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    /*private void checkShutdownAccess() {
        @SuppressWarnings("removal")
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            for (Worker w : workers)
                security.checkAccess(w.thread);
        }
    }*/

    /**
     * 将线程池的状态从当前值转换为一个新的目标状态
     * @param targetState
     */
    private void advanceRunState(int targetState) {
        while (true) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) {
                break;
            }
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        return tasks;
    }

    @Override
    public boolean isShutdown() {
        return !isRunning(ctl.get());
    }

    @Override
    public boolean isTerminated() {
        int c = ctl.get();
        return !isRunning(c) && runStateLessThan(c, TERMINATED);
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            while (true) {
                if (runStateAtLeast(ctl.get(), TERMINATED)) {
                    return true;
                }
                if (nanos <= 0) {
                    return false;
                }
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    private List<Runnable> drainQueue() {
        List<Runnable> taskList = new ArrayList<Runnable>();
        for (BlockingQueue<Runnable> workQueue : workQueues) {
            workQueue.drainTo(taskList);
            if (!workQueue.isEmpty()) {
                for (Runnable r : workQueue.toArray(new Runnable[0])) {
                    if (workQueue.remove(r))
                        taskList.add(r);
                }
            }
        }
        return taskList;
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker worker : workers)
                worker.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    public BlockingQueue<Runnable> getQueue(Object key) {
        return workQueues[index(key)];
    }

    private void putWorker(Worker worker) {
        workers[worker.index] = worker;
    }

    private void removeWorker(Worker worker) {
        workers[worker.index] = null;
    }

    private int sizeOfWorkers() {
        return workers.length;
    }

    /**
     * 是否存在线程
     * @param index
     * @return true=存在;false=不存在
     */
    final boolean containsWorker(int index) {
        return workers[index] != null;
    }

    final int index(Object key) {
        return (corePoolSize - 1) & hash(key);
    }

    static final int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }

    protected void terminated() { }


    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

        @SuppressWarnings("serial") // Unlikely to be serializable
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        @SuppressWarnings("serial") // Not statically typed as Serializable
                Runnable firstTask;

        final int index;

        public Worker(int index, Runnable firstTask) {
            setState(-1);// inhibit interrupts until runWorker
            this.index = index;
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }

        /**
         *
         * @return
         */
        protected boolean isHeldExclusively() {
            return getState() != 0;//当前线程独占持有同步状态
        }

        /**
         *
         * 设置:如果相等,则将 state 更新为新的值 (update)
         * @param unused the acquire argument. This value is always the one
         *        passed to an acquire method, or is the value saved on entry
         *        to a condition wait.  The value is otherwise uninterpreted
         *        and can represent anything you like.
         * @return
         */
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {//比较:检查当前的同步状态 (state) 是否等于预期值 (expect)。- 如果相等,则将 state 更新为新的值 (update)
                setExclusiveOwnerThread(Thread.currentThread());//将指定的线程设置为当前独占持有同步状态的线程。
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);//如果传入的参数为 null,则表示清除当前的独占持有者,释放锁时调用。
            setState(0);
            return true;
        }

        public void lock() {
            acquire(1);
        }

        public boolean tryLock() {
            return tryAcquire(1);
        }

        public void unlock() {
            release(1);
        }

        public boolean isLocked() {
            return isHeldExclusively();
        }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {}
            }
        }

    }

    public static class CallerRunsPolicy implements MultiQueueRejectedExecutionHandler {
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        @Override
        public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    public static class AbortPolicy implements MultiQueueRejectedExecutionHandler {
        public AbortPolicy() {}


        @Override
        public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor) {
            throw new RejectedExecutionException("Task "  + "key " + key + r.toString() +
                    " rejected from " +
                    executor.toString());
        }
    }

    public static class DiscardPolicy implements MultiQueueRejectedExecutionHandler {

        public DiscardPolicy() { }

        @Override
        public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor) {

        }
    }

    public static class DiscardOldestPolicy implements MultiQueueRejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        @Override
        public void rejectedExecution(Object key, Runnable r, MultiQueueThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                executor.getQueue(key).poll();
                executor.execute(key, r);
            }
        }
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            group = Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    /**
     * 根据传入的 Class<BlockingQueue> 类型创建对应的 BlockingQueue 实例。
     *
     * @param queueClass 要创建的 BlockingQueue 实现类
     * @return 创建的 BlockingQueue 实例
     * @throws Exception 如果无法创建实例,则抛出异常
     */
    private static  BlockingQueue<?> createQueue(int capacity, Class<? extends BlockingQueue<Runnable>> queueClass) throws Exception {
        try {
            // 尝试使用无参构造函数创建实例
            return queueClass.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException e) {
            // 如果没有无参构造函数,则尝试带容量参数的构造函数
            return createQueueWithCapacity(capacity, queueClass);
        }
    }

    private static BlockingQueue<?> createQueueWithCapacity(int capacity, Class<?> queueClass) throws Exception {
        try {
            // 尝试使用带有 int 参数的构造函数创建实例,默认容量为16
            Constructor<?> constructor = queueClass.getConstructor(int.class);
            return (BlockingQueue<?>) constructor.newInstance(capacity); // 可以根据需求调整默认容量
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Unsupported BlockingQueue implementation: " + queueClass.getName());
        }
    }
}
  • TODO 当前线程池不支持自动扩容,后续优化为支持扩容的形式
    思路:
    当该线程池的队列数量达到75%的容量的时候(参考hashMap设计),标记为需要扩容、进行计数;
    被标记为要扩容的队列,在提取任务的时候,被扩容到新队列的任务不会被执行,将会被添加到按照二的次幂指定到新的队列,并启动新的队列的线程,而新的队列新增的任务,放到临时队列中,当达到被标记需要扩容时的数值时,进行临时队列合并;
    缩容的方式,当队列数量少于25%的时候,进行缩容(可设置是否进行缩容,避免性能耗费)
    设计原因:
    通过标记的方式,避免队列过大进行迁移的过程中阻塞过久;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/957534.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BOBO小火炬全套源码XE修复版2025(火炬天花板二次开发版)

《小火炬全套源码 传奇游戏源码讲解》 小火炬全套源码是一种用于开发经典传奇类游戏的源码包。传奇游戏作为一款经典的多人在线角色扮演游戏&#xff08;MMORPG&#xff09;&#xff0c;有着庞大的用户基础和强大的游戏生态。小火炬全套源码主要提供了从基础架构到核心功能的完…

Flutter:搜索页,搜索bar封装

view 使用内置的Chip简化布局 import package:chenyanzhenxuan/common/index.dart; import package:ducafe_ui_core/ducafe_ui_core.dart; import package:flutter/material.dart; import package:get/get.dart; import package:tdesign_flutter/tdesign_flutter.dart;import i…

网络通信---MCU移植LWIP

使用的MCU型号为STM32F429IGT6&#xff0c;PHY为LAN7820A 目标是通过MCU的ETH给LWIP提供输入输出从而实现基本的Ping应答 OK废话不多说我们直接开始 下载源码 LWIP包源码&#xff1a;lwip源码 -在这里下载 ST官方支持的ETH包&#xff1a;ST-ETH支持包 这里下载 创建工程 …

麒麟操作系统服务架构保姆级教程(十三)tomcat环境安装以及LNMT架构

如果你想拥有你从未拥有过的东西&#xff0c;那么你必须去做你从未做过的事情 之前咱们学习了LNMP架构&#xff0c;但是PHP对于技术来说确实是老掉牙了&#xff0c;PHP的市场占有量越来越少了&#xff0c;我认识一个10年的PHP开发工程师&#xff0c;十年工资从15k到今天的6k&am…

elementUI Table组件实现表头吸顶效果

需求描述 当 table 内容过多的时候&#xff0c;页面上滑滚动&#xff0c;表头的信息也会随着被遮挡&#xff0c;无法将表头信息和表格内容对应起来&#xff0c;需要进行表头吸顶 开始编码&#x1f4aa; 环境&#xff1a;vue2.6、element UI step1&#xff1a; 给el-table__h…

AI 新动态:技术突破与应用拓展

目录 一.大语言模型的持续进化 二.AI 在医疗领域的深度应用 疾病诊断 药物研发 三.AI 与自动驾驶的新进展 四.AI 助力环境保护 应对气候变化 能源管理 后记 在当下科技迅猛发展的时代&#xff0c;人工智能&#xff08;AI&#xff09;无疑是最具影响力的领域之一。AI 技…

题解 CodeForces 131D Subway BFS C++

题目传送门 Problem - 131D - Codeforceshttps://codeforces.com/problemset/problem/131/Dhttps://codeforces.com/problemset/problem/131/D 翻译 地铁方案&#xff0c;对于Berland城市来说是一种经典的表示&#xff0c;由一组n站点和连接这些站点的n通道组成&#xff0c;…

如何查看某用户的Git提交数

说明&#xff1a;有些公司自己搭建的Git仓库&#xff0c;可以在仓库项目上查看各用户的提交量及占比。也可通过下面这两个Git命令&#xff0c;查看当前仓库&#xff0c;当前分支的总提交数&#xff0c;及某用户的提交数&#xff1b; # 当前分支的总提交数 git log --oneline |…

SQL sever数据导入导出实验

1.创建数据库TCP-H &#xff08;1&#xff09;右键“数据库”&#xff0c;点击“新建数据库”即可 &#xff08;2&#xff09;用sql语言创建&#xff0c;此处以创建数据库DB_test为例&#xff0c;代码如下&#xff1a; use master;go--检查在当前服务器系统中的所有数据里面…

Codeforces Round 903 (Div. 3) E. Block Sequence

题解&#xff1a; 想到从后向前DP f[i] 表示从 i ~ n 转化为“美观”所需要的最少的步骤 第一种转移方式&#xff1a;直接删除掉第i个元素&#xff0c;那么就是上一步 f[i 1] 加上 1;第二种转移方式&#xff1a;从第 i a[i] 1 个元素直接转移&#xff0c;不需要增加步数&a…

linux-FTP服务配置与应用

也许你对FTP不陌生&#xff0c;但是你是否了解FTP到底是个什么玩意&#xff1f; FTP 是File Transfer Protocol&#xff08;文件传输协议&#xff09;的英文简称&#xff0c;而中文简称为 “文传协议” 用于Internet上的控制文件的双向传输。同时&#xff0c;它也是一个应用程序…

Alluxio 联手 Solidigm 推出针对 AI 工作负载的高级缓存解决方案

作者&#xff1a;Wayne Gao, Yi Wang, Jie Chen, Sarika Mehta Alluxio 作为全球领先的 AI 缓存解决方案供应商&#xff0c; 提供针对 GPU 驱动 AI 负载的高速缓存。其可扩展架构支持数万个节点&#xff0c;能显著降低存储带宽的消耗。Alluxio 在解决 AI 存储挑战方面的前沿技…

深度学习篇---AnacondaLabelImg

文章目录 前言第一部分&#xff1a;Anaconda是什么&#xff1f;1.简介2.特点&#xff08;1&#xff09;包管理器Conda&#xff08;2&#xff09;环境管理&#xff08;3&#xff09;预装包&#xff08;4&#xff09;跨平台&#xff08;5&#xff09;社区支持 3.安装WindowsLinux…

基于Redis实现短信验证码登录

目录 1 基于Session实现短信验证码登录 2 配置登录拦截器 3 配置完拦截器还需将自定义拦截器添加到SpringMVC的拦截器列表中 才能生效 4 Session集群共享问题 5 基于Redis实现短信验证码登录 6 Hash 结构与 String 结构类型的比较 7 Redis替代Session需要考虑的问题 8 …

Open3D计算点云粗糙度(方法一)【2025最新版】

目录 一、Roughness二、代码实现三、结果展示博客长期更新,本文最近更新时间为:2025年1月18日。 一、Roughness 通过菜单栏的Tools > Other > Roughness找到该功能。 这个工具可以估计点云的“粗糙度”。 选择一个或几个点云,然后启动这个工具。 CloudCompare只会询问…

(二叉树)

我们今天就开始引进一个新的数据结构了&#xff1a;我们所熟知的&#xff1a;二叉树&#xff1b; 但是我们在引进二叉树之前我们先了解一下树&#xff1b; 树 树的概念和结构&#xff1a; 树是⼀种⾮线性的数据结构&#xff0c;它是由 n &#xff08; n>0 &#xff09; …

洛谷P8837

[传智杯 #3 决赛] 商店 - 洛谷 代码区&#xff1a; #include<stdio.h> #include<stdlib.h> int cmp(const void*a,const void *b){return *(int*)b-*(int*)a; } int main(){int n,m;scanf("%d%d",&n,&m);int w[n];int c[m];for(int i0;i<n;…

C语言练习(17)

两个乒乓球队进行比赛&#xff0c;各出3人。甲队为A、B、C 3人&#xff0c;乙队为X、Y、Z 3人&#xff0c;并抽签决定比赛名单。有人向队员打听比赛的名单&#xff0c;A说他不和X比&#xff0c;C说他不和X、Z比&#xff0c;请编程序找出3对选手的对阵名单。 #include <stdi…

excel实用工具

持续更新… 文章目录 1. 快捷键1.1 求和 2. 命令2.1 查找 vloopup 1. 快捷键 1.1 求和 windows: alt mac : command shift T 2. 命令 2.1 查找 vloopup vlookup 四个入参数 要查找的内容 &#xff08;A2 6xx1&#xff09;查找的备选集 &#xff08;C2:C19&#xff09;…

【高阶数据结构】布隆过滤器(BloomFilter)

1. 概念 1.1 背景引入 背景&#xff1a;在计算机软件中&#xff0c;一个常见的需求就是 在一个集合中查找一个元素是否存在 &#xff0c;比如&#xff1a;1. Word 等打字软件需要判断用户键入的单词是否在字典中存在 2. 浏览器等网络爬虫程序需要保存一个列表来记录已经遍历过…