Logstash输入Kafka输出Es配置

Logstash介绍

Logstash是一个开源的数据收集引擎,具有实时管道功能。它可以从各种数据源中动态地统一和标准化数据,并将其发送到你选择的目的地。Logstash的早期目标主要是用于收集日志,但现在的功能已经远远超出这个范围。任何事件类型都可以通过Logstash进行分析,通过输入、过滤器和输出插件进行转换。

Logstash的工作原理是使用管道方式进行日志的搜集处理和输出。这个管道包括三个阶段:输入、处理和输出。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。

Logstash的输入支持各种选择,可以同时从众多常用来源捕捉事件,如日志、指标、Web应用、数据存储以及各种AWS服务等。在数据从源传输到存储库的过程中,Logstash的过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash的输出也可以根据需要选择不同的存储方式,除了Elasticsearch作为首选输出方向外,还有其他的输出选择。

Logstash是一个强大的开源工具,可以用于实时处理和转换来自各种数据源的数据,为数据分析和商业决策提供支持。

Kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。它是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,Kafka是一个可行的解决方案。

Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Es介绍

ES指的是Elasticsearch,它是一个基于RESTful web接口并且构建在Apache Lucene之上的开源分布式搜索引擎。它还是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索。它能够横向扩展至数以百计的服务器存储以及处理PB级的数据,可以在极短的时间内存储、搜索和分析大量的数据。通常作为具有复杂搜索场景情况下的核心发动机。

Logstash输入输出配置

Logstash的输入输出配置主要是针对其输入和输出插件进行设置。以下是一些常见的输入和输出插件的配置示例:

输入配置:

  1. file:从文件读取日志信息,例如:
input {
  file {
    path => "/var/log/error.log"
    type => "error"
    start_position => "beginning"
  }
}
  1. stdin:从标准输入读取日志信息,例如:
input {
  stdin {}
}
  1. syslog:从系统日志读取日志信息,例如:
input {
  syslog {
    type => "syslog"
  }
}

输出配置:

  1. stdout:将日志信息输出到标准输出,例如:
output {
  stdout {}
}
  1. elasticsearch:将日志信息输出到Elasticsearch集群,例如:
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "myindex"
  }
}

以上是一些常见的输入输出插件配置示例,Logstash还支持其他多种输入输出插件,可以根据具体需求进行选择和配置。

Logstash输入Kafka输出Es配置

Logstash的输入配置可以通过Kafka插件从Kafka中读取数据,输出配置可以通过Elasticsearch插件将数据写入Elasticsearch集群。以下是一个示例配置:

input {
  kafka {
    bootstrap_servers => "your_kafka_server:9092"
    client_id => "your_client_id"
    group_id => "your_group_id"
    auto_offset_reset => "latest"
    consumer_threads => 1
    decorate_events => true
    topics => ["your_topic"]
  }
}

output {
  if [@metadata][kafka][topic] == "your_topic" {
    elasticsearch {
      hosts => "your_elasticsearch_server:9200"
      index => "your_index"
      timeout => 300
    }
  }
}

在这个配置中,Logstash通过Kafka插件从指定的Kafka服务器和主题中读取数据,然后通过Elasticsearch插件将数据写入指定的Elasticsearch索引。你可以根据实际情况修改配置中的参数,例如Kafka服务器的地址、客户端ID、组ID、主题等。

  • 上面的配置参数的含义如下所示:
  1. bootstrap_servers: 这是Kafka服务器的地址和端口。你需要提供Kafka集群中至少一个服务器的地址。
  2. client_id: 这是客户端的唯一标识符,用于标识连接到Kafka集群的客户端。
  3. group_id: 这是消费者组的ID。如果你有多个Logstash实例读取同一个Kafka主题,并且你想将它们作为一个消费者组来处理,那么你需要使用这个参数。
  4. auto_offset_reset: 这个参数决定了当Logstash无法找到其之前读取的偏移量时应该怎么做。设置为"latest"意味着从最新的记录开始读取。
  5. consumer_threads: 这是用于消费Kafka消息的线程数。增加线程数可以加快数据读取速度,但也会增加CPU和内存的使用。
  6. decorate_events: 如果设置为true,Logstash会为每个事件添加额外的元数据,例如Kafka主题和分区信息。
  7. topics: 这是Logstash要读取的Kafka主题列表。
  8. if [@metadata][kafka][topic] == “your_topic”: 这是一个条件语句,用于确定是否将事件发送到Elasticsearch。只有当事件的主题与指定的"your_topic"匹配时,事件才会被发送到Elasticsearch。
  9. hosts: 这是Elasticsearch集群的地址和端口。
  10. index: 这是Logstash将数据写入Elasticsearch的索引名称。
  11. timeout: 这是Logstash与Elasticsearch集群通信的超时时间(以秒为单位)。

这些参数可以根据你的具体需求进行调整,以满足你的数据收集和处理需求。

java发送消息到Kafka示例

Apache Kafka是一种分布式流处理平台,你可以使用它来处理各种数据。以下是使用Java向Kafka发送消息的示例代码:

