【JavaEE】生产者消费者模式

作者主页:paper jie_博客

本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。

本文于《JavaEE》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造,将MySQL基础知识一网打尽,希望可以帮到读者们哦。

其他专栏:《MySQL》《C语言》《javaSE》《数据结构》等

内容分享:本期将会分享设计模式中的生产者消费者模式

目录

什么是阻塞队列

什么是生产者消费者模式

生产者消费者模式的特点

 Java标准库中的阻塞队列

自定义实现一个阻塞队列

普通队列

 实现线程安全

正解

实现阻塞

正解

 基于自己实现的阻塞队列实现一个简单的生产者消费者模型


什么是阻塞队列

阻塞队列它也是队列,遵守着先进先出的原则.且它是一种线程安全的数据结构.它有两个特性:

1. 当队列满的时候,继续入队就会进行堵塞,直到有其他线程从队列中拿走元素才会解除堵塞.

2. 当队列为空的时候,继续出队就会进行堵塞,直到有其他线程从队列中插入元素后才会解除堵塞.

我们的阻塞队列最经典的使用场景就是生产者消费者模式.

举个栗子:

在我们家中,捏饺子一般有两个步骤: 1. 捏饺子皮, 2.包饺子. 假设有三个人.一个滑稽捏饺子皮,其他两个滑稽包饺子. 滑稽捏好饺子皮厚后就会放到板子上,而其他的滑稽就不用直接去捏饺子皮的滑稽手上拿饺子皮,而是去板子上拿即可.这样他们直接就只需要关注这个板子即可. 而这个板子就充当了我们的阻塞队列.

什么是生产者消费者模式

生产者消费者模式就是基于一个中间容器来解决它们之间的强耦合性问题.而这个中间容器就是阻塞队列.加入阻塞队列后,生产者与消费者之间就不会直接联系,而是通过通过阻塞队列来进行数据传输.这样生产者生产数据就不用知道是谁来处理他的数据,不用等待,直接交给阻塞队列即可.而消费者也不会去找生产者要数据,而是去阻塞队列里拿.

生产者消费者模式的特点

1. 通过阻塞队列可以降低生产者与消费者之间的耦合性

假设有三个服务器ABC,BC是将数据处理到,A是接受它们的数据.如果在没有加入阻塞队列的情况下.可能就会发生: 当B或者C挂了,这可能就会导致A也会挂,它们之间是强耦合的关系.因为B或者C的操作中需要涉及到一些关于A的操作.而A的操作也会涉及到一些关于B或C的操作.

但是当加入阻塞队列后就会发生不一样的结果. C,B处理好的结果只需要放到阻塞对列中,而A也只需要去阻塞队列中拿即可.这样B,C的操作对于A的影响就会很小,从而降低了他们之间的耦合性.

 

2. 削峰填谷 

这里就是加入阻塞队列起到一个平衡生产者与消费者之间的处理能力, 加入阻塞队列就可以防止当生产者一下生产出大量数据,而消费者一时间消费不了而导致挂了的问题.

举个栗子:

假设一个场景: 客户端发出请求,服务A接受请求,然后将请求交给BC处理器进行逻辑处理. 在正常情况下处理器是可以及时处理的.但是在一些特殊的时候会有一些突发峰值.外界客户端的请求非常的多.A接受这些请求一下子全部交给B,C服务器来处理,它们一下子可能就会支撑不住. 因为B,C需要就行逻辑处理业务,需要的资源开销就会比较大.如果一下给它大量的请求进行处理,处理器的资源可能就会超过它的上限而导致机器挂了.

但是在加入阻塞队列后就不用担心这种情况了. 就算客户端有大量的请求,A接受后也是传送给阻塞队列,再由B,C去阻塞队列中拿数据处理来慢慢消化.这就算数据再多,只要A服务器,阻塞队列不挂(阻塞队列和A服务器抗压能力很强,它们只需要进行存储数据和传送数据),BC也可以按正常速度进行处理.

 Java标准库中的阻塞队列

