JAVA并发编程(二)_线程池

JAVA线程池

1.1Java 线程池之 Executor 框架

为了实现线程池和管理线程池,JDK 给我们提供了基于 Executor 接口的一系列接口、抽象类、实现类,我们把它称作线程池的 Executor 框架,Executor 框架本质上是一个线程池;

在这里插入图片描述

​ Java 线程(java.lang.Thread)被一对一映射为本地操作系统内核线程,Java 线程启动时会创建一个本地操作系统线程,操作系统会调度所有线程并将它们分配给可用的 CPU 执行,当该 Java 线程终止时,这个操作系统线程也会被回收;
实际上这是两层线程调度模型:
​ (1)上层 Java 线程的调度由 Executor 框架调度;
​ (2)下层操作系统的线程调度由操作系统调度;
​ Java 的线程是这么设计的,包含两部分:
​ 1、工作任务;(Runnable 和 Callable)
​ 2、执行机制;(Thread、Executor 框架)

1.2Executor 框架 的接口与类结构

  • java.util.concurrent (并发编程的工具) juc
  • java.util.concurrent.atomic (变量的线程安全的原子性操作)
  • java.util.concurrent.locks (用于锁定和条件等待同步等)
  • Executor [ɪɡˈzekjʊtə] 执行人、执行者

1.3线程池的七大参数

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
15,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);

构造方法最多是 7 个参数;

1)int corePoolSize,

​ 指定线程池中的核心线程数量(最少的线程个数),线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,它们也不会被销毁,除非设置了 allowCoreThreadTimeOut;默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程;在实际中如果需要线程池创建之后立即创建线程,可以通过以下两种方式:
​ prestartCoreThread():boolean prestartCoreThread(),初始化一个核心线程;
​ prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心线程;

2) BlockingQueue workQueue,

任务队列,当核心线程全部繁忙时,由 execute/submit 方法提交的 Runnable 任务存放到该任务队列中,等待被核心线程来执行;

3)int maximumPoolSize

指定线程池中允许的最大线程数,当核心线程全部繁忙且任务队列存满之后,线程池会临时追加线程,直到总线程数达到 maximumPoolSize 这个上限;

4)long keepAliveTime,

线程空闲超时时间,如果一个线程处于空闲状态,并且当前的线程数量大于 corePoolSize,那么在指定时间后,这个空闲线程会被销毁;

5) TimeUnit unit

​ keepAliveTime 的时间单位 (天、小时、分、秒…)

6) ThreadFactory threadFactory,

线程工厂,用于创建线程,一般采用默认的即可,也可以自定义实现;
Executors.defaultThreadFactory(),
Executors.privilegedThreadFactory(),

7) RejectedExecutionHandler handler,

拒绝策略(饱和策略),当任务太多来不及处理时,如何“拒绝”任务?
任务拒绝是线程池的保护措施,当核心线程 corePoolSize 正在执行任务、线程池的任务队列
workQueue 已满、并且线程池中的线程数达到 maximumPoolSize 时,就需要“拒绝”掉新提交
过来的任务;

示例:

package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test01 {
    public static void main(String[] args) {
        Thread t=new Thread(){
            @Override
            public void run() {
                System.out.println("Runnable任务1");
            }
        };
        t.start();

        Thread t2=new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Runnable任务2");
            }
        });
        t2.start();
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
                5,
                10,
                15,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolExecutor.execute(()->{
            System.out.println("工作任务2");
        });
        //线程池关闭
        threadPoolExecutor.shutdown();
    }
}

package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test02 {
    //基于Executor框架实现线程池 (此时线程池中一个线程也没有)
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
          5,
          10,
          15,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(),
          new ThreadPoolExecutor.DiscardPolicy()
        );
        //当提交了一个工作任务,此时线程池中就有一个线程
        threadPoolExecutor.execute(()->{
            System.out.println(Thread.currentThread().getName());
        });
        //关闭线程池
        //threadPoolExecutor.shutdown();

        //当核心线程处于空闲状态时候,允许销毁这些空闲的核心线程,默认是不允许销毁核心线程的
        //threadPoolExecutor.allowCoreThreadTimeOut(true);

        //如果想创建线程池后,立刻就创建好线程,那么执行:
        threadPoolExecutor.prestartCoreThread();//初始化/创建一个核心线程
        threadPoolExecutor.prestartAllCoreThreads();//初始化/创建所有的核心线程

        for (int i=0;i<50;i++){
            threadPoolExecutor.execute(()->{
                System.out.println(Thread.currentThread().getName());
            });
        }
        threadPoolExecutor.shutdown();
    }
}

