ES 万条以外分页检索功能实现及注意事项

背景

以 ES 存储日志,且需要对日志进行分页检索,当数据量过大时,就面临 ES 万条以外的数据检索问题,如何利用滚动检索实现这个需求呢?本文介绍 ES 分页检索万条以外的数据实现方法及注意事项。

需求分析

在这里插入图片描述
用 ES 存储数据,分页检索,当 ES 数据量过大时,在页面上直接点击最后一页时,怎么保证请求能正常返回?

常规思路就是,超过万条以后,使用滚动检索,但需要注意:编写滚动检索的分页查询时,滚动请求的 size 一定不能用页面分页参数的 pageSize ,要能快速滚动到目标页所在的数据,最好以 ES 最大检索窗口值。

算法要点

第一,滚动检索的 Request 请求不能包含 from 属性, 且设置了 size 参数后,以后的每次滚动返回的数据量都以 size 为主。

第二,滚动获取数据的 size 选取。 滚动分页检索高效的关键是不能以页面分页参数 pageSize 作为滚动请求的 size ,而是以一个较大的数,或者直接以 ES 默认的滚动窗口最大值 10000 作为每批次获取的数据量。

第三,计算目标页的数据所在的位置。

  1. 根据分页参数计算出目标数据的位置是 [(pageSize-1)*pageSize, pageSize * pageNo] ,为了拿到目标页的数据,总共的数据量 total = pageNo * pageSize
  2. 目标数据在最终数据中的真正范围决定因素:mode = total % 10000
  3. 计算滚动请求几次能拿到目标数据。实际需要滚动请求的次数 scrollCount = mode == 0 ? total/ esWindowCount : (total/ esWindowCount + 1)
  4. 目标页的数据有没有分布在两次请求中。当 10000 % pageSize !=0 时,说明这一页的数据会横跨两次 ES 请求。例如 pageSize =15,pageNo = 2667,total = 40005,目标页的数据包含在最后两次请求中,倒数第二次请求中有 10 条数据,最后一次请求中有 5 条数据,合起来才是一整页的 15 条数据。
  5. 最后一页数据不足 pageSize 时,最后一页数据真正的长度。

第四,分页数据所在范围处理。 当最后一批次获取到数据后,从中摘出目标页的数据时,需要考虑的四种情况,主要是 mode 和最终获取的数据总长度直接的关系:

在这里插入图片描述
case 1:上图左,mode=0 时存在最后一页不足 size 的情况,realSize = size - (windowSize-length)

case 2:上图右,length < mode 时,最后一页不足 size 的情况,realSize = size - (mode -length)

最终的数据区间是 [from,to ] = [ length -realSize,length -1 ]
数据总长度 = end -start +1 = realSize
在这里插入图片描述
case 3 :上图左,分页数据在 mode 往前推 size 条。
case 4:上图右,分页数据横跨两次请求,两批数据组合成一页数据。

编码实现

编写 ES 滚动分页检索请求,处理超过万条之外的查询操作:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.*;

@Slf4j
public class EsPageUtil {
    /**
     * 真正的 ES 连接对象
     */
    private RestHighLevelClient client;

    public void initClient() {
        // TODO 初始化 client 对象
    }

    /**
     * 使用 DSL JSON 配置创建检索请求 Builder
     * @param queryJson
     * @return
     */
    public SearchSourceBuilder createSearchSource(String queryJson) {
        if (StringUtils.isEmpty(queryJson)) {
            log.error("ElasticSearch dsl config is empty.");
            return null;
        }

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        try {
            SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
            NamedXContentRegistry registry = new NamedXContentRegistry(searchModule.getNamedXContents());
            XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(registry, LoggingDeprecationHandler.INSTANCE, queryJson);
            searchSourceBuilder.parseXContent(parser);
            return searchSourceBuilder;
        } catch (Exception e) {
            log.error("Parse dsl error.", e);
            return null;
        }
    }

    /**
     * ES 分页查询:区分万条以内还是万条以外
     * @param pageSize  分页size
     * @param pageNo    查询页数
     * @param indices   目标索引
     * @param queryJson 查询 DSL JSON 格式字符串
     * @return
     */
    public Map<String, Object> queryByPage(int pageSize, int pageNo, String[] indices, String queryJson) {
        SearchSourceBuilder searchSourceBuilder = createSearchSource(queryJson);
        if (searchSourceBuilder == null) {
            return null;
        }

        // 创建请求对象
        SearchRequest searchRequest = new SearchRequest(indices).source(searchSourceBuilder);

        Map<String, Object> result = new HashMap<>();
        List<Map<String, Object>> data = null;
        int total = pageSize * pageNo ;
        int maxEsWindow = 10000;

        try {
            if (total <= 10000) {
                // 万条以内,直接查询:设置 from , size 属性
                searchSourceBuilder .from((pageNo - 1) * pageSize) .size(pageSize);

                SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
                data =  parseResponseToListData(response);
            } else {
                // 万条以外,以 ES 最大窗口值查询:只设置size 属性
                searchSourceBuilder.size(maxEsWindow);
                data = scrollQuery(maxEsWindow, pageSize, total, searchRequest);
            }
        } catch (IOException e) {
            log.error("ElasticSearch query error.", e);
        }

        result.put("total" , 0);
        result.put("data" , data);
        return result;
    }

    /**
     * 滚动查询
     *
     * @param esWindowCount
     * @param pageSize
     * @param total
     * @param searchRequest
     * @return
     */
    private List scrollQuery(int esWindowCount, int pageSize, int total , SearchRequest searchRequest) {
        List pageData = new ArrayList(pageSize);

        //创建滚动,指定滚动查询保持的时间
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L));

        //添加滚动
        searchRequest.scroll(scroll);

        //提交第一次请求
        SearchResponse searchResponse = null;
        String scrollId = null;
        try {
            searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            //获取滚动查询id
            scrollId = searchResponse.getScrollId();
        } catch (IOException e) {
            log.error("Elasticsearch request error.", e);
            return pageData;
        }

        int counter = 2;
        int mode = total % esWindowCount;
        int realPageCount = mode == 0 ? total/ esWindowCount : (total/ esWindowCount + 1);

        while (counter <= realPageCount) {
            // 设置滚动查询id,从id开始继续向下查询
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);

            // 重置查询时间,若不进行重置,则在提交的第一次请求中设置的时间结束,滚动查询将失效
            scrollRequest.scroll(scroll);

            // 提交请求,获取结果
            try {
                searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                log.error("Elasticsearch scroll request error.", e);
            }

            // size 非 10 的整数,则当前页数据横跨两个 Scroll 请求
            if (mode != 0 && mode < pageSize && counter == (realPageCount -1)) {
                collectFirstPart(searchResponse, pageData, mode, pageSize);
            }

            // 更新滚动查询id
            scrollId = searchResponse.getScrollId();
            counter++;
        }

        // 收集最后一次响应结果中的数据
        collectPageData(searchResponse, pageData, mode, pageSize, esWindowCount);

        //  滚动查询结束时,清除滚动
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        try {
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Elasticsearch clear scroll info error.", e);
        }

        return pageData;
    }

    /**
     * @param searchResponse
     * @param mode
     * @param size
     * @return
     */
    public void collectFirstPart(SearchResponse searchResponse, List<Map<String, Object>> firstPartData, int mode, int size) {
        int firstPartCount = size - mode;

        // 只截取响应结果中的 结尾 size - mode 部分的内容
        SearchHits hits = searchResponse.getHits();
        SearchHit[] dataList = hits.getHits();

        int from = dataList.length - firstPartCount;
        for (int i = from; i < dataList.length; i++) {
            firstPartData.add(dataList[i].getSourceAsMap());
        }

        log.info("Mode less than size, first part data is here {} .", firstPartCount);
    }

    /**
     * 滚动到最后一组数据中包含目标页的数据,从中摘出来
     * @param searchResponse
     * @param mode
     * @param size
     * @param esWindowCount
     * @return
     */
    public void collectPageData(SearchResponse searchResponse, List<Map<String, Object>> pageData, int mode, int size, int esWindowCount) {
        SearchHits hits = searchResponse.getHits();
        SearchHit[] dataList = hits.getHits();
        int from = 0;
        int length = dataList.length;
        if (mode == 0) { // 刚好在万条结尾
            // 不够一页
            if (length < esWindowCount) {
                int realSize = size - (esWindowCount - length);
                from = (length - realSize ) >= 0 ? (length - realSize ) : 0;
            } else {// 总长够一页
                from = length == esWindowCount ? (length - size) : 0;
            }
        } else if (length < mode){ // 最后一页且总长不足 size
            int realSize = size - (mode - length);
            from = (length - realSize) >= 0 ? (length - realSize) : 0;
        } else if (mode > size){ // 中间部分
            from = (mode - size) >= 0 ? (mode -size) : 0;
        } else  { // mode < size ,说明是一页数据的下半部分
            from = 0;
            size = mode;
            log.info("Page data is across two request ,this response has {} .", mode);
        }

        // 收集目标数据
        for (int i = from; i< from + size && i < length; i++) {
            pageData.add(dataList[i].getSourceAsMap());
        }
    }

    /**
     * 解析 ES 响应结果为数据集合
     * @param response
     * @return
     */
    public static List<Map<String, Object>> parseResponseToListData(SearchResponse response){
        List<Map<String, Object>> listData = new ArrayList<>();
        if (response == null) {
            return listData;
        }

        // 遍历响应结果
        SearchHits hits = response.getHits();
        SearchHit[] hitArray = hits.getHits();
        listData = new ArrayList<>(hitArray.length);
        for (SearchHit hit : hitArray) {
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            listData.add(sourceAsMap);
        }

        // 返回结果
        return listData;
    }
}

启示录

滚动查询时优化了 size 用一万,相比用页面的分页参数 pageSize ,可以解决数据量过大时,直接从页面点击最后一页导致页面卡死长时间无响应的问题。

页面分页参数最大不过 100,当总数量几百万、pageSize=10,分页跳转查询后面某页 如 3000 时,ES 的滚动请求次数 是 3000 次,而优化后滚动请求 3次,第三次中的一万条数据的最后10条即本页的数据。

话说回来,ES 数据量过大时,用分页查询靠后的数据时,也没多大的价值了,列表宽泛条件查询结果过大时,谁看得过来呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/182850.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GPU服务器常见故障修复记录

日常写代码写方案文档&#xff0c;偶尔遇上服务器出现问题的时候&#xff0c;也需要充当一把运维工程师&#xff0c;此帖用来记录GPU服务器报错的一些解决方案&#xff0c;仅供参考&#xff01; 文章目录 一、服务器简介二、机箱拆解三、基本操作四、常见故障4.1 电源开关键闪烁…

HBuilderX前端软件社区+Thinkphp后端源码

HBuilderX前端软件社区thinkphp后端源码&#xff0c;搭建好后台在前端找到 util 这个文件把两个js文件上面的填上自己的域名&#xff0c;登录HBuilderX账号没有账号就注册账号然后上传文件即可。打包选择发行 可以打包app或h5等等 后端设置运行目录为public(重要)&#xff0c;…

解决:ImportError: cannot import name ‘Adam‘ from ‘keras.optimizers‘

解决&#xff1a;ImportError: cannot import name ‘Adam‘ from ‘keras.optimizers‘ 背景 在使用之前的代码时&#xff0c;报错&#xff1a; from keras.optimizers import Adam ImportError: cannot import name ‘Adam’ 报错问题 from keras.optimizers import Adam I…

Unity调用dll踩坑记

请用写一段代码&#xff0c;让unity无声无息的崩溃。 你说这怕是有点难哦&#xff0c;谁会这么不幸呢&#xff1f;不幸的是&#xff0c;我幸运的成为了那个不幸的人。 unity里面调用dll的方式是使用 DllImport &#xff0c;比如有一个 Hello.dll&#xff0c;里面有一个 char* …

计算机网络之应用层

一、概述 引入目的&#xff1a; 为了方便用户去使用&#xff1b; 该如何方便用户使用网络呢&#xff0c;即怎样帮助用户使用网络&#xff1f; 1.用户需要知道网络资源所在的位置 2.网络上资源一定是在资源子网的主机上 3.资源子网上的主机&#xff0c;在通信子网中用IP地…

Android设计模式--装饰模式

千淘万漉虽辛苦&#xff0c;吹尽黄沙始到金 一&#xff0c;定义 动态地给一个对象添加一些额外的职责。就增加功能来说&#xff0c;装饰模式相比生成子类更为灵活。 装饰模式也叫包装模式&#xff0c;结构型设计模式之一&#xff0c;其使用一种对客户端透明的方式来动态地扩展…

基于SpringBoot+Vue的电子产品销售管理系统

基于SpringBootVue的电子产品销售管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootMyBatisVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 购物车 管理员界面 摘要 基于Spring Boot和Vue的电子产品销售管…

如何开启MySQL的慢查询日志

说明&#xff1a;如果需要查看某一条SQL查询速度慢&#xff0c;并对慢的SQL进行优化&#xff0c;那么开启MySQL慢查询日志是一定要做的事情&#xff0c;本文介绍如何开启MySQL的慢查询日志&#xff1b; 查看MySQL慢查询是否开启 首先&#xff0c;输入下面的命令&#xff0c;查…

再添千万级罚单,某银行年内罚款过亿!金融行业合规问题亟待解决

11月17日晚间&#xff0c;国家金融监管总局上海监管局披露行政处罚信息显示&#xff0c;某银行因32项违法违规事实收到两张690万元的大额罚单&#xff0c;合计罚款金额达1380万元。但这并不是银行该今年收到的第一张大额罚单。今年4月28日&#xff0c;该行因在结售汇、外币理财…