首先,你需要添加Apache Kafka的依赖到你的项目中。如果你正在使用Maven,那么你可以在pom.xml文件中添加如下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

以下是使用Java发送消息的示例代码:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        // 1. 配置生产者客户端参数
        Properties props = new Properties();
        // Kafka集群地址
        props.put("bootstrap.servers", "your_kafka_server:9092");
        // 消息ack模式: all表示消息被leader和follower都写入后才返回ack, -1表示只被leader写入就返回ack
        props.put("acks", "all");
        // 重试次数
        props.put("retries", 0);
        // 批量发送大小
        props.put("batch.size", 16384);
        // 发送延时,用于控制producer发送请求的延迟时间,可以提高吞吐量
        props.put("linger.ms", 1);
        // 缓冲区大小
        props.put("buffer.memory", 33554432);
        // key序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化类
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建生产者对象,传入配置参数
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 3. 创建消息对象,指定topic、消息key和消息体value
            ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key" + i, "value" + i);
            // 4. 发送消息到Kafka集群,并获取返回结果
            RecordMetadata metadata = producer.send(record).get();
            // 打印结果,发送是否成功,以及发送到的分区和offset等信息
            System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());
        }
        // 5. 关闭生产者对象,释放资源
        producer.close();
    }
}

在这个示例中,我们创建了一个名为ProducerDemo的类,这个类使用Kafka的生产者API发送消息到名为"my-topic"的主题。请注意你需要替换"bootstrap.servers"属性的值为你的Kafka集群的实际地址。如果你的集群在本地运行,并且使用的是默认的端口,那么你可以使用"localhost:9092"。

Logstash常用输入插件

Logstash的常用输入插件包括以下几种:

  1. file:该插件可以从文件中读取事件。它使用了FileWatch库来监听文件变化,并跟踪被监听的日志文件的当前读取位置,从而确保不会漏过任何数据。
  2. stdin:该插件是标准的输入插件,能够从命令行中读取事件。
  3. TCP:从TCP连接中读取数据。
  4. UDP:从UDP套接字中读取数据。
  5. Redis:从Redis中读取数据。
  6. JDBC:从关系型数据库中读取数据。
  7. HTTP:从HTTP服务器中读取数据。

Logstash常用输出插件

Logstash常用的输出插件包括以下几种:

  1. Elasticsearch:将日志数据输出到Elasticsearch,用于后续的搜索和分析。
  2. Kafka:将日志数据发送到Kafka集群,供其他消费者使用。
  3. File:将日志数据输出到文件中,便于后续查看和审计。
  4. Gelf:将日志数据输出到Gelf兼容的服务器,用于远程监控和报警。
  5. Fluentd:将日志数据输出到Fluentd,用于统一日志收集和转发。

拓展

Logstash使用指南

Kafka使用指南

Elasticsearch使用指南

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/239462.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

记录汇川:MODBUS-梯形图

H5U的MODBUS通信不需要编写程序&#xff0c;通过组态MODBUS通信配置表&#xff0c;实现数据通信。 相对自由口走报文的形式&#xff0c;这个更加的方便。配置结束&#xff0c;就可以监控数据或写入。

Linux - 进程间通信(中)- 管道的应用场景

前言 在上篇博客当中&#xff0c;对Linux 当中的进程通信&#xff0c;做了详细阐述&#xff0c;主要是针对父子进程的通信来阐述的同时&#xff0c;也进行了模拟实现。 对于管道也有了初步了解&#xff0c;但是这仅仅是 进程间通信的一部分&#xff0c;Linux 当中关于进程间通…

Unity光照模型实践

光照作为3D渲染中最重要的部分之一&#xff0c;如何去模拟真实环境的光照是重要的研究内容&#xff0c;但是现实环境光照过于复杂&#xff0c;有很多经典好用的光照模型去近似真实光照。 根据基础的Phong模型 最终某个点的结果为 环境光Ambient 漫反射光Diffuse 高光Specula…

安卓MediaRecorder(2)录制源码分析

文章目录 前言JAVA new MediaRecorder() 源码分析android_media_MediaRecorder.cpp native_init()MediaRecorder.java postEventFromNativeandroid_media_MediaRecorder.cpp native_setup() MediaRecorder 参数设置MediaRecorder.prepare 分析MediaRecorder.start 分析MediaRec…

Navicat 技术指引 | 适用于 GaussDB 分布式的服务器对象的创建/设计

Navicat Premium&#xff08;16.3.3 Windows版或以上&#xff09;正式支持 GaussDB 分布式数据库。GaussDB分布式模式更适合对系统可用性和数据处理能力要求较高的场景。Navicat 工具不仅提供可视化数据查看和编辑功能&#xff0c;还提供强大的高阶功能&#xff08;如模型、结构…

JavaSE知识点回顾,附学习思维导图

第一阶段 day01 java 发展&#xff0c;java 环境( path, java_home, class_path)&#xff0c;java 原理&#xff0c; java 执行 &#xff0c; jvm , jre , jdk day02 变量 标识符命名规则 数据类型 数据类型的转换 运算符 day03 选择结构 if , switch day04 循环结构 for , whi…

