前言
在进行多线程编程时,我们离不开两个重要的任务接口:Runnable、Callable。一个线程想要运行,首先它得知道它的任务是什么(它要做什么),而这两个接口恰好是用于表示一个线程需要执行的任务。
Runnable和Callable两个接口都是任务接口,它们之间有何不同呢?Runnable中的run方法是没有返回值的,而Callable中的call方法有返回值V(泛型);同时call方法支持抛出异常,一般情况下我们都会使用Runnable接口,当需要线程的执行结果时就使用Callable接口。
public interface Runnable {
public abstract void run();
}
public interface Callable<V> {
V call() throws Exception;
}
那么我们如何获取一个线程的执行结果呢?此时就需要用到Future接口及其实现类FutureTask了。Future接口中有一个get()方法,用于同步获取线程执行结果,同步表示当线程还没有执行完任务时,调用get()方法获取执行结果的线程会阻塞,直至线程返回执行结果。
FutureTask源码解读
1、FutureTask不直接实现Future接口,而是实现了一个RunnableFuture<V>接口,RunnableFuture<V>接口又继承自Runnable、Future接口,属于这两个接口的子接口,所以FutureTask不仅可以用来同步获取线程执行结果,还可以作为任务提交给线程执行。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
2、FutureTask中的字段:
//任务的执行状态(get方法的关键)
private volatile int state;
//任务刚被创建
private static final int NEW = 0;
//任务正在处理
private static final int COMPLETING = 1;
//任务正常完成
private static final int NORMAL = 2;
//任务执行过程中出现异常
private static final int EXCEPTIONAL = 3;
//任务被取消
private static final int CANCELLED = 4;
//任务执行过程中被打断
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
//任务
private Callable<V> callable;
//任务执行结果
private Object outcome;
//执行任务的线程
private volatile Thread runner;
//链表结构:保存等待线程
private volatile WaitNode waiters;
2.1、WaitNode具体实现:
static final class WaitNode {
//结点存储的线程
volatile Thread thread;
//结点的next指针
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
3、FutureTask的构造方法:
//需要传递一个Callable接口的实现类
//在FutureTask作为任务提交给线程时,执行的是实现类实现的call方法
public FutureTask(Callable<V> callable) {
//如果入参的Callable为null,抛出异常
if (callable == null)
throw new NullPointerException();
//不然赋值给成员变量
//并将FutureTask状态设置为NEW
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
需要传递两个参数:Runnable、Result
前者是FutureTask作为任务提交给线程时,线程的执行逻辑
后者是线程在任务完成时,需要get方法返回的结果
result入参可以为null,表示不需要给定的结果
*/
public FutureTask(Runnable runnable, V result) {
//callable方法会将runnable、result封装成一个callable
this.callable = Executors.callable(runnable, result);
//将FutureTask状态设置为NEW
this.state = NEW; // ensure visibility of callable
}
//将Runnable封装成Callable返回
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//RunnableAdapter实现了Callable,所以也可以作为任务交给线程执行。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
4、获取执行结果的get()方法:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//如果任务是刚被创建(NEW)、或者是正在处理(COMPLETING)状态
//说明任务还未被处理完毕,阻塞获取任务执行结果的线程
s = awaitDone(false, 0L);
//如果任务是其它状态,说明任务已经处理完毕
//report方法会根据任务状态返回结果给调用get方法的线程
return report(s);
}
4.1、report()方法:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
//任务状态为NORMAL,正常返回结果
return (V)x;
if (s >= CANCELLED)
//任务状态为CANCELLED、INTERRUPTING、INTERRUPTED其中之一
//抛出CancellationException异常
throw new CancellationException();
//任务状态为EXCEPTIONAL,抛出ExecutionException异常
throw new ExecutionException((Throwable)x);
}
4.2、awaitDone()方法:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//deadline变量赋值分两种情况
//调用了get()方法:deadline = 0
//调用了get(long timeout,TimeUnit unit)方法:deadline = 当前时间 + timeout
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//如果当前线程被打断了
//就将其从等待链表中移除,并抛出InterruptedException异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//获取当前任务的状态
int s = state;
//任务已完成(正常完成、取消、打断都算完成)
if (s > COMPLETING) {
//如果有线程在等待,就将线程设置为null,防止内存溢出
if (q != null)
q.thread = null;
//返回任务状态
return s;
}
//任务正在处理
//当前等待执行结果的线程调用yield方法让os将CPU时间片分给其它线程
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//任务刚被创建,处于NEW状态
//若等待节点q为null,则创建一个等待节点
else if (q == null)
q = new WaitNode();
//如果当前等待节点还未加入等待队列,则通过CAS操作将其加入等待队列
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//如果设置了超时时间,则计算等待时间
//1.等待时间 >= 超时时间,那么将等待节点移除,并返回任务状态
//2.等待时间 < 超时时间,那么就阻塞:超时时间 - 等待时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//如果没有设置超时时间,通过LockSupport无时间限制的阻塞当前线程
else
LockSupport.park(this);
}
}
4.3、get方法流程图:
get方法测试:
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//通过匿名内部类的形式实现call方法
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
//模拟线程执行任务耗时
Thread.sleep(3000);
return "task is completed!";
}
};
FutureTask<String> task = new FutureTask<>(callable);
long begin = System.currentTimeMillis();
//将任务task分配给worker线程
Thread worker = new Thread(task);
worker.start();
//主线程获取worker线程的执行结果
String result = task.get();
long end = System.currentTimeMillis();
System.out.println("等待" + (end - begin) + "ms后获取到执行结果:" + result);
}
}
执行结果:
补充:
在分析FutureTask构造方法时,第二个构造方法很有意思。RunnableAdapter只负责组合Runnable,并实现Callable接口,call()方法的逻辑由作为成员变量的Runnable实现,这里有点适配器模式的味道。如下图所示: