ELK 是 Elasticsearch 、 Logstash 、 Kibana 的简称
Elasticsearch
是实时全文搜索和分析引擎,提供搜集、分析、存储数据三大功能;是一套开放
REST
和
JAVA API
等结构提供高效搜索功能,可扩展的分布式系统。它构建于
Apache Lucene
搜索引擎库之上。
Logstash
是一个用来搜集、分析、过滤日志的工具。它支持几乎任何类型的日志,包括系统日志、错
误日志和自定义应用程序日志。它可以从许多来源接收日志,这些来源包括
syslog
、消息传递(例如
RabbitMQ
)和
JMX
,它能够以多种方式输出数据,包括电子邮件、
websockets
和
Elasticsearch
。
Kibana
是一个基于
Web
的图形界面,用于搜索、分析和可视化存储在
Elasticsearch
指标中的日志数
据。它利用
Elasticsearch
的
REST
接口来检索数据,不仅允许用户创建他们自己的数据的定制仪表板视
图,还允许他们以特殊的方式查询和过滤数据
使用 spring aop 进行日志收集,然后通过 kafka 将日志发送给 logstash , logstash 再将日志写入elasticsearch ,这样 elasticsearch 就有了日志数据了,最后,则使用 kibana 将存放在elasticsearch 中的日志数据显示出来,并且可以做实时的数据图表分析等等。
为什么要用
ELK
最开始我些项目的时候,都习惯用
log4j
来把日志写到
log
文件中,后来项目有了高可用的要求,我们就
进行了分布式部署
web
,这样我们还是用
log4j
这样的方式来记录
log
的话,那么就有
N
台机子的
N
个
log
目录,这个时候查找
log
起来非常麻烦,不知道问题用户出错
log
是写在哪一台服务器上的,后来,想到
一个办法,干脆把
log
直接写到数据库中去,这样做,虽然解决了查找异常信息便利性的问题了,但存
在两个缺陷 :
1
,
log
记录好多,表不够用啊,又得分库分表了,
2
,连接
db
,如果是数据库异常,那边
log
就丢失了,那么为了解决
log
丢失的问题,那么还得先将
log
写
在本地,然后等
db
连通了后,再将
log
同步到
db
,这样的处理办法,感觉是越搞越复杂。
ELK
职能分工
logstash
做日志对接,接受应用系统的
log
,然后将其写入到
elasticsearch
中,
logstash
可以支持
N
种
log
渠道,
kafka
渠道写进来的、和
log
目录对接的方式、也可以对
reids
中的
log
数据进行监控读
取,等等。
elasticsearch
存储日志数据,方便的扩展特效,可以存储足够多的日志数据。
kibana
则是对存放在
elasticsearch
中的
log
数据进行:数据展现、报表展现,并且是实时的。
1.docker环境搭建ELK+Kafka
搭建
elasticsearch
集群环境
将
elasticsearchCluster
文件夹拷贝虚拟机
使用
Xshell
连接服务器,在服务器上切换至
elasticsearchCluster
目录
赋权
chmod 777 *.sh
创建镜像
./createElasticsearchImage.sh
创建容器
./createElasticsearchContainer.sh
设置宿主机内存
sysctl -w vm.max_map_count=262144
重新启动
docker
systemctl restart docker
根据集群分配修改
kibana.yml
中
elasticsearch
的地址
docker cp kibana.yml kgc_kibana:/usr/local/kibana-6.2.4-linux-x86_64/config/
重新启动
kibana docker restart kgc_kibana
安装
logstash
与
kafka,
镜像
修改
kafka
的
server.properties
连接地址
# 修改为宿主机 IP 如 192.168.31.113advertised.listeners = PLAINTEXT : //kafka : 9092
修改logstash.conf文件
input{kafka {bootstrap_servers => ["kafka:9092"] # 修改为 kafka 的 IPauto_offset_reset => "latest" 将文件夹 kafka 与 logstash 复制到服务器,执行命令,生产镜像ProviderController.javaconsumer_threads => 5decorate_events => truetopics => ["user-error"] # 数组形式,可以填写多个type => "user-error" # 可以自由指定}}output {elasticsearch {hosts => [ "elasticsearch:9200"] # 指向 Elasticsearch 服务地址,可以有多个,注意IP 和端口和实际保持一致index => "%{[type]}log-%{+YYYY-MM-dd}"}}
将文件夹kafka与logstash复制到服务器,执行命令,生产镜像
docker build -t kgc/logstash 路径 /logstash/
根据镜像,生产容器
#kafkadocker run -d --name kgc_kafka -p 9092:9092 --network kgc_elastic_cluster --network-alias kafka kgc/kafka#logstashdocker run -d -it --name kgc_logstash --network kgc_elastic_cluster --networkalias logstash kgc/logstash
编写程序,修改端口号与连接kafka
server :port : 8088spring :kafka :producer :bootstrap-servers : 192.168.31.113 : 9092
ProviderController.java
@RestController
public class ProviderController {
@Autowired
private KafkaTemplate<String, String> KafkaTemplate;
@RequestMapping(value = "/test" )
public String test() throws Exception{
System.out.println(KafkaTemplate);
for (int i = 0; i < 10; i++) {
KafkaTemplate.send("wangzhuanyun", "dm", "wzy222222--->" + i);
}
return "send msg over...";
}
}