[Netty] Mpsc Queue (十七)

JCTools 是适用于 JVM 并发开发的工具,主要提供了一些 JDK 确实的并发数据结构,例如非阻塞 Map、非阻塞 Queue 等。其中非阻塞队列可以分为四种类型,可以根据不同的场景选择使用。

  • Spsc 单生产者单消费者
  • Mpsc 多生产者单消费者
  • Spmc 单生产者多消费者
  • Mpmc 多生产者多消费者

Netty 中直接引入了 JCTools 的 Mpsc Queue

文章目录

      • 1.Mpsc Queue介绍
      • 2.Mpsc Queue 源码分析
        • 2.1 使用实例
        • 2.2 入队 offer()
        • 2.3 出队 poll()
      • 3.总结

1.Mpsc Queue介绍

Mpsc 的全称是 Multi Producer Single Consumer, 多生产者单消费者。

Mpsc Queue 可以保证多个生产者同时访问队列是线程安全的, 而且同一时刻只允许一个消费者从队列中读取数据, Netty Reactor 线程中任务队列 taskQueue 必须满足多个生产者可以同时提交任务, 所以 JCTools 提供的 Mpsc Queue 非常适合 Netty Reactor 线程模型。

Mpsc Queue 有多种的实现类, MpscArrayQueue, MpscUnboundedArrayQueue, MpscChunkedArrayQueue

在这里插入图片描述

MpscArrayQueue 继承了MpscXxxPad 和 MpscXxxField。每个有包含属性的类后面都会被 MpscXxxPad 类隔开。

// ConcurrentCircularArrayQueueL0Pad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// ConcurrentCircularArrayQueue
protected final long mask;
protected final E[] buffer;

// MpmcArrayQueueL1Pad
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpmcArrayQueueProducerIndexField
private volatile long producerIndex;

// MpscArrayQueueMidPad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// MpscArrayQueueProducerLimitField
private volatile long producerLimit;

// MpscArrayQueueL2Pad
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpscArrayQueueConsumerIndexField
protected long consumerIndex;

// MpscArrayQueueL3Pad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

MpscXxxPad 类中使用了大量 long 类型的变量, 是为了解决伪共享(false sharing)问题。

Mpsc Queue 采取了空间换时间的策略, 让不同线程共享的对象加载到不同的缓存行。

public class FalseSharingPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
    protected volatile long value = 0L;
    protected long p9, p10, p11, p12, p13, p14, p15;
}

变量 value 前后都填充了 7 个 long 类型的变量, 可以保证在多线程访问 value 变量时, value 与其他不相关的变量处于不同的 Cache Line。

在这里插入图片描述

2.Mpsc Queue 源码分析

MpscArrayQueue 属性

// ConcurrentCircularArrayQueue
protected final long mask; // 计算数组下标的掩码
protected final E[] buffer; // 存放队列数据的数组

// MpmcArrayQueueProducerIndexField
private volatile long producerIndex; // 生产者的索引

// MpscArrayQueueProducerLimitField
private volatile long producerLimit; // 生产者索引的最大值

// MpscArrayQueueConsumerIndexField
protected long consumerIndex; // 消费者索引

mask 变量表明队列中数组的容量大小肯定是 2 的次幂, Mpsc 是多生产者单消费者队列, 所有producerIndex 和 producerLimit 都是volatile修饰, 其中一个生产者线程的修改需要对其他生产者线程可见

2.1 使用实例

