600万订单每秒Disruptor +SpringBoot,如何解决消息不丢失?

尼恩说在前面

在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、shein 希音、百度、网易的面试资格,遇到很多很重要的面试题:

Disruptor 官方说能达到每秒600w OPS订单处理能力,怎么实现的?

Disruptor 什么情况下发生消费数据丢失? 该怎么解决?

小伙伴 没有回答好,导致面试挂了。

Disruptor 是队列之王,相关面试题也是一个非常常见的面试题,考察的是高性能的基本功。

如何才能回答得很漂亮,才能 让面试官刮目相看、口水直流呢?

这里,尼恩给大家做一下系统化、体系化的梳理,让面试官爱到 “不能自已、口水直流”,然后帮大家 实现 ”offer自由”

当然,这道面试题,以及参考答案,也会收入咱们的 《尼恩Java面试宝典》V175版本PDF集群,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。

注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请关注本公众号【技术自由圈】获取。

队列之王 Disruptor 简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。

在这里插入图片描述

2011年,企业应用软件专家Martin Fowler专门撰写长文介绍Disruptor。

2011年,Disruptor还获得了Oracle官方的Duke大奖

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Disruptor通过以下设计来,来实现 单线程能支撑每秒600万订单的问题:

  • 核心架构1:无锁架构
    生产和消费,都是无锁架构。具体来说,生产者位点/消费者位点的操作,都是无锁操作,或者使用轻量级CAS原子操作。

    无锁架构好处是,既没有锁的竞争, 也没有线程的内核态、用户态切换的开销。 关于内核态、用户态的原理请参见尼恩的葵花宝典。

  • 核心架构2:环形数组架构

    数组元素不会被回收,避免频繁的GC,所以,为了避免垃圾回收,采用数组而非链表。

    同时,数组对处理器的缓存机制更加友好。

    数组长度2^n,通过位运算,加快定位的速度。

    下标采取递增的形式。不用担心index溢出的问题。

    index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 核心架构3:cache line padding

    两个维度的CPU 缓存行加速,享受到 CPU Cache 那风驰电掣的速度带来的红利:

    第一维度: 对 位点等核心组件进行 CPU cache line padding,实现高并发访问(修改和取值)。

    第二个维度: ringbuffer 是一个数据,加载的时候一般也会塞满整个 CPU cache line。也就是说 从内存加载数据到 CPU Cache 里面的时候, 如果是加载数组里面的数据(如 Disruptor),那么 CPU 就会加载到数组里面连续的多个数据。
    所以,Disruptor 数组的遍历、还是位点的增长, 很容易享受到 CPU Cache 那风驰电掣的速度带来的红利。

SpringBoot + Disruptor 使用实战

有关 Disruptor的 简单实战,请参见 尼恩的 《Disruptor 学习圣经 V3》, 由于过于简单,这里不做啰嗦。

下面,来看一个SpringBoot + Disruptor的 使用实战

使用 Disruptor 实现一个生产消费模型步骤是:

  • 准备好简单的一个springboot应用

  • 定义事件(Event) : 你可以把 Event 理解为存放在队列中等待消费的消息对象。

  • 创建事件工厂 :事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。

  • 创建处理事件的 Handler :Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。

  • 创建并装配 Disruptor : 事件的生产和消费需要用到Disruptor 对象。

  • 定义生产者,并使用生产者发消息

  • 对简单的SpringBoot + Disruptor 进行扩容,实现 容量监控预警+ 动态扩容

定义一个Event和工厂

首先定义一个Event来包含需要传递的数据:

在这里插入图片描述

由于需要让Disruptor为我们创建事件,我们同时还声明了一个EventFactory来创建Event对象。

public class LongEventFactory implements EventFactory { 
    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 

在这里插入图片描述定义

事件处理器(消费者)

我们还需要一个事件消费者,也就是一个事件处理器。

这个例子中,事件处理器的工作,就是简单地把事件中存储的数据打印到终端:

