提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解

提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解

在现代应用中,特别是像社交平台、金融系统等高并发场景下,如何高效地处理大量实时数据成为了系统设计的一个关键问题。BufferTrigger 是由快手开源的一个工具,专为解决大数据流处理场景中的缓冲与批量处理问题。本文将详细讲解如何在 Spring Boot 项目中使用 BufferTrigger,帮助你提高系统的吞吐量与响应速度,减少 I/O 操作,从而提升整体性能。

BufferTrigger 简介:如何高效处理实时数据流

快手开源的 BufferTrigger 是一个用于数据处理,它主要用于实时数据流处理场景。BufferTrigger 的主要作用是为了解决在大数据流处理中常见的问题:如何高效地对连续的数据流进行缓冲,并在满足一定条件时触发下游计算或存储操作。

使用 BufferTrigger 优势如下:

  1. 提高效率:通过批量处理数据而不是逐条处理,可以显著减少 I/O 操作的次数,从而提升整体处理效率。
  2. 资源优化:对于一些需要消耗较多计算资源的操作(如写入数据库、调用外部服务等),通过累积一批数据后再执行一次这样的操作,可以更有效地利用系统资源。
  3. 简化逻辑:对于开发者而言,使用 BufferTrigger 可以帮助简化代码逻辑,将注意力集中在业务逻辑上而不是复杂的缓冲控制逻辑上。
  4. 灵活配置:支持多种触发策略(比如基于时间窗口、基于数据量大小等),使得用户可以根据具体应用场景灵活选择最合适的触发方式。
  5. 易于集成:设计上考虑了与现有数据处理框架的良好兼容性,使得它可以方便地与其他组件一起工作,在现有的技术栈中引入该功能变得更加容易。

如何添加依赖:快速集成到 Spring Boot 项目

