logstash同步数据从kafka到es集群

背景:需求是这样的,原始文件是txt文件(每天300个文件),最终想要的结果是每天将txt中的数据加载到es中,开始的想法是通过logstash加载数据到es中,但是对logstash不太熟悉,不知道怎么讲程序弄成读取一个txt文件到es中以后,就将这个txt原始文件备份并且删除掉,然后就想到了通过一个中间件来做,Python读取成功一个txt文件,并且加载到kafka中以后,就将这个txt文件备份然后删除掉原始文件。

第一步:向kafka中添加数据,我用Python连接kafka集群,向其中加载数据

# -*- coding: utf-8 -*-

import json
import json
import msgpack
from loguru import logger
from kafka import KafkaProducer
from kafka.errors import KafkaError

def kfk_produce_1():
    """
        发送 json 格式数据
    :return:
    """
    producer = KafkaProducer(
        bootstrap_servers='192.168.85.109:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    #logstash-topic-one
    #producer.send('python_test_topic', {'key': 'value'})
    producer.send('logstash-topic-one', {'name': 'value'})

kfk_produce_1()
执行完的结果,来界面工具上看,显示这样,说明数据已经加载进来了

在这里插入图片描述

第二步:配置logstash,将kafka中的数据加载到es集群中

编写的logstash.conf配置如下;
input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}

第三步:执行logstash,通过kibana查看数据是否在es集群中,展示如下,则说明配置是正确的

在这里插入图片描述

在这里插入图片描述

问题1:现在发现,name字段是在message下面,如果是多个字段的话,不方便查询,想着怎么讲字段从message中弄出来,修改的配置如下,增加一段这样的代码就OK了

type => "json"
        codec => json {
            charset => "UTF-8"
        }

完整的配置文件logstash.conf代码如下;

input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
        type => "json"
        codec => json {
            charset => "UTF-8"
        }
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}

然后我又造了一个多字段的场景如下;

在这里插入图片描述

我先去logstash中查看日志如下,字段已经分离出来了

{
          "name" => "value",
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17T06:13:48.825Z
}
{
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17T06:20:57.729Z,
          "name" => "令狐冲",
           "age" => "30",
        "height" => "180cm"
}

去kibana中去查询,显示如下,测试成功喽,😄

在这里插入图片描述
在这里插入图片描述

问题2:在查询结果中发现,有些字段是没有用的,看看怎么去掉?

在配置文件中增加一个过滤器就可以解决了

filter { mutate {
                 remove_field => ["@version","@timestamp","type"] # 删除字段
                 }
 }

然后再去kibana中去查看,就发现这会儿的字段格式非常好看了,😄

在这里插入图片描述

文档后续再继续完善,有好的建议或者问题可以留言交流,😄

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/22577.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot的生鲜管理系统的设计与实现

背景 困扰交易市场的许多问题当中,生鲜交易管理一定是交易市场不敢忽视的一块。但是管理好生鲜交易又面临很多麻烦需要解决,例如有几个方面:第一,生鲜市场往往人数都比较多,如何保证能够管理到每一个商家,如何在工作琐碎,记录繁多的情况下将生鲜交易的当前情况反应给领导相关部…

柔顺机构学读书笔记1:悬臂梁变形

题目: 如图考虑悬臂梁,材料各向同性,即各个方向上的弹性模量和强度都相同。如果在x方向上作用一个可使最大应力等于屈服强度 S S S的力 F x F_x Fx​时, x x x轴方向的变形为多少,书上给出了答案: 我们来验…

2022级云曦实验室考试(一)pwn

讲真,俺都不知道pwn是啥,等俺搜搜! pwn简介: CTF中的pwn指的是通过通过程序本身的漏洞,编写利用脚本破解程序拿到主机的权限,这就需要对程序进行分析,了解操作系统的特性和相关漏洞&#xff0…

SHELL——流程控制条件判断

1、判断当前磁盘剩余空间是否有20G,如果小于20G,则将报警邮件发送给管理员,每天检查一次磁盘剩余空间。 2、判断web服务是否运行 1)、查看进程的方式判断该程序是否运行 2)、通过查看端口的方式判断该程序是否运行&am…

数据分析真的很火吗?真的有很多企业需要这样的岗位吗?求大佬指点。

“我是去年毕业的,因为疫情影响,整个就业环境都很不好,很多企业都裁员了。加上疫情三年基本都是玩过去,也没啥一技之长,就业就更难了。听说现在做数据分析的人很多,我身边的朋友都在转行做数据分析。 其实…

【C++】哈希——unordered系列容器哈希概念哈希冲突

文章目录 1. unordered系列的关联式容器1.1 引言1.2 unordered_map的使用说明1.3 unordered_set的使用说明1.4 unordered_set和unordered_map的应用1.5 性能比较 2. 哈希概念3. 哈希函数4. 哈希冲突5. 哈希冲突的解决——开散列和闭散列5.1 闭散列5.2 开散列 1. unordered系列的…