    /** 
     * 类似于消费者
     *  disruptor会回调此处理器的方法
     */
    static class LongEventHandler implements EventHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
            System.out.println(longEvent.getValue());
        }
    }

disruptor会回调此处理器的方法

定义事件源(生产者)

事件都会有一个生成事件的源,类似于 生产者的角色.

注意,这是一个 600wqps 能力的 异步生产者。 这里定义两个版本:

在这里插入图片描述

生产者的角色的接口定义如下

在这里插入图片描述

入门级:一个简单 DisruptorProducer 生产者的定义和使用

定义一个简单 DisruptorProducer 生产者

大致的代码如下

package com.crazymaker.cloud.disruptor.demo.business.impl;


@Slf4j
public class DisruptorProducer implements AsyncProducer {

    //一个translator可以看做一个事件初始化器,publicEvent方法会调用它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.setValue(data);
                }
            };
    private final RingBuffer<LongEvent> ringBuffer;

    public DisruptorProducer() {
        this.ringBuffer = disruptor().getRingBuffer();
    }

    public void publishData(Long data) {
        log.info("生产一个数据:" + data + " | ringBuffer.remainingCapacity()= " + ringBuffer.remainingCapacity());

        ringBuffer.publishEvent(TRANSLATOR, data);

    }

 
    private Disruptor<LongEvent> disruptor() {

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();

        LongEventFactory eventFactory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, namedThreadFactory,
                ProducerType.MULTI, new BlockingWaitStrategy());

        // 连接 消费者 处理器 ,两个消费者
        LongEventWorkHandler1 handler1 = new LongEventWorkHandler1();
        LongEventWorkHandler2 handler2 = new LongEventWorkHandler2();
        disruptor.handleEventsWith(handler1, handler2);
        //为消费者配置异常处理器
        disruptor.handleExceptionsFor(handler1).with(exceptionHandler);
        disruptor.handleExceptionsFor(handler2).with(exceptionHandler);

        // 开启 分裂者(事件分发)
        disruptor.start();
        return disruptor;
    }


    ExceptionHandler exceptionHandler =...//省略非核心代码  异常处理器实现
}


上面的代码,通过 disruptor() 方法创建和装配 一个Disruptor对象 ,Disruptor 里边有一个环形队列。然后 disruptor() 方法给 Disruptor对象设置消费者,并且为消费者设置异常处理器。

使用这一个简单 DisruptorProducer 生产者

定义一个配置类,用于实例化 生产者

在这里插入图片描述

定义controller, 注入这个 生产者,就可以异步发布数据 给消费者了

在这里插入图片描述

springboot应用启动之后, 可以通过 httpclient 工具,测试一下:

在这里插入图片描述

看一下测试数据

在这里插入图片描述

具体的代码和,演示过程,后面参考尼恩录制和发布《尼恩Java面试宝典》配套视频。

Disruptor:消费数据丢失问题的分析与解决

在处理高并发、大数据量等场景时,Disruptor虽然其高性能、低延迟,然而,在使用过程中,一些用户可能会遇到消费数据丢失问题。

为了解决这些问题,我们需要深入了解Disruptor的工作原理,并采取相应的解决方案。

消费数据丢失问题的根因

消费线程丢失问题通常发生在消费者处理速度跟不上生产者的时候。

由于Disruptor采用环形队列来存储数据,当队列满时,新的数据会覆盖旧的数据。

Disruptor 中,生产和消费的index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

生产和消费的index 下标采取递增的形式。不用担心index溢出的问题。

生产和消费的index 是通过 取模, 映射到 ring 环形数据的。

在这里插入图片描述

如果消费者速度慢, 生产者快,消费跟不上,生产的index(/Sequence)就会越来越大,取模的时候,又会从0开始,去覆盖ring前面的数据,从而导致没有消费的数据被丢失。

在这里插入图片描述

