1、安装包下载
elasticsearch
https://www.elastic.co/cn/downloads/past-releases#elasticsearch
kibana安装包地址:
https://www.elastic.co/cn/downloads/past-releases/kibana-8-10-4
logstash安装包地址:
https://www.elastic.co/cn/downloads/past-releases/logstash-8-10-4
elasticsearch-analysis-ik包下载地址:
https://github.com/medcl/elasticsearch-analysis-ik/releases
2、解压安装包,并将elasticsearch-analysis-ik-8.10.4目录放到es的plugins目录下
3、修改es的config目录下的elasticsearch.yml
4、在终端启动es,在bin目录下点击elasticsearch.bat
5、在浏览器上查看
6、设置kibana的中文显示,修改kibana.yml
7、使用logstash进行mysql数据库数据同步到es配置
logstash.conf配置
# 连接到mysql数据库
input {
jdbc {
# MySQL JDBC驱动库的路径
jdbc_driver_library => "D:\soft\third_soft\elasticsearch\logstash-8.10.4\config\mysql-connector-java-8.0.11.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# MySQL数据库的连接字符串
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/main_literature?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
# MySQL数据库的用户名
jdbc_user => "root"
# MySQL数据库的密码
jdbc_password => "****"
# 开启分页
jdbc_paging_enabled => true
# 分页每页数量,可以自定义
jdbc_page_size => "10000"
# 查询语句
statement => "SELECT * FROM literature_parsing_record WHERE id > :sql_last_value"
# 定时执行的时间间隔,这里设置为每分钟执行一次。含义:分、时、天、月、年
schedule => "* * * * *"
# 定义的类型名称,说明哪个输入到哪个输出类型,与output中的if判断值对应
type => "literature_parsing_record"
# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到last_run_metadata_path的文件
use_column_value => true
# 记录上一次追踪的结果值
last_run_metadata_path => "D:\soft\third_soft\elasticsearch\logstash-8.10.4\config\track_id"
# 用于增量同步的字段,如果use_column_value为true,配置本参数,追踪的column名,可以是自增id或时间
tracking_column => "id"
# tracking_colum 对应字段的类型
tracking_column_type => numeric
# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
clean_run => false
# 列字段是否都转为小写名称
lowercase_column_names => false
# 设置时区
jdbc_default_timezone =>"Asia/Shanghai"
}
jdbc {
# MySQL JDBC驱动库的路径
jdbc_driver_library => "D:\soft\third_soft\elasticsearch\logstash-8.10.4\config\mysql-connector-java-8.0.11.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# MySQL数据库的连接字符串
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/main_literature?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
# MySQL数据库的用户名
jdbc_user => "root"
# MySQL数据库的密码
jdbc_password => "*****"
# 开启分页
jdbc_paging_enabled => true
# 分页每页数量,可以自定义
jdbc_page_size => "10000"
# 查询语句
statement => "SELECT * FROM literature_content_record WHERE id > :sql_last_value"
# 定时执行的时间间隔,这里设置为每分钟执行一次。含义:分、时、天、月、年
schedule => "* * * * *"
# 定义的类型名称,说明哪个输入到哪个输出类型,与output中的if判断值对应
type => "literature_content_record"
# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到last_run_metadata_path的文件
use_column_value => true
# 记录上一次追踪的结果值
last_run_metadata_path => "D:\soft\third_soft\elasticsearch\logstash-8.10.4\config\literature_content_record_track_id"
# 用于增量同步的字段,如果use_column_value为true,配置本参数,追踪的column名,可以是自增id或时间
tracking_column => "id"
# tracking_colum 对应字段的类型
tracking_column_type => numeric
# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
clean_run => false
# 列字段是否都转为小写名称
lowercase_column_names => false
# 设置时区
jdbc_default_timezone =>"Asia/Shanghai"
}
}
# 过滤数据
filter {
mutate {
# 移除Logstash自动生成的字段
remove_field => ["@version", "@timestamp"]
}
}
# 连接到Elasticsearch
output {
if[type]=="literature_parsing_record" {
elasticsearch {
# Elasticsearch的主机和端口
hosts => ["http://localhost:9200"]
# 写入es的索引名称
# index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
index => "literature_parsing_record"
# es的文档类型名称,6.x版本可以是一个索引对应多个文档类型,不建议这么做。之后版本只支持一个索引对应一个文档类型
document_type => "doc"
# 使用数据中的id字段作为文档id
document_id => "%{id}"
# 如果使用自己配置的模板,必须配置true
# manage_template => true
#
# template_overwrite => true
# 模板名称,与定义的模板名称对应
# template_name => "literature_parsing_record"
# 使用自定义模板的文件路径,模板用于创建es的索引,决定了索引的创建方式
# template => "/opt/elasticsearch/logstash-6.6.1/template/literature_parsing_record_logstash.json"
#user => "elastic"
#password => "changeme"
}
}
if[type]=="literature_content_record" {
elasticsearch {
# Elasticsearch的主机和端口
hosts => ["http://localhost:9200"]
# 写入es的索引名称
# index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
index => "literature_content_record"
# es的文档类型名称,6.x版本可以是一个索引对应多个文档类型,不建议这么做。之后版本只支持一个索引对应一个文档类型
document_type => "doc"
# 使用数据中的id字段作为文档id
document_id => "%{id}"
# 如果使用自己配置的模板,必须配置true
# manage_template => true
#
# template_overwrite => true
# 模板名称,与定义的模板名称对应
# template_name => "literature_content_record"
# 使用自定义模板的文件路径,模板用于创建es的索引,决定了索引的创建方式
# template => "/opt/elasticsearch/logstash-6.6.1/template/literature_content_record_logstash.json"
#user => "elastic"
#password => "changeme"
}
}
stdout {
codec => json_lines
}
}
8.下载mysql-connector-java-8.0.11.jar,放到配置的路径下
9、在终端启动logstash就可以进行数据同步了
logstash -f D:\soft\third_soft\elasticsearch\logstash-8.10.4\config\logstash.conf
10、在bin目录下启动kibana
11、点击开发工具查看
12、查看es中的数据