一文快速掌握高性能内存队列Disruptor

写在文章开头

Disruptor是英国外汇公司LMAX开源的一款高性能内存消息队列,理想情况下单线程可支撑600w的订单。所以本文会从使用以及设计的角度来探讨一下这款神级java消息队列。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

基础使用示例

前置步骤

我们会基于该框架实现一个简单的生产者消费者模型,在此之前我们需要引入一下依赖:

	<dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.3.4</version>
            </dependency>

约定消息模型

生产者消费者沟通的媒介就是消息,所以我们首先需要创建发送消息的消息模型,这里仅仅简单创建一个对象用户记录发送的字符串消息:

@Data
public class MessageModel {
    /**
     * 消息内容
     */
    private String message;
}

基于消息模型初始化消息队列空间

Disruptor对于事件的存储进行更新操作都是基于RingBuffer,项目启动前它会基于我们的消息也就是上文的MessageModel 进行空间预初始化,所以我们需要继承EventFactory编写创建实例方法,告知Disruptor如何创建什么样消息对象空间,这一点我们可以在源码RingBufferFields的构造方法中得以印证:

RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
    	//基于sequencer完成队列数组entries 数组初始化
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

      	//......

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //基于我们给定的MsgEventFactory完成数组内部元素空间预初始化
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
        	//调用我们的工厂方法完成元素内部元素空间初始化
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

所以我们直接继承EventFactory给出消息模型创建的工厂方法:

public class MsgEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

定制消息处理器

通过继承EventHandler并指定泛型即可接手并处理MessageModel消息,逻辑比较简单,读者可自行参阅:

@Slf4j
public class MsgEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {

        //休眠2s,模拟异步消费
        ThreadUtil.sleep(2000);

        log.info("消费者处理消息开始");
        if (messageModel != null) {
            log.info("收费者收到消息,序列号:{},消息内容:{}", sequence, messageModel.getMessage());
        }
    }
}

配置Disruptor核心参数

上述步骤完成之后,我们就可以配置环形队列RingBuffer了:

@Configuration
public class RingBufferConfig {

    @Bean
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义事件处理线程池,即消费者线程池
        ThreadFactory threadFactory = ThreadFactoryBuilder.create()
                .setNamePrefix("thread-")
                .build();

        //指定事件工厂
        MsgEventFactory msgEventFactory = new MsgEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(msgEventFactory, bufferSize, threadFactory,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new MsgEventHandler());

        // 启动disruptor线程
        disruptor.start();

        //获取RingBuffer,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }
}

实现生产者

上述步骤已经完成的Disruptor的所有创建和配置工作,注入环形队列,我们的服务就可以投递的消息了,这里我们给出对应的DisruptorMqServiceImpl 的实现代码:

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;


    @Override
    public void sendMsg(String message) {
        log.info("投递消息,消息内容: {}", message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加事件:{}", JSONUtil.toJsonStr(event));
        } catch (Exception e) {
            log.error("消息发送失败,失败原因 {}", e.getMessage(), e);
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

效果展示

我们通过外部接口直接调用,可以看到MsgEventHandler接收并准确的处理了我们投递的消息:

2024-02-16 00:17:14.223  INFO 9924 --- [io-18080-exec-1] c.s.service.impl.DisruptorMqServiceImpl  : 投递消息,消息内容: demoData
2024-02-16 00:17:14.254  INFO 9924 --- [io-18080-exec-1] c.s.service.impl.DisruptorMqServiceImpl  : 往消息队列中添加事件:{"message":"demoData"}
2024-02-16 00:17:14.255  INFO 9924 --- [io-18080-exec-1] c.sharkChili.controller.TestController   : 消息队列已发送完毕
2024-02-16 00:17:16.268  INFO 9924 --- [       thread-0] com.sharkChili.handler.MsgEventHandler   : 消费者处理消息开始
2024-02-16 00:17:16.268  INFO 9924 --- [       thread-0] com.sharkChili.handler.MsgEventHandler   : 收费者收到消息,序列号:0,消息内容:demoData

Disruptor工作流程详解

Disruptor官网文档详尽给出所有核心的组件概念,详情可参考:LMAX Disruptor User Guide

这里我们以流程化的方式给出几个比较核心的概念,如下图所示,首先是生产者Producer也就是我们上文中的DisruptorMqServiceImpl通过RingBuffer获取对应序列号的消息对象MessageModel的引用将消息设置进去。此时基于等待策略等待就绪事件的对应的SequenceBarrier就会拿到这个消息的序列号并传递给消费者即EventHandlerEventHandler会基于当前收到的序列号到RingBuffer中获取对应的消息并处理。

在这里插入图片描述

这一点我们可以在BatchEventProcessor的run方法中得以印证:

@Override
    public void run()
    {
        //......
        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                	//sequenceBarrier基于等待策略获取就绪的消息序列号
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
					//拿到序列号之后从ringbuffer(也就是下文的dataProvider)获取对应的消息事件,并通过我们重写的eventHandler处理掉
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                   //......
                }
              //......
            }
        }
        finally
        {
           //......
        }
    }

详解Disruptor高效的原因

缓存填充

JDK自带的队列ArrayBlockingQueue通过上锁并阻塞线程的方式却保证生产者和消费者之间安全通信,我们的入队(在我们的场景可直接理解为消息投递)为例,可以看到put方法会先上锁如果得不到锁线程会直接进入WAIT状态,然后判断队列是否达到上限,同样的若达到上限当前线程也会被阻塞,等待队列不满时被唤醒再次进行添加操作:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //上锁,若得不到锁则进入等待状态
        lock.lockInterruptibly();
        try {
        	//队列已满则等待未满再进行操作
            while (count == items.length)
                notFull.await();
             //入队   
            enqueue(e);
        } finally {
        	//释放锁
            lock.unlock();
        }
    }

除此之外JDK自带的ArrayBlockingQueue也没有考虑到并发场景下的伪共享问题,例如每个线程对应的CPU核心都将内存中的ArrayBlockingQueue加载到缓存行中,为了保证双方的缓存一致性,一旦一端修改了ArrayBlockingQueue,那么另一端的ArrayBlockingQueue就会被视为脏数据,这就意味着另一端的CPU需要操作ArrayBlockingQueue就需要重新从内存加载一份全新的ArrayBlockingQueue才能进行更新操作,在并发激烈的场景下,这种情况操作效率大大降低:

在这里插入图片描述

DisruptorRingBuffer中的字段RingBufferFields涉及RingBuffer中核心变量信息的记录,为了避免伪共享问题RingBufferFields继承RingBufferPad保证RingBufferFields每次被加载时前方都有7个8字节的数据填充。同理RingBuffer继承RingBufferFields在其后方填充7个8字节数据,由此保证了每一个RingBufferFields的任意字段被加载时,都有7个不可变的字段填充再任意CPU左右,避免RingBufferFields某个字段更新后,其他CPU缓存行的数据变为脏数据的缓存一致性问题:

在这里插入图片描述

对应的我们给出缓存填充的代码示例:

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
//......
}


public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    //....
}

分支预测

Disruptor通过数组构成一个循环队列,它在初始化时就固定了存储空间,按照局部性原理,一次即可加载批量的元素到缓存行中,结合CPU的分支预测机制,因为数组的顺序加载规律分支预测器可以非常高效的预测并缓存下一条指令从而快速获取到数组中的下一个元素,这就是Disruptor第2个高效的原因:

Disruptor

无锁操作

进行消息批量投递和消费时,Disruptor都会按照如下步骤:

  1. 计算要获取的序列范围。
  2. CAS设置获取并更新序列号进度。
  3. CAS原子更新成功则获取并进行生产或者消费,反之循环重试CAS直至成功。

在这里插入图片描述

为了印证这一点,我们将生产模式模式改为多线程生产ProducerType.MULTI

 Disruptor<MessageModel> disruptor = new Disruptor<>(msgEventFactory, bufferSize, threadFactory,
                ProducerType.MULTI, new BlockingWaitStrategy());

我们以上文中DisruptorMqServiceImpl投递消息前获取序列号这一步作为入口查看这一过程:

 long sequence = messageModelRingBuffer.next();

可以看到底层用sequencer进行序列号自增:

@Override
    public long next()
    {
        return sequencer.next();
    }

对应MultiProducerSequencer的next方法可以看到,它会基于我们传入的n进行CAS累加操作,若成功则说明这批序列号获取成功,我们的生产者可以操作序列号对应的数组空间,从而进行消息投递,而这就是Disruptor高效的第3个原因——无锁。

@Override
    public long next(int n)
    {
      //......

        long current;
        long next;

        do
        {
        	//获取当前序列号
            current = cursor.get();
            //获取自增范围
            next = current + n;

           //......

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
              //......
            }
            //基于CAS原子更新cursor的数值,若成功则说明可以操作从current 到next这个范围的序列号的对应的数组元素空间,后续可以基于这个范围进行消息投递操作
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        //循环CAS操作直至成功
        while (true);

        return next;
    }

