轻松上手 Disruptor:两个实例解析并发编程利器

  Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列。很多知名开源项目里,比如 canal 、log4j2、 storm 都是用了 Disruptor 以提升系统性能 。

这篇文章,我们通过两个例子一步一个脚印帮助同学们入门 Disruptor 。

1 环形缓冲区

下图展示了 Disruptor 的流程图 。

和线程池机制非常类似, Disruptor 也是非常典型的生产者 / 消费者模式。线程池存储提交任务的容器是阻塞队列,而 Disruptor 使用的是环形缓冲区 RingBuffer

环形缓冲区的设计相比阻塞队列有如下优点:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

  • 元素位置定位

数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式,不用担心 index 溢出的问题。index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

2 写一个 Hello world

我们写一个非常简单的例子:生产者传递一个单一的长整型值给消费者,而消费者将简单地打印出这个值

2.1 添加依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

2.2 定义事件

首先,我们将定义一个事件(Event),它将携带数据,并且在接下来的所有示例中都是通用的。

public class LongEvent {

    private long value;

    public void set(long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent{" + "value=" + value + '}';
    }
}

为了让 Disruptor 为我们预分配这些事件,我们需要一个 EventFactory 来执行构造。这可以是一个方法引用,比如 LongEvent::new ,或者是 EventFactory 接口的显式实现:

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

2.3 定义消费者

定义了事件,我们需要创建一个消费者来处理这些事件。我们会创建一个事件处理器(EventHandler),它会将把值打印到控制台上。

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
          System.out.println("currentThread:" + Thread.currentThread().getName() + " Event: " + longEvent);
    }
}

2.4 发布

