Logstash笔记

目录​​​​​​​

一、简介

二、单个输入和输出插件

三、多个输入和输出插件

四、pipeline结构

五、队列和数据弹性

六、内存队列

七、持久化队列

八、死信队列 (DLQ)

九、输入插件

1)、beats

2)、dead_letter_queue

 3)、elasticsearch 

 4)、file

5)、redis

十、输出插件

1)、elasticsearch

2)、file

 3)、kafka

4)、redis

十一、过滤插件

1)、age

2)、bytes

3)、date

4)、grok


一、简介

Logstash 是一个具有实时管道功能的开源数据收集引擎。 Logstash 可以动态地统一来自不同来源的数据,并将数据规范化到您选择的目的地。

Logstash 管道有两个必需元素(输入和输出)以及一个可选元素(过滤器)。输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。
例如:

image.png


最基本的 Logstash 管道:

cd logstash-8.12.2
# 示例中的管道从标准输入 stdin 获取输入,并以结构化格式将该输入移动到标准输出 stdout。
bin/logstash -e 'input { stdin { } } output { stdout {} }'

二、单个输入和输出插件

在创建 Logstash 管道之前,您将配置 Filebeat 以将日志行发送到 Logstash。 Filebeat 客户端是一个轻量级、资源友好型工具,它从服务器上的文件收集日志并将这些日志转发到 Logstash 实例进行处理。 Filebeat 专为可靠性和低延迟而设计。 Filebeat 在主机上的资源占用很少,Beats 输入插件最大限度地减少了 Logstash 实例的资源需求。

安装Filebeat后,需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件:

#输出logstash中filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log    # Filebeat 处理的一个或多个文件的绝对路径
output.logstash:
  hosts: ["localhost:5044"]   #输出logstash中

创建一个 Logstash 配置管道,该管道使用 Beats 输入插件从 Beats 接收事件,first-pipeline.conf 的文件:

input {
    beats {
        port => "5044"   # 使用 Filebeat 将 Apache Web 日志作为输入
    }
}
 filter {   # 解析这些日志以从日志中创建特定的命名字段
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]   # 将解析后的数据写入 Elasticsearch 集群
    }
}

三、多个输入和输出插件

创建一个 Logstash 管道,该管道从 Twitter 源和 Filebeat 客户端获取输入,然后将信息发送到 Elasticsearch 集群以及将信息直接写入文件。

input {
    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }
    beats {
        port => "5044"
    }
}
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

四、pipeline结构

input {
  ...
}

filter {
  ...
}

output {
  ...
}

每个部分都包含一个或多个插件的配置选项。如果指定多个过滤器,它们将按照它们在配置文件中出现的顺序应用。如果指定多个输出,事件将按照它们在配置文件中出现的顺序按顺序发送到每个目标。

五、队列和数据弹性

默认情况下,Logstash 在管道阶段(输入 → 管道工作线程)之间使用内存中的有界队列来缓冲事件如果 Logstash 遇到临时机器故障,内存队列的内容将会丢失。(临时机器故障是指 Logstash 或其主机异常终止但能够重新启动的情况。)为了防止数据丢失并确保事件在管道中不间断地流动,Logstash 提供了数据弹性功能。

  • 持久队列 (PQ) 通过将事件存储在磁盘上的内部队列中来防止数据丢失。
  • 死信队列 (DLQ) 为 Logstash 无法处理的事件提供磁盘存储,以便您可以评估它们。您可以使用 dead_letter_queue 输入插件轻松地重新处理死信队列中的事件。

默认情况下,这些弹性功能处于禁用状态。要打开这些功能,您必须在 Logstash 设置文件中显式启用它们。

六、内存队列

内存队列大小不是直接配置的。相反,这取决于您如何调整 Logstash。

其上限由 pipeline.workers (默认值:CPU 数量)乘以 pipeline.batch.size (默认值:125)事件定义。该值称为“飞行计数”,确定每个内存队列中可以保存的最大事件数。

当队列已满时,Logstash 对输入施加反压,以阻止数据流入 Logstash。

七、持久化队列

Logstash 持久队列通过将运行中的消息队列存储到磁盘来帮助防止异常终止期间的数据丢失。

  • 有助于防止正常关闭期间和 Logstash 异常终止期间消息丢失。如果在事件正在进行时 Logstash 重新启动,Logstash 会尝试传送存储在持久队列中的消息,直到传送至少成功一次。
  • 可以吸收突发事件,而不需要 Redis 或 Apache Kafka 等外部缓冲机制。