小结

以上便是笔者对于Disruptor剖析的全部内容,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

SpringBoot + Disruptor 实现特快高并发处理,支撑每秒 600 万订单无压力:https://mp.weixin.qq.com/s/k-WiWvIQcNft_fX7uroE0A

官网文档:https://lmax-exchange.github.io/disruptor/user-guide/index.html#user-guide-models

高性能队列——Disruptor:https://tech.meituan.com/2016/11/18/disruptor.html

【计组】理解Disruptor–《计算机组成原理》(十五):https://blog.csdn.net/weixin_56814032/article/details/128999761

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/598333.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

中学数学重大错误:射线A沿其正向平移非0距离就变为其真子集了

黄小宁 射线A沿其射出的方向平移非0距离变为B≌A&#xff0c;中学数学一直认定B是A的一部分&#xff0c;其实这是将两异射线&#xff08;函数&#xff09;误为同一射线&#xff08;函数&#xff09;的肉眼直观错觉。设“点集A&#xff5b;点p&#xff5d;”表示A的元素是点p&a…

MongoDB(四):条件操作符

条件操作 1、概述2、比较操作2.1、大于操作符-$gt2.2、大于等于操作符-$gte2.3、小于——$lt2.4、小于等于——$lte2.5、范围查询 3、总结 大家好&#xff0c;我是欧阳方超&#xff0c;可以扫描下方二维码关注我的公众号“欧阳方超”&#xff0c;后续内容将在公众号首发。 1、…

python实现图书馆借阅管理系统-文件存储

《面向对象》案例引入 通过本章的学习,请用面向对象思想实现《图书馆借阅管理系统》的登录注册页面和用户信息维护页面和图书借阅页面。 【功能要求】: 1、用面向对象思想改写上一章的《函数模块》案例引入。 2、增加图书借阅页面。 ①学生登录后,可以进入图书借阅页面,实现…

ROS摄像头标定

目录 一、内容概要二、 配置ubuntu摄像头环境2.1 硬件准备2.2 软件准备 三、 完成摄像头标定四、 标定结果运用五、 实验心得参考链接 一、内容概要 配置ubuntu摄像头环境进行摄像头标定 二、 配置ubuntu摄像头环境 2.1 硬件准备 1.电脑自带摄像头 或 2.USB外设摄像头 在原…

ASV1000视频监控平台:接入支持JT808标准的设备

目录 一、JT/T 808标准简介 &#xff08;一&#xff09;概述 &#xff08;二&#xff09;标准内容简介 1、消息分类 2、位置信息 3、报警信息 4、车辆控制 5、数据转发 二、在ASV1000上通过JT808添加设备 &#xff08;一&#xff09;登录视频监控平台管理端 &#x…

Hass哈斯数控数据采集网络IP配置设置

机床数据采集&#xff08;MDC&#xff09;允许你使用Q和E命令通过网络接口或选项无线网络从控制系统提取数据。设置143支持该功能&#xff0c;并且指定控制器使用这个数据端口。MDC是一个需要一台附加计算机发送请求&#xff0c;解释说明和存储机床数据的软件功能。这个远程计算…

使用pytorch构建GAN网络并实现FID评估

上一篇文章介绍了GAN的详细理论&#xff0c;只要掌握了GAN&#xff0c;对于后面各种GAN的变形都变得很简单&#xff0c;基础打好了&#xff0c;盖大楼自然就容易了。既然有了理论&#xff0c;实践也是必不可少的&#xff0c;这篇文章将使用mnist数据集来实现简单的GAN网络&…

如何从Mac上的清空垃圾箱中恢复已删除的文件?

Mac用户几乎每天都会删除文件。当您将文档删除到 Mac 垃圾箱时&#xff0c;该文件将被剪切到 Mac 垃圾箱中&#xff0c;并且可以轻松放回原处。但是&#xff0c;在某些情况下&#xff0c;您错误地删除了文档和文件&#xff0c;并在您意识到自己犯了一个大错误之前清空了垃圾箱。…

Advanced RAG 06:生成结果的相关性低? 快用 Query Rewriting 优化技术

编者按&#xff1a;在现实生活中&#xff0c;普通用户很难编写合适的提示词&#xff08;prompt&#xff09;来指示 LLM 完成期望任务。用户提出的 queries 往往存在词汇不准确、缺乏语义信息等问题&#xff0c;导致 LLM 难以理解并生成相关的模型响应。因此&#xff0c;如何优化…

刷代码随想录有感(58):二叉树的最近公共祖先

