这一篇我们继续了解 ES 7.10 相较于 ES 6.8 调优的集群管理和任务管理的方法,主要有断联查询的主动取消、投票节点角色、异步查询和可搜索快照四个功能。
Query 自动取消
对于一个完善的产品来说,当一个任务发起链接主动断联的时候,服务端与之相关的任务应该也都被回收。但是这个特性到了 elasticsearch 7.4 版本才有了明确的声明。
Elasticsearch now automatically terminates queries sent through the _search endpoint when the initiating connection is closed.
相关的 PR 和 issue 在这里,对源码有兴趣的同学可以挖掘一下。
PR:https://github.com/elastic/elasticsearch/pull/43332
issue:https://github.com/elastic/elasticsearch/issues/43105
简单来说,ES 接受在某个查询的 http 链接断掉的时候,与其相关的父子任务的自动取消。原来的场景下可能需要手工一个个关闭。
实际测试
利用 painless 模拟复杂查询,下面这个查询在测试集群上能维持 5s 左右
GET /_search?max_concurrent_shard_requests=1
{
"query": {
"bool": {
"must": [
{
"script": {
"script": {
"lang": "painless",
"source": """
long sum = 0;
for (int i = 0; i < 100000; i++) {
sum += i;
}
return true;
"""
}
}
},
{
"script": {
"script": {
"lang": "painless",
"source": """
long product = 1;
for (int i = 1; i < 100000; i++) {
product *= i;
}
return true;
"""
}
}
},
{
"script": {
"script": {
"lang": "painless",
"source": """
long factorial = 1;
for (int i = 1; i < 100000; i++) {
factorial *= i;
}
long squareSum = 0;
for (int j = 0; j < 100000; j++) {
squareSum += j * j;
}
return true;
"""
}
}
},
{
"script": {
"script": {
"lang": "painless",
"source": """
long fib1 = 0;
long fib2 = 1;
long next;
for (int i = 0; i < 100000; i++) {
next = fib1 + fib2;
fib1 = fib2;
fib2 = next;
}
return true;
"""
}
}
}
]
}
}
}
查看任务被终止的状态
GET /_tasks?detailed=true&actions=*search*
测试脚本,判断上面该查询被取消后是否还可以查到任务
import requests
import multiprocessing
import time
from requests.exceptions import RequestException
from datetime import datetime
# Elasticsearch 地址
#ES_URL = "http://localhost:9210" # 6.8版本地址
ES_URL = "http://localhost:9201"
# 耗时查询的 DSL
LONG_RUNNING_QUERY = {"size":0,
"query": {
"bool": {
"must": [
{
"script": {
"script": {
"lang": "painless",
"source": """
long sum = 0;
for (int i = 0; i < 100000; i++) {
sum += i;
}
return true;
"""
}
}
},
{
"script": {
"script": {
"lang": "painless",
"source": """
long product = 1;
for (int i = 1; i < 100000; i++) {
product *= i;
}
return true;
"""
}
}
},
{
"script": {
"script": {
"lang": "painless",
"source": """
long factorial = 1;
for (int i = 1; i < 100000; i++) {
factorial *= i;
}
long squareSum = 0;
for (int j = 0; j < 100000; j++) {
squareSum += j * j;
}
return true;
"""
}
}
},
{
"script": {
"script": {
"lang": "painless",
"source": """
long fib1 = 0;
long fib2 = 1;
long next;
for (int i = 0; i < 100000; i++) {
next = fib1 + fib2;
fib1 = fib2;
fib2 = next;
}
return true;
"""
}
}
}
]
}
}
}
# 用于同步的事件对象
query_finished = multiprocessing.Event()
# 新增:进程终止标志位
process_terminated = multiprocessing.Event()
# 定义一个函数用于添加时间戳到日志
def log_with_timestamp(message,*message1):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}+{message1}")
# 发起查询的函数
def run_query():
try:
log_with_timestamp("发起查询...")
session = requests.Session()
response = session.post(
f"{ES_URL}/_search",
json=LONG_RUNNING_QUERY,
stream=True # 启用流式请求,允许后续中断
)
try:
# 尝试读取响应内容(如果连接未被中断)
if response.status_code == 200:
log_with_timestamp("查询完成,结果:", response.json())
else:
log_with_timestamp("查询失败,错误信息:", response.text)
except RequestException as e:
log_with_timestamp("请求被中断:", e)
finally:
# 标记查询完成
query_finished.set()
# 中断连接的信号函数
def interrupt_signal():
time.sleep(1) # 等待 1 秒
log_with_timestamp("发出中断查询信号...")
# 标记可以中断查询了
query_finished.set()
# 检测任务是否存在的函数
def check_task_exists():
# 等待进程终止标志位
process_terminated.wait()
max_retries = 3
retries = 0
time.sleep(1) #1s后检查
while retries < max_retries:
log_with_timestamp("检查任务是否存在...")
tasks_url = f"{ES_URL}/_tasks?detailed=true&actions=*search*"
try:
tasks_response = requests.get(tasks_url)
if tasks_response.status_code == 200:
tasks = tasks_response.json().get("nodes")
if tasks:
log_with_timestamp("任务仍存在:", tasks)
else:
log_with_timestamp("任务已消失")
break
else:
log_with_timestamp("获取任务列表失败,错误信息:", tasks_response.text)
except RequestException as e:
log_with_timestamp(f"检测任务失败(第 {retries + 1} 次重试): {e}")
retries += 1
time.sleep(1) # 等待 1 秒后重试
if retries == max_retries:
log_with_timestamp("达到最大重试次数,无法检测任务状态。")
# 主函数
def main():
# 启动查询进程
query_process = multiprocessing.Process(target=run_query)
query_process.start()
# 启动中断信号进程
interrupt_process = multiprocessing.Process(target=interrupt_signal)
interrupt_process.start()
# 等待中断信号
query_finished.wait()
# 检查查询进程是否还存活并终止它
if query_process.is_alive():
log_with_timestamp("尝试中断查询进程...")
query_process.terminate()
log_with_timestamp("查询进程已终止")
# 新增:设置进程终止标志位
process_terminated.set()
# 启动任务检测进程
check_process = multiprocessing.Process(target=check_task_exists)
check_process.start()
# 等待所有进程完成
query_process.join()
interrupt_process.join()
check_process.join()
if __name__ == "__main__":
main()
实际测试结果:
# 6.8 版本
[2025-02-08 15:17:21] 发起查询...+()
[2025-02-08 15:17:22] 发出中断查询信号...+()
[2025-02-08 15:17:22] 尝试中断查询进程...+()
[2025-02-08 15:17:22] 查询进程已终止+()
[2025-02-08 15:17:23] 检查任务是否存在...+()
[2025-02-08 15:17:23] 任务仍存在:+({'fYMNv_KxQGCGzhgfMxPXuA': {......}},)
可以看到在查询任务被终止后 1s 再去检查,任务仍然存在
# 7.10 版本
[2025-02-08 15:18:16] 发起查询...+()
[2025-02-08 15:18:17] 发出中断查询信号...+()
[2025-02-08 15:18:17] 尝试中断查询进程...+()
[2025-02-08 15:18:17] 查询进程已终止+()
[2025-02-08 15:18:18] 检查任务是否存在...+()
[2025-02-08 15:18:18] 任务已消失+()
这里可以看到任务已经检测不到了。
关于 timeout 配置
这里展开讨论下,timeout 配置。超时回收处理是一个‘best effort’行为。
(Optional, time units) Specifies the period of time to wait for a response. If no response is received before the timeout expires, the request fails and returns an error. Defaults to no timeout.
the search request is more of a best effort and does not guarantee that the request will never last longer than the specified amount of time.
异步搜索
使用方法
可以让用户进行异步的搜索,可以通过相关参数进行检查维护该搜索的状态和结果。比较合适查询量较大但对延迟要求较低的查询,进行精细化的管理控制。
注意:这里的参数基本都是添加到 url 上的,并不是添加到 request body 上的。
POST test_index/_async_search?keep_on_completion=true
{
"query": {
"match_all": {}
}
}
注:这里为了产生查询结果 id 使用了 keep_on_completion 参数,这个参数的使用见下面解释。
返回结果,和一般的查询结果不同的是,添加了结果 id 和查询的一些状态数据。
{
"id": "Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz",//结果id,可以用于后续的复查
"is_partial": false,//是否为部分完成结果
"is_running": false,//是否还在查询
"start_time_in_millis": 1738978637287,//查询产生时间戳
"expiration_time_in_millis": 1739410637287,//查询结果过期时间戳
"response": {
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [······]
}
}
}
管理查询结果
//查询结果和第一次返回的内容一致
GET /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz
//主动删除查询结果
DELETE /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz
关键参数
- wait_for_completion_timeout:参数(默认为 1 秒),这个参数用来设置异步查询的等待时间。当异步搜索在此时间内完成时,响应将不包括 ID,结果也不会存储在集群中。
- keep_on_completion:参数(默认为 false)可以设置为 true,可以强制存储查询结果,即便在 wait_for_completion_timeout 设置时间内完成搜索,该结果也能被查询到。
- keep_alive:指定异步搜索结果可以被保存多长时间,默认为 5d(5 天)。在此期间之后,正在进行的异步搜索和任何保存的搜索结果将被删除。
- batched_reduce_size:是 Elasticsearch 中的一个配置参数,默认值为 5。它的作用是控制分片结果的部分归并频率,具体来说,它决定了协调节点(coordinating node)在接收到多少个分片的响应后,会执行一次部分结果归并(partial reduction)。
- pre_filter_shard_size:是 Elasticsearch 中与查询执行相关的一个参数,它的默认值为 1,并且不可更改。这个参数的作用是强制 Elasticsearch 在执行查询之前,先进行一轮预过滤(pre-filter),以确定哪些分片(shard)可能包含与查询匹配的文档,从而跳过那些肯定不包含匹配文档的分片。
查询结果存储位置
异步查询的结果部分存储在.async-search
中,但是进行了程序加密,内容对使用者不可见。
GET .async-search/_search
// 返回的结果
···
"hits": [
{
"_index": ".async-search",
"_type": "_doc",
"_id": "bPNotcTCTV-gSIiZLuK0IA",
"_score": 1,
"_source": {
"result": "i6+xAwFERm1KUVRtOTBZMVJEVkZZdFoxTkphVnBNZFVzd1NVRWJPRmx3UkdVMk9XWlRhMmt4TkVwb1QwUTJiVlpyWnpvek1EWTEAAQEDAD+AAAADP4AAAAAAABR0Sm9yNDVRQlQ3bzBsZTdsYmp0TgAAAARfZG9jAP//AwALeyJhIjoxMTExfQoAAAAAAAAAAQEAAAAWOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZwp0ZXN0X2luZGV4Fk5fYmphNXM1UWtpcnU4RXdleVlGSUEAAAA/gAAAAAAAFHRab3I0NVFCVDdvMGxlN2xlVHNrAAAABF9kb2MA//8DAAt7ImEiOjExMTJ9CgAAAAAAAAABAQAAABY4WXBEZTY5ZlNraTE0SmhPRDZtVmtnCnRlc3RfaW5kZXgWTl9iamE1czVRa2lydThFd2V5WUZJQQAAAD+AAAAAAAAUdHBvcjQ1UUJUN28wbGU3bGZqc28AAAAEX2RvYwD//wMAC3siYSI6MTExM30KAAAAAAAAAAEBAAAAFjhZcERlNjlmU2tpMTRKaE9ENm1Wa2cKdGVzdF9pbmRleBZOX2JqYTVzNVFraXJ1OEV3ZXlZRklBAAAAAAAAAAAAAgABAQEAAAAAAAsAAAAAAAABlOMuvCQAAAGU/O6IJA==",
"headers": {},
"expiration_time": 1739410278436,
"response_headers": {}
}
},
···
只投票候选节点
这是一个主候选节点角色的优化,能相对固定 master 节点的位置,减少了选举候选节点过多的问题。
作用
Voting - only master - eligible node(仅参与投票的具备主节点资格的节点)在 Elasticsearch 集群中有以下作用:
- 参与主节点选举:该节点参与主节点选举过程,但本身不会成为集群选出的主节点,主要作为选举中的决胜因素(打破平局)。
- 保障高可用性:在高可用性(HA)集群中,至少需要三个具备主节点资格的节点,其中至少两个不能是仅参与投票的节点,这样即使有一个节点故障,集群仍能选出主节点。
- 分担选举及状态发布任务:和普通具备主节点资格的节点一样,在集群状态发布期间承担特定任务。
- 灵活承担其他角色:可以同时承担集群中的其他角色,如数据节点;也可以作为专用节点,不承担其他角色。
配置
三个节点的集群:可以配置两个普通主节点资格节点和一个仅参与投票的节点。这样在一个普通主节点故障时,剩下的普通主节点和仅参与投票的节点一起可以完成主节点选举,保证集群的正常运行。
理论上,主候选节点数量能满足不同区域间的主备切换要求即可,其余可以都是投票节点。
可搜索快照
注意:这是一个收费功能
实现机制
可搜索快照让你能够通过使用快照来保障数据恢复能力,而非在集群内维护副本分片,从而降低运营成本。
当你将快照中的索引挂载为可搜索快照时,Elasticsearch 会将索引分片复制到集群内的本地存储中。这能确保搜索性能与搜索其他任何索引相当,并尽量减少对访问快照存储库的需求。如果某个节点发生故障,可搜索快照索引的分片会自动从快照存储库中恢复。
搜索可搜索快照索引与搜索其他任何索引的方式相同。搜索性能与常规索引相当,因为在挂载可搜索快照时,分片数据会被复制到集群中的节点上。
如果某个节点发生故障,且需要从快照中恢复可搜索快照分片,在 Elasticsearch 将分片分配到其他节点的短暂时间内,集群健康状态将不会显示为绿色。在这些分片重新分配完成之前,对这些分片的搜索将会失败或返回部分结果。
对于搜索频率较低的数据,这能显著节省成本。使用可搜索快照,不再需要额外的索引分片副本以避免数据丢失,这有可能将搜索该数据所需的节点本地存储容量减少一半。同时可搜索快照依赖于备份使用的快照,也不需要额外的空间。
使用建议
-
从含多索引的快照挂载单个索引时,建议进行使用分隔,创建仅含目标索引的快照副本并挂载,方便独立管理备份与可搜索快照生命周期。
-
挂载为可搜索快照索引前,建议将索引强制合并为每分片一个段,减少从存储库读取数据的操作和成本。
实际测试
基础配置
前提条件:需要一个镜像使用存储,这里使用 minIO 作为测试
- 安装 S3 插件,并注册快照库信息
# 在线安装插件
elasticsearch-plugin install repository-s3
# 设置访问minio的信息,elasticsearch的bin目录下,使用minIO中设置的用户名密码
./elasticsearch-keystore add s3.client.default.access_key
./elasticsearch-keystore add s3.client.default.secret_key
# 重载安全设置,然后重启节点
POST _nodes/reload_secure_settings
# 注册快照库
PUT _snapshot/my-minio-repository
{
"type": "s3",
"settings": {
"bucket": "es-bucket",
"endpoint": "http://127.0.0.1:9002",
"compress": true
}
}
- 挂载需要的快照索引
POST /_snapshot/my-minio-repository/snapshot_es_prp_cmain_20240829/_mount?wait_for_completion=true
{
"index": "es_prp_cmain_insured_itemkind_detail_formal_20240829",
"renamed_index": "test_searchable_snapshot",//挂载时对索引进行重命名
"index_settings": {
"index.number_of_replicas": 0
},
"ignore_index_settings": [ "index.refresh_interval" ]
}
- 检查空间占用
GET _cat/indices/test_searchable_snapshot?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open test_searchable_snapshot qROj2flcRdiGOZaejeAmQQ 1 0 10000 0 21.3mb 21.3mb
在系统上也看到了对应 uuid 的文件目录
[root@hcss-ecs 0]# ls
_state snapshot_cache translog
[root@hcss-ecs 0]# pwd
/data/elasticsearch-7.10.2/data/nodes/0/indices/qROj2flcRdiGOZaejeAmQQ/0
小结
这篇的内容讲解测试的相对较细,对于查询的自动取消和异步查询增加了 ES 查询任务的灵活性;只投票节点也是加强了主节点选举的稳定性;可搜索快照是成本和功能的均衡方法,对于日志场景的使用是一个不错的选择。
推荐阅读
- 谈谈 ES 6.8 到 7.10 的功能变迁(1)- 性能优化篇
- 谈谈 ES 6.8 到 7.10 的功能变迁(2)- 字段类型篇
- 谈谈 ES 6.8 到 7.10 的功能变迁(3)- 查询方法篇
- 谈谈 ES 6.8 到 7.10 的功能变迁(4)- 聚合功能篇
- 谈谈 ES 6.8 到 7.10 的功能变迁(6)- 其他