部署Filebeat+ELK详解
- 1. 部署Filebeat节点(CentOS 7-4)
- 1.1 部署Apache服务
- 1.2 部署Filebeat服务
- 2. filter插件
- 2.1 grok正则捕获插件
- 2.1.1 内置正则表达式调用
- 2.1.2 自定义表达式调用
- 2.1.3 设置正则表达式过滤条件
- 2.2 mutate数据修改插件
- 2.2.1 Mutate过滤器常用的配置选项
- 2.2.2 Mutate过滤器示例
- 2.2.3 mutate数据修改插件实战演练
- 2.3 multiline多行合并插件
- 2.4 date时间处理插件
- 3.实验中的故障案例
- 4. logstash filter过滤插件总结
接上文安装与部署ELK详解
1. 部署Filebeat节点(CentOS 7-4)
###关闭和禁止防火墙开机自启功能
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
sed -i 's/enforcing/disabled/' /etc/selinux/config
1.1 部署Apache服务
(1)在Filebeat节点上,安装Apache服务
yum install -y httpd
(2)修改Apache服务的配置文件
vim /etc/httpd/conf/httpd.conf
ServerName www.clr.com:80
(3)开启Apache服务
systemctl start httpd
systemctl enable httpd
netstat -lntp | grep httpd
(4)浏览器访问,验证Apache服务
http://192.168.80.40/
1.2 部署Filebeat服务
(1) 安装Filebeat
cd /opt
rz -E
#filebeat-6.7.2-linux-x86_64.tar.gz
tar xf filebeat-6.7.2-linux-x86_64.tar.gz
(2)设置filebeat的主配置文件
cd /opt/filebeat-6.7.2-linux-x86_64/
cp filebeat.yml{,.bak}
vim filebeat.yml
###15行-------------
filebeat.inputs:
- type: log #指定 log 类型,从日志文件中读取消息
enabled: true
paths:
- /var/log/httpd/access_log #指定监控的日志文件
- tags: ["filebaeat"]
#设置索引标签
###46行--------------------
fields: #可以使用 fields 配置选项设置一些参数字段添加到 output 中
service_name: httpd #重点强调,service_name:与httpd中间存在一个空格
log_type: access
from: 192.168.80.40
--------------Elasticsearch output-------------------
###152行
(全部注释掉)
----------------Logstash output---------------------
###165行
output.logstash:
hosts: ["192.168.80.30:5044"] #指定 logstash 的 IP 和端口
(3)启动filebeat
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#-e:输出到标准输出,禁用syslog/文件输出
#-c:指定配置文件
#nohup:在系统后台不挂断地运行命令,退出终端不会影响程序的运行
(4)在Logstash组件所在节点上新建一个Logstash 配置文件(CentOS 7-3)
vim /etc/logstash/conf.d/filebeat.conf
input {
beats {
port => "5044"
}
}
output {
elasticsearch {
hosts => ["192.168.80.10:9200","192.168.80.20:9200"]
index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
(5)启动logstash,加载配置文件
cd /etc/logstash/conf.d
logstash -f filebeat.conf #切记logstash服务一定要在/etc/logstash/conf.d配置文件的绝对路径下启动
###在filebeat节点上启动filebeat服务,并检查配置我文件filebeat.yml
./filebeat -e -c filebeat.yml #在加载logstash -f /etc/logstash/conf.d/*.conf配置文件时,要保证filebeat服务一直是开启的状态
(6)浏览器访问
http://192.168.80.10:5601
登录Kibana,单击“Create Index Pattern”按钮添加索引“filebeat-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。
2. filter插件
对于Logstash的Filter,这个才是Logstash最强大的地方。Filter插件也非常多,我们常用到的grok、date、mutate、mutiline四个插件。
对于filter的各个插件执行流程,可以看下面这张图:
2.1 grok正则捕获插件
grok使用文本片段切分的方式来切分日志事件.
2.1.1 内置正则表达式调用
语法:%{SYNTAX:SEMANTIC}
-
SYNTAX代表匹配值的类型,例如,0.11可以NUMBER类型所匹配,10.222.22.25可以使用IP匹配。
-
SEMANTIC表示存储该值的一个变量声明,它会存储在elasticsearch当中,方便kibana做字段搜索和统计,你可以将一个IP定义为客户端IP地址client_ip_address,如%{IP:client_ip_address},所匹配到的值就会存储到client_ip_address这个字段里边,类似数据库的列名,也可以把 event log中的数字,当成数字类型存储在一个指定的变量当中,比如响应时间http_response_time,假设event log record如下:
message: 192.168.80.10 GET /index.html 15824 0.043
#可以使用如下grok pattern来匹配这种记录
%{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}
IPV4的正则表达式
###grok内置IPV4正则表达式
(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
###自定义IPV4正则表达式思路
[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}
0-255
0-9 [0-9]
10-99 [1-9][0-9]
100-199 1[0-9][0-9]
200-249 2[0-4][0-9]
250-255 25[O-5]
2.1.2 自定义表达式调用
语法:(?<field_name>pattern)
举例:捕获10或11和长度的十六进制数的queue_id可以使用表达式(?<queue_id>[0-9A-F]{10,11})
message: 192.168.80.10 GET /index.html 15824 0.043
(?<remote_addr>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) (?<http_method>[A-Z]+) (?<request_uri>/.*) (?<response_bytes>[0-9]+) (?<response_time>[0-9\.]+)
活学活用
#字段名 正则表达式过滤条件
remote_addr IP
log_time HTTPDATE
http_method WORD
request_url URIPATH
http_ver .+
response_code NUMBER
user_agent .+
实战演练:
使用正则表达式过滤日志数据记录
###日志数据记录一
192.168.80.1 - - [11/Jul/2023:09:48:24 +0800] "GET / HTTP/1.1" 403 4897 "-" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.5359.125 Safari/537.36"
###日志数据记录二
192.168.80.1 - - [11/Jul/2023:10:24:52 +0800] "GET /test.html HTTP/1.1" 200 32 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
###日志数据记录三
192.168.80.20 - - [11/Jul/2023:10:24:36 +0800] "GET /test.html HTTP/1.1" 200 32 "-" "curl/7.29.0"
###使用正则表达式进行过滤日志数据
%{IP:remote_addr} - - \[%{HTTPDATE:logtime}\] \"%{WORD:http_method} %{URIPATH:request_url} (?<http_ver>.+)\" %{NUMBER:response_code} \d+ \".+\" \"(?<user_agent>.+)\"
2.1.3 设置正则表达式过滤条件
#在logstash节点中,添加正则表达式过滤条件
vim /etc/logstash/conf.d/filebeat.conf
###添加如下对日志数据,进行过滤的正则表达式条件
filter {
grok {
match => ["message","%{IP:remote_addr} - - \[%{HTTPDATE:logtime}\] \"%{WORD:http_method} %{URIPATH:request_url} (?<http_ver>.+)\" %{NUMBER:response_code} \d+ \".+\" \"(?<user_agent>.+)\""]
}
}
###重启filebeat服务和logstash服务
cd /opt/filebeat-6.7.2-linux-x86_64
./filebeat -e -c filebeat.yml #重启filebeat服务
#重启logstash服务
cd /etc/logstash/conf.d
logstash -f /etc/logstash/conf.d/filebeat.conf
2.2 mutate数据修改插件
它提供了丰富的基础类型数据处理能力。可以重命名,删除,替换和修改事件中的字段。
2.2.1 Mutate过滤器常用的配置选项
add_field 向事件添加新字段,也可以添加多个字段
remove_field 从事件中删除任意字段(只能删除logstash自己日志中的字段,无法删除filebeat携带的日志字段)
add_tag 向事件添加任意标签,在tag字段中添加一段自定义的内容,当tag字段中超过一个内容的时候会变成数组
remove_tag 从事件中删除标签(如果存在)
convert 将字段值转换为另一种数据类型
id 向现场事件添加唯一的ID
lowercase 将字符串字段转换为其小写形式
replace 用新值替换字段
strip 删除开头和结尾的空格
uppercase 将字符串字段转换为等效的大写字母
update 用新值更新现有字段
rename 重命名事件中的字段
gsub 通过正则表达式替换字段中匹配到的值
merge 合并数组或 hash 事件
split 通过指定的分隔符分割字段中的字符串为数组
2.2.2 Mutate过滤器示例
将字段old_field重命名为new_field
filter {
mutate {
#写法1,使用中括号括起来
rename => ["old_field" => "new_field"]
#写法2,使用大括号{}括起来
rename => { "old_field" => "new_field" }
}
}
添加字段
filter {
mutate {
add_field => {
"f1" => "field1"
"f2" => "field2"
}
}
}
将字段删除
filter {
mutate {
remove_field => ["message", "@version", "tags"]
}
}
将filedName1字段数据类型转换成string类型,filedName2字段数据类型转换成float类型
filter {
mutate {
#写法1,使用中括号括起来
convert => ["filedName1", "string"]
#写法2,使用大括号{}括起来
convert => { "filedName2" => "float" }
}
}
将filedName字段中所有"/“字符替换为”_"
filter {
mutate {
gsub => ["filedName", "/" , "_"]
}
}
将filedName字段中所有",“字符后面添加空格
filter {
mutate {
gsub => ["filedName", "," , ", "]
}
}
将filedName字段以"|"为分割符拆分数据成为数组
filter {
mutate {
split => ["filedName", "|"]
}
}
合并 “filedName1” 和 “ filedName2” 两个字段
filter {
merge { "filedName2" => "filedName1" }
}
用新值替换filedName字段的值
filter {
mutate {
replace => { "filedName" => "new_value" }
}
}
添加字段first,值为message数组的第一个元素的值
filter {
mutate {
split => ["message", "|"]
add_field => {
"first" => "%{[message][0]}"
}
}
}
有条件的添加标签
filter {
#在日志文件路径包含 access 的条件下添加标签内容
if [path] =~ "access" {
mutate {
add_tag => ["Nginx Access Log"]
}
}
#在日志文件路径是 /var/log/nginx/error.log 的条件下添加标签内容
if [path] == "/var/log/nginx/error.log" {
mutate {
add_tag => ["Nginx Error Log"]
}
}
}
2.2.3 mutate数据修改插件实战演练
实战演练一:
vim /etc/logstash/conf.d/filebeat.conf
###在logstash节点中,添加muate修改日志字段模块信息
filter {
mutate {
remove_field => ["message", "@version"]
rename => { "source" => "log_path" }
}
}
###重启filebeat服务和logstash服务
cd /opt/filebeat-6.7.2-linux-x86_64
./filebeat -e -c filebeat.yml #重启filebeat服务
#重启logstash服务
cd /etc/logstash/conf.d
logstash -f /etc/logstash/conf.d/filebeat.conf
实战演练二:
vim /etc/logstash/conf.d/filebeat.conf
###在logstash节点中,添加muate修改日志字段模块信息
filter {
mutate {
remove_field => ["message", "@version"]
rename => { "source" => "log_path" }
replace => { "log_path" => "access_log" }
add_field => { "myname" => "gzy" }
}
}
###重启filebeat服务和logstash服务
cd /opt/filebeat-6.7.2-linux-x86_64
./filebeat -e -c filebeat.yml #重启filebeat服务
#重启logstash服务
cd /etc/logstash/conf.d
logstash -f /etc/logstash/conf.d/filebeat.conf
2.3 multiline多行合并插件
java错误日志一般都是一条日志很多行的,会把堆栈信息打印出来,当经过logstash解析后,每一行都会当做一条记录存放到ES, 那这种情况肯定是需要处理的。 这里就需要使用multiline插件,对属于同一个条日志的记录进行拼接。
(1)在logstas节点上安装multiline插件
cd /usr/share/logstash
rz -E
#logstash-offline-plugins-6.7.2.zip
./bin/logstash-plugin install file:///usr/share/logstash/logstash-offline-plugins-6.7.2.zip
###检查下插件是否安装成功,可以执行以下命令查看插件列表
./bin/logstash-plugin list
使用multiline插件
第一步:每一条日志的第一行开头都是一个时间,可以用时间的正则表达式匹配到第一行。
第二步:然后将后面每一行的日志与第一行合并。
第三步:当遇到某一行的开头是可以匹配正则表达式的时间的,就停止第一条日志的合并,开始合并第二条日志。
第四步:重复第二步和第三步。
(2)在logstash节点中制作java类型的错误日志信息
vim /var/log/java.log
###自定义java类型的错误日志信息
2022-11-11 17:09:19.774[XNIo-1 task-1]ERROR com.passjava.controlle .NembercController-查询用户 活动数据>失败,异常信息为:
com.passjava.exception.MemberException: 当前没有配置活动规则
at com.passjava.service.impL.queryAdmin(DailyServiceImpl.java:1444)
at com.passjava.service.impl.dailyserviceImpL$$FastcLass
2022-11-11 17:10:56.256][KxNIo-1 task-1] ERROR com.passjava.controlle .NemberControl1er-查询员工 饭活动>数据失败,异常信息为:
com.passjava.exception.MemberException: 当前没有配置活动规则
at com.passjava.service.impL.queryAdmin(DailyServiceImpl.java:1444)
at com.passjava.service.impL.daiLyserviceImpL$$FastcLass
(3)指定java.log日志文件的输入流和输出流
vim /etc/logstash/conf.d/java.conf
###指定java.log日志文件的输入流和输出流
input {
file {
path => "/var/log/java.log"
type => "java"
start_position => "beginning"
}
}
filter {
multiline {
pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}"
negate => true
what => "previous"
}
}
output {
elasticsearch {
hosts => ["192.168.80.10:9200","192.168.80.20:9200"]
index => "java-%{+YYYY.MM.dd}"
}
}
###启动logstash服务,检查java.conf文件
cd /etc/logstash/conf.d
logstash -f java.conf #采集java.conf的日志文件
-
pattern:用来匹配文本的表达式,也可以是grok表达式
-
what:如果pattern匹配成功的话,那么匹配行是归属于上一个事件,还是归属于下一个事件。
-
previous: 归属于上一个事件,向上合并。
-
next: 归属于下一个事件,向下合并
-
negate:是否对pattern的结果取反。false:不取反,是默认值。true:取反。将多行事件扫描过程中的行匹配逻辑取反(如果pattern匹配失败,则认为当前行是多行事件的组成部分)
2.4 date时间处理插件
用于分析字段中的日期,然后使用该日期或时间戳作为事件的logstash时间戳。
在Logstash产生了一个Event对象的时候,会给该Event设置一个时间,字段为“@timestamp”,同时,我们的日志内容一般也会有时间,但是这两个时间是不一样的,因为日志内容的时间是该日志打印出来的时间,而“@timestamp”字段的时间是input插件接收到了一条数据并创建Event的时间,所以一般来说的话==“@timestamp”的时间要比日志内容的时间晚一点,因为Logstash监控数据变化,数据输入,创建Event导致的时间延迟==。这两个时间都可以使用,具体要根据自己的需求来定。
(1)在logstash节点中添加如下时间戳内容
vim /etc/logstash/conf.d/filebeat.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => ".+ - - \[%{HTTPDATE:log_time}\] .+" }
}
date {
match => ["access_time", "dd/MMM/YYYY:HH:mm:ss Z", "UNIX", "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
remove_field => ["message", "@version"]
rename => { "source" => "log_path" }
replace => { "log_path" => "access_log" }
add_field => { "myname" => "gzy" }
}
}
output {
elasticsearch {
hosts => ["192.168.80.10:9200","192.168.80.20:9200"]
index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
logstash -f filebeat.conf
3.实验中的故障案例
报错一:
Logstash could not be started because there is already another instance using the configured data directory.
解决步骤:
cd /var/lib/logstash
###删除之前运行的.lock缓存
rm -rf .lock
ps -ef | grep logstash
kill -9 logstash服务的进程号
报错二:
No configuration found in the configured sources.
解决步骤:
cd /etc/logstash/conf.d
###使用绝对路径方式,执行logstash服务的配置文件
logstash -f /etc/logstash/conf.d filebeat.conf
4. logstash filter过滤插件总结
grok 将大文本字段进行分片成多个小字段 %{内置正则:字段名} (?<字段名>自定义正则)
date 将logstash收集的日志事件时间与实际记录的日志时间进行格式统一,需要配合grok模块使用
mutate 对logstash收集的日志事件字段进行再处理,可以重命名,删除,替换和修改事件中的字段
multiline 将多行日志内容合并成一整行