Spring Boot3.x集成Disruptor4.0

Disruptor介绍

Disruptor是一个高性能内存队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

Disruptor 是一个 Java 的并发编程框架,大大的简化了并发程序开发的难度,在性能上也比 Java 本身提供的一些并发包要好。它源于LMAX对并发性 、性能和非阻塞算法的研究,如今构成了其Exchange基础架构的核心部分。

Disruptor的队列功能和传统的MQ队列服务不同(比如:kafka\rabbitMq等等),Disruptor而是一个基于JDK的高性能内存队列,例如与Java的BlockingQueue进行对比。与队列一样,Disruptor的目的是在同一进程内的线程之间传递数据(例如消息或事件)。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。

Disruptor功能

  • 高性能消息传递:Disruptor 能够通过避免锁和减少线程间的数据交换来提高性能。
  • 支持多生产者和多消费者:可以由多个生产者向队列中添加事件,同时多个消费者处理这些事件。
  • 事件处理模型:Disruptor 使用预分配事件的环形数组结构,每个事件槽可以被重复使用,减少了对象创建的开销。
  • 内存屏障优化:利用内存屏障来减少不必要的CPU缓存刷新,提高效率。

Disruptor优点

  • 极高的吞吐量和低延迟:通过减少锁的使用和优化内存操作,Disruptor 能够实现极高的数据处理速率和低延迟。
  • 避免了线程阻塞:使用无锁的设计,避免了传统队列中的线程阻塞问题。
  • 资源利用率高:通过重复使用事件对象,减少了垃圾回收的压力。

Disruptor缺点

  • 复杂性:Disruptor 的使用和理解比标准的队列或者其他并发模型要复杂,需要更多的学习和调试。
  • 适用场景有限:主要适用于需要极高性能和低延迟的系统,对于一般的应用场景可能是过度设计。
  • 调试困难:由于其无锁的设计和复杂的内部结构,当出现问题时,调试可能比较困难。
  • 总的来说,Disruptor 是一个专为高性能计算设计的工具,适用于那些对性能有极端要求的场景。对于普通应用或者数据量不大的情况,使用传统的并发模型可能更为合适。

Disruptor特征

Disruptor的目标之一是在低延迟环境中使用,在低延迟系统中,必须减少或移除内存分配;

在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿;为了支持这一点,用户可以预先分配Disruptor中事件所需的存储空间(也就是声明RingBuffer的大小)。

在构造RingBuffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时候被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便他们可以调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的。

官方文档

github开源地址

GitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library

github介绍文档

LMAX Disruptor

开发示例

我们以一个项目来演示,开发一个订单业务消息处理服务,来模拟采用Disruptor队列来对订单进行管理;

采用一个生产者,多个消费者模式,并且多个消费者按不同的顺序进行链路排例,对生产者消息进行消费;

如:

某电商平台存在以下服务功能

  • 订单管理服务:生成订单后,过行订单管理与跟踪
  • 用户等级服务:用户购买商品后,重新评估用户星级等级,提供对标服务
  • 电商客服服务:负责售前售后服务,有3组,分别为A组、B组、C组,每个订单只分配到其中一组提供对接客服服务
  • 仓储管理服务:管理平台所有商品仓库,并提供已购买商品
  • 物流投递服务:负责从仓储中获取商品,投送到用户手中

电商平台每完成一笔支付订单,将消息发送到此Disruptor示例服务。

  • 首先分别经过订单管理服务和用户等级服务,注意:两个服务均为独立消费消息
  • 完成前置两个服务消费后,才能执行电商客服A组、电商客服B组、电商客服C组其中任意一个服务独立消费消息,注意:三选一,不能全部执行
  • 完成前置电商客服服务消费后,执行仓储管理服务消费消息
  • 完成前置仓储管理服务消费后,最后执行物流投递服务

消费链路流程如下:

工程环境

  • JDK:17
  • SpringBoot:3.1.3-SNAPSHOT

注:假设你已创建基础工程,并完成SpringBoot组件引入

引入Disruptor依赖

<dependencies>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>

项目Disruptor配置

package com.example.disruptor;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicate;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

/**
 * @Description 服务配置类
 */
@Slf4j
@Configuration(proxyBeanMethods = false)
public class WebConfig {

    private static final RequestPredicate JSON_ACCEPT = accept(new MediaType(MediaType.APPLICATION_JSON, StandardCharsets.UTF_8));
    //缓冲区大小,必需是2的N次方
    final static int BUFFER_SIZE = 1024 * 1024;