Okhttp 浅析

安全的连接 OkHttpClient: OkHttpClient: 1.线程调度 2.连接池,有则复用,没有就创建 3.interceptor 4.interceptor 5.监听工厂 6.是否失败重试 7.自动修正访问,如果没有权限或认证 8是否重定向 followRedirects 9.协议切换时候是否继续重定向 10.Cookie jar 容器 默认…

Electron+VUE3开发简版的编辑器【文件预览】

简版编辑器的功能主要是: 打开对话框,选择文件后台读取文件文件前端展示文件内容。主要技术栈是VUE3、Electron和Nodejs,VUE3做页面交互,Electron提供一个可执行Nodejs的环境以及支撑整个应用的环境,nodeJS负责读取文件内容。 环境配置、安装依赖这些步骤就不再叙述了。 …

PHP众筹系统源码+支持报名众筹+商品众筹+无偿众筹+市面上所有的众筹模式 附带完整的搭建教程

大家好啊&#xff0c;罗峰今天来给大家分好用的源码系统了。今天要给大家分享的是一款PHP众筹系统源码。众筹作为一种新型的融资方式&#xff0c;逐渐在市场上占据了重要的地位。从公益众筹到商品众筹&#xff0c;再到股权众筹&#xff0c;各种众筹模式层出不穷。然而&#xff…

Go lumberjack 日志轮换和管理

在开发应用程序时&#xff0c;记录日志是一项关键的任务&#xff0c;以便在应用程序运行时追踪问题、监视性能和保留审计记录。Go 语言提供了灵活且强大的日志记录功能&#xff0c;可以通过多种方式配置和使用。其中一个常用的日志记录库是 github.com/natefinch/lumberjack&am…

Proteus下仿真AT89C51报“串行口通信失败,请检查电平适配是否正确。”解决办法

在Proteus下进行AT89C51串行口仿真时&#xff0c;如果遇到“串行口通信失败&#xff0c;请检查电平适配是否正确”的错误提示&#xff0c;以下是一些解决办法&#xff1a; 1. 了解AT89C51和外部设备的电平要求&#xff1a; 首先&#xff0c;了解AT89C51和外部设备之间的电平…

【数据结构(C语言)】浅谈栈和队列

目录 文章目录 前言 一、栈 1.1 栈的概念及结构 1.2 栈的实现 1.2.1. 支持动态增长的栈的结构 1.2.2 初始化栈 1.2.3 入栈 1.2.4 出栈 1.2.5 获取栈顶元素 1.2.6 获取栈中有效元素个数 1.2.7 检查栈是否为空 1.2.8 销毁栈 二、队列 2.1 队列的概念及结构 2.2 队…

[BJDCTF2020]The mystery of ip1

提示 ssti模板注入head头x-forwarded-for 每一次做题的最开始流程都大致因该是 信息收集找可以操控的地方 查看hint页面的源代码又发现它提示说 ####你知道为什么会知道你的ip吗 查看flag页面 从刚才给我的提示以及他这里显示的我的ip&#xff0c;大概找到了我可操作的可控点 …

Spark---基于Yarn模式提交任务

Yarn模式两种提交任务方式 一、yarn-client提交任务方式 1、提交命令 ./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100 或者 ./spark-submit --master yarn–client --class org.apache.s…

学习.NET验证模块FluentValidation的基本用法(续1:其它常见用法)

FluentValidation模块支持链式验证方法调用&#xff0c;也就是说&#xff0c;除了 RuleFor(r > r.UserName).NotEmpty()调用方式之外&#xff0c;还可以将对单个属性的多种验证函数以链式调用方式串接起来&#xff0c;比如UserName属性不能为空&#xff0c;长度在5~10之间&a…

北京数字孪生赋能工业制造,加速推进制造业数字化转型

随着新一代信息技术与实体经济深度融合进程的加快&#xff0c;企业数字化转型需求的提升&#xff0c;政策的持续支持&#xff0c;数字孪生将为工业制造、未来生活带来无限的可能。在制造业数字化大变革时代&#xff0c;以5G、大数据、物联网、人工智能等为代表的工业4.0&#x…

职场Excel:求和家族,不简单

说到excel函数&#xff0c;很多人第一时间想到的就是求和函数sum。作为excel入门级函数&#xff0c;sum的确是小白级的&#xff0c;以至于很多人对求和函数有点“误解”&#xff0c;觉得求和函数太简单了。 但是&#xff0c;你可能不知道&#xff0c;sum只是excel求和家族里的一…