Disruptor 有哪些典型的使用场景?

大家好,我是君哥。

Disruptor 是一款高性能的内存有界队列,它通过内存预分配、无锁并发、解决伪共享问题、使用 RingBuffer 取代阻塞队列等措施来大幅提升队列性能。

但开发者们往往对它的使用场景不太了解,到底应该在哪些场景使用呢?今天咱们就来聊一聊 Disruptor 的使用场景。

Disruptor 是一个生产-消费模式的队列,这里我们使用官网的示例,生产者发送一个 long 类型的变量,消费者收到消息后把变量打印出来。首先定义消息体:

public class LongEvent {
    private long value;
    public void set(long value)
    {
        this.value = value;
    }

    @Override
    public String toString()
    {
        return "LongEvent{" + "value=" + value + '}';
    }
}

为了让 Disruptor 给消息预先分配内存,定义一个 EventFactory,代码如下:

public class LongEventFactory implements EventFactory<LongEvent>
{
    @Override
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

下面定义个消费者 LongEventHandler:

public class LongEventHandler implements EventHandler<LongEvent>
{
    private String consumer;

    public LongEventHandler(String consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("consumer: " + consumer + ",Event: " + event);
    }
}

1.广播场景

广播场景在我们的开发工作中并不少见,比如系统收到上游系统的一个请求消息,然后把这个消息发送给多个下游系统来处理。Disruptor 支持广播模式。比如消费者生产的消息由三个消费者来消费:

图片

public class Broadcast {
    public static void main(String[] args) throws InterruptedException {
        int bufferSize = 1024;

        Disruptor<LongEvent> disruptor =
                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
        EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
        EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");

        disruptor.handleEventsWith(consumer1, consumer2, consumer3);
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

2.日志收集

再来看一个日志收集的例子。这里我们假设一个场景,业务系统集群有 3 个节点,每个节点打印的业务日志发送到 Disruptor,Disruptor 下游有 3 个消费者负责日志收集。

图片

这里我们需要重新定义一个日志收集处理类,代码如下:

public class LogCollectHandler implements WorkHandler<LongEvent> {
    public LogCollectHandler(String consumer) {
        this.consumer = consumer;
    }

    private String consumer;


    @Override
    public void onEvent(LongEvent event)
    {
        System.out.println("consumer: " + consumer + ",Event: " + event);
    }
}

下面这个代码是绑定消费者的代码:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1");
 WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2");
 WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3");

 disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
 disruptor.start();
}

需要注意的是,上面使用的是 Disruptor 的 handleEventsWithWorkerPool 方法,使用的消费者不是 EventHandler,而是 WorkHandler。消费者组里面的消费者如果是 WorkHandler,那消费者之间就是有竞争的,比如一个 Event 已经被 consumer1 消费过,那就不再会被其他消费者消费了。消费者组里面的消费者如果是 EventHandler,那消费者之间是没有竞争的,所有消息都会消费。

3.责任链

责任链这种设计模式我们都比较熟悉了,同一个对象的处理有多个不同的逻辑,每个逻辑作为一个节点组成责任链,比如收到一条告警消息,处理节点分为:给开发人员发送邮件、给运维人员发送短信、给业务人员发送 OA 消息。

图片

Disruptor 支持链式处理消息,看下面的示例代码:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");

 disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
 disruptor.start();
}

Disruptor 也支持多个并行责任链,下图是 2 条责任链的场景:

图片

这里给出一个示例代码:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
 EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");
 EventHandler<LongEvent> consumer5 = new LongEventHandler("consumer5");
 EventHandler<LongEvent> consumer6 = new LongEventHandler("consumer6");

 disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
 disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6);
 disruptor.start();
}

4.多任务协作

一个经典的例子,我们在泡咖啡之前,需要烧水、洗被子、磨咖啡粉,这三个步骤可以并行,但是需要等着三步都完成之后,才可以泡咖啡。

图片

当然,这个例子可以用 Java 中的 CompletableFuture 来实现,代码如下:

