JUC并发工具类的应用场景详解

目录

常用并发同步工具类的真实应用场景

1. ReentrantLock 

1.1 常用API 

1.2 ReentrantLock使用 

独占锁:模拟抢票场景

公平锁和非公平锁 

可重入锁 

结合Condition实现生产者消费者模式 

1.3 应用场景总结

2. Semaphore 

2.1 常用API 

2.2 Semaphore使用 

Semaphore实现服务接口限流 

Semaphore实现数据库连接池 

2.3 应用场景总结 

3. CountDownLatch 

3.1 常用API

3.2 CountDownLatch使用 

模拟实现百米赛跑 

多任务完成后合并汇总 

3.3 应用场景总结 

4. CyclicBarrier

4.1 常用API 

4.2 CyclicBarrier使用 

模拟人满发车 

多线程批量处理数据

4.3 应用场景总结 

4.4 CyclicBarrier 与 CountDownLatch 区别 

5. Exchanger 

5.1 常用API 

5.2 Exchanger使用 

模拟交易场景 

模拟对账场景 

模拟队列中交换数据场景 

5.3 应用场景总结 

6. Phaser

​编辑6.1 常用API 

6.2 Phaser使用 

多线程批量处理数据 

阶段性任务:模拟公司团建 

6.3 应用场景总结 


常用并发同步工具类的真实应用场景

        jdk提供了比synchronized更加高级的各种同步工具,包括ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等,可以实现更加丰富的多线程操作。

 

1. ReentrantLock 

        ReentrantLock是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。

        它的功能类似于synchronized是一种互斥锁,可以保证线程安全。相对于 synchronized, ReentrantLock具备如下特点:

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入

它的主要应用场景是在多线程环境下对共享资源进行独占式访问,以保证数据的一致性和安全性

 

1.1 常用API 

Lock接口 

ReentrantLock实现了Lock接口规范,常见API如下: 

void lock()

获取锁,调用该方法当前线程会获取锁,当锁获得后,该方法返回

void lockInterruptibly() throws InterruptedException

可中断的获取锁,和lock()方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程

boolean tryLock()

尝试非阻塞的获取锁,调用该方法后立即返回。如果能够获取到返回true,否则返回false

boolean tryLock(long time, TimeUnit unit) throws InterruptedException

超时获取锁,当前线程在以下三种情况下会被返回:

当前线程在超时时间内获取了锁

当前线程在超时时间内被中断

超时时间结束,返回false

void unlock()

释放锁

Condition newCondition()

获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的await()方法,而调用后,当前线程将释放锁

基本语法

//加锁  阻塞 
lock.lock(); 
try {  
    ...
} finally { 
    // 解锁 
    lock.unlock();  
}


//尝试加锁   非阻塞
if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

在使用时要注意 4 个问题:

  1. 默认情况下 ReentrantLock 为非公平锁而不是公平锁;
  2. 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
  3. 加锁操作一定要放在 try 代码之前,这样可以避免未加锁成功又释放锁的异常;
  4. 释放锁一定要放在 finally 中,否则会导致线程阻塞。

1.2 ReentrantLock使用 

独占锁:模拟抢票场景

8张票,10个人抢,如果不加锁,会出现什么问题?

/**
 *  模拟抢票场景
 */
public class ReentrantLockDemo {

    private final ReentrantLock lock = new ReentrantLock();//默认非公平
    private static int tickets = 8; // 总票数

    public void buyTicket() {
        lock.lock(); // 获取锁
        try {
            if (tickets > 0) { // 还有票
                try {
                    Thread.sleep(10); // 休眠10ms,模拟出并发效果
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票");
            } else {
                System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败");
            }

        } finally {
            lock.unlock(); // 释放锁
        }
    }


    public static void main(String[] args) {
        ReentrantLockDemo ticketSystem = new ReentrantLockDemo();
        for (int i = 1; i <= 10; i++) {
            Thread thread = new Thread(() -> {

                ticketSystem.buyTicket(); // 抢票

            }, "线程" + i);
            // 启动线程
            thread.start();

        }


        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("剩余票数:" + tickets);
    }
}

 不加锁的效果: 出现超卖的问题:

 

加锁效果: 正常,两个人抢票失败 :

 

公平锁和非公平锁 

ReentrantLock支持公平锁和非公平锁两种模式:

  • 公平锁:线程在获取锁时,按照等待的先后顺序获取锁。
  • 非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock默认是非公平锁
ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁  
ReentrantLock lock = new ReentrantLock(true); //公平锁  