    /**
     * 创建基于环的可重用队列存储,实现消息数据存储与推送到处理器执行
     * @return
     */
    @Bean("orderRingBuffer")
    public RingBuffer<OrderEvent> createRingBuffer(){
        // 创建消息处理器,注:正常业务模式下,由不同的业务类实现;此处为了简化演示,从createEventHandler()方法中获取模拟实现类;
        EventHandler<OrderEvent> orderHandler = createEventHandler("消息序例:{}, 发送到《订单管理服务》, 订单详情:{}");
        EventHandler<OrderEvent> userLevelHandler = createEventHandler("消息序例:{}, 发送到《用户等级服务》, 订单详情:{}");
        EventHandler<OrderEvent> customerService0Handler = createEventHandler(0, 3, "消息序例:{}, 发送到《电商客服A组》, 订单详情:{}");
        EventHandler<OrderEvent> customerService1Handler = createEventHandler(1, 3, "消息序例:{}, 发送到《电商客服B组》, 订单详情:{}");
        EventHandler<OrderEvent> customerService2Handler = createEventHandler(2, 3, "消息序例:{}, 发送到《电商客服C组》, 订单详情:{}");
        EventHandler<OrderEvent> storageHandler = createEventHandler("消息序例:{}, 发送到《仓储管理服务》, 订单详情:{}");
        EventHandler<OrderEvent> deliveryHandler = createEventHandler("消息序例:{}, 发送到《物流投递服务》, 订单详情:{}");

        //创建环形缓冲区处理器事件生成器
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(OrderEvent::new,
                //缓冲区大小
                BUFFER_SIZE,
                //默认线程工厂
                Executors.defaultThreadFactory(),
                //ProducerType.SINGLE(表示生产者只有一个)和ProducerType.MULTY(表示有多个生产者)
                ProducerType.SINGLE,
                /**
                 可用事件策略:
                 BlockingWaitStrategy:用了ReentrantLock的等待&&唤醒机制实现等待逻辑,是默认策略,比较节省CPU
                 BusySpinWaitStrategy:持续自旋,JDK9之下慎用(最好别用)
                 DummyWaitStrategy:返回的Sequence值为0,正常环境是用不上的
                 LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,不建议使用
                 TimeoutBlockingWaitStrategy:带超时的等待,超时后会执行业务指定的处理逻辑
                 LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
                 SleepingWaitStrategy:三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的的睡眠
                 YieldingWaitStrategy:二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
                 PhasedBackoffWaitStrategy:四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个
                 注意:
                 BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
                 SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
                 YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
                 */
                new YieldingWaitStrategy());

        //消息消费配置处理器执行链路:orderHandler, userLevelHandler》customerService[0~2]Handler》storageHandler》deliveryHandler
        //分别独立执行:orderHandler(订单管理服务), userLevelHandler(用户等级服务)
        disruptor.handleEventsWith(orderHandler, userLevelHandler)
                //前置消费后,再执行其中任意一个处理器:customerService[0~2]Handler(电商客服A组\电商客服B组\电商客服C组,三选一)
                .then(customerService0Handler, customerService1Handler, customerService2Handler)
                //前置消费后,再执行处理器:storageHandler(仓储管理服务)
                .then(storageHandler)
                //前置消费后,再执行处理器:deliveryHandler(物流投递服务)
                .then(deliveryHandler);

        //启动disruptor服务
        disruptor.start();
        return disruptor.getRingBuffer();
    }

    /**
     * 创建消息处理器
     * @param msg
     * @return
     */
    private EventHandler<OrderEvent> createEventHandler(final String msg){
        return (event, sequence, endOfBatch)->{
            log.info(msg, sequence,event);
            //业务逻辑代码...
        };
    }

    /**
     * 创建消息处理器,支持相同处理器多选一(取模计算)
     * @param index
     * @param handlerCount
     * @param msg
     * @return
     */
    private EventHandler<OrderEvent> createEventHandler(final int index, final int handlerCount, final String msg){
        return (event, sequence, endOfBatch)->{
            if (sequence % handlerCount == index) {
                log.info(msg, sequence, event);
                //业务逻辑代码...
            }
        };
    }

    /**
     * 后台服务请求router
     * @param orderRingBuffer
     * @return
     */
    @Bean
    public RouterFunction<ServerResponse> webRouterFunction(RingBuffer<OrderEvent> orderRingBuffer) {
        return route()
                .POST("/order/push", JSON_ACCEPT, (request)->{
                    String orderId = request.queryParam("orderId").orElse("");
                    String name = request.queryParam("name").orElse("");
                    String price = request.queryParam("price").orElse("");
                    orderRingBuffer.publishEvent((orderEvent, sequence) -> {
                        orderEvent.setOrderId(orderId);
                        orderEvent.setName(name);
                        orderEvent.setPrice(Double.parseDouble(price));
                    });
                    return ServerResponse.status(HttpStatus.OK).bodyValue("ok,200");
                })
                .build();
    }
}

订单对象

package com.example.disruptor;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class OrderEvent {
    private String orderId;
    private String name;
    private Double price;
}