从上图可以看到,只要生产者 的Sequence 大于消费者 一个ring 的数量, 就开始 覆盖旧的数据,也就是开始丢失数据。

消费数据丢失问题解决方案:

  1. 增加消费者数量:增加消费者线程的数量,可以并行处理更多的数据,提高消费速度。

    同时,合理配置消费者与生产者的数量比例,确保队列生产者 的Sequence 不会大于消费者 一个ring 的数量。

  2. 增加ring环形数组的大小:通过增加数组的大小,从而保证一个环可以存放足够多的数据,但这个可能会导致OOM。

  3. 剩余容量监控与告警:
    通过Prometheus 对 remainingCapacity剩余容量 进行实时监控,当remainingCapacity 超过80%(阈值)及时发出告警通知相关人员处理,进行微服务实例的 HPA 横向扩容,或者进行 Disruptor 队列进行动态扩容。

  4. Disruptor 动态扩容
    对 Disruptor 框架进行合理封装,从单个Disruptor 队列模式,变成 ringbuffer 数组的形式,并且可以结合nacos 或者 Zookeeper 这种发布订阅组件, 对 ringbuffer 数组 进行在线扩容。

总之,通过增加消费者数量、增加ring环形数组的大小、剩余容量监控与告警, Disruptor 动态扩容等方式,可以有效解决 消费数据丢失问题。

高级版:Spring Boot + Prometheus 监控剩余容量 大小

我们的微服务项目中使用了 spring boot,集成 prometheus。

在这里插入图片描述

我们可以通过将remainingCapacity 作为指标暴露到 prometheus 中,通过如下代码:

在这里插入图片描述

增加这个代码之后,请求 /actuator/prometheus 之后,可以看到对应的返回:

在这里插入图片描述

这样,当这个值低于20%,我们就认为这个剩余空间不够,可以扩容了。

Disruptor 如何 动态扩容?

关于 Disruptor 动态扩容的方案,可以实现一个可以扩容的子类

在这里插入图片描述

定义一个环形队列的数据

    private Disruptor<LongEvent>[] executors;

在构造函数中,初始化为1:

    public ResizableDisruptorProducer() {
        executors = new Disruptor[1];
        executors[0] = disruptor();
        this.ringBuffer = executors[0].getRingBuffer();

    }

发布事件的时候,通过取模的方式,确定使用executors 数组 其中的一个RingBuffer

在这里插入图片描述

在next方法,执行indx 取模和 获取 ringbuffer 的操作

在这里插入图片描述

这里参考了netty源码里边 PowerOfTwoEventExecutorChooser 反应器选择的方式,使用位运算取模,从而实现高性能取模。

什么时候扩容呢? 当监控发现超过80%的阈值后,运维会收到告警,然后可以通过naocos、Zookeeper的发布订阅, 通知微服务进行扩容。

微服务 扩容要回调下面的risize方法

在这里插入图片描述

具体的代码和,演示过程,后面参考尼恩录制和发布《尼恩Java面试宝典》配套视频。

在使用Disruptor框架时,需要根据实际情况选择合适的监控和扩容解决方案,并不断优化和调整系统配置,以满足日益增长的业务需求。

说在最后

以上的内容,如果大家能对答如流,如数家珍,基本上 面试官会被你 震惊到、吸引到。最终,让面试官爱到 “不能自已、口水直流”。offer, 也就来了。

在面试之前,建议大家系统化的刷一波 5000页《尼恩Java面试宝典》V174,在刷题过程中,如果有啥问题,大家可以来 找 40岁老架构师尼恩交流。

另外,如果没有面试机会,可以找尼恩来帮扶、领路。

尼恩已经指导了大量的就业困难的小伙伴上岸,前段时间,帮助一个40岁+就业困难小伙伴拿到了一个年薪100W的offer,小伙伴实现了 逆天改命 。

