目录
- 1.✨Java 线程间有哪些通信方式?
- 1.1.volatile 和 synchronized 关键字
- 1.2.等待/通知机制
- 1.2.1.概述
- 1.2.2.经典范式
- 1.3.管道输入/输出流
- 1.4.信号量
- 2.Thread.join() 有什么作用?它的使用场景是什么?
- 3.Java 中需要主线程等待子线程执行完毕后再执行,有哪些方法可以实现?
参考:《Java 并发编程的艺术》
1.✨Java 线程间有哪些通信方式?
1.1.volatile 和 synchronized 关键字
(1)Java 支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。
(2)关键字 volatile
可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。
举个例子,定义一个表示程序是否运行的成员变量 boolean on = true
,那么另一个线程可能对它执行关闭动作(on = false),这里涉及多个线程对变量的访问,因此需要将其定义成为 volatile boolean on=true
,这样其他线程对它进行改变时,可以让所有线程感知到变化,因为所有对 on 变量的访问和修改都需要以共享内存为准。但是,过多地使用 volatile 是不必要的,因为它会降低程序执行的效率。
(3)关键字 synchronized
可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
有关 synchronized 与 volatile 的具体知识可以参考 Java 并发编程面试题——synchronized 与 volatile 这篇文章。
1.2.等待/通知机制
1.2.1.概述
(1)等待/通知的相关方法是任意 Java 对象都具备的,因为这些方法被定义在所有对象的超类 java.lang.Object
上:
(2)等待/通知机制是指:一个线程 A 调用了对象 O 的 wait()
方法进入等待状态,而另一个线程 B 调用了对象 O 的 notify()
或者 notifyAll()
方法,线程 A 收到通知后从对象 O 的 wait()
方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait()
和notify/notifyAll()
的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
(3)下面的例子中创建了两个线程——WaitThread
和 NotifyThread
,前者检查 flag 值是否为 false,如果符合要求,进行后续操作,否则在 lock 上等待,后者在睡眠了一段时间后对 lock 进行通知。
class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws Exception {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable {
public void run() {
//加锁,拥有 lock 的 Monitor
synchronized (lock) {
//当条件不满足时,继续 wait,同时释放了 lock 的锁
while (flag) {
try {
System.out.println(Thread.currentThread() + " flag is true. wait @ " +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
} catch (InterruptedException e) {
}
}
// 条件满足时,完成工作
System.out.println(Thread.currentThread() + " flag is false. running @ " +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}
static class Notify implements Runnable {
public void run() {
//加锁,拥有 lock 的 Monitor
synchronized (lock) {
/*
获取 lock 的锁,然后进行通知,通知时不会释放 lock 的锁,直到
当前线程释放了 lock 后,WaitThread 才能从 wait 方法中返回
*/
System.out.println(Thread.currentThread() + " hold lock. notify @ " +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
// 再次加锁
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep @ " +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
}
class SleepUtils {
public static final void second(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
}
}
}
输出如下(输出内容可能不同,主要区别在时间上):
Thread[WaitThread,5,main] flag is true. wait @ 16:03:28
Thread[NotifyThread,5,main] hold lock. notify @ 16:03:29
Thread[NotifyThread,5,main] hold lock again. sleep @ 16:03:34
Thread[WaitThread,5,main] flag is false. running @ 16:03:39
上述第 3 行和第 4 行输出的顺序可能会互换,而上述例子主要说明了调用 wait()、notify() 以及 notifyAll() 时需要注意的细节,如下:
- 使用 wait()、notify() 和 notifyAll() 时需要先对调用对象加锁。
- 调用 wait() 方法后,线程状态由 RUNNING 变为 WAITING,并将当前线程放置到对象的等待队列。
- notify() 或 notifyAll() 方法调用后,等待线程依旧不会从 wait() 返回,需要调用 notify() 或 notifAll() 的线程释放锁之后,等待线程才有机会从 wait() 返回。
- notify() 方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而 notifyAll() 方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由 WAITING 变为 BLOCKED。
- 从 wait() 方法返回的前提是获得了调用对象的锁。
(4)从上述细节中可以看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从 wait() 方法返回时能够感知到通知线程对变量做出的修改。下图描述了上述示例的过程:
在上图中,WaitThread 首先获取了对象的锁,然后调用对象的 wait() 方法,从而放弃了锁并进入了对象的等待队列 WaitQueue 中,进入等待状态。由于 WaitThread 释放了对象的锁,NotifyThread 随后获取了对象的锁,并调用对象的 notify() 方法,将 WaitThread 从 WaitQueue 移到 SynchronizedQueue 中,此时 WaitThread 的状态变为阻塞状态。NotifyThread 释放了锁之后,WaitThread 再次获取到锁并从 wait() 方法返回继续执行。
有关抽象队列同步器的相关知识可以参考 Java 并发编程面试题——Lock 与 AbstractQueuedSynchronizer (AQS) 这篇文章。
1.2.2.经典范式
(1)从上节中的示例中可以提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知方(生产者)。
(2)等待方遵循如下原则。
- 获取对象的锁。
- 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
- 条件满足则执行对应的逻辑。
对应的伪代码如下:
synchronized (对象) {
while (条件不满足) {
对象.wait();
}
对应的处理逻辑
}
(3)通知方遵循如下原则:
- 获得对象的锁。
- 改变条件。
- 通知所有等待在对象上的线程。
对应的伪代码如下:
synchronized(对象) {
改变条件
对象.notifyAll();
}
1.3.管道输入/输出流
(1)管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于:它主要用于线程之间的数据传输,而传输的媒介为内存。管道输入/输出流主要包括了如下 4 种具体实现:PipedOutputStream
、PipedInputStream
、PipedReader
和 PipedWriter
,前两种面向字节,而后两种面向字符。
(2)下面的例子中创建了 printThread
,它用来接受 main 线程的输入,任何 main 线程的输入均通过 PipedWriter
写入,而 printThread
在另一端通过 PipedReader
将内容读出并打印。
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.*;
import java.util.concurrent.*;
public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
//将输出流和输入流进行连接,否则在使用时会抛出 IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}
运行该示例,输入一组字符串,可以看到被 printThread
进行了原样输出。
Repeat my words.
Repeat my words.
对于 Piped 类型的流,必须先要进行绑定,也就是调用 connect()
方法,如果没有将输入/输出流绑定起来,对于该流的访问将会抛出异常。
1.4.信号量
(1)Semaphore
(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景。
(2)举个例子,6 辆汽车抢占 3 个车位:
// 6 辆汽车,抢占 3 个停车位
class SemaphoreDemo {
//创建许可数量
private static final int N = 3;
public static void main(String[] args) {
//创建 Semaphore 对象,并设置许可数量
Semaphore semaphore = new Semaphore(N);
//模拟 6 辆汽车
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
//抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢到了车位(即抢占资源)!");
//设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + " ------离开了车位(即释放资源)!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
某次运行的输出结果如下:
1 抢到了车位(即抢占资源)!
4 抢到了车位(即抢占资源)!
2 抢到了车位(即抢占资源)!
1 ------离开了车位(即释放资源)!
2 ------离开了车位(即释放资源)!
6 抢到了车位(即抢占资源)!
5 抢到了车位(即抢占资源)!
4 ------离开了车位(即释放资源)!
3 抢到了车位(即抢占资源)!
5 ------离开了车位(即释放资源)!
6 ------离开了车位(即释放资源)!
3 ------离开了车位(即释放资源)!
Semaphore 是 JUC 包下的一个常用并发工具类,有关其具体知识可以参考 Java 并发编程面试题——JUC 并发工具类 这篇文章。
2.Thread.join() 有什么作用?它的使用场景是什么?
(1)Thread.join() 是一个线程同步的方法,它的作用是让当前线程等待被调用 join() 方法的线程执行完毕。线程 Thread 除了提供 join() 方法之外,还提供了 join(long millis)
和 join(long millis, int nanos)
两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。例子如下:
class JoinDemo {
static class Task implements Runnable {
@Override
public void run() {
try {
System.out.println("我是线程 t1,我先 sleep 2 秒");
Thread.sleep(2000);
System.out.println("我是线程 t1,我 sleep 完了 2 秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Task(), "t1");
t1.start();
t1.join();
System.out.println("如果不加 t1.join() 方法,我会先被打印出来");
}
}
输出结果如下:
我是线程 t1,我先 sleep 2 秒
我是线程 t1,我 sleep 完了 2 秒
如果不加 t1.join() 方法,我会先被打印出来
(2)下面的代码是 JDK 中 Thread.join() 方法的源码(进行了部分调整):
// 加锁当前线程对象
public final synchronized void join() throws InterruptedException {
// 条件不满足,继续等待
while (isAlive()) {
wait(0);
}
// 条件符合,方法返回
}
当线程终止时,会调用线程自身的 notifyAll()
方法,会通知所有等待在该线程对象上的线程。可以看到 join()
方法的逻辑结构与上面描述的等待/通知经典范式一致,即加锁、循环和处理逻辑这 3 个步骤。
(3)Thread.join() 方法的使用场景包括:
- 等待其他线程的结果:在多线程中,有时候需要等待其他线程执行完毕后再进行后续操作,这时可以使用 join() 方法等待其他线程的完成。
- 线程顺序控制:通过 join() 方法可以控制线程的执行顺序,即确保某个线程在其他线程执行完毕后再开始执行。
需要注意的是,如果线程执行过程中发生异常或被中断,join() 方法也会立即返回。
3.Java 中需要主线程等待子线程执行完毕后再执行,有哪些方法可以实现?
在 Java 中,有几种方法可以实现主线程等待子线程执行完毕后再执行的需求:
- 使用
Thread.join()
方法:在主线程中,可以通过调用子线程对象的join()
方法等待子线程执行完毕。join()
方法会阻塞主线程,直到子线程执行完毕才会继续执行主线程的后续代码。示例代码如下:
Thread myThread = new Thread(() -> {
// 子线程的执行逻辑
});
// 启动子线程
myThread.start();
try {
// 等待子线程执行完毕
myThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 子线程执行完毕后,继续执行主线程的后续代码
- 使用
CountDownLatch
类:CountDownLatch
可以用于实现线程之间的计数等待。在主线程中,可以创建一个CountDownLatch
对象,并指定等待的子线程数。在子线程中,在执行完毕后调用countDown()
方法进行计数减一。主线程可以通过调用await()
方法等待计数变为 0。示例代码如下:
CountDownLatch latch = new CountDownLatch(1);
Thread myThread = new Thread(() -> {
// 子线程的执行逻辑
// 子线程执行完毕后,计数减一
latch.countDown();
});
// 启动子线程
myThread.start();
try {
// 等待子线程执行完毕
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 子线程执行完毕后,继续执行主线程的后续代码
- 使用
ExecutorService
线程池:可以通过ExecutorService
来管理线程池,并使用submit()
方法提交子线程的任务。然后,可以通过调用shutdown()
方法关闭线程池,并通过调用awaitTermination()
方法等待所有任务执行完毕。示例代码如下:
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
// 子线程的执行逻辑
});
executorService.shutdown();
try {
// 等待所有任务执行完毕
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 子线程执行完毕后,继续执行主线程的后续代码