只需要在 pom.xml 中添加以下依赖,即可将 BufferTrigger 集成到你的 Spring Boot 项目中:

 <properties>
        // 省略...
        <buffertrigger.version>0.2.21</buffertrigger.version>
    </properties>

    <!-- 统一依赖管理 -->
    <dependencyManagement>
        <dependencies>
            // 省略...

            <!-- 快手 Buffer Trigger -->
            <dependency>
                <groupId>com.github.phantomthief</groupId>
                <artifactId>buffer-trigger</artifactId>
                <version>${buffertrigger.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

快手 BufferTrigger 使用讲解

  • 核心概念
    1. 缓冲队列:BufferTrigger 会维护一个内部缓冲区,用来缓存从外部接收的数据。它允许指定缓存队列的最大容量,当达到上限时会根据预设的触发策略进行数据的批量处理。
    2. 触发策略:触发策略是指何时将缓存的数据批量提交进行处理。常见的触发策略有:
      • 基于数据量:当缓存的数据达到指定大小时触发处理。
      • 基于时间窗口:每隔一定时间就触发一次处理。
      • 混合触发:同时满足数据量和时间条件时触发。
    3. 数据消费:通过 BufferTrigger 提供的消费者回调机制,开发者可以自定义数据消费的逻辑。一般情况下,消费的过程是对缓存的数据进行处理、存储或其他操作。
    4. 批量处理:将一批数据聚合后一起处理,而不是一条一条地处理。这样能够减少 I/O 操作的次数,从而提高系统的吞吐量。

使用案例

在许多社交平台上,网红或明星的粉丝数通常会发生频繁的波动。比如,当一个网红被大量用户关注或取消关注时,这些信息会通过消息队列(如 RocketMQ)快速传递,系统需要高效地处理这些变化,并更新到缓存或数据库中。在这种场景下,如果每次有粉丝关注或取消关注时都进行一次 I/O 操作,会导致系统的负载过大,尤其是在并发请求较高时。

为了提高系统的性能,减少频繁的 I/O 操作,通常采用 批量处理 的方式来对消息进行合并和延迟处理。BufferTrigger 就是为了应对这种高并发和实时性要求的场景,它能够将多条消息缓存起来,当满足触发条件时(比如缓存队列达到一定大小或时间窗口到期),将这些消息批量处理,从而减少与缓存系统的交互次数,提升系统的吞吐量和响应速度。

@Component
@RocketMQMessageListener(consumerGroup = "xiaohashu_group_" + MQConstants.TOPIC_COUNT_FANS, // Group 组
        topic = MQConstants.TOPIC_COUNT_FANS // 主题 Topic
)
@Slf4j
public class CountFansConsumer implements RocketMQListener<String> {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
            .bufferSize(50000) // 缓存队列的最大容量
            .batchSize(1000)   // 一批次最多聚合 1000 条
            .linger(Duration.ofSeconds(1)) // 多久聚合一次
            .setConsumerEx(this::consumeMessage)
            .build();

    @Resource
    private RocketMQTemplate rocketMQTemplate;



    @Override
    public void onMessage(String body) {
        // 往 bufferTrigger 中添加元素
        bufferTrigger.enqueue(body);
    }

    private void consumeMessage(List<String> bodys) {
        log.info("==> 聚合消息, size: {}", bodys.size());
        log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys));
        // List<String> 转 List<CountFollowUnfollowMqDTO>
        List<CountFollowUnfollowMqDTO> countFollowUnfollowMqDTOS = bodys.stream()
                .map(body -> JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class)).toList();

        //按照用户进行一个分组
        Map<Long, List<CountFollowUnfollowMqDTO>> groupMap  =
                countFollowUnfollowMqDTOS.stream()
                        .collect(Collectors.groupingBy(CountFollowUnfollowMqDTO::getUserId));

        // 按组汇总数据,统计出最终的计数
        // key 为目标用户ID, value 为最终操作的计数
        Map<Long, Integer> countMap = Maps.newHashMap();

        for (Map.Entry<Long, List<CountFollowUnfollowMqDTO>> entry : groupMap.entrySet()) {
            List<CountFollowUnfollowMqDTO> list = entry.getValue();
            // 最终的计数值,默认为 0
            int finalCount = 0;
            for (CountFollowUnfollowMqDTO countFollowUnfollowMqDTO : list) {
                Integer type = countFollowUnfollowMqDTO.getType();
                FollowUnfollowTypeEnum followUnfollowTypeEnum = FollowUnfollowTypeEnum.valueOf(type);
                // 若枚举为空,跳到下一次循环
                if (Objects.isNull(followUnfollowTypeEnum)) {
                    continue;
                }

                switch (followUnfollowTypeEnum) {
                    case FOLLOW -> finalCount += 1;
                    case UNFOLLOW -> finalCount -= 1;
                }

            }
            // 将分组后统计出的最终计数,存入 countMap 中
            countMap.put(entry.getKey(), finalCount);
        }
        log.info("## 聚合后的计数数据: {}", JsonUtils.toJsonString(countMap));
        // 更新 Redis
        countMap.forEach((k, v) -> {
            // Redis Key
            String redisKey = RedisKeyConstants.buildCountUserKey(k);
            // 判断 Redis 中 Hash 是否存在
            boolean isExisted = redisTemplate.hasKey(redisKey);

            // 若存在才会更新
            // (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)
            if (isExisted) {
                // 对目标用户 Hash 中的粉丝数字段进行计数操作
                redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, v);
            }
        });

        // 发送 MQ, 计数数据落库
        // 构建消息体 DTO
        Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap))
                .build();

        // 异步发送 MQ 消息,提升接口响应速度
        rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS_2_DB, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("==> 【计数服务:粉丝数入库】MQ 发送成功,SendResult: {}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("==> 【计数服务:粉丝数入库】MQ 发送异常: ", throwable);
            }
        });
    }

}

代码解析

  • BufferTrigger 的构建:通过 .batchBlocking() 创建一个 BufferTrigger 实例,该实例设置了缓存队列的最大容量、每批次最多处理的消息数量、以及聚合的时间窗口等配置。
  • enqueue(body):每接收到一条消息,就将消息加入到缓冲队列中,BufferTrigger 会根据设定的策略决定何时批量处理这些数据。
  • consumeMessage(List<String> bodys):当数据满足触发条件时(如缓存队列满或时间窗口到期),consumeMessage 会被调用,处理聚合后的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/980736.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一次有趣的前后端跨越排查