在Java标准库中也提供了阻塞队列,如果我们需要使用阻塞队列,只需要使用标准库中的即可. 库中的阻塞队列叫BlockingQueue,它是一个interface接口,它实现的的类有:

ArrayBlockingQueue

LinkedBlockingQueue

priorityBlockingQueue

里面的put和offer方法就是入队方法,但是put是带有阻塞的功能. take也是出队方法,但它也带有阻塞的功能.

public class ThreadDemo1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        queue.put("aaa");

        System.out.println(queue.take());

        System.out.println(queue.take());
    }
}

自定义实现一个阻塞队列

这里我们准备实现一个基于数组的阻塞队列,也就是环形队列.这里队列里面我们需要一个数组,计数器和两个头尾指针,put和take方法. 这里我们分三步来实现这个阻塞队列.

1) 普通队列

2) 实现线程安全

3) 实现阻塞功能

普通队列

class MyarrayBlockingQueue {
    private int elems[] = null;
    private int size = 0;
    private int head = 0;
    private int tail = 0;
    public MyarrayBlockingQueue(int capactiy) {
        elems = new int[capactiy];
    }
    public void put(int value) {
        //判断队列满没满
        if(size == elems.length) {
            //阻塞
            return;
        }
        //添加元素
        elems[tail] = value;
        tail++;
        //判断尾指针是不是需要循环到0位置
        if(tail == elems.length) {
            tail = 0;
        }
        size++;
    }
    public int take() {
        int elem = 0;
        //判断队列为不为空
        if(size == 0) {
            //阻塞
            return elem;
        }
        //出队
        elem = elems[head];
        head++;
        if(head == elems.length) {
            head = 0;
        }
        size--;
        return elem;
    }
}

 实现线程安全

实现线程安全,我们就需要加锁.但是当我们下面这个代码这样加锁时,就会出现问题.