package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test03 {
    public static void main(String[] args) {
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
                5,
                12,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                new MyThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
        );

        threadPoolExecutor.execute(()->{
            System.out.println(Thread.currentThread().getName());
        });
    }
    /**
     * 自己实现线程工厂
     */
    static class MyThreadFactory implements ThreadFactory {

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "my-thread");
        }
    }
}
package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * ThreadPoolExecutor线程池
 *
 * @author Cat老师,关注我,抖音搜索:java512
 */
public class Test04 {

    public static void main(String[] args) {
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                1,
                15,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                Executors.defaultThreadFactory(),
                //Executors.privilegedThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        //同时提交4个任务
        threadPoolExecutor.execute(new MyRunnable(1));
        threadPoolExecutor.execute(new MyRunnable(2));
        threadPoolExecutor.execute(new MyRunnable(3));
        threadPoolExecutor.execute(new MyRunnable(4));

        threadPoolExecutor.shutdown();
    }

    static class MyRunnable implements Runnable {
        private int i;

        public MyRunnable(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + this.i);
        }
    }
}

运行结果:
main: 4
pool-1-thread-1: 1
pool-1-thread-1: 2
pool-1-thread-1: 3

Process finished with exit code 0

1.4线程池的拒绝策略

JDK 提供了 4 种内置的拒绝策略:AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy 和DiscardPolicy;
1、AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常,这是线程池默认、的拒绝策略,在任务不能再提交的时候抛出异常,让开发人员及时知道程序运行状态,这样能在系统不能承载更大的并发量时,及时通过异常信息发现;

package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test05 {
    public static void main(String[] args) {
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                8,
                16,
                15,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                //Executors.privilegedThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
                //new MyRejectedExecutionHandler()
        );

        for (int i = 0; i < 40000; i++) {
            threadPoolExecutor.execute(new MyRunnable(i));
        }

        threadPoolExecutor.shutdown();
    }

    static class MyRunnable implements Runnable {
        private int i;

        public MyRunnable(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + this.i);
        }
    }
}
运行结果
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lisus.threadpool.Test05$MyRunnable@61bbe9ba rejected from java.util.concurrent.ThreadPoolExecutor@610455d6[Running, pool size = 16, active threads = 16, queued tasks = 10, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.lisus.threadpool.Test05.main(Test05.java:24)
pool-1-thread-1: 0
pool-1-thread-1: 8
pool-1-thread-1: 9
pool-1-thread-1: 10
pool-1-thread-1: 11
pool-1-thread-1: 12
pool-1-thread-1: 13
pool-1-thread-1: 14
pool-1-thread-1: 15
pool-1-thread-1: 16
pool-1-thread-1: 17
pool-1-thread-2: 1
pool-1-thread-3: 2
pool-1-thread-4: 3
pool-1-thread-5: 4
pool-1-thread-6: 5
pool-1-thread-7: 6
pool-1-thread-8: 7
pool-1-thread-9: 18
pool-1-thread-10: 19
pool-1-thread-11: 20
pool-1-thread-12: 21
pool-1-thread-13: 22
pool-1-thread-14: 23
pool-1-thread-15: 24
pool-1-thread-16: 25    

2、DiscardPolicy:直接丢弃任务,不抛出异常,使用此策略可能会使我们无法发现系统的异、常状态,建议一些无关紧要的业务采用此策略;

package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test05 {
    public static void main(String[] args) {
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                8,
                16,
                15,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                //Executors.privilegedThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
                //new MyRejectedExecutionHandler()
        );

        for (int i = 0; i < 40000; i++) {
            threadPoolExecutor.execute(new MyRunnable(i));
        }

        threadPoolExecutor.shutdown();
    }

    static class MyRunnable implements Runnable {
        private int i;

        public MyRunnable(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + this.i);
        }
    }
}
运行结果
pool-1-thread-1: 0
pool-1-thread-1: 8
pool-1-thread-1: 9
pool-1-thread-1: 10
pool-1-thread-1: 11
pool-1-thread-1: 12
pool-1-thread-1: 13
pool-1-thread-1: 14
pool-1-thread-1: 15
pool-1-thread-1: 16
pool-1-thread-1: 17
pool-1-thread-2: 1
pool-1-thread-3: 2
pool-1-thread-4: 3
pool-1-thread-5: 4
pool-1-thread-6: 5
pool-1-thread-7: 6
pool-1-thread-8: 7
pool-1-thread-9: 18
pool-1-thread-10: 19
pool-1-thread-11: 20
pool-1-thread-12: 21
pool-1-thread-13: 22
pool-1-thread-14: 23
pool-1-thread-15: 24
pool-1-thread-16: 25