 比如买票的时候就有可能出现插队的场景,允许插队就是非公平锁,如下图:

 

可重入锁 

        可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。在实际开发中,可重入锁常常应用于递归操作、调用同一个类中的其他方法、锁嵌套等场景中。 

class Counter {
    private final ReentrantLock lock = new ReentrantLock(); // 创建 ReentrantLock 对象

    public void recursiveCall(int num) {
        lock.lock(); // 获取锁
        try {
            if (num == 0) {
                return;
            }
            System.out.println("执行递归,num = " + num);
            recursiveCall(num - 1);
        } finally {
            lock.unlock(); // 释放锁
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter(); // 创建计数器对象

        // 测试递归调用
        counter.recursiveCall(10);
    }
}
结合Condition实现生产者消费者模式 

        java.util.concurrent类库中提供Condition类来实现线程之间的协调。调用Condition.await() 方法使线程等待,其他线程调用Condition.signal() 或 Condition.signalAll() 方法唤醒等待的线程。

注意:调用Condition的await()和signal()方法,都必须在lock保护之内

案例:基于ReentrantLock和Condition实现一个简单队列

public class ReentrantLockDemo3 {

    public static void main(String[] args) {
        // 创建队列
        Queue queue = new Queue(5);
        //启动生产者线程
        new Thread(new Producer(queue)).start();
        //启动消费者线程
        new Thread(new Customer(queue)).start();

    }
}

/**
 * 队列封装类
 */
class Queue {
    private Object[] items ;
    int size = 0;
    int takeIndex;
    int putIndex;
    private ReentrantLock lock;
    public Condition notEmpty; //消费者线程阻塞唤醒条件,队列为空阻塞,生产者生产完唤醒
    public Condition notFull; //生产者线程阻塞唤醒条件,队列满了阻塞,消费者消费完唤醒

    public Queue(int capacity){
        this.items = new Object[capacity];
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }


    public void put(Object value) throws Exception {
        //加锁
        lock.lock();
        try {
            while (size == items.length)
                // 队列满了让生产者等待
                notFull.await();

            items[putIndex] = value;
            if (++putIndex == items.length)
                putIndex = 0;
            size++;
            notEmpty.signal(); // 生产完唤醒消费者

        } finally {
            System.out.println("producer生产:" + value);
            //解锁
            lock.unlock();
        }
    }

