单机无锁线程安全队列-Disruptor

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>4.0.0</version>
        </dependency>

定义一个商品

@Data
public class Goods {

    private String name;

}

定义生产者

public class Producer {
    private final RingBuffer<Goods> ringBuffer;

    public Producer(RingBuffer<Goods> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * 生产货品
     * @param goodsName
     */
    public void onData(String goodsName) {
        long sequence = ringBuffer.next();
        try {
            Goods goods = ringBuffer.get(sequence);
            goods.setName(goodsName);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{

    private String name;

    public Consumer(String name){
        this.name = name;
    }

    @Override
    public void onEvent(Goods goods, long l, boolean b)  {
        //消费者接收到货品
        System.out.println(name+"消费了"+goods.getName());
    }

    @Override
    public void onBatchStart(long batchSize, long queueDepth) {
        EventHandler.super.onBatchStart(batchSize, queueDepth);
    }

    @Override
    public void onStart() {
        EventHandler.super.onStart();
    }

    @Override
    public void onShutdown() {
        EventHandler.super.onShutdown();
    }

    @Override
    public void onTimeout(long sequence) throws Exception {
        EventHandler.super.onTimeout(sequence);
    }

    @Override
    public void setSequenceCallback(Sequence sequenceCallback) {
        EventHandler.super.setSequenceCallback(sequenceCallback);
    }
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {
    
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.SINGLE,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //单生产者,单消费者
        disruptor.handleEventsWith(new Consumer("Consumer1"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //多个消费者按顺序消费
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费
   disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3"))
   .then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {

    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer1 = new Producer(ringBuffer);
        Producer producer2 = new Producer(ringBuffer);
        Producer producer3 = new Producer(ringBuffer);
        while (true){
            producer1.onData("goods"+UUID.randomUUID());
            producer2.onData("goods"+UUID.randomUUID());
            producer3.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/219956.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

波奇学C++:类型转换和IO流

隐式类型转换 int i0; double pi; 强制类型转换 int* pnullptr; int a(int)p; 单参数构造函数支持隐式类型转换 class A { public:A(string a):_a(a){} private:string _a; }; A a("xxxx"); //"xxx" const char* 隐式转换为string 多参数也可以通过{…

深入理解强化学习——马尔可夫决策过程:占用度量-[基础知识]

分类目录&#xff1a;《深入理解强化学习》总目录 文章《深入理解强化学习——马尔可夫决策过程&#xff1a;贝尔曼期望方程-[基础知识]》中提到&#xff0c;不同策略的价值函数是不一样的。这是因为对于同一个马尔可夫决策过程&#xff0c;不同策略会访问到的状态的概率分布是…

园区规划技术要点

&#xff08;一&#xff09;技术点介绍 1.WLAN&#xff1a;无线局域网WLAN&#xff08;Wireless Local Area Network&#xff09;是一种无线计算机网络&#xff0c;使用无线信道代替有线传输介质连接两个或多个设备形成一个局域网LAN&#xff08;Local Area Network&#xff09…

【亲测有效,超详细】收到微信小程序限期完成微信认证通知怎么处理?微信小程序年审认证都需要哪些资料?

背景&#xff1a;近期部分微信小程序管理员最近收到了年审认证通知如下图 微信官方通知 微信小程序认证流程 第一步&#xff1a;登录微信公众平台 网址&#xff1a;微信公众平台 第二步&#xff1a;登录进入后会看到年审通知弹窗&#xff0c;点击去年审 第二步&#xff1a;登…

java中Random随机数使用和生成随机数的多个示例

在 Java 中&#xff0c;我们可以使用 java.util.Random 类生成伪随机数。伪随机数的特性是&#xff0c;虽然它们看起来是随机的&#xff0c;但实际上它们是由一个固定的算法生成的。只要我们提供相同的种子&#xff0c;这个算法就会生成相同的数字序列。 首先&#xff0c;我们…

HarmonyOS开发基础(一)

HarmonyOS开发基础&#xff08;一&#xff09; // &#xff1a;装饰器&#xff1a;用来装饰类结构、方法、变量 Entry // Entry&#xff1a;标记当前组件为入口组件 Component // Component&#xff1a;标记为自定义组件 // struct&#xff1a;自定义组件&#xff0c;可复用的…

winform使用串口通信读取压力传感装置(CFM)的数据

一、简介 目的&#xff1a;获取CFM的 “hi” 报文&#xff0c;解析出如下数据并绘制波形图。 实现&#xff1a;使用c#打开CFM串口&#xff0c;发送 02 00 02 4C 49 0D 请求到串口&#xff0c;CFM就会不断返回不同类型的报文&#xff0c;我解析的是 “hi” 报文&#xff08;至…

2477. 到达首都的最少油耗 : 逐步讲解最低油耗求解思路

题目描述 这是 LeetCode 上的 「2477. 到达首都的最少油耗」 &#xff0c;难度为 「中等」。 Tag : 「DFS」 给你一棵 n 个节点的树&#xff08;一个无向、连通、无环图&#xff09;&#xff0c;每个节点表示一个城市&#xff0c;编号从 0 到 n - 1&#xff0c;且恰好有 n - 1 …

全网最新最牛的Appium自动化:Appium常用操作之TouchAction操作

TouchAction操作 Appium的辅助类&#xff0c;主要针对手势操作&#xff0c;比如滑动、长按、拖动等。其原理是将一系列的动作放在一个链条中&#xff0c;然后将该链条传递给服务器。服务器接受到该链条后&#xff0c;解析各个动作&#xff0c;逐个执行。 TouchAction类支持的动…

解决:docx.opc.exceptions.PackageNotFoundError: Package not found at ‘xxx’

解决&#xff1a;docx.opc.exceptions.PackageNotFoundError: Package not found at ‘xxx’ 文章目录 解决&#xff1a;docx.opc.exceptions.PackageNotFoundError: Package not found at ‘xxx’背景报错问题报错翻译报错位置代码报错原因解决方法今天的分享就到此结束了 背景…

深度学习TensorFlow2基础知识学习前半部分

目录 测试TensorFlow是否支持GPU&#xff1a; 自动求导&#xff1a; 数据预处理 之 统一数组维度 定义变量和常量 训练模型的时候设备变量的设置 生成随机数据 交叉熵损失CE和均方误差函数MSE 全连接Dense层 维度变换reshape 增加或减小维度 数组合并 广播机制&#…

MYSQL8用户权限配置详解

单位的系统性能问题需要把Mysql5升级到Mysql8&#xff0c;需要用到Mysql8的一些特性来提升系统的性能。 配置用户权限过程中发现一些问题&#xff0c;学习并记录一下。 目录 一、环境 二、MySQL8 用户权限 2.1 账号管理权限 2.1.1 连接数据库 2.1.2 账号权限配置 2.2 密码…

从开发到测试,你需要掌握哪些必备测试技能?

一、为什么从开发转测试 我从2019年5月开始从一名java开发女程序猿正式转为测试开发工程师&#xff0c;原因除了机缘凑巧之外&#xff0c;当然是因为这个行业对测试工程师的要求已经越来越高&#xff0c;简单做些UI脚本录制和回放的自动化&#xff0c;参考度娘写出框架demo却不…

二叉树求叶子节点

以这个图展示叶子节点的求取 项目结构 项目代码截图&#xff1a;使用递归的方式求取二叉树的叶子节点&#xff08;递归指的是函数自己调用自己的过程&#xff09; 具体代码展示 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> #includ…

全网最新最全的Appium自动化:Appium常用操作之等待操作

等待机制&#xff1a; 为了保证脚本的稳定性&#xff0c;有时候需要引入等待时间&#xff0c;等待页面加载元素后再进行操作&#xff0c;主要有三种等待时间设置方式。 方式一&#xff1a; sleep()&#xff1a;固定等待时间设置&#xff0c;python的time包里提供了休眠方法sle…

Clion自定义管理和配置软件构建过程的工具(代替CMake)构建程序

在公司由于需要x86环境和其他arm环境&#xff0c;同时需要使用公司自定义的mine_x86或者mine_orin对代码进行编译。 编译命令如下mine_x86 build -Dlocal1 -j8,为使用Clion对程序进行调试&#xff0c;需要对程序进行设置。方便调试代码时能够断点查看变量。尝试了很多次&#…

什么是网络爬虫?有什么用?怎么爬?

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 【导读】 网络爬虫也叫做网络机器人&#xff0c;可以代替人们自动地在互联网中进行数据信息的采集与整理。 在大数据时代&#xff0c;信息的采集是一项重要的工作&#xff0c;如果单纯靠人力进行信息采集&#xff0c;不仅低…

作业12.5

1.定义一个基类 Animal&#xff0c;其中有一个虛函数perform&#xff08;)&#xff0c;用于在子类中实现不同的表演行为。 #include <iostream>using namespace std; class Animal { private:int weight; public:Animal(){}Animal(int weight):weight(weight){}virtual …

temu最近数据:拼多多旗下跨境电商平台的业绩持续增长

据最近的报道和数据显示&#xff0c;拼多多旗下的跨境电商平台Temu在2023年第三季度取得了显著的业绩增长。销售额突破50亿美元&#xff0c;市场份额不断扩大&#xff0c;用户数量迅速增长。本文将深入探讨Temu的业绩增长、市场份额、用户增长以及其营销策略。 先给大家推荐一款…

批量给文件名加相同后缀的两个方法

如何批量给文件名加相同后缀&#xff1f;文件处理是每个上班族需要面对的工作&#xff0c;并且文件处理能力的高低也体现了我们工作能力的高低&#xff0c;文件处理中就包含文件名称的修改&#xff0c;修改文件名是非常简单的&#xff0c;通过点击软件重命名就可以进行操作&…