文章目录
- 一、生产者与消费者问题
- (1)概要
- (2)案例
- 1、案例描述及需要明确的问题
- 2、整体框架构思
- 3、生产者和消费者的数据共享问题
- 4、对Clerk类里面方法的设计
- 5、测试
- 6、唤醒机制
- 7、两个消费者
- 二、是否释放锁的操作
- (1)释放锁的操作
- (2)不会释放锁的操作
一、生产者与消费者问题
(1)概要
等待唤醒机制可以解决经典的“生产者与消费者”的问题。生产者与消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。
该问题描述了两个(多个)共享固定大小缓冲区的线程
——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。
生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。
与此同时,消费者也在缓冲区消耗这些数据。
该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
☕生产者与消费者问题中其实隐含了两个问题:
- 线程安全问题:因为生产者与消费者共享数据缓冲区,产生安全问题。不过这个问题可以使用同步解决。
- 线程的协调工作问题:
- 要解决该问题,就必须让生产者线程在缓冲区满时等待(wait),暂停进入阻塞状态,等到下次消费者消耗了缓冲区中的数据的时候,通知(notify)正在等待的线程恢复到就绪状态,重新开始往缓冲区添加数据。
- 同样,也可以让消费者线程在缓冲区空时进入等待(wait),暂停进入阻塞状态,等到生产者往缓冲区添加数据之后,再通知(notify)正在等待的线程恢复到就绪状态。通过这样的通信机制来解决此类问题。
(2)案例
1、案例描述及需要明确的问题
举例:生产者&消费者
🌋案例描述
生产者(Productor)将产品交给店员(Clerk),而消费者(Customer)从店员处取走产品。
店员一次只能持有固定数量的产品(比如:20),如果生产者试图生产更多的产品,店员会叫生产者停一下,如果店中有空位放产品了再通知生产者继续生产;
如果店中没有产品了,店员会告诉消费者等一下,如果店中有产品了再通知消费者来取走产品。
类似的场景,比如厨师和服务员等。
🍻几个需要明确的问题
①是否是多线程问题?
是。多线程问题至少要有两个线程。
线程有:生产者
(负责生产),消费者
(负责消费)。
注意店员不是,可以理解为生产者和消费者共同操作的数据。
②是否有共享数据?
有。若多个线程没有共享数据的话,也不用考虑线程安全问题。
共享数据是:产品
。
生产者让产品增加,消费者让产品减少。
③是否有线程安全问题?
有。因为有共享数据。
④是否需要考虑处理线程安全问题?如何处理?
需要。可以使用同步机制。
⑤是否存在线程间的通信?如何体现?
存在。
生产者生产到20就停止生产,需要wait;若消费完了,消费者也需要wait。
一旦消费到头了,当生产者又生产了一个,消费者就会被notify唤醒,继续消费。
一旦生产到头了,当消费者又消费了一个,生产者就会被notify唤醒,继续生产。
⑥如何设计解决?
用面向对象的编程思想。
比如线程就需要用相应类的对象去体现。
共享数据,若是简单的可以用变量体现,复杂的话可以设计成类。
2、整体框架构思
🍰分析
只要涉及到多线程问题,就需要使用两种创建多线程的方式,实现Runnable接口和继承Thread类。
就以继承Thread类
来演示。
共享数据是“产品”,产品最终交给店员来处理了,店员来唤醒生产者和消费者。
可以将产品封装在店员这里。
大方向上,有这样几个类,一个店员Clerk
,一个生产者Producer
,一个消费者Consumer
。
如下:
public class ProducerConsumerTest {
}
class Clerk{ //店员
}
class Producer{ //生产者
}
class Consumer{ //消费者
}
用继承的方式来写,那么先让它们继承Thread
,并重写run
方法。如下:
class Producer extends Thread{ //生产者
@Override
public void run() {
}
}
class Consumer extends Thread{ //消费者
@Override
public void run() {
}
}
在Clerk
类里面定义一个产品的数量productNum
,如下:
class Clerk{ //店员
private int productNum=0; //产品数量
}
让生产者去生产,就相当于调用Clerk里面的productNum
,让它增加;消费者则是让它减少。
增加/减少通常用方法来体现。(对属性的修改通常用方法来体现)
如下:
class Clerk{ //店员
private int productNum=0; //产品数量
//增加产品数量的方法(对属性的修改通常用方法来体现)
public void addProduct(){
}
//减少产品数量的方法
public void minusProduct(){
}
}
3、生产者和消费者的数据共享问题
现在生产者只需要调用Clerk类里面的addProduct()
方法即可,如何去调用呢?
其实现在操作的就是共享数据,但是目前生产者与消费者没有体现共享的概念啊?
因为产品数量声明在Clerk类
里面的,那我们就来声明一个Clerk类型的变量。如下:
给这个clerk
变量赋值,可以使用构造器:(通过构造器的方式给clerk初始化)
class Producer extends Thread{ //生产者
private Clerk clerk;
//构造器
public Producer(Clerk clerk){
this.clerk=clerk;
}
//...
}
那么在run
方法里面,就可以使用clerk,让它调用addProduct()
来生产产品。如下:
class Producer extends Thread{ //生产者
private Clerk clerk;
//构造器
public Producer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("生产者开始生产产品啦...");
clerk.addProduct();
}
}
}
当然,可以让它生产慢一点,在这里sleep()
一下:
class Producer extends Thread{ //生产者
private Clerk clerk;
//构造器
public Producer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("生产者开始生产产品啦...");
try {
Thread.sleep(50); //50ms生产一个产品
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.addProduct();
}
}
}
在消费者里面是同样的逻辑:
class Consumer extends Thread{ //消费者
private Clerk clerk;
public Consumer(Clerk clerk){ //通过构造器给clerk赋值
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("消费者开始消费产品喽...");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.minusProduct();
}
}
}
通过构造器给clerk
赋值,在具体造线程的时候,放同一个clerk
即可保证“共享”。
🎲有的小伙伴可能会疑惑:这个clerk不是每个类中的私有属性吗,怎么就共享了呢?
其实各自的Clerk都不是自己new的,可以在构造初始化的时候给他们传同一个clerk对象。
Java只有值传递,传一个类的时候实际传递的是堆中的空间地址,因此是共享的。
之前说过一个案例,不同的Consumer
,在创建多个线程的时候,用的是同一个Account act
,那自然就是共享的。如下:
public class AccountTest {
public static void main(String[] args) {
Account acct=new Account();
Customer customer1=new Customer(acct,"小旺");
Customer customer2=new Customer(acct,"小岁");
customer1.start();
customer2.start();
}
}
class Account{ //账户
private double balance; //余额
public synchronized void deposit(double amt){ //this:是唯一的,即为actt
if(amt>0){
balance+=amt;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"存钱1000元,余额为:"+balance);
}
}
class Customer extends Thread{
Account account;
//构造器
public Customer(Account acct){
this.account=acct;
}
public Customer(Account acct,String name){
super(name);
this.account=acct;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
account.deposit(1000);
}
}
}
4、对Clerk类里面方法的设计
接下来两个线程的执行都到Clerk
类里面去操作了。
所以我们将重心放在Clerk里面。
首先是增加产品,最先想到的就是让产品增加一下:
class Clerk{ //店员
private int productNum=0; //产品数量
//增加产品数量的方法(对属性的修改通常用方法来体现)
public void addProduct(){
productNum++; //让产品加一
System.out.println(Thread.currentThread().getName()+"生产了第"+productNum+"个产品");
}
}
比如现在的产品数量是0,调用addProduct()
方法的时候,productNum
加一,然后输出”某线程生产了第1个产品“。
有一个需要注意的,让产品增加的时候,不能无限制增加。
需要判断一下,若是产品数量大于等于20的话,需要等待,不能继续生产了。如下:
记得处理一下wait()
的异常。
wait()
需要使用在同步代码块或同步方法当中,productNum
就是共享数据,而addProduct()
方法就是在操作共享数据,所以
我们可以将addProduct()
声明为共享方法,如下:
同步监视器是this
,wait()
的调用者也是this
,所以此时没有问题。
若产品数量大于等于20就wait()
等待,不满足这个条件就生产产品。
接下来看一下消费者,注意需要先输出再让产品减一:
//减少产品数量的方法
public void minusProduct(){
System.out.println(Thread.currentThread().getName()+"消费了第"+productNum+"个产品");
productNum--; //让产品减一
}
当然,也不能无限制消费,当产品数量小于等于0的时候,需要等待:
类Clerk
的代码如下:
class Clerk{ //店员
private int productNum=0; //产品数量
//增加产品数量的方法(对属性的修改通常用方法来体现)
public synchronized void addProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum>=20){ //产品数量大于等于20
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productNum++; //让产品加一
System.out.println(Thread.currentThread().getName()+"生产了第"+productNum+"个产品");
}
//减少产品数量的方法
public synchronized void minusProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum<=0){ //产品数量小于等于0
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"消费了第"+productNum+"个产品");
productNum--; //让产品减一
}
}
5、测试
在测试类ProducerConsumerTest
里面创建共享数据clerk
,然后创建生产者和消费者对象。
如下:
public class ProducerConsumerTest {
public static void main(String[] args) {
Clerk clerk=new Clerk(); //clerk本身就充当了共享数据
Producer pro1=new Producer(clerk); //创建生产者对象pro1
Consumer con1=new Consumer(clerk); //创建消费者对象con1
}
}
起个名字,然后各自去调用start()
方法,它们就各自去执行各自的run()
方法。
如下:
public class ProducerConsumerTest {
public static void main(String[] args) {
Clerk clerk=new Clerk(); //clerk本身就充当了共享数据
Producer pro1=new Producer(clerk); //创建生产者对象pro1
Consumer con1=new Consumer(clerk); //创建消费者对象con1
pro1.setName("生产者1");
con1.setName("消费者1");
pro1.start();
con1.start();
}
}
🌱代码
package yuyi04.Communication;
/**
* ClassName: ProducterConsumerTest
* Package: yuyi04.Communication
* Description:
*
* @Author 雨翼轻尘
* @Create 2024/2/2 0002 17:15
*/
public class ProducerConsumerTest {
public static void main(String[] args) {
Clerk clerk=new Clerk(); //clerk本身就充当了共享数据
Producer pro1=new Producer(clerk); //创建生产者对象pro1
Consumer con1=new Consumer(clerk); //创建消费者对象con1
pro1.setName("生产者1");
con1.setName("消费者1");
pro1.start();
con1.start();
}
}
class Clerk{ //店员
private int productNum=0; //产品数量
//增加产品数量的方法(对属性的修改通常用方法来体现)
public synchronized void addProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum>=20){ //产品数量大于等于20
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productNum++; //让产品加一
System.out.println(Thread.currentThread().getName()+"生产了第"+productNum+"个产品");
}
//减少产品数量的方法
public synchronized void minusProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum<=0){ //产品数量小于等于0
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"消费了第"+productNum+"个产品");
productNum--; //让产品减一
}
}
class Producer extends Thread{ //生产者
private Clerk clerk;
//构造器
public Producer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("生产者开始生产产品啦...");
try {
Thread.sleep(50); //50ms生产一个产品
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.addProduct();
}
}
}
class Consumer extends Thread{ //消费者
private Clerk clerk;
public Consumer(Clerk clerk){ //通过构造器给clerk赋值
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("消费者开始消费产品喽...");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.minusProduct();
}
}
}
🍺输出(部分)
可以看到,消费者消费的时候没有产品,就会wait()
,因为没有唤醒机制,生产者就会一直生产,生产到第20个的时候也开始wait()
,然后程序就在一直等待。
6、唤醒机制
刚才没有唤醒机制,导致程序无法执行结束,两个线程一直等待。
🎲那什么时候唤醒呢?
对于生产者来说,当生产满20就wait()
等待,当消费者消费了1个就可以唤醒它继续生产了。
对于消费者来说,当没有可以消费的产品的时候(即产品小于等于0的时候)就进入wait()
等待,当生产者生产了1个产品就可以唤醒它继续消费了。
所以,生产者生产了1个产品就可以唤醒消费者,消费者消费了1个产品就可以唤醒生产者。互相唤醒。
如下:
🌱代码
package yuyi04.Communication;
/**
* ClassName: ProducterConsumerTest
* Package: yuyi04.Communication
* Description:
*
* @Author 雨翼轻尘
* @Create 2024/2/2 0002 17:15
*/
public class ProducerConsumerTest {
public static void main(String[] args) {
Clerk clerk=new Clerk(); //clerk本身就充当了共享数据
Producer pro1=new Producer(clerk); //创建生产者对象pro1
Consumer con1=new Consumer(clerk); //创建消费者对象con1
pro1.setName("生产者1");
con1.setName("消费者1");
pro1.start();
con1.start();
}
}
class Clerk{ //店员
private int productNum=0; //产品数量
//增加产品数量的方法(对属性的修改通常用方法来体现)
public synchronized void addProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum>=20){ //产品数量大于等于20
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productNum++; //让产品加一
System.out.println(Thread.currentThread().getName()+"生产了第"+productNum+"个产品");
//生产了一个就可以唤醒消费者
notify();
}
//减少产品数量的方法
public synchronized void minusProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum<=0){ //产品数量小于等于0
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"消费了第"+productNum+"个产品");
productNum--; //让产品减一
//只要消费者消费了一个,就可以唤醒生产者继续生产
notify();
}
}
class Producer extends Thread{ //生产者
private Clerk clerk;
//构造器
public Producer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("生产者开始生产产品啦...");
try {
Thread.sleep(50); //50ms生产一个产品
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.addProduct();
}
}
}
class Consumer extends Thread{ //消费者
private Clerk clerk;
public Consumer(Clerk clerk){ //通过构造器给clerk赋值
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("消费者开始消费产品喽...");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.minusProduct();
}
}
}
🍺输出(部分)
可以看到一直在生产和消费,停不下来了。
这是因为我们设置的生产和消费sleep
时间都是一样的50ms,所以生产一个再消费一个,停不下来了。
现在我们将消费改为100ms,消费慢一点。如下:
再次输出:
生产的比较快,到20的时候会有等待的行为,然后消费一个,生产者再生产一个,无止尽。
7、两个消费者
现在我们再整一个消费者con2,如下:
public class ProducerConsumerTest {
public static void main(String[] args) {
Clerk clerk=new Clerk(); //clerk本身就充当了共享数据
Producer pro1=new Producer(clerk); //创建生产者对象pro1
Consumer con1=new Consumer(clerk); //创建消费者对象con1
Consumer con2=new Consumer(clerk); //创建消费者对象con1
pro1.setName("生产者1");
con1.setName("消费者1");
con2.setName("消费者2");
pro1.start();
con1.start();
con2.start();
}
}
记得在生产了一个产品的时候将两个消费者都唤醒:
还有一个小细节,生产者将两个消费者都唤醒之后,两个消费者就会从被wait()的位置继续向下执行,而此时只有1个商品了。
这两个消费者可能都消费商品,某一个消费者消费商品使得产品数量变成了0,另一个消费者再去消耗产品就会使得产品数量变成了负数,不太合适。
如下:
所以我们需要将这些逻辑写入else中:
此时消费者若是唤醒之后,需要重新进入同步方法,去握锁。
已经在if语句里的wait()
被唤醒后,就不会再进入else,而是直接跳出这个if-else语句,去抢minusProduct
方法的锁,两个方法谁先抢到,谁就先消耗资源,后进来那经过判断进入if语句,休眠。
当线程多个
wait
时,同时全部唤醒是有可能一次性执行代码的,这是因为:wait解除之后代码不是从头开始执行,而是从wait开始。
对应的,生产者也稍微调整一下代码:
🌱代码
package yuyi04.Communication;
/**
* ClassName: ProducterConsumerTest
* Package: yuyi04.Communication
* Description:
*
* @Author 雨翼轻尘
* @Create 2024/2/2 0002 17:15
*/
public class ProducerConsumerTest {
public static void main(String[] args) {
Clerk clerk=new Clerk(); //clerk本身就充当了共享数据
Producer pro1=new Producer(clerk); //创建生产者对象pro1
Consumer con1=new Consumer(clerk); //创建消费者对象con1
Consumer con2=new Consumer(clerk); //创建消费者对象con1
pro1.setName("生产者1");
con1.setName("消费者1");
con2.setName("消费者2");
pro1.start();
con1.start();
con2.start();
}
}
class Clerk{ //店员
private int productNum=0; //产品数量
//增加产品数量的方法(对属性的修改通常用方法来体现)
public synchronized void addProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum>=20){ //产品数量大于等于20
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
productNum++; //让产品加一
System.out.println(Thread.currentThread().getName()+"生产了第"+productNum+"个产品");
//生产了一个就可以唤醒消费者
notifyAll();
}
}
//减少产品数量的方法
public synchronized void minusProduct(){ //wait()需要在同步代码块或同步方法中去使用
if(productNum<=0){ //产品数量小于等于0
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
System.out.println(Thread.currentThread().getName()+"消费了第"+productNum+"个产品");
productNum--; //让产品减一
//只要消费者消费了一个,就可以唤醒生产者继续生产
notifyAll();
}
}
}
class Producer extends Thread{ //生产者
private Clerk clerk;
//构造器
public Producer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("生产者开始生产产品啦...");
try {
Thread.sleep(50); //50ms生产一个产品
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.addProduct();
}
}
}
class Consumer extends Thread{ //消费者
private Clerk clerk;
public Consumer(Clerk clerk){ //通过构造器给clerk赋值
this.clerk=clerk;
}
@Override
public void run() {
while(true){
System.out.println("消费者开始消费产品喽...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.minusProduct();
}
}
}
🍺输出(部分)
现在就没什么问题了。
二、是否释放锁的操作
任何线程进入同步代码块
、同步方法
之前,必须先获得对同步监视器的锁定,那么何时会释放对同步监视器的锁定呢?
(1)释放锁的操作
- 当前线程的同步方法、同步代码块执行结束。
- 当前线程在同步代码块、同步方法中遇到
break
、return
终止了该代码块、该方法的继续执行。 - 当前线程在同步代码块、同步方法中出现了未处理的Error或Exception,导致当前线程异常结束。
- 当前线程在同步代码块、同步方法中执行了锁对象的
wait()
方法,当前线程被挂起,并释放锁。
(2)不会释放锁的操作
- 线程执行同步代码块或同步方法时,程序调用
Thread.sleep()
、Thread.yield()
方法暂停当前线程的执行。 - 线程执行同步代码块时,其他线程调用了该线程的
suspend()
方法将该该线程挂起,该线程不会释放锁(同步监视器)。 - 应尽量避免使用
suspend()
和resume()
这样的过时来控制线程。