    public Object take() throws Exception {
        lock.lock();
        try {
            // 队列空了就让消费者等待
            while (size == 0)
                notEmpty.await();

            Object value = items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            size--;
            notFull.signal(); //消费完唤醒生产者生产
            return value;
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 生产者
 */
class Producer implements Runnable {

    private Queue queue;

    public Producer(Queue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // 隔1秒轮询生产一次
            while (true) {
                Thread.sleep(1000);
                queue.put(new Random().nextInt(1000));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消费者
 */
class Customer implements Runnable {

    private Queue queue;

    public Customer(Queue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // 隔2秒轮询消费一次
            while (true) {
                Thread.sleep(2000);
                System.out.println("consumer消费:" + queue.take());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.3 应用场景总结

ReentrantLock具体应用场景如下:

  1. 解决多线程竞争资源的问题,例如多个线程同时对同一个数据库进行写操作,可以使用ReentrantLock保证每次只有一个线程能够写入。
  2. 实现多线程任务的顺序执行,例如在一个线程执行完某个任务后,再让另一个线程执行任务。
  3. 实现多线程等待/通知机制,例如在某个线程执行完某个任务后,通知其他线程继续执行任务。

2. Semaphore 

        Semaphore(信号量)是一种用于多线程编程的同步工具,用于控制同时访问某个资源的线程数量。 

        Semaphore维护了一个计数器,线程可以通过调用acquire()方法来获取Semaphore中的许可证,当计数器为0时,调用acquire()的线程将被阻塞,直到有其他线程释放许可证;线程可以通过调用release()方法来释放Semaphore中的许可证,这会使Semaphore中的计数器增加,从而允许更多的线程访问共享资源。 

2.1 常用API 

构造器 

  • permits 表示许可证的数量(资源数)
  • fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

常用方法 

  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可

2.2 Semaphore使用 

Semaphore实现服务接口限流 
@Slf4j
public class SemaphoreDemo {

    /**
     * 同一时刻最多只允许有两个并发
     */
    private static Semaphore semaphore = new Semaphore(2);

    private static Executor executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        for(int i=0;i<10;i++){
            executor.execute(()->getProductInfo2());
        }
    }

    public static String getProductInfo() {
        try {
            semaphore.acquire();
            log.info("请求服务");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            semaphore.release();
        }
        return "返回商品详情信息";
    }

    public static String getProductInfo2() {

        if(!semaphore.tryAcquire()){
            log.error("请求被流控了");
            return "请求被流控了";
        }
        try {
            log.info("请求服务");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            semaphore.release();
        }
        return "返回商品详情信息";
    }
}
Semaphore实现数据库连接池 
public class SemaphoreDemo2 {

    public static void main(String[] args) {
        final ConnectPool pool = new ConnectPool(2);
        ExecutorService executorService = Executors.newCachedThreadPool();

        //5个线程并发来争抢连接资源
        for (int i = 0; i < 5; i++) {
            final int id = i + 1;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Connect connect = null;
                    try {
                        System.out.println("线程" + id + "等待获取数据库连接");
                        connect = pool.openConnect();
                        System.out.println("线程" + id + "已拿到数据库连接:" + connect);
                        //进行数据库操作2秒...然后释放连接
                        Thread.sleep(2000);
                        System.out.println("线程" + id + "释放数据库连接:" + connect);

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        pool.releaseConnect(connect);
                    }

                }
            });
        }
    }
}

//数据库连接池
class ConnectPool {
    private int size;
    private Connect[] connects;

    //记录对应下标的Connect是否已被使用
    private boolean[] connectFlag;
    //信号量对象
    private Semaphore semaphore;

    /**
     * size:初始化连接池大小
     */
    public ConnectPool(int size) {
        this.size = size;
        semaphore = new Semaphore(size, true);
        connects = new Connect[size];
        connectFlag = new boolean[size];
        initConnects();//初始化连接池
    }

    private void initConnects() {
        for (int i = 0; i < this.size; i++) {
            connects[i] = new Connect();
        }
    }

    /**
     * 获取数据库连接
     *
     * @return
     * @throws InterruptedException
     */
    public Connect openConnect() throws InterruptedException {
        //得先获得使用许可证,如果信号量为0,则拿不到许可证,一直阻塞直到能获得
        semaphore.acquire();
        return getConnect();
    }

    private synchronized Connect getConnect() {
        for (int i = 0; i < connectFlag.length; i++) {
            if (!connectFlag[i]) {
                //标记该连接已被使用
                connectFlag[i] = true;
                return connects[i];
            }
        }
        return null;
    }

    /**
     * 释放某个数据库连接
     */
    public synchronized void releaseConnect(Connect connect) {
        for (int i = 0; i < this.size; i++) {
            if (connect == connects[i]) {
                connectFlag[i] = false;
                semaphore.release();
            }
        }
    }
}

/**
 * 数据库连接
 */
class Connect {

    private static int count = 1;
    private int id = count++;

    public Connect() {
        //假设打开一个连接很耗费资源,需要等待1秒
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("连接#" + id + "#已与数据库建立通道!");
    }

    @Override
    public String toString() {
        return "#" + id + "#";

    }

}

2.3 应用场景总结 

以下是一些使用Semaphore的常见场景:

  1. 限流:Semaphore可以用于限制对共享资源的并发访问数量,以控制系统的流量。
  2. 资源池:Semaphore可以用于实现资源池,以维护一组有限的共享资源。

3. CountDownLatch 

        CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。

 

        CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count),由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。 

3.1 常用API

构造器

常用方法

// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };  
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
// 会将 count 减 1,直至为 0
public void countDown() { }; 

3.2 CountDownLatch使用 

模拟实现百米赛跑 
public class CountDownLatchDemo {
    // begin 代表裁判 初始为 1
    private static CountDownLatch begin = new CountDownLatch(1);

    // end 代表玩家 初始为 8
    private static CountDownLatch end = new CountDownLatch(8);

    public static void main(String[] args) throws InterruptedException {

        for (int i = 1; i <= 8; i++) {
            new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    // 预备状态
                    System.out.println("参赛者"+Thread.currentThread().getName()+ "已经准备好了");
                    // 等待裁判吹哨
                    begin.await();
                    // 开始跑步
                    System.out.println("参赛者"+Thread.currentThread().getName() + "开始跑步");
                    Thread.sleep(1000);
                    // 跑步结束, 跑完了
                    System.out.println("参赛者"+Thread.currentThread().getName()+ "到达终点");
                    // 跑到终点, 计数器就减一
                    end.countDown();
                }
            }).start();
        }
        // 等待 5s 就开始吹哨
        Thread.sleep(5000);
        System.out.println("开始比赛");
        // 裁判吹哨, 计数器减一
        begin.countDown();
        // 等待所有玩家到达终点
        end.await();
        System.out.println("比赛结束");

    }
多任务完成后合并汇总 

        很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后需要进行结果合并;或者多个数据操作完成后,需要数据check。

public class CountDownLatchDemo2 {
    public static void main(String[] args) throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000));
                    System.out.println("任务" + index +"执行完成");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 主线程在阻塞,当计数器为0,就唤醒主线程往下执行
        countDownLatch.await();
        System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
    }
}

3.3 应用场景总结 

以下是使用CountDownLatch的常见场景:

  1. 并行任务同步:CountDownLatch可以用于协调多个并行任务的完成情况,确保所有任务都完成后再继续执行下一步操作。
  2. 多任务汇总:CountDownLatch可以用于统计多个线程的完成情况,以确定所有线程都已完成工作。
  3. 资源初始化:CountDownLatch可以用于等待资源的初始化完成,以便在资源初始化完成后开始使用。

4. CyclicBarrier

        CyclicBarrier(回环栅栏或循环屏障),是 Java 并发库中的一个同步工具,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。  

4.1 常用API 

构造器 

 // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
 public CyclicBarrier(int parties)
 // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
 public CyclicBarrier(int parties, Runnable barrierAction)

常用方法

//指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

//循环  通过reset()方法可以进行重置
public void reset()

4.2 CyclicBarrier使用 

模拟人满发车 

利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景 

public class CyclicBarrierDemo {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
                () -> System.out.println("人齐了,准备发车"));

        for (int i = 0; i < 10; i++) {
            final int id = i+1;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(id+"号马上就到");
                        int sleepMills = ThreadLocalRandom.current().nextInt(2000);
                        Thread.sleep(sleepMills);
                        System.out.println(id + "号到了,上车");
                        cyclicBarrier.await();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }catch(BrokenBarrierException e){
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
多线程批量处理数据
public class CyclicBarrierBatchProcessorDemo {

    public static void main(String[] args) {
        //生成数据
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 50; i++) {
            data.add(i);
        }

        //指定数据处理大小
        int batchSize = 5;
        CyclicBarrierBatchProcessor processor = new CyclicBarrierBatchProcessor(data, batchSize);
        //处理数据
        processor.process(batchData -> {
            for (Integer i : batchData) {
                System.out.println(Thread.currentThread().getName() + "处理数据" + i);
            }
        });
    }
}


class CyclicBarrierBatchProcessor {
    private List<Integer> data;
    private int batchSize;
    private CyclicBarrier barrier;
    private List<Thread> threads;

    public CyclicBarrierBatchProcessor(List<Integer> data, int batchSize) {
        this.data = data;
        this.batchSize = batchSize;
        this.barrier = new CyclicBarrier(batchSize);
        this.threads = new ArrayList<>();
    }

    public void process(BatchTask task) {
        // 对任务分批,获取线程数
        int threadCount = (data.size() + batchSize - 1) / batchSize;
        for (int i = 0; i < threadCount; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, data.size());
            //获取每个线程处理的任务数
            List<Integer> batchData = data.subList(start, end);
            Thread thread = new Thread(() -> {
                task.process(batchData);
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            threads.add(thread);
            thread.start();
        }

    }

    public interface BatchTask {
        void process(List<Integer> batchData);
    }
}

4.3 应用场景总结 

以下是一些常见的 CyclicBarrier 应用场景:

  1. 多线程任务:CyclicBarrier 可以用于将复杂的任务分配给多个线程执行,并在所有线程完成工作后触发后续操作。
  2. 数据处理:CyclicBarrier 可以用于协调多个线程间的数据处理,在所有线程处理完数据后触发后续操作。

4.4 CyclicBarrier 与 CountDownLatch 区别 

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。

5. Exchanger 

        Exchanger是一个用于线程间协作的工具类,用于两个线程间交换数据。具体交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。

5.1 常用API 

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutExce
  • V exchange(V v):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
  • V exchange(V v, long timeout, TimeUnit unit):等待另一个线程到达此交换点,或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常,然后将给定的对象传送给该线程,并接收该线程的对象。

5.2 Exchanger使用 

模拟交易场景 

