Elasticsearch:使用 Elasticsearch ingest pipeline 丰富数据

在我之前的文章:

  • Elasticsearch:如何使用 Elasticsearch ingest 节点来丰富日志和指标

  • Elasticsearch:enrich processor (7.5发行版新功能)

我有详细描述如何使用 ingest pipeline 来丰富数据。在今天的文章中里,我们来更加详细地使用一个具体的例子来进行展示。更多官方文档描述,我们可以详细参阅文章 Enrich your data | Elasticsearch Guide [8.8] | Elastic。

什么是丰富数据

简单地说,我们可以使用其他的数据集里的数据添加到现有的数据集中。这样在我们的最终的数据集中,它含有另外一个数据集里的数据供我们分析数据。我们知道如果是独立于 Elasticsearch 的数据库,我们只有通过 Logstash 来完成这种操作。针对两个不同的 Elasticsearch 索引来说,我们可以使用 enrich processor 来完成两个不同的数据集之间的 “join” 操作。比如:

如上所示,我们有两个数据集:registration 及 customer。他们的数据分别如上所示。 它们是以 JSON 格式来进行表达的。我们可以通过 email 进行关联,那么最终我们可以得到如图右边的那个被丰富的数据。这个数据它不仅含有 registraion 里的数据,而且它还含有 customer 里的数据。从某种意义上讲,我们把两个不同的数据集通过 email 进行关联,并最终形成了一个被丰富的数据集。这个对于我们最终分析数据非常有效。

Elasticsearch enrich processor 的工作流程如下:

数据描述 

我们假想有一个活动的  signup 应用。在活动签名的时候这个应用收集了如下的信息:

如上所示,它只含有 email,location_id,paid_amount 及 product 四个字段的信息。这个信息被保存于一个 CSV 文件中。之后,市场部门提供了更多的信息表格给我我们。这些信息包含 location,member_type 及 user 等信息。

为了方便大家理解这个问题,我们可以在地址 GitHub - liu-xiao-guo/elasticsearch-ingest  找到相应的数据表述:

location.csv

 user.csv

member_type.csv

 signup.csv

 我们希望通过 enrich processor 的处理,我们最终能得到像如下结果的数据集:

也就是说,我们通过 enrich processor 的一番操作,我们可以把匹配的 user,location 及 member_type 信息添加进来,也即丰富原来的数据。

 

导入数据

我们可以使用 Kibana 来写入数据:

导入 user.csv

 

 

 

 

导入 location.csv

 

 

 

 在上面我们需要修改 point 为 geo_point 数据类型。

 

 

 

导入 member_type.csv

按照同样的方法,我们来导入 member_type.csv:

 

 

 在上面,我们添加了如下的 json processor:

    {
      "json" : {
        "field" : "price_range"
      }
    }

 

 

 

 

创建 enrich policy

我们可以参考链接:https://github.com/liu-xiao-guo/elasticsearch-ingest/blob/master/part-2/policy/user.txt

// Create users policy
PUT /_enrich/policy/user_policy
{
  "match": {
    "indices": "user",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
  }
}

PUT /_enrich/policy/user_policy/_execute

我们在 Kibana 中运行上面的命令。在上面的 user_policy 中,我们使用 user 索引中的 email 字段,如果有匹配的话,那么 user 索引中相应的文档的 first_name,last_name,city,zip 及 state 将被丰富到文档中。

我们参考链接:https://github.com/liu-xiao-guo/elasticsearch-ingest/blob/master/part-2/policy/location.txt

PUT /_enrich/policy/location_policy
{
  "match": {
    "indices": "location",
    "match_field": "location_id",
    "enrich_fields": ["point"]
  }
}

PUT /_enrich/policy/location_policy/_execute

 在 Kibana 中运行上面的命令。

我们参考链接:https://github.com/liu-xiao-guo/elasticsearch-ingest/blob/master/part-2/policy/member_type.txt

// Create member_type policy
PUT /_enrich/policy/member_type_policy
{
  "range": {
    "indices": "member_type",
    "match_field": "price_range",
    "enrich_fields": ["member_type"]
  }
}

PUT /_enrich/policy/member_type_policy/_execute

我们在 Kibana 中运行上面的命令。

我们可以在 index management 中查看到新生成的 enrich index:

导入 signup.csv

 

 

 

如果你看看我们之前想要的结果的数据 mapping:

我们需要添加 geo 字段:

    "geo": {
      "properties": {
        "point": {
          "type": "geo_point"
	      }
      }
    }

我们需要更进一步修改 ingest pipeline。我们参考链接:https://github.com/liu-xiao-guo/elasticsearch-ingest/blob/master/part-2/pipeline/signup.json