public void put(int value) {
        //判断队列满没满
        if(size == elems.length) {
            //阻塞
            return;
        }
        synchronized (this) {
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
        }
    }
    public int take() {
        int elem = 0;
        //判断队列为不为空
        if(size == 0) {
            //阻塞
            return elem;
        }
        synchronized(this) {
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }

我们发现如果当有两个线程t1,t2同时使用put或者take方法.假设当前有99个元素,容量为100.当t1执行到if(size == elems.length)后被调度走了,再轮到t2执行.当t2执行完后,队列的元素语已经满了.但是当轮到t1执行时因为上一次的if判断它还会再入队一个元素,这就会出现size为101的问题.

正解

我们将if判断条件也放到锁中就可以了.

public void put(int value) {
        synchronized (this) {
            //判断队列满没满
            if(size == elems.length) {
                //阻塞
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
        }
    }
    public int take() {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            if(size == 0) {
                //阻塞
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }

实现阻塞

这里需要实现阻塞就要使用我们的wait和notify方法.

当队列满时,使用put就会执行wait进入阻塞,只有当使用take调用notify队列才会解除堵塞.

当队列为空时,使用take就会执行wait进入堵塞,只有当使用put调用notify队列才会解除堵塞.

public void put(int value) throws InterruptedException {
        synchronized (this) {
            //判断队列满没满
            if(size == elems.length) {
                this.wait();
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }
    public int take() throws InterruptedException {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            if(size == 0) {
                this.wait();
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            this.notify();
            return elem;
        }
    }

但是这样又会出现一个问题. 假设有三个线程t1,t2,t3 t1和t2调用put. t3调用take. 且这个队列已经满了. 这里就会有一种情况: t1和t2调用put发现满了就都会在wait那里堵塞,且解锁. 这时t3就执行take方法出队了一个且调用了notify方法唤醒了t1. 则t1也就向下执行入队了一个,此时队列是满了.但是!!!t1的notify方法就可能会唤醒t2.而t2就会直接向下执行又入队一个,但队列是满的,这就出现问题了.

正解

我们可以在if判断那里将if改成while,这样就算被notify唤醒了,也会再次判断队列是不是满/空.才会选择是不是执行还是继续堵塞.

public void put(int value) throws InterruptedException {
        synchronized (this) {
            //判断队列满没满
            while(size == elems.length) {
                this.wait();
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }
    public int take() throws InterruptedException {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            while(size == 0) {
                this.wait();
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            this.notify();
            return elem;
        }
    }

 基于自己实现的阻塞队列实现一个简单的生产者消费者模型

public class ThreadDemo8 {
    public static void main(String[] args) {
        block queue = new block(1000);
        Thread t1 = new Thread(() -> {
            int count = 0;
            while(true) {
                try {
                    queue.put(count);
                    System.out.println("生产者: " + count);
                    Thread.sleep(1000);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

        });
        Thread t2 = new Thread(() -> {
            while(true) {
                try {
                    System.out.println("消费者: " +  queue.take());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/225232.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

从零开始学习 JS APL(一):完整指南和实例解析

本章内容主要是按一下来&#xff1a; 操作DOM BOM 比如 控制网页元 素交互等各种网页 交互效果 以下是我总结笔记&#xff08;仅供参考&#xff09; webAPL 获取DOM对象 变量声明有三个 var let 和 const 我们应该用那个呢&#xff1f; 首先var 先排除&#xff0c;老派写法…

UDP协议实现群聊

代码&#xff1a; import java.awt.*; import java.awt.event.*; import javax.swing.*; import java.net.*; import java.io.IOException; import java.lang.String; public class liaotian extends JFrame{ private static final int DEFAULT_PORT8899; private J…

机器的深度强化学习算法可以被诱导

设计一个好的奖励函数是机器深度强化学习算法的关键之一。奖励函数用于给予智能体&#xff08;机器&#xff09;在环境中采取不同行动时的反馈信号&#xff0c;以指导其学习过程。一个好的奖励函数应该能够引导智能体朝着期望的行为方向学习&#xff0c;并尽量避免潜在的问题&a…

使用ASIRequest库进行Objective-C网络爬虫示例

在Objective-C中&#xff0c;ASIHTTPRequest是一个非常受欢迎的库&#xff0c;用于处理HTTP请求。它可用于下载网页内容&#xff0c;处理API请求&#xff0c;甚至进行复杂的网络交互。下面是一个简单的示例&#xff0c;展示了如何使用ASIHTTPRequest库来爬取网页代码。 首先&a…

【ROS2指南-9】Bag的record和play操作

目标&#xff1a;记录在某个话题上发布的数据&#xff0c;以便您可以随时回放和检查它。 教程级别&#xff1a;初学者 时间&#xff1a; 10分钟 内容 背景 先决条件 任务 1 设置 2 选择一个主题 3 ros2包记录 4 ros2 包信息 5 ros2包玩 概括 下一步 相关内容 背景 …

Java 简易版 TCP UDP聊天

客户端 import java.io.*; import java.net.Socket; import java.util.Date; import javax.swing.*;public class MyClient {private JFrame jf;private JButton jBsend;private JTextArea jTAcontent;private JTextField jText;private JLabel JLcontent;private Date data;pr…

自定义类加载器

通过继承ClassLoader类&#xff0c;重写findClass方法&#xff0c;实现自定义类加载器。 一、自定义类加载器 package com.molange.JavaSE.myclassloader;import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; impor…

MATLAB - 绘制立体图(平面+水深)

目录 代码结果 代码 % 在 X-Y 平面上绘图 % 正常绘制平面图 [X,Y,Z] peaks; contour(X,Y,Z,20); hold on% ****重点******************************************** % 改为三维视图&#xff0c;具体可以help % view(3); %此时的平面图对应z0 &#xff1b;默认az-37.5&#x…

大模型在企业知识库场景的落地思考

一、引言 在这个信息爆炸的时代&#xff0c;企业的知识库已不再是简单的数据堆砌&#xff0c;而是需要智能化、高效率的知识管理和利用。大模型作为AI领域的一个重要突破&#xff0c;正逐步成为企业知识库管理的强大助力。通过前面一段时间对于大模型在企业落地的深入调研和实…

Linux---逻辑卷管理

本章主要介绍逻辑卷的管理。 了解什么是逻辑卷创建和删除逻辑卷扩展逻辑卷缩小逻辑卷逻辑卷快照的使用 前面介绍了分区的使用&#xff0c;如果某个分区空间不够&#xff0c;想增加空间是非常困难的。所以&#xff0c;建议尽可能使用逻辑卷而非普通的分区&#xff0c;因为逻辑卷…

【C语言】数据在内存中的存储

目录 练笔 整型数据的存储&#xff1a; char 型数据——最简单的整型 整型提升&#xff1a; 推广到其他整形&#xff1a; 大小端&#xff1a; 浮点型数据的存储&#xff1a; 存储格式&#xff1a; 本篇详细介绍 整型数据&#xff0c;浮点型数据 在计算机中是如何储存的。…

Redis和MySQL双写一致性实用解析

1、背景 先阐明一下Mysql和Redis的关系&#xff1a;Mysql是数据库&#xff0c;用来持久化数据&#xff0c;一定程度上保证数据的可靠性&#xff1b;Redis是用来当缓存&#xff0c;用来提升数据访问的性能。 关于如何保证Mysql和Redis中的数据一致&#xff08;即缓存一致性问题…

RT-DETR手把手教程:NEU-DET钢材表面缺陷检测任务 | 不同网络位置加入EMA注意力进行魔改

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文独家改进&#xff1a;本文首先复现了将EMA引入到RT-DETR中&#xff0c;并跟不同模块进行结合创新&#xff1b;1&#xff09;多种Rep C3结合&#xff1b;2&#xff09;直接作为注意力机制放在网络不同位置&#xff1b; NEU-DET钢材…

C#excel导入dategridview并保存到数据库/dategridview增加一行或几行一键保存数据库

excel导入到dategridview显示并保存到数据库 dategridview增加一行或几行一键保存数据库 ExcelHelper类(这个要导入NPOI包) using NPOI.HSSF.UserModel; using NPOI.SS.UserModel; using NPOI.XSSF.UserModel; using System; using System.Collections.Generic; using Syste…

java多人聊天

服务端 package 多人聊天;import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintStream; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList;…

用 Bytebase 做数据库 schema 迁移

数据库 schema 迁移指修改管理数据库结构的变更&#xff0c;包括为数据库添加视图或表、更改字段类型或定义新约束。Bytebase 提供了可视化 GUI 方便迁移数据库 schema&#xff0c;本教程将展示如何使用 Bytebase 为 schema 迁移配上 SQL 审核&#xff0c;自定义审批流&#xf…

解决Could not establish connection to : XHR failed

解决Could not establish connection to : XHR failed 问题描述 用vscode用远程连接服务器时总报上面的错误&#xff0c;用xshell和Xftp和vscode终端都可以连上&#xff0c;但是用vscode的ssh连接缺总报错&#xff0c;导致无法连接服务器进行代码调试 一、原因 原因可能是在…

Python tkinter 之文件对话框(filedialog)

文章目录 1 文件1.1 获取单个文件名称&#xff1a;askopenfilename()1.2 获取多个文件名称&#xff1a;askopenfilenames()1.3 获取单个文件属性&#xff1a;askopenfile()1.4 获取多个文件属性&#xff1a;askopenfiles()1.5 获取保存文件的路径&#xff1a;asksaveasfilename…

树莓派4B iio子系统 mpu6050

编写基于iio的mpu6050 遇到的问题&#xff0c;在读取数据时&#xff0c;读出来的数据不能直接拼接成int类型 需要先将其转换成short int&#xff0c;再转换成int 效果如图所示 注&#xff1a;驱动是使用的modprobe加载的 简单画的思维导图 设备树修改部分&#xff1a; …

大模型发展对教育领域的巨大影响

摘要&#xff1a; 教育是一个复杂而微妙的领域;有效的教学涉及对学生认知的推理&#xff0c;并应反映学生的学习目标。基础模型的性质在这里提出了在人工智能教育领域尚未实现的承诺&#xff1a;虽然教育中的某些许多数据流单独地过于有限&#xff0c;无法训练基础模型&#xf…