public class MpscArrayQueueTest {
    public static final MpscArrayQueue<String> MPSC_ARRAY_QUEUE = new MpscArrayQueue<>(2);
    public static void main(String[] args) {
        for (int i = 1; i <= 2; i++) {
            int index = i;
            new Thread(() -> MPSC_ARRAY_QUEUE.offer("data" + index), "thread" + index).start();
        }
        try {
            Thread.sleep(1000L);
            MPSC_ARRAY_QUEUE.add("data3"); // 入队操作,队列满则抛出异常
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("队列大小:" + MPSC_ARRAY_QUEUE.size() + ", 队列容量:" + MPSC_ARRAY_QUEUE.capacity());
        System.out.println("出队:" + MPSC_ARRAY_QUEUE.remove()); // 出队操作,队列为空则抛出异常
        System.out.println("出队:" + MPSC_ARRAY_QUEUE.poll()); // 出队操作,队列为空则返回 NULL
    }
}
java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at MpscArrayQueueTest.main(MpscArrayQueueTest.java:17)
队列大小:2, 队列容量:2
出队:data1
出队:data2
Disconnected from the target VM, address: '127.0.0.1:58005', transport: 'socket'

入队 offer()和出队 poll()

2.2 入队 offer()

public boolean offer(E e) {
    if (null == e) {
        throw new NullPointerException();
    } else {
        long mask = this.mask;
        long producerLimit = this.lvProducerLimit(); // 获取生产者索引最大限制
        long pIndex;
        long offset;
        do {
            pIndex = this.lvProducerIndex(); // 获取生产者索引
            if (pIndex >= producerLimit) {
                offset = this.lvConsumerIndex(); // 获取消费者索引
                producerLimit = offset + mask + 1L;
                if (pIndex >= producerLimit) {
                    return false; // 队列已满
                }
                this.soProducerLimit(producerLimit); // 更新 producerLimit
            }
        } while(!this.casProducerIndex(pIndex, pIndex + 1L)); // CAS 更新生产者索引,更新成功则退出,说明当前生产者已经占领索引值
        offset = calcElementOffset(pIndex, mask); // 计算生产者索引在数组中下标
        UnsafeRefArrayAccess.soElement(this.buffer, offset, e); // 向数组中放入数据
        return true;
    }
}

producerIndex、producerLimit 以及 consumerIndex 之间的关系

public MpscArrayQueueProducerLimitField(int capacity) {
    super(capacity);
    this.producerLimit = capacity;
}

protected final long lvProducerLimit() {
    return producerLimit;
}

初始化状态, producerLimit 队列的容量是相等的, producerIndex = consumerIndex = 0。接下来 Thread1 和 Thread2 并发向 MpscArrayQueue 中存放数据。

在这里插入图片描述

两个线程此时拿到的 producerIndex 都是 0, 小于 producerLimit, 此时两个线程都会尝试使用 CAS 操作更新 producerIndex, 一个成功, 一个失败。

假设 Thread1 执行 CAS 操作成功, Thread2失败会重新更新producerIndex。

Thread1 更新后 producerIndex 的值为 1, 由于 producerIndex 是 volatile 修饰的, 对于Thread2 可见, 当 Thread1 和 Thread2 都通过 CAS 抢占成功后, 拿到的 pIndex 分别是 0 和 1, 根据 pIndex 进行位运算计算得到数组对应的下标, 然后通过 UNSAFE.putOrderedObject() 方法将数据写入到数组中。

在这里插入图片描述

    public static <E> void soElement(E[] buffer, long offset, E e) {
        UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e);
    }

putOrderedObject() 不会立刻将数据更新到内存中, 并把其他 Cache Line 置为失效, 使用的是LazySet 延迟更新机制。性能比putObject() 高。

Java 中有四种类型的内存屏障, LoadLoad、StoreStore、LoadStore 和 StoreLoad, putOrderedObject() 使用了 StoreStore, 对于 Store1,StoreStore,Store2 这样的操作序列, 在 Store2 进行写入之前, 会保证 Store1 的写操作对其他处理器可见。

LazySet 机制是有代价的, 是写操作结果有纳秒级的延迟, 不会立刻被其他线程以及自身线程可见。在Mpsc Queue的使用场景中, 多个生产者只负责写入数据, 并没有写入之后立刻读取的需求, 所以使用 LazySet 机制是没有问题的, 只需StoreStore Barrier 保证多线程写入的顺序即可。