启动类

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DisruptorSamplesApplication {
    public static void main(String[] args) {
        SpringApplication.run(DisruptorSamplesApplication.class, args);
    }
}

YML配置文件

# 本地服务访问
server:
  # 服务端口
  port: 8080
  # 服务IP
  address: 0.0.0.0
# 配置日志
logging:
  level:
    org.springframework: info
# 开启debug模式
debug: false

工程测试

Postman发送POST请求,将模拟表单数据提交到服务端;

服务打印日志

2024-04-30T18:08:05.227+08:00  INFO 39828 --- [pool-4-thread-1] com.example.disruptor.WebConfig: 消息序例:0, 发送到《订单管理服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.227+08:00  INFO 39828 --- [pool-4-thread-2] com.example.disruptor.WebConfig: 消息序例:0, 发送到《用户等级服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-3] com.example.disruptor.WebConfig: 消息序例:0, 发送到《电商客服A组》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-6] com.example.disruptor.WebConfig: 消息序例:0, 发送到《仓储管理服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-7] com.example.disruptor.WebConfig: 消息序例:0, 发送到《物流投递服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)

通过日志清楚的展示了,消费者消息处理器按程序执行配置的链路顺序正确打印;

结束

以上介绍了disruptor的基本信息与特点,并通过代码工程演示了dirsruptor在项目中如何开发,以及使用场景等;本文内容介绍有限,实际应用过程中disruptor还有很多其它用法,并未通过本文全整的展述,比如:如何使用多个消费者在线程池下完成消费,以及多生产者模式;可以通过官方文档与源码了解更多dirsruptor用法与功能;本文如有不足之处,欢迎指正与交流;

参考:

高性能队列——Disruptor-CSDN博客

LMAX Disruptor

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/601490.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

css--控制滚动条的显示位置

各种学习后的知识点整理归纳&#xff0c;非原创&#xff01; ① direction属性 滚动条在左侧显示② transform:scaleY() 滚动条在上侧显示 正常的滚动条会在内容超出规定的范围后在区域右侧和下侧显示在有些不正常的需求下会希望滚动条在上侧和左侧显示自己没有想到好的解决方案…

Vue3:menu导航栏出现多个同一跳转路径的菜单处理

文章目录 需求整理实现思路实现过程 需求整理&#xff0c;实现思路 最近公司想将之前老的项目整理出来&#xff0c;因为这个老项目内容太杂什么页面都往里面塞&#xff0c;导致菜单特别多&#xff0c;公司就像将这个老的项目迁出来&#xff0c;这个旧的项目本来是后端PHP写的。…

面试分享——订单超30分钟未支付自动取消用什么实现?如何使用Redis实现延迟队列?

目录 1.订单超时未支付自动取消&#xff0c;这个你用什么方案实现&#xff1f; 2.如何使用Redis实现延迟队列 2.1实验步骤 2.2实现生产可用的延迟队列还需关注什么 3.总结 电商场景中的问题向来很受面试官的青睐&#xff0c;因为业务场景大家都相对更熟悉&#xff0c;相关…

禹晶、肖创柏、廖庆敏《数字图像处理(面向新工科的电工电子信息基础课程系列教材)》Chapter 5插图

禹晶、肖创柏、廖庆敏《数字图像处理&#xff08;面向新工科的电工电子信息基础课程系列教材&#xff09;》 Chapter 5插图

【Verilog】big_small_cnt

通用大小计数器 timescale 1ns / 1ps // // Company: // Engineer: wengf // Create Date: // Design Name: // Module Name: big_small_cnt // Project Name: // Target Devices: // Tool Versions: // Description: // Dependencies: // Revision: // Revision 0…

超声波测距传感器--第七天

1.超声波测距 型号:HC-SR04 接线参考:模块除了两个电源引脚外,还有TRIG,ECHO引脚,这两个引脚分别接我们开发板的P1.5和P1.6端 超声波模块是用来测量距离的一种产品,通过发送超声波,利用时间差和声音传播速度,计算模块到前方障碍物的距离。 2. 如何让它发送波: Tri…

Linux入门攻坚——22、通信安全基础知识及openssl、CA证书

Linux系统常用的加解密工具&#xff1a;OpenSSL&#xff0c;gpg&#xff08;是pgp的实现&#xff09; 加密算法和协议&#xff1a; 对称加密&#xff1a;加解密使用同一个秘钥&#xff1b; DES&#xff1a;Data Encryption Standard&#xff0c;数据加密标准&…

【postgreessql 】查询数据库表占用物理空间

查询单个表的磁盘使用量&#xff1a; SELECTrelname,pg_size_pretty ( pg_total_relation_size ( relid ) ) AS total_size FROMpg_catalog.pg_statio_user_tables; 查询所有表的总磁盘使用量&#xff1a; SELECTpg_size_pretty ( SUM ( pg_total_relation_size ( relid ) )…