Process finished with exit code 0

3、DiscardOldestPolicy:丢弃任务队列中靠最前的任务,并执行当前任务,是否要采用此拒绝策略,根据实际业务是否允许丢弃老任务来评估和衡量;

package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test05 {
    public static void main(String[] args) {
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                8,
                16,
                15,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                //Executors.privilegedThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
                //new MyRejectedExecutionHandler()
        );

        for (int i = 0; i < 40000; i++) {
            threadPoolExecutor.execute(new MyRunnable(i));
        }

        threadPoolExecutor.shutdown();
    }

    static class MyRunnable implements Runnable {
        private int i;

        public MyRunnable(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + this.i);
        }
    }
}

4、CallerRunsPolicy: 交由任务的调用线程(提交任务的线程)来执行当前任务;这种拒绝策略会让所有任务都能得到执行,适合大量计算类型的任务执行,使用这种策略的最终目标是要、让每个任务都能执行完毕,而使用多线程执行计算任务只是作为增大吞吐量的手段;
新来的任务可以用 main 线程去执行,不用线程池里面的线程执行;

package com.lisus.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test05 {
    public static void main(String[] args) {
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                8,
                16,
                15,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                //Executors.privilegedThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
                //new MyRejectedExecutionHandler()
        );

        for (int i = 0; i < 40000; i++) {
            threadPoolExecutor.execute(new MyRunnable(i));
        }

        threadPoolExecutor.shutdown();
    }

    static class MyRunnable implements Runnable {
        private int i;

        public MyRunnable(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + this.i);
        }
    }
}

除了上面的四种拒绝策略,还可以通过实现 RejectedExecutionHandler 接口,实现自定义的拒绝策略;

package com.lisus.threadpool;

import java.util.concurrent.*;

public class Test05 {
    public static void main(String[] args) {
        //基于Executor框架实现线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                8,
                16,
                15,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                //Executors.privilegedThreadFactory(),
                //new ThreadPoolExecutor.CallerRunsPolicy()
                new MyRejectedExecutionHandler()
        );

        for (int i = 0; i < 40000; i++) {
            threadPoolExecutor.execute(new MyRunnable(i));
        }

        threadPoolExecutor.shutdown();
    }

    static class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //如果任务队列满了,就超时等待,可以设置一个时间
            try {
                executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class MyRunnable implements Runnable {
        private int i;

        public MyRunnable(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + this.i);
        }
    }
}

AbortPolicy 异常中止策略:异常中止,无特殊场景;
DiscardPolicy 丢弃策略:无关紧要的任务(文章点击量、商品浏览量等);
DiscardOldestPolicy 弃老策略:允许丢掉老数据的场景;
CallerRunsPolicy 调用者运行策略:不允许失败场景(对性能要求不高、并发量较小的场景);

1.5线程池的原理

在这里插入图片描述

1.6线程池底层源码实现

1.6.1线程池构造方法

在这里插入图片描述

1.6.2线程池源码-控制变量

在这里插入图片描述

COUNT_BITS = 29
CAPACITY = (1 << COUNT_BITS) - 1
int 类型的数是占用 4 字节,32 位,所以前面填了一堆 0;
原码:00000000 00000000 00000000 00000001
左移:00100000 00000000 00000000 00000000
减一:00011111 11111111 11111111 11111111 (536870911 = 5 亿多)

1.6.3线程池源码-线程池状态值

在这里插入图片描述

