使用 Logstash 丰富你的 Elasticsearch 文档

作者:来自 Elastic David Pilato

我们在上一篇文章中看到,我们可以使用摄取管道中的 Elasticsearch Enrich Processor 在 Elasticsearch® 中进行数据丰富。 但有时,你需要执行更复杂的任务,或者你的数据源不是 Elasticsearch,而是另一个源。 或者,你可能希望存储在 Elasticsearch 和第三方系统中,在这种情况下,将管道的执行转移到 Logstash® 很有意义。

使用 Elasticsearch 丰富 Elasticsearch 数据

使用 Logstash,使用类似于以下的管道,这非常容易:

input {
  # Read all documents from Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "kibana_sample_data_logs"
    docinfo => true
    ecs_compatibility => "disabled"
  }
}

filter {
  # Enrich every document with Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "vip"
    query => "ip:%{[clientip]}"
    sort => "ip:desc"
    fields => {
      "[name]" => "[name]"
      "[vip]" => "[vip]"
    }
  }
  mutate { 
    remove_field => ["@version", "@timestamp"] 
  }
}

output {
  if [name] {
    # Write all modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}

总共,我们有 14074 个事件需要解析。 虽然不是很多,但对于这个演示来说已经足够了。 这是一个示例事件:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  }
}

正如我们在上一篇文章中看到的,vip 索引包含有关我们客户的信息:

{ 
  "ip" : "30.156.16.164", 
  "vip": true, 
  "name": "David P" 
}

我们可以通过以下方式运行管道:

docker run \
  --name=logstash \
  --rm -it \
  -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  -e XPACK_MONITORING_ENABLED=false \
  -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  docker.elastic.co/logstash/logstash:8.12.0

丰富的文档现在看起来像这样:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  },
  "vip": true,
  "name": "David P"
}

实际上很简单,但有一个问题:速度很慢。 通过网络进行查找,尽管 Elasticsearch 速度极快,但仍然会减慢整个管道的速度。

使用静态 JDBC 过滤器

我最近在 ParisJUG 遇到了 Laurent,他来自令人惊叹的 Elastic Consulting 团队,我们讨论了这个问题。 他告诉我,他的一位客户必须面对这个问题。 他建议改用 Logstash 中的 Elasticsearch 缓存。

问题是:Logstash 中没有这样的过滤器缓存插件。 他找到了一种非常聪明的方法来解决该问题,即利用静态 JDBC 过滤器插件和 Elasticsearch JDBC 驱动程序。

请注意,这需要拥有白金许可证(或试用版)。

添加 Elasticsearch JDBC 驱动程序

我们首先需要将 JDBC 驱动程序添加到 Logstash 实例中。

mdir -p logstash-config/lib
wget https://artifacts.elastic.co/maven/org/elasticsearch/plugin/x-pack-sql-jdbc/8.12.0/x-pack-sql-jdbc-8.12.0.jar
mv x-pack-sql-jdbc-8.12.0.jar logstash-config/lib

我们只需要与 Logstash docker 实例共享此目录:

time docker run \
  --name=logstash \
  --rm -it \
  -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  -v $(pwd)/logstash-config/lib/:/tmp/lib/ \
  -e XPACK_MONITORING_ENABLED=false \
  -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  docker.elastic.co/logstash/logstash:8.12.0

更新管道

input 部分不变。 但现在,我们要在内存中创建一个名为 vip 的临时表(为了保持一致性)。 该表结构是使用 local_db_objects 参数定义的:

jdbc_static {
  local_db_objects => [ {
    name => "vip"
    index_columns => ["ip"]
    columns => [
      ["name", "VARCHAR(255)"],
      ["vip", "BOOLEAN"],
      ["ip", "VARCHAR(64)"]
    ]
  } ]
}

当 jdbc_static 启动时,我们要首先从 Elasticsearch vip索引中读取所有数据集。 这是在 loaders 选项中完成的:

