线程之间协作有哪些方式
当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其他部分之前完成,那么就需要对线程进行协调。
共享变量和轮询方式
实现:定义一个共享变量(如 volatile
修饰的布尔标志)。线程通过检查共享变量的状态来决定是否继续执行。
public class Test {
private static volatile boolean flag = false;
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
System.out.println("flag" + flag);
while (!flag){
System.out.println( "waiting"); //轮询等待
}
System.out.println("Flag is now " + flag);
}
}).start();
Thread.sleep(1000);
flag = true; // 修改共享变量
}
}
使用wait() 与 notify()/notifyAll()
通过object类中的wait()、notify()和notifyAll来实现。
实现:
-
wait():当前线程释放锁并进入等待状态(
WAITING
)。需在同步块中调用(持有锁)。 -
notify():随机唤醒一个等待线程(
WAITING
→BLOCKED
)。 -
notifyAll():唤醒所有等待线程。
public class ProducerConsumer {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 10;
public void produce(int value) throws InterruptedException {
synchronized (queue) {
while (queue.size() == capacity) {
queue.wait(); // 队列满,等待
}
queue.add(value);
queue.notifyAll(); // 唤醒消费者
}
}
public int consume() throws InterruptedException {
synchronized (queue) {
while (queue.isEmpty()) {
queue.wait(); // 队列空,等待
}
int value = queue.poll();
queue.notifyAll(); // 唤醒生产者
return value;
}
}
public static void main(String[] args) throws InterruptedException {
ProducerConsumer pc = new ProducerConsumer();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
pc.produce(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
new Thread(() -> {
while (true){
try {
System.out.println(pc.consume());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
}
Condition 条件变量
通过 ReentrantLock
的 Condition
实现更灵活的线程协作。
实现:Condition类似于 wait()
/notify()
,但支持多个条件队列,使用它需要和ReentrantLock配合使用。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerWithCondition {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 10;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待队列不满
}
queue.add(value);
notEmpty.signalAll(); // 唤醒消费者
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); //等待队列不空
}
int value = queue.poll();
notFull.signalAll(); // 唤醒生产者
return value;
} finally {
lock.unlock();
}
}
}
CountDownLatch
通过CountDownLatch实现线程的等待与唤醒。
实现:初始化时指定计数值,线程调用await()等待计数器归零;其他线程调用countDown()减少计数器。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
System.out.println("Thread finished");
latch.countDown(); // 计数器减1
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("All threads finished");
}
}
CyclicBarrier
是一个同步辅助工具,允许一组线程相互等待,直到所有线程到达屏障点后才能继续执行。可以重复使用。
实现:初始化时指定参与线程数;线程调用await()等待其他线程到达屏障点;所有线程到达后,屏障重置,可重复使用。
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("All threads reached the barrier");
});
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
System.out.println("Thread waiting at barrier");
try {
barrier.await(); // 等待其他线程
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore信号量
通过Semaphore控制资源的并发访问。
实现:初始化时指定许可数,线程调用acquire()获取许可,调用release()释放许可。
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
int permits = 2;
Semaphore semaphore = new Semaphore(permits);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("Thread acquired permit");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
System.out.println("Thread released permit");
}
}).start();
}
}
}
阻塞队列
阻塞队列(Blocking Queue) 是 Java 多线程中实现线程协作的核心工具之一。它通过内置的阻塞机制,让生产者线程和消费者线程自动协调工作,无需开发者手动管理 wait()
、notify()
或锁的细节。
阻塞队列是一种特殊的队列,提供以下功能:
-
队列空时的阻塞:消费者线程尝试从空队列取数据时,会被阻塞直到队列非空。
-
队列满时的阻塞:生产者线程尝试向满队列放数据时,会被阻塞直到队列有空位。
-
支持超时机制:部分阻塞队列允许在指定时间内尝试操作,超时后返回失败
Java 并发包 java.util.concurrent
提供了多种阻塞队列实现:
实现类 | 特点 |
---|---|
ArrayBlockingQueue | 基于数组的有界队列,固定容量,公平性可选。 |
LinkedBlockingQueue | 基于链表的队列,默认无界(可指定容量),吞吐量高。 |
PriorityBlockingQueue | 支持优先级排序的无界队列。 |
SynchronousQueue | 不存储元素的队列,生产者插入操作必须等待消费者移除。 |
DelayQueue | 元素按延迟时间排序,只有到期后才能被取出。 |
示例:生产者——消费者模型
通过阻塞队列,生产者和消费者线程无需直接交互,只需要操作队列即可自动协调;
-
生产者线程:调用
put()
方法放入数据,若队列满则阻塞。 -
消费者线程:调用
take()
方法取出数据,若队列空则阻塞。
协作流程
-
生产者放数据:
-
若队列未满,直接插入数据并唤醒消费者线程。
-
若队列已满,生产者线程阻塞,等待消费者取走数据后唤醒。
-
-
消费者取数据:
-
若队列非空,直接取出数据并唤醒生产者线程。
-
若队列为空,消费者线程阻塞,等待生产者放入数据后唤醒。
-
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 容量为10的阻塞队列
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 100; i++) {
queue.put(i); // 队列满时自动阻塞
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer value = queue.take(); // 队列空时自动阻塞
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
总结
协作方式 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
共享变量与轮询 | 简单状态检查 | 简单易用 | 浪费 CPU,无法精确唤醒 |
wait/notify | 生产者-消费者模型 | 精确控制线程唤醒 | 需手动管理锁和条件 |
Condition | 复杂条件等待 | 支持多条件队列,灵活性高 | 代码复杂度高 |
CountDownLatch | 一次性等待场景(如任务完成) | 简单易用 | 不可重置 |
CyclicBarrier | 分阶段任务同步 | 可重复使用 | 需预先知道线程数 |
Semaphore | 控制资源并发访问 | 灵活控制并发数 | 需手动管理许可 |
阻塞队列 | 生产者-消费者模型、 线程池任务队列:如 ThreadPoolExecutor 使用阻塞队列管理待执行任务。 流量控制:通过有界队列限制系统资源使用,防止内存溢出。 延迟任务调度:如 DelayQueue 实现定时任务执行。 | 简化代码,避免竞态条件,自动阻塞与唤醒,支持多种策略 | 无界队列可能导致内存溢出。 |