简洁大气APP下载单页源码

源码介绍 简洁大气APP下载单页源码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面 效果截图 源码下载 简洁大气APP下载单页源码

[oeasy]python0016_在vim中直接运行python程序

回忆上次内容 上次 置换 esc 和 caps lock 任何操作 都可以在不移动 手腕的状态下完成了 每次都要 退出vim编辑器&#x1f634; 才能 在shell中 运行python程序有点麻烦 想要 不退出vim 直接在 vim应用 中运行 py程序可能吗&#xff1f;&#x1f914; 运行程序 以前都是 先退…

0507华为od二面

只记录自己没回答上的问题 1、ZGC的缺点&#xff1a; 1)只是适用于32位系统 2)最大只是支持4TB内存容量 3)最糟糕的情况下吞吐量会下降15%&#xff0c;这都不是事至于吞吐量&#xff0c;通过扩容分分钟解决 4)分代的原因:不同对象的生命周期不相同&#xff0c;可能会扫描整个堆…

TiDB数据库 使用tiup 缩容遇到的tikv处于下线中状态无法转为tombstone状态

官方的缩容文档 https://docs.pingcap.com/zh/tidb/stable/scale-tidb-using-tiup 论坛地址 https://tidb.net/ 问题&#xff1a;使用tiup 缩容遇到的tikv处于下线中状态无法转为tombstone状态 解决方法 1.缩容 tiup cluster scale-in --node 10.0.1.5:20160 2.查看 tiup…

2024.5.6 关于 SpringCloud 的基本认知

目录 引言 微服务框架所包含的技术栈 微服务架构演变 单体架构 分布式架构 微服务架构 微服务技术对比 认识 SpringCloud SpringBoot 版本兼容关系 服务拆分和远程调用 服务拆分注意事项 远程调用 引言 微服务是一种框架风格&#xff0c;按照业务板块来划分应用代码…

彻底解决python的pip install xxx报错(文末附所有依赖文件)

今天安装pip install django又报错了&#xff1a; C:\Users\Administrator>pip install django WARNING: Ignoring invalid distribution -ip (d:\soft\python\python38\lib\site-pac kages) Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple Collecting djan…

构建 WebRTC 一对一信令服务器

构建 WebRTC 一对一信令服务器 构建 WebRTC 一对一信令服务器前言为什么选择 Nodejs&#xff1f;Nodejs 的基本原理浏览器使用 Nodejs安装 Nodejs 和 NPMsocket.io信令服务器搭建信令服务器客户端服务端启动服务器并测试 总结参考 构建 WebRTC 一对一信令服务器 前言 我们在学…

Bookends for Mac v15.0.2 文献书籍下载管理

Bookends Mac版可以轻松地将其导入参考 &#xff0c;并直接搜索和进口从数以百计的线上资料来源。Bookends Mac版使用内置在浏览器中下载参考与PDF格式的文件&#xff0c;或和/或网页的点击。 Bookends for Mac v15.0.2注册激活版下载 本文由 mdnice 多平台发布

信息系统项目管理师0092:项目管理原则(6项目管理概论—6.4价值驱动的项目管理知识体系—6.4.1项目管理原则)

点击查看专栏目录 文章目录 6.4价值驱动的项目管理知识体系6.4.1项目管理原则1.原则一:勤勉、尊重和关心他人2.原则二:营造协作的项目管理团队环境3.原则三:促进干系人有效参与4.原则四:聚焦于价值5.原则五:识别、评估和响应系统交互6.原则六:展现领导力行为7.原则七:根…

python菜鸟级安装教程 -下篇(安装编辑器)

来来~接着上篇的来~ 安装好python.exe之后&#xff0c;我们可以根据cmd命令窗口&#xff0c;码代码。 这算最简单入门了~ 如果我们在安装个编辑器。是什么效果&#xff0c;一起体验一下吧 第一步&#xff0c;下载编辑器&#xff0c;选择官网&#xff0c;下载免费版本入门足…

探索Baidu Comate:编程世界中的新利器

文章目录 Baidu Comate 介绍Baidu Comate的优势Baidu Comate安装过程Baidu Comate实战演练代码调优代码解释代码生成注释生成 总结 Baidu Comate 介绍 随着GPT的大火&#xff0c;衍生了各种AI工具&#xff0c;这些AI工具遍布在各行业各领域中&#xff0c;有AI写作、AI办公、AI…

Linux 认识与学习Bash——3

在Linux bash中&#xff0c;数据流重定向是指将命令的输出从默认的标准输出&#xff08;通常是终端&#xff09;重定向到其他位置&#xff0c;如文件或另一个命令的输入。这是通过使用特定的符号来实现的。例如&#xff0c;>用于将输出重定向到文件&#xff0c;而<用于将…