1.6.7线程池源码- 核心源码解读-execute
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取 clt 控制变量的值,clt 控制变量记录着 runState 和 workerCount 的值;
        int c = ctl.get();
        /**
         * workerCountOf方法获取控制变量ctl低29位值
         * 如果当前活动线程小于核心线程corePoolSize,,则新建一个线程放入线程池中,并把任务添加到该线程中运行
         */
        if (workerCountOf(c) < corePoolSize) {
            /**
             * addWorker 方法
             * 第一参数是要提交的工作任务
             * 第二个参数:
             * 如果是true,根据corePoolSize来判断表示添加核心线程;(保持稳定的线程数来处理任务)
             *  如果是 false,根据 maximumPoolSize 来判断,表示添加非核心线程;(应对突发的任务处理)
             */
            //addWorker()方法会检查运行状态和工作线程数,如果返回 false 则说明线程没有创建成功;
            if (addWorker(command, true))
                //添加成功则返回;
                return;
            //如果添加失败,则重新获取控制变量 ctl 的值;
            c = ctl.get();
        }
        //到这里了,说明 workerCountOf(c) >= corePoolSize,并且如果当前线程池是运行状态并且工作任务添加到任务队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新获取 ctl 值
            int recheck = ctl.get();
            //再次判断线程池是否是运行状态,如果不是运行状态,由于之前已经把 command 添加到 workQueue 中了,此时需要移除该 command;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            /**
             * 线程池是运行状态,获取一下线程池中的有效线程数,如果是 0,则执行 addWorker()方法;
             * addWorker()方法:
             * 第一个参数为 null,表示在线程池中创建一个线程,但不启动;
             * 第二个参数为 false,表示是非核心线程;
             *
             * 接下来这里没有写 else,表示如果判断 workerCount 大于 0,则不需要做什么处理,直接返回,
             * 加入到 workQueue 中的 command 会在将来的某个时刻被执行;
            */
            else if (workerCountOf(recheck) == 0)
                //此处是创建一个线程,但并没有传入任务,因为任务已经被添加到 workQueue 中了,到时候线程会从从 workQueue 中获取任务来执行;
                //所以当 workerCountOf(recheck) == 0 时执行 addWorker(null, false);
                //是为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务;
                addWorker(null, false);
        }
        /**
         * 如果执行到这里,有两种情况:
        * 1. 线程池已经不是 RUNNING 状态;
        * 2. 线程池是 RUNNING 状态,但往 workQueue 已经放不进去,即 workerCount >= corePoolSize,并且 workQueue 已满;
        * 此时再次调用 addWorker()方法,第二个参数为 false,表示非核心线程,如果失败则拒绝该任务;
         */
        else if (!addWorker(command, false))
            reject(command);
    }
1.6.8线程池源码-核心源码解读-addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 获取线程池控制变量的值
            int c = ctl.get();
            //线程运行状态
            int rs = runStateOf(c);

            //  if判断,如果rs>=SHUTDOWN,并且(判断3个条件,只要有1个不满足)返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //获取线程数
                int wc = workerCountOf(c);
                // 如果 wc 超过 CAPACITY,也就是 ctl 的低 29 位的最大值(二进制是 29 个 1),返回 false;
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 尝试增加 workerCount,如果成功,则跳出外层 for 循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果增加 workerCount 失败,则重新获取控制变量 ctl 的值
                c = ctl.get();  // Re-read ctl
                // 如果当前线程池的运行状态不等于 rs,说明线程池运行状态已被改变,返回外层 for 循环继续执行
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		// Worker 线程是否启动
        boolean workerStarted = false;
    	// Worker 线程是否添加
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 根据 firstTask 来创建 Worker 对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 检查线程池运行状态
                    int rs = runStateOf(ctl.get());
					// rs < SHUTDOWN 表示是 RUNNING 状态;
 					// 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程。
					// 因为在 SHUTDOWN 时不会在添加新的任务,但还是会执行 workQueue 中的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 检查线程已经是运行状态,抛出非法线程状态异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // workers 是一个 HashSet
                        workers.add(w);
                        // largestPoolSize 记录着线程池中出现过的最大线程数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // 把历史上出现过的最大线程数的值更新一下
                            largestPoolSize = s;
                        // Worker 线程添加成功
                        workerAdded = true;
                    }
                } finally {
                    // 释放 ReentrantLock 锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    // Worker 线程已经启动
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // Worker 线程没有启动成功
                addWorkerFailed(w);
        }
    	// 返回 Worker 线程是否启动成功
        return workerStarted;
    }

在这里插入图片描述