对于 do-while 循环内的逻辑, 为什么需要两次 if(pIndex >= producerLimit) 判断呢, 说明当生产者索引大于 producerLimit 阈值时, 可能存在 1> producerLimit 缓存值过期了或者队列已经满了, 需要读取最新的消费者索引 consumerIndex, 重新做一次 producerLimit 计算, 2> 生产者索引还是大于 producerLimit 阈值, 说明队列的真的满了。

因为生产者有多个线程, 所以 MpscArrayQueue 采用了 UNSAFE.getLongVolatile() 方法保证获取消费者索引 consumerIndex 的准确性。
getLongVolatile() 使用了 StoreLoad Barrier, 在 Load2 以及后续的读取操作之前, 会保证 Store1 的写入操作对其他处理器可见。

StoreLoad 是四种内存屏障开销最大的, 引入producerLimit 的好处在于, 假设我们的消费速度和生产速度比较均衡的情况下, 差不多走完一圈数组才需要获取一次消费者索引 consumerIndex, 从而减少了getLongVolatile() 方法的使用次数。

2.3 出队 poll()

移除队列的首个元素并返回, 如果队列为空, 返回Null

public E poll() {
    long cIndex = this.lpConsumerIndex(); // 直接返回消费者索引 consumerIndex
    long offset = this.calcElementOffset(cIndex); // 计算数组对应的偏移量
    E[] buffer = this.buffer;
    E e = UnsafeRefArrayAccess.lvElement(buffer, offset); // 取出数组中 offset 对应的元素
    if (null == e) {
        if (cIndex == this.lvProducerIndex()) { // 队列为空
            return null;
        }
        do {
            e = UnsafeRefArrayAccess.lvElement(buffer, offset); 
        } while(e == null); // 等待生产者填充元素
    }
    UnsafeRefArrayAccess.spElement(buffer, offset, (Object)null); // 消费成功后将当前位置置为 NULL
    this.soConsumerIndex(cIndex + 1L); // 更新 consumerIndex 到下一个位置
    return e;
}

只有一个消费者线程, 所以么有CAS操作, 核心思路是获取消费者索引 consumerIndex, 然后根据 consumerIndex 计算得出数组对应的偏移量, 将数组对应位置的元素取出并返回, 最后将 consumerIndex 移动到环形数组下一个位置。

在这里插入图片描述

public static <E> E lvElement(E[] buffer, long offset) {
    return (E) UNSAFE.getObjectVolatile(buffer, offset);
}

getObjectVolatile() 方法则使用的是 LoadLoad Barrier, 对于 Load1, LoadLoad, Load2 来说, 在 Load2 以及后续读取操作之前, Load1读取操作执行完毕。所以 getObjectVolatile() 方法可以保证每次读取数据都可以从内存中拿到最新值。

当调用 lvElement() 方法获取到的元素为 NULL 时,

  1. 队列为空或者生产者填充的元素还没有对消费者可见。
  2. 如果消费者索引 consumerIndex 等于生产者 producerIndex, 说明队列为空。
  3. 只要两者不相等, 消费者需要等待生产者填充数据完毕。

当成功消费数组中的元素之后, 把当前消费者索引 consumerIndex 的位置置为 NULL, 把 consumerIndex 移动到数组下一个位置。

public static <E> void spElement(E[] buffer, long offset, E e) {
    UNSAFE.putObject(buffer, offset, e);
}

protected void soConsumerIndex(long newValue) {
    UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}

putObject() 不会使用任何内存屏障, 会直接更新对象对应偏移量的值。而 putOrderedLong 与 putOrderedObject() 是一样的, 都使用了 StoreStore Barrier。

3.总结

  1. 通过大量填充 long 类型变量解决伪共享问题
  2. 环形数组的容量设置为 2 的次幂,可以通过位运算快速定位到数组对应下标
  3. 入队 offer() 操作中 producerLimit 的巧妙设计,大幅度减少了主动获取消费者索引 consumerIndex 的次数,性能提升显著
  4. 入队和出队操作中都大量使用了 UNSAFE 系列方法,针对生产者和消费者的场景不同,使用的 UNSAFE 方法也是不一样的。Jctools 在底层操作的运用上也是有的放矢,把性能发挥到极致

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/13900.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【AI生产力工具】Midjourney:为创意人士提供创造性灵感和支持的工具

