【源码解析】聊聊线程池 实现原理与源码深度解析(二)

AbstractExecutorService

上一篇文章中,主要介绍了AbstractExecutorService的线程执行的核心流程,execute() 这个方法显然是没有返回执行任务的结果,如果我们需要获取任务执行的结果,怎么办?

Callable 就是一个可以获取线程执行的结果。

public abstract class AbstractExecutorService implements ExecutorService {

  	/*
     * 将任务包装成FutureTask任务。带返回值参数的
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
    ** 不带返回值的
    **/
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        //1.将任务包装成RunableFuture对象,由于RunnableFuture是实现Runable类,所以execute的参数是一个可拓展的类型
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        //2,交给具体的执行器进行实现
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        //将任务装成成一个FutureTask任务
        RunnableFuture<T> ftask = newTaskFor(task);
        //执行任务
        execute(ftask);
        return ftask;
    }
  }

submit其实是一个重载的方法,分别是一个task,以及可以传递获取结果的任务,以及使用callable。

demo

从源码上看三个方法其实都是将任务进行了封装,然后调用线程池执行的核心方法

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> resultCallable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 1 + 1;
            }
        };

        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        Future<Integer> resultTask = threadPool.submit(resultCallable);
        System.out.println(resultTask.get());
        threadPool.shutdown();
    }

FutureTask

public class FutureTask<V> implements RunnableFuture<V> {
     
