【Java并发】聊聊Disruptor背后高性能的原理

为什么需要Disruptor

对于单机生产者消费者来说,JUC本身提供了阻塞队列,ArrayBlockingQueue、LinkedBlockingQueue 等,但是为了保证数据安全,使用了reentrantLock进行加锁操作会影响性能,另一方面,本身如果生产者生产数据过快会导致,内存溢出问题。以及采用数据实现会有伪共享问题。

Disruptor 原理和应用场景

那么Disruptor是如何进行设计的?

  • 环形数组结构
  • 元素位置定位
  • 无锁设计
  • 利用缓存行填充解决伪共享问题
  • 实现了基于事件驱动的生产者消费者模型(观察者模式)

RingBuffer数据结构

在这里插入图片描述
唤醒数组其实就是一个自定义大小的环形数组,有一个序列号 sequence 用以指向下一个可用的元素,需要保证数组的长度必须是2的N次幂,这样就可以通过sequence % length 或者 通过 sequence &(length -1) 可以直接获取到下标位置。其实hashmap也是同样的原理。

那么当环形数组数据满之后,就会覆盖0号位置,具体使用什么策略。提供了4种。

  • BlockingWaitStrategy:不覆盖数据,等待
  • SleepingWaitStrategy
  • YieldingWaitStrategy
  • BusySpinWaitStrategy

这里了解即可,用到的时候 在查

缓存行

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
	
 
 
abstract class RingBufferFields<E> extends RingBufferPad
{
    ......
}
 
 
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    ......

刚开始看 发现什么 写一堆这些px 有什么用,其实是充分利用计算机cpu core 的 cache。利用填充行填充。比如我们定义了一个10个长度的long数组,不同一个元素一个元素从内存加载搭配CPU cache的。而是一次性固定加载整个缓存行。

在这里插入图片描述
所以如果只是一个单独的long 变量,可能在多个线程操作下,前后的变量,可能来回的回写和加载,不断的导致INITIAL_CURSOR_VALUE 从内存到 CPU cache,为了防止,所以在前后 添加7个long 类型变量。就会一只在CPU cache中。

无锁的并发-生产者=消费者模型

其实我们可以通过一个数组模拟出一个生产者消费者模型,但是这种方式在单生产者单消费者的情况下,其实没有问题,在多线程情况下,其实没有办法解决,需要加锁的方式,但是加锁的话,其实从一定程度上降低了系统的整体性能,比如说 ArrayBlockingQueue 中 添加元素和获取元素 都是通过lock的方式加锁。

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

那么Disruptor是如何实现的?
对生产者来说: 往队列中添加数据之前,可以先申请可用空闲存储单元,并且是批量申请连续N个单元,申请之后,后续就不用往队列中添加元素,不用加锁。并且申请的存储单元是这个线程独享的。不过申请存储单元的过程需要加锁。

对于消费者来说:也是一次获取多个可读的范围,申请一批连续可读的存储单元。

比如对于 生产者A申请到一组连续的存储单元,3到6,生产者B申请到7到9的存储单元。那么在3到6没有完全写入数据之前,7到9是没有办法读取。这是一个弊端。
在这里插入图片描述

Code

构建数据

/**
 * @author qxlx
 * @date 2024/1/29 22:39
 */
public class Data  {

    private String uid;

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }
}

生产者

public class EventProducer {

    private RingBuffer<Data> ringBuffer;

    public EventProducer(RingBuffer<Data> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void send(long value,String name) {
        long next = ringBuffer.next();
        Data data = ringBuffer.get(next);

        // 写入消息数据
        data.setUid(name);

        //发布事件
        ringBuffer.publish(next);
    }

}

数据工厂

public class OrderEventFactory implements EventFactory<Data> {

    @Override
    public Data newInstance() {
        return new Data();
    }
}

消费者

public class EventHanderConsumer implements EventHandler<Data> {

