使用 Logstash 及 enrich processor 实现数据丰富自动化

在我之前的文章:

  • Elasticsearch:enrich processor (7.5发行版新功能)

  • Elasticsearch:使用 Elasticsearch ingest pipeline 丰富数据

通过上面的两篇文章的介绍,我们应该充分掌握了如何使用 enrich processor 来丰富数据了。特别是在上面的第二篇文章中,我们需要使用手动来一个一个地通过 Kibana 的界面来写入数据。我们感觉还是比较麻烦。如果我们能够实现自动化来完成整个的操作,那将是非常好的。在今天的文章中,我们将结合 enrich processor 和 Logstash 来实现数据的丰富自动化。我们可以利用 Linux 所提供的脚本来完成数据摄入的自动化。

在一下的展示中,我将使用如下的架构来进行展示:

数据描述

在进行我们的练习之前,我们下载所需要的数据及相关文档:

git clone https://github.com/evermight/elasticsearch-ingest
arallels@ubuntu2004:~/data/elasticsearch-ingest/part-3$ pwd
/home/parallels/data/elasticsearch-ingest/part-3
parallels@ubuntu2004:~/data/elasticsearch-ingest/part-3$ tree -L 3
.
├── 01-zip_geo.sh
├── 02-customer.sh
├── 03-product.sh
├── 04-order_item.sh
├── 05-order.sh
├── data
│   ├── customer
│   │   ├── data.csv
│   │   └── readme.txt
│   ├── mysql
│   │   ├── load.sql
│   │   └── readme.md
│   ├── order
│   │   ├── data.csv
│   │   └── data.xlsx
│   ├── order_item
│   │   ├── data.csv
│   │   └── data.xlsx
│   ├── product
│   │   └── data.csv
│   └── zip_geo
│       ├── data.csv
│       └── data.xlsx
├── env.sample
├── logstash
│   ├── customer.conf
│   ├── order.conf
│   ├── order_item.conf
│   ├── product.conf
│   └── zip_geo.conf
├── mapping
│   ├── customer.json
│   ├── order.json
│   └── zip_geo.json
├── part-3.pdf
├── part-3.pptx
├── pipeline
│   ├── customer.json
│   ├── order_item.json
│   └── order.json
├── policy
│   ├── customer.json
│   ├── order_item.json
│   ├── product.json
│   └── zip_geo.json
├── readme.md
├── run.sh
└── teardown.sh

如上所示,我们的文档结构如上所示。我们的数据结构如下:

我们有如上的几个表格。它们之间的数据是相互关联的。我们知道在 Elasticsearch 中的数据,它不像传统的关系数据库,在查询的时候,我们可以通过 join 来丰富数据,而且为了能够提高数据的查询速度,我们最好把数据实现扁平化,这也就是的数据的非规范化(denormalization)。我们可以详细阅读文章 “Elasticsearch:Elasticsearch 中索引映射的非规范化”。在摄入数据的时候,我们希望把相关的内容最终能丰富到最后的文档中。我们希望实现如下的内容:

从上面的最终结果,我们可以看出来,我们需要的数据来自不同的表格。这个需要我们使用 enrich processor 来帮我们完成。

文件目录描述

在项目的目录(part-3)下面,我们可以看到如下的几个子目录:

  • data:在这个目录里它含有我们需要的各个数据以及它们的来源
  • mapping:在这个目录中,它含有各个表格数据的 mapping。通常我们并不需要预先定义数据的类型。我们可以让 Elasticsearch 帮我们自动识别数据的类型,但这往往不是最佳的。通过定义相应数据的 mapping,一方面它可以帮忙明确地定义数据字段的类型,比如 geo_point 数据类型,另一方面,通过设置 mapping,也可以提高数据的摄入速度
  • policy:在这个目录中,它定义了使用 enrich processor 时所需要的 policies。
  • pipeline:在这个目录里,它定义了在 enrich 时,我们需要使用到的 enrich processor
  • logstash:在这个目录里,它定义了 Logstash 需要使用到的配置文件

写入文档的顺序

由于我们的数据是一个关系数据表格,在我们写入数据的时候,我们先从上面图中的右边开始写入数据,这是因为左边的表格依赖于右边的表格。只有它们的数据是准备好的状态,那么我们才可以利用它们来丰富左边的表格。这也就是我们看到的如下的脚本:

