多线程代码案例之阻塞队列

目录

1.生产者消费者模型

2.使用标准库中的阻塞队列

3.模拟实现阻塞队列


在介绍阻塞队列之前,会先介绍一些前置知识,像队列:有普通队列、优先级队列、阻塞队列、和消息队列。前面两个是线程不安全的,而后面两个是线程安全的。本文重点介绍阻塞队列。

1.生产者消费者模型

1.1队列功能介绍

(1)阻塞队列

1)当队列为空时,尝试出队列;此时,队列会阻塞等待,直到队列不为空才继续执行出队列操作。

2)当队列为满时,尝试入队列;此时,队列就会阻塞等待,直到队列不为满为止才能继续执行入队操作。

(2)消息队列

并非遵循常规的先进先出,而是带有一个topic关键字。当出队时指定某个topic,就会先出topic下的元素(topic内部就遵循先进先出)

举例:例如到医院窗口排队,有很多种类型的窗口,如:妇科、儿科、骨科等等(这些称为topic),不是说,你先来了就一定可以就诊,而是等待你所在的topic是否呼唤你。

像上面的阻塞队列和消息队列,起到的作用就是可以实现:生产者消费者模型。

1.2.生产者消费者模型介绍

(1)什么是生产者消费者模型

1)A线程进行入队操作,B线程进行出队操作。当队列为空时,B线程需要等待A线程入队,才能从队列中取出元素;当队列满时,A线程需要等待B线程取出元素后,才能继续入队。这里的A线程就相当于生产者,B线程相当于消费者。

2)有一个自动售卖机(相当于阻塞队列/消息队列),商人负责填货(生产者),用户负责买东西(消费者)。

(2)模型的作用

1)可以让程序机械能解耦合操作

2)可以程序“削峰填谷”

解耦合作用举例:

1.如果A和B是互相调用的关系,那么如果A中需要修改,那么B中的大部分都需要同步修改,否则无法互相调用。

2.当A和B中间加了一个队列,那么A与B的交互只需要通过操作队列即可。即使其中一个出现了问题,也不会影响到另外一个。

削峰填谷举例:

1.当服务器直接和客户端交互时,当请求过多时,就会直接导致服务器崩溃。

2.如果在客户端和服务器中间加上一个队列,让他们通过队列进行交互。即使请求再多,也不会影响到服务器,最坏的情况也就是队列崩溃。

所以说,阻塞队列/消息队列,就是可以实现生产者消费者模型的效果。

2.使用标准库中的阻塞队列

现在,我们介绍如何调用标准库中的阻塞队列。

2.1.创建阻塞队列

(1)选择正确的接口

(2)实例化的对象

可以选择的有下面这三个,很明显,它们之间只是基于不同的数据结构进行实现。

这三个,我们都是可以选择的。

(3)队列的操作

因为是队列,我们只需要考虑入队和出队操作即可。

在阻塞队列中,只有这两个是带有阻塞功能的,所以我们只需要使用这两个即可。

因为这两个操作,是带有阻塞功能的,也就是wait,所以使用时需要声明异常。

(4)普通的操作

这里普通的操作指的是在一个线程中进行操作。

程序运行起来,发现没有任何的报错,只是程序仍然不会结束,这就是阻塞功能。

2.2.使用阻塞队列

(1)消费者消费的很慢

