8. 常用的辅助类(必会)
1)CountDownLatch
CountDownLatch: 减法计数器
CountDownLatch是一个同步辅助类,在多线程环境中用于控制线程的执行顺序。它可以让一个或多个线程等待其他线程完成一组操作后再继续执行。
CountDownLatch通过一个计数器来实现,计数器的初始值可以设为任意值,每个线程完成自己的操作后,可以调用CountDownLatch的countDown()方法将计数器减1。当计数器的值变为0时,所有等待的线程都会被唤醒。
CountDownLatch通常用于以下场景:
主线程等待多个子线程都完成一定操作后再继续执行
。一个线程等待其他多个线程都完成某个初始化操作后再进行下一步操作
。
CountDownLatch的用法如下:
创建CountDownLatch对象,设置计数器的初始值
。在需要等待的线程中,调用await()方法进入等待状态
。在其他线程完成操作后,调用countDown()方法将计数器减1
。等待的线程在计数器变为0后会被唤醒,继续执行后续操作
。
以下是一个使用CountDownLatch的示例代码:
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new Worker(latch));
thread.start();
}
// 等待所有线程完成操作
latch.await();
System.out.println("All threads have finished their work.");
}
static class Worker implements Runnable {
private CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
// 模拟线程的工作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread finished its work.");
// 完成操作后调用countDown()
latch.countDown();
}
}
}
以上示例创建了5个线程,每个线程执行模拟的工作操作。在主线程中,创建了一个CountDownLatch对象,并设置计数器的初始值为5。然后,启动5个子线程,并将CountDownLatch对象传递给每个线程。
每个线程在完成工作后,调用countDown()方法将计数器减1。主线程通过调用await()方法进入等待状态,直到计数器的值变为0。最后,主线程恢复执行,并打印出所有线程都完成工作的信息。
使用CountDownLatch可以有效地控制多线程的执行顺序,使线程之间的协作更加灵活和可控。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "==> Go Out");
countDownLatch.countDown(); // 每个线程都数量 -1
},String.valueOf(i)).start();
}
countDownLatch.await(); // 等待计数器归零 然后向下执行
System.out.println("close door");
}
}
主要方法:
- countDown 减一操作;
- await 等待计数器归零
await 等待计数器归零,就唤醒,再继续向下运行
2)CyclickBarrier
加法计数器
CyclicBarrier是Java中的一个同步辅助类,它允许一组线程在达到一个屏障点之前相互等待。当所有线程都到达屏障点时,它们会被释放并继续执行。
CyclicBarrier被认为是可重用的,因为一旦所有线程都到达屏障点并被释放,它就会被重置,可以再次使用。这使得CyclicBarrier非常适用于一组线程需要等待彼此完成某个操作后再继续执行的情况。
使用CyclicBarrier需要指定一个屏障点,当线程到达这个点时,它们会被阻塞直到所有线程都到达。一旦所有线程都到达,CyclicBarrier的内部计数器会重置,并且所有线程都会被释放。
CyclicBarrier有两种使用方式:
指定一个Runnable,在所有线程都到达屏障点时,会执行这个Runnable
。调用await()方法等待其他线程,一旦所有线程都调用了await()方法,就会继续执行
。
以下是一个示例代码,演示了CyclicBarrier的使用:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
final int totalThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(totalThreads, new Runnable() {
public void run() {
System.out.println("All threads have reached the barrier");
}
});
for (int i = 0; i < totalThreads; i++) {
Thread thread = new Thread(new Worker(barrier));
thread.start();
}
}
static class Worker implements Runnable {
private CyclicBarrier barrier;
public Worker(CyclicBarrier barrier) {
this.barrier = barrier;
}
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getId() + " is waiting");
barrier.await();
System.out.println("Thread " + Thread.currentThread().getId() + " has passed the barrier");
} catch (InterruptedException ex) {
ex.printStackTrace();
} catch (BrokenBarrierException ex) {
ex.printStackTrace();
}
}
}
}
在上面的示例中,我们创建了一个CyclicBarrier,其中有3个线程需要等待。当所有线程都到达屏障点时,我们会执行一个Runnable,并打印一条消息。
输出将类似于:
Thread 11 is waiting
Thread 12 is waiting
Thread 13 is waiting
All threads have reached the barrier
Thread 12 has passed the barrier
Thread 11 has passed the barrier
Thread 13 has passed the barrier
从输出可以看出,当所有线程都到达屏障点时,Runnable会执行,并且所有线程都会被释放继续执行。
CyclicBarrier有一些其他的方法,例如getParties()返回到达屏障点所需的线程数量,getNumberWaiting()返回正在等待的线程数量等等。可以根据具体需求选择使用。
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 主线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
System.out.println("召唤神龙");
});
for (int i = 1; i <= 7; i++) {
// 子线程
int finalI = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集了第" + finalI + "颗龙珠");
try {
cyclicBarrier.await(); // 加法计数 等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
3)Semaphore
信号量
Semaphore是一种用于控制并发访问资源的同步工具,它基于计数器的机制来实现。计数器可以看作是一个资源的数量,Semaphore维护着一个许可证的集合,每个许可证表示一个可用的资源。
Semaphore有两个基本操作:acquire
和release
。当线程需要访问资源时,它需要首先调用acquire方法来获取一个许可证,如果当前没有可用的许可证,则线程会被阻塞。当线程使用完资源后,它需要调用release方法来释放许可证,以便其他线程可以继续访问资源。
以下是一个使用Semaphore的示范代码:
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
// 创建一个Semaphore实例,初始许可证数量为3
Semaphore semaphore = new Semaphore(3);
// 启动10个线程
for (int i = 1; i <= 10; i++) {
Thread thread = new Thread(new Worker(semaphore, i));
thread.start();
}
}
static class Worker implements Runnable {
private final Semaphore semaphore;
private final int id;
public Worker(Semaphore semaphore, int id) {
this.semaphore = semaphore;
this.id = id;
}
@Override
public void run() {
try {
System.out.println("线程 " + id + " 正在尝试获取许可证...");
semaphore.acquire();
System.out.println("线程 " + id + " 获取到了许可证,正在使用资源...");
Thread.sleep(2000); // 模拟使用资源的耗时操作
System.out.println("线程 " + id + " 使用资源完毕,释放许可证");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在这个示例中,我们创建了一个Semaphore实例,初始许可证数量为3。然后我们启动了10个线程,每个线程代表一个工作线程。
在每个工作线程中,首先调用
semaphore.acquire()
方法来获取一个许可证,如果没有可用的许可证,则线程会被阻塞。当线程获取到许可证后,它会执行一段耗时操作(模拟使用资源的过程),然后调用semaphore.release()
方法来释放许可证。
运行这个示例代码,你会发现每次最多有3个线程同时获取到许可证,其他线程会在
acquire()
方法处被阻塞,直到有许可证可用。这个示例展示了Semaphore如何限制对资源的并发访问。
public class SemaphoreDemo2 {
public static void main(String[] args) {
// 线程数量,停车位,限流
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i <= 6; i++) {
new Thread(() -> {
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
}catch (Exception e) {
e.printStackTrace();
}finally {
semaphore.release(); // release() 释放
}
}).start();
}
}
}
原理:
semaphore.acquire()获得资源,如果资源已经使用完了,就等待资源释放后再进行使用!
semaphore.release()释放,会将当前的信号量释放+1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用! 并发限流,控制最大的线程数!
9. 读写锁
ReentrantReadWriteLock是Java中提供的一种读写锁,它能够同时支持多个线程进行读操作,但只允许一个线程进行写操作。它是ReentrantLock的一种扩展,具备了可重入性。
下面是一个简单的示范代码,演示了如何使用ReentrantReadWriteLock:
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Example {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private int value;
public int getValue() {
lock.readLock().lock(); // 加读锁
try {
return value;
} finally {
lock.readLock().unlock(); // 解读锁
}
}
public void setValue(int value) {
lock.writeLock().lock(); // 加写锁
try {
this.value = value;
} finally {
lock.writeLock().unlock(); // 解写锁
}
}
}
在上面的示例中,我们使用了ReentrantReadWriteLock来保护一个共享变量value。在getValue方法中,我们获取了一个读锁,允许多个线程同时访问value的值。在setValue方法中,我们获取了一个写锁,确保只有一个线程能够修改value的值。
需要注意的是,获取读锁使用的是readLock()方法,获取写锁使用的是writeLock()方法。在使用完锁之后,需要调用unlock()方法来释放锁。
ReentrantReadWriteLock的使用可以提高并发性能,特别是在读操作比写操作频繁的情况下。读操作不会阻塞其他的读操作,而只有在写操作时才会阻塞其他的读操作和写操作,从而提高并发性能。
未加锁
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
int num = 6;
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.write(String.valueOf(finalI), String.valueOf(finalI));
},String.valueOf(i)).start();
}
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.read(String.valueOf(finalI));
},String.valueOf(i)).start();
}
}
}
/**
* 方法未加锁,导致写的时候被插队
*/
class MyCache {
private volatile Map<String, String> map = new HashMap<>();
public void write(String key, String value) {
System.out.println(Thread.currentThread().getName() + "线程开始写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "线程写入ok");
}
public void read(String key) {
System.out.println(Thread.currentThread().getName() + "线程开始读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "线程写读取ok");
}
}
我们也可以采用synchronized这种重量锁和轻量锁 lock去保证数据的可靠。
但是这次我们采用更细粒度的锁:ReadWriteLock 读写锁来保证
加读写锁:
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache2 myCache = new MyCache2();
int num = 6;
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.write(String.valueOf(finalI), String.valueOf(finalI));
},String.valueOf(i)).start();
}
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.read(String.valueOf(finalI));
},String.valueOf(i)).start();
}
}
}
class MyCache2 {
private volatile Map<String, String> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void write(String key, String value) {
lock.writeLock().lock(); // 写锁
try {
System.out.println(Thread.currentThread().getName() + "线程开始写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "线程写入ok");
}finally {
lock.writeLock().unlock(); // 释放写锁
}
}
public void read(String key) {
lock.readLock().lock(); // 读锁
try {
System.out.println(Thread.currentThread().getName() + "线程开始读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "线程写读取ok");
}finally {
lock.readLock().unlock(); // 释放读锁
}
}
}
10. 阻塞队列
1)BlockQueue
是Collection的一个子类
什么情况下我们会使用阻塞队列
多线程并发处理、线程池
BlockingQueue(阻塞队列)是Java中的一个接口,它继承自Queue接口,并在此基础上提供了额外的阻塞操作。阻塞队列在多线程编程中非常有用,可以用于实现生产者-消费者模式。
阻塞队列的特点如下:
当队列为空时,消费者线程会被阻塞,直到有任务被放入队列
。当队列满时,生产者线程会被阻塞,直到有任务被取走
。支持可选的公平性策略,即线程按照先进先出的原则等待队列
。
Java中的BlockingQueue接口有多个实现类,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等。
以下是一个简单的示例代码,使用ArrayBlockingQueue实现阻塞队列:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
// 创建一个容量为3的ArrayBlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
// 创建生产者线程
Thread producer = new Thread(() -> {
try {
// 往队列中放入数据
for (int i = 1; i <= 5; i++) {
queue.put(i);
System.out.println("生产者放入数据:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建消费者线程
Thread consumer = new Thread(() -> {
try {
// 从队列中取出数据
while (true) {
int data = queue.take();
System.out.println("消费者取出数据:" + data);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启动生产者和消费者线程
producer.start();
consumer.start();
}
}
在以上示例中,我们创建了一个容量为3的ArrayBlockingQueue,并创建了一个生产者线程和一个消费者线程。生产者线程往队列中放入数据,消费者线程从队列中取出数据。当队列满了时,生产者线程会被阻塞,直到有队列空闲。当队列为空时,消费者线程会被阻塞,直到有数据可取。
运行以上代码,可以看到生产者线程往队列中放入数据,消费者线程从队列中取出数据,并且它们按照先进先出的原则顺序执行。
BlockingQueue的4组API
等待,阻塞(一直阻塞)
等待,阻塞(等待超时)
/**
* 抛出异常
*/
public static void test1(){
//需要初始化队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//抛出异常:java.lang.IllegalStateException: Queue full
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//如果多移除一个
//这也会造成 java.util.NoSuchElementException 抛出异常
System.out.println(blockingQueue.remove());
}
=======================================================================================
/**
* 不抛出异常,有返回值
*/
public static void test2(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//添加 一个不能添加的元素 使用offer只会返回false 不会抛出异常
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//弹出 如果没有元素 只会返回null 不会抛出异常
System.out.println(blockingQueue.poll());
}
=======================================================================================
/**
* 等待 一直阻塞
*/
public static void test3() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//一直阻塞 不会返回
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//如果队列已经满了, 再进去一个元素 这种情况会一直等待这个队列 什么时候有了位置再进去,程序不会停止
// blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//如果我们再来一个 这种情况也会等待,程序会一直运行 阻塞
System.out.println(blockingQueue.take());
}
=======================================================================================
/**
* 等待 超时阻塞
* 这种情况也会等待队列有位置 或者有产品 但是会超时结束
*/
public static void test4() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
System.out.println("开始等待");
blockingQueue.offer("d",2, TimeUnit.SECONDS); //超时时间2s 等待如果超过2s就结束等待
System.out.println("结束等待");
System.out.println("===========取值==================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println("开始等待");
blockingQueue.poll(2,TimeUnit.SECONDS); //超过两秒 我们就不要等待了
System.out.println("结束等待");
}
根据实际业务挑选适合的API
2)SynchronousQueue同步队列
同步队列 没有容量,也可以视为容量为1的队列;
进去一个元素,必须等待取出来之后,才能再往里面放入一个元素;
put方法 和 take方法;
SynchronousQueue和 其他的BlockingQueue 不一样 它不存储元素;
put了一个元素,就必须从里面先take出来,否则不能再put进去值!
并且SynchronousQueue 的take是使用了lock锁保证线程安全的。
(看源码)
SynchronousQueue是Java中的一个阻塞队列实现,它的特点是它的容量为0。它主要用于在多个线程之间传递数据
的场景,其中一个线程等待另一个线程将数据放入队列中,然后两个线程可以通过队列来进行数据交换。
SynchronousQueue的特点如下:
容量为0
:SynchronousQueue没有容量,也就意味着每一个插入操作必须等待一个相应的删除操作,反之亦然。
2.公平性
:SynchronousQueue可以选择是否公平地处理线程的访问。当公平性设置为true时,线程将按照FIFO的顺序竞争访问队列。当公平性设置为false时,线程将无序地竞争访问队列,默认为false。
下面是一个示例代码,展示了如何使用SynchronousQueue进行线程间的数据交换:
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
// 生产者线程
Thread producer = new Thread(() -> {
try {
int data = 123;
System.out.println("Producer puts: " + data);
queue.put(data); // 等待消费者线程取走数据
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
int data = queue.take();
System.out.println("Consumer takes: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
在上面的示例中,我们创建了一个SynchronousQueue对象,并创建了一个生产者线程和一个消费者线程。生产者线程通过调用put方法将数据放入队列中,并等待消费者线程取走数据。消费者线程通过调用take方法从队列中取走数据。由于SynchronousQueue的容量为0,put方法和take方法都会阻塞线程直到另一个线程相应地调用take方法和put方法。
当我们运行这个示例代码时,可以看到生产者线程先输出了数据,然后消费者线程才输出了相同的数据。这证明了SynchronousQueue用于线程间的数据交换。
package com.marchsoft.queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
public class SynchronousQueue {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue = new java.util.concurrent.SynchronousQueue<>();
// 网queue中添加元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "put 01");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + "put 02");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + "put 03");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 取出元素
new Thread(()-> {
try {
System.out.println(Thread.currentThread().getName() + "take" + synchronousQueue.take());
System.out.println(Thread.currentThread().getName() + "take" + synchronousQueue.take());
System.out.println(Thread.currentThread().getName() + "take" + synchronousQueue.take());
}catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
JUC并发编程-常用的多线程操作辅助类(必会)、读写锁、阻塞队列 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