目录
(一)Callable接口
(1)Callable与Runnable的区别
(2)Future接口
2.1Futrue接口中的方法
2.2FutureTask类
(3)Callable接口的使用
3.1借助FutureTask运行
3.2借助线程池运行
(4)小结
(二)ReentrantLock(重入锁)
(1)可重入性n
(2)手动加锁释放锁
(3)提供了公平锁
(4)Condition类
(5)与synchronized区别
(三)信号量(Semaphore)
(1)理解信号量
(2)简单示例
(四)CountDownLatch
(1)用途
(2)两个主要方法
(3)代码示例
(五)线程安全集合类
(1)线程安全集合概览
(2)ConCurrentHashMap
2.1HashMap,HashTable,ConCurrentHashMap
HashTable与ConCurrentHashMap区别
ConcurrentHashMap对扩容操作的优化
前言:jdk的concurrent包中,包含了很对在多线程开发中所需要使用的类,这篇文章将详细解析在这个包中的类,包括Callable接口,ReentrantLock,信号量(Semaphore),CountDownLatch, ConcurrentHashMap。
(一)Callable接口
(1)Callable与Runnable的区别
1:Callable接口规定的是call方法,Runnable接口规定的是run方法
2:Callable接口有返回值,Runnable接口没有返回值
3:call方法能够抛出异常,而run方法不能
(2)Future接口
2.1Futrue接口中的方法
Future接口通常和Callable搭配使用,因为Callable接口有返回值,所以需要Future来对Callable任务产生的结果进行封装
想对Callable进行操作就可以通过Future提供的方法来实现:
1:get():用于获取异步任务的结果。如果异步任务没有完成,调用此方法的线程会被阻塞,直到任务完成并返回结果
2:get(long timeout, TimeUnit unit):也是获取异步任务的结果,但是允许指定一个超时时间,如果在指定时间内任务没有完成,该方法会抛出一个TimeoutException异常
3:isDone():用于判断异步任务是否已经完成,如果任务完成,无论是正常结束还是异常结束,方法都会返回true,否则返回false
4:cancel(boolean mayInterruptIfRunning):
如果传入的参数为true:如果任务正在执行,则可以尝试中断执行该任务的线程,如果任务已经完成或者尚未开始,则此方法会立即返回false,并无法再去取消任务。如果任务正在阻塞等待,此方法会使任务立即停止等待并返回
如果传入的参数为false:仅在任务还没开始执行的时候可以取消任务,对于正在执行的任务不会尝试中断,并返回false表示无法取消正在运行的任务。
5:isCanceled():
用来判断这个任务是否被取消,被取消返回true,没有被取消返回false
2.2FutureTask类
FutureTask类实现了RunnableFuture接口,而RunnableFuture接口又继承了Runnable接口和Future接口,所以FutureTask可以被提交给线程池(实现Runnable接口),还能够提供异步计算结果(实现了Future接口)
(3)Callable接口的使用
3.1借助FutureTask运行
由于Callable接口定义的任务通常都会有返回值,所以我们一般就通过搭配FutureTask来使用
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> clallable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 101; i++) {
sum += i;
}
return sum;
}
};
FutureTask<Integer> futureTask = new FutureTask<>(clallable);//将Callable对象交给FutureTask包装
System.out.println(futureTask.get());//获取任务计算结果
3.2借助线程池运行
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(5);
Callable<Integer> clallable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 101; i++) {
sum += i;
}
return sum;
}
};
Future future = service.submit(clallable);//将callable提交给线程池并用future接收
System.out.println(future.get());
(4)小结
1:Callable是一个接口,相当于给线程任务封装了一个返回值,相比于需要计算结果的多线程场景,使用有返回值的Callable接口,比没返回值的Runnable接口更加方便。
2:Callable通常需要搭配FutureTask来使用,由于Callable接口有返回值,但这个运行Callable任务的线程不知道什么时候结束,这时就需要FutureTask来保存Callable的返回结果,负责这个等待结果出来的工作。
(二)ReentrantLock(重入锁)
(1)可重入性n
顾名思义ReentrantLock与synchronized相同都是一个可重入锁,即一个线程可多次获取同一个锁
(2)手动加锁释放锁
与sychronized自动加锁释放锁不同的是,reetrantLock是手动加锁释放锁,其中reentrantLock提供了三个方法来实现加锁解锁操作
1:lock ():加锁,获取不到锁就死等
2:trylock(long timeout, TimeUnit unit):加锁,如果获取不到锁,超出设定的时间后放弃加锁
3:unlock():解锁,一般在finally代码块中使用
若是sychronized进行加锁操作代码是
public class Demo2 {
private int sum = 1;
public void add(int count){
synchronized (this) {
sum += count;
}
}
换成ReentrantLock进行加锁代码
public class Demo2 {
ReentrantLock reentrantLock = new ReentrantLock();
private int sum = 1;
public void add(int count){
reentrantLock.lock();
try{
sum += count;
}finally {
reentrantLock.unlock();
}
}
}
(3)提供了公平锁
ReentrantLock实现了公平锁和非公平锁两个版本,而sychronized只有非公平锁
如图若在创建ReentrantLock对象时传入true,则以公平锁方式创建,否则以非公平锁方式创建
(4)Condition类
sychronized可以通过和wait(),notify(),notifyall(),来实现线程间的通信,那么ReentrantLock也需要实现线程间通信就需要借助Condition类来实现。
public class Demo3 {
//想要让线程1执行完之后,线程2才能够执行
private static boolean flag = false;//引入标志位目的是如果t1先获取锁后t2线程也能够执行,
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread t1 = new Thread(()->{
try {
lock.lock();//t1获取锁
Thread.sleep(2000);
System.out.println("线程1执行");
flag = true;//标志位设置为true
condition.signal();//唤醒线程2
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
lock.unlock();//释放锁
}
});
Thread t2 = new Thread(()->{
try {
lock.lock();//t2获取锁
while (!flag){//如果标志位为false,则等待
condition.await();
}
Thread.sleep(2000);
System.out.println("线程2执行");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();//释放锁
}
});
t1.start();
t2.start();
}
}
(5)与synchronized区别
1:synchronized是一个关键字,是JVM内部实现的(C++),ReentrantLock是标准库中的一个类,是由java实现的。
2:synchronized使用时是自动加锁释放锁,ReentrantLock是手动加锁释放锁,使用起来更加灵活也更加麻烦
3:synchronized在申请锁失败时,会阻塞死等,ReentrantLock可通过trylock方法等待一段时间就放弃等待
4:synchronized是非公平锁,ReentrantLock实现了公平锁,只要通过构造方法传入true就可开启公平锁模式
5:更强大的唤醒机制,synchronized通过wait和notify的唤醒智能唤醒随机的一个线程,而ReentrantLock搭配Condition类可以实现精准控制线程唤醒。
使用场景
如果锁冲突不激烈,可考虑使用synchronized,自动加锁释放锁更方便,锁冲突激烈时,可考虑使用ReentrantLock的tryLock,避免出现加锁死等提升效率,如果需要使用到公平锁,就使用eentrantLock
(三)信号量(Semaphore)
(1)理解信号量
信号量可以理解为可用资源的数量,这个资源,线程可以通过acquire()方法来获取,获取完信号量减一,当信号量为0时,线程无法再获取到资源,会阻塞等待,直到其他线程用release()方法释放资源,被阻塞的线程才能再尝试获取资源。
例子:比如有一个停车场只有10个空位,那么这个停车场的可用资源就是10,当一个车子进去后(acquir操作),可用资源就减一,当10个空位都被占满,外面的车子想要进来就需要阻塞等待,除非停车场里的车子出来(release操作),被阻塞在外面的车子才能进去。
(2)简单示例
如下代码我只是简单创建4个线程,但信号量Semaphore只有三个资源,且每个线程获取完资源后不释放,那么当4个线程启动的时候,总会有一个线程会因抢不到资源而阻塞等待。
package juc;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Demo4 {
Semaphore semaphore = new Semaphore(3);//定义了三个资源
Thread t1 = new Thread(() -> {
try {
semaphore.acquire();//获取一个资源
System.out.println("t1获取资源");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//semaphore.release();//释放一个资源
}
});
Thread t2 = new Thread(() -> {
try {
semaphore.acquire();//获取一个资源
System.out.println("t2获取资源");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//semaphore.release();//释放一个资源
}
});
Thread t3 = new Thread(() -> {
try {
semaphore.acquire();//获取一个资源
System.out.println("t3获取资源");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//semaphore.release();//释放一个资源
}
});
Thread t4 = new Thread(() -> {
try {
semaphore.acquire();//获取一个资源
System.out.println("t4获取资源");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//semaphore.release();//释放一个资源
}
});
public static void main(String[] args) {
Demo4 demo4 = new Demo4();
demo4.t1.start();
demo4.t2.start();
demo4.t3.start();
demo4.t4.start();
}
}
(四)CountDownLatch
(1)用途
这个东西可以用在多线程中,多个线程完成一个一系列任务的时候,用来衡量任务的进度是否完成,比如发射火箭大概步骤是:火箭组装与测试,有效载荷安装与检查,燃料加注,发射场地准备,点火与发射,其中只有当我们把前面几步都完成之后,我们才能够进行最后的点火与发射操作,这时我们就可以借助CountDownLatch来判断前面的几个任务是否全部完成。
(2)两个主要方法
countDown():countDown
方法主要用于减少CountDownLatch
计数器的值。在一个多线程的场景中,它表示一个任务或者一个操作的完成。当计数器的值归零时,就意味着所有被CountDownLatch
等待的任务都已经完成。
await():await
方法用于使当前线程等待,直到CountDownLatch
的计数器达到零。如果计数器的值大于零,那么调用await
方法的线程将被阻塞,暂停执行,直到计数器归零或者等待被中断(如果是可中断的await
方法)
(3)代码示例
package juc;
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.concurrent.CountDownLatch;
public class Demo5 {
static class DateUtils {//用来进行日期格式化的工具类
public static String formatDate(Date date){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YY-MM-dd HH:mm:ss");
return simpleDateFormat.format(date);
}
}
private CountDownLatch countDownLatch = new CountDownLatch(4);//4个子线程都执行完,主线程才能执行
//想要模拟一下火箭发射,四个子线程模拟火箭发射前准备,主线程模拟火箭发射
Thread t1 = new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("火箭组装与测试完成"+ DateUtils.formatDate(new Date(System.currentTimeMillis())));
countDownLatch.countDown();//报告一个子线程完成
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t2 = new Thread(() -> {
try {
Thread.sleep(4000);
System.out.println("有效载荷安装与检查完成"+ DateUtils.formatDate(new Date(System.currentTimeMillis())));
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t3 = new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("燃料加注完成"+ DateUtils.formatDate(new Date(System.currentTimeMillis())));
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t4 = new Thread(() -> {
try {
Thread.sleep(3000);
System.out.println("发射场地准备完成"+ DateUtils.formatDate(new Date(System.currentTimeMillis())));
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
public static void main(String[] args) {
Demo5 demo5 = new Demo5();
demo5.t1.start();
demo5.t2.start();
demo5.t3.start();
demo5.t4.start();
try {
demo5.countDownLatch.await();//等待所有子线程完成
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("发射成功"+ DateUtils.formatDate(new Date(System.currentTimeMillis())));
}
}
(五)线程安全集合类
(1)线程安全集合概览
在java.util.concurrent包下,Java给我们提供了很多线程安全的集合类,之前我们使用的集合类如,ArrayList ,HashMap,ArrayDequeque等,都是线程不安全的集合类,也就是说多个线程操作会发生线程安全问题这里我列举了这个包中的线程安全类
接口 | 线程不安全版本 | 线程安全版本 |
---|---|---|
List | ArrayList | CopyOnWriteArrayList |
Map | HashMap | ConcurrentHashMap |
Set | HashSet / TreeSet | CopyOnWriteArraySet |
Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue |
Deque | ArrayDeque / LinkedList | LinkedBlockingDeque |
Collections线程安全集合转换器
java给我们提供了一个能够将不安全的集合转变为安全集合的转换器,就是Collections类
如下代码中,我们将不安全的HashMap对象通过Collections.synchronizedMap()方法进行处理之后,就能够返回给我们一个线程安全的集合
Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
工作原理
可以看到在synchronizedMap<K,V>类中,维护了两个对象,一个Map,一个mutex
并且可以看到这个类有两个构造方法,当我们在调用这个方法时需要传入一个Map,mutex参数可传可不传,如果传入了mutex参数那么将对象的排斥锁赋值为传入的对象,如果没有传入则将对象的排斥锁赋值为this
通过这个方法创建出来的Map再次操作方法的时候就能够对所有方法加锁如下图
除了synchronizedMap(),Collections中还有synchronizedList()....等方法,来将对应的集合处理,使用方法与此类似
虽然这个方法能够将集合从不安全改为安全,但由于处理方法太过简单粗暴,所以这种线程安全的集合效率并不高,并不推荐使用。
(2)ConCurrentHashMap
2.1HashMap,HashTable,ConCurrentHashMap
我们都知道HashMap不是线程安全的,所以在处理并发的时候会出现问题。
而HashTable虽然是线程安全的,但是是通过整个来加锁的方式,当一个线程在写操作的时候,另外的线程则不能进行读写。
而ConcurrentHashMap则可以支持并发的读写。跟1.7版本相比,1.8版本又有了很大的变化,已经抛弃了Segment的概念,虽然源码里面还保留了,也只是为了兼容性的考虑。
HashTable与ConCurrentHashMap区别
HashTable与ConCurrentHashMap最主要的区别在于锁粒度的粗细。
如下图为HashTable的加锁模式,只要是操作Map数据,HashTable就会对整个数据全部加锁,导致效率会很低。
如下图是ConcurrentHashMap的加锁模式
在上面的HashTable保证线程安全的方式,主要就是给关键的方法加上synchronized,直接加到方法上,此时只要两个线程,同时操作一个HashTable时就会发生冲突,但实际上,对哈希表来说,锁并不一定要加的这么粗,有些情况是并不会涉及到线程安全问题的。
相比之下,对于Map的线程安全问题,只有在两个线程,同时操作一个链表的时候才会发生,如果操作的是不同的链表,一般不会发生线程安全问题。
ConcurrentHashMap做的最核心的改进,就是将全局的大锁,改进为每个链表独立的小锁,将锁粒度变细,大大降低锁冲突的概率,提升效率。
同时ConcurrentHashMap充分利用CAS的特性,把一些不必要的加锁环节就给省略加锁了。
下图中是ConCurrentHashMap的putVal方法的一段逻辑,这段的逻辑就是,就是如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的,就省略了加锁的环节,提升效率。
ConCurrentHashMap对扩容操作的优化
在传统的HashMap扩容中,扩容是一个复杂且非常影响性能的工作,当HashMap中的元素达到一定的阈值,就会触发扩容,扩容过程会创建新的数组,然后将旧的数组中的元素重新计算哈希值并放入新数组中,这个过程在多线程环境下就很容易出现如死循环等待等问题
而ConcurrentHashMap
在扩容时允许其他线程协助进行数据迁移。当一个线程发现正在进行扩容操作时,它可以帮助将旧桶中的元素迁移到新桶中。这是通过transfer
方法来实现的,每个线程会负责一部分桶的迁移工作。
假设ConcurrentHashMap
有 16 个桶,线程 A 开始进行扩容操作,线程 B 在执行其他操作时发现扩容正在进行。线程 B 就可以参与到扩容过程中,比如帮助迁移其中的 4 个桶的元素,这样就加快了扩容的整体速度。