题干&#xff1a; 代码&#xff1a; class Solution { public:TreeNode* traversal(TreeNode* root, TreeNode* p, TreeNode* q){if(root NULL)return NULL;if(root p || root q)return root;TreeNode* left traversal(root->left, p, q);TreeNode* right traversal(r…

JuiceFS v1.2-beta1,Gateway 升级,多用户场景权限管理更灵活

JuiceFS v1.2-beta1 今天正式发布。在这个版本中&#xff0c;除了进行了大量使用体验优化和 bug 修复外&#xff0c;新增三个特性&#xff1a; Gateway 功能扩展&#xff1a;新增了“身份和访问管理&#xff08;Identity and Access Management&#xff0c;IAM&#xff09;” 与…

泛型编程四:栈、堆,内存管理

文章目录 前言一、栈、堆栈&#xff08;Stack&#xff09;堆&#xff08;Heap&#xff09; 二、static生命期三、heap生命期四、new、delete的作用机制五、动态分配的内存&#xff08;in VC&#xff09;如图&#xff0c;第一列为调试模式下的复数的内存分配&#xff0c;复数有两…

电子合同:纸质合同的未来替代者?

随着科技的迅猛发展&#xff0c;电子合同作为一种新兴的合同形式&#xff0c;逐渐在各行各业中崭露头角。那么&#xff0c;电子合同是否会替代纸质合同&#xff0c;成为未来合同形式的主流呢&#xff1f;本文将就此话题展开探讨。 首先&#xff0c;我们来看电子合同的优势。电…

cookie没有携带的问题

背景&#xff1a; build-model应用在hcs迁移的时候&#xff0c;前、后端各自部署了一个新应用&#xff0c;但是调试时候发现没有cookie&#xff0c;导致鉴权失败&#xff01; 注&#xff1a; 后端通过cookie中的token做鉴权的&#xff0c;前端调用接口的时候&#xff0c;查看&…

SPD1179 电路设计---汽车电机控制设计

概述 SPD1179 是旋智针对汽车应用推出的一颗高度集成的片上系统&#xff08;SOC&#xff09; 微控制器&#xff0c;内置 32 位高性能 ARMCortex-M4F 内核&#xff0c;最高 100MHz 的软件可编程时钟频率&#xff0c; 32KB SRAM&#xff0c; 128KB 嵌入式 FLASH&#xff0c; 1KB …

04-18 周四 为LLM_inference项目配置GitHub CI过程记录

04-18 周四 为LLM_inference项目配置GitHub CI过程记录 时间版本修改人描述2024年4月18日10:30:13V0.1宋全恒新建文档 简介和相关文档 04-15 周一 GitHub仓库CI服务器配置过程文档actions-runner 是托管与GitHub上的仓库&#xff0c;下载最新的客户端程序即可。self hosted r…

多C段的美国站群服务器有什么用途?

多C段的美国站群服务器有什么用途? 多C段的美国站群服务器是一种常见的网络运营策略&#xff0c;其用途主要体现在以下几个方面&#xff1a; 多C段的美国站群服务器有什么用途? 1. 提高站点排名和流量 部署多个站点在不同的C段IP地址上&#xff0c;可以通过不同的IP地址发布…

BGP协议应用:SW1、SW2、SW3、RT1、RT2之间运行BGP协议

8.SW1、SW2、SW3、RT1、RT2之间运行BGP协议,SW1、SW2、RT1 AS号65001、RT2 AS号65002、SW3 AS号65003。 (1)SW1、SW2、SW3、RT1、RT2之间通过Loopback1建立IPv4 BGP邻居。SW1和SW2之间财务通过Loopback2建立IPv4 BGP邻居,SW1和SW2的Loopback2互通采用静态路由。 (2)SW1…

运行一个jar包

目录 传送门前言一、Window环境二、Linux环境1、第一步&#xff1a;环境配置好&#xff0c;安装好jdk2、第二步&#xff1a;打包jar包并上传到Linux服务器3、第三步&#xff1a;运行jar包 三、docker环境1、Linux下安装docker和docker compose2、Dockerfile方式一运行jar包2.1、…

优思学院|HR部门如何制定公司的精益六西格玛培训计划?

在许多企业中&#xff0c;精益六西格玛作为一种提升效率和质量的重要方法论&#xff0c;越来越受到重视。HR部门在推广和实施精益六西格玛培训计划中其实也扮演着关键角色。以下是HR部门可以采取的几个步骤&#xff0c;以有效地制定和实施这样的培训计划。 1. 需求分析 首先&…