jdbc_static {
  loaders => [ {
    query => "select name, vip, ip from vip"
    local_table => "vip"
  } ]
  jdbc_user => "elastic"
  jdbc_password => "${ELASTIC_PASSWORD}"
  jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
  jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
  jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
}

每次我们需要进行查找时,我们都希望使用以下语句来执行它:

SELECT name, vip FROM vip WHERE ip = "THE_IP"

这可以使用 local_lookups 参数定义:

jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip"
  } ]
}

如果没有找到数据,我们可以使用 default_hash 选项提供默认值:

jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip" 
    default_hash => {
      name => nil
      vip => false
    }
  } ]
}

最后,这将在事件中生成 vip.name 和 vip.vip 字段。

我们现在可以定义我们想要对这些临时字段执行的操作:

jdbc_static {
  add_field => { name => "%{[vip][0][name]}" }
  add_field => { vip => "%{[vip][0][vip]}" }
  remove_field => ["vip"]
}

这给出了以下过滤器:

filter {
  # Enrich every document with Elasticsearch via static JDBC
  jdbc_static {
    loaders => [ {
      query => "select name, vip, ip from vip"
      local_table => "vip"
    } ]
    local_db_objects => [ {
      name => "vip"
      index_columns => ["ip"]
      columns => [
        ["name", "VARCHAR(255)"],
        ["vip", "BOOLEAN"],
        ["ip", "VARCHAR(64)"]
      ]
    } ]
    local_lookups => [ {
      query => "SELECT name, vip FROM vip WHERE ip = :ip"
      parameters => { "ip" => "clientip" }
      target => "vip" 
      default_hash => {
        name => nil
        vip => false
      }
    } ]
    add_field => { name => "%{[vip][0][name]}" }
    add_field => { vip => "%{[vip][0][vip]}" }
    remove_field => ["vip"]
    jdbc_user => "elastic"
    jdbc_password => "${ELASTIC_PASSWORD}"
    jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
    jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
    jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
  }
  mutate { 
    remove_field => ["@version", "@timestamp"] 
  }
}

将修改后的文档写入Elasticsearch

在第一个管道中,我们测试事件中是否确实存在名称字段:

if [name] {
  # Index to Elasticsearch
}

我们仍然可以使用类似的东西,但因为我们提供了默认值,以防在 Elasticsearch vip 索引中找不到 ip,所以现在它会在标签表中生成一个新的 _jdbcstaticdefaultsused 标签。

我们可以用它来知道我们是否发现了某些东西,如果是前者,则将我们的数据发送到 Elasticsearch:

output {
  if "_jdbcstaticdefaultsused" not in [tags] {
    # Write all the modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}

更快吗?

因此,当我们在这个小数据集上运行测试时,我们可以看到,使用 Elasticsearch 过滤器方法,需要两分钟多一点的时间来丰富我们的数据集:

real    2m3.146s
user    0m0.077s
sys     0m0.042s

当使用 JDBC 静态过滤器方法运行管道时,现在只需不到一分钟:

real    0m48.575s
user    0m0.064s
sys     0m0.039s

正如我们所看到的,我们显着减少了该丰富管道的执行时间(增益约为 60%)。

如果你有一个可以轻松放入 Logstash JVM 内存的小型 Elasticsearch 索引,你可以尝试此策略(或类似的策略)。 如果你有数亿个文档,你仍然应该使用 Elasticsearch Filter Plugin。

结论

在这篇文章中,我们了解了当我们需要在 Elasticsearch 中执行一些查找时,如何使用 JDBC 静态过滤器插件来加速数据丰富管道。 在下一篇文章中,我们将了解如何使用 Elastic Agent 在边缘进行类似的丰富。

本文中描述的任何特性或功能的发布和时间安排均由 Elastic 自行决定。 当前不可用的任何特性或功能可能无法按时交付或根本无法交付

更多阅读:

  • Logstash:Jdbc static filter plugin 介绍

  • Logstash:运用 jdbc_streaming 来丰富我们的数据

原文:Enrich your Elasticsearch documents with Logstash | Elastic Blog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/447512.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

掌握这3种方法,mp3格式转换就是这么简单!

掌握MP3格式转换并不需要复杂的技术或专业知识。在数字化时代,我们有许多简单而有效的方法可以实现这一目标。无论是为了节省存储空间,提高音频文件的兼容性,还是其他需求,本文将介绍三种简单的方法,让您轻松掌握mp3格…

编程界的圣经:从Scheme到JavaScript构建你的计算思维

文章目录 适读人群目 录 《计算机程序的构造和解释》(Structure and Interpretation of Computer Programs,简记为SICP)是MIT的基础课教材,出版后引起计算机教育界的广泛关注,对推动全世界大学计算机科学技术教育的发…

掌握潮流,使用渐变色彩图标icon,打造独特风格!

渐变色图标icon非常抢眼,从日常使用频率最高的手机到街上随处可见的海报,通常色彩搭配出众,让人感觉很惊艳。对色彩搭配的不同理解会影响我们设计产品的最终性能。本文将带您了解在UI设计圈兴起的时尚色彩组合——什么是渐变色,如…

Docker进阶:深入理解 Dockerfile

Docker进阶:深入理解 Dockerfile 一、Dockerfile 概述二、为什么要学习Dockerfile三、Dockerfile 编写规则四、Dockerfile 中常用的指令1、FROM2、LABEL3、RUN4、CMD5、ENTRYPOINT6、COPY7、ADD8、WORKDIR9、 ENV10、EXPOSE11、VOLUME12、USER13、注释14、ONBUILD 命…

【Python】成功解决ZeroDivisionError: division by zero

【Python】成功解决ZeroDivisionError: division by zero 🌈 个人主页:高斯小哥 🔥 高质量专栏:Matplotlib之旅:零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程👈 希望得到您的订…

ROS2从入门到精通0-2:ROS2简介、对比ROS1与详细安装流程

目录 0 专栏介绍1 什么是机器人操作系统?2 ROS的发展历程3 ROS2与ROS1的区别4 ROS2安装4.1 基本安装4.2 测试ROS24.2.1 测试一:发布者与订阅者4.2.2 测试二:海龟仿真器 5 常见问题 0 专栏介绍 本专栏旨在通过对ROS2的系统学习,掌…

CMake 交叉编译

想知道“魔笛手”在这里能发挥什么作用吗?想象一下,把 CMake 当做法力高强的魔笛手,C 的项目则是故事中的那些被魔笛手拯救的孩子。 父母要抚养一个孩子并非易事,营养需要面面俱到,保证身体健康,关心事无巨…

【周总结周末日常】

周总结 完成任务开发并且与前端联调通过 完成已开发功能的冒烟测试 修复测试中出现的一些数据显示问题 2024/3/10 晴 温度适宜 这周天气比上周好多了,最起码见到好几次太阳 周六在世纪公园溜达一会儿,偶尔呼吸下大自然,挺棒的…

wpscan专门针对wordpress的安全扫描工具

说明 WPScan是一款专门针对WordPress的漏洞扫描工具,它使用Ruby编程语言编写。WPScan能够扫描WordPress网站中的多种安全漏洞,包括WordPress本身的漏洞、插件漏洞和主题漏洞。此外,WPScan还能扫描类似robots.txt这样的敏感文件,并…

Clion attach一个linux进程进行debug

背景 手头的一段程序&#xff0c;目前已经在linux上运行了&#xff0c;我在windows上有源代码&#xff0c;想在本地debug一下&#xff0c;看看代码里复杂的流程都是怎么样运行的。 代码样例 # fileName:calculateSum.cpp #include <iostream>int main() {int num1, nu…

C++程序设计-第四/五章 函数和类和对象【期末复习|考研复习】

前言 总结整理不易&#xff0c;希望大家点赞收藏。 给大家整理了一下C程序设计中的重点概念&#xff0c;以供大家期末复习和考研复习的时候使用。 C程序设计系列文章传送门&#xff1a; 第一章 面向对象基础 第四/五章 函数和类和对象 第六/七/八章 运算符重载/包含与继承/虚函…

FX110网:在CP Markets申请出金四个多月了,没任何消息!

近期&#xff0c;本站收到一中国汇友投诉&#xff0c;称其CP Markets平台已数月无法出金&#xff0c;平台方也没有任何回应。在货币市场闯荡久了的“老鸟”就会知道&#xff0c;平台无故不给出金必定有妖&#xff0c;更何况还是长达数月&#xff01; 在CP Markets申请出金四个多…

LINE社群:为您的跨境出海业务带来更多流量

LINE 社群就是一个大型的公开聊天室&#xff0c;通过LINE社群不需要将对方添加为好友就可以聊天。它主要是以「兴趣」作为区分&#xff0c;所以商家可以在社群中找到不少潜在客户。尤其是面向台湾、日本、泰国这些地区的商家&#xff0c;LINE在这些地区的普及度很高&#xff0c…

10000的单子谈崩了,坚持用beego什么骚操作?

今天有个客户询盘&#xff0c;想仿制一个旅游的网站&#xff0c;一句话差点让我当场拒绝&#xff1a;前端都是静态的&#xff0c;网上有很多工具可以下载源代码&#xff0c;后端接入支付&#xff0c;做个简单的统计&#xff0c;客服&#xff0c;再接入google地图就行了...... 简…

【JAVA重要知识 | 第六篇】Java集合类使用总结(List、Set、Map接口及常见实现类)以及常见面试题

文章目录 6.Java集合类使用总结6.1概览6.1.1集合接口类特性6.1.2List接口和Set接口的区别6.1.3简要介绍&#xff08;1&#xff09;List接口&#xff08;2&#xff09;Set接口&#xff08;3&#xff09;Map接口 6.2Collection接口6.3List接口6.3.1ArrayList6.3.2LinkedList—不常…

PyCharm无代码提示解决

PyCharm无代码提示解决方法 在使用PyCharm工具时&#xff0c;调用方法却无法进行提示&#xff0c;针对PyCharm无代码提示整理下解决方案 1、Python内置语法无智能提示 复现&#xff1a;我这里以urllib库读取网页内容为例&#xff0c;在通过urlopen(&#xff09;之后调用getur…

Python打包为可执行文件

一文带你搞定python脚本(.py)打包为可执行文件(.exe) 文章目录 一文带你搞定python脚本(.py)打包为可执行文件(.exe)01、为什么要打包Python脚本&#xff1f;02、打包步骤&#xff1f;第一步&#xff1a;环境配置与操作包安装第二步&#xff1a;开始打包第三步&#xff1a;输入…

汽车行业一项网络安全标准实践指南发布,SSL证书助力传输通道加密,确保数据安全

2024年3月7日&#xff0c;全国网络安全标准化技术委员会秘书处发布了《网络安全标准实践指南——车外画面局部轮廓化处理效果验证》&#xff08;以下简称《实践指南》&#xff09;&#xff0c;旨在指导汽车数据处理者对车外画面进行人脸、车牌局部轮廓化处理效果的自行验证。《…

图的链式前向星存储与搜索

图的存储与搜索 链式前向星存储 图的存储方式有很多种&#xff0c;但是也都有各自的优缺点。例如&#xff1a;采用邻接矩阵的形式存储的时候&#xff0c;存储比较简单&#xff0c;但是遍历或者处理的时候就会比较浪费时间&#xff1b;而采用邻接表存储&#xff0c;则效率会有…

无需修改配置springboot启动多个不同端口的启动类

idea:2023 1.4版本 复制原先启动类&#xff0c;原先没有启动类&#xff0c;点击上方➕添加启动类 需要配置不同的端口号&#xff0c;其他默认 点击应用即可