尼恩技术圣经系列PDF

  • 《NIO圣经:一次穿透NIO、Selector、Epoll底层原理》
  • 《Docker圣经:大白话说Docker底层原理,6W字实现Docker自由》
  • 《K8S学习圣经:大白话说K8S底层原理,14W字实现K8S自由》
  • 《SpringCloud Alibaba 学习圣经,10万字实现SpringCloud 自由》
  • 《大数据HBase学习圣经:一本书实现HBase学习自由》
  • 《大数据Flink学习圣经:一本书实现大数据Flink自由》
  • 《响应式圣经:10W字,实现Spring响应式编程自由》
  • 《Go学习圣经:Go语言实现高并发CRUD业务开发》

……完整版尼恩技术圣经PDF集群,请找尼恩领取

《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》PDF,请到下面公号【技术自由圈】取↓↓↓

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/420656.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

databinding双向绑定原理,Android程序员最新职业规划

1. Android架构设计模式 MVC架构设计模式&#xff1a;MVC全名是Model View Controller&#xff0c;是模型(model)-视图(view)-控制器(controller)的缩写。MVP架构设计模式&#xff1a;MVC全名是Model View Persenter&#xff0c;MVP由MVC演变而来&#xff0c;是现在主流的开发…

IDEA切换 Springboot初始化 URL

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 往期热门专栏回顾 专栏…

Mysql常见用法(2)

目录​​​​​​​ mysql 约束 primary key 主键的基本使用 notnull(非空) unique(唯一) foreign key(外键) check 自增长 mysql索引 索引的原理 索引的类型 索引的使用 --添加索引 删除索引&#xff1a; -- 修改索引 &#xff0c; 先删除&#xff0c;在添加新…

94. 递归实现排列型枚举 刷题笔记

思路 依次枚举 每个位置用哪个数字 要求按照字典序最小来输出 而每次搜索下一层时i都是从1开始 也就是说 如果有小的数可以填上 那么该方案会填上这个数字 例如 当n等于3 第一次搜索 1 2 3输出后返回 返回后此时i3 第二个位置填3 1 3 2 输出后返回 此时返回到第一层…

vscode设置打开浏览器

安装这个插件 Open Browser Preview

MYSQL--锁机制*

一.对锁机制的大概介绍: 1.大概的来说,MYSQL当中的锁实际上就是合理的管理多个服务器对于同一个共享资源的使用,是计算机协调多个进程或者是线程并发访问某一资源的机制(避免争抢资源的现象发生) 2.在数据库当中,数据是一种可以供许多的用户进行共享使用的资源,如何保证数据并发…

Vue2:用node+express部署Vue项目

一、编译项目 命令 npm run build执行命令后&#xff0c;我们会在项目文件夹中看到如下生成的文件 二、部署Vue项目 接上一篇&#xff0c;nodeexpress编写轻量级服务 1、在demo中创建static文件夹 2、将dist目录中的文件放入static中 3、修改server.js文件 关键配置&…

Function calling流程总结 和 用于构建Agent的Function calling流程

Function calling流程总结的步骤如下&#xff1a; 自定义函数&#xff1a;根据用户需求&#xff0c;自定义函数chen_ming_algorithm&#xff0c;用于处理特定的任务。创建字典&#xff1a;根据自定义函数&#xff0c;创建一个字典chen_ming_function&#xff0c;其中包含自定义…

华为 OD 一面算法原题

2.2 亿彩票公布调查结果 昨天&#xff0c;闹得沸沸扬扬的《10 万中 2.2 亿》的彩票事件&#xff0c;迎来了官方公告。 简单来说&#xff0c;调查结果就是&#xff1a;一切正常&#xff0c;合规合法。 关于福利彩票事件&#xff0c;之前的推文我们已经分析过。 甚至在后面出现《…

云上攻防-云服务篇弹性计算服务器云数据库实例元数据控制角色AK控制台接管