当消费者消费慢时,也就是让消费者每次sleep,此时,就会产生生产者在等消费者的过程

 public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread t1 = new Thread(()->{//负责入队操作
            for (int i = 0; i < 5000; i++) {
                try {
                    queue.put(i);
                    System.out.println("入队:"+i);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(()->{//负责出队操作
            for (int i = 0; i < 5000; i++) {
                try {
                    int tmp = queue.take();
                    System.out.println("出队:"+tmp);
                    Thread.sleep(1000);

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }

(2)生成者生成的很慢

以上就是对阻塞队列的使用,下面我们自己实现一个阻塞队列

3.模拟实现阻塞队列

要模拟阻塞队列,就需要有入队/出队操作,并且可以进行阻塞等待,并且是线程安全的!

我们先从普通队列开始,然后加上线程安全和阻塞操作,最后进行优化和线程安全Pro版 

3.1.实现普通的队列

下面按照循环队列的形式进行创建队列,只提供了入队和出队操作

class MyBlockQueue {
    int head = 0;
    int tail = 0;
    int size = 0;
  

    public String[] elem;
    public MyBlockQueue(int capacity) {
        elem = new String[capacity];
    }
    //入队
    public void put(String s) throws InterruptedException {    
           if(size == elem.length) {
               return;
            }
            elem[tail] = s;//队尾入
            size++;
            if(tail >= elem.length-1) tail=0;
            else tail++;
           
        }
    }
    //出队
    public String take() throws InterruptedException {
            if(size == 0) {
              return null;
            }
            String tmp = elem[head];
            size--;
            if(head == elem.length-1) head = 0;
            else head++;
            return tmp;
    }
}
3.2.加上阻塞功能

这里我们使用的是wait而不是sleep。

class MyBlockQueue {
   int head = 0;
    int tail = 0;
    int size = 0;

    public String[] elem;
    public MyBlockQueue(int capacity) {
        elem = new String[capacity];
    }
    //入队
    public void put(String s) throws InterruptedException {
        synchronized (this) {
           if(size == elem.length) {
                System.out.println("队列满,阻塞等待");
                this.wait();
            }
            elem[tail] = s;//队尾入
            size++;
            if(tail >= elem.length-1) tail=0;
            else tail++;
            this.notify();//入队一个,唤醒一次
        }
    }
    //出队
    public String take() throws InterruptedException {
        synchronized (this) {
           if(size == 0) {
                System.out.println("队列空");
                this.wait();
            }
            String tmp = elem[head];
            size--;
            if(head == elem.length-1) head = 0;
            else head++;
            this.notify();//出队一个,唤醒一次
            return tmp;
        }
    }
}

(1)改进1:对入队、出队操作,都进行了加锁操作

(2)改进2:在队列满/空时,不进行return,而是进行阻塞等待;当有一个元素入队/出队之后,就进行唤醒一次。

上述不使用sleep的原因是:sleep是抱着锁使线程进入休眠状态,当此时有其他的操作(入队/出队)时,无法拿到锁,从而无法执行(发生了死锁)

上述还有一个缺点:就是wait不仅仅可以被notify唤醒,还可以被interrupt唤醒,所以要循环进行判断。

class MyBlockQueue {
   int head = 0;
    int tail = 0;
    int size = 0;

    public String[] elem;
    public MyBlockQueue(int capacity) {
        elem = new String[capacity];
    }
    //入队
    public void put(String s) throws InterruptedException {
        synchronized (this) {
            while (size >=elem.length) {
                //循环等待确认,防止不是被notify唤醒
                try {
                    this.wait();
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            elem[tail] = s;//队尾入
            size++;
            if(tail >= elem.length-1) tail=0;
            else tail++;
            this.notify();
        }
    }
    //出队
    public String take() throws InterruptedException {
        synchronized (this) {
            while (size ==0) {
                try {
                    this.wait();
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            String tmp = elem[head];
            size--;
            if(head == elem.length-1) head = 0;
            else head++;
            this.notify();
            return tmp;
        }
    }
}

(3)改进3:使用while+wait的方式反复确认是否要被唤醒

3.3.确保线程一定安全

上述线程其实已经很安全了,但是还需要再进一步优化,达到更安全的效果。对于线程安全还有两个:内存可见性问题和指令重排序,所以我们只需要对变量加上volatile关键字即可。

volatile int head = 0;
volatile int tail = 0;
volatile int size = 0;

上述就是一个完整的阻塞队列模拟实现的代码,下面展示完整代码:

class MyBlockQueue {
   /* int head = 0;
    int tail = 0;
    int size = 0;*/
   volatile int head = 0;
    volatile int tail = 0;
    volatile int size = 0;

    public String[] elem;
    public MyBlockQueue(int capacity) {
        elem = new String[capacity];
    }
    //入队
    public void put(String s) throws InterruptedException {
        synchronized (this) {
           /* if(size == elem.length) {
                System.out.println("队列满,阻塞等待");
                this.wait();
            }*/
            while (size >=elem.length) {
                //循环等待确认,防止不是被notify唤醒
                try {
                    this.wait();
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            elem[tail] = s;//队尾入
            size++;
            if(tail >= elem.length-1) tail=0;
            else tail++;
            this.notify();
        }
    }
    //出队
    public String take() throws InterruptedException {
        synchronized (this) {
           /* if(size == 0) {
                System.out.println("队列空");
                this.wait();
            }*/
            while (size ==0) {
                try {
                    this.wait();
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            String tmp = elem[head];
            size--;
            if(head == elem.length-1) head = 0;
            else head++;
            this.notify();
            return tmp;
        }
    }
}

测试代码:

 public static void main(String[] args) throws InterruptedException {
        MyBlockQueue myBlockQueue = new MyBlockQueue(10);
        Thread t1 = new Thread(()->{
            for (int i = 0; i < 20; i++) {
                try {
                    System.out.println("生成1:");
                    myBlockQueue.put("1");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(()->{
            for (int i = 0; i < 30; i++) {
                String tmp = null;
                try {
                    tmp = myBlockQueue.take();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("消费:"+tmp);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }

测试结果:

结果是可以的,和标准库中的阻塞队列基本一致。

几个注意事项:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/542794.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

FFmpeg: 自实现ijkplayer播放器--03UI界面设计

文章目录 UI设计流程图UI设计界面点击播放功能实现 UI设计流程图 UI设计界面 主界面 控制条 播放列表 画面显示 标题栏 设置界面 提示框 点击播放功能实现 槽函数实现&#xff1a; connect(ui->ctrlBarWind, &CtrlBar::SigPlayOrPause, this, &Main…

软件杯 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

InnoDB中高度为3的B+树最多可以存多少数据?

参考&#xff1a; &#x1f525;我说MySQL每张表最好不超过2000万数据&#xff0c;面试官让我回去等通知&#xff1f; - 掘金 考虑到磁盘IO是非常高昂的操作&#xff0c;计算机操作系统做了预读的优化&#xff0c;当一次IO时&#xff0c;不光把当前磁盘地址的数据&#xff0c;…

QtCreater 使用

QtCreater 创建项目 1.刚进入 QtCreater 的界面是这样的一个界面 ① 创建一个新的文件&#xff0c;那么我们就选择左上角的 “文件” ② 点击新建文件&#xff0c;或者也可以直接使用快捷键 CtrlN 此时就会弹出对话框&#xff0c;让我们选择想要创建的文件&#xff1a; Appli…

stm32f103---按键控制LED---代码学习

目录 一、总体代码 二、LED端口初始化分析 ​编辑 三、LED灭的控制 四、LED亮 五、按键初始化 ​ 六、按键控制LED的功能 一、总体代码 这里使用到了LED灯和按键&#xff0c;实现效果是当按键按下时灯的亮灭转化 #include "stm32f10x.h" #include "bsp_led…

Notion2024年最新桌面端安装+汉化教程,支持MAC和WIN版本

Notion 是一个多功能的协作工具&#xff0c;可以用于个人和团队的知识管理、项目管理、笔记记录和协同编辑等。它提供了灵活的页面和数据库功能&#xff0c;可以根据不同需求进行自定义和组织。Notion 能够帮助用户更高效地组织和共享信息&#xff0c;提升工作效率和团队合作。…

ThingsBoard通过服务端获取客户端属性或者共享属性

MQTT基础 客户端 MQTT连接 通过服务端获取属性值 案例 1、首先需要创建整个设备的信息&#xff0c;并复制访问令牌 ​2、通过工具MQTTX连接上对应的Topic 3、测试链接是否成功 4、通过服务端获取属性值 5、在客户端查看对应的客户端属性或者共享属性的key 6、查看整个…

改进YOLOv8系列:结合自研注意力模块MultiScaleAttentiveConv (MSAConv)

改进YOLOv8注意力系列七:结合空间关系增强注意力SGE、SKAttention动态尺度注意力、全局上下文信息注意力Triplet Attention 代码MultiScaleAttentiveConv (MSAConv)本文提供了改进 YOLOv8注意力系列包含不同的注意力机制以及多种加入方式,在本文中具有完整的代码和包含多种更…

蓝桥杯嵌入式(G431)备赛笔记——DMA+ADC(单通道+多通道)

单通道&#xff1a; 开启循环模式&#xff0c;两个参数设为word u32 adc_tick0; u32 r37_value0; u32 r38_value0; float r37_volt0; float r38_volt0;//DMAADCvoid DMA_ADC() {if(uwTick-adc_tick<100) return;adc_tick uwTick;HAL_ADC_Start_DMA(&hadc2, &r37_v…

vivado ila 运行触发器、停止触发器、使用自动重新触发

运行触发器 您可在 2 种不同模式下运行或装备 ILA 核触发器 &#xff1a; • “ Run Trigger ” &#xff1a; 选择要装备的 ILA 核 &#xff0c; 然后单击“ ILA 仪表板 (ILA Dashboard) ”窗口或“硬件 (Hardware) ”窗口 工具栏上的“ Run Trigger ”按钮即可装备 IL…

013:vue3 Pinia详解使用详解

文章目录 1. Pinia 是什么2. Pinia 功能作用3. 手动添加Pinia到Vue项目4. Pinia基础使用5. getters实现6. action异步实现7. storeToRefs工具函数8. Pinia的调试9. 总结 1. Pinia 是什么 Pinia 是 Vue 的专属的 最新状态管理库是 Vuex 状态管理工具的替代品和 Vuex 一样为 Vue…

Django处理枚举(枚举模型)以及source的使用

Django处理枚举-枚举模型 1、定义模型类、序列化器类2、对上面这些场景使用source参数3、支持连表查询4、自定义序列化输出方法5、案例5 1、定义模型类、序列化器类 定义模型类models.py&#xff1b;项目模型类、接口模型类、用例模型类 from django.db import modelsclass T…

选择自动化工具是一个关键的决策过程

好的自动化软件测试工具&#xff0c;不仅可以有效的缩短全生命周期的交付周期&#xff0c;还可以提高测试的有效性&#xff0c;还可以保证更好的高质量的交付。工具的选型是一项重要的决策过程&#xff0c;工具的采用涉及到企业的效率、成本和长期发展。 1、需求分析 确组织希…

08 Php学习:if语句、Switch语句

PHP 条件语句 当您编写代码时&#xff0c;您常常需要为不同的判断执行不同的动作。您可以在代码中使用条件语句来完成此任务。 在 PHP 中&#xff0c;提供了下列条件语句&#xff1a; if 语句 - 在条件成立时执行代码 if…else 语句 - 在条件成立时执行一块代码&#xff0c;…

Python学习笔记22 - 文件操作

文件读写的原理 文件读写的操作 常用的文件打开模式 文件对象的常用方法 with语句&#xff08;上下文管理器&#xff09;

码蹄集部分题目(2024OJ赛11期)

1&#x1f40b;&#x1f40b;&#x1f40b;银行账户&#xff08;黄金&#xff1b;模拟&#xff09; 时间限制&#xff1a;1秒 占用内存&#xff1a;128M &#x1f41f;题目描述 据说对银行账户进行盗窃时&#xff0c;如果只盗取小数点下的数值&#xff0c;就不容易引起注意…

【vue】Vue3开发中常用的VSCode插件

Vue - Official&#xff1a;vue的语法特性&#xff0c;如代码高亮&#xff0c;自动补全等 Vue VSCode Snippets&#xff1a;自定义一些代码片段 v3单文件组件vdata数据vmethod方法 别名路径跳转 参考 https://www.bilibili.com/video/BV1nV411Q7RX

Apple:叠加提示 - 高效的 RAG 优化方式

发表机构&#xff1a;Apple 本文介绍了一种新的检索增强生成&#xff08;RAG&#xff09;提示方法——叠加提示&#xff08;superposition prompting&#xff09;&#xff0c;该方法可以直接应用于预训练的基于变换器的大模型&#xff08;LLMs&#xff09;&#xff0c;无需微调…

spring容器

spring容器 实现方式 spring中提供了各式各样的IOC容器的实现供用户选择和使用&#xff0c;使用什么样的容器取决于用户的需要 BeanFactory 该接口是最简单的容器&#xff0c;提供了基本的DI支持。最常用的BeanFactory实现是XmlBeanFactory类&#xff0c;根据XML文件中的定义加…

嵌入式第三天:(C语言入门)

目录 一、跳转关键字 break&#xff1a; continue&#xff1a; goto&#xff1a; 二、函数 概述&#xff1a; 函数的使用&#xff1a; 无参无返回值&#xff1a; 有参无返回值&#xff1a; 有参有返回值&#xff1a; 返回值注意点&#xff1a; 函数的声明&#xff…