默认情况下禁用持久队列。

持久队列不能解决以下问题:

  • 不使用请求-响应协议的输入插件无法防止数据丢失。 Tcp、udp、zeromq 推+拉和许多其他输入没有向发送方确认收到的机制。 (beats和http等插件确实具有确认功能,因此受到该队列的良好保护。)
  • 如果在提交检查点文件之前发生异常关闭,则可能会丢失数据。
  • 持久队列不处理永久性机器故障,例如磁盘损坏、磁盘故障和机器丢失。保留到磁盘的数据不会被复制。

八、死信队列 (DLQ)

死信队列(DLQ)被设计为临时写入无法处理的事件的地方。 DLQ 使您能够灵活地调查有问题的事件,而不会阻塞管道或丢失事件。可以使用 dead_letter_queue 输入插件处理来自 DLQ 的事件。

默认情况下,当 Logstash 遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash 管道会挂起或删除不成功的事件。为了防止这种情况下的数据丢失,您可以配置 Logstash 将不成功的事件写入死信队列而不是丢弃它们。

默认情况下禁用死信队列。要启用死信队列,请在logstash.yml设置文件中设置dead_letter_queue_enable选项:

dead_letter_queue.enable: true

当您准备好处理死信队列中的事件时,您可以创建一个使用 dead_letter_queue 输入插件从死信队列中读取事件的管道。

input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue"   # 包含死信队列的顶级目录的路径
    commit_offsets => true   # 当 true 时,保存偏移量。当管道重新启动时,它将继续从上次停止的位置读取,而不是重新处理队列中的所有项目
    pipeline_id => "main"   # 写入死信队列的管道的 ID。默认为“main”
    clean_consumed => true   # 删除已完全消耗的段,从而在处理时释放空间
    start_timestamp => "2017-06-06T23:40:37"  # 使用 start_timestamp 选项在队列中的特定点开始处理事件
  }
}

output {
  stdout {
    codec => rubydebug { metadata => true }   # 将事件(包括元数据)写入标准输出
  }
}

九、输入插件

1)、beats

在端口 5044 上侦听传入的 Beats 连接并为 Elasticsearch 建立索引:

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}"  # %{[@metadata][beat]} 将索引名称的第一部分设置为元数据字段的值,%{[@metadata][version]} 将第二部分设置为 Beat 版本。例如:metricbeat-6.1.6。
  }
}

2)、dead_letter_queue

用于从 Logstash 的死信队列中读取事件: 

input {
  dead_letter_queue {
    path => "/var/logstash/data/dead_letter_queue"
    start_timestamp => "2017-04-04T23:40:37"
  }
}

 3)、elasticsearch 

input {
      # Read all documents from Elasticsearch matching the given query
      elasticsearch {
        hosts => "localhost"
        query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
      }
    }

 4)、file

input {
    file {
        path => "/path/log/*.log"
    }
}

5)、redis

input {
    redis {
        # Redis的key的类型。值可以是以下任意值:list、channel、pattern_channel
        data_type => "list"
        # Redis 数据库编号
        db => 5
        # Redis的数据库地址,默认为127.0.0.1
        host => "10.0.0.10"
        # Redis的端口号
        port => 6379
        # Redis的认证密码
        password => "admin"
        # Redis 列表或通道的名称
        key => "logstash-input-redis"
    }
}

十、输出插件

输出插件将事件数据发送到特定目的地。输出是事件管道的最后阶段。

1)、elasticsearch

使用 mutate 过滤器和条件来添加 [@metadata] 字段来设置每个事件的目标索引:

filter {
      if [log_type] in [ "test", "staging" ] {
        mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }
      } else if [log_type] == "production" {
        mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }
      } else {
        mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }
      }
    }
    output {
      elasticsearch {
        index => "%{[@metadata][target_index]}"
      }
    }

2)、file

output {
 file {
   path => ...
   codec => line { format => "custom format: %{message}"}
 }
}

 3)、kafka

output {
      kafka {
        codec => json
        topic_id => "mytopic"   # 生成消息的主题
      }
    }

4)、redis