java--Collection的遍历方式

1.迭代器概述 迭代器是用来遍历集合的专用方式(数组没有迭代器)&#xff0c;在java中迭代器是Iterator。 2.Collection集合获取迭代器的方法 3.Iterator迭代器中的常用方法 4.增强for循环 ①增强for可以用来遍历集合或数组。 ②增强for遍历集合&#xff0c;本质就是迭代器遍…

005、Softmax损失

之——softmax与交叉熵 杂谈 我们常用到softmax函数与交叉熵的结合作为损失函数以监督学习&#xff0c;这里做一个小小的总结。 正文 1.softmax的基本改进 所谓softmax就是在对接全连接层输出时候把输出概率归一化&#xff0c;最基础的就是这样&#xff1a; 效果就是这样&…

(第65天)PDB 快照

介绍 PDB 快照是一个 PDB 指定时间点的副本。在创建快照时,源 PDB 可以是只读或者读写模式。 PDB 快照可以用于快速创建 PDB。 PDB 快照可以分为手动和自动两种创建方式(create pluggable database|alter pluggable database): 手动快照使用 SNAPSHOT 子句的方式来创建自动…

Jmeter beanshell编程实例

1、引言 BeanShell是一种小型的&#xff0c;免费的&#xff0c;可嵌入的符合Java语法规范的源代码解释器&#xff0c;具有对象脚本语言特性。 在Jmeter实践中&#xff0c;由于BeanShell组件较高的自由度&#xff0c;通常被用来处理较为复杂&#xff0c;其它组件难以处理的问题…

jmeter接口测试之登录测试

注册登录_登陆接口文档 1.登录 请求地址&#xff1a; POST xxxxxx/Home/Login 请求参数&#xff1a; args{LoginName:"mtest", // 登录名&#xff0c;可以为用户名或邮箱Password:"123456" // 密码" }响应数据&#xff1a; 成功 {"S…

微表情检测(四)----SL-Swin

SL-Swin: A Transformer-Based Deep Learning Approach for Macro- and Micro-Expression Spotting on Small-Size Expression Datasets 在本文中&#xff0c;我们致力于解决从视频中检测面部宏观和微观表情的问题&#xff0c;并通过使用深度学习方法分析光流特征提出了引人注…

[GFCTF 2021]文件查看器

文章目录 前置知识可调用对象数组对方法的调用GC回收机制phar修改签名 解题步骤 前置知识 可调用对象数组对方法的调用 我们先来看下面源码 <?phperror_reporting(0);class User{public $username;public $password;public function check(){if($this->username"…

用perl查找文件夹中的所有文件和目录

查找文件夹中的文件和目录是一个很常见的操作&#xff0c;使用perl的File::Find模块可以很方便的实现。首先使用perldoc File::Find 查看一下文档: 这个核心的就是文档中描述的回调函数。我们举一个实际的例子&#xff0c;一个空的git仓库为例&#xff0c;下面的脚本用于查找…

Aduino实现音频频谱效果

看到这样一个效果,于是想用arduino实现类似效果。需要的组件如下 1 arduino开发板 2 音频传感器 3 灯带 接线图如图 代码如下 #include <EEPROM.h>#include <Adafruit_NeoPixel.h>#define PIN 2 // input pin Neopixel is attached to#define NUMPIXELS …

流程控制之条件判断

流程控制之条件判断 2.1.if语句语法 2.1.1单分支结构 # 语法1: if <条件表达式> then 指令 fi #语法2: if <条件表达式>;then 指令 fi # if&#xff0c;if 标志循环起始终止…

现代雷达车载应用——第2章 汽车雷达系统原理 2.2节

经典著作&#xff0c;值得一读&#xff0c;英文原版下载链接【免费】ModernRadarforAutomotiveApplications资源-CSDN文库。 2.2 汽车雷达架构 从顶层来看&#xff0c;基本的汽车雷达由发射器&#xff0c;接收器和天线组成。图2.2给出了一种简化的单通道连续波雷达结构[2]。这…

【Docker】vxlan的原理与实验

VXLAN&#xff08;Virtual eXtensible Local Area Network&#xff0c;虚拟可扩展局域网&#xff09;&#xff0c;是一种虚拟化隧道通信技术。它是一种Overlay&#xff08;覆盖网络&#xff09;技术&#xff0c;通过三层的网络来搭建虚拟的二层网络。 VXLAN介绍 VXLAN是在底层…

常用的测试用例大全

登录、添加、删除、查询模块是我们经常遇到的&#xff0c;这些模块的测试点该如何考虑 1)登录 ① 用户名和密码都符合要求(格式上的要求) ② 用户名和密码都不符合要求(格式上的要求) ③ 用户名符合要求&#xff0c;密码不符合要求(格式上的要求) ④ 密码符合要求&#xf…

使用Java将图片添加到Excel的几种方式

1、超链接 使用POI&#xff0c;依赖如下 <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>4.1.2</version></dependency>Java代码如下,运行该程序它会在桌面创建ImageLinks.xlsx文件。 …