    /* NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0; // 初始化状态
    private static final int COMPLETING   = 1; // 结果计算完成或响应中断到赋值给返回值的状态
    private static final int NORMAL       = 2; // 任务正常完成,结果被set
    private static final int EXCEPTIONAL  = 3; // 任务抛出异常
    private static final int CANCELLED    = 4; // 任务被取消
    private static final int INTERRUPTING = 5; // 线程中断状态被设置为true 线程未响应中断
    private static final int INTERRUPTED  = 6; // 线程已被中断

    /** The underlying callable; nulled out after running */
    private Callable<V> callable; // 需要执行的任务
    /** The result to return or exception to throw from get() */
    // 执行callable的线程,调用FutureTask.run()方法通过CAS设置
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    // 执行callable的线程,调用FutureTask.run()方法通过CAS设置
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

  
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; // 初始化状态是new      // ensure visibility of callable
    }
}
 /* 继承了Runnable ,因为线程池中执行的也是Runnbale的任务
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask 实现RunnableFuture,也间接实现了run方法。

重点

我们知道 execute(ftask); 本质就是利用线程池进行执行,而线程执行的时候,其实就是启动对应任务的run方法。

task.run();
		// 这里是什么时候调用的,其实是
    // execute(ftask)传入的任务 task.run()
    public void run() {
        //不是新建状态 直接中止
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //核心,执行任务的call方法,你看就是调用普通的方法一样。
                    result = c.call();
                    //同步调用获取结果值
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //设置结果值
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            //响应中断
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  • 判断当前任务状态,非NEW直接返回
  • 执行对应c.call() 其实就是执行callable中的call方法。
  • 将返回值set进去
    protected void set(V v) {
        //CAS 去设置当前任务执行状态 new-completing
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //返回结果outcome
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //如果是在执行中,则等待一会
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        //返回结果
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        //设置了超时时间,则等待一定的时间,如果还没有获取到返回异常
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        //执行完成 返回x结果
        if (s == NORMAL)
            return (V)x;
        //如果任务取消,返回异常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

awaitDone

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //如果线程执行interrupted,直接抛出异常,并且将任务移除
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //状态大于COMPLETING 说明完成了
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

小结

FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。
如:

  1. 取消任务执行
  2. 查询任务是否执行完成
  3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。

如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来需要时,就可以通过FutureTask对象获得后台作业的计算结果或者执行状态。

Future模式其实是多线程编程中常用的设计模式,主线程向另外一个线程提交任务,无需等待任务执行的结果,返回一个凭证,就是future,通过future.get()去获取结果。这个过程可能是阻塞的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/217530.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(C语言)通过循环按行顺序为一个矩阵赋予1,3,5,7,9,等奇数,然后输出矩阵左下角的值。

#include<stdio.h> int main() {int a[5][5];int n 1;for(int i 0;i < 5;i ){for(int j 0;j < 5;j ){a[i][j] n;n 2;}}for(int i 0;i < 5;i ){for(int j 0;j < i;j )printf("%-5d",a[i][j]);printf("\n");}return 0; } 运行截图…

电子学会C/C++编程等级考试2022年06月(四级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:公共子序列 我们称序列Z = < z1, z2, ..., zk >是序列X = < x1, x2, ..., xm >的子序列当且仅当存在 严格上升 的序列< i1, i2, ..., ik >,使得对j = 1, 2, ... ,k, 有xij = zj。比如Z = < a, b, f, c &…

【信息安全】MD5哈希函数

1. MD5介绍 MD5&#xff08;Message Digest Algorithm 5&#xff09;是一种常见的哈希函数&#xff0c;通常用于产生数据的数字摘要&#xff0c;也称为哈希值或摘要值。它是由Ron Rivest在1991年设计的&#xff0c;广泛用于数据完整性验证、密码存储、数字签名等领域。 MD5哈…

游戏被流量攻击会有什么样的影响,该用什么样的防护方式去处理

德迅云安全-领先云安全服务与解决方案提供商德迅云游戏盾专门针对游戏进行防护&#xff0c;可免费提供防护方案~ 如果游戏被流量攻击会产生以下影响&#xff1a; 服务器过载&#xff1a;流量攻击会导致游戏服务器接收到的请求数量急剧增加&#xff0c;超出服务器的处理能力。这…

新媒体营销教学模拟实训平台解决方案

一、背景与目标 随着新媒体的快速发展&#xff0c;营销人才需求旺盛&#xff0c;而具备新媒体营销能力的人才供给却相对不足。为了解决这一矛盾&#xff0c;本方案旨在构建一个新媒体营销教学模拟实训平台&#xff0c;帮助学生掌握新媒体营销的实际操作技能&#xff0c;提高就…

xcode swiftui项目添加依赖

打开项目targets——Build Phases 点击“” 属于Apple SDKs的依赖可以直接添加 其他依赖需要在 Add Other中添加&#xff0c;在右上角用名字搜索或者URL地址(如GitHub上插件的地址)搜索,然后添加&#xff0c;也可添加本地文件

Linux Camera Driver(2):CIS设备注册(DTS)

一:MIPI接口 1、硬件接口 MIPI接口以rv1109和gc2053的硬件为例进行说明: 2、ISP驱动 注意配置事项: endpoint配置,必须指定data-lanes,否则无法识别为mipi类型 链接方式:sensor->csi_dphy->isp->ispp (1)sensor节点配置 根据原理图可知:mipicsi_clk0即引…

推荐一本Python数据分析的书:《Python数据科学应用从入门到精通》(张甜 杨维忠 著 2023年11月新书 清华大学出版社)

1.Python是堪与Office办公软件比肩的职场人士必备技能 Python作为一门简单、易学、易读、易维护、用途广泛、速度快、免费、开源的主流编程语言&#xff0c;广泛应用于Web开发、大数据处理、人工智能、云计算、爬虫、游戏开发、自动化运维开发等各个领域&#xff0c;是众多高等…

抖音视频水印怎么去除?这三个视频去水印技巧值得收藏!

抖音视频水印怎么去除&#xff1f;随着互联网的持续发展&#xff0c;越来越多的人选择使用视频分享平台来展示他们的生活与工作。然而&#xff0c;上传到这些平台上的许多视频常常遭到恶意水印的攻击&#xff0c;严重影响了观众的观看体验。今天&#xff0c;我们将分享三个视频…

网盘系统设计:万亿 GB 网盘如何实现秒传与限速?

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 网盘&#xff0c;又称云盘&#xff0c;是提供文件托管和文件上传、下载服务的网站&#xff08;File hostingservice&#xff09;。人们通过网盘保管自己拍摄的照片、视频&#xff0c;通过网盘和他人共享文件&#xff…

跨域问题与解决-gatway

3.6.1.什么是跨域问题 跨域&#xff1a;域名不一致就是跨域&#xff0c;主要包括&#xff1a; 域名不同&#xff1a; www.taobao.com 和 www.taobao.org 和 www.jd.com 和 miaosha.jd.com域名相同&#xff0c;端口不同&#xff1a;localhost:8080和localhost8081 跨域问题&a…

回溯法及例题(C++实现)

回溯法概念 概念&#xff1a;在包含问题所有解的解空间树中&#xff0c;按照深度优先搜索的策略&#xff0c;根据根结点&#xff08;开始节点&#xff09;出发搜索解空间树。 流程&#xff1a;首先根结点成为活节点&#xff0c;同时也成为当前的扩展结点。在当前的扩展结点处…

智能优化算法(二):禁忌搜索算法

文章目录 禁忌搜索算法1.禁忌搜索算法预备知识1.1 预备知识1---解空间1.2.预备知识2---邻域 2.禁忌搜索算法实现过程2.1.禁忌搜索算法思想2.2.禁忌搜索构成要素2.2.1.搜索结果表达2.2.2.邻域移动策略2.2.3.禁忌表引入2.2.4.禁忌搜索选择策略2.2.5.禁忌搜索渴望水平2.2.6.禁忌搜…

[Mac软件]HitPaw Video Converter 功能强大的视频格式转换编辑软件激活版

软件介绍&#xff1a; 以令人难以置信的速度将无损视频和音乐转换为1000多种格式&#xff1a;MP4、MOV、AVI、VOB、MKV等。不仅适用于普通编解码器&#xff0c;也适用于高级VP9、ProRes和Opus编码器。这解决了您不支持格式的所有问题&#xff0c;并允许您在任何平台和设备上播…

美颜SDK是什么?集成第三方美颜SDK的步骤

第三方美颜SDK提供了实时美颜效果。本文将深入探讨集成第三方美颜SDK的步骤&#xff0c;助您在应用中实现引人注目的美颜功能。 第一步&#xff1a;选择适合的第三方美颜SDK 在开始之前&#xff0c;务必仔细选择一个适合您应用需求的第三方美颜SDK。不同的SDK可能具有不同的特…

顺序查找、折半查找、分块查找

概念 查找表&#xff0c;分为静态查找表和动态查找表。 顺序查找 效率分析&#xff1a; 优化 折半查找 折半查找&#xff0c;又称“二分查找”仅适用于有序的顺序表。 ⭐&#xff0c;因为顺序表可以随机访问&#xff0c;链表不可以 效率分析 折半查找判定树的构造 如果&…

ubuntu安装tomcat并配置前端项目

1.1查找 # 先更新 sudo apt update # 查找 apt search jdk1.2安装 sudo apt install openjdk-8-jdk1.3验证 java -version 2.安装tomcat 下载链接&#xff1a;Apache Tomcat - Apache Tomcat 8 Software Downloadshttps://tomcat.apache.org/download-80.cgi下载这个&…

linux远程桌面管理工具(xrdp)、向日葵

Windows远程桌面 linux远程桌面 使用向日葵远程桌面&#xff08;手机端同理&#xff09; Windows远程桌面 微软自带Remote Desktop Connection Manager &#xff08;RDCMan&#xff09;远程控制管理软件介绍 远程桌面连接管理器 v2.93 linux远程桌面 Windows远程桌面Ubunt…

Unity中C#使用协程控制Shader材质变化

文章目录 前言一、协程是什么二、在Unity中使用协程1、我们在 Start 中测试一下协程的执行顺序2、我们实现一个点击按钮实现角色受击效果 三、协程中的动画过渡1、首先&#xff0c;在协程内实现中毒并且消散的效果2、在 OnGUI 内&#xff0c;给一个新按钮使用刚刚定义的协程 四…

洛谷P1044 [NOIP2003 普及组] 栈 递归方法

目录 核心&#xff1a; 问题转化&#xff1a; 状态转化&#xff1a;&#xff08;你得先读懂题&#xff0c;理解我们要干什么&#xff09; 对应不同情况下的状态转化&#xff1a;&#xff08;比如栈空就不能出栈&#xff0c;&#xff0c;&#xff09; AC代码&#xff1a; 题…