        用一个简单的例子来看下Exchanger的具体使用。两方做交易,如果一方先到要等另一方也到了才能交易,交易就是执行exchange方法交换数据。 

public class ExchangerDemo {
    private static Exchanger exchanger = new Exchanger();
    static String goods = "电脑";
    static String money = "$4000";
    public static void main(String[] args) throws InterruptedException {

        System.out.println("准备交易,一手交钱一手交货...");
        // 卖家
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("卖家到了,已经准备好货:" + goods);
                try {
                    String money = (String) exchanger.exchange(goods);
                    System.out.println("卖家收到钱:" + money);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        Thread.sleep(3000);

        // 买家
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("买家到了,已经准备好钱:" + money);
                    String goods = (String) exchanger.exchange(money);
                    System.out.println("买家收到货:" + goods);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }
}
模拟对账场景 
public class ExchangerDemo2 {

    private static final Exchanger<String> exchanger = new Exchanger();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "12379871924sfkhfksdhfks";
                    exchanger.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "32423423jknjkfsbfj";
                    String A = exchanger.exchange(B);
                    System.out.println("A和B数据是否一致:" + A.equals(B));
                    System.out.println("A= "+A);
                    System.out.println("B= "+B);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.shutdown();

    }
}
模拟队列中交换数据场景 
public class ExchangerDemo3 {

    private static ArrayBlockingQueue<String> fullQueue
            = new ArrayBlockingQueue<>(5);
    private static ArrayBlockingQueue<String> emptyQueue
            = new ArrayBlockingQueue<>(5);
    private static Exchanger<ArrayBlockingQueue<String>> exchanger
            = new Exchanger<>();


    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();

    }

