07-java基础-锁之AQSReentrantLockBlockingQueueCountDownLatchSemapho

文章目录

  • 0:AQS简介-常见面试题
    • AQS具备特性
    • state表示资源的可用状态
    • AQS定义两种资源共享方式
    • AQS定义两种队列
    • 自定义同步器实现时主要实现以下几种方法:
    • 同步等待队列
    • 条件等待队列
  • 1:AQS应用之ReentrantLock
    • ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
    • ReentrantLocak源码流程图
    • ReentrantLock加锁示例代码-代码git地址
  • 2:AQS应用之Semapho
    • Semaphore 是什么?
    • 怎么使用 Semaphore?
      • 构造方法
      • 重要方法
      • 基本使用
    • Semapho源码流程图
    • Semapho示例代码git地址
  • 3:AQS应用之CountDownLatch
    • CountDownLatch是什么?
    • CountDownLatch如何工作?
    • API
    • CountDownLatch应用场景例子
      • 代码如下:
    • CountDownLatch源码流程图
    • CountDownLatch示例代码git地址
  • 4:CyclicBarrier
    • API
    • 应用场景
    • 示例代码:
    • CyclicBarrier示例代码git地址
  • 5:AQS应用之BlockingQueue
    • 队列类型
    • 队列数据结构
    • 常见的4种阻塞队列
    • ArrayBlockingQ
    • LinkedBlockingQueue
    • DelayQueue
    • BlockingQueue API
      • 添加元素
      • 检索元素
      • 多线程生产者-消费者示例
    • BlockingQueue各种队列底层流程图
    • BlockingQueue代码示例

0:AQS简介-常见面试题

  • Java并发编程核心在于java.concurrent.util包
  • 而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,
  • AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

AQS具备特性

  1. 阻塞等待队列
  2. 共享/独占
  3. 公平/非公平
  4. 可重入
  5. 允许中断
  • 除了Lock外,Java.concurrent.util当中同步器的实现Latch,Barrier,BlockingQueue等,都是基于AQS框架实现。
  1. 一般通过定义内部类Sync继承AQS
  2. 将同步器所有调用都映射到Sync对应的方法
  3. AQS内部维护属性volatile int state (32位)

state表示资源的可用状态

State三种访问方式
getState()、setState()、compareAndSetState()

AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,Semaphore/CountDownLatch

AQS定义两种队列

  • 同步等待队列
  • 条件等待队列

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():
    该线程是否正在独占资源。只有用到condition才需要去实现它
  • tryAcquire(int):
    独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):
    独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):
    共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):
    共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

同步等待队列

  • AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
    在这里插入图片描述

条件等待队列

  • Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
    在这里插入图片描述

1:AQS应用之ReentrantLock

  • ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。
  • 而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。
//使用ReentrantLock进行同步
ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁
lock.lock(); //加锁
lock.unlock(); //解锁

ReentrantLock如何实现synchronized不具备的公平与非公平性呢?

  • 在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
    1、FairSync 公平锁的实现
    2、NonfairSync 非公平锁的实现
  • 这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。
  • ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁,默认实现的非公平锁
package com.zgs.lock.reentrant_lock;

import java.util.concurrent.locks.ReentrantLock;

/**
 * @author guisong.zhang
 * @date 2024/3/9 23:27
 * @description ReentrantLock实现加锁示例代码  取出
 */
public class Test {
    //    private static ReentrantLock reentrantLock = new ReentrantLock();
    private static MyLock reentrantLock = new MyLock();

    public static void main(String[] args) {
        new Thread(() -> {
            reentrantLock.lock();
            withdrawMoney();
            reentrantLock.unlock();
        }, "取钱线程1").start();

        new Thread(() -> {
            reentrantLock.lock();
            withdrawMoney();
            reentrantLock.unlock();
        }, "取钱线程2").start();
    }

    public static void withdrawMoney() {
        System.out.println(Thread.currentThread().getName() + ":开始取钱");
        sleep(3000);
        System.out.println(Thread.currentThread().getName() + ":取钱完成");
    }

    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {


        }
    }
}

  • 自己实现一个锁
package com.zgs.lock.reentrant_lock;

import sun.misc.Unsafe;

import java.lang.reflect.Field;


/**
 * @author guisong.zhang
 * @date 2024/3/9 23:41
 * @description 自定义lock
 */
public class MyLock {
    private static final Unsafe unsafe = getUnsafe();

    private volatile int state;
    private static long stateOffSet;