1.6.9线程池源码-核心源码解读-runWorker 方法
  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
      //允许响应中断
        w.unlock(); // allow interrupts
     	// 线程退出的原因,true 是任务导致,false 是线程正常退出
        boolean completedAbruptly = true;
        try {
            // 当前任务为空,且当前任务队列为空,停止循环
            while (task != null || (task = getTask()) != null) {
                // 上锁处理并发问题,防止在 shutdown()时终止正在运行的 worker
                w.lock();
				// 如果线程池是 stop 状态,并且线程没有被中断,就要确保线程被中断,如果线程池不是,确保线程池没有被中断;
                // 清除当前线程的中断标志,做一个 recheck 来应对 shutdownNow 方法
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行前(空方法,由子类重写实现)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行 Runnable 类的 run()方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行后(空方法,由子类重写实现)
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 完成的任务数+1
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            // 到此,线程是正常退出
            completedAbruptly = false;
        } finally {
            // 处理 worker 的退出
            processWorkerExit(w, completedAbruptly);
        }
    }
1.6.10线程池源码-核心源码解读-getTask 方法
private Runnable getTask() {
    // 表示上一次从任务队列中取任务时是否超时
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             如果线程池为`SHUTDOWN`状态且任务队列为空(线程池 shutdown 状态可以处理任务队列中的任务,不再接受新任务)
            或者
            线程池状态>=STOP,则意味着线程池不必再获取任务了,
            将当前工作线程数量-1 并返回 null;
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            /**
            timed 变量用于判断是否需要进行超时控制;
            allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
            wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
            表示对于超过核心线程数量的这些线程,需要进行超时控制(默认情况)
            */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			/**
            * 两个条件全部为 true,则通过 CAS 使工作线程数-1,即去除工作线程:
            * 条件 1:工作线程数大于 maximumPoolSize,或(工作线程需要超时控制且上次在任务队列拉取任务超时)
            * 条件 2:wc > 1 或任务队列为空
            * 如果减 1 失败,则返回重试;
            */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                /**
                * 执行到这里,说明已经经过前面的校验,开始真正获取 task;
                * 根据 timed 来判断,如果工作线程有超时时间,则通过任务队列的 poll 方法进行超时等待方式获取任务 ,
                * 如果在 keepAliveTime 时间内没有获取到任务,则返回 null,否则通过 take 方法;
                * take 方法表示如果这时任务队列为空,则会阻塞直到任务队列不为空;
                * 一般 poll()用于普通线程、take()用于核心线程
                */
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 如果 r == null,说明已经超时得不到任务,timedOut 设置为 true
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试
                timedOut = false;
            }
        }
    }