如上图所示,我们可以看到  

01-zip_geo.sh 
02-customer.sh
03-product.sh 
04-order_item.sh
05-order.sh  

这个其实就是我们执行脚本的顺序。我们需要按照上面的顺序从上到下来进行执行。

摄入数据

我们知道在我们摄入数据的时候,我们可以使用 Logstash 来写入 CSV 文档。Logstash 的好处是,它含有丰富的 filters 来供我们对数据进行处理。

针对 Elastic Stack 8.x 的安装来说,在默认的情况下,Elasticsearch 是带有安全的。针对自签名的集群来说,它通常还含有证书。针对带有安全的集群,我们可以参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。下面,我们以摄入 zip_geo 为例来进行展示。在摄入数据的时候,我们需要使用到 fingerprint。我们可以参考文章 “Beats:使用 fingerprint 来连接 Beats/Logstash 和 Elasticsearch”。

在 logstash 目录下,我们可以看到如下的 zip_geo.conf 文档:

zip_geo.conf

input {
  file {
    path => "##PROJECTPATH##/data/zip_geo/data.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null" 
    mode => "read"
    exit_after_read => true
    file_completed_action => "log"
    file_completed_log_path => "##PROJECTPATH##/.logstash-status"
  }
}

filter {
  csv {
    autodetect_column_names => true
  }
  mutate {
    convert => {
      "zip" => "integer"
      "point" => "string"
    }
  }
}

output {
  elasticsearch {
    hosts => ["##ELASTICHOST##"]
    ssl => ##ELASTICSSL##
    user => "##ELASTICUSER##"
    password => "##ELASTICPASS##"
    index => "zip_geo"
    ssl => true
    ca_trusted_fingerprint => "##FINGERPRINT##"
  }
}

这是一个标准的 Logstash 配置文件。在上面,我们可以看到一下奇奇怪怪的的像 ##PROJECTPATH## 这样的占位符号。这个需要在哪里配置呢?

我们回到项目的根目录下(part-3),我们可以看到一个叫做 env.sample 的文档。我们通过如下的命令来来创建一个叫做 .env 的文件:

cp env.sample .env

我们可以使用我们喜欢的编辑器来编辑这个 .env 文件:

vi .env
PROJECTPATH="/home/parallels/data/elasticsearch-ingest/part-3"
ELASTICHOST="192.168.0.3:9200"
ELASTICSSL="true"
ELASTICUSER="elastic"
ELASTICPASS="h6y=vgnen2vkbm6D+z6-"
FINGERPRINT="bd0a26dc646ef1cb3cb5e132e77d6113e1b46d56ee390dd3c6f0b2d2b16962c4"
LOGSTASHPATH="/home/parallels/elastic/logstash-8.8.2"

我们根据自己的配置填入上面的信息。其中 FINGERPRINT 最为简单的办法就是通过 Kibana 的配置文件 config/kibana.yml 文件来获得。我们保存好上面的文件。这里其实就是定义的环境变量。我们接下来查看 1-zip_geo.sh 文件:

1-zip_geo.sh

#!/bin/bash

source ./.env

hostprotocol="http"
if [ "$ELASTICSSL" = "true" ]; then
  hostprotocol="https"
fi

curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/zip_geo"
curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/zip_geo/_mapping" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/mapping/zip_geo.json


logstashconf=`cat ${PROJECTPATH}/logstash/zip_geo.conf`
logstashconf="${logstashconf//\#\#PROJECTPATH\#\#/"$PROJECTPATH"}"
logstashconf="${logstashconf//\#\#ELASTICHOST\#\#/"$ELASTICHOST"}"
logstashconf="${logstashconf//\#\#ELASTICSSL\#\#/"$ELASTICSSL"}"
logstashconf="${logstashconf//\#\#ELASTICUSER\#\#/"$ELASTICUSER"}"
logstashconf="${logstashconf//\#\#ELASTICPASS\#\#/"$ELASTICPASS"}"
logstashconf="${logstashconf//\#\#FINGERPRINT\#\#/"$FINGERPRINT"}"
$LOGSTASHPATH/bin/logstash -e "$logstashconf"

curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/policy/zip_geo.json

sleep 30
curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy/_execute"

