Elasticsearch Ingest Pipelines

1. 前言

在将第三方数据源的数据导入到Elasticsearch中时,原始数据长什么样,索引后的文档就是什么样。文档数据结构不统一,导致后续数据分析时变得麻烦,以往需要额外写一个中间程序来读取原始数据,转换加工后再写入到Elasticsearch,比较麻烦,于是官方推出了Ingest pipeline。

Ingest pipeline 允许文档在被索引之前对数据进行预处理,将数据加工处理成我们需要的格式,例如删除或增加一些字段等。有了Ingest pipeline,就不需要我们再额外开发中间程序对数据进行加工转换等操作了,全权交给Elasticsearch即可。

2. 创建Ingest pipeline

Pipeline 由一系列处理器 Processor 组成,每个Processor按照顺序执行,对传入的文档进行加工处理,Elasticsearch再将加工后的文档添加到索引中。

默认情况下,每个节点都具备ingest能力,即文档预处理的能力。如果要预处理大量的文档,建议开启专门的ingest节点,配置如下:

node.roles: [ ingest ]

可以通过Elasticsearch提供的「_ingest/pipeline」端点来创建Ingest pipeline,如下示例,创建了一个名为“test-pipeline”的pipeline,它由三个Processor组成:lowercase负责将字段值改为小写、remove负责删除给定字段、rename负责重命名给定字段。

PUT _ingest/pipeline/test-pipeline
{
  "description": "测试pipeline",
  "processors": [
    {
      "lowercase": {
        "field": "title"
      },
      "remove": {
        "field": "extended_data"
      },
      "rename": {
        "field": "field_a",
        "target_field": "field_b"
      }
    }
  ]
}

然后,我们就可以通过Elasticsearch提供的模拟API,来测试我们的Pipelline,如下示例:

POST _ingest/pipeline/test-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "我是TITLE",
        "extended_data": "我是扩展数据",
        "field_a": "我是要rename的field"
      }
    }
  ]
}

Pipeline返回的结果符合我们的预期,字段title的值英文部分改为了等效的小写、extended_data字段被删除了、field_a被重命名为field_b。

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "title": "我是title",
          "field_b": "我是要rename的field"
        },
        "_ingest": {
          "timestamp": "2024-04-15T07:21:20.521194Z"
        }
      }
    }
  ]
}

3. 内置的Processor

截止Elasticsearch8.13版本,官方内置了40多个的Processor供我们使用,如下是部分Processor示例:

这里介绍几个常用的,其它参考官方文档即可。

append Processor:如果字段已经存在并且是一个数组,则向现有数组追加一个或多个值。

如下示例,如果文档存在tags字段并且是数组,则会自动追加给定的两个tag

PUT _ingest/pipeline/test-pipeline
{
  "processors": [
    {
      "append": {
        "field": "tags",
        "value": ["年度十佳","Top100"]
      }
    }
  ]
}

测试结果

POST _ingest/pipeline/test-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "tags":["热门"]
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "tags": [
            "热门",
            "年度十佳",
            "Top100"
          ]
        },
        "_ingest": {
          "timestamp": "2024-04-15T07:31:23.089093Z"
        }
      }
    }
  ]
}

bytes Processor:将人类可读的字节值(例如1KB)转换为整型字节值(例如1024),如果是数组则转换所有成员。

如下示例,字段memory将会由人类可读的字符串转换为整型字节值

PUT _ingest/pipeline/test-pipeline
{
  "processors": [
    {
      "bytes": {
        "field": "memory"
      }
    }
  ]
}

测试结果

POST _ingest/pipeline/test-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "memory":"132MB"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "memory": 138412032
        },
        "_ingest": {
          "timestamp": "2024-04-15T07:35:07.233976Z"
        }
      }
    }
  ]
}

convert Processor:将文档中的某个字段转换为给定的数据类型,例如将字符串转换为布尔类型。数据类型转换不能随意配置,否则会报错,支持的类型有:integer、long、float、double、string、boolean、ip、auto。

如下示例,将字符串转换为布尔类型

PUT _ingest/pipeline/test-pipeline
{
  "processors": [
    {
      "convert": {
        "field": "deleted",
        "type": "boolean"
      }
    }
  ]
}

测试结果

POST _ingest/pipeline/test-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "deleted":"false"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "deleted": false
        },
        "_ingest": {
          "timestamp": "2024-04-15T07:40:42.066717Z"
        }
      }
    }
  ]
}