进行前后端代码联调的时候&#xff0c;使用axios调用后端请求&#xff0c;因为都是本地进行联调&#xff0c;所以没有考虑跨域的问题&#xff0c;写了一个get的请求接口&#xff0c;请求后端时&#xff0c;突然跳出下面的问题&#xff1a; 错误的信息一看很像就是跨域的问题&…

创建一个简单的spring boot+vue前后端分离项目

一、环境准备 此次实验需要的环境&#xff1a; jdk、maven、nvm和node.js 开发工具&#xff1a;idea或者Spring Tool Suite 4&#xff0c;前端可使用HBuilder X&#xff0c;数据库Mysql 下面提供maven安装与配置步骤和nvm安装与配置步骤&#xff1a; 1、maven安装与配置 1…

【0011】HTML其他文本格式化标签详解(em标签、strong标签、b标签、i标签、sup标签、sub标签......)

如果你觉得我的文章写的不错&#xff0c;请关注我哟&#xff0c;请点赞、评论&#xff0c;收藏此文章&#xff0c;谢谢&#xff01; 本文内容体系结构如下&#xff1a; 本文旨在深入探讨HTML中其他的文本格式化标签&#xff0c;主要有<em> 标签、<strong> 标签、…

从零开始:H20服务器上DeepSeek R1 671B大模型部署与压力测试全攻略

前言 最近&#xff0c;我有幸在工作中接触到了DeepSeek R1 671B模型&#xff0c;这是目前中文开源领域参数量最大的高质量模型之一。DeepSeek团队在2024年推出的这款模型&#xff0c;以其惊人的6710亿参数量和出色的推理性能&#xff0c;引起了业界广泛关注。 作为一名AI基础…

mySQL复习

目录 一.写在前面 二.介绍 三.选择语句 四.内连接 五.列属性 一.写在前面 课程视频&#xff1a;【中字】SQL进阶教程 | 史上最易懂SQL教程&#xff01;10小时零基础成长SQL大师&#xff01;&#xff01;_哔哩哔哩_bilibili 课程所需资料&#xff1a; 链接&#xff1a;h…

基于SpringBoot+Vue的医院挂号管理系统+LW示例参考

系列文章目录 1.基于SSM的洗衣房管理系统原生微信小程序LW参考示例 2.基于SpringBoot的宠物摄影网站管理系统LW参考示例 3.基于SpringBootVue的企业人事管理系统LW参考示例 4.基于SSM的高校实验室管理系统LW参考示例 5.基于SpringBoot的二手数码回收系统原生微信小程序LW参考示…

golang介绍,特点,项目结构,基本变量类型与声明介绍(数组,切片,映射),控制流语句介绍(条件,循环,switch case)

目录 golang 介绍 面向并发 面向组合 特点 项目结构 图示 入口文件 main.go 基本变量类型与声明 介绍 声明变量 常量 字符串(string) 字符串格式化 空接口类型 数组 切片 创建对象 追加元素 复制切片 map(映射) 创建对象 使用 多重赋值 控制流语句…

3.2-A-L1-2-第15讲-冒泡排序 mochen @denglexi

博观而约取 厚积而薄发 Observe extensively but select wisely; accumulate deeply but release sparingly. 每次比较两个相邻的元素&#xff0c;如果它们的顺序错误就把它 们交换过来。 每一轮进行两两比较&#xff0c;将该轮中最大/最小的值冒出来。 冒泡程序核心代码&#…

25、泛型

十二章、泛型 12-1 为何要有泛型 1、泛型&#xff1a;是一种标签。把元素的类型设计成一个参数&#xff0c;这个类型参数就叫做泛型 2、所谓泛型&#xff0c;就是允许在定义类、接口时通过一个标识表示类中 某个属性的类型或者是某个方法的返回值及参数类型。这个类型参数将在…

[KEIL]单片机技巧 01

1、查看外设寄存器的值 配合对应的芯片开发手册以查看寄存器及其每一位的意义&#xff0c;可以解决90%以上的单纯的片内外设bug&#xff0c;学会如何通过寄存器的值来排外设上的蛊是嵌入式开发从小白到入门的重要一步&#xff0c;一定要善于使用这个工具&#xff0c;而不是外设…