output {
  redis {
     # 指定写入数据的类型
    data_type => "list"
    # Redis 数据库编号
    db => 5
    # Redis主机地址
    host => "10.0.0.10"
    # Redis的端口号
    port => 6379
    # Redis的认证密码
    password => "admin"
    # Redis写入的key值
    key => "logstash-input-redis"
  }
}

十一、过滤插件

过滤器插件对事件执行中间处理。通常根据事件的特征有条件地应用过滤器。

1)、age

用于计算事件年龄的简单过滤器。
此过滤器通过从当前时间戳减去事件时间戳来计算事件的年龄。您可以将此插件与删除过滤器插件一起使用来删除早于某个阈值的 Logstash 事件。 

filter {
  age {
    add_field => {   # 一次添加多个字段
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
  }
  if [@metadata][age] > 86400 {
    drop {}
  }
}

2)、bytes

将计算机存储大小的字符串表示形式(例如“123 MB”或“5.6gb”)解析为其以字节为单位的数值。

filter {
      bytes {
        source => "my_bytes_string_field"   # 包含存储大小的源字段的名称
        target => "my_bytes_numeric_field"  # 将包含存储大小(以字节为单位)的目标字段的名称
      }
    }

3)、date

日期过滤器用于解析字段中的日期,然后使用该日期或时间戳作为事件的 Logstash 时间戳。 

 filter {
      date {
        match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
      }
    }

4)、grok

解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化且可查询的数据的好方法。

虚构的 http 请求日志:

55.3.244.1 GET /index.html 15824 0.043
filter {
      grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }
    }

 在 grok 过滤器之后,事件中将有一些额外的字段:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/641953.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AI大模型探索之路-实战篇6: Function Calling技术调研之详细流程剖析

系列篇章💥 AI大模型探索之路-实战篇4:DB-GPT数据应用开发框架调研实践 AI大模型探索之路-实战篇5: Open Interpreter开放代码解释器调研实践 目录 系列篇章💥一、前言二、Function Calling详细流程剖析1、创建OpenAI客户端2、定…

手机边听边充音频转接器双盲插系列:便捷充电,畅享音乐6500

在快节奏的生活中,手机已经成为我们不可或缺的日常用品。无论是工作、学习还是娱乐,手机都扮演着重要角色。然而,当我们沉浸在音乐的海洋中时,手机电量不足的困扰却时常打断我们的美好体验。为了解决这一难题,手机边听…

ESP32-C3模组上跑通OTA升级(8)

接前一篇文章:ESP32-C3模组上跑通OTA升级(7) 本文内容参考: 杂项系统 API - ESP32 - — ESP-IDF 编程指南 latest 文档 《ESP32-C3 物联网工程开发实战》 乐鑫科技 特此致谢! 七、固件版本 将不同功能的固件标记为…

好书推荐|MATLAB科技绘图与数据分析

提升你的数据洞察力,用于精确绘图和分析的高级MATLAB技术 MATLAB科技绘图与数据分析——jd 本书内容 《MATLAB科技绘图与数据分析》结合作者多年的数据分析与科研绘图经验,详细讲解MATLAB在科技图表制作与数据分析中的使用方法与技巧。全书分为3部分&a…

ChatGPT可以开车吗?分享大型语言模型在自动驾驶方面的应用案例

自动驾驶边缘案例需要复杂的、类似人类的推理,远远超出传统的算法和人工智能模型。而大型语言模型正在致力实现这一目标。 人工智能技术如今正在快速发展和应用,人工智能模型也是如此。拥有100亿个参数的通用模型的性能正在碾压拥有5000万个参数的任务特…

Java进阶学习笔记11——多态

什么是多态? 多态是在继承/实现情况下一种现象,表现为:对象多态和行为多态。 同一个对象,在不同时刻表现出来的不同形态。 多态的前提: 要有继承/实现关系 要有方法的重写 要有父类引用指向子类对象。 多态的具体代码…

电脑文件夹怎么加密?文件夹加密软件推荐

加密文件夹是保护电脑数据的重要方法,我们可以使用专业的文件夹加密软件来加密保护文件夹。下面小编就为大家介绍三种优秀的文件夹加密软件,安全保护电脑文件夹。 文件夹加密超级大师 文件夹加密超级大师是一款功能强大的文件夹加密软件,文件…

Spring 模拟管理Web应用程序

MVC:Model View Controller 1)controller:控制层(Servlet是运行服务器端,处理请求响应java语言编写技术) 2)service:业务层(事务,异常) 3&#xf…