1.6.11线程池源码-核心源码解读- 线程池复用
1、threadPoolExecutor.execute(runnable)
2addWorker(command, boolean)
3Worker w = new Worker(firstTask); //已经创建了 Thread
4HashSet workers.add(w);
5、t.start(); //w.thread.start();
6、worker.run();
7runWorker(this)
8、task = w.firstTask 或者 task = getTask()
9、task.run();
1.6.12线程池源码-核心源码解读- 线程池大小变化
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //completedAbruptly 为 true 表示线程异常执行结束
		//completedAbruptly 为 false 表示线程正常执行结束
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
		//从线程 set 集合中移除工作线程,该过程需要加锁,因为 HashSet 是线程不安全的集合
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //统计完成的任务数:将该 worker 已完成的任务数追加到线程池已完成的任务数
            completedTaskCount += w.completedTasks;
            //从 HashSet<Worker>中移除该 worker
            workers.remove(w);
        } finally {
            //释放锁
            mainLock.unlock();
        }
		//根据线程池状态进行判断是否结束线程池
        tryTerminate();

        int c = ctl.get();
        //当线程池是 RUNNING 或 SHUTDOWN 状态时
        if (runStateLessThan(c, STOP)) {
            //如果 worker 不是异常结束
            if (!completedAbruptly) {
                //如果 allowCoreThreadTimeOut=true,最小线程个数就可以变为 0;
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //但是,如果等待队列有任务,至少保留一个 worker 来处理任务
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //如果工作线程大于等于核心线程,直接 return 就行了,否则就需要添加一个线程;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //是异常执行结束的,添加一个线程去执行任务
            addWorker(null, false);
        }
    }
    try {
        //统计完成的任务数:将该 worker 已完成的任务数追加到线程池已完成的任务数
        completedTaskCount += w.completedTasks;
        //从 HashSet<Worker>中移除该 worker
        workers.remove(w);
    } finally {
        //释放锁
        mainLock.unlock();
    }
	//根据线程池状态进行判断是否结束线程池
    tryTerminate();

    int c = ctl.get();
    //当线程池是 RUNNING 或 SHUTDOWN 状态时
    if (runStateLessThan(c, STOP)) {
        //如果 worker 不是异常结束
        if (!completedAbruptly) {
            //如果 allowCoreThreadTimeOut=true,最小线程个数就可以变为 0;
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //但是,如果等待队列有任务,至少保留一个 worker 来处理任务
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //如果工作线程大于等于核心线程,直接 return 就行了,否则就需要添加一个线程;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //是异常执行结束的,添加一个线程去执行任务
        addWorker(null, false);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/531432.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux LVM磁盘扩容

1、查看磁盘情况 df -h df -h2、查看逻辑卷 lvdisplay lvdisplay3、查看逻辑组 vgdisplay vgdisplay4、查看物理卷 pvdisplay pvdisplay5、查看磁盘 fdisk -l fdisk -l6、磁盘分区fdisk /dev/磁盘名 # 上一步查看到的新硬盘路径 fdisk /dev/vdb7、格式化磁盘mkfs -t ext4…

【负载均衡——一致性哈希算法】

1.一致性哈希是什么 一致性哈希算法就很好地解决了分布式系统在扩容或者缩容时&#xff0c;发生过多的数据迁移的问题。 一致哈希算法也用了取模运算&#xff0c;但与哈希算法不同的是&#xff0c;哈希算法是对节点的数量进行取模运算&#xff0c;而一致哈希算法是对 2^32 进…

基于SSM+Jsp+Mysql的超市管理系统

开发语言&#xff1a;Java框架&#xff1a;ssm技术&#xff1a;JSPJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包…

图片管理系统:原理、设计与实践

title: 图片管理系统&#xff1a;原理、设计与实践 date: 2024/4/9 20:04:25 updated: 2024/4/9 20:04:25 tags: 图片管理存储组织上传采集处理编辑搜索检索展示分享AI应用 第一章&#xff1a;图片管理系统概述 1.1 图片管理系统简介 图片管理系统是一种用于存储、组织、处理…

【Java网络编程】IP网络协议与TCP、UDP网络传输层协议

1.1、IP协议 当应用层的数据被封装后&#xff0c;想要将数据在网络上传输&#xff0c;数据究竟要被发往何处&#xff0c;又该如何精准的在网络上定位目标机器&#xff0c;此时起到关键作用的就是“IP协议”。IP协议的作用在于把各种数据包准确无误的传递给目标方&#xff0c;其…

力扣HOT100 - 56. 合并区间

解题思路&#xff1a; class Solution {public int[][] merge(int[][] intervals) {// 先按照区间起始位置排序Arrays.sort(intervals, (v1, v2) -> v1[0] - v2[0]);int[][] res new int[intervals.length][2];int idx -1;for (int[] interval : intervals) {//直接加入的…

前端实现打开新标签页后,再次定位到该标签页

需求 A 页面中点击按钮可以打开新的标签页 B 并且向 B 页面发送消息数据。 当新的标签页 B 未关闭且符合同源策略时&#xff0c;再次点击按钮&#xff0c;可以自动跳转到标签页 B 并且发生消息数据。 B.html <script>window.onmessage evt > {console.log(evt.d…

wps的1)2)3)编号,怎么更新

全部选中->格式刷->把它刷了必须全部选中

应急响应-挖矿脚本检测指南威胁情报样本定性文件清除入口修复

一、演示案例-挖矿样本-Win&Linux-危害&定性 危害&#xff1a;CPU拉满&#xff0c;网络阻塞&#xff0c;服务器卡顿等 定性&#xff1a;威胁情报平台上传解析分析&#xff0c;文件配置查看等windows样本 linux样本 二、演示案例-Linux-Web安全漏洞导致挖矿事件 某公司…

20240403-算法复习打卡day43||● 1049. 最后一块石头的重量 II ● 494. 目标和 ● 474.一和零

