快速掌握Elasticsearch检索之二:滚动查询(scrool)获取全量数据(golang)

Elasticsearch8.17.0在mac上的安装

Kibana8.17.0在mac上的安装

Elasticsearch检索方案之一:使用from+size实现分页

1、滚动查询的使用场景

滚动查询区别于上一篇文章介绍的使用from、size分页检索,最大的特点是,它能够检索超过10000条外的所有文档,可以理解为是一种全量检索的技术方案,也正是因为这种特性,使得滚动查询的代价非常高昂,检索过程消耗大量的内存,所以对于实时检索的场景,滚动查询是不适用的。

那滚动查询使用在什么场景呢?主要是应用在离线、检索全量数据,对于实时性要求不高的场景,比如一个数据平台,前台页面展示的数据用来预览,可以使用from+size分页查询,以提升检索效率以及平台的用户体验,如果还需要检索全量数据用于二次使用,那么后台离线检索全量就需要使用滚动查询以获取到全量数据,这将是一个耗费大量资源和时间的过程。

2、使用Kibana直观体验滚动查询

初始化滚动查询:

GET /new_tag_202411/_search?scroll=1m
{
  "size": 10,
  "sort":[
    {
      "doc_id":{
        "order": "asc"
      }
    }
  ]
}

检索条件设置返回2条数据,按【doc_id】字段升序排列,doc_id分别为1-10的文档。

scroll=1m,表示Elasticsearch允许等待的最长时间是1分钟,如果在一分钟之内,接下来的 scroll 请求没有到达的话,那么当前请求的上下文将会失效:

 从上图返回可以看出,有一个【_scroll_id】字段,这个字段非常重要,接下来的滚动查询需要使用这个字段:

第一次滚动,返回doc_id从11开始的数据,第二次滚动时,需要使用第一次滚动返回的【_scroll_id】替换滚动请求,数据从doc_id为21的数据开始返回,之后循环这个过程,直到检索到全部数据。

注意一点,在测试过程中,我创建了多次滚动查询,发现scrool_id特别像,大家别误以为scrool_id没变,比如以下三个scrool_id,每个id只有3个字符不一样:

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAAWbhZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAActhZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAAjDxZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

3、代码实现滚动查询(golang)

首先是初始化一个滚动查询:

res, err := client.Search(
	client.Search.WithIndex("new_tag_202411"),
	client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),
	client.Search.WithScroll(time.Minute*1),
)

这行代码:

client.Search.WithScroll(time.Minute*1)

就是在设置滚动查询上下文的有效时间,其他几行很容易理解。

这几行代码执行完成后,除了能拿到检索数据,还能拿到scroll_id。之后就可以进行滚动查询:

for {
	docs = Documents{}
	res, err = client.Scroll(
		client.Scroll.WithScrollID(scrollId),
		client.Scroll.WithScroll(time.Minute),
	)
	if err != nil {
		fmt.Println("scroll err:", err.Error())
		return
	}

	err = json.NewDecoder(res.Body).Decode(&docs)
	if err != nil {
		fmt.Println("json decode err:", err)
		return
	}
	if len(docs.Hits.Hits) == 0 {
		break
	}
	fmt.Println("search count:", len(docs.Hits.Hits))
	scrollId = docs.ScrollID
}

这里要注意的一点是,循环滚动时,每个轮次,必须更新scrool_id为上一次滚动返回的值,如上面最后一行代码。

L17-L19行的代码,表示已经查出所有数据,本次没有数据了,同时循环结束。

4、一个必须要考虑的问题

对于滚动查询,前面也说过,会创建一个上下文,当es中存在的上下文数量超过一定限制后,将无法再次创建滚动查询,从而无法检索数据,这个【限制】es默认是500个,我们可以通过es的api查看当前系统中已经创建的上下文数量:

GET /_nodes/stats/indices/search

默认情况下,只要【open_contexts】值小于500,都能正常进行滚动查询,如果已经创建了500个上下文,就会出现问题,下面测试一下,利用代码,创建500个上下文:

 如上图,上下文已经创建500个,运行代码,再次执行滚动查询的动作:

无法查出任何数据,但是以下代码也无任何的报错:

res, err := client.Search(
	client.Search.WithIndex("new_tag_202411"),
	client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),
	client.Search.WithScroll(time.Minute*100),
)
if err != nil {
	fmt.Println("search err:", err.Error())
	return
}

没有走到err分支,经过调试发现,res的结构中的http状态码变了,我们加一行打印:

res, err := client.Search(
        client.Search.WithIndex("new_tag_202411"),
        client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),
        client.Search.WithScroll(time.Minute*100),
    )
    if err != nil {
        fmt.Println("search err:", err.Error())
        return
    }
    fmt.Println("resp code:", res.StatusCode)
    err = json.NewDecoder(res.Body).Decode(&docs)
    if err != nil {
        fmt.Println("decode err:", err.Error())
        return
    }

 运行结果如下:

状态码由正常值0变成了429,所以,在执行滚动查询时,我们需要加上对状态码的判断,以捕获到上下文超限的情况,否则没有检索到数据,还以为系统出bug了呢。

这个问题就是滚动查询的一个短板,系统用户量大了,发起滚动查询一旦超过500,就会影响用户检索数据,当然了,es还是有其他解决方案来进行全量的数据检索,还是那句话,下一篇文章再写。

5、所有代码

github:GitHub - liupengh3c/career 

代码位于以下文件:

https://github.com/liupengh3c/career/blob/main/elastic/scrool/main.go

代码也粘过来吧:

package main

import (
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	jsoniter "github.com/json-iterator/go"
	"github.com/liupengh3c/esbuilder"
)

// 最外层数据结构
type Documents struct {
	ScrollID string      `json:"_scroll_id"`
	Shards   Shards      `json:"_shards"`
	Hits     HitOutLayer `json:"hits"`
	TimedOut bool        `json:"timed_out"`
	Took     int         `json:"took"`
}
type Shards struct {
	Failed     int `json:"failed"`
	Skipped    int `json:"skipped"`
	Successful int `json:"successful"`
	Total      int `json:"total"`
}
type HitOutLayer struct {
	Hits     []Hits  `json:"hits"`
	MaxScore float64 `json:"max_score"`
	Total    Total   `json:"total"`
}
type Hits struct {
	ID     string         `json:"_id"`
	Index  string         `json:"_index"`
	Score  float64        `json:"_score"`
	Source map[string]any `json:"_source"`
	Type   string         `json:"_type"`
}
type Total struct {
	Relation string `json:"relation"`
	Value    int    `json:"value"`
}

func main() {
	client, err := NewEsClient()
	if err != nil {
		fmt.Println("create client err:", err.Error())
		return
	}
	fmt.Println("connect success")
	for i := 0; i < 510; i++ {
		ScrollSearch(client)
	}
}
func NewEsClient() (*elasticsearch.Client, error) {
	cert, _ := os.ReadFile("/Users/liupeng/Documents/study/elasticsearch-8.17.0/config/certs/http_ca.crt")
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Username:  "elastic",
		Password:  "XBS=adqa799j_Aoz=A+h",
		Addresses: []string{"https://127.0.0.1:9200"},
		CACert:    cert,
	})

	if err != nil {
		// fmt.Println("create client err:", err.Error())
		return client, err
	}
	return client, nil
}

func ScrollSearch(client *elasticsearch.Client) {
	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	docs := Documents{}
	dslQuery := esbuilder.NewDsl()
	boolQuery := esbuilder.NewBoolQuery()

	dslQuery.SetOrder(esbuilder.NewSortQuery("doc_id", "asc"))
	dslQuery.SetQuery(boolQuery)
	dslQuery.SetSize(10000)

	res, err := client.Search(
		client.Search.WithIndex("new_tag_202411"),
		client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),
		client.Search.WithScroll(time.Minute*20),
	)
	if err != nil {
		fmt.Println("search err:", err.Error())
		return
	}
	err = json.NewDecoder(res.Body).Decode(&docs)
	if err != nil {
		fmt.Println("decode err:", err.Error())
		return
	}
	fmt.Println("search count:", len(docs.Hits.Hits))
	scrollId := docs.ScrollID
	for {
		docs = Documents{}
		res, err = client.Scroll(
			client.Scroll.WithScrollID(scrollId),
			client.Scroll.WithScroll(time.Minute),
		)
		if err != nil {
			fmt.Println("scroll err:", err.Error())
			return
		}

		err = json.NewDecoder(res.Body).Decode(&docs)
		if err != nil {
			fmt.Println("decode err:", err.Error())
			return
		}
		defer res.Body.Close()
		if res.StatusCode == 429 {
			fmt.Println("scroll contexts is more than 500")
			return
		}
		if len(docs.Hits.Hits) == 0 {
			break
		}
		fmt.Println("search count:", len(docs.Hits.Hits))
		scrollId = docs.ScrollID
	}

	client.ClearScroll(
		client.ClearScroll.WithScrollID(scrollId),
	)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/945598.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】深入理解 break 和 continue 语句

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;break 和 continue 介绍**break** 的作用**continue** 的作用注意事项 &#x1f4af;break 示例代码示例**执行结果****解析过程** &#x1f4af;continue 示例代码示例&am…