Apache Hive 安装与配置的详细教程

1. Hive简介 Hive是基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。hive数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,能…

2024-05-22 VS2022使用modules

点击 <C 语言编程核心突破> 快速C语言入门 VS2022使用modules 前言一、准备二、使用其一, 用VS installer 安装模块:第二个选项就是, 与你的代码一同编译std模块, 这个非常简单, 但是也有坑. 总结 前言 要解决问题: 使用VS2022开启modules. 想到的思路: 跟着官方文档整…

乡村振兴的乡村环境治理与保护:加强乡村环境治理与保护,改善乡村环境质量,打造美丽宜居的乡村环境

一、引言 随着乡村振兴战略的深入实施&#xff0c;乡村环境治理与保护成为推动乡村全面振兴的关键环节。乡村环境是乡村发展的重要基础&#xff0c;关系到农民的生产生活和身心健康&#xff0c;也直接影响到乡村经济的可持续发展。因此&#xff0c;加强乡村环境治理与保护&…

Windows VS2022 C语言使用 sqlite3.dll 访问 SQLite数据库

今天接到一个学生C语言访问SQLite数据库的的需求: 第一步,SQLite Download Page下载 sqlite3.dll 库 下载解压,发现只有两个文件: 于是使用x64 Native Tools Command Prompt 终端 生成 sqlite3.lib 和 sqlite3.exp文件 LIB -def:sqlite3.def -out:sqlite3.lib -machin…

React中显示数据

SX 会让你把标签放到 JavaScript 中。而大括号会让你 “回到” JavaScript 中&#xff0c;这样你就可以从你的代码中嵌入一些变量并展示给用户。例如&#xff0c;这将显示 user.name&#xff1a; return (<h1>{user.name}</h1> ); 你还可以将 JSX 属性 “转义到 …

PYQT5点击Button执行多次问题解决方案(亲测)

PYQT5点击Button却执行多次问题 使用pyqt5时遇到问题&#xff0c;UI上按钮点击一次&#xff0c;对应的槽函数却执行了3遍 首先&#xff0c;确认函数名无冲突&#xff0c;UI button名无命名冲突&#xff0c;下图是简单的示例程序&#xff1a; 运行后&#xff0c;点击按钮&#…

数据库主流技术

文章目录 1.分布式数据库1.1 基础知识1.2 体系结构 2.Web数据库3.XML与数据库4.面向对象数据库5.大数据和数据仓库 1.分布式数据库 1.1 基础知识 分布式数据库系统是数据库系统和计算机网络相结合的产物。 由于计算机功能增强&#xff0c;成本下降&#xff0c;几乎每个办公室…

Java开发大厂面试第17讲:MySQL 的优化方案有哪些?数据库设计、查询优化、索引优化、硬件和配置优化等

性能优化&#xff08;Optimize&#xff09;指的是在保证系统正确性的前提下&#xff0c;能够更快速响应请求的一种手段。而且有些性能问题&#xff0c;比如慢查询等&#xff0c;如果积累到一定的程度或者是遇到急速上升的并发请求之后&#xff0c;会导致严重的后果&#xff0c;…

Midjourney是一个基于GPT-3.5系列接口开发的免费AI机器人

Midjourney是一个基于GPT-3.5系列接口开发的免费AI机器人&#xff0c;旨在提供多领域的智能对话服务。Midjourney在不同领域中有不同的定义和应用&#xff0c;以下是对其中两个主要领域的介绍&#xff1a; Midjourney官网&#xff1a;https://www.midjourney.com/ 一、AI绘画工…

人才测评的应用:人才选拔,岗位晋升,面试招聘测评

人才测评自诞生以来&#xff0c;就被广泛应用在各大方面&#xff0c;不仅是我们熟悉的招聘上&#xff0c;还有其他考核和晋升&#xff0c;都会需要用到人才测评。不知道怎么招聘&#xff1f;或者不懂得如何实现人才晋升&#xff1f;都可以参考人才测评&#xff0c;利用它帮我们…

ensp-三层交换技术

交换机-三层交换 一.概述 单臂路由有明显的缺陷,单臂路由的链路使用率高,可能会造成网路拥塞,造成网络不可用 可以让多个交换机连接路由器的不同接口,但是路由器的接口毕竟有限,不像交换机一样有那么多接口 使用三层交换解决路由器接口不够用问题 二.三层交换 1.创建多个VLAN…