    static {
        try {
            stateOffSet = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 自己实现线程阻塞的几种方式
     * 1:wait:wait需要搭配 synchronized 使用,没必要因为synchronized就会加锁,这并不是我们自己实现的锁。
     * 2:sleep:解锁时间不确定,怎么唤醒呢,所以也不行
     * 3:park:
     * 4:while(true)自旋:
     */
    public void lock() {
        //判断当前线程是否需要加锁
        while (!unsafe.compareAndSwapInt(this, stateOffSet, 0, 1)) {
            System.out.println(Thread.currentThread().getName() + ":正在自选尝试加锁");
        }
        System.out.println(Thread.currentThread().getName() + ":加锁成功");
    }

    public void unlock() {
        state = 0;
    }

    private static Unsafe getUnsafe() {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            return (Unsafe) theUnsafe.get(null);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

  • park的使用
package com.zgs.lock.reentrant_lock;

import java.util.concurrent.locks.LockSupport;

/**
 * @author guisong.zhang
 * @date 2024/3/10 22:44
 * @description 类描述
 */
public class ParkTest {
    public static void main(String[] args) {

        Thread thread1 = new Thread(() -> {
            System.out.println("线程1开始执行");
            LockSupport.park();
            System.out.println("线程1解除阻塞,继续执行了.....");
        }, "线程1");
        thread1.start();

        Thread thread2 = new Thread(() -> {
            System.out.println("线程2开始执行");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LockSupport.unpark(thread1);
        }, "线程2");
        thread2.start();
    }
}

ReentrantLocak源码流程图

ReentrantLock加锁示例代码-代码git地址

2:AQS应用之Semapho

Semaphore 是什么?

  • Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。

怎么使用 Semaphore?

构造方法

 public Semaphore(int permits)
 public Semaphore(int permits, boolean fair)

重要方法

public void acquire() throws InterruptedException
public void release()
tryAcquire(long timeout, TimeUnit unit)

基本使用

需求场景

  • 资源访问,服务限流(Hystrix里限流就有基于信号量方式)。
    代码实现
package com.zgs.lock.semapho;

import java.util.concurrent.Semaphore;

/**
 * @author guisong.zhang
 * @date 2024/3/11 23:09
 * @description semapho测试类
 * 默认实现非公平锁
 */
public class SemaphoRunnerTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(semaphore, "张贵松-线程" + i)).start();
        }
    }

    static class Task extends Thread {
        Semaphore semaphore;

        public Task(Semaphore semaphore, String name) {
            super(name);
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                //获取资源
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "获取到资源,时间:" + System.currentTimeMillis());
                Thread.sleep(5000);
                //释放资源
                semaphore.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  • 输出结果
    在这里插入图片描述

  • 从打印结果可以看出,一次只有两个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。

Semapho源码流程图

Semapho示例代码git地址

3:AQS应用之CountDownLatch

CountDownLatch是什么?

  • CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行
  • 使用场景:Zookeeper分布式锁,Jmeter模拟高并发等

CountDownLatch如何工作?

  • CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。
  • 当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务

API

CountDownLatch.countDown()
CountDownLatch.await();

CountDownLatch应用场景例子

  • 比如陪媳妇去看病。医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。现在我们是双核,可以同时做这两个事(多线程)。
  • 假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)

代码如下:

  • CountDownLatchRunner
package com.zgs.lock.countdown_latch;

import java.util.concurrent.CountDownLatch;

/**
 * @author: guisong.zhang
 * @date: 2024/3/12 15:12:09
 * @description CountDownLatch测试类
 **/
public class CountDownLatchRunner {
    public static void main(String[] args) throws InterruptedException {
        long timeNow = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(new SeeDoctorTask(countDownLatch)).start();
        new Thread(new QueueTask(countDownLatch)).start();

        countDownLatch.await();
        System.out.println("等待所有线程执行完毕后继续执行——cost time:" + (System.currentTimeMillis() - timeNow));
    }

}
  • QueueTask
package com.zgs.lock.countdown_latch;

import java.util.concurrent.CountDownLatch;

/**
 * @author: guisong.zhang
 * @date: 2024/3/12 15:27:57
 * @description TODO
 **/
public class QueueTask extends Thread {
    private CountDownLatch countDownLatch;

    public QueueTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            System.out.println("开始在医院药房排队买药....");
            Thread.sleep(5000);
            System.out.println("排队成功,可以开始缴费买药");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }
}
  • SeeDoctorTask
package com.zgs.lock.countdown_latch;

import java.util.concurrent.CountDownLatch;

/**
 * @author: guisong.zhang
 * @date: 2024/3/12 15:28:13
 * @description TODO
 **/