Elasticsearch:Explicit mapping - 显式映射

显式映射相比较动态映射(Dynamic mapping)是需要我们在索引创建时就定义字段及其类型。这个和我们传统的 RDMS 数据库一样,在我们写入数据到数据库之前,我们需要工整地定义好每个字段及其类型和长度。Elasticsearch 既可以使用显式…

使用柔性数组重写MyString

hello,各位宝子,今天阿崽将使用c和柔性数组的方式重新去写String类 在开始本次知识前,首先给大家介绍下柔性数组这个buff特点: 结构中的柔性数组成员前面至少要包含一个其他成员 sizeof返回的这种结构大小不包括柔性数组的内存 …

数据结构课程设计——哈夫曼编/译码器

数据结构课程设计任务书 学生姓名: 专业班级:软件工程 指导教师: 工作单位: 题 目: 哈夫曼编/译码器 基础要求: (1)熟悉各种…

数字信号处理基础(二):FFT和IFFT的使用以及详细分析代码书写思路

目录 1. fft和ifft的原理1.1 fft1.2 ifft 2. 书写代码思路3. 完整代码4. 结果图 1. fft和ifft的原理 1.1 fft fft是快速傅里叶变换,是MATLAB中计算信号频谱的函数,使用方法是fft(x),直接对信号x进行fft计算。 由于fft函数计算信号的频谱是0…

vue3与vue2共存环境搭建

1、全局安装vue2 npm install vue-cli -g2、自行在任意位置创建一个文件夹,局部安装vue3 npm初始化 npm initnpm初始化 提示: 初始化后 出现文件package.json 如果没有初始化 会报错,且文件夹中不会新增内容 3、局部安装vue3 npm install …

宏工科技“全面”发力CIBF,助推电池智造“高效提质”

5月16-18日,第十五届中国国际电池技术展览会(CIBF2023)在深圳盛大举行。宏工科技携电池材料与电池匀浆领域的创新产品和系统解决方案精彩亮相。 据了解,宏工科技在新能源行业的业务涉及电池材料整线产线、电池匀浆、电池回收三个…

R语言实践——rWCVP入门

rWCVP入门 介绍1. 访问到WCVP1.1 方法一1.2 方法二(谨慎) 2. WCVP数据筛选2.1 关于按分类单元筛选的说明2.2 关于按分布区域筛选的说明 笔者实践 介绍 世界维管植物名录(WCVP)是维管植物物种的全球共识。它提供了科学已知的> …

【C语言】结构体指针

结构体指针 结构体基础知识注意对于成员的赋值 结构体指针指向结构体变量的指针结构体指针与结构体成员指针用结构体指针引用结构体成员 结构体 基础知识 初识结构体,可以先看这篇浅显易懂的文章结构体–基础篇 所谓结构体,是一组类型可以不同的相关变…

怎么把录音转文字?推荐你这三款工具

随着科技不断发展,录音转文字的技术也逐渐被广泛应用于各种场景中。其中最常见的一种就是会议记录。在日常工作中,会议是企业和组织中必不可少的一个环节,但在会议过程中的录音和记录往往需要花费大量的时间和精力。这个时候,我们…

基于MAC地址的ACL配置

基于MAC地址的ACL配置 【实验目的】 掌握基于MAC地址的标准ACL的配置。验证配置。 【实验拓扑】 实验拓扑如图1所示。 图1 实验拓扑 设备参数如表所示。 表1 设备参数表 设备 接口 IP地址 子网掩码 默认网关 S1 e0/0 N/A N/A N/A e0/1 N/A N/A N/A PC1 N/…

架构-软件工程模块-2

系统分析 数据流图可能出案例题,状态转换图了解作用即可 用例图、类图选择题多,暴徒了解即可 #mermaid-svg-lGozbtkYJPEQF1eo {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-lGozbtkYJPEQF1e…

c++学习——c与c++const修饰的变量的区别

c语言下const修饰的变量 1、c语言下const修饰的变量都有空间 2. c语言的const修饰的全局变量具有外部链接属性 07 const修饰的变量.c #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <string.h> #include <stdlib.h>const int a 10;//常…

解析使用FPGA逻辑实现FIR滤波器的几种架构

有限脉冲响应(finite impulse response&#xff0c;FIR)数字滤波器 一、FIR数字滤波器理论介绍 FIR滤波器的实质就是输入序列与系统脉冲响应的卷积&#xff0c;即&#xff1a; 其中&#xff0c;N为滤波器的阶数&#xff0c;也即抽头数&#xff1b;x(n)为第n个输入序列&#xff…

AI人工智能标记数据的技术:类型、方法、质量控制、应用

AI人工智能 标记数据 在人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;领域中&#xff0c;标记数据是非常重要的一环。它是指对原始数据进行标记和注释&#xff0c;以便机器学习算法可以理解和利用这些数据。标记数据可以提高机器学习模型的准确…