文章目录 一、Midjourney是什么&#xff1f;二、Midjourney的优势三、Midjourney的应用四、结语 在现代社会&#xff0c;创意和创新成为越来越重要的能力。然而&#xff0c;创意灵感的获取却不是一件容易的事情&#xff0c;这就需要我们使用一些辅助工具来帮助我们发现和实现创…

Docker容器---网络、容器操作

Docker容器---网络、容器操作 一、docker实现原理二、docker网路模式1、Host模式2、container模式3、none模式4、bridge模式 三、自定义网络1、查看网络模式列表2、查看容器信息3、指定分配IP地址4、自定义网络固定IP 四、暴露端口五、容器端口映射1、创建端口映射 六、资源控制…

达梦数据迁移问题罗列

目录 一、前言 二、问题罗列 一、前言 最近小编接触到国产的数据库达梦数据库&#xff0c;然后在用达梦数据迁移工具MySQL迁移至达梦的时候遇到了一系列的问题现在罗列一下在这里。目前关于国产的数据库达梦这些资料比较少&#xff0c;希望能够帮到有需要的同志们&#xff01…

零代码是什么?零代码平台适合谁用?

随着信息技术的发展&#xff0c;软件开发领域也不断发生变革&#xff0c;零代码&#xff08;No-Code&#xff09;开发模式越来越受到关注。 零代码到底是什么&#xff0c;能不能用通俗的话来说&#xff1f;这就来给大家讲一讲&#xff01; 01 零代码为什么出现&#xff1f; 随…

Java基础——缓冲流

&#xff08;1&#xff09;缓冲流概述&#xff1a; 缓冲流也称高效流&#xff0c;或者高级流。&#xff08;字节流可称原始流&#xff09;作用&#xff1a;缓冲流自带缓冲区&#xff0c;可以提高原始字节流&#xff0c;字符流读写数据的性能。 &#xff08;2&#xff09;字节缓…

【工作思考】如何提升自己的编程能力?

文章目录 前言一、代码评审为什么要进行代码评审&#xff1f; 二、持续学习能力三、良好的编程习惯代码注释避免深度嵌套拒绝长函数重视自测文档编写重构你的代码学会思考 四、多接触开源项目五、总结 前言 在工作中&#xff0c;我们大部分的时间都是在阅读代码&#xff0c;阅…

无人机影像处理流程

无人机由于其方便快捷&#xff0c;精度高等特点已经广泛应用于农田尺度的作物生长监测。尤其是近年来大疆推出了两个多光谱无人机&#xff0c;价格也相较便宜。但目前无人机的使用实际上需要进一步处理才能获取得到农田的基本信息&#xff0c;主要包括影像的校正和图像拼接&…

[oeasy]python0139_尝试捕获异常_ try_except_traceback

尝试捕获异常 回忆上次内容 变量相加 整型数字变量可以相加字符串变量也可以拼接 但是 字符串 和 整型数字整型数字 和 字符串不能相加 怎么办&#xff1f; 转格式int(“1”)str(2) 可是 如果输入的苹果数量是 字符串"abc" int(“abc”)会发生什么&#xff1f;&…

OpenText Exceed TurboX (ETX) 安全功能介绍

OpenText Exceed TurboX (ETX) 安全功能介绍 将所有重要的知识产权&#xff08;IP &#xff09;相关数据保存在受良好保护的中央数据中心是保护 IP 的最佳做法。安全的远程访问是保护知识产权的关键。 所有数据流量均采用最新标准加密技术进行加密ETX 整合多种身份验证系统ET…

Python爬虫之MongoDB

目录 一、Mongo概述 二、安装&下载 1.下载&#xff1a; 2.安装 三、基本命令 插⼊数据 查询数据 修改数据 删除数据 索引 四、Python与MongoDB交互 1.安装pymongo 2.使⽤ 一、Mongo概述 MongoDB是什么&#xff1f; MongoDB是⾮关系型数据库(No sql) 为啥需要…