    /**
     * 生产者
     */
    static class Producer implements Runnable {
        @Override
        public void run() {
            ArrayBlockingQueue<String> current = emptyQueue;
            try {
                while (current != null) {
                    String str = UUID.randomUUID().toString();
                    try {
                        current.add(str);
                        System.out.println("producer:生产了一个序列:" + str + ">>>>>加入到交换区");
                        Thread.sleep(2000);
                    } catch (IllegalStateException e) {
                        System.out.println("producer:队列已满,换一个空的");
                        current = exchanger.exchange(current);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 消费者
     */
    static class Consumer implements Runnable {
        @Override
        public void run() {
            ArrayBlockingQueue<String> current = fullQueue;
            try {
                while (current != null) {
                    if (!current.isEmpty()) {
                        String str = current.poll();
                        System.out.println("consumer:消耗一个序列:" + str);
                        Thread.sleep(1000);
                    } else {
                        System.out.println("consumer:队列空了,换个满的");
                        current = exchanger.exchange(current);
                        System.out.println("consumer:换满的成功~~~~~~~~~~~~~~~~~~~~~~");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

5.3 应用场景总结 

Exchanger 可以用于各种应用场景,具体取决于具体的 Exchanger 实现。常见的场景包括:

  1. 数据交换:在多线程环境中,两个线程可以通过 Exchanger 进行数据交换。
  2. 数据采集:在数据采集系统中,可以使用 Exchanger 在采集线程和处理线程间进行数据交换。

6. Phaser

        Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行。它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行。Phaser可以被视为CyclicBarrier和CountDownLatch的进化版,它能够自适应地调整并发线程数,可以动态增加或减少参与线程的数量。所以Phaser特别适合用在重复执行或者重用的情况。 

6.1 常用API 

构造方法 

  • Phaser(): 参与任务数0
  • Phaser(int parties) :指定初始参与任务数
  • Phaser(Phaser parent) :指定parent阶段器, 子对象作为一个整体加入parent对象, 当子对象中没有参与者时,会自动从parent对象解除注册
  • Phaser(Phaser parent,int parties) : 集合上面两个方法

增减参与任务数方法 

  • int register() 增加一个任务数,返回当前阶段号。
  • int bulkRegister(int parties) 增加指定任务个数,返回当前阶段号。
  • int arriveAndDeregister() 减少一个任务数,返回当前阶段号

到达、等待方法

  • int arrive() 到达(任务完成),返回当前阶段号。
  • int arriveAndAwaitAdvance() 到达后等待其他任务到达,返回到达阶段号
  • int awaitAdvance(int phase) 在指定阶段等待(必须是当前阶段才有效)
  • int awaitAdvanceInterruptibly(int phase) 阶段到达触发动作
  • int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
  • protected boolean onAdvance(int phase,int registeredParties)类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象。

6.2 Phaser使用 

多线程批量处理数据 
public class PhaserBatchProcessorDemo {

    private final List<String> data;
    private final int batchSize;  //一次处理多少数据
    private final int threadCount; //处理的线程数
    private final Phaser phaser;
    private final List<String> processedData;

    public PhaserBatchProcessorDemo(List<String> data, int batchSize, int threadCount) {
        this.data = data;
        this.batchSize = batchSize;
        this.threadCount = threadCount;
        this.phaser = new Phaser(1);
        this.processedData = new ArrayList<>();
    }

    public void process() {
        for (int i = 0; i < threadCount; i++) {

            phaser.register();
            new Thread(new BatchProcessor(i)).start();
        }

        phaser.arriveAndDeregister();
    }

    private class BatchProcessor implements Runnable {

        private final int threadIndex;

        public BatchProcessor(int threadIndex) {
            this.threadIndex = threadIndex;
        }

        @Override
        public void run() {
            int index = 0;
            while (true) {
                // 所有线程都到达这个点之前会阻塞
                phaser.arriveAndAwaitAdvance();

                // 从未处理数据中找到一个可以处理的批次
                List<String> batch = new ArrayList<>();
                synchronized (data) {
                    while (index < data.size() && batch.size() < batchSize) {
                        String d = data.get(index);
                        if (!processedData.contains(d)) {
                            batch.add(d);
                            processedData.add(d);
                        }
                        index++;
                    }
                }

                // 处理数据
                for (String d : batch) {
                    System.out.println("线程" + threadIndex + "处理数据" + d);
                }

                // 所有线程都处理完当前批次之前会阻塞
                phaser.arriveAndAwaitAdvance();

                // 所有线程都处理完当前批次并且未处理数据已经处理完之前会阻塞
                if (batch.isEmpty() || index >= data.size()) {
                    phaser.arriveAndDeregister();
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        //数据准备
        List<String> data = new ArrayList<>();
        for (int i = 1; i <= 15; i++) {
            data.add(String.valueOf(i));
        }

        int batchSize = 4;
        int threadCount = 3;
        PhaserBatchProcessorDemo processor = new PhaserBatchProcessorDemo(data, batchSize, threadCount);
        //处理数据
        processor.process();
    }
}
阶段性任务:模拟公司团建 
public class PhaserDemo {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser() {
            //重写该方法来增加阶段到达动作
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // 参与者数量,去除主线程
                int staffs = registeredParties - 1;
                switch (phase) {
                    case 0:
                        System.out.println("大家都到公司了,出发去公园,人数:" + staffs);
                        break;
                    case 1:
                        System.out.println("大家都到公园门口了,出发去餐厅,人数:" + staffs);
                        break;
                    case 2:
                        System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs);
                        break;

                }

                // 判断是否只剩下主线程(一个参与者),如果是,则返回true,代表终止
                return registeredParties == 1;
            }
        };

        // 注册主线程 ———— 让主线程全程参与
        phaser.register();
        final StaffTask staffTask = new StaffTask();

        // 3个全程参与团建的员工
        for (int i = 0; i < 3; i++) {
            // 添加任务数
            phaser.register();
            new Thread(() -> {
                try {
                    staffTask.step1Task();
                    //到达后等待其他任务到达
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step2Task();
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step3Task();
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step4Task();
                    // 完成了,注销离开
                    phaser.arriveAndDeregister();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 两个不聚餐的员工加入
        for (int i = 0; i < 2; i++) {
            phaser.register();
            new Thread(() -> {
                try {
                    staffTask.step1Task();
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step2Task();
                    System.out.println("员工【" + Thread.currentThread().getName() + "】回家了");
                    // 完成了,注销离开
                    phaser.arriveAndDeregister();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        while (!phaser.isTerminated()) {
            int phase = phaser.arriveAndAwaitAdvance();
            if (phase == 2) {
                // 到了去餐厅的阶段,又新增4人,参加晚上的聚餐
                for (int i = 0; i < 4; i++) {
                    phaser.register();
                    new Thread(() -> {
                        try {
                            staffTask.step3Task();
                            phaser.arriveAndAwaitAdvance();

                            staffTask.step4Task();
                            // 完成了,注销离开
                            phaser.arriveAndDeregister();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }).start();
                }
            }
        }
    }

    static final Random random = new Random();

    static class StaffTask {
        public void step1Task() throws InterruptedException {
            // 第一阶段:来公司集合
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "从家出发了……");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "到达公司");
        }

        public void step2Task() throws InterruptedException {
            // 第二阶段:出发去公园
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "出发去公园玩");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "到达公园门口集合");

        }

        public void step3Task() throws InterruptedException {
            // 第三阶段:去餐厅
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "出发去餐厅");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "到达餐厅");

        }

        public void step4Task() throws InterruptedException {
            // 第四阶段:就餐
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "开始用餐");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "用餐结束,回家");
        }
    }
}

6.3 应用场景总结 

以下是一些常见的 Phaser 应用场景:

  1. 多线程任务分配:Phaser 可以用于将复杂的任务分配给多个线程执行,并协调线程间的合作。
  2. 多级任务流程:Phaser 可以用于实现多级任务流程,在每一级任务完成后触发下一级任务的开始。
  3. 模拟并行计算:Phaser 可以用于模拟并行计算,协调多个线程间的工作。
  4. 阶段性任务:Phaser 可以用于实现阶段性任务,在每一阶段任务完成后触发下一阶段任务的开始。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/369402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VMware无法检测到插入的USB设备,虚拟机插拔USB无反应

原本正常使用的VMware虚拟机&#xff0c;在进行了重装软件后&#xff0c;发现虚拟机插拔USB设备都无法检测到&#xff0c;没有任何的反应和提示。 通过一系列的操作发现&#xff0c;在新安装了VMware workstation 软件后&#xff0c;存在一定的概率性会发生VMware虚拟机无法自…

在windows下安装docker部署环境运行项目----docker-compose.yml

前言 小编我将用CSDN记录软件开发求学之路上亲身所得与所学的心得与知识&#xff0c;有兴趣的小伙伴可以关注一下&#xff01; 也许一个人独行&#xff0c;可以走的很快&#xff0c;但是一群人结伴而行&#xff0c;才能走的更远&#xff01;让我们在成长的道路上互相学习&…

Android 13.0 原生SystemUI下拉通知栏每条通知默认展开

1.前言 在13.0的系统rom原生开发中,在在对SystemUI下拉通知栏做定制的时候,在下拉状态栏的时候,通知栏中最后一条通知默认是收缩的 点击按钮 就会展开 原生系统systemui就是如此,为了更美观 所以要求最后一条通知也默认展开,显得更美观 最终效果图: 2.原生SystemUI下拉通…

09. BI - 数据可视化,如何进行基本图形绘制

本文为 「茶桁的 AI 秘籍 - BI 篇 第 09 篇」 文章目录 EDA 作用可视化视图Python 进行可视化subplot Hi&#xff0c;你好。我是茶桁。 今天想给大家讲的是关于数据的可视化。在工作中很多时候我们不光要计算结果&#xff0c;还要把结果呈现出来&#xff0c;最好是一种图形化的…

斗地主登录界面(JAVA图形化界面)设置

1.实现代码 import CodeUtil.CodeUtil; import domain.User;import javax.swing.*; import java.awt.*; import java.awt.event.MouseEvent; import java.awt.event.MouseListener; import java.util.ArrayList;public class LoginGame extends JFrame implements MouseListen…

png图片怎么转换成jpg?四个方法搞定不求人

在数字图像处理领域&#xff0c;PNG和JPG是两种常见的图片格式。PNG以无损压缩而闻名&#xff0c;适用于保存透明背景和保留图像细节&#xff1b;而JPG以有损压缩而著称&#xff0c;适用于在较小的文件大小下保持照片质量。有时候&#xff0c;您可能需要将PNG格式的图片转换为J…

Win10系统搭建个人hMailServer邮件服务结合内网穿透远程发邮件

文章目录 前言1. 安装hMailServer2. 设置hMailServer3. 客户端安装添加账号4. 测试发送邮件5. 安装cpolar6. 创建公网地址7. 测试远程发送邮件8. 固定连接公网地址9. 测试固定远程地址发送邮件 前言 hMailServer 是一个邮件服务器,通过它我们可以搭建自己的邮件服务,通过cpola…

【Linux】统信服务器操作系统V20 1060a-AMD64 Vmware安装

目录 ​编辑 一、概述 1.1 简介 1.2 产品特性 1.3 镜像下载 二、虚拟机安装 一、概述 1.1 简介 官网&#xff1a;统信软件 – 打造操作系统创新生态 统信服务器操作系统V20是统信操作系统&#xff08;UOS&#xff09;产品家族中面向服务器端运行环境的&#xff0c;是一款…

【数据结构】链表OJ面试题2(题库+解析)

1.前言 前五题在这http://t.csdnimg.cn/UeggB 休息一天&#xff0c;今天继续刷题&#xff01; 2.OJ题目训练 1. 编写代码&#xff0c;以给定值x为基准将链表分割成两部分&#xff0c;所有小于x的结点排在大于或等于x的结点之前 。链表分割_牛客题霸_牛客网 思路 既然涉及…

6-树-二叉树的层序遍历 II

这是树的第7篇算法&#xff0c;力扣链接。 给你二叉树的根节点 root &#xff0c;返回其节点值 自底向上的层序遍历 。 &#xff08;即按从叶子节点所在层到根节点所在的层&#xff0c;逐层从左向右遍历&#xff09; 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,nu…

开源浏览器Firefox:使用Docker本地部署并远程访问进行测试

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 部署Firefox二. 本地访问Firefox三. Linux安装Cpolar四. 配置Firefox公网地址…

单片机学习笔记---定时器/计数器(简述版!)

目录 定时器的介绍 定时计数器的定时原理 定时计数器的内部结构 两种控制寄存器 &#xff08;1&#xff09;工作方式寄存器TMOD &#xff08;2&#xff09;控制寄存器TCON 定时计数器的工作方式 方式0 方式1 方式2 方式3 定时器的配置步骤 第一步&#xff0c;对…

YOLO部署实战(3):Darknet训练模型权重

1 一些概念和问题 YOLO中的darknet到底指的是什么&#xff1f; darknet到底是一个类似于TensorFlow、PyTorch的框架&#xff0c;还是一个类似于AlexNet、VGG的模型&#xff1f; 其实都是。YOLO作者自己写的一个深度学习框架叫darknet&#xff08;见YOLO原文2.2部分&#xff…

Leetcode2857. 统计距离为 k 的点对

Every day a Leetcode 题目来源&#xff1a;2857. 统计距离为 k 的点对 解法1&#xff1a;暴力 暴力枚举数组 coordinates 的两个点 coordinates[i] (x1, y1) 和 coordinates[j] (x2, y2)&#xff0c;它们的距离 d (x1 XOR x2) (y1 XOR y2) &#xff0c;XOR 指的是按位…

「 CISSP学习笔记 」08. 安全运营

该知识领域涉及如下考点&#xff0c;具体内容分布于如下各个子章节&#xff1a; 理解并遵守调查执行记录和监控活动执行配置管理 (CM)&#xff08;例如&#xff0c;预配、基线、自动化&#xff09;应用基本的安全操作概念应用资源保护执行事故管理执行和维护检测和预防措施实施…

QSlider使用笔记

最近做项目使用到QSlider滑动条控件&#xff0c;在使用过的过程中&#xff0c;发现一个问题就是点滑动条上的一个位置&#xff0c;滑块并没有移动到鼠标点击的位置&#xff0c;体验感很差&#xff0c;于是研究了下&#xff0c;让鼠标点击后滑块移动到鼠标点击的位置。 1、event…

【Linux】Ext2 文件系统

文件系统 前言一、磁盘硬件1. 磁盘的物理存储结构2. 磁盘存储的逻辑抽象结构 二、理解 Ext2 文件系统1. 初步理解文件系统2. 深入理解文件系统&#xff08;1&#xff09;inode Table&#xff08;2&#xff09;Data blocks&#xff08;3&#xff09;inode Bitmap&#xff08;4&a…

内衣洗衣机是不是鸡肋?好用的小型洗衣机全自动推荐

随着大家工作的压力越来越大&#xff0c;下了班之后只能想躺平&#xff0c;在洗完澡之后看着还需要手洗的内衣裤真的很头疼。有些小伙伴还有会攒几天再丢进去洗衣机里面一起&#xff0c;而且这样子是非常不好的&#xff0c;用过的内衣裤长时间不清洗容易滋生细菌&#xff0c;而…

python_ACM模式《剑指offer刷题》二叉树1

题目&#xff1a; 面试tips&#xff1a; 1. 询问是否可以使用双端队列 (看后面思路就可知为什么要问这个) 思路&#xff1a; 时复和空复都为O(n) 思路一&#xff1a;利用双端队列。总体思想是利用二叉树层序遍历(二叉树的层序遍历就是用队列dq&#xff0c;且从左往右每一层…

【C++】笔试训练(八)

目录 一、选择题二、编程题1、两种排序方法2、求最小公倍数 一、选择题 1、关于重载函数&#xff0c;哪个说明是正确的&#xff08;&#xff09; A 函数名相同&#xff0c;参数类型或个数不同 B 函数名相同&#xff0c;返回值类型不同 C 函数名相同&#xff0c;函数内部实现不…