java并发编程概述
一. 进程和线程的概念
进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配的基本单位。进程是程序运行的实例,每当操作系统在运行一个程序时,会为其创建一个进程。每个进程都拥有自己的一整套变量。
线程是操作系统能够进行运算调度的最小单位。一个进程中可以创建多个线程。线程与它同属一个进程的其他线程,是可以共享进程所拥有的全部资源的。线程在一定的程度上可以看作是一个轻量级的进程。
CPU会给每个线程分配时间片,每个进程循环的获取CPU时间片来进行执行运算调度。由于时间片是非常短的(一般是几十毫秒),所以CPU切换线程执行速度非常快,就使得所有程序好象是在“同时”运行一样。
二.线程同步
1.volatile
volatile关键字是用来修饰会被不同线程访问和修改的变量。如果一个字段被声明成volatile的,Java中会确保所有线程看到这个变量的值是一致的。如果一个线程修改了一个volatile的声明的变量,其他的所有线程都会读到这个变量修改后的值,保证了变量的可见性。
Java支持多个线程同时访问一个对象或对象的成员变量。因为对象以及成员变量分配的内存是在共享内存中,但是每个线程在内存还拥有这些变量的一个拷贝。线程中对变量的修改是对拷贝变量进行的,所以线程访问的变量并不一定是最新的。
而关键字volatile修饰的变量,就会告知程序任何对该变量的访问都要从共享内存中获取,而且对它的任何改变必须同步刷新回共享内存中,这样就保证了所有线程对变量访问的可见性。
对volatile修饰的变量进行写操作时,与该写操作之前的任何读、写操作不会被重排序,同时对volatile修饰的变量进行读操作时,与该读操作之后的任何读、写操作不会被重排序。从而保障了有序性。
volatile可以看做是一个轻量级的synchronized,它比synchronized的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。
2.Java中的锁
多个线程并发访问共享资源的时候,可能会造成线程安全的问题。Java中提供了锁(Lock)来保障线程安全的同步机制。
一个线程在访问共享资源前必须申请相应的锁。如果线程申请成功的话,持有锁的这个线程,才可以对共享资源进行访问。一个锁一次只能被一个线程持有,只有等持有锁的线程把锁释放后,其他线程才能申请锁,并在申请成功情况下,才能访问共享资源。实际锁的机制将多个线程对共享数据的并发访问转换为串行访问。
锁相当于一个令牌,只有拿到令牌的线程,才能访问共享资源。针对一定共享资源的令牌只有一个。只有拿到令牌的线程在对共享资源处理完成,交出令牌后,其他的线程才可以获取令牌,令牌获取成功,才能访问共享资源。
2.1 synchronized隐式锁
synchronized关键字是java中的一种隐式同步锁,它可以保证在同一时刻,被修饰的代码块或方法只会有一个线程执行。
synchronized用的锁是存储在Java对象头里的。synchronized是Java解决并发问题的一种常见和简单的方法。我们在使用synchronized隐式锁时,不需要关心加锁和释放锁的过程,我们只需要告诉虚拟机哪些代码块需要加锁即可,其他的细节会由虚拟机自己实现。
使用synchronized实现线程同步时,Java中的每一个对象都可以作为锁。它以下的三种形式:
- 修饰普通的同步方法,锁是当前实例对象。
- 修饰静态同步方法,锁是当前类的Class对象。
- 修饰同步方法块,锁是synchonized括号中配置的对象。
2.2 Lock显示锁
Lock在java中是一个接口,它定义了锁的获取和释放的基本操作。Lock接口的一系列实例就是一个个显式锁的对象。所谓的显式锁,就是用户拥有了锁获取和释放的可操作性,而不像synchronized关键字,锁的获取和释放都托管给了虚拟机,对用户来说是无感的。
Lock接口概览:
方法名称 | 方法说明 |
void lock() | 获取锁 |
void lockInterruptibly() | 可中断的获取锁 |
boolean tryLock() | 尝试非阻塞的获取锁 |
boolean tryLock(long time, TimeUnit unit) | 超时获取锁 |
void unlock() | 释放锁 |
Condition newCondition() | 返回绑定到Lock对象的Condition实例 |
2.2.1 队列同步器AbstractQueuedSynchronizer
队列同步器AbstractQueuedSynchronizer(简称AQS),是用来构建锁或自定义同步组件的基础框架。AQS中拥有一个CLH队列来来管理所有暂时没有获取到锁而等待的线程。CLH队列是一个双向的先进先出(FIFO)队列。AQS中定义了一个volatile的int类型的state变量来表示同步状态,并通过CAS原子性操作来改变这个同步状态。
我们可以通过继承AbstractQueuedSynchronizer这个基类来实现自定义的同步组件。我们只需要重写AQS中定义的获取或释放同步状态(tryAcquire,tryRelease)等一些方法,就可以轻松的实现一个自定义的同步组件。
AQS是基于模板方法模式设计的,用户自定义同步组件只需要重写了父类AQS的一些虚方法,当程序调用AQS的模板方法时,在这些模板方法中就会调用到自定义同步组件重写的那些方法。
AQS是实现锁的关键,一般锁的实现类中都会聚合一个队列同步器AQS。锁实现类对锁接口语义的实现,一般都是是委托给了AQS。例如锁的获取和释放,其实是对应调用了AQS的同步状态的获取和释放。我们常用的ReentrantLock可重入锁就是这样实现的。
在ReentrantLock类中定义了一个Sync的自定义的同步器,这个同步器就继承了AbstractQueuedSynchronizer这个父类。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
ReentrantLock类实现了Lock接口。从源码中可以看出,ReentrantLock锁对Lock接口定义的方法实现,最终都是委托了给了自定义同步器的Sync类来完成的。(以lock()和unlock()方法为例)
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
2.2.2 ReentrantLock锁
ReentrantLock是Lock接口的一个默认实现类,ReentrantLock类除了提供锁的基本功能外,它还是一个可以重入的锁,并且它还提供公平锁和非公平锁的特性。
2.2.2.1基本应用
下面我们以一个示例,来展示ReentrantLock锁的基本用法。在我们平时的开发中,可能有生成业务流水号的需求。本例中的流水号是由日期加上一个自增数来组成的。为了保证生成的流水号的唯一不重复,我们需要使用锁来防止并发,避免多个线程产生重复的流水号。
示例中创建了一个SerialNumber类,类的generate方法专门用于生成流水号。在SerialNumber类中,声明了一个ReentrantLock的显示锁。使用锁的lock方法,来申请锁,锁的unlock方法来释放锁。一个线程再成功申请到锁后,就可以已独占的方式,来访问共享的资源,当前线程释放锁后。其他的线程才能通过申请锁来访问共享资源。从示例代码中可以看出,显示锁的使用方法比较简单。
需要注意的是,显示锁的使用过程中,我们需要在finally块中来释放锁,这样可以保证申请到的锁,最终都能够被释放,规避锁泄露的问题。而锁申请的方法lock,最好不要写在try块中,如果try块中的其他代码发生了异常,会导致程序在finally代码块中,对未加锁的对象进行unlock解锁,从而会抛出IllegalMonitorStateException异常,掩盖了真正的异常信息。如果确实需要把lock.lock()放在try块中,一般建议把获取锁的放在try块的第一行。
public class SerialNumber {
//创建了一个ReentrantLock锁的实例
private final Lock lock = new ReentrantLock();
//定义了一个自增的计数器变量
private int count;
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
//生成业务流水号
public String generate() {
String no = "";
//申请锁
lock.lock();
try {
//在临界区对共享变量count,进行自增。
no = dateFormat.format(new Date())+"-"+count++;
}
finally {
//在finally中对锁进行释放,可避免导致锁泄露。
lock.unlock();
}
return no;
}
}
在示例代码SerialNumberDemo中,我们定义了一个SerialNumber对象,专门用于生成流水号。定义了六个工作线程,来模拟并发生成流水号。
public class SerialNumberDemo {
private static SerialNumber serialNumber = new SerialNumber();
private static class Worker extends Thread {
private String threadName;
public Worker(String threadName) {
super(threadName);
this.threadName = threadName;
}
public void run() {
System.out.println("threadName:"+threadName+";serialNumber:"+serialNumber.generate());
}
}
public static void main(String[] args) {
Worker worker1 = new Worker("worker1");
Worker worker2 = new Worker("worker2");
Worker worker3 = new Worker("worker3");
Worker worker4 = new Worker("worker4");
Worker worker5 = new Worker("worker5");
Worker worker6 = new Worker("worker6");
worker1.start();
worker2.start();
worker3.start();
worker4.start();
worker5.start();
worker6.start();
}
}
运行结果:
2.2.2.2锁的可重入
锁的可重入性是指一个线程在获取到锁之后,还可以再次申请到锁而不会被阻塞。一个线程申请到了互斥锁资源,在锁释放之前如果再去竞争同一把锁,就不需要再经历释放锁和获取锁的过程,直接获取锁成功。(底层实现是记录重入的次数)
重入锁是解决了在递归调用或嵌套代码中的死锁问题。如果一个线程获得了锁,但在持有锁的代码块中又调用了获取同一个锁的方法时,对于非可重入锁而言,线程就会因为无法再次获得同一个锁而陷入死锁的状态。而可重入锁则允许线程多次获得同一个锁,可以避免死锁的问题。
下面我们通过源码来分析一下,ReentrantLock类是如何实现锁的可重入性(以非公平为例)。
final boolean nonfairTryAcquire(int acquires) {
//获取当前的线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//同步状态为0,表示此时还没有线程竞争该锁
if (c == 0) {
//若通过CAS设置同步状态成功的话,则将当前线程设置为独占线程。
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
//返回true表示当前的线程获取锁成功
return true;
}
}
//若当前获取锁的线程是独占线程,则表示线程是再次获取锁
//锁的可重入性
else if (current == getExclusiveOwnerThread()) {
//对当前的同步状态,增加重入次数。
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//用增加了重入次数的同步状态更新同步状态。
setState(nextc);
//返回true表示锁再次获取成功。
return true;
}
return false;
}
从nonfairTryAcquire获取锁的方法可以看出,当线程第一次获取锁时,此时获取到的同步状态是0。程序会设置新的同步状态,并设置占有线程为当前线程,最后程序会返回true,来表示线程获取锁成功。
当程序再次调用nonfairTryAcquire获取锁的时候,此时的同步状态不为0,程序就会判断此时获取锁的线程是不是第一次成功获取锁的线程。如果不是,则直接返回false,表示获取锁不成功。如果是同一个线程,则在当前同步状态的基础上,增加重入的次数,并重新设置这个新的同步状态,最后返回true,表示获取同步状态成功。这样同一个获取锁的线程,是可以多次获取锁的。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
有了锁的获取方法,就会有锁的释放方法。ReentrantLock类的tryRelease方法就是一个重入锁的释放方法。该方法中会用线程当前的同步状态减去传入的释放次数,来得到一个新的同步状态值,只有当这个新的同步状态减少变为为0的时候,程序才会将占有线程设置为null,并返回true,表示锁释放成功。其他不为0的状态,会直接返回false。
其实也就是同一个线程获取了n次锁,那么在释放锁的时候,前(n-1)次调用tryRelease方法都会返回false,表示锁还没有完全释放。只有第n次调用tryRelease方法,此时的同步状态会变成为0,表示同步状态已经完全释放,会返回true,表示锁最终释放完成。
2.2.2.3 锁的公平与非公平性
锁的公平与非公平性,其实主要是线程获取锁的方式不同。如果是公平锁,那么线程获取锁的顺序就符合先进先出(FIFO)的原则,是按照线程请求锁时间的先后顺序依次来获取锁。如果是非公平锁,只要当前线程CAS设置同步状态成功,那么当前线程就成功的获取了锁。
ReentrantLock锁中存在Sync、FairSync、NonfairSync三个类。FairSync表示公平锁,NonfairSync表示非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从ReentrantLock类的构造函数可以看出,ReentrantLock默认的是非公平锁。该构造函数也可以通过fair这个参数,来决定是使用公平锁还是非公平锁。
2.2.2.3.1 Sync
Sync类是继承了AbstractQueuedSynchronizer队列同步器,它实现了父类AQS队列同步器的部分方法。Sync可以看作是一个自定义的同步器。Sync类中定义了nonfairTryAcquire(int acquires)方法,该方法在的锁的可重入性已经分析过。它专门用于非公平方式来获取锁。
2.2.2.3.2 NonfairSync
NonfairSync类表示以非公平策略获取锁,它继承了Sync类,实现了父类Sync的lock()方法和AbstractQueuedSynchronizer队列同步器的tryAcquire(int acquires)方法。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//重写了父类Sync的lock()方法
final void lock() {
//如果CAS设置同步状态成功
//则表示当前线程已经成功获取到了锁
if (compareAndSetState(0, 1))
//将当前线程设置为独占线程
setExclusiveOwnerThread(Thread.currentThread());
//如果CAS设置同步状态失败的话,说明锁已被其它线程占用
else
//调用队列同步器(AQS)的acquire方法以独占方式获取同步状态
//该方法中会调用重写的tryAcquire(int acquires)方法
acquire(1);
}
//重写了队列同步器(AQS)的tryAcquire(int acquires)方法
//tryAcquire方法中调用了父类Sync的nonfairTryAcquire方法
//是以非公平的方式来获取同步状态
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
从NonfairSync类的源码可以看出,调用NonfairSync类的lock方法在获取锁的时候,会首先直接使用CAS来设置同步状态,如果设置成功,则表示当前线程已经成功获取了锁,并把当前线程设置为独占线程。
如果CAS设置同步状态不成功,则调用队列同步器(AQS)的acquire方法以获取同步状态。而acquire方法又会调用tryAcquire(int acquires)方法。tryAcquire(int acquires)方法在NonfairSync类已经重写了。它又会调用父类Sync的nonfairTryAcquire方法,nonfairTryAcquire方法则是以非公平的方式来获取同步状态。
所谓的非公平的方式,则表示线程获取锁的时候,并不是谁先申请的锁,谁就能先获取到锁。等待时间最长的线程不一定能获取到锁。锁的获取对于线程而言是没有规则的。
2.2.2.3.3 FairSync
FairSync类则表示以公平策略获取锁,同NonfairSync类一样,它也是Sync的子类,同样实现了父类Sync的lock()方法和AbstractQueuedSynchronizer队列同步器的tryAcquire(int acquires)方法。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
//重写了父类Sync的lock()方法
final void lock() {
//调用队列同步器(AQS)的acquire方法
//该方法中会调用重写的tryAcquire(int acquires)方法
acquire(1);
}
//重写了队列同步器(AQS)的tryAcquire(int acquires)方法
protected final boolean tryAcquire(int acquires) {
//获取当前的线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//同步状态为0,表示此时还没有线程竞争该锁
if (c == 0) {
//hasQueuedPredecessors方法是保证公平方式获取锁的关键
//它主要判断是否有线程获取锁等待的时间比当前线程长
//如果没有,则说明当前线程是最早申请锁的,先进先出(FIFO)
//再通过CAS设置同步状态成功的话,则将当前线程设置为独占线程。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
//返回true表示当前的线程获取锁成功
return true;
}
}
//若当前获取锁的线程是独占线程,则表示线程再次获取了锁
//锁的可重入性
else if (current == getExclusiveOwnerThread()) {
//对当前的同步状态,增加重入次数。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//用增加了重入次数的同步状态更新同步状态。
setState(nextc);
//返回true表示锁再次获取成功。
return true;
}
return false;
}
}
从FairSync类的源码可以看出,FairSync类的lock获取锁的方法,其实直接调用了队列同步器(AQS)的acquire方法。acquire方法是以独占方式来获取同步状态,它忽略了中断。如果获取同步状态成功,则表示线程已经成功获取了锁。否则,线程将会进入同步队列中等待,该方法中又会调用重写的 tryAcquire(intarg)方法。
FairSync类中重写了tryAcquire(int acquires)方法,如果当前的同步状态为0,则表示此时还没有线程竞争锁,程序会通过CAS设置同步状态,以竞争锁。但是代码中有增加了一个hasQueuedPredecessors()方法的判断,这个判断方法是FairSync类实现以公平方式获取锁的关键。
hasQueuedPredecessors()主要是判断是否有线程获取锁等待的时间比当前线程长。如果没有,则说明当前线程是所有申请锁的线程中最早的,它是有资格获取锁的。如果有线程申请锁的时间是早于当前线程,则当前线程不能获取锁。所以FairSync公平锁保证了以线程申请锁的时间绝对先后顺序来获取锁,符合了先进线程FIFO的原则。
NonfairSync非公平锁和FairSync公平锁都继承了Sync类,他们的以策略模式分别给出了两种不同的获取锁的方式。从源码也可以看出,非公平锁和公平锁都是可重入的锁。
ReentrantLock的构造函数可以通过fair这个参数,来决定是使用公平锁还是非公平锁, 但默认的是非公平锁。默认为非公平锁,可能会导致同一个获取了锁的线程再次获取锁的几率变得非常大。如果是同一个线程连续多次获取锁来执行程序,这样就会减少的线程上下文的切换,虽然会使其他的线程可能处于空闲的状态,但非公平锁却保证了其更大的吞吐量。
2.2.3 ReadWriteLock读写锁
读写锁是一种改进型的排他锁,它允许同一时刻可以有多个只读线程来访问共享数据,但如果有写线程在更新共享数据时,其他的读线程和写线程都会被阻塞。有读线程在读取共享数据的时,其他写线程是无法更新这个共享数据的,而一个写线程在更新共享数据时,其他的任何线程都无法读取和更新这个共享数据。
与一般的排他锁相比,读写锁的并发性有了很大的提升。读写锁特别适合于有大量对数据读操作而写操作很少的业务场景。
在java中对于读写锁提供了一个ReadWriteLock接口,这个接口有一个默认的实现类ReentrantReadWriteLock。
ReadWriteLock接口:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
ReadWriteLock接口定义了两个方法:readLock()和writeLock(),分别用于返回Lock类型的读锁和写锁。
读写锁中是维护了一个读锁和一个写锁这样的一对读写分离的锁。线程在读取共享数据的时候,必须先持有读写锁的读锁。这个读锁是可以同时被多个线程所持有的,这样多个持有读锁的线程,就可以同时读取这些共享数据,提高了并发性。只要有一个线程持有了读写锁的读锁,其他的任何线程就无法再获取该读写锁的写锁了。这样就可以保障线程在读取共享数据时,没有其他的线程可以对数据进行更新,从而保证线程读取到的共享数据是最新的。
线程在更新共享数据的时,必须先持有这个读写锁的写锁。写锁是排他的,一个线程持有写锁的时候,其他的任何线程就无法再获得这个读写锁的写锁或者读锁。只有当这个线程在更新完成共享数据,并释放写锁后,其他的线程才能再次获取到写锁或者读锁。这样就保证了线程对共享数据的更新对其他线程都是可见的
我们用一个示例来介绍一下ReentrantReadWriteLock读写锁的使用。
public class ReentrantReadWriteLockDemo {
//把初始值设置为0
private String value = "0";
//实例化一个ReentrantReadWriteLock读写锁的示例
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
//读操作
public void read() {
//申请读锁
rwLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + ":读取数据开始!");
try {
//当前线程休眠几秒钟
Thread.sleep((long) (Math.random() * 10000));
//程序打印当前读取的数据
System.out.println(Thread.currentThread().getName() + ":读取数据为:"+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
System.out.println(Thread.currentThread().getName() + ":读取数据结束!");
//释放读锁
rwLock.readLock().unlock();
}
}
//写操作
public void write(String value) {
//申请写锁
rwLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + ":写入数据开始!");
try {
//当前线程休眠几秒钟
Thread.sleep((long) (Math.random() * 10000));
this.value = value;
//程序打印当前写入的数据
System.out.println(Thread.currentThread().getName() + ":写入数据为:"+value);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally {
System.out.println(Thread.currentThread().getName() + ":写入数据结束!");
//释放写锁
rwLock.writeLock().unlock();
}
}
}
在以上示例代码的ReentrantReadWriteLockDemo类中,我们定义了一个ReentrantReadWriteLock读写锁的实例。ReentrantReadWriteLockDemo类中分别定义了读和写的两个方法。在read的读方法中我们会申请ReentrantReadWriteLock读写锁的读锁,以防止读取数据期间,其他线程修改了数据。同样在write写方法中,我们会申请读写锁的写锁,以防止写入数据期间,其他线程读取或修改了数据。
public class ReentrantReadWriteLockApp {
public static void main(String[] args) {
ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();
//循环10次
for (int i = 0; i < 10; i++) {
//随机数是偶数的话,就启动一个线程进行读操作
if((new Random().nextInt(10000)) % 2 == 0) {
new Thread(() -> demo.read()).start();
}
else {
//随机数是奇数的话,就启动一个线程进行写操作,写入一个100以内的随机整数。
new Thread(() -> demo.write(new Random().nextInt(100)+"")).start();
}
}
}
}
运行结果:
从程序的运行结果可以看出,在数据写入操作时,ReentrantReadWriteLock读写锁的写锁是独占排他的,而它的多个读锁是可以并发的。也就是遵循了读读相容,读写互斥,写写互斥的读写互斥原则。
三.Java异步编程
1.Java中的线程池
线程对系统来说,是一种稀缺资源。Java中每创建一个线程,JVM就会在相应的在宿主机操作系统中创建一个的本地线程。系统在执行完任务后,一般会把线程销毁。而每次线程的创建和销毁,都会消耗CPU和内存等系统资源。
如果一个系统中会不断产生的大量任务,在这种情况下,如果我们针对每个任务都新创建一个线程来处理,就消耗非常多的系统资源,还可能会导致系统不稳定。
其实我们可以复用一定数量的线程(这些线程处理完任务后,不需立即销毁),由这些线程按照一定顺序去处理不断产生的任务。这些一定数量的线程就组成了线程池。线程池可以使用有限的系统资源去处理相对无限的任务。
线程池(thread pool):是一种线程的使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
在Java中的我们一般是new一个ThreadPoolExecutor类的实例,来创建一个线程池。ThreadPoolExecutor是java线程池中一个十分核心的实现类。
1.1 ThreadPoolExecutor类的参数
corePoolSize(核心线程数):核心线程数是线程池中能保持的最少的线程数,即使这些线程是处于空闲状态,也不会销毁。当线程池中的线程数小于corePoolSize时,我们向线程池提交一个任务,线程池就会创建一个新的线程。即使线程池中有空闲线程时,也会重新创建。对于新提交的任务,线程池会一直创建线程,直到线程数达到corePoolSize。
maximumPoolSize(最大线程数):线程池中存在的最大线程数。
workQueue(任务队列):任务队列是用于保存等待线程执行的任务。当线程池中的线程数达到corePoolSize时,我们再次向线程池提交任务时,此时线程池就不会再创建新的线程用于执行这个任务了。线程池会把这个任务保存到任务队列中,等待有空闲的线程出现时,再用这个空闲的线程来执行这个任务。
maximumPoolSize(最大线程数):线程池中允许的最大线程数量,默认值为Integer.MAX_VALUE。当线程池的线程数已经达到corePoolSize,并且在任务队列存储任务已满的情况下,如果此时线程池没有空闲线程且线程数小于maximumPoolSize,对于新提交的任务,线程池会再次创建新的线程用于执行任务。当线程池的线程数已达到maximumPoolSize且这些线程都处于非空闲的状态,若再次提交的任务,线程池则会执行拒绝策略,抛出RejectedExecutionException类型的异常。
keepAliveTime(线程存活时间):是当线程池中线程数大于corePoolSize时,多余空闲线程在终止前等待新任务存活的最长时间。
TimeUnit(线程存活时间的单位):可选时,分,秒,天等单位,默认是纳秒。
threadFactory(线程工厂): 用于创建执行任务线程。
handler(饱和策略): 当线程池中的线程数已经达到maximumPoolSize,且任务队列已满的情况下,线程池会采取一种策略处理提交的新任务。默认是AbortPolicy,表示无法处理新任务而抛出异常。JDK中提供了以下几种策略。
- AbortPolicy:默认是AbortPolicy,线程池会抛出异常。
- CallerRunsPolicy: 由调用者所在线程来执行任务。
- DiscardPolicy:不做任务处理,丢弃掉。
- DiscardOldestPolicy:从队列中踢出最先进入队列的任务,再执行当前提交的任务。
- RejectedExecutionHandler:用户自定义饱和策略。定义实现了RejectedExecutionHandler接口的实例,在rejectedExecution方法中,用户根据自身业务的场景需来实现自定义策略。
1.2线程池的队列
当线程池的线程数达到corePoolSize(核心线程数),且所有的这些线程都处于非空闲的状态,我们再向线程池提交任务,那么此时的任务都会放入队列(队列不是饱和状态),已待有空闲状态的线程时,再执行队列中的任务。线程池中的任务队列可以分为:有界队列,无界队列,同步移交队列这三种阻塞队列。
ArrayBlockingQueue:是一个基于数组的有界阻塞队列,队列按FIFO(先进先出)原则对元素进行排序。ArrayBlockingQueue类在实例化对象时,必须要指定队列的大小。
LinkedBlockingQueue:是一个基于链表的有界阻塞队列,队列也是按FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue的默认大小为Integer.MAX_VALUE,为了避免可能的OOM错误,所以我们再构建LinkedBlockingQueue的对象时,最好指定队列的大小。
PriorityBlockingQueue:一个具有优先级的无界阻塞队列,队列都返回优先级最高或者最低的元素。队列内部维护最小堆,使用平衡二叉树实现,直接遍历队列元素不能保证有序。队列使用元素的compareTo方法的指定规则排序。
SynchronousQueue:是一个不存储元素的同步移交队列,队列的容量为0,也就是队列中无任何的缓冲,队列在取元素和放入元素时都会产生阻塞。取元素要等待,只到另一个线程有插入元素的操作,否则会一直阻塞。队列放入元素时也会一直阻塞,直到有另一个线程来取元素。
1.3 ThreadPoolExecutor的执行流程
(1).首先判断线程池中的线程数是否小于corePoolSize(核心线程数),如果是小于corePoolSize,则创建一个新的工作线程来执行提交的任务。线程执行完任务后,不会销毁,而是保存在线程池中,循环的执行队列中的任务。
(2). 当线程池的线程数已达到corePoolSize,对于新提交的任务,在没有空闲线程的状况下,线程池会把当前提交的任务放入到队列中。任务放入队列成功后,线程池还需要进行一个double-check。<1>.检查此时线程池是否处于关闭状态,如果是,则需要从队列中移除刚刚添加的任务,再对任务执行拒绝策略。<2>.检查此时线程池的线程数是否为0,为0的情况下,则需要添加一个新的线程(防止所有的工作线程都已经销毁回收的情况)。
(3). 当线程池的线程数已达到corePoolSize且任务队列已满的情况下,此时判断线程池中的线程数是否小于maximumPoolSize(最大线程数)。如果小于maximumPoolSize,且没有空闲线程的状况下,则创建一个新的工作线程来执行提交的任务。线程在执行完任务后,在存活时间内没有新的任务,线程就会被销毁。对于空闲的线程,线程池会一直销毁到线程数为corePoolSize时为止。
(4).当线程数已达到maximumPoolSize且队列中任务已满的情况下,如果此时没有空闲的线程,此时对于新提交的任务,线程池会执行饱和策略。
2.Executor框架
Executor框架解耦了任务的提交与任务执行。Executor框架是由任务,任务的执行,异步计算的结果三个部分组成。
2.1任务
任务主要是实现了Runnable接口或Callable接口的实例。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Runnable是一个函数式接口,接口中定义了一个抽象的run方法,这个方法是对任务处理逻辑的抽象。我们在定义一个任务时,可以实现Runnable接口,在重写接口run方法中给出自己的业务处理逻辑。
Runnable接口的实例可以作为参数给Thread线程来直接执行,也可以给Executor线程池作为异步任务来执行。
Runnable接口的实例执行任务过程中不会抛出异常,也不会返回执行结果。
Callable接口
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable同Runnable一样也是函数式接口,Callable接口的任务只需要实现接口声明的call()方法。但Callable接口是一个泛型接口,call()方法会有返回结果,而且会抛出异常。
Callable接口的实例也可以转换成Runnable接口的实例。Executors工厂类中提供了callable方法可以完成它们之间的转换。
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
由于FutureTask类间接实现了Runnable接口。我们也可以通过FutureTask类的一个构造函数将Callable实例转换为Runnable实例。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
2.2任务的执行
Executor接口和其子接口ExecutorService的提供了任务的执行机制。
public interface Executor {
void execute(Runnable command);
}
Executor接口定义了execute方法,专门用于执行Runnable接口类型的任务,execute方法没有返回值。
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
ExecutorService接口继承了Executor接口,定义了submit方法用于执行Runnable和Callable类型的任务,submit方法会返回任务执行结果。
Executor的类图
从类图中可以看出,Executor接口系列有两个实现类ThreadPoolExecutor和ScheduledThreadPoolExecutor。
2.2.1 ThreadPoolExecutor
ThreadPoolExecutor是ExecutorService的默认实现类,在Java中的我们一般是new一个ThreadPoolExecutor类的实例,来创建一个线程池。ThreadPoolExecutor是java线程池中一个十分核心的类。在1. Java中的线程池的章节中已对ThreadPoolExecutor做了详细的描述,这里就不再复述。
Executor框架中还提供了一个工厂类Executors,它可以用来创建一系列的ThreadPoolExecutor类型的线程池实例。
2.2.1.1 FixedThreadPool
FixedThreadPool是一个可复用且线程数固定的ThreadPoolExecutor类型的线程池。我们可以使用Executors工厂类的newFixedThreadPool方法来创建一个FixedThreadPool的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
从newFixedThreadPool方法可以看出,方法返回了一个ThreadPoolExecutor类型的实例,该线程池的核心线程数和最大线程数都为入参nThreads。keepAliveTime设置为OL,意味着多余的空闲线程会被立即终止。任务的缓存队列是无界队列LinkedBlockingQueue。
当FixedThreadPool线程池中的线程数小于corePoolSize(核心线程数)时,对于新提交的任务,线程池会创建一个新的线程来执行任务。而当线程数达到corePoolSize(核心线程数)后,线程池中的线程数就会保持不变了。对于空闲的线程,线程池不会销毁。对新提交的任务,如果线程池中有的空闲线程,则直接用空闲的线程来执行任务。没有空闲的线程,线程池也不会再增加新的线程,而是新提交的任务缓存到LinkedBlockingQueue队列中。LinkedBlockingQueue队列的容量为Integer.MAX_VALUE,相当于一个无界队列。
FixedThreadPool线程池中线程数大小一旦达到corePoolSize(核心线程数)后,就既不会增加也不会减少工作线程,一直会保持corePoolSize的线程数。这些线程会循环反复的从LinkedBlockingQueue获取任务来执行。FixedThreadPool线程池一旦使用完毕不再需要时,我们需要将其关闭。
FixedThreadPool线程池适合于不想占用太多服务器资源,任务数量固定且耗时比较长的任务。
2.2.1.2 CachedThreadPool
CachedThreadPool是一个可以根据实际运行情况的需要,自行动态调整线程数的可复用的线程池。若CachedThreadPool线程池中存在空闲的线程,对于新提交的任务,则优先复用那些空闲的线程来执行任务。若线程池中没有线程或所有的线程都在执行任务,则会创建一个新的线程来执行任务。当线程闲置一定的时间后,就自动销毁。所以CachedThreadPool中的线程数会根据执行任务的情况,来增加或者减少线程。CachedThreadPool线程池中的线程数一直是处于一个变化的状态。
我们可以使用Executors工厂类的newCachedThreadPool方法来创建一个CachedThreadPool的线程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
可以看出newCachedThreadPool方法是直接new一个ThreadPoolExecutor的实例返回。在ThreadPoolExecutor类的构造函数中,corePoolSize(核心线程数)被设置为0,maximumPoolSize(最大线程数)被设置为Integer.MAX_VALUE,也可以认为maximumPool是无界的。keepAliveTime设置为60秒,也就是说空闲线程超过60秒后将会被终止。线程池的缓存队列是SynchronousQueue。SynchronousQueue是一个没有数据缓冲的阻塞队列,它的容量是0,不持有任何元素,当生产者线程对其的插入offer操作时,必须等待消费者线程的poll移除操作。
当第一次向CachedThreadPool线程池提交任务时,由于corePoolSize为0。生产者线程会执行SynchronousQueue.offer(Runnable task)方法把任务缓存到队列中,由于没有对应的消费者线程执行队列的SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),所以生产者线程的SynchronousQueue.offer(Runnable task)操作将不会成功。因为SynchronousQueue队列的容量是0,此时就相当于队列已满的情况。线程池的maximumPoolSize是Integer.MAX_VALUE。所以线程池此时会创一个新的消费者线程来执行任务。
同以上的情况,当线程池有了消费者线程但如果都处于非空闲状态,若此时生产者线程提交了一个任务,由于也没有消费者线程执行队列的SynchronousQueue.poll()方法,所以线程池此时也是创建一个新的线程来执行任务。在极端的情况下,给线程池每提交一个任务可能都会导致一个新的消费者线程被创建,从而导致系统中产生过多的线程。
当消费者线程执行完任务后,它会执行队列的SynchronousQueue.poll()操作,获取下一个任务来执行。由于keepAliveTime设置为60秒,所以如果在线程空闲的60秒内,有生产者线程提交了一个任务,生产者和消费者线程操作配对成功后,线程池就会复用这个消费者线程来执行提交的任务。若超过60秒,这个空闲的消费者线程将会被销毁。由于空闲60秒的空闲线程都会被销毁,因此长时间持有空闲的CachedThreadPool线程池不会占用系统资源。
CachedThreadPool线程池适合于用来执行大量耗时较短且提交频率较高的任务。
2.2.1.3 SingleThreadExecutor
SingleThreadExecutor从名称上就可以看出是只有一个工作线程的线程池,该线程池确保了在任意的一个时刻只有一个工作线程在执行任务。
我们可以使用Executors工厂类的newSingleThreadExecutor方法来创建一个SingleThreadExecutor的线程池。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
从newSingleThreadExecutor方法源码可以看出,该方法返回并不是ThreadPoolExecutor实例。而是一个对ThreadPoolExecutor进行了包装的一个ExecutorService实例。
SingleThreadExecutor的corePoolSize(核心线程数)和maximumPoolSize(最大线程数)都被设置为1,keepAliveTime设置为OL,任务的缓存队列使用的是无界队列LinkedBlockingQueue。从SingleThreadExecutor线程池的设置参数可以看出,它相当于一个线程数为1的FixedThreadPool线程池,即Executors.newFixedThreadPool(1)。
(1)向 SingleThreadExecutor线程池提交任务时,如果当前的线程数少于corePoolSize(核心线程数)(第一次提交任务或工作线程由于异常而终止时)。此时线程池会创建一个新的工作线程来执行任务。
(2)由于corePoolSize(核心线程数)为1,LinkedBlockingQueue是一个无界的队列,所以后续提交的任务都会缓存到LinkedBlockingQueue。
(3) 线程池的工作线程在执行完任务后,会在LinkedBlockingQueue队列中依次循环的取下一个任务来执行。
SingleThreadExecutor线程池适合于用来执行串行的需要按照提交顺序执行的任务。如果不想占用太多服务器资源,且需要执行任务不多,就可以考虑 SingleThreadExecutor线程池。
2.2.2 ScheduledThreadPoolExecutor
从Executor的类图可以看出ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor类,ScheduledThreadPoolExecutor线程池主要是用于执行延迟任务和周期性的任务。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
从以上源码中可以看出ScheduledThreadPoolExecutor类的构造函数是直接调用了父类ThreadPoolExecutor的构造函数。该线程池的corePoolSize(核心线程数)是由参数传入,maximumPoolSize(最大线程数)被设置为Integer.MAX_VALUE。keepAliveTime设置为0L。缓存队列使用的是DelayedWorkQueue队列。
2.2.2.1 schedule方法
ScheduledThreadPoolExecutor类中定义的schedule方法,是用于执行延迟性的任务,任务只能执行一次。
schedule方法的参数:
参数名 | 类型 | 描述 |
command 或 callable | Runnable 或 Callable | 要执行的任务 |
delay | long | 延迟执行的任务的时间 |
unit | TimeUnit | 延迟时间单位 |
public class ScheduleApp {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);
System.out.println("start... "+DateToString(new Date()));
scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("i am is "+Thread.currentThread().getName()+";"+DateToString(new Date()));
}, 3, TimeUnit.SECONDS);
}
private static String DateToString(Date date) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
return sdf.format(date);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
上述示例代码中,调用了schedule方法,程序会在延迟3秒钟后会打印出当前线程的名称。
运行结果:
2.2.2.2 scheduleAtFixedRate方法
ScheduledThreadPoolExecutor类的scheduleAtFixedRate方法,是表示按照一定时间间隔周期不间断的执行某个任务。
scheduleAtFixedRate方法的参数:
参数名 | 类型 | 描述 |
command | Runnable | 要执行的任务 |
initialDelay | long | 任务初始化执行时的延迟时间 |
period | long | 任务执行的时间间隔周期 |
unit | TimeUnit | 间隔周期时间单位 |
public class ScheduleAtFixedRateApp {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);
System.out.println("start... "+DateToString(new Date()));
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("i am is "+Thread.currentThread().getName()+";"+DateToString(new Date()));
Sleep(3);
}, 2, 5, TimeUnit.SECONDS);
}
private static String DateToString(Date date) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
return sdf.format(date);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
private static void Sleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
示例代码中,调用了ScheduledThreadPoolExecutor类的scheduleAtFixedRate方法。任务启动后延迟2秒后执行,以后任务会每间隔5秒钟执行一次。
运行结果:
2.2.2.3 scheduleWithFixedDelay方法
ScheduledThreadPoolExecutor类的scheduleWithFixedDelay方法,是表示任务在执行完成后,会等待一定时间间隔周期后再次执行这个任务,并按照这个间隔周期会一直执行下去。
scheduleWithFixedDelay方法的参数:
参数名 | 类型 | 描述 |
command | Runnable | 要执行的任务 |
initialDelay | long | 任务初始化执行时的延迟时间 |
delay | long | 任务执行完成后再次执行的延迟时间间隔 |
unit | TimeUnit | 间隔周期时间单位 |
public class ScheduleWithFixedDelayApp {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);
System.out.println("start... "+DateToString(new Date()));
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println("i am is "+Thread.currentThread().getName()+";"+DateToString(new Date()));
Sleep(6);
}, 3, 5, TimeUnit.SECONDS);
}
private static String DateToString(Date date) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
return sdf.format(date);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
private static void Sleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在以上示例代码中, ScheduledThreadPoolExecutor类的scheduleWithFixedDelay方法,会在任务启动后延迟3秒来执行任务,当任务执行完成后会延迟5秒再次执行当前任务。任务是打印当前线程的名称,打印完成后,当前的线程会休眠6秒钟。线程会休眠6秒钟可以看作是任务执行的时间,任务执行完成后会延迟5秒钟再次执行,所以下个任务会在11秒后再执行。
运行结果:
2.3异步计算的结果
2.3.1 Future接口
Future接口的一系列实例代表了异步计算的结果。Future接口中声明了get方法来获取任务的执行结果。接口还声明了cancel方法,用于取消正在执行的任务。
java.util.concurrent 包下的FutureTask类是Future接口一个典型的具体实现。
public interface Future<V> {
//尝试取消执行的任务
boolean cancel(boolean mayInterruptIfRunning);
//判断任务是否已经取消
boolean isCancelled();
//判断任务是否已经执行完毕
boolean isDone();
//等待任务执行完成后,返回结果
V get() throws InterruptedException, ExecutionException;
//若在超时时间内任务执行完成后,则返回结果,否则抛出TimeoutException异常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
类图
从类图中可以看出,FutureTask类实现了RunnableFuture接口,而RunnableFuture接口又继承了Runnable和Future接口。FutureTask类不仅实现了Future接口,还间接实现了Runnable接口。这样的话FutureTask就可以作为一个任务交给工作线程Thread或者Executor类型的线程池来执行。
2.3.2 FutureTask的应用
2.3.2.1 FutureTask与线程
由于FutureTask实现了Runnable接口,那么FutureTask可以作为任务交给一个Thread线程来执行。我们可以通过FutureTask的get方法来获取执行结果,如果结果没有计算完成,get方法就会产生阻塞一直到结果计算完成完成后再返回。FutureTask非常适合用于耗时的计算,主线程在完成自己的任务后,再去获取FutureTask的结果。
public class FutureTaskApp {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//定义了一个Callable接口的实例task,重写Callable接口的call()方法
//在call()方法中,调用了calculate方法,入参是15。
//会进行15次计算,最后把计算结果返回。
Callable<Integer> task = new Callable<Integer>(){
@Override
public Integer call() throws Exception {
return calculate(15);
}
};
//用Callable接口的实例task,作为构造函数的参数,来实例化一个FutureTask的实例futureTask。
//由于FutureTask间接实现了Runnable接口,所以futureTask相当于把一个Callable接口的实例
//转化成了一个Runnable的接口实例
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
//实例化一个计时器类对象,用于计算程序耗时
StopWatch stopWatch = new StopWatch();
stopWatch.start();
//实例化一个线程专门用于执行futureTask的任务
//由于futureTask实现了Runnable接口,所以可以给Thread执行
new Thread(futureTask, "FutureTaskThread").start();
//在主线程中,进行5次计算,得到计算结果
int result1 = calculate(5);
System.out.println("result1 is :"+result1);
//通过futureTask的get方法得到异步计算的结果。
int result2 = futureTask.get();
//int result2 = calculate(15);
System.out.println("result2 is :"+result2);
//最后对主线程和异步计算的结果进行求和
System.out.println("total is :"+(result1+result2));
stopWatch.stop();
//输出程序的总耗时
System.out.println("总共耗时 :"+stopWatch.getTime()+"毫秒");
}
//定义了一个用于计算的方法,计算的次数是由参入的参数来决定。
//结果每累加一次,停顿一秒
private static int calculate(int num) throws InterruptedException {
int result = 0;
for(int i = 0; i < num;i++) {
result += i*3;
Thread.sleep(1000);
}
return result;
}
}
在上述示例代码中,我们定义了一个calculate的方法来模拟耗时计算。方法中的计算次数是由入参来决定,方法每进行一次加法计算,线程都会有一秒钟的停顿。
在main方法中,定义了一个Callable类型的实例task。task重写了call()方法,call()方法中会调用calculate方法,给的入参是15,也就是会进行15次计算,call()方法会把最后的计算结果返回。
再用task作为FutureTask构造函数的参数,来实例化一个FutureTask的类型的futureTask。由于FutureTask间接实现了Runnable接口,所以futureTask就相当于把一个Callable接口的实例转化成了一个Runnable的接口实例。Runnable的接口实例是可以给线程来进行执行的。
实例化一个Thread线程专门用于执行futureTask的任务,在主线程外再起另一个线程来执行耗时的任务。
在主线程中,也是调用calculate方法,入参是5,来模拟一个主线程中的一个计算任务。
最后通过futureTask的get方法来获得异步计算的结果,get方法会产生阻塞,一直到结果计算完成后才会返回结果。
程序最后会把主线程中的一个计算结果和异步计算的结果求和后输出。在系统资源允许的情况下,对于一些耗时的任务,可以通过FutureTask另起一个线程来执行异步计算。由于增加了一个线程来并行执行耗时的任务,程序的总体执行性能会提高。
示例代码的运行结果:
如果我们不使用FutureTask另起一个线程来执行异步计算,而是把两个计算都放在main方法中同步执行的话,程序总体执行性能就会慢很多。
public class App {
public static void main(String[] args) throws InterruptedException {
//实例化一个计时器类对象,用于计算程序耗时
StopWatch stopWatch = new StopWatch();
stopWatch.start();
int result1 = calculate(5);
System.out.println("result1 is :"+result1);
int result2 = calculate(15);
System.out.println("result2 is :"+result2);
System.out.println("total is :"+(result1+result2));
stopWatch.stop();
//输出程序的总耗时
System.out.println("总共耗时 :"+stopWatch.getTime()+"毫秒");
}
//定义了一个用于计算的方法,计算的次数是由参入的参数来决定。
//结果每累加一次,停顿一秒
private static int calculate(int num) throws InterruptedException {
int result = 0;
for(int i = 0; i < num;i++) {
result += i*3;
Thread.sleep(1000);
}
return result;
}
}
示例代码的运行结果:
2.3.2.2 FutureTask与线程池
ThreadPoolExecutor线程池的父类AbstractExecutorService提供了submit方法,专门用于提交需要执行的任务。submit方法会返回Future类型的实例。这些实例一般都是FutureTask类型的对象。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
从上述代码中可以看出,AbstractExecutorService提供了三个submit的重载方法,入参可以是Runnable和Callable类型的任务。由于FutureTask实现了Runnable接口,所以FutureTask实例也可以作为submit方法的入参,FutureTask实例也可以作为任务交给线程池来执行。
public class FutureTaskExecutorApp {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//定义了一个Callable接口的实例task,重写Callable接口的call()方法
//在call()方法中,会进行20次计算,每次线程停顿一秒,最后再把计算结果返回。
Callable<Integer> task = new Callable<Integer>(){
@Override
public Integer call() throws Exception {
Integer result = 0;
for(int i = 0; i < 20;i++) {
result += i*3;
Thread.sleep(1000);
}
return result;
}
};
//定义只有一个线程的线程池。
ExecutorService executorService = Executors.newFixedThreadPool(1);
//把Callable接口的实例task作为线程池submit的参数,来提交任务。
//得到Future类型的实例
Future<Integer> future = executorService.submit(task);
System.out.println("future get start");
//通过future的get方法得到异步计算的结果,若异步任务没有计算完成,则会阻塞知道 计算完成后返回结果。
Integer result = future.get();
System.out.println("result is "+result);
executorService.shutdown();
}
}
上述的示例代码中,定义了一个Callable接口的实例task,重写了接口的call()方法,在call()方法中模拟了耗时的计算,计算完成后返回结果。定义只有一个线程的线程池executorService。调用executorService的submit方法,把Callable接口的实例task作为线程池submit方法的参数,来提交任务。submit方法会返回一个Future类型的实例。最终通过future的get方法来得到异步计算的结果。若此时异步任务没有计算完成,就会阻塞直到计算完成后返回结果。
2.3.3 FutureTask的状态
FutureTask类中定义了一个int型的state变量来表示异步任务运行的状态,FutureTask一共定义了7个不同的状态。
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
由于FutureTask内部会维护一个表示任务运行状态的变量,所以FutureTask可以被设计成用来表示一次性执行的任务。 FutureTask的run()方法在执行任务时,会先判断任务的运行状态,如果该任务已经被执行过,那么run()方法就会直接返回计算结果。
因此,FutureTask 实例所代表的任务是无法被重复执行的。这意味着同一个 FutureTask实例不能多次提交给Executor实例执行。
但是FutureTask的runAndReset()方法能够打破这种限制,可以使得一个 FutureTask实例所代表的任务能够多次被执行。
按照FutureTask类的run()方法的执行的时机(未启动,已启动,已完成),这个7个状态之间的转换一般是以下4种方式来进行装换:
1. 未启动:FutureTask任务未启动是指在创建FutureTask实例后,还尚未执行实例的run()方法,此时state为NEW(初始状态)。此时执行FutureTask的get()方法,会导致调用线程阻塞,直到任务执行完成并返回结果。若执行cancel()方法,会将任务状态标记为已CANCELLED(取消结束)。
2. 已启动:FutureTask任务已启动是当FutureTask实例调用了run()方法后或者对执行中任务又调用了cancel(true) 方法, 此时任务的state就会变为COMPLETING(完成中)或INTERRUPTING(中断中)这两个状态。此时执行FutureTask的get()方法,跟任务未启动一样,也会导致调用线程阻塞,直到任务执行完成并返回结果。若执行cancel(false)方法,只会将任务标记为已CANCELLED(取消结束),但不会对正在执行任务的线程产生任何影响,任务会继续执行直至结束。若执行cancel(true)方法,则将任务状态标记为已INTERRUPTING(中断中),将以中断线程的方式来试图阻止任务的继续执行。
3. 已完成:FutureTask任务已完成是当FutureTask的run()方法已经执行完成或任务已经被中断了。此时任务的状态可能是NORMAL(正常结束),EXCEPTIONAL(异常结束),CANCELLED(取消结束),INTERRUPTED(中断结束)这4个状态。此时执行FutureTask的get()方法,会立即返回结果或抛出异常。执行cancel()方法,则立即返回false,表示无法取消已经完成的任务。