如何做到百万数据半小时跑批结束

什么是跑批

跑批就是应用程序定时对数据的批量处理。

跑批有以下特性:

  • 大数据量:批量任务一般伴随着大量的数据处理

  • 自动化:要求制定时间或频率自动运行

  • 性能:要求在指定时间内完成批处理任务

  • 健壮性:针对于异常数据,不可导致程序崩溃

  • 可靠性:针对于异常数据,我们后续可跟踪

数据准备

CREATE TABLE `test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `date_time` datetime DEFAULT NULL COMMENT '时间',
  `str1` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3099998 DEFAULT CHARSET=utf8mb4

-- 添加数据存储过程
delimiter $$
create procedure insert_test()
begin
declare n int default 1;
while n< 3000000
do 
insert into test(date_time,str1) values(concat( CONCAT(FLOOR(2023 + (RAND() * 1)),'-',LPAD(FLOOR(10 + (RAND() * 2)),2,0),'-',LPAD(FLOOR(1 + (RAND() * 25)),2,0))),n);
set n = n+1;
end while;
end


call insert_test();

跑批需要考虑哪些问题

深度分页

MySQL limit 深分页 会变慢。

-- 0.016s
select id,str1 from test where date_time> '2020-09-19' limit 0,10;
-- 17.147s
select id,str1 from test where date_time> '2020-09-19' limit 2000000,10;

limit 的偏移量越大,执行时间越长。limit a, b会查询前a+b条数据,然后丢弃前a条数据,select * 会查询所有的列,会有回表操作。

针对上面的问题,我们需要的操作时尽量减少无效的回表策略,limit a,b,直接获取a+1到a+b条数据的id,再根据这些id查询数据这样就减少了回表的操作。

可以使用子查询优化SQL,先查出id,在分页。

-- 0.656s
select id,str1 FROM test where id >= (select a.id from test a where a.date_time >= '2020-09-19' limit 2000000,1) LIMIT 10;

sql优化这里不做过多赘述。

批量处理

跑批可能会涉及到数据准备的过程,边循环跑批数据边去查找所需的数据,一方面for嵌套的循环处理,时间复杂度通常是随着你的 for 个数上升的,例如:

  // 调用数据库查询需跑批数据
  List<BizDo> bizDoList = this.list(businessDate);
  // for 循环处理数据
  for(BizDo ba : bizDoList) {
    // 业务处理逻辑.. 省略
    
    // 查询账户数据
    List<BizAccountDo> bizAccountDoList = this.listGetBizAccount(ba.getbizUserId());
    for (BizAccountDo bic : bizAccountDoList){
      // 账户处理逻辑.. 省略
    }
    ... // 后续还会嵌套 for 循环
  }

这种情况可以采用批量处理,例如可以userId放在集合中,再去批量查询,这样可以提升效率。

List<String> bizUserIdList  = bizApplyDoList.parallelStream().map(BizApplyDo::getbizUserId()).collect(Collectors.toList());
// 批量进行账户查询
List<BizAccountDo> bizAccountDoList = this.listGetBizAccount(bizUserIdList);

同样对于插入也可以采用批量处理。

分片处理

在生产环境中,都是采用集群部署,如果一个跑批任务只跑在一个机器上,那效率肯定很低,我们可以利用 xxl-job**「分片广播」** 和 「动态分片」 功能。

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

“分片广播” 以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数,获取分片参数进行分片业务处理。

  • Java语言任务获取分片参数方式:BEAN、GLUE模式(Java)

    // 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用 int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();
    

分片参数属性说明:

index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;total:总分片数,执行器集群的总机器数量;

该特性适用场景如:

  • 1、分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
  • 2、广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等

/**
 * 分片广播进行100W用户重置
 * @param param
 */
@XxlJob(value = "shardingJob")
public void shardingJob(String param){

    // 获取当前节点的index 与 总节点数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();
    log.info("当前节点的index = {}, 总结点数 = {}", shardIndex, shardTotal);

    List<Integer> userIds = this.getUserIds();
    //这里只是给出参考,具体要结合实际
    userIds.stream().forEach(id ->{
        if(id % shardTotal == shardIndex){
            
            //todo 业务
        }
    });
}

/**
 * 模拟用户id
 * @return
 */
private List<Integer> getUserIds() {
    List<Integer> userIds = new ArrayList<>();
    for(int i = 0; i < 100 ; i++){
        userIds.add(i + 1);
    }
    return userIds;
}

线程安全

在进行跑批时,一般会采用多线程的方式进行处理,因此要考虑线程安全的问题,比如使用线程安全的容器,使用JUC包下的工具类。

事务

事务粒度要尽可能的小,选择合适的事务范围,要根据业务选择合适的事务传播属性。

1、这些操作自身是无法回滚的,这就会导致数据的不一致。可能RPC调用成功了,但是本地事务回滚了,可是PRC调用无法回滚了。

2、在事务中有远程调用,就会拉长整个事务。那么久会导致本事务的数据库连接一直被占用,那么如果类似操作过多,就会导致数据库连接池耗尽或者单个链接超时

异常处理

要保证程序的健壮性,做好异常处理,不能因为一处报错,导致整个任务执行失败,对于异常的数据可以跳过,不影响其他数据的正常执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/152201.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BI 数据可视化平台建设(2)—筛选器组件升级实践

作者&#xff1a;vivo 互联网大数据团队-Wang Lei 本文是vivo互联网大数据团队《BI数据可视化平台建设》系列文章第2篇 -筛选器组件。 本文主要介绍了BI数据可视化平台建设中比较核心的筛选器组件&#xff0c; 涉及组件分类、组件库开发等升级实践经验&#xff0c;通过分享一些…

anaconda中安装pytorch和TensorFlow环境并在不同环境中安装kernel

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

Redis最新2023年面试题高级面试题及附答案解析(2)【Redis最新2023年面试题高级面试题及附答案解析-第三十九刊】

文章目录 Redis最新2023年面试题高级面试题及附答案解析(2)01、Redis 集群方案应该怎么做&#xff1f;都有哪些方案&#xff1f;02、Redis 的内存用完了会发生什么&#xff1f;03、怎么测试 Redis 的连通性&#xff1f;04、Redis 集群会有写操作丢失吗&#xff1f;为什么&#…

【java学习—十五】Thread类的有关方法(3)

文章目录 1. 基本方法2. 线程的优先级3. 进阶方法3.1. 举例3.1.1. 线程让步3.1.2. join() 方法3.1.3. sleep()方法3.1.4. stop() 方法3.1.4. isAlive() 方法 1. 基本方法 方法名作用void start()启动线程&#xff0c;并执行对象的 run() 方法run()线程在被调度时执行的操作Str…

T13级专家被毕业?!研发大牛被裁带来的警示丨IDCF

2005年加入腾讯&#xff0c;腾讯第一位Web前端专家&#xff0c;T13职级&#xff0c;今年1月仍是腾讯前端最高专家。 在47岁的时候&#xff0c;拥有这样简历的前端大牛黄希彤被腾讯裁员。 黄希彤夫人在小红书上透露&#xff1a;&#xff08;黄希彤&#xff09;在鹅厂工作了15年…

大语言模型量化方法对比:GPTQ、GGUF、AWQ

在过去的一年里&#xff0c;大型语言模型(llm)有了飞速的发展&#xff0c;在本文中&#xff0c;我们将探讨几种(量化)的方式&#xff0c;除此以外&#xff0c;还会介绍分片及不同的保存和压缩策略。 说明&#xff1a;每次加载LLM示例后&#xff0c;建议清除缓存&#xff0c;以…

ROS 学习应用篇(六)参数的使用与编程

node可能不在一个电脑里但是这些服务的参数信息是共享的&#xff0c;因为话题Topic是异步的所以只有服务Service有实时参数信息可以调用。 接下来将演示服务参数信息的调用与修改。 创建功能包(工作空间src文件夹下) catkin_create_pkg learning_parameter roscpp rospy std…

MySQL中全文索引和普通索引的区别

MySQL中的全文索引&#xff08;Full-Text Index&#xff09;和普通索引&#xff08;比如B-Tree索引&#xff09;是为了提高查询效率而设计的&#xff0c;但它们适用于不同的场景和查询类型。 普通索引&#xff08;如B-Tree索引&#xff09; 适用场景&#xff1a;普通索引适用于…

jsp中使用PDF.js实现pdf文件的预览

本文介绍的是在使用jsp作为模板引擎的spring-mvc项目中&#xff0c;如何利用 PDF.js实现pdf文件的预览。 1、下载 PDF.js Getting Started (mozilla.github.io) 下载解压后其中有两个目录&#xff0c;直接将这两个文件夹放到项目的web资源目录中。此时相当于把PDF.js这个项目也…

3ds max 2024 V-Ray 6 ACES workflow 工作流设置

ACES的流程包括2个设置&#xff1a; 1、环境设置&#xff1b;2、贴图设置&#xff1a; 一、环境设置&#xff1a;3ds max 2024已经内置了OCIO文件&#xff1b;设置一下即可&#xff1b; 二、贴图设置&#xff1a; 所有类型贴图加载有默认和加后缀2种方法&#xff1a; 第一…

使用VC++设计程序使用邻域平均平滑算法、中值滤波算法、K近邻均值滤波器(KNNF)进行滤波

VC实现若干种图像滤波技术 文章目录 VC实现若干种图像滤波技术实验内容邻域平均平滑算法1. 原理2. 实验代码3. 实验现象 中值滤波算法1. 原理2. 实验代码3.实验现象 K近邻均值滤波算法&#xff08;KNNF&#xff09;1. 原理2. 实验代码实验现象 实验内容 实验要求&#xff1a; …

【机器学习】 特征工程:特征预处理,归一化、标准化、处理缺失值

特征预处理采用的是特定的统计方法&#xff08;数学方法&#xff09;将数据转化为算法要求的数字 1. 数值型数据 归一化&#xff0c;将原始数据变换到[0,1]之间 标准化&#xff0c;数据转化到均值为0&#xff0c;方差为1的范围内 缺失值&#xff0c;缺失值处理成均值、中…

PDF处理控件Aspose.PDF功能演示:使用C#查找和替换PDF文件中的文本

使用“查找并替换”选项可以一次性替换文档中的特定文本。这样&#xff0c;您不必手动定位和更新整个文档中每次出现的文本。本文甚至更进一步&#xff0c;介绍了如何在PDF文档中自动查找和替换文本功能。特别是&#xff0c;将学习如何使用C&#xff03;在整个PDF&#xff0c;特…

外汇天眼:失败的投资者经常陷入两个误区!

一、价格与价值的混淆 在金融领域&#xff0c;价格和价值往往被错误视为同义词。然而&#xff0c;审视市场时&#xff0c;我们会逐渐发现一个“安全差”的重要概念&#xff0c;这是由巴菲特的导师本杰明格雷厄姆提出的。 安全差是指股票的内在价值与市场价格之间的差异。内在…

WMS仓储管理系统与TMS系统整合后的优势

随着全球化的加速和供应链网络的日益复杂&#xff0c;仓库和运输成为企业运营中的两个关键环节。为了更高效地管理这两个环节&#xff0c;许多企业开始探索将WMS仓储管理系统和TMS运输管理系统整合的可能性。这种整合不仅可以提升仓库流程的可见性&#xff0c;还有助于改善调度…

1、24 个常见的 Docker 疑难杂症处理技巧(一)

1Docker 迁移存储目录 默认情况系统会将 Docker 容器存放在 /var/lib/docker 目录下 [问题起因] 今天通过监控系统&#xff0c;发现公司其中一台服务器的磁盘快慢&#xff0c;随即上去看了下&#xff0c;发现 /var/lib/docker 这个目录特别大。由上述原因&#xff0c;我们都知…

配置 ssh 免密登录

背景 从机器 A 使用 ssh 免密登录到机器 B&#xff0c;两台机器的 OS 都是 CentOS。其中机器 B 用作了一台 nodejs 的服务器&#xff0c;已经安装并运行了 sshd 服务&#xff0c;其用户名是 jmmem 我们想要实现在机器 A 上键入 ssh nodejs 就能免密登录到机器 B 的效果 机器…

python 最快多长时间学完?

以下是一个为零基础学员制作Python速成学习计划。这个计划包括了一些基本的Python概念和技能&#xff0c;以及一些实用的学习技巧。 第1周&#xff1a;基础入门 Python简介&#xff1a;了解Python的历史、特点、应用领域。 安装Python&#xff1a;在你的电脑上安装Python&am…

一个 不用充钱 也能让你变强的 VSCode 插件

今天给大家推荐一款不用充钱也能让你变强的 vscode 插件 通义灵码&#xff08;TONGYI Lingma&#xff09;&#xff0c;可以称之为 copilot 的替代甜品 &#x1f4aa; 什么是 通义灵码&#xff08;TONGYI Lingma&#xff09; 通义灵码&#xff08;TONGYI Lingma&#xff09;&am…

Martins 用法-利用现有的数学公式来绘制三相电机电流相位之间的关系

你可能会对这个主题很感兴趣。这背后的原因是我想分享一种我在花了大量时间研究诊断三相电机的方式时发现到的技术。我发现有一种非常简单的方式可以做到这一点&#xff0c;与Pico团队分享后&#xff0c;Steve Smith将它命名为Martins用法。 那么&#xff0c;让我们开始吧。 Ma…