简介
1.1 什么是Logstash?
Logstash作为一个具备实时流水线功能的开源数据收集引擎,拥有强大的能力。它能够从不同来源收集数据,并将其动态地汇聚,进而根据我们定义的规范进行转换或者输出到我们定义的目标地址。
1.2 Logstash的主要特点
Logstash通过清洗和使数据多样化,Logstash使数据变得适用于各种高级下游分析和可视化用例。此外,Logstash提供广泛的输入、过滤器和输出插件,而且许多本地编解码器进一步简化了数据摄取的过程。无论是数据整理还是提供给下游应用,Logstash都是一个强大且灵活的解决方案。
安装
- 注意所需要的java版本,因为我电脑装的是jdk1.8,因此选择了7.17版本,如果电脑上jdk版本较高可以用最新版本,后续8.0默认jdk11起
下载
https://www.elastic.co/cn/downloads/logstash
环境配置
Logstash 推荐在环境变量配置 LS_JAVA_HOME 变量指向jdk目录来使用jdk。配置jdk目录时,我们可以直接使用Logstash 中的jdk目录,省去了另外下载和有可能引起版本不能用的问题。(7.17.12支持的jdk版本只有 jdk8,jdk11和jdk15)
驱动包准备
创建存放目录mkdir mylib
下载驱动包到该目录
创建同步配置
vim mysql-log-to-es.conf
input {
jdbc {
jdbc_driver_library => "./mylib/mysql-connector-java-8.0.13.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/zliov_test_log_center"
jdbc_user => "root"
jdbc_password => "123456"
parameters => { "flag" => 1 }
schedule => "* * * * *"
statement => "SELECT id,username,params,flag,createTime FROM log WHERE flag = :flag AND id > :sql_last_value ORDER BY id ASC LIMIT
2"
last_run_metadata_path => "mysql-by-id-to-es.index"
tracking_column => "id"
use_column_value => true
tracking_column_type => "numeric"
}
}
filter {
mutate { add_field => { "from" => "logstash" } }
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "test-by-id-%{+YYYY.MM}"
}
stdout {
}
}
启动
./bin/logstash -f mylib/mysql-by-id-to-es.conf
后台运行
/usr/share/logstash/bin/logstash -f mylib/mysql-by-id-to-es.conf --config.reload.automatic &
观察es