TCP/IP 5层协议簇:网络层(IP数据包的格式、路由器原理)

目录 1. TCP/IP 5层协议簇 2. IP 三层包头协议 3. 路由器原理 4. 交换机和路由的对比 1. TCP/IP 5层协议簇 如下&#xff1a; 2. IP 三层包头协议 数据包如下&#xff1a;IP包头不是固定的&#xff0c;每一个数字是一个bit 其中数据部分是上层的内容&#xff0c;IP包头最…

免费轻巧多功能 PDF 处理工具:转换、压缩、提取一应俱全

软件技术 今天要给大家分享一款超实用的 PDF 处理工具&#xff0c;它免费又轻巧&#xff0c;如同随时待命的得力小帮手&#xff0c;功能之强大超乎想象&#xff0c;真的值得大家收藏。 这款工具是绿色版软件&#xff0c;解压后开启&#xff0c;满满的 PDF 处理功能便映入眼帘…

基于微信小程序的疫情互助平台(源码+lw+部署文档+讲解),源码可白嫖!

摘要 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;从2019年底新型冠状肺炎疫情的爆发以来&#xff0c;使很多工作的管理工作难度再上一层楼。为了在疫情期间能更好的维护信息管理&#xff0…

飞致云开源社区月度动态报告(2025年2月)

自2023年6月起&#xff0c;中国领先的开源软件公司飞致云以月度为单位发布《飞致云开源社区月度动态报告》&#xff0c;旨在向广大社区用户同步飞致云旗下系列开源软件的发展情况&#xff0c;以及当月主要的产品新版本发布、社区运营成果等相关信息。 飞致云开源运营数据概览&…

数据库拓展操作

目录 一、截断表&#xff1a; 操作目的&#xff1a; 操作内容&#xff1a; 性能影响&#xff1a; 基本语法&#xff1a; 例子&#xff1a; 二、插入查询结果&#xff1a; 基本语法&#xff1a; 例子&#xff1a; 三、聚合函数&#xff1a; 常用函数&#xff1a; 基…

在 Mac 上使用 Docker 安装宝塔并部署 LNMP 环境

前言 只因为在mac上没有找到合适的PHP开发集成环境&#xff0c;之前有安装了Eserver&#xff0c;但是安装一些常用PHP扩展有时候还是需要手动去编译添加。phpStudy也没有找到适合Mac的版本&#xff0c;在后面安装了Parallels Desktop虚拟机 来运行Ubuntu系统搭建了一套LNMP环境…

Node.js二:第一个Node.js应用

精心整理了最新的面试资料和简历模板&#xff0c;有需要的可以自行获取 点击前往百度网盘获取 点击前往夸克网盘获取 创建的时候我们需要用到VS code编写代码 我们先了解下 Node.js 应用是由哪几部分组成的&#xff1a; 1.引入 required 模块&#xff1a;我们可以使用 requi…

Excel基础(详细篇):总结易忽视的知识点,有用的细节操作

目录 基础篇Excel主要功能必会快捷键LotusExcel的文件类型工作表基本操作表项操作选中与缩放边框线 自动添加边框线格式刷设置斜线表头双/多斜线表头不变形的:双/多斜线表头插入多行、多列单元格/行列的移动冻结窗口 方便查看数据打印的常见问题Excel格式数字格式日期格式文本…

vue3:四嵌套路由的实现

一、前言 1、嵌套路由的含义 嵌套路由的核心思想是&#xff1a;在某个路由的组件内部&#xff0c;可以定义子路由&#xff0c;这些子路由会渲染在父路由组件的特定位置&#xff08;通常是 <router-view> 标签所在的位置&#xff09;。通过嵌套路由&#xff0c;你可以实…

【实战篇】【深度解析DeepSeek:从机器学习到深度学习的全场景落地指南】

一、机器学习模型:DeepSeek的降维打击 1.1 监督学习与无监督学习的"左右互搏" 监督学习就像学霸刷题——给标注数据(参考答案)训练模型。DeepSeek在信贷风控场景中,用逻辑回归模型分析百万级用户数据,通过特征工程挖掘出"凌晨3点频繁申请贷款"这类魔…