线程
线程的状态
Java中线程可以有如下6中状态:
-
NEW 新创建
-
RUNNABLE 可运行
-
BLOCKED 阻塞
-
WAITING 等待
-
TIMED WAITING 计时等待
-
TERMINATED 终止
线程的创建
(1)继承Thread类
public class ExtendsThread extends Thread { @Override public void run() { System.out.println("1......"); } public static void main(String[] args) { new ExtendsThread().start(); } }
(2)实现 Runnable 接口
public class ImplementsRunnable implements Runnable { @Override public void run() { System.out.println("2......"); } public static void main(String[] args) { ImplementsRunnable runnable = new ImplementsRunnable(); new Thread(runnable).start(); } }
(3)实现 Callable 接口
public class ImplementsCallable implements Callable<String> { @Override public String call() throws Exception { System.out.println("3......"); return "zhuZi"; } public static void main(String[] args) throws Exception { ImplementsCallable callable = new ImplementsCallable(); FutureTask<String> futureTask = new FutureTask<>(callable); new Thread(futureTask).start(); System.out.println(futureTask.get()); } }
(4)使用线程池
ublic class UseExecutorService { public static void main(String[] args) { ExecutorService poolA = Executors.newFixedThreadPool(2); poolA.execute(()->{ System.out.println("4A......"); }); poolA.shutdown(); // 又或者自定义线程池 ThreadPoolExecutor poolB = new ThreadPoolExecutor(2, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); poolB.submit(()->{ System.out.println("4B......"); }); poolB.shutdown(); } }
(5)使用 CompletableFuture 类
public class UseCompletableFuture { public static void main(String[] args) throws InterruptedException { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { System.out.println("5......"); return "zhuZi"; }); // 需要阻塞,否则看不到结果 Thread.sleep(1000); } }
(6)基于 ThreadGroup 线程组
public class UseThreadGroup { public static void main(String[] args) { ThreadGroup group = new ThreadGroup("groupName"); new Thread(group, ()->{ System.out.println("6-T1......"); }, "T1").start(); new Thread(group, ()->{ System.out.println("6-T2......"); }, "T2").start(); new Thread(group, ()->{ System.out.println("6-T3......"); }, "T3").start(); } }
(7) 使用 FutureTask 类
和实现 Callable 接口差不多,只不过匿名形式创建 Callable
public class UseFutureTask { public static void main(String[] args) { FutureTask<String> futureTask = new FutureTask<>(() -> { System.out.println("7......"); return "zhuZi"; }); new Thread(futureTask).start(); } }
因为当你用一个类,继承Thread类时,它内部所有的方法,都会被继承过来,所以当前类可以直接调用start()方法启动,更具体点来说,在Java中,创建线程的方式就只有一种:调用Thread.start()方法!只有这种形式,才能在真正意义上创建一条线程!
而例如ExecutorService线程池、ForkJoin线程池、CompletableFuture类、Timer定时器类、parallelStream并行流……,如果有去看过它们源码的小伙伴应该清楚,它们最终都依赖于Thread.start()方法创建线程。
死锁的四个条件
-
互斥条件:该资源任意一个时刻只由一个线程占用。
-
请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
-
不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。
-
循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系
线程的常用方法
-
interrupted:对当前线程的中断标识进行复位。如果该线程已经处于终结状态,即使该线程被中断过,在调用该线程对象的 isInterrupted() 时依旧会返回 false
-
setPriority(int):设置线程的优先级别。可选范围1-10,默认为5, 优先级高代表抢到时间片的概率高。
-
yield():让当前线程直接放弃时间片返回就绪。
-
join():让当前线程邀请调用方法的那个线程优先执行,在被邀请的线程执行结束之前当前线程一直处于阻塞状态,不再继续执行
1、yield() 和 sleep() 的异同
-
相同点:yield() 方法和 sleep() 方法类似,也不会释放“锁”。
-
不同点:yield() 方法只是使当前线程重新回到可执行状态,所以执行 yield() 的线程有可能在进入到可执行状态后马上又被执行。 yield() 方法只能使同优先级或者高优先级的线程得到执行机会,这也和 sleep() 方法不同。
2、join() 和 sleep() 的异同
-
相同点:都可以实现等待
-
不同点:由于 join 的内部实现是 wait(),所以使用 join() 方法时会释放锁,那么其他线程就可以调用此线程的同步方法了。 sleep() 方法不释放锁,因此线程会一直等待下去,直到任务完成,才会释放锁。
3、sleep() 与 wait() 的异同
-
相同点:一旦执行方法,都可以使得当前的线程进入阻塞状态。
-
不同点:
-
-
(1)两个方法声明的位置不同:Thread 类中声明 sleep() , Object 类中声明 wait()
-
(2)调用的要求不同:sleep() 可以在任何需要的场景下调用。 wait() 必须使用在同步代码块或同步方法中
-
(3)关于是否释放同步监视器:如果两个方法都使用在同步代码块或同步方法中,sleep() 不会释放锁,wait() 会释放锁。
-
(4)当调用某一对象的 wait() 方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了 notify() 方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的 notifyAll() 方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。
-
锁
synchronized
在 Java 6 之后, synchronized 引入了大量的优化如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销,这些优化让 synchronized 锁的效率提升了很多。因此, synchronized 还是可以在实际项目中使用的,像 JDK 源码、很多开源框架都大量使用了 synchronized 。
使用方式
(1)对于普通同步方法,锁是当前实例对象
synchronized void method() { //业务代码 }
(2) 对于静态同步方法,锁是当前类的 class 对象
synchronized static void method() { //业务代码 }
(3)对于同步方法块,锁是 Synchronized 括号里配置的对象
synchronized(this) { //业务代码 }
总结:
-
synchronized 关键字加到 static 静态方法和 synchronized(class) 代码块上都是是给 Class 类上锁;
-
synchronized 关键字加到实例方法上是给对象实例上锁;
-
尽量不要使用 synchronized(String a) 因为 JVM 中,字符串常量池具有缓存功能。
Q:构造方法可以用 synchronized 修饰么?
A:先说结论:构造方法不能使用 synchronized 关键字修饰。
构造方法本身就属于线程安全的,不存在同步的构造方法一说
-
synchronized 同步语句块的实现使用的是 monitorenter 和 monitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。
-
synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。
不过两者的本质都是对对象监视器 monitor 的获取。
ReentrantLook
同步器
重入锁
获取过锁的线程可以再次调用 lock 来获取锁而不堵塞。假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 state 的值回到 0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁
公平锁
如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; /** * Acquires only if reentrant or queue is empty. */ final boolean initialTryLock() { Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedThreads() && compareAndSetState(0, 1)) { setExclusiveOwnerThread(current); return true; } } else if (getExclusiveOwnerThread() == current) { // 这里实现了可重入锁的逻辑 if (++c < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(c); return true; } return false; } /** * Acquires only if thread is first waiter or empty */ protected final boolean tryAcquire(int acquires) { if (getState() == 0 && !hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } }
唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回 true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁
hasQueuedPredecessors 是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回 False,说明当前线程可以争取共享资源;如果返回 True,说明队列中存在有效节点,当前线程必须加入到等待队列中。
非公平锁(默认)
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final boolean initialTryLock() { Thread current = Thread.currentThread(); if (compareAndSetState(0, 1)) { // first attempt is unguarded setExclusiveOwnerThread(current); return true; } else if (getExclusiveOwnerThread() == current) { // 这里实现了可重入锁的逻辑 int c = getState() + 1; if (c < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(c); return true; } else return false; } /** * Acquire for non-reentrant cases after initialTryLock prescreen */ protected final boolean tryAcquire(int acquires) { if (getState() == 0 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } }
为什么会出现线程连续获取锁的情况呢?回顾 nonfairTryAcquire(int acquires)方法,当一个线程请求锁时,只要获取了同步状态即成功获取锁。在这个前提下,刚释放锁的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。
非公平性锁可能使线程“饥饿”,为什么它又被设定成默认的实现呢?再次观察上表的结果,如果把每次不同线程获取到锁定义为 1 次切换,公平性锁在测试中进行了10 次切换,而非公平性锁只有 5 次切换,这说明非公平性锁的开销更小。下面运行测试用例(测试环境:ubuntuserver 14.04 i5-34708GB,测试场景:10 个线程,每个线程获取100000 次锁),通过 vmstat 统计测试运行时系统线程上下文切换的次数,运行结果如表 5-7 所示。
参照:
Java AQS 核心数据结构-CLH 锁 - Qunar 技术沙龙open in new window
ReentrantReadWriteLook
锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
RentrantReadWriteLock 不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
并发容器
ConcurrentHashMap
HashTable 容器使用 synchronized 来保证线程安全,但在线程竞争激烈的情况下HashTable 的效率非常低下。因为当一个线程访问 HashTable 的同步方法,其他线程也访问 HashTable 的同步方法时,会进入阻塞或轮询状态。如线程 1 使用put 进行元素添加,线程 2 不但不能使用 put 方法添加元素,也不能使用 get 方法来获取元素,所以竞争越激烈效率越低。
在 JDK1.7 的时候,ConcurrentHashMap 对整个桶数组进行了分割分段(Segment,分段锁),每一把锁只锁容器其中一部分数据(下面有示意图),多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。
到了 JDK1.8 的时候,ConcurrentHashMap 已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 synchronized 和 CAS 来操作。(JDK1.6 以后 synchronized 锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。
由于整个扩容操作是同步的,面对较大容量hashmap扩容是一个相当耗时的过程,会阻塞当前线程的其他操作。因此我们在java中使用较大容量的hashmap时,常常需要预估初始化hashmap的初始容量,指定一个2的N次方初始值进行优化。
参照:
ConcurrentHashMap 源码分析 | JavaGuide
ConcurrentLinkedQueue
ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即 CAS 算法)来实现,该算法在 Michael&Scott 算法上进行了一些修改
CountDownLatch
CyclicBarrier
通过设置一个屏障数,然后调用 await()方法来增加屏障数并堵塞等待。
CountDownLatch和 CyclicBarrier区别
-
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用 reset()重置。
-
CyclicBarrier还有其他复杂的方法,比如 getNumberWaiting 可以获得Cyclic-Barrier堵塞的线程数量,isBroken()方法用来了解堵塞的线程是否被中断。
Semaphore
控制并发线程数
使用方式:
(1)设置并发数
(2)调用 acquire()方法获取一个许可证
(3)调用release()方法归还许可证
Exchanger
两个线程互相交换值。执行 exchanger 需要成对出现。如果是奇数个,会有一个线程陷入堵塞。
package com.testclass.concurrency; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { try { String A = "银行流水A"; String other = exgr.exchange(A); System.out.println("银行流水exchange value: " + other); } catch (InterruptedException e) { } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String B = "银行流水B"; String A = exgr.exchange("B"); System.out.println("A 和 B 数据是否一致" + A.equals(B) + ", A 录入的是:" + A + ", B录入是:" + B); } catch (InterruptedException e){ } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String C = "银行流水C"; String other = exgr.exchange("C"); System.out.println("银行流水C exchanger value: " + other); } catch (InterruptedException e){ } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String D = "银行流水D"; String other = exgr.exchange("D"); System.out.println("银行流水D exchanger value: " + other); } catch (InterruptedException e){ } } }); threadPool.shutdown(); } }
线程池
ThreadPoolExecutor
实现原理
关键参数
-
corePoolSIze(线程的基本大小)
-
runnableTaskQueue:任务队列。用于保存等待执行的任务的堵塞队列
(1)ArrryBlockingQueue: 基于数组的有界堵塞队列。按FIFO
(2)LinkBlockingQueue:基于链表的堵塞队列(无界)。
(3)SynchronousQueue: 一个不存储元素的堵塞队列。每个插入都需要等待另一个线程移走。
(4)PriorityBlockingQueue: 一个具有优先级的无限堵塞队列。
-
maximumPoolSize:线程池最大数量。线程池允许创建的最大线程数。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
-
ThreadFactory:用于设置创建线程的工厂。可以给线程设置有意义的名字。使用开源框架 guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置有意义的名字,代码如下。new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
-
RejectedExecutionHandler:饱和策略。当队列和线程池都满了,对于新提交的任务的处理方式。默认是AbortPolicy
(1)AbortPolicy:直接抛出异常
(2)CallerRunsPolicy:只用调用者所在线程来运行任务
(3)DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
(4)DiscardPolicy:不处理,丢弃掉。
也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。
-
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。
-
TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
向线程池提交任务
-
execute:提交不需要返回值的任务。提交的任务是一个 Runnable类的实例
-
submit:提交需要返回值的任务。线程池回返回一个 future 类型的对象。通过 future 的 get() 方法获取返回值。
关闭线程池
可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,
-
shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,
-
shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。
合理设置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
• 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
• 任务的优先级:高、中和低。• 任务的执行时间:长、中和短。
• 任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务的性质:
性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池。由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。
任务的优先级
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。
注意 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
任务的依赖性
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU。
建议使用有界队列。
线程池监控
taskCount:线程池需要执行的任务数量。
-
completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
-
largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
-
•getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
-
getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前、执行后和线程池
Executor框架
Executor框架的结构和成员
Executor框架的结构
-
任务:包括被执行任务需要实现的接口:Runnable 接口或 Callable 接口
-
任务的执行:包括任务执行机制的核心接口 Executor, 以及继承自 Executor 的 ExecutorService 接口。主要实现类 ThreadPoolExecutor 和 scheduledThreadPoolExecutor
-
异步计算的结果:包括接口 Future 和实现 Future 接口的 FutureTask 类
Executor 框架的成员
本节将介绍 Executor 框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future 接口、Runnable 接口、Callable 接口和Executors。
(1)ThreadPoolExecutor
-
FixedThreadPool
固定线程数,为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize。
2)由于是无界队列。如果线程的任务执行没那么快,会导致内存OOM
-
SingleThreadExecutor
-
-
单个worker线程。适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
-
SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 被设置为 1。其他参数与FixedThreadPool 相同。SingleThreadExecutor 使用无界队列LinkedBlockingQueue 作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为工作队列对线程池带来的影响与 FixedThreadPool 相同
-
-
CachedThreadPool
大小无界的线程池。适用于执行很多的短期异步任务的小程序。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>() ); }
-
CachedThreadPool 的 corePoolSize 被设置为 0,即 corePool 为空;maximumPoolSize被设置为Integer.MAX_VALUE,即 maximumPool 是无界的。这里把keepAliveTime 设置为 60L,意味着 CachedThreadPool 中的空闲线程等待新任务的最长时间为 60 秒,空闲线程超过 60 秒后将会被终止。
-
如果主线程提交任务的速度高于maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。极端情况下, CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存资源。
(2)ScheduledThreadPoolExecutor
适用于需要多个后台线程执行周期任务。
DelayQueue 是一个无界队列,所以 ThreadPoolExecutor 的 maximumPoolSize 在Scheduled-ThreadPoolExecutor 中没有什么意义(设置 maximumPoolSize 的大小没有什么效果)
一个任务的执行过程:
-
线程 1 从 DelayQueue 中获取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask 的 time 大于等于当前时间。
-
线程 1 执行这个 ScheduledFutureTask。
-
线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间。
-
线程 1 把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(Delay-Queue.add())。
DelayQueue.Add的执行流程:
-
SingleThreadPoolExecutor
适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。
FutureTask