public static void main(String[] args){
    ExecutorService executor = ...;
    CompletableFuture future1 = CompletableFuture.runAsync(() -> {
        try {
            washCup();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future2 = CompletableFuture.runAsync(() -> {
        try {
            hotWater();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future3 = CompletableFuture.runAsync(() -> {
        try {
            grindCoffee();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture.allOf(future1, future2, future3).thenAccept(
            r -> {
                System.out.println("泡咖啡");
            }
    );
    System.out.println("我是主线程");
}

同样,使用 Disruptor 也可以实现这个场景,看下面代码:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
 EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");

 disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4);
 disruptor.start();
}

5.多消费者组

类比主流消息队列的场景,Disruptor 也可以实现多消费者组的场景,组间并行消费互不影响,组内消费者竞争消息,如下图:

图片

示例代码如下:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 WorkHandler<LongEvent> consumer1 = new LogWorkHandler("consumer1");
 WorkHandler<LongEvent> consumer2 = new LogWorkHandler("consumer2");
 WorkHandler<LongEvent> consumer3 = new LogWorkHandler("consumer3");
 WorkHandler<LongEvent> consumer4 = new LogWorkHandler("consumer4");
 WorkHandler<LongEvent> consumer5 = new LogWorkHandler("consumer5");
 WorkHandler<LongEvent> consumer6 = new LogWorkHandler("consumer6");

 disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
 disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6);
 disruptor.start();
}

6.总结

通过消费者的灵活组合,Disruptor 的使用场景非常丰富。本文介绍了 Disruptor 的 5 个典型使用场景。在选型的时候,除了使用场景,更多地要考虑到 Disruptor 作为高性能内存队列的这个特点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/946622.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络药理学:3、零基础复现一篇生信文章:筛选疾病靶点、GeneCards、OMIM、TTD使用教程

一、前言 药物和疾病英文名 大黄英文名&#xff1a;Dahuang食管癌英文名&#xff1a;esophageal cancer 网站地址 TCMSP网站地址&#xff1a;https://old.tcmsp-e.com/tcmsp.phpGeneCards网站首页&#xff1a;https://www.genecards.org/OMIM网站首页&#xff1a;https://w…

VK11\VK12保存增强

VK11\VK12保存增强 一、 VK11\VK12保存增强 事务码VK11、VK12创建和修改条件记录时&#xff0c;点击保存时修改其中的条件 二、增强步骤 通过查找&#xff0c;对应的BADI&#xff1a;SD_COND_SAVE_A 通过SE19创建BADI&#xff1a;ZSD_COND_SAVE_A修改函数CONDITION_SAVE_E…

学习vue3的笔记

一、vue和react的对比 1、基础介绍 vue&#xff1a;https://cn.vuejs.org/ vue3是2020年创建的 react&#xff1a;https://react.dev/ react是一个2013年开源的JavaScript库&#xff0c;严格意义上来说不是一个框架 2、diff算法 两个框架采用的都是同级对比策略 两节点对…

【kubernetes组件合集】深入解析Kubernetes组件之三:client-go

深入解析Kubernetes组件之三&#xff1a;client-go 目录 深入解析Kubernetes组件之三&#xff1a;client-go 引言 1. client-go简介 2. client-go的功能 2.1 资源操作 2.2 资源监听 2.3 认证和授权 2.4 错误处理和重试 2.5 扩展性和定制化 3. 使用client-go与Kubern…

springboot533图书管理系统(论文+源码)_kaic

摘 要 传统信息的管理大部分依赖于管理人员的手工登记与管理&#xff0c;然而&#xff0c;随着近些年信息技术的迅猛发展&#xff0c;让许多比较老套的信息管理模式进行了更新迭代&#xff0c;图书信息因为其管理内容繁杂&#xff0c;管理数量繁多导致手工进行处理不能满足广…

大数据数仓Hive和数据集市、数据治理

数仓&#xff08;Data Warehouse&#xff09; 为企业制定决策、提供数据支持的&#xff0c;可以帮助企业改进业务流程提高产品质量等。DW不是数据最终目的地&#xff0c;而是为数据最终目的地做好准备&#xff0c;这些准备包括对数据的备份&#xff0c;清洗&#xff0c;转义、…

WebRTC线程的启动与运行

WebRTC线程运行的基本逻辑&#xff1a; while(true) {…Get(&msg, …);…Dispatch(&msg);… }Dispatch(Message *pmsg) {…pmsg->handler->OnMessage(pmsg);… }在执行函数内部&#xff0c;就是一个while死循环&#xff0c;只做两件事&#xff0c;从队列里Get取…

一种pod容器动态挂卷方案

一、背景 1.1 个人调试kvm 我们这边基于云平台的k8skubevirt&#xff0c;给安卓手机领域的开发工程师们提供了独占式虚拟机资源。这些资源主要用于工程师的个人级开发与调试&#xff0c;因此有如下特点&#xff1a; 使用时间与工作时间强相关&#xff0c;即工程师工作时间使…

cesium 小知识:PostProcessStage 和 PostProcessStageLibrary详解对比

在Cesium中,PostProcessStage 和 PostProcessStageLibrary 是用于实现后期处理效果的关键API。它们允许开发者通过应用各种视觉效果来增强3D场景的渲染质量或实现特定的视觉需求。下面将详细介绍这两个API,并对比它们的功能和使用方法。 PostProcessStage 1. 概念 后期处理…

如何使用AI工具cursor(内置ChatGPT 4o+claude-3.5)

⚠️温馨提示&#xff1a; 禁止商业用途&#xff0c;请支持正版&#xff0c;充值使用&#xff0c;尊重知识产权&#xff01; 免责声明&#xff1a; 1、本教程仅用于学习和研究使用&#xff0c;不得用于商业或非法行为。 2、请遵守Cursor的服务条款以及相关法律法规。 3、本…

Flink operator实现自动扩缩容

官网文档位置&#xff1a; 1.Autoscaler | Apache Flink Kubernetes Operator 2.Configuration | Apache Flink Kubernetes Operator 1.部署K8S集群 可参照我之前的文章k8s集群搭建 2.Helm安装Flink-Operator helm repo add flink-operator-repo https://downloads.apach…

深入Android架构(从线程到AIDL)_03 IPC的IBinder接口

目录 4、 IPC的IBinder接口 -- 定义与实现 IBinder接口的定義 IBinder接口的實現類 Java层的Binder基类定义​编辑 Binder基类的主要函数 Java层的BinderProxy基类定义 4、 IPC的IBinder接口 -- 定义与实现 IBinder接口的定義 大家都知道&#xff0c;当两个类都在同一个…

vscode代码AI插件Continue 安装与使用

“Continue” 是一款强大的插件&#xff0c;它主要用于在开发过程中提供智能的代码延续功能。例如&#xff0c;当你在编写代码并且需要进行下一步操作或者完成一个代码块时&#xff0c;它能够根据代码的上下文、语法规则以及相关的库和框架知识&#xff0c;为你提供可能的代码续…

我的Java-Web进阶--SpringMVC

1.三层架构与MVC模式 三层架构 MVC模式 2.SpringMVC执行流程 3.SpringMVC的基本使用方法 1. 配置 1.1 Maven依赖 首先&#xff0c;在pom.xml文件中添加Spring MVC的依赖&#xff1a; <dependencies><!-- Spring MVC --><dependency><groupId>org.…

flux中的缓存

1. cache&#xff0c;onBackpressureBuffer。都是缓存。cache可以将hot流的数据缓存起来。onBackpressureBuffer也是缓存&#xff0c;但是当下游消费者的处理速度比上游生产者慢时&#xff0c;上游生产的数据会被暂时存储在缓冲区中&#xff0c;防止丢失。 2. Flux.range 默认…

在基于Centos7的服务器上启用【Gateway】的【Clion Nova】(即 ReSharper C++ 引擎)

1. 检查启动报错日志&#xff0c;目录在 ~/.cache/JetBrains/CLion202x.x.x/log/backend.202x-xx-xx_xxxx.xxxx-err.log 2. 大致可能有两种报错 a. Process terminated. Couldnt find a valid ICU package installed on the system. 这个报错只需要装一下 libicu-devel 包即可…

【spring】参数校验Validation

前言 在实际开发中&#xff0c;我们无法保证客户端传来的请求都是合法的。比如一些要求必传的参数没有传递&#xff0c;传来的参数长度不符合要求等&#xff0c;这种时候如果放任不管&#xff0c;继续执行后续业务逻辑&#xff0c;很有可能就会出现意想不到的bug。 有人可能会…

Android实现队列出入队测试

演示效果: 安卓队列测试 入队操作 空队&#xff0c;满队判断 队列实现代码: package com.example.generalqueue;import android.content.Context; import android.widget.Toast;import java.util.Arrays;public class ArrayQueue {private int capacity;//队列容量private in…

Python Pyglet实战(1)——迷宫游戏

大家好啊&#xff0c;今天就来跟大家分享一下Python Pyglet的实战样例吧。 一.导入所需模块 1.导入__future__模块 首先&#xff0c;我们需要导入__future__模块中的division变量&#xff0c;此变量在__future__.py中的定义如下&#xff1a; division _Feature((2, 2, 0, …

第十一章 图论

题目描述&#xff1a; 阿里这学期修了计算机组织和架构课程。他了解到指令之间可能存在依赖关系&#xff0c;比如WAR&#xff08;读后写&#xff09;、WAW、RAW。 如果两个指令之间的距离小于安全距离&#xff0c;则会导致危险&#xff0c;从而可能导致错误的结果。因此&#…