高效使用AI完成编程项目任务的指南:从需求分析到功能实现

随着人工智能工具的普及&#xff0c;即便是零编程基础或基础薄弱的用户&#xff0c;也可以借助AI完成许多技术任务。然而&#xff0c;要高效地使用AI完成编程任务&#xff0c;关键在于如何清晰表达需求&#xff0c;并逐步引导AI实现目标。 在本文中&#xff0c;我们将通过开发…

算法每日双题精讲 —— 滑动窗口(水果成篮,找到字符串中所有字母异位词)

&#x1f31f;快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 &#x1f31f; 别再犹豫了&#xff01;快来订阅我们的算法每日双题精讲专栏&#xff0c;一起踏上算法学习的精彩之旅吧&#xff01;&#x1f4aa;…

基于Qt事件机制中的定时器事件的闹钟设计

目标 代码 pro文件 QT core gui texttospeechgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated (the exact warnings # depend on …

后台管理系统DEMO

该项目后端使用SpringBootMyBatisPlusJWT&#xff0c;前端使用Vue3Vite2TSPiniaAxiosElementPlus等简单技术栈&#xff0c;实现了一个简约精致版的后台管理系统&#xff0c;包含非常基础的rbac权限功能&#xff0c;可以增删改查角色、用户、权限&#xff0c;角色添加权限、添加…

数据结构之线性表之链表(附加一个考研题)

链表的定义 链表的结构&#xff1a; 单链表-初始化 代码实现&#xff1a; 单链表-头插法 代码实现&#xff1a; 这里我给大家分析一下 我们每创建一个新的节点都要插在头节点的后面&#xff0c;我们一定要注意顺序 一定要先让新节点指向头节点指向的下一个节点&#xff0c;…

Python爬取城市天气信息,并存储到csv文件中

1.爬取的网址为&#xff1a;天气网 (weather.com.cn) 2.需要建立Weather.txt文件&#xff0c;并在里面加入如下形式的字段&#xff1a; 101120701济宁 101010100北京 3.代码运行后&#xff0c;在命令行输入Weather.txt文件中添加过的城市&#xff0c;如&#xff1a;济宁。 …

工厂+策略模式之最佳实践(疾病报卡维护模块API设计)

目录 &#x1f4bb;业务场景 &#x1f527;应用技术 ⚙概要流程 ❗开发注意 服务类上标注了 自定义注解 却无法直接利用getDeclaredAnnotation 获取 *Spring代理机制 代理机制的工作原理 代理的工作机制 代理的使用场景 已获取EmrXXXServiceImpl 的Class&#xff0c;…

【智行安全】基于Synaptics SL1680的AI疲劳驾驶检测方案

随著车载技术的快速进步&#xff0c;驾驶安全越来越受到重视&#xff0c;而疲劳驾驶是造成交通事故的重要原因之一。传统的驾驶监控技术因精度不足或反应迟缓&#xff0c;无法满足实时监测需求。因此&#xff0c;结合人工智能技术的疲劳驾驶检测系统成为行业新方向&#xff0c;…

Go-知识 注释

Go-知识 注释 行注释块注释包注释结构体&接口注释函数&方法注释废弃注释文档 在 go 语言中注释有两种&#xff0c;行注释和块注释 行注释 使用双斜线 // 开始&#xff0c;一般后面紧跟一个空格。行注释是Go语言中最常见的注释形式&#xff0c;在标准包中&#xff0c;…

2025年阿里云认证改版新消息!2025年阿里云认证考试内容有变!

阿里云认证已经确定在2025年要进行大改&#xff0c;这次改动幅度会比2023年改动更大&#xff0c;2023年主要改变是在考试题型上的变化&#xff0c;这次则主要是考试内容的变化了&#xff01; 2023年阿里云ACP认证考试的改版变化主要有&#xff1a; &#xff08;一&#xff09…

ArrayList 和LinkedList的区别比较

前言 ‌ArrayList和LinkedList的主要区别在于它们的底层数据结构、性能特点以及适用场景。‌ArrayList和LinkedList从名字分析&#xff0c;他们一个是Array&#xff08;动态数组&#xff09;的数据结构&#xff0c;一个是Linked&#xff08;链表&#xff09;的数据结构&#x…

STM32-笔记22-sg90舵机

一、接线 二、实验实现 动手让 SG90 每秒转动一下&#xff0c;0 -> 20 -> 40 -> 100 -> 180 如此循环。 舵机接A6 复制18-呼吸灯&#xff0c;重命名24-sg90舵机 把PWM重命名sg90 打开项目文件 在魔术棒和品上把PWM都去掉&#xff0c;加载sg90文件夹 加载之后…

QT集成intel RealSense 双目摄像头

最近一个小项目&#xff0c;用到了双目相机&#xff0c;选用了Intel的RealSense双目相机。功能很简单&#xff0c;就是识别某一个物体&#xff0c;然后对对这个物体进行操作。具体功能随后再说&#xff0c;这里只介绍QT如何集成IntelRealSense相机&#xff0c;就是下面这个。 首…

前端小案例——520表白信封

前言&#xff1a;我们在学习完了HTML和CSS之后&#xff0c;就会想着使用这两个东西去做一些小案例&#xff0c;不过又没有什么好的案例让我们去练手&#xff0c;本篇文章就提供里一个案例——520表白信封 ✨✨✨这里是秋刀鱼不做梦的BLOG ✨✨✨想要了解更多内容可以访问我的主…

Golang的发展历程

Golang的发展历程可以分为以下几个阶段&#xff1a; 设计阶段&#xff1a;2007年&#xff0c;Google开始研究开发一种新的编程语言&#xff0c;主要出于对C和Java等编程语言的不足之处的反思。经过一年多的研究和讨论&#xff0c;Golang的设计方案得到确定&#xff0c;主要包括…

硬件设计-硬件 EMC 设计规范

目录 引言&#xff1a; 常见原因 总体概念及考虑 布局 屏蔽 滤波 引言&#xff1a; 本规范只简绍 EMC 的主要原则与结论&#xff0c;为硬件工程师们在开发设计中抛砖引玉。 电磁干扰的三要素是干扰源、干扰传输途径、干扰接收器。EMC 就围绕这些 问题进行研究。最基本的…

后端开发-Maven

环境说明&#xff1a; windows系统&#xff1a;11版本 idea版本&#xff1a;2023.3.2 Maven 介绍 Apache Maven 是一个 Java 项目的构建管理和理解工具。Maven 使用一个项目对象模型&#xff08;POM&#xff09;&#xff0c;通过一组构建规则和约定来管理项目的构建&#xf…

C++ 编译过程全解析:从源码到可执行文件的蜕变之旅

引言 C 作为一种广泛应用于系统开发、游戏编程、嵌入式系统等领域的高级编程语言&#xff0c;其代码需要经过编译才能转换为计算机可执行的机器语言。编译过程涵盖多个复杂阶段&#xff0c;每个阶段对最终生成的可执行文件的性能、稳定性及兼容性都有着深远影响。深入理解 C 编…

数据库的概念和操作

目录 1、数据库的概念和操作 1.1 物理数据库 1. SQL SERVER 2014的三种文件类型 2. 数据库文件组 1.2 逻辑数据库 2、数据库的操作 2.1 T-SQL的语法格式 2.2 创建数据库 2.3 修改数据库 2.4 删除数据库 3、数据库的附加和分离 1、数据库的概念和操作 1.1 物理数据库…