public class SeeDoctorTask extends Thread {
    private CountDownLatch countDownLatch;

    public SeeDoctorTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            System.out.println("开始看医生");
            Thread.sleep(3000);
            System.out.println("结束看医生");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != countDownLatch) {
                countDownLatch.countDown();
            }
        }
    }
}

CountDownLatch源码流程图

CountDownLatch示例代码git地址

4:CyclicBarrier

  • 栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
  • CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

API

cyclicBarrier.await();

应用场景

  • 可以用于多线程计算数据,最后合并计算结果的场景。
  • 例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,
  • 先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,
  • 最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

示例代码:

package com.zgs.lock.cyclicBarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author: guisong.zhang
 * @date: 2024/3/12 16:52:33
 * @description CyclicBarrierRunner测试类
 **/

public class CyclicBarrierRunner {

    static class WorkerThread implements Runnable {
        private final CyclicBarrier barrier;
        private int id;

        public WorkerThread(CyclicBarrier barrier, int id) {
            this.barrier = barrier;
            this.id = id;
        }

        @Override
        public void run() {
            try {
                // 模拟线程做准备工作或任务执行
                System.out.println("Worker " + id + " started.");
                Thread.sleep(1000); // 假设执行耗时操作

                System.out.println("Worker " + id + " is about to reach the barrier.");

                // 当前线程到达屏障点并等待其他线程
                barrier.await();

                System.out.println("Worker " + id + " passed the barrier and can continue now.");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int numberOfWorkers = 5; // 线程数量
        CyclicBarrier barrier = new CyclicBarrier(numberOfWorkers, () -> {
            System.out.println("All workers have reached the barrier. Executing a barrier action...");
        });

        ExecutorService executorService = Executors.newFixedThreadPool(numberOfWorkers);

        for (int i = 0; i < numberOfWorkers; i++) {
            executorService.execute(new WorkerThread(barrier, i + 1));
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

CyclicBarrier示例代码git地址

5:AQS应用之BlockingQueue

  • BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制,在许多生产场景里都可以看到这个工具的身影。

队列类型

  1. 无限队列 (unbounded queue ) - 几乎可以无限增长
  2. 有限队列 ( bounded queue ) - 定义了最大容量

队列数据结构

队列实质就是一种存储数据的结构

  • 通常用链表或者数组实现
  • 一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
  • 主要操作:入队(EnQueue)与出队(Dequeue)
    在这里插入图片描述

常见的4种阻塞队列

  • ArrayBlockingQueue 由数组支持的有界队列
  • LinkedBlockingQueue 由链接节点支持的可选有界队列
  • PriorityBlockingQueue 由优先级堆支持的无界优先级队列
  • DelayQueue 由优先级堆支持的、基于时间的调度队列

ArrayBlockingQ

  • 队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好
  • 数据结构如下图:
    在这里插入图片描述
  • 队列创建:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
  • 应用场景
    在线程池中有比较多的应用,生产者消费者场景
  • 工作原理
    基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞

LinkedBlockingQueue

  • 是一个基于链表的无界队列(理论上有界)
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
  • 上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE 。
  • 向无限队列添加元素的所有操作都将永远不会阻塞,[注意这里不是说不会加锁保证线程安全],因此它可以增长到非常大的容量。
  • 使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。

DelayQueue

  • 由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。
  • 队列创建
BlockingQueue<String> blockingQueue = new DelayQueue();
  • 要求
    入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口
  • 应用场景
    电影票
  • 工作原理:
    队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

BlockingQueue API

  • BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

添加元素

在这里插入图片描述

检索元素

在这里插入图片描述
在构建生产者 - 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。

多线程生产者-消费者示例

  • 代码git地址

  • 代码说明

  • 接下来我们创建一个由两部分组成的程序 - 生产者 ( Producer ) 和消费者 ( Consumer) 。

  • 生产者将生成一个 0 到 100 的随机数(十全大补丸的编号),并将该数字放在BlockingQueue 中。我们将创建 16 个线程(潘金莲)用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。

  • 需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。

  • 从生产者(潘金莲)向消费者(武大郎)发出信号的好方法是,不需要处理消息,而是发送称为毒( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽可能多的毒 ( poison )丸 ( pill ) ,因为我们有消费者(武大郎)。然后当消费者从队列中获取特殊的毒 (poison ) 丸 ( pill )消息时,它将优雅地完成执行。

  • 以下生产者的代码:

package com.zgs.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @author: guisong.zhang
 * @date: 2024/3/7 15:26:09
 * @description TODO
 **/
public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }

    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            System.out.println("潘金莲‐" + Thread.currentThread().getId() + "-号,给武大郎的泡药!");
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            System.out.println("潘金莲‐" + Thread.currentThread().getId() + "-号,往武大郎的药里放入第" + j + 1 + "颗毒丸!");
        }
    }
}
  • 我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理。我们看到方法 generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。
  • 它还需要有毒 ( poison ) 丸 ( pill ) (潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。该消息需要将 poisonPillPerProducer 次放入队列中。
  • 每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer 后,它会检查该消息是否是毒 ( poison) 丸 ( pill )(武大郎看潘金莲有没有下毒) ,
  • 如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。
package com.zgs.lock;

import java.util.concurrent.BlockingQueue;

/**
 * @author: guisong.zhang
 * @date: 2024/3/7 15:30:16
 * @description TODO
 **/
public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println("武大郎‐" + Thread.currentThread().getId() + "-号,喝药‐编号:" + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  • 需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。
  • 既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素
  • 我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:
package com.zgs.lock;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author: guisong.zhang
 * @date: 2024/3/7 15:34:24
 * @description TODO
 **/
public class Main {

    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
        int mod = N_CONSUMERS % N_PRODUCERS;

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
        //潘金莲给武大郎熬药
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }
        //武大郎开始喝药
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }
        //潘金莲开始投毒,武大郎喝完毒药GG
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }

}
  • BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者(武大郎)。
  • 我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。
  • 这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。

BlockingQueue各种队列底层流程图

BlockingQueue代码示例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/452375.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

string的使用

前言 我们前面已经介绍了C的基本知识&#xff0c;本期开始我们将进入C的第二部分&#xff0c;也是非常重要的一个部分&#xff01;他就是STL&#xff01;本期我们来先介绍string及其使用&#xff01; 本期内容介绍 STL介绍 为什么要学习string? string的常用接口介绍 一、ST…

【Twinmotion】Twinmotion导入UE5

步骤 1. 在虚幻商城中安装“Datasmith Twinmotion导入器插件” 安装“面向虚幻引擎的Twinmotion内容” 2. 打开虚幻引擎&#xff0c;在插件中搜索“twinmotion”&#xff0c;勾选如下两个插件&#xff0c;然后重启虚幻引擎 3. 打开Twinmotion&#xff0c;随便添加一个物体 导出…

NAND和NOR Flash 完全学习笔记

本文要点&#xff1a; NAND FLASH与NOR FLASH 的技术对比&#xff1b;最详细的存储单元对比详解&#xff1b;NAND FLASH与NOR FLASH 的最新市场份额及应用&#xff1b;NAND FLASH与NOR FLASH 的基础原理分析。 目前&#xff0c;NOR FLASH和NAND FLASH是市场上主要的非易失性闪…

李彦宏:在中文上文心大模型4.0已经超过了GPT-4!如何优雅地反驳

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…

<逻辑回归算法(Logistic regression)>——《机器学习算法初识》

目录 一、 逻辑回归介绍 1 逻辑回归的应用场景 2 逻辑回归的原理 2.1 输入 2.2 激活函数 3 损失以及优化 3.1 损失 3.2 优化 4 小结 二、逻辑回归api介绍 实现过程&#xff1a; 三、分类评估方法 1.分类评估方法 1.1 精确率与召回率 1.1.1 混淆矩阵 1.1.2 精确…

【五、接口自动化测试】5分钟掌握python + requests接口测试

你好啊&#xff01;我是山茶&#xff0c;一个持续探索AI 测试的程序员&#xff01; 在做接口测试时&#xff0c;在python中内置了HTTP库 urllib&#xff0c;可以用于发送http请求。基于urllib二次封装的三方库Requests&#xff0c;相较于urllib更佳简介易用。所以&#xff0c;…

LED基础知识分享(一)

大家好&#xff0c;我是砖一。 今天给大家分享一下&#xff0c;LED的基础知识&#xff0c;有照明行业&#xff0c;或者对LED感兴趣的朋友&#xff0c;可以学习一下&#xff0c;希望对你有用~ 一&#xff0c;什么是LED (Light Emitting Diode)? 1&#xff0c;LED是一种发出某…

MathType7最新软件产品秘钥2024中文版

MathType 7是一款功能强大的数学公式编辑器&#xff0c;专为教育工作者、学生、科研人员以及任何需要处理数学公式的人群设计。以下是对MathType 7的详细介绍&#xff1a; 一、功能特点&#xff1a; 广泛的符号和模板支持&#xff1a;MathType 7支持各种数学符号、公式、方程…

Centos7 安装postgresql14后无法连接数据库

1、数据库服务器允许外部访问5432端口。 2、postgresql.conf 3、pg_hba.conf a、制定某个IP&#xff08;192.168.0.107&#xff09;访问 b、指定ip段访问 允许10.1.1.0~10.1.1.255网段登录数据库 host all all 10.1.1.0/24 trust c、指定全网访问 host a…

mysql5.6---windows和linux安装教程和忘记密码怎么办

一、windows安装 1.完成解压 解压完成之后将其放到你喜欢的地址当中去&#xff0c;这里我默认放在了D盘&#xff0c;这是我的根目录 2.配置环境变量 我的电脑->属性->高级->环境变量->系统变量 选择PATH,在其后面添加: (注意自己的安装地址) D:\mysql-5.6.49…

【C++庖丁解牛】vector容器的简易模拟实现(C++实现)(最后附源码)

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 前言vector容器代码实现内…

【测试开发学习历程】Linux用户管理+文件权限管理

目录 一、用户管理 &#xff08;一&#xff09;用户和用户组的基本概念 1.概念 2.设置原因 3.用户与用户组的关系 4.用户类型 &#xff08;二&#xff09;用户的创建、修改属性和删除用户 1.用户信息文件 2.创建用户 3.修改用户密码 4.修改用户信息 5.用户查询 6.…

5.shell if判断语句

shell-if判断语句 1.什么是if2.为什么要用if3.if基础语法4.基于文件进行判断5.基于整数比对6.基于字符比对7.基于正则比对 1.什么是if if其实就是模仿人类的判断来进行的&#xff0c;要么真、要么假、就这两种结果。 2.为什么要用if 判断 3.if基础语法 单条件 if [ 如果你…

RocketMQ快速入门

RocketMQ快速入门 准备工作下载RocketMQ环境要求 JDK下载安装JDK下载JDK安装 安装RocketMQ安装步骤目录介绍 启动RocketMQ测试RocketMQ发送消息接收消息 关闭RocketMQ RocketMQ是阿里巴巴2016年开源的MQ中间件&#xff0c;使用Java语言开发&#xff0c;在阿里内部&#xff0c;R…

String、StringBuilder、StringBuffer 有什么区别?

1、典型回答 String、StringBuilder 和 StringBuffer 都是 Java 语言中&#xff0c;用于操作字符串的类&#xff0c;但它们在性能、可变性和线程安全性方面有一些区别 1、String&#xff1a;不可变字符串类&#xff0c;也就是说一旦创建&#xff0c;它的值就不可变。每次对 S…

数据库基础理论知识

1.基本概念 数据(Data)&#xff1a;数据库存储的基本对象。数字、字符串、图形、图像、音频、视频等数据库(DB)&#xff1a;在计算机内&#xff0c;永久存储、有组织、可共享的数据集合数据库管理系统(DBMS)&#xff1a;管理数据库的系统软件数据库系统(DBS):DBDBMSDBADBAP 数…

【spring】-多模块构建二-问题整理

1、bean注入问题 The injection point has the following annotations: - org.springframework.beans.factory.annotation.Autowired(requiredtrue) 解决1&#xff1a; 由于引入的bean类 不属于启动类的子模块下&#xff0c;需要在启动类手动声明扫描的类 也适用于公共子模…

Ribbon-负载均衡

目录 一、负载均衡的作用位置 二、Ribbon负载均衡的工作流程 三、IRule接口 负载均衡的策略&#xff1a; 修改负载均衡策略&#xff08;即修改使用的IRule接口的实现类&#xff09;&#xff1a; 四、饥饿加载 五、总结 前置知识&#xff1a;Eureka注册中心 不熟悉Eureka的…

【Emgu CV教程】9.3、形态学常用操作之开运算

文章目录 一、相关概念1.什么叫开运算3.开运算的函数 二、演示1.原始素材2.代码3.运行结果 一、相关概念 1.什么叫开运算 腐蚀、膨胀已经讲完&#xff0c;这两个是最基础的形态学操作。这次讲的是开运算&#xff0c;它是一个先腐蚀、后膨胀的过程。原始图像先被腐蚀&#xff…

使用Anaconda创建Python指定版本的虚拟环境

由于工作的需要和学习的需要&#xff0c;需要创建不同Python版本的虚拟环境。 比如zdppy的框架&#xff0c;主要支持的是Python3.8的版本&#xff0c;但是工作中FastAPI主要使用的是3.11的版本&#xff0c;所以本地需要两套Python环境。 决定使用Anaconda虚拟环境管理的能力&…