public class LongEventMain {
    public static void main(String[] args) throws Exception {
        int bufferSize = 2;
        Disruptor<LongEvent> disruptor =
                new Disruptor<>(
                        new LongEventFactory(),
                        bufferSize,
                        DaemonThreadFactory.INSTANCE,
                        ProducerType.SINGLE,
                        new BlockingWaitStrategy());
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

整个发布流程分为四个部分:

  1. 指定环形缓冲区的大小,必须是 2 的幂次方,例子中设置的值是 1024 ;

  2. 构建 Disruptor ,参数分别是事件工厂EventFactory 环形缓冲区的大小ringBufferSize 处理器线程池 、生产者类型 (单生产者 / 多生产者)、消费者阻塞策略 ;

  3. 定义事件处理器eventHandler,我们这里的逻辑是打印数据打印在控制台;

  4. 启动 Disruptor,从 Disruptor 中获取环形缓冲区ringBuffer,在 for 循环里 ,调用环形队列的 publishEvent 方法。

    这里使用了 ByteBuffer 做为数据的存储容器,方便作为参数传递。

我们来看下执行结果 :

3 日志处理

3.1 应用场景

上面的例子比较简单,但假如要应用到生产环境,就显得非常粗糙。

我们模拟一个日志处理的场景,用户进入视频播放页面,浏览器定时的发送浏览日志到服务端,服务端将日志存储起来。

3.2 核心类设计

我们定义一个 DisruptorManager 管理器 , 管理器包含三个核心参数:消费者监听器 DataEventListener消费者数量环形队列长度

public class DisruptorManager<T> {

    private static final Integer DEFAULT_CONSUMER_SIZE = 4;

    public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;

    private DataEventListener<T> dataEventListener;

    private DisruptorProducer<T> producer;

    private int ringBufferSize;

    private int consumerSize;

    public DisruptorManager(DataEventListener<T> dataEventListener) {
        this(dataEventListener, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
    }

    public DisruptorManager(DataEventListener<T> dataEventListener, final int consumerSize, final int ringBufferSize) {
        this.dataEventListener = dataEventListener;
        this.ringBufferSize = ringBufferSize;
        this.consumerSize = consumerSize;
    }

    public void start() {
        EventFactory<DataEvent<T>> eventFactory = new DisruptorEventFactory<>();
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(
                eventFactory,
                ringBufferSize,
                DisruptorThreadFactory.create("consumer-thread", false),
                ProducerType.MULTI,
                new BlockingWaitStrategy()
        );
        DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize];
        for (int i = 0; i < consumerSize; i++) {
            consumers[i] = new DisruptorConsumer<>(dataEventListener);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        this.producer = new DisruptorProducer<>(ringBuffer, disruptor);
    }

    public DisruptorProducer getProducer() {
        return this.producer;
    }

}

首先和 Hello world 代码中的不同的点,Disruptor 的构造函数中我们自定义了消费者的处理器线程。

DisruptorThreadFactory.create("consumer-thread", false),

然后我们定义消费者的业务逻辑 :

DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize];
for (int i = 0; i < consumerSize; i++) {
    consumers[i] = new DisruptorConsumer<>(dataEventListener);
}
disruptor.handleEventsWithWorkerPool(consumers);

消费者本质上是 workHandler 的实现类,只不过初始化时将 DataEventListener 作为构造函数的参数。

public class DisruptorConsumer<T> implements WorkHandler<DataEvent<T>> {

    private DataEventListener<T> dataEventListener;

    public DisruptorConsumer(DataEventListener dataEventListener) {
        this.dataEventListener = dataEventListener;
    }

    @Override
    public void onEvent(DataEvent<T> dataEvent) throws Exception {
        if (dataEvent != null) {
            dataEventListener.processDataEvent(dataEvent);
        }
    }

}

因为我们是希望线程池并行的处理这些消息数据,使用的是 disruptor.handleEventsWithWorkerPool 可以保证每个事件只会由一个工作处理器处理

在 springboot 项目中,我们需要初始化相关 bean。

@Configuration
@AutoConfigureBefore(RedisConfig.class)
public class DisruptorConfig {

    private final static Logger logger = LoggerFactory.getLogger(DisruptorConfig.class);

    private final static String LIST_KEY = "disruptor:list";

    @Autowired
    private RedissonClient redissonClient;

    @Bean
    public DataEventListener<String> createConsumerListener() {
        DataEventListener<String> dataEventListener = new DataEventListener<String>() {
            @Override
            public void processDataEvent(DataEvent<String> dataEvent) throws InterruptedException {
                logger.info("processDateEvent data:" + dataEvent.getData());
                redissonClient.getList(LIST_KEY).add(dataEvent.getData());
            }
        };
        return dataEventListener;
    }

    @Bean
    public DisruptorProducer<String> createProducer(DataEventListener dataEventListener) {
        DisruptorManager disruptorManage = new DisruptorManager(dataEventListener,
                8,
                1024 * 1024);
        disruptorManage.start();
        return disruptorManage.getProducer();
    }

首先,我们定义好消费者的事件监听器,然后定义 DisruptorProducer, 该类用来将数据提交到环形队列。

public class DisruptorProducer<T> {

    private final Logger logger = LoggerFactory.getLogger(DisruptorProducer.class);

    private final RingBuffer<DataEvent<T>> ringBuffer;

    private final Disruptor<DataEvent<T>> disruptor;

    private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);

    public DisruptorProducer(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor) {
        this.ringBuffer = ringBuffer;
        this.disruptor = disruptor;
    }

    /**
     * Send a data.
     *
     * @param data the data
     */
    public void onData(final T data) {
        try {
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("publish event error:", ex);
        }
    }

    public void shutdown() {
        if (null != disruptor) {
            disruptor.shutdown();
        }
    }

}

最后,在控制器中,接收前端请求:

@Autowired
private DisruptorProducer<String> producer;

@GetMapping("/pushlog")
public ResponseEntity pushlog(String log) {
    producer.onData(log);
    return ResponseEntity.successResult(null);
}

从下图中,我们可以看到从控制器接收到请求后,消费处理器线程不断地将数据打印出来,并且发送到队列中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/899475.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

详解Oracle审计(一)

题记&#xff1a; 有段时间没写过oracle了&#xff0c;今天回归。 本文将详细介绍oracle的审计功能&#xff0c;基于11g版本&#xff0c;但对12c&#xff0c;19c也同样适用。 审计&#xff08;Audit&#xff09;用于监视用户所执行的数据库操作&#xff0c;并且 Oracle 会将审…

hadoop的yarn

1.分布式的资源调度-yarn(hadoop的一个组件) 资源服务器硬件资源&#xff0c;如&#xff1a;CPU,内存,硬盘,网络等 资源调度:管控服务器硬件资源&#xff0c;提供更好的利用率 分布式资源调度:管控整个分布式服务器集群的全部资源&#xff0c;整合进行统一调度 总结就是使用yar…

chatGpt4.0Plus,Claude3最新保姆级教程开通升级

如何使用 WildCard 服务注册 Claude3 随着 Claude3 的震撼发布&#xff0c;最强 AI 模型的桂冠已不再由 GPT-4 独揽。Claude3 推出了三个备受瞩目的模型&#xff1a;Claude 3 Haiku、Claude 3 Sonnet 以及 Claude 3 Opus&#xff0c;每个模型都展现了卓越的性能与特色。其中&a…

Blazor WebAssembly 项目部署时遇到 500.19错误

这个错误其实很普遍&#xff0c;在部署 asp.net core 的时候都能解决 无非是安装 这些, 尤其是下面那个 Hosting Bundle 但是遇到 Blazor WebAssembly 项目部署时还得多装一个 “重写模块” 下载地址&#xff0c;安装后重启网址 https://www.iis.net/downloads/microsoft/u…

【从零开始的LeetCode-算法】3185. 构成整天的下标对数目 II

给你一个整数数组 hours&#xff0c;表示以 小时 为单位的时间&#xff0c;返回一个整数&#xff0c;表示满足 i < j 且 hours[i] hours[j] 构成 整天 的下标对 i, j 的数目。 整天 定义为时间持续时间是 24 小时的 整数倍 。 例如&#xff0c;1 天是 24 小时&#xff0c…

使用Vue指令实现面板拉伸功能

目的&#xff1a;现在有一个PDF预览页面&#xff0c;左侧为PDF预览区域&#xff0c;右侧为可以进行AI功能的面板。现在想让AI面板通过拖动左边框实现面板拉伸。 实现效果如下面的视频&#xff1a; 关键点&#xff1a; 预览区是使用iframe渲染的&#xff0c;在拖动的过程中&…

软件测试学习笔记丨Selenium键盘鼠标事件ActionChains

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/22515 本文为霍格沃兹测试开发学社的学习经历分享&#xff0c;写出来分享给大家&#xff0c;希望有志同道合的小伙伴可以一起交流技术&#xff0c;一起进步~ 说明&#xff1a;本篇博客基于sel…

Detecting Holes in Point Set Surfaces 论文阅读

下载链接 Detecting Holes in Point Set Surfaces 摘要 3D 数据采集过程&#xff08;例如激光范围扫描&#xff09;产生的重要物体模型通常包含由于遮挡、反射或透明度而产生的孔洞。本文的目标就是在点集表面上检测存在的孔洞。对于每个点&#xff0c;将多个标准组合成一个综…

C# shader 生成程序纹理

1、程序纹理是什么 程序纹理&#xff08;Procedural Textures&#xff09;就是通过程序代码生成的纹理 2、程序纹理如何生成 一般生成程序纹理由两种方式&#xff1a; 通过C#脚本生成纹理后传递给Shader直接在Shader代码中自定义逻辑生成纹理 3、程序纹理的好处 程序纹理…

2.1 > Shell 是什么、如何更熟练的使用 Bash Shell

Shell 基础知识 Shell是计算机操作系统中的一个命令行解释器&#xff0c;由C语言编写&#xff0c;用于用户与操作系统之间进行交互。用户可以通过Shell输入命令&#xff0c;操作系统接收到这些命令后执行相应的操作。Shell一般还提供了编程语言的基本功能&#xff0c;允许用户…

梯度累积的隐藏陷阱:Transformer库中梯度累积机制的缺陷与修正

在本地环境下对大规模语言模型&#xff08;LLMs&#xff09;进行微调时&#xff0c;由于GPU显存限制&#xff0c;采用大批量训练通常难以实现。为解决此问题&#xff0c;一般普遍会采用梯度累积技术来模拟较大的批量规模。该方法不同于传统的每批次更新模型权重的方式&#xff…

MacOS RocketMQ安装

MacOS RocketMQ安装 文章目录 MacOS RocketMQ安装一、下载二、安装修改JVM参数启动关闭测试关闭测试测试收发消息运行自带的生产者测试类运行自带的消费者测试类参考博客&#xff1a;https://blog.csdn.net/zhiyikeji/article/details/140911649 一、下载 打开官网&#xff0c;…

A-【项目开发知识管理】Android AIDL跨进程通信

Android AIDL跨进程通信 文章目录 Android AIDL跨进程通信0.我为啥要写这篇文章1.AIDL是干啥的&#xff1f;1.1简述1.2官方话 2.在AndroidStudio中怎么干&#xff1f;2.1准备工作2.2在项目A中创建AIDL文件夹2.3在项目A中创建一个aidl文件2.4将项目A进行一次Rebuild操作2.5在项目…

visual studio设置修改文件字符集方法

该方法来自网文&#xff0c;特此记录备忘。 添加两个组件&#xff0c;分别是Force UTF-8,FileEncoding。 截图如下&#xff1a; 方法如下&#xff1a;vs中点击“扩展”->“管理扩展”&#xff0c;输入utf搜索&#xff0c;安装如下两个插件&#xff0c;然后重启vs&#xf…

【设计模式系列】观察者模式

一、什么是观察者模式 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为设计模式&#xff0c;它定义了对象之间的一对多依赖关系&#xff0c;当一个对象的状态发生变化时&#xff0c;所有依赖于它的对象都会得到通知并自动更新。这种模式也被称为发布-订阅模式&…

matplotlib库

1.概念 Matplotlib 库&#xff1a;是一款用于数据可视化的 Python 软件包&#xff0c;支持跨平台运行&#xff0c;它能够根据 NumPy ndarray 数组来绘制 2D 图像&#xff0c;它使用简单、代码清晰易懂 Figure&#xff1a;指整个图形&#xff0c;您可以把它理解成一张画布&…

【含开题报告+文档+PPT+源码】基于vue框架的东升餐饮点餐管理平台的设计与实现

开题报告 在当前信息化社会背景下&#xff0c;餐饮行业正经历着由传统线下服务模式向线上线下深度融合的转变。随着移动互联网技术及大数据应用的飞速发展&#xff0c;用户对于餐饮服务平台的需求也日益多元化和个性化。他们期望能在一个集便捷、高效、个性化于一体的平台上完…

快速搭建SpringBoot3+Prometheus+Grafana

快速搭建SpringBoot3PrometheusGrafana 一、搭建SpringBoot项目 1.1 创建SpringBoot项目 1.2 修改pom文件配置 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://…

如何使用 Spring Cloud 实现客户端负载平衡

微服务系统通常运行每个服务的多个实例。这是实施弹性所必需的。因此&#xff0c;在这些实例之间分配负载非常重要。执行此操作的组件是负载均衡器。Spring 提供了一个 Spring Cloud Load Balancer 库。在本文中&#xff0c;您将学习如何使用它在 Spring Boot 项目中实现客户端…

SolarWinds Web Help Desk曝出严重漏洞,已遭攻击者利用

近日&#xff0c;CISA 在其 “已知漏洞”&#xff08;KEV&#xff09;目录中增加了三个漏洞&#xff0c;其中一个是 SolarWinds Web Help Desk (WHD) 中的关键硬编码凭据漏洞&#xff0c;供应商已于 2024 年 8 月底修复了该漏洞。 SolarWinds Web Help Desk 是一款 IT 服务台套…