知识点: 1、云服务-弹性计算服务器-元数据&SSRF&AK 2、云服务-云数据库-外部连接&权限提升 章节点&#xff1a; 云场景攻防&#xff1a;公有云&#xff0c;私有云&#xff0c;混合云&#xff0c;虚拟化集群&#xff0c;云桌面等 云厂商攻防&#xff1a;阿里云&am…

Kepler 参数化查询优化方法

写在前面 本文主要介绍了发布于 2023 年 SIGMOD 的论文《Kepler: Robust Learning for Faster Parametric Query Optimization》&#xff0c;该文章针对参数化查询&#xff0c;将参数化查询优化与查询优化结合&#xff0c;旨在减少查询规划时间的同时提高查询性能。 为此&…

【Java项目介绍和界面搭建】拼图小游戏——添加图片

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏 …

C++ 补充之常用排序算法

C 补充之常用排序算法 常用的排序算法主要包括冒泡排序、选择排序、插入排序、快速排序、归并排序和堆排序&#xff0c;下面简单介绍一下它们的概念和原理&#xff1a; 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff1a; 冒泡排序是一种基础的排序算法&#xff0c;它重…

作业1-224——P1015 [NOIP1999 普及组] 回文数

题目描述 思路 首先此题为一道高精度题&#xff0c;然后本题按照题目意思模拟即可。我们可以开两个数组来记录高精度数字&#xff0c;这样方便我们处理。判断“该数组是否回文”、“c翻转存入d再做cd”可以写成两个单独的函数。然后主程序组织一下他们即可。注意好退出循环的…

CSC联合培养博士生需要特别关注的几点问题

国家留学基金委&#xff08;CSC&#xff09;的联合培养博士生的申请方法、申报流程等&#xff0c;我们以往做过多次介绍&#xff0c;但因为在读博士本身的特殊性&#xff0c;申请时还应考虑其它因素&#xff0c;本篇知识人网小编谈谈联培博士生需要特别关注的问题。 一、注意安…

VIT速记

VIT架构 【ViT论文逐段精读【论文精读】】 【精准空降到 30:29】 https://www.bilibili.com/video/BV15P4y137jb/?share_sourcecopy_web&vd_sourcef09504571c3138e9e610217797aba3a4&t1829 首先把图片分为几个Patch&#xff0c;比如我们此时输入的图片为224*224*3&…

渗透测试靶场环境搭建

1.DVWA靶场 DVWA&#xff08;Damn Vulnerable Web Application&#xff09;是一个用来进行安全脆弱性鉴定的PHP/MySQL Web应用&#xff0c;包含了OWASP TOP10的所有攻击漏洞的练习环境&#xff0c;旨在为安全专业人员测试自己的专业技能和工具提供合法的环境&#xff0c;同时…

判断闰年(1000-2000)

判断规则&#xff1a;1.能被4整除&#xff0c;不能被100整除是闰年,2.能被400整除是闰年 #include <stdio.h>int is_leap_year(int n){if((n % 400 0)||((n % 4 0)&&(n % 100 ! 0)))return 1;elsereturn 0; } int main() {int i 0;int count 0;for(i 1000;…

【C++精简版回顾】15.继承派生

1.继承派生的区别 继承&#xff1a;子继父业&#xff0c;就是子类完全继承父类的全部内容 派生&#xff1a;子类在父类的基础上发展 2.继承方式 1.public继承为原样继承 2.protected继承会把public继承改为protect继承 3.private继承会把public&#xff0c;protected继承改为pr…

179基于matlab的2D-VMD处理图像

基于matlab的2D-VMD处理图像&#xff0c;将图片进行VMD分解&#xff0c;得到K个子模态图&#xff0c;将每个模态图进行重构&#xff0c;得到近似的原图。可以利用这点进行图像去噪。程序已调通&#xff0c;可直接运行。 179 2D-VMD 图像分解重构 图像处理 (xiaohongshu.com)