我们需要添加如下的三个 enrich processor:

    {
      "enrich" : {
        "description": "Add 'user' data based on 'email'",
        "policy_name": "user_policy",
        "field" : "email",
        "target_field": "user",
        "max_matches": "1"
      }
    },
    {
      "enrich" : {
        "description": "Add 'member_type' data based on 'paid_amount'",
        "policy_name": "member_type_policy",
        "field" : "paid_amount",
        "target_field": "member_type",
        "max_matches": "1"
      }
    },
    {
      "enrich" : {
        "description": "Add 'geo' data based on 'location_id'",
        "policy_name": "location_policy",
        "field" : "location_id",
        "target_field": "geo",
        "max_matches": "1"
      }
    },

 点击上面的 import 按钮:

 

 

我们接下来针对 signup 索引来做一个搜索:

GET signup/_search?filter_path=**.hits

上面的命令返回的结果为:

{
  "hits": {
    "hits": [
      {
        "_index": "signup",
        "_id": "Q9mvgokBWubr9hCu1VXI",
        "_score": 1,
        "_source": {
          "member_type": {
            "member_type": "regular",
            "price_range": {
              "lte": 5
            }
          },
          "geo": {
            "location_id": 2351,
            "point": "POINT(-71.61 42.28)"
          },
          "product": "earlybird",
          "paid_amount": 5,
          "user": {
            "zip": 9303,
            "city": "Arleta",
            "last_name": "Fly",
            "state": "CA",
            "first_name": "Marty",
            "email": "martymcfly@backtothefuture.com"
          },
          "email": "martymcfly@backtothefuture.com",
          "location_id": 2351
        }
      },
      {
        "_index": "signup",
        "_id": "RNmvgokBWubr9hCu1VXI",
        "_score": 1,
        "_source": {
          "member_type": {
            "member_type": "regular",
            "price_range": {
              "lte": 5
            }
          },
          "geo": {
            "location_id": 2322,
            "point": "POINT(-71.63 42.56)"
          },
          "product": "earlybird",
          "paid_amount": 5,
          "user": {
            "zip": 58008,
            "city": "Springfield",
            "last_name": "Simpson",
            "state": "OR",
            "first_name": "Homer",
            "email": "homersimpson@springfield.com"
          },
          "email": "homersimpson@springfield.com",
          "location_id": 2322
        }
      },
      {
        "_index": "signup",
        "_id": "RdmvgokBWubr9hCu1VXI",
        "_score": 1,
        "_source": {
          "member_type": {
            "member_type": "premium",
            "price_range": {
              "gt": 5
            }
          },
          "geo": {
            "location_id": 2019,
            "point": "POINT(-72.68 42.2)"
          },
          "product": "regular",
          "paid_amount": 10,
          "user": {
            "zip": 99686,
            "city": "Valdez",
            "last_name": "Riker",
            "state": "AK",
            "first_name": "Will",
            "email": "willriker@federation.com"
          },
          "email": "willriker@federation.com",
          "location_id": 2019
        }
      }
    ]
  }
}

从上面的输出中,我们可以看出来我们已经成功地丰富了 signup 索引。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/48061.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【已解决】 Celery 报错:AttributeError: ‘EntryPoints‘ object has no attribute ‘get‘

【已解决】 Celery 报错:AttributeError: EntryPoints object has no attribute get 1、起因2、实验环境3、解决方案 1、起因 今天闲来无事学习 Celery 分布式任务队列,写好代码发布并执行,报错了 AttributeError: EntryPoints object has n…

【数据结构】实验七:字符串

实验七 字符串实验报告 一、实验目的与要求 1)巩固对串的理解; 2)掌握串的基本操作实现; 3)掌握 BF 和 KMP 算法思想。 二、实验内容 1. 给定一个字符串ababcabcdabcde和一个子串abcd,查找字串是否在主串中出现。…

Oracle 多条记录根据某个字段获取相邻两条数据间的间隔天数,小于31天的记录都筛选出来

需求描述:在Oracle中 住院记录记录表为v_hospitalRecords,表中FIHDATE入院时间,FBIHID是住院号, 我想查询出每个患者在他们的所有住院记录中是否在一个月内再次入院(相邻的两条记录进行比较),并且住院记录大于一的患者…

【高分论文密码】大尺度空间模拟预测与数字制图教程

详情点击链接:【高分论文密码】大尺度空间模拟预测与数字制图 一,R语言空间数据及数据挖掘关键技术 1、R语言空间数据及应用特点 1)R语言基础与数据科学 2)R空间矢量数据 3)R栅格数据 2、R语言空间数据挖掘关键技术 二,R语言空间数据高…

ChatGPT有几个版本,哪个版本最强,如何选择适合自己的?

​ChatGPT就像内容生产界的瑞士军刀。它可以是数学导师、治疗师、职业顾问、编程助手,甚至是旅行指南。只要你知道如何让它做你想做的事,ChatGPT几乎可以提供你要的任何东西。 但重要的是,你知道哪个版本的ChatGPT最能满足你的需求吗&#x…

C++容器——list的模拟实现

