Elasticsearch:是时候离开了! - 在 Elasticsearch 文档上使用 TTL

作者:来自 Elastic David Pilato

想象一下,圣诞老人必须向世界上所有的孩子们分发礼物。 他有很多工作要做,他需要保持高效。 他有一份所有孩子的名单,并且知道他们住在哪里。 他很可能会将礼物按区域分组,然后再交付。 但他不会在同一个地方停留太久。 他会丢下礼物然后离开。 他不会等待孩子们打开礼物。 他就会离开。

也许我们可以建议他列出一份他仍然需要访问的城市的清单。 一旦他送出了礼物,他就可以将这些城市从名单中删除。 这样,他就会知道自己还要去哪里。 而且他也不会浪费时间回到同一个地方。

为此,他只需使用他必须访问的城市的 TTL(time to live - 生存时间)即可。 他只需将 TTL 设置为他需要递送礼物的时间即可。 一旦 TTL 过期,他就会从列表中删除这些城市。

他的旅程可能是这样的:

{ "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
{ "city": "Singapore", "deliver": "1 minute", "ttl": 1 }
{ "city": "Vilnius", "deliver": "3 minutes", "ttl": 3 }
{ "city": "Paris", "deliver": "5 minutes", "ttl": 5 }
{ "city": "London", "deliver": "6 minutes", "ttl": 6 }
{ "city": "Montréal", "deliver": "7 minutes", "ttl": 7 }
{ "city": "San Francisco", "deliver": "9 minutes", "ttl": 9 }
{ "city": "North Pole", "deliver": "forever" }

ttl 包含我们的 TTL 值(以分钟为单位):

  • 没有值意味着该文档将永远保留
  • 零意味着我们要尽快删除该文档
  • 任何正值都对应于删除文档之前需要等待的分钟数

ttl 摄取管道

要实现这样的功能,你只需要一个摄取管道:

PUT /_ingest/pipeline/ttl
{
  "processors": [
    {
      "set": {
        "field": "ingest_date",
        "value": "{{{_ingest.timestamp}}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusMinutes(ctx['ttl']);
        """,
        "if": "ctx?.ttl != null"
      }
    },
    {
      "remove": {
        "field": [ "ingest_date", "ttl" ],
        "ignore_missing": true
      }
    }
  ]
}

让我们解释一下。

第一个处理器在文档中设置一个临时字段 (ingest_date),然后我们在其中注入执行管道的时间 (_ingest.timestamp),该时间或多或少是索引日期。

然后我们运行一个 painless 脚本:

ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusMinutes(ctx['ttl']);

他的脚本根据 ingest_date 字段中可用的字符串值创建一个 ZonedDateTime java 对象。 然后我们只需调用 plusMinutes 方法并提供 ttl 的值作为参数。 这只会将摄取日期移动几分钟。 我们将结果存储在 ttl_date 新字段中。

请注意,我们需要添加一个条件,仅当 ttl 字段存在时才运行该处理器:

"if": "ctx?.ttl != null"

然后,我们只需删除不需要的字段 ingest_date 和可选的 ttl(如果我们不再需要它)。 请注意,出于调试目的,保留 ttl 可能是明智之举。 如果没有设置 ttl,如果丢失,我们也需要忽略它。 这是通过 “ignore_missing”: true 参数完成的。

为了测试这个管道,我们可以使用 simulate API:

POST /_ingest/pipeline/ttl/_simulate?filter_path=docs.doc._source,docs.doc._ingest
{
  "docs": [
    {
      "_source": { "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
    },
    {
      "_source": { "city": "Singapore", "deliver": "1 minute", "ttl": 1 }
    },
    {
      "_source": { "city": "Paris", "deliver": "5 minutes", "ttl": 5 }
    },
    {
      "_source": { "city": "North Pole", "deliver": "forever" }
    }
  ]  
}

请注意在上面所显示的时间为 UTC 时间。我在当前的北京时间是下午 14点多。

我们可以看到文档删除的更改日期。

自动创建 ttl_date 字段

我们可以使用 final_pipeline 索引设置将 ttl 管道定义为在实际索引操作之前使用的管道。

PUT /ttl-demo
{
  "settings": {
    "final_pipeline": "ttl"
  },
  "mappings": {
    "_source": {
      "excludes": [
        "ttl_date"
      ]
    },
    "properties": {
      "ttl_date": {
        "type": "date"
      }
    }
  }
}

你还可以使用 default_pipeline 索引设置,但你需要注意,如果一个用户想要使用用户管道索引文档,则不会调用 ttl 管道,例如:

POST /ttl-demo/_doc?pipeline=my-pipeline
{ 
  "city": "Singapore", 
  "deliver": "1 minute", 
  "ttl": 1
}

请注意,我们从 _source 字段中删除了 ttl_date 字段。 我们不想将其存储在 _source 字段中,因为它只是一个 “技术 ”字段。

索引文档

我们现在可以注入我们的数据集:

POST /ttl-demo/_bulk
{ "index": {} }
{ "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
{ "index": {} }
{ "city": "Singapore", "deliver": "1 minute", "ttl": 1 }
{ "index": {} }
{ "city": "Vilnius", "deliver": "3 minutes", "ttl": 3 }
{ "index": {} }
{ "city": "Paris", "deliver": "5 minutes", "ttl": 5 }
{ "index": {} }
{ "city": "London", "deliver": "6 minutes", "ttl": 6 }
{ "index": {} }
{ "city": "Montréal", "deliver": "7 minutes", "ttl": 7 }
{ "index": {} }
{ "city": "San Francisco", "deliver": "9 minutes", "ttl": 9 }
{ "index": {} }
{ "city": "North Pole", "deliver": "forever" }

删除经过 TTL 处理的文档

现在可以轻松运行 “Delete By Query” 调用:

POST /ttl-demo/_delete_by_query
{
  "query": {
    "range": {
      "ttl_date": {
        "lte": "now"
      }
    }
  }
}

我们只想删除所有早于现在(即请求执行时间)的文档。

如果我们立即运行它,我们可以看到只有文档 { "city": "Sidney", "deliver": "ASAP", "ttl": 0 } 被删除。
一分钟后,{ "city": "Singapore”, "deliver": "1 minute", "ttl": 1 }。 再过几分钟,就只剩下 { "city": "North Pole", "deliver": "forever" } 了。 它将永远保留。

使用 Watcher 每分钟运行一次

你可以使用 crontab 每分钟运行一次这样的查询:

* * * * * curl -XPOST -u elastic:changeme https://127.0.0.1:9200/ttl-demo/_delete_by_query -H 'Content-Type: application/json' -d '{"query":{"range":{"ttl_date":{"lte":"now"}}}}'

请注意,你必须监视此作业。 但如果你有商业许可证,你也可以使用 Watcher 直接从 Elasticsearch 运行它:

PUT _watcher/watch/ttl
{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "simple" : {}
  },
  "condition": {
    "always" : {}
  },
  "actions": {
    "call_dbq": {
      "webhook": {
        "url": "https://127.0.0.1:9200/ttl-demo/_delete_by_query",
        "method": "post",
        "body": "{\"query\":{\"range\":{\"ttl_date\":{\"lte\":\"now\"}}}}",
        "auth": {
          "basic": {
            "username": "elastic",
            "password": "changeme"
          }
        }
      }
    }
  }
}

请注意,我们使用 interval 参数每分钟运行此操作。 我们使用 Webhook 来调用按查询删除 API。 另请注意,我们需要提供身份验证信息。在上面,你需要根据自己的用户账号修改上面的身份信息。

如果你不是在 cloud.elastic.co 上运行,而是在本地使用自签名证书运行,因为 Elasticsearch 默认情况下是安全的,你需要将 xpack.http.ssl.verification_mode 设置为 none。 否则 Elasticsearch 将不会接受自签名证书。 当然,这只是为了测试目的。 不要在生产中这样做!如果我们不设置这个参数为 none,我们可以看到如下的错误信息:

最多一分钟后,圣诞老人就会看到他必须访问的城市已从列表中删除:

GET /ttl-demo/_search
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 6,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "ttl-demo",
        "_id": "IBTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "3 minutes",
          "city": "Vilnius"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "IRTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "5 minutes",
          "city": "Paris"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "IhTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "6 minutes",
          "city": "London"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "IxTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "7 minutes",
          "city": "Montréal"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "JBTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "9 minutes",
          "city": "San Francisco"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "JRTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "city": "North Pole",
          "deliver": "forever"
        }
      }
    ]
  }
}

到最后,这里就只剩下他的家了:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "ttl-demo",
        "_id": "JRTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "city": "North Pole",
          "deliver": "forever"
        }
      }
    ]
  }
}

这是一个简单快速但不够规范的解决方案,用于从 Elasticsearch 集群中删除旧数据。 但请注意,你永远不应该将此技术应用于日志或任何基于时间的索引,或者如果要以这种方式删除的数据量超过数据集的 10%。

相反,你应该更喜欢使用 delete index API 立即删除完整索引,而不是删除整套文档。 这是一种更有效的方式。

删除索引(首选)

为此,我们实际上可以更改管道以将数据发送到名称包含 ttl 日期的索引:

PUT /_ingest/pipeline/ttl
{
  "processors": [
    {
      "set": {
        "field": "ingest_date",
        "value": "{{{_ingest.timestamp}}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusDays(ctx['ttl']);
        """,
        "ignore_failure": true
      }
    },
    {
      "date_index_name" : {
        "field" : "ttl_date",
        "index_name_prefix" : "ttl-demo-",
        "date_rounding" : "d",
        "date_formats": ["yyyy-MM-dd'T'HH:mm:ss.nz"],
        "index_name_format": "yyyy-MM-dd",
        "if": "ctx?.ttl_date != null"
      }
    },
    {
      "set": {
        "field" : "_index",
        "value": "ttl-demo-forever",
        "if": "ctx?.ttl_date == null"
      }
    },
    {
      "remove": {
        "field": [ "ingest_date", "ttl", "ttl_date" ],
        "ignore_missing": true
      }
    }
  ]
}

在此示例中,我切换到每日索引,因为它比你在生产中看到的要准确得多,因为通常数据不会在几分钟后过期,但会在几天或几个月后过期。有关 date_index_name 的用法,请阅读文章 “Elasticsearch:使用 pipelines 路由文档到想要的 Elasticsearch 索引中去”。

我们更改了脚本以添加天数:

ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusDays(ctx['ttl']);

如果 ttl_date 字段存在,我们使用 date_index_name 处理器根据 ttl_date 字段创建一个新的索引名称。 我们使用 date_rounding 参数将日期四舍五入到当天。 我们使用 index_name_format 参数将日期格式化为 yyyy-MM-dd。 这将使用索引名称,例如 ttl-demo-2023-11-27:

{
  "date_index_name" : {
    "field" : "ttl_date",
    "index_name_prefix" : "ttl-demo-",
    "date_rounding" : "d",
    "date_formats": ["yyyy-MM-dd'T'HH:mm:ss.nz"],
    "index_name_format": "yyyy-MM-dd",
    "if": "ctx?.ttl_date != null"
  }
}

如果 ttl_date 字段不存在,我们只需将索引名称设置为 ttl-demo-forever:

{
  "set": {
    "field" : "_index",
    "value": "ttl-demo-forever",
    "if": "ctx?.ttl_date == null"
  }
}

我们可以重新索引我们的数据集:

POST /ttl-demo/_bulk?pipeline=ttl
{ "index": {} }
{ "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
{ "index": {} }
{ "city": "Singapore", "deliver": "1 day", "ttl": 1 }
{ "index": {} }
{ "city": "Vilnius", "deliver": "3 days", "ttl": 3 }
{ "index": {} }
{ "city": "Paris", "deliver": "5 days", "ttl": 5 }
{ "index": {} }
{ "city": "North Pole", "deliver": "forever" }

我们可以看到这些文档现在位于不同的索引中:

GET /ttl-demo-*/_search?filter_path=hits.hits._index,hits.hits._source.deliver

索引名称不再引用,因为我们习惯于查看数据的日期,而是删除数据的日期。 因此我们可以每天再次运行 crontab 来删除旧索引。 以下脚本旨在在 Mac OS X 系统上运行:

0 0 * * * curl -XDELETE -u elastic:changeme https://127.0.0.1:9200/ttl-demo-$(date -v -1d -j +%F)

总结起来

我们看到了在 Elasticsearch 文档上执行 TTL 的两种方法。 第一个是使用 TTL 字段并使用 “Delete By Query ” 调用来删除文档。 第二种(效率更高,需要删除大量数据)是使用 TTL 字段将文档路由到不同的索引,然后使用 crontab 删除索引。

但对于这两种解决方案,在 contrab 运行之前文档仍然可见。

你可以考虑使用索引过滤别名来隐藏旧文档:

POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "ttl-demo",
        "alias": "ttl-filtered",
        "filter": {
          "bool": {
            "filter": [
              {
                "range": {
                  "ttl_date": {
                    "gt": "now/m"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]
}

即使批处理(crontab 或 watcher)尚未删除过期文档,在 ttl-filtered  别名内搜索也只会返回尚未过期的文档。

圣诞老人现在可以知道接下来安全地去哪里,然后享受当之无愧的休息一年!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/317738.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Qt QSQlite数据库插入字符串中存在单个双引号或单个单引号解决方案

1. 前言 当进行数据库写入或更新时,有时会遇到存在字符串中包含单个双引号或者单引号。 2. 单引号和双引号""作用 在数据库中,字符串常量时需要用一对英文单引号或英文双引号""将字符串常量括起来。 比如: select * …

stable-diffusion 学习笔记

从效果看Stable Diffusion中的采样方法 参考:Ai 绘图日常 篇二:从效果看Stable Diffusion中的采样方法_软件应用_什么值得买 大概示例:

Java Web课设——个人博客(双端系统)

项目演示 先看看演示视频吧 演示图 简单介绍 个人博客管理系统采用Springboot2.4.5框架开发,是标准的MVC模式,将这个系统划分为View层、Controller层、Service层、DAO层和持久层五层。其中,Spring MVC负责请求的转发和视图管理,S…

蓝桥杯单片机组备赛——数码管动态显示

✨文章内容会不断优化,如果你感兴趣的话,欢迎点藏收藏关注我哟 🧨如果文章有哪里看不懂的欢迎评论区或私信留言,我会及时回复的 ⏰如果文章出现错误,欢迎指正,看到后我会马上改正 文章目录 一、动态显示原理…

Java SE入门及基础(10)

switch选择结构 1. 概念 switch 表示开关的意思,为了帮助理解,下面以线路为例,进行解释说明 上图中表示一条带有多个开关的线路,当开关打开时,该开关所控制的灯即被点亮。 2. 语法规则 switch ( 表达式 ){ //…

SECS/GEM的变量SVID是什么?JAVA SECS通信 JAVA与SECS集成资料大全JAVA开发SECS快速入门资料

Java与SECS基础通信 Java实现SECS指令S2F17获取时间 Java实现SECS指令 S10F3 终端单个显示例子 工艺配方管理S7FX Java实现SECS指令 S5F1报警/取消报警上传 实例源码及DEMO请查阅 变量可以是设备的状态信息 定义: 此功能允许主机查询设备数据变量&#x…

C#基于ScottPlot进行可视化

前言 上一篇文章跟大家分享了用NumSharp实现简单的线性回归,但是没有进行可视化,可能对拟合的过程没有直观的感受,因此今天跟大家介绍一下使用C#基于Scottplot进行可视化,当然Python的代码,我也会同步进行可视化。 P…

虚幻引擎nDisplay教程:如何同步nDisplay节点与Switchboard + Helix Core

对于使用大型LED屏幕进行拍摄的虚拟制作团队来说,虚幻(Unreal)的nDisplay是一个重要的工具。但是,在nDisplay中将正确版本的文件发送到每个节点会非常耗时。立即阅读本文,您将了解到如何使用Perforce Helix Core版本控…

【b站咸虾米】新课uniapp零基础入门到项目打包(微信小程序/H5/vue/安卓apk)全掌握

课程地址:【新课uniapp零基础入门到项目打包(微信小程序/H5/vue/安卓apk)全掌握】 https://www.bilibili.com/video/BV1mT411K7nW/?p12&share_sourcecopy_web&vd_sourceb1cb921b73fe3808550eaf2224d1c155 三、vue语法 继续回到官…

Python之jieba分词相关介绍

1.jieba分词的安装 直接在cmd窗口当中pip install即可 2.jieba分词的介绍 jieba分词是目前比较好的中文分词组件之一,jieba分词支持三种模式的分词(精确模式、全模式、搜索引擎模式),并且支持自定义词典(这一点在特定的领域很重要,有时候…

VMware workstation搭建与安装AlmaLinux-9.2虚拟机

VMware workstation搭建与安装AlmaLinux-9.2虚拟机 适用于需要在VMware workstation平台安装AlmaLinux-9.2(最小化安装、无图形化界面)虚拟机。 1. 安装准备 1.1 安装平台 Windows 11 1.2. 软件信息 软件名称软件版本安装路径VMware-workstation 1…

【前端素材】bootstrap5实现美食餐饮网站RegFood

一、需求分析 美食餐饮网站是指专门提供关于美食和餐饮的信息、服务和资源的在线平台。这类网站通常提供以下功能: 餐厅搜索和预订:用户可以在网站上搜索附近的餐厅,并预订桌位。网站会提供餐厅的详细信息,包括菜单、地址、电话号…

Jmeter 性能-监控服务器

Jmeter监控Linux需要三个文件 JMeterPlugins-Extras.jar (包:JMeterPlugins-Extras-1.4.0.zip) JMeterPlugins-Standard.jar (包:JMeterPlugins-Standard-1.4.0.zip) ServerAgent-2.2.3.zip 1、Jemter 安装插件 在插件管理中心的搜索Servers Perform…

JDK介绍

JDK(Java Development Kit)是Sun Microsystems针对Java开发员的产品。自从Java推出以来,JDK已经成为使用最广泛的Java SDK(Software development kit),JDK是一个写Java的applet和应用程序的程序开发环境。它由一个处于操作系统层之…

vue3-计算属性

计算属性 模板中的表达式虽然方便&#xff0c;但也只能用来做简单的操作。如果在模板中写太多逻辑&#xff0c;会让模板变得臃肿&#xff0c;难以维护。 根据作者今年是否看过书展示不同信息 <script lang"ts" setup> import { ref, reactive } from "…

C++11 14 17内存管理

智能指针 unique_ptr 初始化 访问和移动赋值 重置和移动内存资源 自定义删除器 shared_ptr 原理 自定义删除器 分配器allocator和new重载 new表达式原理 operator new delete placement new new (buf) 是一种 "placement new" 的使用方式&#xff0c;它允许在已…

thinkphp学习06-连接数据库与模型初探

新建数据库 CREATE DATABASE tp6stu01 CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;创建表和数据 DROP TABLE IF EXISTS tp_user; CREATE TABLE tp_user (id mediumint(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 自动编号,username varchar(20) CHARACTER SET utf8 COLL…

绘制几何图形(Shape)

目录 1、创建绘制组件 2、形状视口viewport 3、自定义样式 4、场景示例 绘制组件用于在页面绘制图形&#xff0c;Shape组件是绘制组件的父组件&#xff0c;父组件中会描述所有绘制组件均支持的通用属性。具体用法请参考Shape。 1、创建绘制组件 绘制组件可以由以下两种形式…

Spring MVC的RequestMapping注解、controller方法返回值

1.使用说明 作用&#xff1a;用于建立请求URL和处理请求方法之间的对应关系。 出现位置&#xff1a; 类上&#xff1a; 请求 URL的第一级访问目录。此处不写的话&#xff0c;就相当于应用的根目录。写的话需要以/开头。它出现的目的是为了使我们的 URL 可以按照模块化管理&…

Netty开篇——NIO章上(三)

Java NIO基本介绍 java non-blocking I/O 称为NIO(也叫New IO)。JDK4开始提供,同步非阻塞相关内容在 java.nio 包及子包下&#xff0c;对java.io 包中的很多类进行改写。三大核心: Channel(通道)&#xff0c;Buffer(缓冲区),Selector(选择器)NIO是面向缓冲区或者面向块编程的。…