Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为在 Grafana 查看了生产环境的集群监控情况,凌晨两点至四点之间的集群、索引的查询以及写入 QPS 都比较低。

在这里插入图片描述

  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**

  • 清理ES历史数据定时任务
    */
    @Component
    public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**

    • 根据索引名称删除当前日期两个月前的那一天的历史文档数据
    • @param jobContext
      */
      @Scheduled
      public void cleanESHistoryData(JobContext jobContext) {
      // jobContext为定时任务中回传数据
      String indexName = jobContext.getData();
      if (StringUtils.isBlank(indexName)) {
      LOGGER.warn(“ES索引名称不能为空!”);
      return;
      }
      long startTimeMillis = System.currentTimeMillis();
      String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
      try {
      String startTimeStr = twoMonthsAgoDate + " 00:00:00";
      // 初始化时间,形如2023-08-06 00:00:00
      Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
      // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
      for (int i = 0; i < 24; i++) {
      Date startTime = initialStartTime;
      startTime = DateUtils.addHours(startTime, i);
      Date endTime = DateUtils.addHours(startTime, 1);
      LOGGER.info(“正在清理索引:[{}],时间:{} 至 {}的历史文档数据…”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
      long currentStartTimeMillis = System.currentTimeMillis();
      // 指定操作的索引库
      SearchRequest searchRequest = new SearchRequest(indexName);
      // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery(“createTimeStr.keyword”)
      .from(DateTool.format(startTime, DF_FULL))
      .to(DateTool.format(endTime, DF_FULL)))
      .size(1000);
      // 设置滚动查询结果在内存中的过期时间为1min
      Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
      // 将滚动以及构造的查询条件放入查询请求
      searchRequest.scroll(scroll).source(searchSourceBuilder);
      SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      // 记录要滚动的ID
      String scrollId = searchResponse.getScrollId();
      SearchHit[] hits = searchResponse.getHits().getHits();
      while (hits != null && hits.length > 0) {
      // 创建批量处理请求对象
      BulkRequest bulkRequest = new BulkRequest();
      for (SearchHit hit : hits) {
      DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
      bulkRequest.add(deleteRequest);
      }
      // 执行批量删除请求操作
      restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      // 构造滚动查询条件,继续滚动查询
      SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
      scrollRequest.scroll(scroll);
      searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
      scrollId = searchResponse.getScrollId();
      hits = searchResponse.getHits().getHits();
      }
      // 当前滚动查询结束,清除滚动,释放服务器内存资源
      ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
      clearScrollRequest.addScrollId(scrollId);
      restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
      LOGGER.info(“清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
      }
      LOGGER.info(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
      } catch (Exception e) {
      LOGGER.error(String.format(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
      }
      }
      }

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

在这里插入图片描述

🎈通过观察监控可以发现,在凌晨三点执行定时任务清理 ES 历史数据期间,集群、索引查询 QPS 以及 CPU 利用率指标都明显飙升。因此,清理 ES 数据时一定要避开流量高峰期,避免在流量高峰期清理数据时造成资源实例宕机,造成生产事故。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/90557.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python学习笔记_实战篇(二)_django多条件筛选搜索

多条件搜索在很多网站上都有用到&#xff0c;比如京东&#xff0c;淘宝&#xff0c;51cto&#xff0c;等等好多购物教育网站上都有&#xff0c;当然网上也有很多开源的比楼主写的好的多了去了&#xff0c;仅供参考&#xff0c;哈哈 先来一张效果图吧&#xff0c;不然幻想不出来…

【C++】—— c++11新的类功能

目录 &#xff08;一&#xff09;默认成员函数 1、 移动构造函数 2、代码辅助理解 3、移动赋值运算符重载 &#xff08;二&#xff09;default关键字 &#xff08;三&#xff09;delete关键字 &#xff08;四&#xff09;委托构造函数 1、优势 2、缺点 总结 &#x…

基于开源IM即时通讯框架MobileIMSDK:RainbowChat-iOS端v7.0版已发布

关于MobileIMSDK MobileIMSDK 是一套专门为移动端开发的开源IM即时通讯框架&#xff0c;超轻量级、高度提炼&#xff0c;一套API优雅支持 UDP 、TCP 、WebSocket 三种协议&#xff0c;支持 iOS、Android、H5、标准Java、小程序、Uniapp&#xff0c;服务端基于Netty编写。 工程…

Multisim软件安装包分享(附安装教程)

目录 一、软件简介 二、软件下载 一、软件简介 Multisim软件是一款电路仿真和设计软件&#xff0c;由美国国家仪器公司&#xff08;National Instruments&#xff09;开发。它提供了一个交互式的图形界面&#xff0c;使用户能够轻松地构建和仿真电路。以下是Multisim软件的详…

攻防世界-embarrass

原题 解题思路 搜索flag&#xff0c;结果搜不到。 换到kali里看。

【多模态】26、视觉-文本多模态任务超详细介绍 「CLIP/LSeg/ViLD/GLIP/ALBEF/BLIP/CoCa/BEIT」

文章目录 准备知识一、CLIP&#xff1a;不同模态简单对比的方法更适合于图文检索1.1 CLIP 在分割上的改进工作1.1.1 LSeg1.1.2 Group ViT 1.2 CLIP 在目标检测上的改进工作1.2.1 ViLD1.2.2 GLIPv11.2.3 GLIPv2 二、ViLT/ALBEF &#xff1a;多模态融合在 VQA/VR 任务中更重要三、…

mysql 命令行 执行sql文件

方法1 source source file.sql; file.sql : 绝对路径或 相对路径。 方法2 mysql -u xxx -p < file.sql 方法3 MySQLImport 工具 mysqlimport [options] database file_name 其中&#xff0c;database为要导入数据的数据库名&#xff0c;file_name为要导入的SQL文件名。还可以…

后端开发有哪几种语言? - 易智编译EaseEditing

后端开发是构建应用程序的一部分&#xff0c;负责处理服务器端的逻辑、数据库交互和数据处理。有许多编程语言可用于后端开发&#xff0c;以下是一些常见的后端开发语言&#xff1a; Java&#xff1a; Java是一种广泛使用的面向对象编程语言&#xff0c;具有强大的跨平台能力。…

(纯c)数据结构之------>链表(详解)

目录 一. 链表的定义 1.链表的结构. 2.为啥要存在链表及链表的优势. 二. 无头单向链表的常用接口 1.头插\尾插 2.头删\尾删 3.销毁链表/打印链表 4.在pos位置后插入一个值 5.消除pos位置后的值 6.查找链表中的值并且返回它的地址 7.创建一个动态开辟的结点 三.顺序表与链表…

MySQL事务的隔离级别

前置阅读 快速搭建 Linux 学习平台 一、事务的特性 对于事务&#xff0c;我觉得有一句英文描述的非常贴切&#xff1a;All or not, now or never. 事务 &#xff08;Transaction&#xff09;可以说是关系型数据库最重要的特性了。SQL 事务就是一个或者多个 SQL 语句的集合&a…

响应式布局bootstrap使用

响应式布局 学习目标 能够说出响应式原理 能够使媒体查询完成响应式导航 能够使用Bootstrap的栅格系统 能够使用bootstrap的响应式工具 1.响应式原理 1.1响应式开发原理 就是使用媒体查询针对不同宽度的设备进行布局和样式的设置,从而适配不同设备的目的 1.2响应式布局容器…

顺序表之初

欢迎来到我的&#xff1a;世界 希望作者的文章对你有所帮助&#xff0c;有不足的地方还请指正&#xff0c;大家一起学习交流 ! 目录 线性表简介顺序表定义动态顺序表的初始化尾插头插Cheak 判断是否增容尾删&#xff1a;头删&#xff1a;打印在pos位置前插入x删除pos位置的值查…

centos7搭建apache作为文件站后,其他人无法访问解决办法

在公司内网的一个虚拟机上搭建了httpsd服务&#xff0c;准备作为内部小伙伴们的文件站&#xff0c;但是搭建好之后发现别的小伙伴是无法访问我机器的。 于是寻找一下原因&#xff0c;排查步骤如下&#xff1a; 1.netstat -lnp 和 ps aux 先看下端口和 服务情况 发现均正常 2.…

Linux(基础篇一)

Linux基础篇 Linux基础篇一1. Linux文件系统与目录结构1.1 Linux文件系统1.2 Linux目录结构 2. VI/VIM编辑器2.1 vi/vim是什么2.2 模式间的转换2.3 一般模式2.4 插入模式2.4.1 进入编辑模式2.4.2 退出编辑模式 2.5 命令模式 3. 网络配置3.1 网络连接模式3.2 修改静态ip3.3 配置…

python中的matplotlib画直方图(数据分析与可视化)

python中的matplotlib画直方图&#xff08;数据分析与可视化&#xff09; import numpy as np import pandas as pd import matplotlib.pyplot as pltpd.set_option("max_columns",None) plt.rcParams[font.sans-serif][SimHei] plt.rcParams[axes.unicode_minus]Fa…

Linux TCP协议——三次握手,四次挥手

一、TCP协议介绍 TCP协议是可靠的、面向连接的、基于字节流的传输层通信协议。 TCP的头部结构&#xff1a; 源/目的端口号: 表示数据是从哪个进程来, 到哪个进程去;&#xff08;tcp是传输层的协议&#xff0c;端与端之间的数据传输&#xff0c;在TCP和UDP协议当中不会体现出I…

[Linux]命令行参数和进程优先级

[Linux]命令行参数和进程优先级 文章目录 [Linux]命令行参数和进程优先级命令行参数命令行参数的概念命令函参数的接收编写代码验证 进程优先级进程优先级的概念PRI and NI使用top指令修改nice值 命令行参数 命令行参数的概念 命令行参数是指用于运行程序时在命令行输入的参数…

软件设计师学习笔记3-CPU组成

目录 1.计算机结构 1.1计算机的外设与主机 1.2计算机各部分之间的联系(了解一下即可) 2.CPU结构 1.计算机结构 1.1计算机的外设与主机 1.2计算机各部分之间的联系(了解一下即可) 该图片来自希赛软考 注&#xff1a;黄色的是传递数据的数据总线&#xff0c;白色的是传递控…

AI新时代,英特尔如何加强产学研融合?

人工智能作为当前数字经济发展的核心驱动力&#xff0c;我们在关注AI技术发展之际&#xff0c;为发挥AI强大助力&#xff0c;更需进一步思考AI的科研、产业应用与人才培育的工作&#xff0c;推动产学研融合创新。 正如英特尔公司高级副总裁、英特尔中国区董事长王锐在刚结束的…

【C++】C/C++内存管理-new、delete

文章目录 一、C/C内存分布二、C/C中动态内存管理方式2.1 C语言中动态内存管理方式2.2 C内存管理方式 三、operator new和operator delete函数3.1 operator new和operator delete函数3.2 operator new与operator delete的类专属重载&#xff08;了解&#xff09; 四、new和delet…