上面的代码看起来很负责,一下子看不太明白。在开始的部分,我们从环境变量里得到 ELASTICSSL 的值。如果 Elasticsearch 集群的访问是 https 访问的,那么这个值应该设置为 true。这个在接下来的 curl 指令中需要用到。值得注意的是:由于我们的集群是自签名的,我们使用 -k 选项来绕开证书的配置,尽管我们也可以通过设置来配置证书的访问。

记下来,我们使用 curl 指令来创建 zip_geo 索引。它的指令的格式有点类似:

curl -k -u elastic:h6y=vgnen2vkbm6D+z6- https://localhost:9200/zip_geo

如果是在 Kibana 中的 Dev Tools 中进行操作,它相当于:

PUT zip_geo

上述指令创建一个叫做 zip_geo 的指令。

接下来的指令,它相当于:

curl -k -X PUT -u elastic:h6y=vgnen2vkbm6D+z6- ”https://localhost:9200/zip_geo/_mapping" \
-H "Content-Type: application/json" \
-d /Users/liuxg/data/elasticsearch-ingest/part-3/mapping/zip_geo.json

上述命令相当于在 Kibana 中打入如下的命令:

PUT zip_geo/_mapping
{
  "properties": {
    "zip": {
      "type": "long"
    },
    "point": {
      "type": "geo_point"
    }
  }
}

下面的代码:

logstashconf=`cat ${PROJECTPATH}/logstash/zip_geo.conf`
logstashconf="${logstashconf//\#\#PROJECTPATH\#\#/"$PROJECTPATH"}"
logstashconf="${logstashconf//\#\#ELASTICHOST\#\#/"$ELASTICHOST"}"
logstashconf="${logstashconf///\#\#ELASTICSSL\#\#/"$ELASTICSSL"}"
logstashconf="${logstashconf//\#\#ELASTICUSER\#\#/"$ELASTICUSER"}"
logstashconf="${logstashconf//\#\#ELASTICPASS\#\#/"$ELASTICPASS"}"
logstashconf="${logstashconf//\#\#FINGERPRINT\#\#/"$FINGERPRINT"}"
./bin/logstash -e "$logstashconf"

这部分代码的真正意思是替换 zip_geo,conf 里含有 “## ... ##" 部分的字符串进行替换。如果你对这个不是很熟悉的话,请参阅网上的链接。在上面的最后部分,我们使用 Logstash 来运行在 logstashconf 变量里的管道。

下面的代码:

curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/policy/zip_geo.json

它用来运行 zip_geo_policy 以生成相应的 .enrich_zip_geo_policy,,,,, 索引。它想到于如下的命令:

curl -k -X PUT -u elastic:h6y=vgnen2vkbm6D+z6- "https://localhost:9200/_enrich/policy/zip_geo_policy" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/policy/zip_geo.json

在 Kibana 中,我们可以打入如下的命令来实现同样的功能:

PUT /_enrich/policy/zip_geo_policy
{
  "match": {
    "indices": "zip_geo",
    "match_field": "zip",
    "enrich_fields": ["point"]
  }
}

由于生成丰富索引需要一定的时间,在脚本的部分,我们挂起 30 秒的时间,当然这个依赖于数据量的多少。

在最后的部分,我们执行:

curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy/_execute"

它相当于执行:

curl -k -X PUT -u elastic:h6y=vgnen2vkbm6D+z6- "https://localhost:9200/_enrich/policy/zip_geo_policy/_execute"

在 Kibana 中,我们可以通过如下的命令来完成相应的功能:


PUT /_enrich/policy/zip_geo_policy/_execute

好了,让我们来执行第一个脚本:

运行完,我们的第一个脚本后,我们可以在 Kibana 中进行查看:

我们按照同样的套路依次执行如下的脚本:

02-customer.sh
03-product.sh 
04-order_item.sh
05-order.sh  

在运行完 02-customer.sh 后,我们可以看到:

我们接着运行 02-product.sh 脚本。我们可以查看到 product 索引的文档:

我们再接着运行 04-order_item.sh 脚本:

我们接下来运行 05-order.sh:

从上面,我们可以看到我们最终想要的结果。

为了能删除所有之前创建的资源,我们可以一键删除:

./teardown.sh

然后,我们可以再使用一个命令来完成所有的运行:

parallels@ubuntu2004:~/data/elasticsearch-ingest/part-3$ cat run.sh
./01-zip_geo.sh
./02-customer.sh
./03-product.sh
./04-order_item.sh
./05-order.sh
./run.sh

特别注意的一点是,我们的 enrich processor 是在 ingest pipeline 里被调用的,比如:

output {
  elasticsearch {
    hosts => ["##ELASTICHOST##"]
    ssl => ##ELASTICSSL##
    user => "##ELASTICUSER##"
    password => "##ELASTICPASS##"
    index => "customer"
    pipeline => "customer_pipeline"
    ca_trusted_fingerprint => "##FINGERPRINT##"    
  }
}

你可以在地址下载所有的代码:GitHub - evermight/elasticsearch-ingest

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/50157.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深度探索 Elasticsearch 8.X:function_score 参数解读与实战案例分析

在 Elasticsearch 中,function_score 可以让我们在查询的同时对搜索结果进行自定义评分。 function_score 提供了一系列的参数和函数让我们可以根据需求灵活地进行设置。 近期有同学反馈,function_score 的相关参数不好理解,本文将深入探讨 f…

重排链表——力扣143

文章目录 题目描述法一:寻找链表中点、链表逆序、链表合并 题目描述 法一:寻找链表中点、链表逆序、链表合并 void reorderList(ListNode* head){if(headnullptr){return;}// 找到中点 ListNode* mid FindMiddle(head);ListNode *h1head, *h2mid->ne…

tinkerCAD案例:22. Backpack Zipper Pull 背包拉链头

tinkerCAD案例:21. Custom Stamp 定制印章 原文 tinkerCAD案例:22. Backpack Zipper Pull 背包拉链头 Lesson Overview: 课程概述: Now we’re going to make a zipper pull! 现在我们要做一个拉链头! Your backpack, howev…

金融行业软件测试面试题及其答案

下面是一些常见的金融行业软件测试面试题及其答案: 1. 什么是金融行业软件测试? 金融行业软件测试是针对金融领域的软件系统进行验证和确认的过程,旨在确保软件在安全、稳定、可靠和符合法规要求的条件下运行。 2. 解释一下金融软件中的风险…

代码随想录算法训练营day15 | 102. 二叉树的层序遍历,226. 翻转二叉树,101. 对称二叉树

目录 102. 二叉树的层序遍历 226. 翻转二叉树 101. 对称二叉树 100. 相同的树 100是101的衍生题目。572也为101的衍生题目。 102. 二叉树的层序遍历 思路&#xff1a; 以前的笔记 代码&#xff1a; class Solution {public List<List<Integer>> levelOrder(T…

卸载大脑,相信DFS

切莫相信动规&#xff0c;吾将为您指明前进之路 印子 比赛时&#xff0c;你是否有这样的经历&#xff1a;不敢用for暴搜&#xff0c;又不会用数学公式推理&#xff1b;焦急地在纸上打草&#xff0c;却没有优化思路&#xff1b;明明比赛前一天晚上背了那么多模板却脑子一片空白…

生成模型和判别模型工作原理介绍

您解决的大多数机器学习和深度学习问题都是从生成模型和判别模型中概念化的。在机器学习中,人们可以清楚地区分两种建模类型: 将图像分类为狗或猫属于判别性建模生成逼真的狗或猫图像是一个生成建模问题神经网络被采用得越多,生成域和判别域就增长得越多。要理解基于这些模型…

Python Web开发技巧VIII

目录 ModelSerializer和Serializer区别是什么 从queryset中取出某个models的字段值 Q对象进行模糊匹配 HTTP方式-如何模糊搜索JSON字段中的某个KEY值呢&#xff1f; showmigrations 合并两个或多个queryset ModelSerializer和Serializer区别是什么 都是DRF中用于序列化和…

QT【day3】

思维导图&#xff1a; 闹钟&#xff1a; //widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> // #include<QTime> //定时器 #include<QDebug> // #in…

力扣算法数学类—剑指 Offer 43. 1~n 整数中 1 出现的次数

目录 剑指 Offer 43. 1&#xff5e;n 整数中 1 出现的次数 题解&#xff1a; 代码&#xff1a; 结果&#xff1a; 输入一个整数 n &#xff0c;求1&#xff5e;n这n个整数的十进制表示中1出现的次数。 例如&#xff0c;输入12&#xff0c;1&#xff5e;12这些整数中包含1 的…

【雕爷学编程】MicroPython动手做(10)——零基础学MaixPy之神经网络KPU

早上百度搜“神经网络KPU”&#xff0c;查到与非网的一篇文章《一文读懂APU/BPU/CPU/DPU/EPU/FPU/GPU等处理器》&#xff0c;介绍各种处理器非常详细&#xff0c;关于“KPU”的内容如下&#xff1a; KPU Knowledge Processing Unit。 嘉楠耘智&#xff08;canaan&#xff09;号…

web流程自动化详解

今天给大家带来Selenium的相关解释操作 一、Selenium Selenium是一个用于自动化Web浏览器操作的开源工具和框架。它提供了一组API&#xff08;应用程序接口&#xff09;&#xff0c;可以让开发人员使用多种编程语言&#xff08;如Java、Python、C#等&#xff09;编写测试脚本&…

PS软件打开闪退是什么原因?怎么处理闪退的问题?

Photoshop简称PS&#xff0c;它作为图像处理专家&#xff0c;具有相当强大的功能&#xff0c;但是有小伙伴说不好用&#xff0c;因为打开后会闪退&#xff0c;那该怎么办呢&#xff1f; PS软件闪退的处理方法&#xff1a; 1.下载并安装Adobe Creative Cloud&#xff0c;再登录…

Vue3 word如何转成pdf代码实现

&#x1f642;博主&#xff1a;锅盖哒 &#x1f642;文章核心&#xff1a;word如何转换pdf 目录 1.前端部分 2.后端部分 在Vue 3中&#xff0c;前端无法直接将Word文档转换为PDF&#xff0c;因为Word文档的解析和PDF的生成通常需要在后端进行。但是&#xff0c;你可以通过Vu…

output delay 约束

output delay 约束 一、output delay约束概述二、output delay约束系统同步三、output delay约束源同步 一、output delay约束概述 特别注意&#xff1a;在源同步接口中&#xff0c;定义接口约束之前&#xff0c;需要用create_generated_clock 先定义送出的随路时钟。 二、out…

汽车交流充电桩控制主板的电路设计

汽车充电桩控制主板的电路设计 你是否曾经遇到过汽车没油的问题?但是&#xff0c;随着电动汽车的普及&#xff0c;充电问题也变得越来越重要。而汽车充电桩控制板电路设计则是解决这一问题的关键。 汽车充电桩控制板电路设计包括硬件电路设计、软件电路设计和安全性设计。硬件…

内网隧道代理技术(十三)之内网代理介绍

前言 什么?你问我内网隧道代理技术怎么突然就第十三篇了,第十二篇呢?这个,因为某些不可抗拒力量,第十二篇博客无法发表,如果想要查阅,请加内网渗透qq群:838076210 内网代理介绍 内网代理介绍 内网资产扫描这种场景一般是进行内网渗透才需要的代理技术,如果你不打内…

No104.精选前端面试题,享受每天的挑战和学习(小米)

文章目录 聊一下vue和react的区别react生命周期有哪些hooks解决了什么问题小程序跳转传参怎么传附录&#xff1a;「简历必备」前后端实战项目&#xff08;推荐&#xff1a;⭐️⭐️⭐️⭐️⭐️&#xff09; &#x1f4c8;「作者简介」&#xff1a;前端开发工程师 | 蓝桥云课签…

Jenkins 配置maven和jdk

前提:服务器已经安装maven和jdk 一、在Jenkins中添加全局变量 系统管理–>系统配置–>全局属性–>环境变量 添加三个全局变量 JAVA_HOME、MAVEN_HOME、PATH 二、配置maven 系统管理–>全局工具配置–>maven–>新增 新增配置 三、配置JDK 在系统管…

查看GPU使用的最佳方式

1. watch -n 1 nvidia-smi (最有名,没有之一) nvidia自带了一个nvidia-smi的命令行工具,会显示GPU使用情况 ​​​​​​​ 作为监控 GPU 的工具就显得有点过于简陋了。比如 Process name 栏只显示命令行的程序名,不显示参数,这样输出结果就是一堆 python 和 .../Minico…