    @Override
    public void onEvent(Data data, long l, boolean b) throws Exception {
        System.out.println("消费者获取数据"+data.getUid());
    }
}
    public static void main(String[] args) {
       // 构建disruptor对象 
        Disruptor<Data> disruptor = new Disruptor<Data>(
                new OrderEventFactory(),
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy() //等待策略
        );
		
		// 消费者
        disruptor.handleEventsWith((EventHandler<? super Data>) new EventHanderConsumer());
        
        // 启动
        disruptor.start();

		// 生产数据
        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        EventProducer eventProducer = new EventProducer(ringBuffer);

        for (int i = 0; i < 100; i++) {
            eventProducer.send(1,"fix"+i);
        }
    }

应用场景

在实际的应用场景中,比如我们分库分表,user表有8个子表。那么如何保证每个子表生产的uid是固定增长的。一种方式是使用分布式id 雪花算法,另一种方式则可以通过将每个子表的id 每次都+8。比如表1的id是从1 9。表二 从2 10 这样就可以通过固定的步长确定。

好了本篇其实主要简单介绍了其核心原理,具体的大家可以看源代码。

推荐阅读:https://tech.meituan.com/2016/11/18/disruptor.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/364511.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

正点原子--STM32中断系统学习笔记(1)

1、什么是中断&#xff1f; 原子哥给出的概念是这样的&#xff1a;打断CPU正常执行的程序&#xff0c;转而处理紧急程序&#xff0c;然后返回原暂停的程序继续运行&#xff0c;就叫中断。 当发生中断时&#xff0c;当前执行的程序会被暂时中止&#xff0c;进而进入中断处理函…

05:容器镜像技术揭秘|发布容器服务器|私有镜像仓库

容器镜像技术揭秘&#xff5c;发布容器服务器&#xff5c;私有镜像仓库 创建镜像使用commit方法创建自定义镜像。Dockerfile打包镜像创建apache服务镜像制作 php 镜像 微服务架构创建nginx镜像 发布服务通过映射端口发布服务容器共享卷 docker私有仓库 创建镜像 使用commit方法…

Hadoop3.x基础(3)- MapReduce

来源: B站尚硅谷 目录 MapReduce概述MapReduce定义MapReduce优缺点优点缺点 MapReduce核心思想MapReduce进程常用数据序列化类型MapReduce编程规范WordCount案例实操本地测试提交到集群测试 Hadoop序列化序列化概述自定义bean对象实现序列化接口&#xff08;Writable&#xff…

Nicn的刷题日常之BC68 X形图案

1.题目描述 KiKi学习了循环&#xff0c;BoBo老师给他出了一系列打印图案的练习&#xff0c;该任务是打印用“*”组成的X形图案。 输入描述&#xff1a; 多组输入&#xff0c;一个整数&#xff08;2~20&#xff09;&#xff0c;表示输出的行数&#xff0c;也表示组成“X”的反斜…

推荐一款嵌入式系统自动化测试工具(可免费试用)

本文介绍一款对嵌入式系统进行全面自动化测试的工具&#xff0c;不需要自己做任何开发&#xff0c;就可以在项目测试中直接使用起来&#xff0c;支持对各类嵌入式系统进行全面自动化测试。 嵌入式系统一般是产品的核心单元&#xff0c;嵌入式系统是否可靠决定了整个产品的质量…

【域适应十五】Universal Domain Adaptation through Self-Supervision

1.motivation 传统的无监督域自适应方法假设所有源类别都存在于目标域中。在实践中,对于这两个领域之间的类别重叠可能知之甚少。虽然有些方法使用部分或开放集类别处理目标设置,但它们假设特定设置是已知的先验设置。本文提出了一个更普遍适用的领域自适应框架,可以处理任…

【leetcode热题100】编辑距离

给你两个单词 word1 和 word2&#xff0c; 请返回将 word1 转换成 word2 所使用的最少操作数 。 你可以对一个单词进行如下三种操作&#xff1a; 插入一个字符删除一个字符替换一个字符 示例 1&#xff1a; 输入&#xff1a;word1 "horse", word2 "ros&qu…

政安晨的机器学习笔记——演绎一个TensorFlow官方的Keras示例(对服装图像进行分类,很全面)

导语 Keras是一个高级API接口&#xff0c;用于构建和训练神经网络模型。它是TensorFlow的一部分&#xff0c;提供了一种简洁、直观的方式来创建深度学习模型。 Keras的主要特点如下&#xff1a; 简洁易用&#xff1a;Keras提供了一组简单的函数和类&#xff0c;使模型的创建和…

10、数据结构与算法——堆

一、什么是堆 堆是一种特殊的树形数据结构&#xff0c;通常实现为完全二叉树或满二叉树。堆又分为两种类型最大堆&#xff08;Max Heap&#xff09; 和 最小堆&#xff08;Min Heap&#xff09; 1.1、什么是二叉树 二叉树是一种数据结构&#xff0c;它是由n&#xff08;n≥0&a…

【数据分享】1929-2023年全球站点的逐日最低气温数据(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff0c;其中又以气温指标最为常用&#xff01;说到气温数据&#xff0c;最详细的气温数据是具体到气象监测站点的气温数据&#xff01; 之前我们分享过1929-2023年全球气象站…

面试中问到的算法题。————目录树生成

前言 我在面试中遇到了算法题&#xff0c;也是我第一次面试&#xff0c;也不知道是太紧张了还是太久没刷算法题了&#xff0c;感觉压有点懵的状态&#xff0c;所以当时面试的时候没有做出来或者说只做了一半没有做完。 面试完成后&#xff0c;我又重新审视了一下题目&#xff…

【MBtiles数据格式说明】GeoServer改造Springboot番外系列一

一、MBTiles数据格式 MBTiles格式是指由MapBox制定的一种将瓦片地图数据存储到SQLite数据库中并可快速使用、管理和分享的规范&#xff0c;是一种用于即时使用和高效传输的规范。MBTiles既可以用作栅格输入数据存储&#xff0c;也可以用作WMSGetMap输出格式。规范有1.0&#xf…

linux使用iptables禁用ip

iptables是什么&#xff1f; iptables 是一个强大的开源软件&#xff0c;它是 Linux 系统内核中 netfilter 包过滤框架的一部分&#xff0c;用来实现防火墙功能。iptables 提供了一种灵活的方式来控制和管理进出以及通过 Linux 计算机的网络流量。 前提 我在云服务器上用doc…

物联网可视化平台:赋能企业数字化转型

在数字化转型的大潮中&#xff0c;企业面临着如何更好地理解和利用海量数据的挑战。物联网技术的快速发展&#xff0c;为企业提供了一个全新的视角和解决方案。通过物联网可视化平台&#xff0c;企业能够实时监控、分析和展示物联网数据&#xff0c;从而加速数字化转型的进程。…

前端构建变更:从 webpack 换 vite

现状 这里以一个 op &#xff08;内部运营管理用&#xff09;项目为例&#xff0c;从 webpack 构建改为 vite 构建&#xff0c;提高本地开发效率&#xff0c;顺便也加深对 webpack 、 vite 的了解。 vite 是前端构建工具&#xff0c;使用 一系列预配置进行rollup 打包&#x…

【目标检测】对DETR的简单理解

【目标检测】对DETR的简单理解 文章目录 【目标检测】对DETR的简单理解1. Abs2. Intro3. Method3.1 模型结构3.2 Loss 4. Exp5. Discussion5.1 二分匹配5.2 注意力机制5.3 方法存在的问题 6. Conclusion参考 1. Abs 两句话概括&#xff1a; 第一个真正意义上的端到端检测器最…

phpMyAdmin 未授权Getshell

前言 做渗透测试的时候偶然发现&#xff0c;phpmyadmin少见的打法&#xff0c;以下就用靶场进行演示了。 0x01漏洞发现 环境搭建使用metasploitable2,可在网上搜索下载&#xff0c;搭建很简单这里不多说了。 发现phpmyadmin&#xff0c;如果这个时候无法登陆&#xff0c;且也…

ubuntn挂载硬盘为只读问题

做为服务器操作系统&#xff0c;linux是很多站长经常用到的&#xff0c;那么在linux系统下如果需要新增加硬盘&#xff0c;该怎么增加呢&#xff1f;下面就来详细了解一下linux系统下添加新硬盘、分区及挂载硬盘的全过程。没有服务器的朋友可以点击了解一下阿里云1折优惠云服务…

【JS】Express.js环境配置与示例

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍Express.js环境配置与示例。 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下次更新不…

力扣hot100 二叉树的右视图 DFS BFS 层序遍历 递归

Problem: 199. 二叉树的右视图 文章目录 思路&#x1f496; BFS&#x1f496; DFS 思路 &#x1f469;‍&#x1f3eb; 甜姨 &#x1f496; BFS ⏰ 时间复杂度: O ( n ) O(n) O(n) &#x1f30e; 空间复杂度: O ( n ) O(n) O(n) class Solution {public List<Integer&…