吃透Redis面试八股文

Redis连环40问&#xff0c;绝对够全&#xff01; Redis是什么&#xff1f; Redis&#xff08;Remote Dictionary Server&#xff09;是一个使用 C 语言编写的&#xff0c;高性能非关系型的键值对数据库。与传统数据库不同的是&#xff0c;Redis 的数据是存在内存中的&#xf…

原来这就是所谓的 JSR!

相信大家在学习 Java 的过程中&#xff0c;或多或少都见过 JSR 这个词。本篇文章就科普下什么是 JSR。 什么是 JSR &#xff1f; JSR&#xff08;Java Specification Requests&#xff09;&#xff0c;是指 Java 规范请求&#xff08;或者活规范提案&#xff09;。这个请求&a…

API 自动化测试难点总结与分享

笔者是 API 管理工具的项目参与者之一&#xff0c;在日常工作中会经常遇到 API 自动化测试难点&#xff0c;我决定总结分享给大家&#xff1a; API 自动化测试的难点包括&#xff1a; 接口的参数组合较多&#xff0c;需要覆盖各种可能的情况。 接口的状态和数据关联较多&#…

typescript全局安装卸载以及npm相关问题

全局安装 npm install -g typescript 全局安装之后&#xff0c;如果想要卸载要使用 npm uninstall -g typescript 全局安装之后可以在终端使用 tsc xxx 编译ts文件 本地安装&#xff0c;也就是在项目目录下安装 npm install typescript 本地卸载 npm uninstall type…

2022年中国广义数据智能市场规模为442亿元

数据智能是当前市场上的热点应用。但业界对于数据智能的发展状况&#xff0c;缺乏深入探讨与研究。为了系统梳理中国数据智能行业市场全景&#xff0c;厘清行业发展脉络&#xff0c;为从业者提供有价值的借鉴&#xff0c;海比研究院联合中国软件网、中国软件行业协会应用软件产…

camunda如何处理流程待办任务

在 Camunda 中处理流程任务需要使用 Camunda 提供的 API 或者用户界面进行操作。以下是两种常用的处理流程任务的方式&#xff1a; 1、通过 Camunda 任务列表处理任务&#xff1a;在 Camunda 任务列表中&#xff0c;可以看到当前需要处理的任务&#xff0c;点击任务链接&#…

【谷歌扩展程序入门】简单制作一个查看网页结构的扩展程序

简言 在想看网页结构的时候一般会F12查看元素内容。 太麻烦了 还不简单方便。 扩展程序 扩展建立在诸如 HTML、JavaScript 和 CSS 之类的 Web 技术之上。它们在单独的沙盒执行环境中运行并与 Chrome 浏览器交互。 扩展允许您通过使用 API 修改浏览器行为和访问 Web 内容来“扩…

婚恋交友app开发中需要注意的安全问题

前言 随着移动设备的普及&#xff0c;婚恋交友app已经成为了人们生活中重要的一部分。但是&#xff0c;这些应用的开发者需要确保应用的安全性&#xff0c;以保护用户的隐私和数据免受攻击。本文将介绍在婚恋交友app开发中需要注意的安全问题。 在当今数字化时代&#xff0c;…

程序员面试完之后,人麻了...

去面试吧 面不被录用的试 面hr为了完成任务的试 面一轮二轮没有下文试 面需要通勤2小时的试 面随时加班的试 ...... 今年的“金三银四”被网友们称为“铜三铁四”&#xff0c;招聘软件上的岗位都能背下来了&#xff0c;简历却依然石沉大海。 好不容易等来个回复&#xff…

Linux常用命令——iptables命令

在线Linux命令查询工具 iptables Linux上常用的防火墙软件 补充说明 iptables命令是Linux上常用的防火墙软件&#xff0c;是netfilter项目的一部分。可以直接配置&#xff0c;也可以通过许多前端和图形界面配置。 补充说明 - 语法选项基本参数 - 命令选项输入顺序工作机制…