【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为在 Grafana 查看了生产环境的集群监控情况,凌晨两点至四点之间的集群、索引的查询以及写入 QPS 都比较低。

在这里插入图片描述

  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * 清理ES历史数据定时任务
 */
@Component
public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * 根据索引名称删除当前日期两个月前的那一天的历史文档数据
     * @param jobContext
     */
    @Scheduled
    public void cleanESHistoryData(JobContext jobContext) {
    	// jobContext为定时任务中回传数据
        String indexName = jobContext.getData();
        if (StringUtils.isBlank(indexName)) {
            LOGGER.warn("ES索引名称不能为空!");
            return;
        }
        long startTimeMillis = System.currentTimeMillis();
        String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
        try {
            String startTimeStr = twoMonthsAgoDate + " 00:00:00";
            // 初始化时间,形如2023-08-06 00:00:00
            Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
            // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
            for (int i = 0; i < 24; i++) {
                Date startTime = initialStartTime;
                startTime = DateUtils.addHours(startTime, i);
                Date endTime = DateUtils.addHours(startTime, 1);
                LOGGER.info("正在清理索引:[{}],时间:{} 至 {}的历史文档数据...", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
                long currentStartTimeMillis = System.currentTimeMillis();
                // 指定操作的索引库
                SearchRequest searchRequest = new SearchRequest(indexName);
                // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("createTimeStr.keyword")
                                                                                    .from(DateTool.format(startTime, DF_FULL))
                                                                                    .to(DateTool.format(endTime, DF_FULL)))
                                                                                    .size(1000);
                // 设置滚动查询结果在内存中的过期时间为1min
                Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
                // 将滚动以及构造的查询条件放入查询请求
                searchRequest.scroll(scroll).source(searchSourceBuilder);
                SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                // 记录要滚动的ID
                String scrollId = searchResponse.getScrollId();
                SearchHit[] hits = searchResponse.getHits().getHits();
                while (hits != null && hits.length > 0) {
                    // 创建批量处理请求对象
                    BulkRequest bulkRequest = new BulkRequest();
                    for (SearchHit hit : hits) {
                        DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
                        bulkRequest.add(deleteRequest);
                    }
                    // 执行批量删除请求操作
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    // 构造滚动查询条件,继续滚动查询
                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    scrollRequest.scroll(scroll);
                    searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                    scrollId = searchResponse.getScrollId();
                    hits = searchResponse.getHits().getHits();
                }
                // 当前滚动查询结束,清除滚动,释放服务器内存资源
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                LOGGER.info("清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
            }
            LOGGER.info("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
        } catch (Exception e) {
            LOGGER.error(String.format("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
        }
    }
}

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

在这里插入图片描述

🎈通过观察监控可以发现,在凌晨三点执行定时任务清理 ES 历史数据期间,集群、索引查询 QPS 以及 CPU 利用率指标都明显飙升。因此,清理 ES 数据时一定要避开流量高峰期,避免在流量高峰期清理数据时造成资源实例宕机,造成生产事故。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/68998.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

git的简单介绍和使用

git学习 1. 概念git和svn的区别和优势1.1 区别1.2 git优势 2. git的三个状态和三个阶段2.1 三个状态&#xff1a;2.2 三个阶段&#xff1a; 3. 常用的git命令3.1 下面是最常用的命令3.2 git命令操作流程图如下&#xff1a; 4. 分支内容学习4.1 项目远程仓库4.2 项目本地仓库4.3…

IPv4编址及子网划分

IPv4编址及子网划分 一、IPv4地址概述1.1、IPv4报文结构1.2、IPv4地址分类1.2.1、A类1.2.2、B类1.2.3、C类1.2.4、D类1.2.5、E类 1.3、私有IP地址1.4、特殊地址 二、子网划分2.1、子网掩码2.2、VLSM 可变长的子网掩码2.3、子网划分2.4、子网划分示例2.4.1、子网划分案例 —— A…

【24择校指南】温州大学计算机考研考情分析

温州大学(C) 考研难度&#xff08;☆&#xff09; 内容&#xff1a;23考情概况&#xff08;拟录取和复试分数人数统计&#xff09;、院校概况、23专业目录、23复试详情、各科目以及各专业考情分析。 正文1349字&#xff0c;预计阅读&#xff1a;3分钟。 2023考情概况 温州…

IDEA新建类时自动设置类注释信息,署名和日期

IDEA设置路径 File --> Settings --> Editor --> File and Code Templates --> Include --> File Header 官方模板 这里 ${USER} 会读取计算机的用户名 ${DATE}是日期 ${TIME}是时间 /*** Author ${USER}* Date ${DATE} ${TIME}* Version 1.0*/

探索自动化网页交互的魔力:学习 Selenium 之旅【超详细】

"在当今数字化的世界中&#xff0c;网页自动化已经成为了不可或缺的技能。想象一下&#xff0c;您可以通过编写代码&#xff0c;让浏览器自动执行各种操作&#xff0c;从点击按钮到填写表单&#xff0c;从网页抓取数据到进行自动化测试。学习 Selenium&#xff0c;这一功能…

Small Tip: 如何Debug Start Routine

我也不知道咋地&#xff0c;在generated ABAP里面打断点进不去。 我也不晓得怎么弄&#xff0c;今天反正是硬找着去弄。不晓得有没有其他好办法。有知道的小伙伴评论下吧。 1、 在DTP里面选Before Transformation&#xff0c;要去debug start routine选这个就够了。其他的随意…

【Java可执行命令】(十七)JVM运行时信息动态维护工具 jinfo:一个维护 JVM 相关的配置参数和系统属性的工具,辅助故障排除、诊断和优化 ~

Java可执行命令之jinfo 1️⃣ 概念2️⃣ 优势和缺点3️⃣ 使用3.1 语法格式3.2 -flags&#xff1a;查看进程的启动参数3.3 -sysprops&#xff1a;查看进程的系统属性3.4 -flag < name>&#xff1a;查看特定虚拟机参数的值3.5 -flag [/-]< name>&#xff1a;启用或禁…

【在树莓派上安装cpolar内网穿透实战】

文章目录 前言1.在树莓派上安装cpolar2.查询cpolar版本号3.激活本地cpolar客户端4.cpolar记入配置文件 前言 树莓派作为一个超小型的电脑系统&#xff0c;虽然因其自身性能所限&#xff0c;无法如台式机或笔记本等准系统一样&#xff0c;运行大型软件或程序&#xff08;指望用…

VsCode美化 - VsCode自定义 - VsCode自定义背景图

VsCode美化 - VsCode自定义 - VsCode自定义背景图&#xff1a;添加二次元老婆图到VsCode 前言 作为一个二刺螈&#xff0c;VsCode用久了&#xff0c;总觉得少了些什么。是啊&#xff0c;高效的代码生产工具中怎么能没有老婆呢&#xff1f; 那就安装一个VsCode插件把老婆添加…

数学建模学习(9):模拟退火算法

模拟退火算法(Simulated Annealing, SA)的思想借 鉴于固体的退火原理&#xff0c;当固体的温度很高的时候&#xff0c;内能比 较大&#xff0c;固体的内部粒子处于快速无序运动&#xff0c;当温度慢慢降 低的过程中&#xff0c;固体的内能减小&#xff0c;粒子的慢慢趋于有序&a…

go重制版的海盗王gateserver网关服务端

海盗王原有的gateserver网关经常出现无故报错和掉地图的问题&#xff0c;经过反复修改都无法解决相关问题。 加上&#xff0c;原有的程序已经趋于古董级别&#xff0c;存在很大的兼容性问题。 以上&#xff0c;萌发了用go语言进行重新开发一个gateserver网关程序的想法&#xf…

sxs卡丢失数据如何找回?sxs卡数据丢失原因和修复办法分享!

说起sxs卡&#xff0c;你们是否有所了解呢&#xff1f;sxs卡具有很好的传输性能&#xff0c;能够存储照片和视频数据&#xff0c;主要被放置在索尼XDCAM EX型摄像机上。 而在使用sxs卡设备过程中&#xff0c;难免和其他设备一样&#xff0c;容易出现数据丢失情况。而如果丢失的…

ESP-IDF插件去除红色波浪线

如图&#xff0c;新装的ESP-IDF打开别人的工程有好多红色波浪线。 把这里的第一个文件夹删除&#xff0c;就是那个.vscode&#xff0c;接下来按ctrlshiftP&#xff0c;输入vscode&#xff0c; 选第一个&#xff0c;添加配置文件夹。 问题解决。 之后记得重新配置板子信息和串…

豪越HYDO智能运维助力智慧医院信息化建设

随着国家政策的推动与支持&#xff0c;医疗行业信息化应用不断普及&#xff0c;大数据、AI、医疗物联网等技术的应用&#xff0c;快速推动了电子病历、智慧服务、智慧管理的智慧医院建设和医院信息标准化建设&#xff0c;通过不断探索创新“智慧医院”服务模式&#xff0c;实现…

Java实现籍贯级联选择器

在工作中要求写一个籍贯的级联选择器&#xff0c;记录一下自己写这个级联选择器的过程&#xff0c;因为自己才刚开始工作&#xff0c;有很多地方都没有考虑的很清楚&#xff0c;希望各位大佬能给出建议。 一、需求 A:正常的23个省&#xff0c;籍贯由“省区/县/市”组成&#xf…

回归预测 | MATLAB实现POA-CNN-GRU鹈鹕算法优化卷积门控循环单元多输入单输出回归预测

回归预测 | MATLAB实现POA-CNN-GRU鹈鹕算法优化卷积门控循环单元多输入单输出回归预测 目录 回归预测 | MATLAB实现POA-CNN-GRU鹈鹕算法优化卷积门控循环单元多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现POA-CNN-GRU鹈鹕算法优化卷积门…

Linux下 时间戳的转化

Linux下一般用date 记录当前时间&#xff0c;尤其是我们需要保存测试log的时候&#xff0c;或者设计一个跑多长时间的脚本都需要时间戳。下面看一下平时最常用的几种写法 1 date “%Y-%m-%d %H:%M” 显示具体时间 2 修改时间 date -s 3 date %s :当前时间的时间戳 显示具体时…

pycharm,VSCode 几个好用的插件

pycharm Tabnine AI Code 可以在编写程序的时候为你提供一些快捷方式&#xff0c;增加编程速度 Chinese 对英文不好的程序员来说是个不错的选择&#xff0c;可以将英文状态下的pycharm变为中文版的 ChatGPT 可以跟ai聊天&#xff0c;ai可以解决你80%的问题 &#xff0c;也可以帮…

机器学习笔记 - 基于C++的​​深度学习 二、实现卷积运算

一、卷积 卷积是信号处理领域的老朋友。最初的定义如下 在机器学习术语中: I(…)通常称为输入 K(…)作为内核,并且 F(…)作为给定K的I(x)的特征图。 虑多维离散域,我们可以将积分转换为以下求和 对于二维数字图像,我们可以将其重写为: <

鸿蒙开发学习笔记1——真机运行hello world

问题背景 学习任何语言和框架的第一步&#xff0c;永远都是跑通熟悉的“hello world”&#xff0c;本文将介绍鸿蒙开发如何跑通“hello world”。 问题分析 一、构建第一个ArkTS应用&#xff08;fa模型&#xff09; 说明&#xff1a;请使用DevEco Studio V3.0.0.601 Beta1及…