1049. 最后一块石头的重量 II class Solution { public:int lastStoneWeightII(vector<int>& stones) {vector<int> dp(15001, 0);int sum 0;for (int i 0; i < stones.size(); i) sum stones[i];int target sum / 2;for (int i 0; i < stones.siz…

C/C++与Python:各自的优势与前景展望

在讨论C/C和Python这两种编程语言的前景时&#xff0c;我们必须认识到每种语言都有其独特的定位和应用场景&#xff0c;并不存在绝对意义上的“谁更有前景”。它们分别在不同的领域发挥着重要作用&#xff0c;而且在未来的技术发展过程中&#xff0c;二者都将继续保持其不可替代…

600MA线性锂电池充电芯片 - YB4054DJ

描述: YB4054一款完整的单节锂离子电池充电器。其SOT23-5的封装与较少的外部元件数使得YB4054成为便携式应用的理想选择。采用了内部PMOSFET架构&#xff0c;加上防倒充电路&#xff0c;不需要外部检测电阻器和隔离二极管。热反馈可对充电电流进行自动调节&#xff0c;以便在大…

接口自动化测试要做什么?一文3个步骤带你成功学会!

先了解下接口测试流程&#xff1a; 1、需求分析 2、Api文档分析与评审 3、测试计划编写 4、用例设计与评审 5、环境搭建&#xff08;工具&#xff09; 6、执行用例 7、缺陷管理 8、测试报告 了解了接口测试的工作流程&#xff0c;那"接口自动化测试"怎么弄&#xff1…

深入浅出Redis(十一):Redis四种高级数据结构:Geosptial、Hypeloglog、Bitmap、Bloom Filter布隆过滤器

引言 Redis提供丰富的数据结构来解决各种场景下的问题&#xff0c;前段时间的一篇文章深入浅出Redis&#xff08;一&#xff09;&#xff1a;对象与数据结构已经深入浅出的说明Redis中的常用基础对象与数据结构 本篇文章将作为那篇文章的补充&#xff0c;深入浅出的解析另外四…

“春招市场”鸿蒙岗位增长163%!你想活出怎样的代码人生?

前段时间“智联招聘崩了”登上微博热搜。不少网友都感叹&#xff0c;可想而知今年的就业压力有多大&#xff0c;找工作真的太难了…… 金三银四&#xff0c;大家的求职热情暴涨到服务器都承受不住&#xff0c;都想在1179万毕业生中脱颖而出&#xff0c;开年春招就已经热辣滚烫……

B站广告推广操作教程及费用?

哔哩哔哩&#xff08;B站&#xff09;作为国内极具影响力的年轻人文化社区&#xff0c;已成为众多品牌与企业触达目标受众、提升品牌影响力的重要阵地。然而&#xff0c;面对B站复杂的广告系统与精细化运营需求&#xff0c;许多广告主可能对如何高效开展B站广告推广感到困惑。云…

03-JAVA设计模式-组合模式

组合模式 什么是组合模式 组合模式&#xff08;Composite Pattern&#xff09;允许你将对象组合成树形结构以表示“部分-整体”的层次结构&#xff0c;使得客户端以统一的方式处理单个对象和对象的组合。组合模式让你可以将对象组合成树形结构&#xff0c;并且能像单独对象一…

Python零基础从小白打怪升级中~~~~~~~文件和文件夹的操作 (1)

第七节&#xff1a;文件和文件夹的操作 一、IO流&#xff08;Stream&#xff09; 通过“流”的形式允许计算机程序使用相同的方式来访问不同的输入/输出源。stream是从起源&#xff08;source&#xff09;到接收的&#xff08;sink&#xff09;的有序数据。我们这里把输入/输…

Llama 3下月正式发布,继续开源!

4月10日&#xff0c;Techcrunch消息&#xff0c;Meta在本周伦敦举办的一场活动中确定&#xff0c;下个月将正式发布Llama 3并且继续开源。 Meta全球事务总裁Nick Clegg表示&#xff0c;我们希望在下个月&#xff0c;甚至更短的时间内&#xff0c;正式推出新一代基础模型Llama …

[C语言][数据结构][链表] 单链表的从零实现!

目录 零.必备知识 1.一级指针 && 二级指针 2. 节点的成员列表 a.数据 b.指向下一个节点的指针. 3. 动态内存空间的开辟 (malloc-calloc-realloc) 一.单链表的实现与销毁 1.1 节点的定义 1.2 单链表的尾插 1.3 单链表的头插 1.4 单链表的尾删 1.5 单链表的头删 1…