date Processor:从文档中的某个字段中解析时间,并将其作为文档的时间戳,默认时间戳的字段名为”@timestamp“。

如下示例,我们将publish_time作为文档的时间戳:

PUT _ingest/pipeline/test-pipeline
{
  "processors": [
    {
      "date": {
        "field": "publish_time",
        "formats": ["yyyy-MM-dd HH:mm:ss"]
      }
    }
  ]
}

测试结果:

POST _ingest/pipeline/test-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "publish_time":"2024-01-01 00:00:00"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "publish_time": "2024-01-01 00:00:00",
          "@timestamp": "2024-01-01T00:00:00.000Z"
        },
        "_ingest": {
          "timestamp": "2024-04-15T07:44:28.383089Z"
        }
      }
    }
  ]
}

最后介绍一个功能强大的Processor:script,通过内联的自定义脚本来操作文档,使用起来相当灵活。

如下示例,Processor将从content字段中提取品牌、商品标题和价格,并移除掉content字段。

PUT _ingest/pipeline/test-pipeline
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
        String[] splits = ctx['content'].splitOnToken(',');
        ctx['brand'] = splits[0];
        ctx['title'] = splits[1];
        ctx['price'] = Integer.parseInt(splits[2]);
        ctx.remove('content');
        """
      }
    }
  ]
}

测试结果

POST _ingest/pipeline/test-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "content":"小米,小米汽车SU7,214900"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "title": "小米汽车SU7",
          "brand": "小米",
          "price": 214900
        },
        "_ingest": {
          "timestamp": "2024-04-15T07:58:44.134495Z"
        }
      }
    }
  ]
}

4. 应用Ingest pipeline

Ingest pipeline创建好以后,主要有四类应用场景。

先创建一个名为”set-last-update-time-pipeline“的pipeline,它的目的是给文档设置一个最后修改的时间戳。

PUT _ingest/pipeline/set-last-update-time-pipeline
{
  "processors": [
    {
      "set": {
        "field": "last_update_time",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

4.1 索引时指定pipeline

可以在索引文档时指定Pipeline,文档写入前会先经过管道的预处理。

下面是没指定Pipeline时写入文档和索引结果

POST test-index/_doc/1
{
  "title":"new doc"
}

{
  "_index": "test-index",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "title": "new doc"
  }
}

下面是指定Pipeline时索引文档和索引结果,发现文档自动添加了last_update_time字段。

POST test-index/_doc/1?pipeline=set-last-update-time-pipeline
{
  "title":"new doc"
}

{
  "_index": "test-index",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "title": "new doc",
    "last_update_time": "2024-04-15T08:11:50.448923Z"
  }
}

4.2 更新时指定pipeline

在更新文档时,也可以指定Pipeline对要更新的文档执行预处理操作。

如下示例,先索引两个文档

POST test-index/_doc/1
{
  "title":"new doc1"
}
POST test-index/_doc/2
{
  "title":"new doc2"
}

然后利用”_update_by_query“端点来批量更新文档,同时指定pipeline,即可给所有文档添加last_update_time

POST test-index/_update_by_query?pipeline=set-last-update-time-pipeline
{
  "query": {
    "match_all": {}
  }
}

下面是更新后的文档结果

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "test-index",
        "_id": "1",
        "_score": 1,
        "_source": {
          "title": "new doc1",
          "last_update_time": "2024-04-15T08:17:24.527237Z"
        }
      },
      {
        "_index": "test-index",
        "_id": "2",
        "_score": 1,
        "_source": {
          "title": "new doc2",
          "last_update_time": "2024-04-15T08:17:24.528187Z"
        }
      }
    ]
  }
}

4.3 索引映射指定pipeline

如果不想每次写入和更新时都指定Pipeline,也可以在创建索引时指定Pipeline,如下示例:

PUT test-index
{
  "settings": {
    "index":{
      "default_pipeline":"set-last-update-time-pipeline"
    }
  }, 
  "mappings": {
    "properties": {
      "title":{
        "type": "keyword"
      }
    }
  }
}

索引文档或更新文档时,会自动触发对应的Pipeline对文档进行预处理。如下示例,写入两篇文档不指定Pipeline

POST test-index/_doc/1
{
  "title":"new doc1"
}
POST test-index/_doc/2
{
  "title":"new doc2"
}

检索文档发现均已添加last_update_time字段

[
  {
    "_index": "test-index",
    "_id": "1",
    "_score": 1,
    "_source": {
      "title": "new doc1",
      "last_update_time": "2024-04-15T08:21:49.591599Z"
    }
  },
  {
    "_index": "test-index",
    "_id": "2",
    "_score": 1,
    "_source": {
      "title": "new doc2",
      "last_update_time": "2024-04-15T08:21:49.592733Z"
    }
  }
]

4.4 reindex指定pipeline

Elasticsearch提供了reindex API用于将一个索引的文档复制到另一个索引,reindex过程也可以指定Pipeline在写入目标索引前进行预处理。

如下示例,先定义源索引以及写入两篇文档

PUT source-index
{
  "mappings": {
    "properties": {
      "title":{
        "type": "keyword"
      }
    }
  }
}

POST source-index/_doc
{
  "title":"doc1"
}
POST source-index/_doc
{
  "title":"doc2"
}

接着定义目标索引,映射新增last_update_time字段

PUT dest-index
{
  "mappings": {
    "properties": {
      "title":{
        "type": "keyword"
      },
      "last_update_time":{
        "type": "date"
      }
    }
  }
}

然后调用reindex API完成文档迁移

POST _reindex
{
  "source": {
    "index": "source-index"
  },
  "dest": {
    "index": "dest-index",
    "pipeline": "set-last-update-time-pipeline"
  }
}

最后查看dest-index文档数据,发现均添加last_update_time字段

[
  {
    "_index": "dest-index",
    "_id": "glfh4I4BXAgLe9UUkOwp",
    "_score": 1,
    "_source": {
      "title": "doc1",
      "last_update_time": "2024-04-15T08:43:18.905182Z"
    }
  },
  {
    "_index": "dest-index",
    "_id": "g1fh4I4BXAgLe9UUkuy1",
    "_score": 1,
    "_source": {
      "title": "doc2",
      "last_update_time": "2024-04-15T08:43:18.906068Z"
    }
  }
]

5. 尾巴

Ingest Pipelines 是Elasticsearch一个非常实用的功能,它类似于大数据中的ETL,在真正索引数据前,先经过Pipeline完成数据的清洗和加工,把原始数据转换成我们想要的格式再索引,利于后续的数据分析。默认情况下,所有节点都具备Ingest能力,如果要预处理大量文档,建议部署专门的ingest节点,避免影响到主数据节点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/891966.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux下Docker方式Jenkins安装和配置

一、下载&安装 Jenkins官方Docker仓库地址:https://hub.docker.com/r/jenkins/jenkins 从官网上可以看到,当前最新的稳定版本是 jenkins/jenkins:lts-jdk17。建议下在新的,后面依赖下不来 所以,我们这里,执行doc…

智绘城市地图:使用百度地图 API 实现智能定位

✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…

测网速小程序,纯前端

搜索:证寸照制作 源码介绍: 测网速小程序源码,是一款纯前端无需服务器的测网速小程序,依赖百度开发者中心js接口,真正的永久使用的小工具源码,很实用,可以单独运行,测网速很流畅~ 合法域名: ht…

深入理解 pnpm(Performant NPM) 的实现原理及其与 npm 的区别

深入理解 pnpm 的实现原理及其与 npm 的区别 在 JavaScript 生态系统中,包管理器是开发者日常工作中不可或缺的工具。npm(Node Package Manager)作为 Node.js 的默认包管理器,已经广泛应用于各种项目中。然而,随着项目…

力扣之接雨水(42)

刷题不在多,而在精。 这道题号称字节的保洁阿姨都能做出的。 方法一:动态规划 下面这幅图简直封神,看了下图我才做出来的。 两个方向遍历,然后取相同覆盖值-原始值(heigth数组) 这种方法更好理解。但是也有…

厨房老鼠数据集:掀起餐饮卫生监测的科技浪潮

厨房老鼠数据集:掀起餐饮卫生监测的科技浪潮 摘要:本文深入探讨了厨房老鼠数据集在餐饮行业卫生管理中的重要性及其相关技术应用。厨房老鼠数据集通过收集夜间厨房图像、老鼠标注信息以及环境数据,为深度学习模型提供了丰富的训练样本。基于…

新手爬虫DAY1

这个错误信息表明在你的Python程序中,re.search() 函数没有找到预期的匹配项,因此返回了 None。当你尝试在 None 对象上调用 group(1) 方法时,Python 抛出了一个 AttributeError。 具体来说,错误发生在 pc.py 文件的第6行&#x…

QT开发--QT基础

第0章 QT工具介绍 0.1 编译工具 uic,rcc,moc,qmake 都是 qt 的工具 uic 主要是 编译 .ui文件 -> ui_xxx.h //.ui文件 .h rcc 主要是 编译 资源文件.qrc文件 -> xxx.rcc …

某电子元器件企业人力资源管理体系搭建咨询项目

某电子元器件企业人力资源管理体系搭建咨询项目 ——搭建管理体系,梳理工作流程 【导读】 与其他同类企业一样,该电子公司面临招不到合适的人才、留不住人才的难题,自然也加大了人力资源管理的成本。公司人事部员工的工作基本上陷入了“招…

OpenUAV:首个专为现实无人机视觉语言导航设计的大规模轨迹数据集,由大约 12k 个轨迹组成,涵盖了多种环境和复杂的飞行动态。

2024-10-10,由北京航空航天大学人工智能研究所、香港中文大学MMLab以及感知与交互智能中心共同创建了OpenUAV数据集,首个专为现实无人机(UAV)视觉语言导航(VLN)任务设计的大型轨迹数据集,该数据…

波司登超1000+门店用钉钉Teambition开店管理,实现拓店“自动化”

门店开在哪里?什么时候装修?什么时候开门迎客? 在瞬息万变的零售行业,门店作为连接产品和消费者、融合线上和线下的核心场景,其运营效率和管理策略至关重要。 近日,波司登正式启用钉钉项目 Teambition&am…

【uniapp】打包成H5并发布

目录 1、设置配置mainifest.sjon 1.1 页面标题 1.2 路由模式 1.3 运行的基础路径 2、打包 2.1 打包入口 2.2 打包成功 2.3 依据目录找到web目录 3、 将web目录整体拷贝出来 4、上传 4.1 登录uniapp官网注册免费空间 4.2 上传拷贝的目录 4.3 检查上传是否正确 5、…

内容共创与UGC:TikTok腰部达人推动品牌海外传播新风向

当今数字营销的新时代,内容共创已成为品牌与用户之间构建深度互动的关键方式。在TikTok上,腰部达人通过UGC等形式,不仅能增强品牌与用户的互动性和参与度,还能够帮助品牌在海外市场上实现声量和知名度的提升。本文Nox聚星将和大家…

嵌入式开发学习日记——认识指针及和数组函数的联系(c语言)

一、指针的定义 一般格式: 数据类型 * 指针变量名 [初始地址值]; 数据类型是指针所指向的地址处的数据类型,如 int、char、float 等。 符号 * 用于通知系统,这里定义的是一个指针变量,通常跟在类型关键字的后面,表示…

从入门到高手的99个Python案例

想掌握Python编程语言,从零基础的小白晋升为大神?没问题!接下来我们将以轻松有趣的方式,逐一解锁Python学习路上的99个关键知识点。每一步都将结合实际应用场景、函数功能解析及简洁代码演示,带你深度领略Python的魅力…

为什么火箭回收技术如此重要?——以马斯克的星舰为例

引言 随着人类对太空探索的深入,火箭技术成为了人类通往星辰大海的关键工具。在这个领域,SpaceX 的火箭回收技术近年来取得了重要突破,尤其是其 “筷子夹火箭” 的设计进一步引发了广泛讨论。2024年10月13日,马斯克的第五次星舰试…

Flink窗口分配器WindowAssigner

前言 Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling wi…

而今再看unet

从最开始听到人用Unet左inpainting,再到自己使用Unet做图像去噪任务,虽然没有用Unet做过分割,但Unet也可以称得上是老朋友了。现在回头再看Unet,温故知新,一些魔鬼真就藏在一些细节之中。 structure 结构由forward函数…

【C++】:工厂模式

欢迎来到 破晓的历程的 博客 ⛺️不负时光,不负己✈️ 文章目录 简单工厂模什么是简单工厂模式?如何实现简单工厂模式? 工厂方法抽象工厂模式总结简单工厂模式工厂方法抽象工厂「Abstract Factory」 简单工厂模 什么是简单工厂模式&#xf…

【计算机网络】详解数据链路层数据帧Mac地址ARP协议

一、以太网帧 "以太网" 不是一种具体的网络,而是一种技术标准;既包含了数据链路层的内容,也包含了一些物理层的内容 。例如:规定了网络拓扑结构,访问控制方式,传输速率等;例如以太网中…