目录 一.list的基本结构 二. 接下来就是对list类构造函数的设计了: 三.链表数据的增加: 四.接下来就是迭代器的创建了: 四.简单函数的实现: 五.构造与析构 六.拷贝构造和赋值重载 传统写法: 现代写法: 七.迭…

运维高级--shell脚本完成分库分表

为什么要进行分库分表 随着系统的运行,存储的数据量会越来越大,系统的访问的压力也会随之增大,如果一个库中的表数据超过了一定的数量,比如说MySQL中的表数据达到千万级别,就需要考虑进行分库分表; 其…

基于拉格朗日-遗传算法的最优分布式能源DG选址与定容(Matlab代码实现)

目录 1 概述 2 数学模型 2.1 问题表述 2.2 DG的最佳位置和容量(解析法) 2.3 使用 GA 进行最佳功率因数确定和 DG 分配 3 仿真结果与讨论 3.1 33 节点测试配电系统的仿真 3.2 69 节点测试配电系统仿真 4 结论 1 概述 为了使系统网损达到最低值&a…

Paragon NTFS2023最新版Mac读写NTFS磁盘工具

Paragon NTFS for Mac是Mac平台上一款非常优秀的读写工具,可以在Mac OS X中完全读写、修改、访问NTFS硬盘、U盘等外接设备的文件。这款软件最大的亮点简书可以让我们读写 NTFS 分区,因为在Mac OS X 系统上,默认状态下我们只能读取NTFS 分区&a…

【Ubuntu18.04免密码登录SSH】

Ubuntu18.04免密码登录SSH 1 查看Ubuntu18.04系统中是否存在SSH服务2 配置SSH2.1 先删除一下ssh的目录,重新配置2.2 生存公钥和私钥2.3 将公钥上传到需要登录的服务器2.4 测试登录 1 查看Ubuntu18.04系统中是否存在SSH服务 sudo ps -e |grep ssh没有的话&#xff0…

网络安全(黑客)自学基础到高阶路线

01 什么是网络安全 网络安全可以基于攻击和防御视角来分类,我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、Web、移动、桌面、云等哪个领域,都有攻与防两面…

[JavaWeb]MySQL的安装与介绍

MySQL的安装与介绍 一.数据库相关概念1.1 数据库1.2 常见的关系型数据库管理系统 二.MySQL数据库1.MySQL的安装2.配置环境变量3.新建MySQL配置文件4.初始化MySQL5.注册MySQL的服务6.修改默认账户与密码7.连接MySQL服务8.MySQL的卸载 三.MySQL的数据模型1.关系型数据库 一.数据库…

Gitlab 备份与恢复

备份 1、备份数据(手动备份) gitlab-rake gitlab:backup:create2、备份数据(定时任务备份) [rootlocalhost ]# crontab -l 00 1 * * * /opt/gitlab/bin/gitlab-rake gitlab:backup:create 说明:每天凌晨1点备份数据…

C++之lambda表达式/function/using/typedef用法总结(一百六十六)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生…

软件设计师学习第一章

计算机组成与体系结构(6分) 内容概述 数据的表示 进制转换 R 进制转十进制使用按权展开法,其具体操作方式为:将 R 进制数的每一位数值用 Rk 形示,即幂的底数是 R ,指数为 k , k 与该位和小数点…

惠普HP Color Laser 150a开机红色感叹号闪烁不打印故障解决方法

故障描述: 惠普HP Color Laser 150a开机红色感叹号闪烁,不能打印,电脑提示C3-6140。 检测分析: 在解决C3-6140错误代码之前,我们需要先检查打印机是否连接正常。如果打印机连接不正常,也可能会出现这个错误…

2、HAproxy调度算法

HAProxy的调度算法可以大致分为以下几大类: 静态算法:这类算法的调度策略在配置时就已经确定,并且不会随着负载的变化而改变。常见的静态算法有: Round Robin(轮询) Least Connections(最少连接数) Static-Weight(静态权重) Sourc…

总结 Android 开发中截取字符串的方法

string str”hello word”;int i5; 1 取字符串的前i个字符 strstr.Substring(0,i); // or strstr.Remove(i,str.Length-i);substring(start,end):substring是截取2个位置之间及start-end之间的字符串2 去掉字符串的前i个字符: strstr.Remove(0,i); // or…

LabVIEW开发谐振器陀螺仪仿真系统

LabVIEW开发谐振器陀螺仪仿真系统 陀螺仪是INS系统中最重要的传感器。它们的性能(如精度和偏置稳定性)决定了INS系统的水平。陀螺仪按原理分为三类:角动量守恒、萨格纳克效应和科里奥利效应。旋转坐标系中的移动物体受到的力与旋转坐标系的角…

flutter:角标

角标应该非常常见了,以小说app为例,通常会在小说封面的右上角上显示当前未读的章数。 badges 简介 Flutter的badges库是一个用于创建徽章组件的开源库。它提供了简单易用的API,使开发者可以轻松地在Flutter应用程序中添加徽章效果。 官方文…