Callable 与 FutureTask
Callable 接口和 Runnable 接口是并列关系,都是用来给线程提供任务的,只不过 Callable 接口的任务可以带有返回值。
但是 Callable 接口创建的任务不能直接传入 Thread 里面,这也是为了 解耦合,我们可以使用 FutureTask 这个玩意来接收一下 Callable 接口定义的任务,然后再通过 FutureTask 传给 Thread里面。
public static void main(String[] args) {
Callable<Integer> c = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 5000; i++) {
sum++;
}
return sum;
}
};
FutureTask<Integer> futureTask = new FutureTask<>(c);
Thread t = new Thread(futureTask);
t.start();
}
ReentrantLock 与 原子类
这些已经在上一篇文章中提到,不了解的可以阅读此文:Java 多线程(八)—— 锁策略,synchronized 的优化,JVM 与编译器的锁优化,ReentrantLock,CAS
Semaphore 信号量
信号量,用来表示"可用资源的个数"。本质上就是一个计数器.
可以把信号量想象成是停车场的展示牌:当前有车位 100 个。表示有 100 个可用资源.
当有车开进去的时候, 就相当于申请⼀个可用资源, 可用车位就 -1 (这个称为信号量的 P 操作)
当有车开出来的时候,就相当于释放⼀个可用资源,可用车位就 +1 (这个称为信号量的 V 操作)
如果计数器的值已经为 0 了,还尝试申请资源,就会阻塞等待,直到有其他线程释放资源。
在操作系统中,每一次的 P 操作就会让信号量 - 1,每一次的 V 操作就会让信号量 + 1
P 操作表示向操作系统申请资源, V 操作表示释放资源
在 Java 中,我们可以实例化 Semophore ,使用 acquire 方法表示申请资源(P操作),release 方法表示释放资源(V操作)
代码演示:
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(2);
semaphore.acquire();
System.out.println("进行了一次P操作");
semaphore.acquire();
System.out.println("进行了一次P操作");
semaphore.release();
System.out.println("进行了一次V操作");
semaphore.release();
System.out.println("进行了一次V操作");
}
这里要注意:如果资源不够的话,那就不能进行资源分配,该申请资源的线程会阻塞等待(死等 waiting)状态,直到有资源分配为止。
所以我们可以利用 Semaphore 这一个特性来制作一个类似锁的功能,我们给 Semaphore 传入一个信号量。
private static int sum = 0;
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);
Thread t1 = new Thread(() -> {
for (int i = 0; i < 50000; i++) {
try {
semaphore.acquire();
sum++;
semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 50000; i++) {
try {
semaphore.acquire();
sum++;
semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("sum = " + sum);
}
CountDownLatch
我们在使用多线程经常把一个大的任务拆分成很多个子任务,当这些线程去执行这些任务的时候,我们如何判断这些子任务都完成了呢?整个任务都完成了呢?
这时候我们可以使用 CountDownLatch 来进行统计任务执行的次数。
构造方法:
count 参数表示任务总数
CountDownLatch 提供 countDown()
方法,在每次执行完一个子任务之后,我们就调用一次这个方法,让计数器减 1.
同时也提供了 await
方法:阻塞等待所有的子任务执行完毕,也就是计数器为 0,开始执行该线程的任务。
代码演示:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo4 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
int id = i;
executorService.submit(() -> {
System.out.println("子任务开始执行:" + id);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(id + "子任务执行完毕");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("等待所有任务执行完毕......");
executorService.shutdown();
}
}
线程安全的集合类
多线程环境使用 ArrayList
推荐自行加锁
在需要加锁的地方我们手动去加锁,自己打包好原子操作。
Collections.synchronizedList()
synchronizedList 是标准库提供的⼀个基于 synchronized 进行线程同步的 List.synchronizedList 的关键操作上都带有 synchronized
代码演示:Collections.synchronizedList(new ArrayList<Integer>(10));
CopyOnWriteArrayList
CopyOnWrite容器即写时复制的容器
当我们要去往一个容器进行写操作的时候,不是直接在原来的地方进行写操作,而是先将容器拷贝一份,在拷贝的哪个容器上进行写操作,等到写操作结束之后,我们再将原来容器的引用指向为拷贝的容器。
代码演示:CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
优点:在读多写少的场景下,性能很高,不需要加锁竞争。
缺点:1.占用内存较多。
2.新写的数据不能被第一时间读取到。
多线程环境使用队列
-
ArrayBlockingQueue
基于数组实现的阻塞队列 -
LinkedBlockingQueue
基于链表实现的阻塞队列 -
PriorityBlockingQueue
基于堆实现的带优先级的阻塞队列 -
TransferQueue
最多只包含⼀个元素的阻塞队列
多线程环境使用哈希表
HashMap 本身不是线程安全的
Hashtable
只是简单的把关键方法加上了 synchronized 关键字
这个实现不是很好,当多个线程进行对这个哈希表进行操作的时候,很容易就会发生锁冲突。
同时,size 属性也是通过 synchronized 来控制同步,一旦触发扩容,就由该线程完成整个扩容过程。这个过程会涉及到大量的元素拷贝,效率会非常低.
所以我们不会去使用这个玩意,我们会选择下面的 ConcurrentHashMap
ConcurrentHashMap
读操作没有加锁(但是使用了volatile保证从内存读取结果),只对写操作进行加锁。加锁的方式仍然是用synchronized,但是不是锁整个对象,而是 “锁桶” (用每个链表的头结点作为锁对象),大大降低了锁冲突的概率.
充分利用 CAS 特性,比如 size 属性通过 CAS 来更新。避免出现重量级锁的情况。
优化了扩容方式:化整为零
发现需要扩容的线程,只需要创建一个新的数组,同时只搬几个元素过去。
扩容期间,新老数组同时存在。
后续每个来操作 ConcurrentHashMap 的线程,都会参与搬家的过程。每个操作负责搬运一小部分元素。 搬完最后⼀个元素再把老数组删掉。
这个期间,插入只往新数组加。
这个期间,查找需要同时查新数组和老数组。