部署ELFK+zookeeper+kafka架构

目录

前言

一、环境部署

二、部署ELFK

1、ELFK ElasticSearch 集群部署

1.1 配置本地hosts文件

1.2 安装 elasticsearch-rpm 包并加载系统服务

1.3 修改 elasticsearch 主配置文件

1.4 创建数据存放路径并授权

1.5 启动elasticsearch是否成功开启

1.6 查看节点信息

2、ELFK Logstash 部署

2.1 安装 logstash

2.2 测试 logstash

3、ELFK Kiabana 部署

3.1 安装 Kiabana

3.2 设置 Kibana 的主配置文件并启动服务

4、ELFK filebeat 部署

4.1 安装 filebeat + httpd

4.2 设置 filebeat 的主配置文件

三、部署Zoopkeeper + Kafak 集群

1、部署 Zoopkeeper 集群

1.1 部署 Zoopkeeper 集群

1.2 设置主配置文件

1.3 创建数据目录和日志目录

1.4 创建myid文件

1.5 配置 Zookeeper 启动脚本并设置开机自启

1.6 分别启动 Zookeeper

2、部署 Kafak 集群

2.1 设置主配置文件

2.2 创建数据目录和日志目录

2.3 创建myid文件

2.4 配置 Zookeeper 启动脚本并设置开机自启

2.5 分别启动 Zookeeper

四、测试

1、定义 logstash 的配置文件

2、查看所有的索引

3、登录 Kibana 添加索引 


前言

feilbeat + 缓存/消息队列+ Logstash + Elasticsearch + Kibana 模式

这是一种加健壮高效的架构,适合处理海量复杂的日志数据。

在这种模式下,filebeat和iogstach之间加入缓存或消息队列组件,如redis、kafka或RabbitMQ等。

这样可以降低对目志源主机的影响 ,提高目志传输的稳定性和可靠性,以及实现负载均衡和高可用

  • Logstash:从Kafka集群中消费日志数据,进行必要的数据解析、过滤、转换等预处理操作,然后将结构化后的数据发送到Elasticsearch
  • Elasticsearch:存储经过处理的日志数据,提供全文搜索、聚合分析等功能,便于后期进行日志分析和故障排查
  • Kibana:作为前端展示工具,基于Elasticsearch的数据创建可视化图表和仪表盘,为用户提供友好的日志分析界面
  • Filebeat/Fluentd:负责从各服务器节点上实时收集日志数据,Filebeat轻量级,适合大规模部署,Fluentd功能强大,支持丰富的插件和灵活的过滤规则
  • ZooKeeper:在某些场景下,ZooKeeper可以用于管理Kafka集群的元数据,例如Broker注册、Topic的分区分配等,确保Kafka集群的稳定性和一致性。同时,对于Logstash或Kafka Connect这类组件,也可以通过ZooKeeper获取集群配置信息
  • Kafka:作为一个分布式消息队列系统,承担起数据缓冲和中转的角色。日志数据先发送到Kafka集群,一方面可以缓解Logstash或Filebeat的压力,另一方面支持多消费者模型,允许数据被多个下游系统并行消费。此外,Kafka的高吞吐量和持久化特性使得系统在面临大量日志输入时仍能保持稳定

数据流向:应用程序日志 ---> Filebeat ---> Kafka ---> Logstash ---> Elasticsearch ---> Kibana 

一、环境部署

服务器配置(越高性能越好)主机名ip地址操作系统

主要软件

logstash节点/logstash172.16.12.10centos7.4

Logstash、Apache

es_node1节点2C/4Ges_node1172.16.12.12centos7.4ElasticSearch、Kibana
es_node2节点2C/4Ges_node2172.16.12.13centos7.4ElasticSearch
filebeat节点/filebeat172.16.12.15centos7.4filebeat
zookeeper+kafka节点1/zk-kfk01172.16.12.16centos7.4

zookeeper-3.5.7

kafka_2.13-2.7.1

zookeeper+kafka节点2/zk-kfk02172.16.12.17centos7.4

zookeeper-3.5.7

kafka_2.13-2.7.1

zookeeper+kafka节点3/zk-kfk03172.16.12.18centos7.4

zookeeper-3.5.7

kafka_2.13-2.7.1

(1)关闭所有设备的防火墙和核心防护

[root@localhost ~]#systemctl stop firewalld
[root@localhost ~]#setenforce 0

(2)修改所有设备的主机名

[root@localhost ~]#hostnamectl set-hostname logstash
[root@localhost ~]#bash

[root@localhost ~]#hostnamectl set-hostname es_node1
[root@localhost ~]#bash
 
[root@localhost ~]#hostnamectl set-hostname es_node2
[root@localhost ~]#bash

[root@localhost ~]#hostnamectl set-hostname filebeat
[root@localhost ~]#bash

[root@localhost ~]#hostnamectl set-hostname zk-kfk01
[root@localhost ~]#bash

[root@localhost ~]#hostnamectl set-hostname zk-kfk02
[root@localhost ~]#bash

[root@localhost ~]#hostnamectl set-hostname zk-kfk03
[root@localhost ~]#bash

(3)所有设备都需部署java环境,安装oraclejdk

java -version    #不建议使用openjdk,所以三台设备都需安装oraclejdk

# rpm安装oraclejdk

#yum install或rpm -ivh安装oraclejdk
cd /opt    #将rpm软件包传至/opt目录下
rpm -ivh jdk-8u291-linux-x64.rpm

#将openjdk更换至oraclejdk
vim /etc/profile.d/jdk.sh
export JAVA_HOME=/usr/java/jdk1.8.0_201-amd64    #输出定义java的工作目录
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar   #输出指定java所需的类文件
export PATH=$JAVA_HOME/bin:$PATH   #输出重新定义环境变量,$PATH一定要放在$JAVA_HOME的后面,让系统先读取到工作目录中的版本信息

source /etc/profile.d/jdk.sh  #执行配置文件
java -version
----------------------------------------------------------------------------------------
# 二进制包安装oraclejdk

cd /opt   #将二进制包传至/opt目录下
tar zxvf jdk-8u291-linux-x64.tar.gz -C /usr/local
ln -s /usr/local/jdk1.8.0_291/ /usr/local/jdk
 
vim /etc/profile.d/jdk.sh
export JAVA_HOME=/usr/local/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
 
source /etc/profile.d/jdk.sh
java -version

二、部署ELFK

1、ELFK ElasticSearch 集群部署

1.1 配置本地hosts文件

es_node节点1和es_node节点2都要配置本地的/etc/hosts文件

echo "172.16.12.12 es_node1" >> /etc/hosts
echo "172.16.12.13 es_node2" >> /etc/hosts

 1.2 安装 elasticsearch-rpm 包并加载系统服务

#安装elasticsearch-rpm 包
cd /opt    #上传elasticsearch-5.5.0.rpm到/opt目录下
rpm -ivh elasticsearch-5.5.0.rpm

#加载系统服务
systemctl daemon-reload    
systemctl enable elasticsearch.service  

 1.3 修改 elasticsearch 主配置文件

cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
vim /etc/elasticsearch/elasticsearch.yml
--17--取消注释,指定集群名字
cluster.name: my-elk-cluster
--23--取消注释,指定节点名字:Node1节点为node1,Node2节点为node2
node.name: node1
--33--取消注释,指定数据存放路径
path.data: /data/elk_data
--37--取消注释,指定日志存放路径
path.logs: /var/log/elasticsearch/
--43--取消注释,改为在启动的时候不锁定内存
bootstrap.memory_lock: false
--55--取消注释,设置监听地址,0.0.0.0代表所有地址
network.host: 0.0.0.0
--59--取消注释,ES 服务的默认监听端口为9200
http.port: 9200
--68--取消注释,集群发现通过单播实现,指定要发现的节点 node1、node2
discovery.zen.ping.unicast.hosts: ["es_node1", "es_node2"]

grep -v "^#" /etc/elasticsearch/elasticsearch.yml
#过滤到能生效的语句,检查修改的是否正确

1.4 创建数据存放路径并授权

mkdir -p /data/elk_data
chown elasticsearch:elasticsearch /data/elk_data/

1.5 启动elasticsearch是否成功开启

systemctl start elasticsearch.service
ss -antp | grep 9200

综上,两个es_node节点都部署安装完elasticsearch

1.6 查看节点信息

浏览器访问:http://172.16.12.12:9200  、 http://172.16.12.13:9200 来查看节点 es_node1、es_node2 的信息

浏览器访问:http://172.16.12.12:9200/_cluster/health?pretty  、 http://172.16.12.12:9200/_cluster/health?pretty 来查看群集的健康情况,可以看到 status 值为 green(绿色), 表示节点健康运行

绿色:健康,数据和副本,全都没有问题
红色:数据都不完整
黄色:数据完整,但副本有问题 

2、ELFK Logstash 部署

需要安装部署在logstash节点服务器上

2.1 安装 logstash

[root@logstash ~]#cd /opt   #上传软件包 logstash-5.5.1.rpm 到/opt目录下
[root@logstash opt]#rpm -ivh logstash-5.5.1.rpm           
[root@logstash opt]#systemctl enable --now logstash.service
[root@logstash opt]#ln -s /usr/share/logstash/bin/logstash /usr/local/bin/

2.2 测试 logstash

Logstash 命令常用选项:

常用选项说明
-f通过这个选项可以指定 Logstash 的配置文件,根据配置文件配置 Logstash 的输入和输出流
-e从命令行中获取,输入、输出后面跟着字符串,该字符串可以被当作 Logstash 的配置(如果是空,则默认使用 stdin 作为输入,stdout 作为输出)
-t测试配置文件是否正确,然后退出

定义输入和输出流:

(1)输入采用标准输入,输出采用标准输出(类似管道)

#输入采用标准输入,输出采用标准输出(类似管道)
[root@logstash opt]#logstash -e 'input { stdin{} } output { stdout{} }'

......
www.baidu.com										#键入内容(标准输入)
2024-04-11T05:50:42.684Z logstash www.baidu.com		#输出结果(标准输出)
www.google.com										#键入内容(标准输入)
2024-04-11T05:51:07.295Z logstash www.google.com	#输出结果(标准输出)

//执行 ctrl+c 退出

(2) 使用 rubydebug 输出详细格式显示,codec 为一种编解码器

#使用 rubydebug 输出详细格式显示,codec 为一种编解码器
[root@logstash opt]#logstash -e 'input { stdin{} } output { stdout{ codec=>rubydebug } }'

......
www.baidu.com										#键入内容(标准输入)
{
    "@timestamp" => 2024-04-11T05:54:11.432Z,       #输出结果(处理后的结果)
      "@version" => "1",
          "host" => "logstash",
       "message" => "www.baidu.com"
}

//执行 ctrl+c 退出

(3) 使用 Logstash 将信息写入 Elasticsearch 中

#使用 Logstash 将信息写入 Elasticsearch 中
[root@logstash opt]#logstash -e 'input { stdin{} } output { elasticsearch { hosts=>["172.16.12.13:9200"] } }'
			  输入				输出	    对接

......
www.baidu.com										#键入内容(标准输入)
www.sina.com.cn										#键入内容(标准输入)
www.google.com										#键入内容(标准输入)

//执行 ctrl+c 退出

3、ELFK Kiabana 部署

安装部署在es_node1节点上

3.1 安装 Kiabana

[root@es_node1 ~]#cd /opt      #上传软件包 kibana-5.5.1-x86_64.rpm 到/opt目录
[root@es_node1 opt]#rpm -ivh kibana-5.5.1-x86_64.rpm

3.2 设置 Kibana 的主配置文件并启动服务

[root@es_node1 opt]#vim /etc/kibana/kibana.yml
--2--取消注释,Kiabana 服务的默认监听端口为5601
server.port: 5601
--7--取消注释,设置 Kiabana 的监听地址,0.0.0.0代表所有地址
server.host: "0.0.0.0"
--21--取消注释,设置和 Elasticsearch 建立连接的地址和端口
elasticsearch.url: "http://172.16.12.12:9200" 
--30--取消注释,设置在 elasticsearch 中添加.kibana索引
kibana.index: ".kibana"

[root@es_node1 opt]#systemctl enable --now kibana.service
[root@es_node1 opt]#netstat -natp | grep 5601      #查看5601端口进程

4、ELFK filebeat 部署

 4.1 安装 filebeat + httpd

[root@filebeat ~]#cd /opt #上传软件包 filebeat-6.6.1-x86_64.rpm 到/opt目录
[root@filebeat opt]#rpm -ivh filebeat-6.6.1-x86_64.rpm
[root@filebeat opt]#yum install -y httpd
[root@filebeat opt]#systemctl start httpd

4.2 设置 filebeat 的主配置文件

[root@filebeat opt]#vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log          # 指定 log 类型,从日志文件中读取消息
  enabled: true
  paths:
    - /var/log/httpd/access_log   #指定监控的日志文件
  tags: ["access"]
  
- type: log
  enabled: true
  paths:
    - /var/log/httpd/error_log
  tags: ["error"]
  
......
--------------Elasticsearch output-------------------

(全部注释掉)

#添加输出到 Kafka 的配置
----------------kafak output---------------------
output.kafka:
  enabled: true
  hosts: ["172.16.12.16:9092","172.16.12.17:9092","172.16.12.18:9092"]    #指定 Kafka 集群配置
  topic: "httpd"    #指定 Kafka 的 topic

#启动filebeat
[root@filebeat opt]#systemctl start filebeat.service 
[root@filebeat opt]#filebeat -e -c  /etc/filebeat/filebeat.yml

#-e: 这是一个参数,表示以交互式模式(interactive mode)启动 Filebeat。在交互式模式下,Filebeat 会将日志输出打印到控制台,方便用户查看实时日志信息
#-c filebeat.yml: 这也是一个参数,指定了 Filebeat 的配置文件。通过指定这个配置文件,Filebeat 将会加载其中定义的配置选项和设置

三、部署Zoopkeeper + Kafak 集群

1、部署 Zoopkeeper 集群

1.1 部署 Zoopkeeper 集群

三台zookeeper+kafka节点服务器都要安装zookeeper

#下载安装包
官方下载地址:https://archive.apache.org/dist/zookeeper/

下载apache-zookeeper-3.5.7-bin.tar.gz到本地的/opt目录下,进行二进制包解压,无需安装,能直接使用

cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7

1.2 设置主配置文件

三台节点服务器都要修改各自的配置文件

由于三台节点服务器配置文件需要修改的内容一样,修改完zk-kfk01服务器的主配置文件后,将其直接scp远程拷贝到另外两台节点服务器:zk-kfk02和zk-kfk03

[root@zk-kfk01 opt]#cd /usr/local/zookeeper-3.5.7/conf/
[root@zk-kfk01 conf]#cp zoo_sample.cfg zoo.cfg
[root@zk-kfk01 conf]#vim zoo.cfg     #修改主配置文件
  
tickTime=2000                     
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=10                      
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
syncLimit=5                       
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data    
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/usr/local/zookeeper-3.5.7/logs  
#添加,指定存放日志的目录,目录需要单独创建
clientPort=2181                   #客户端连:接端口
server.1=172.16.12.16:3188:3288
server.2=172.16.12.17:3188:3288
server.3=172.16.12.18:3188:3288

#解释
server.A=B:C:D
# A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
# B是这个服务器的地址。
# C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
# D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

#拷贝配置好的 Zookeeper 配置文件到其他节点服务器上
[root@zk-kfk01 conf]# scp zoo.cfg 172.16.12.17:/usr/local/zookeeper-3.5.7/conf/
[root@zk-kfk01 conf]# scp zoo.cfg 172.16.12.18:/usr/local/zookeeper-3.5.7/conf/

1.3 创建数据目录和日志目录

在每个节点上创建数据目录和日志目录

mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs

1.4 创建myid文件

在每个节点的dataDir指定的目录下创建一个 myid 的文件

[root@zk-kfk01 ~]# echo 1 > /usr/local/zookeeper-3.5.7/data/myid
[root@zk-kfk02 ~]# echo 2 > /usr/local/zookeeper-3.5.7/data/myid
[root@zk-kfk03 ~]# echo 3 > /usr/local/zookeeper-3.5.7/data/myid

1.5 配置 Zookeeper 启动脚本并设置开机自启

配置 Zookeeper 启动脚本:

[root@zk-kfk01 ~]#vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)
	echo "---------- zookeeper 启动 ------------"
	$ZK_HOME/bin/zkServer.sh start
;;
stop)
	echo "---------- zookeeper 停止 ------------"
	$ZK_HOME/bin/zkServer.sh stop
;;
restart)
	echo "---------- zookeeper 重启 ------------"
	$ZK_HOME/bin/zkServer.sh restart
;;
status)
	echo "---------- zookeeper 状态 ------------"
	$ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#拷贝写好的zookeeper启动脚本到其他两台节点服务器
[root@zk-kfk01 ~]# scp /etc/init.d/zookeeper 172.16.12.17:/etc/init.d/zookeeper
[root@zk-kfk01 ~]# scp /etc/init.d/zookeeper 172.16.12.18:/etc/init.d/zookeeper

设置开机自启:

chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
# 将 "zookeeper" 服务添加到系统的服务管理列表中,并且配置它在系统启动时自动运行
# 前提创建一个名为 "zookeeper" 的服务脚本(通常是放在 /etc/init.d/ 目录下)

1.6 分别启动 Zookeeper

zk-kfk01服务器、zk-kfk02服务器、zk-kfk03服务器依次分别启动

#分别启动 Zookeeper
service zookeeper start

#查看当前状态
service zookeeper status

2、部署 Kafak 集群

 三台节点服务器都要安装zookeeper

#下载安装包
官方下载地址:https://archive.apache.org/dist/zookeeper/

下载apache-zookeeper-3.5.7-bin.tar.gz到本地的/opt目录下,进行二进制包解压,无需安装,能直接使用

cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7

2.1 设置主配置文件

三台节点服务器都要修改各自的配置文件

由于三台节点服务器配置文件需要修改的内容一样,修改完zk-kfk01服务器的主配置文件后,将其直接scp远程拷贝到另外两台节点服务器:zk-kfk02和zk-kfk03

[root@zk-kfk01 opt]#cd /usr/local/zookeeper-3.5.7/conf/
[root@zk-kfk01 conf]#cp zoo_sample.cfg zoo.cfg
[root@zk-kfk01 conf]#vim zoo.cfg     #修改主配置文件
  
tickTime=2000                     
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=10                      
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
syncLimit=5                       
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data    
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/usr/local/zookeeper-3.5.7/logs  
#添加,指定存放日志的目录,目录需要单独创建
clientPort=2181                   #客户端连:接端口
server.1=172.16.12.16:3188:3288
server.2=172.16.12.17:3188:3288
server.3=172.16.12.18:3188:3288

#解释
server.A=B:C:D
# A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
# B是这个服务器的地址。
# C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
# D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

#拷贝配置好的 Zookeeper 配置文件到其他节点服务器上
[root@zk-kfk01 conf]# scp zoo.cfg 172.16.12.17:/usr/local/zookeeper-3.5.7/conf/
[root@zk-kfk01 conf]# scp zoo.cfg 172.16.12.18:/usr/local/zookeeper-3.5.7/conf/

2.2 创建数据目录和日志目录

在每个节点上创建数据目录和日志目录

mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs

2.3 创建myid文件

在每个节点的dataDir指定的目录下创建一个 myid 的文件

[root@zk-kfk01 ~]# echo 1 > /usr/local/zookeeper-3.5.7/data/myid
[root@zk-kfk02 ~]# echo 2 > /usr/local/zookeeper-3.5.7/data/myid
[root@zk-kfk03 ~]# echo 3 > /usr/local/zookeeper-3.5.7/data/myid

2.4 配置 Zookeeper 启动脚本并设置开机自启

配置 Zookeeper 启动脚本:

[root@zk-kfk01 ~]#vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)
	echo "---------- zookeeper 启动 ------------"
	$ZK_HOME/bin/zkServer.sh start
;;
stop)
	echo "---------- zookeeper 停止 ------------"
	$ZK_HOME/bin/zkServer.sh stop
;;
restart)
	echo "---------- zookeeper 重启 ------------"
	$ZK_HOME/bin/zkServer.sh restart
;;
status)
	echo "---------- zookeeper 状态 ------------"
	$ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#拷贝写好的zookeeper启动脚本到其他两台节点服务器
[root@zk-kfk01 ~]# scp /etc/init.d/zookeeper 172.16.12.17:/etc/init.d/zookeeper
[root@zk-kfk01 ~]# scp /etc/init.d/zookeeper 172.16.12.18:/etc/init.d/zookeeper

设置开机自启:

chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
# 将 "zookeeper" 服务添加到系统的服务管理列表中,并且配置它在系统启动时自动运行
# 前提创建一个名为 "zookeeper" 的服务脚本(通常是放在 /etc/init.d/ 目录下)

2.5 分别启动 Zookeeper

zk-kfk01服务器、zk-kfk02服务器、zk-kfk03服务器依次分别启动

#分别启动 Zookeeper
service zookeeper start

#查看当前状态
service zookeeper status

四、测试

1、定义 logstash 的配置文件

Logstash 配置文件基本由三部分组成:input、output 以及 filter(可选,根据需要选择使用)

  • input:表示从数据源采集数据,常见的数据源如Kafka、日志文件等
  • filter:表示数据处理层,包括对数据进行格式化处理、数据类型转换、数据过滤等,支持正则表达式
  • output:表示将Logstash收集的数据经由过滤器处理之后输出到Elasticsearch。
#格式:
input {...}
filter {...}
output {...}
#在每个部分中,也可以指定多个访问方式。例如,若要指定两个日志来源文件,则格式如下:
input {
    file { path =>"/var/log/messages" type =>"syslog"}
    file { path =>"/var/log/httpd/access.log" type =>"apache"}
}

设置 Logstash 配置文件,数据来源是kafak集群,logstash加工完数据后并将其输出到 elasticsearch 中

[root@logstash ~]# cd /etc/logstash/conf.d/
[root@logstash conf.d]# vim kafka.conf
input {
    kafka {
        bootstrap_servers => "172.16.12.16:9092,172.16.12.17:9092,172.16.12.18:9092"  
# kafka集群地址
        topics  => "httpd"             # 拉取的kafka的指定topic
        type => "httpd_kafka"          # 指定 type 字段
        codec => "json"                # 解析json格式的日志数据
		auto_offset_reset => "latest"  # 拉取最近数据,earliest为从头开始拉取
		decorate_events => true        # 传递给elasticsearch的数据额外增加kafka的属性数据
    }
}
 
output {
  if "access" in [tags] {
    elasticsearch {
      hosts => ["172.16.12.12:9200"]
      index => "httpd_access-%{+YYYY.MM.dd}"
    }
  }
  
  if "error" in [tags] {
    elasticsearch {
      hosts => ["172.16.12.12:9200"]
      index => "httpd_error-%{+YYYY.MM.dd}"
    }
  }
  
  stdout { codec => rubydebug }
}
[root@logstash conf.d]# logstash -f kafka.conf
# 启动 logstash;或者 filebeat -e -c filebeat.yml &

2、查看所有的索引

客户端浏览器先访问apache服务器,否则无法有httpd_access索引
http://172.16.12.15/

生产黑屏操作es时查看所有的索引:

[root@es_node1 ~]# curl -X GET "localhost:9200/_cat/indices?v"

3、登录 Kibana 添加索引 

浏览器访问 http://172.16.12.12:5601,添加索引“httpd*”,查看图表信息及日志信息

(1)添加索引“httpd_access-*”和“httpd_error-*”

(2) 查看图表信息及日志信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/560499.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面试不慌张:一文读懂FactoryBean的实现原理

大家好,我是石头~ 在深入探讨Spring框架内部机制时,FactoryBean无疑是一个关键角色,也是面试中经常出现的熟悉面孔。 不同于普通Java Bean,FactoryBean是一种特殊的Bean类型,它的存在并非为了提供业务逻辑,…

使用Python进行自动化测试

👽发现宝藏 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 如何使用Python进行自动化测试:测试框架的选择与应用 自动化测试是软件开发过程…

代码随想录算法训练营DAY28|C++回溯算法Part.4|93.复原IP地址、78.子集、90.子集II

文章目录 93.复原IP地址思路确定非法的范围树形结构 伪代码 78.子集思路伪代码实现CPP代码 90.子集II思路CPP代码用used去重的办法用set去重的版本不使用used数组、set的版本 93.复原IP地址 力扣题目链接 文章讲解:93.复原IP地址 视频讲解:回溯算法如何分…

curlftpfs和fusermount

curlftpfs 是一种 Linux 系统下用来将 FTP 服务器挂载为文件系统的工具,这意味着可以通过本地目录来访问和操作 FTP 服务器上的文件。 挂载FTP服务器到本地系统 为了挂载FTP服务器到本地系统中,使用curlftpfs工具,可以按照以下格式书写命令…

如何通过MSTSC连接Ubuntu的远程桌面?

正文共:666 字 12 图,预估阅读时间:1 分钟 前面我们介绍了如何通过VNC连接Ubuntu 18.04的远程桌面(Ubuntu 18.04开启远程桌面连接),非常简单。但是有小伙伴咨询如何使用微软的远程桌面连接MSTSC&#xff08…

黑灰产行业简介

参考:2021年黑灰产行业研究及趋势洞察报告 1. 有哪些场景面临大量黑灰产攻击? 1.营销活动场景 -- 该场景最为猖獗 1. 抹机及接码注册:黑灰产会使用抹机工具修改设备参数伪装成一台新设备,再配合联系卡商进行手机号接码&#xf…

项目7-音乐播放器3(删除模块+播放音乐模块设计)

1.播放音乐模块设计 1.1 请求响应设计 请求: { get, /music/get?pathxxx.mp3 } 响应: { 音乐数据本身的字节信息 } 1.2 后端代码 1. Files.readAllBytes(String path) : 读取文件中的所有字节,读入内存 &#xff…

java/C#语言开发的医疗信息系统10套源码

java/C#语言开发的医疗信息系统10套源码 云HIS系统源码,云LIS系统源码,PEIS体检系统,手麻系统 源 码,PACS系统源码,微源预约挂号源码,医院绩效考核源码,3D智能导诊系统源码,ADR药物…

UE5(基础动作)多人游戏制作蹲伏

1.创建输入操作,IA_Crouch 在输入映射中添加 IA_Crouch,在触发器中创建两个索引,已按下已松开来创建蹲伏输入。 蹲伏操作必须要勾选角色-角色移动-crouch勾选可蹲伏否则你的人物无法真正蹲下。 为蹲伏创建函数,创建布尔来判断是否蹲伏。 通过…

链表经典算法OJ题目

1.单链表相关经典算OJ题目1:移除链表元素 思路一 直接在原链表里删除val元素,然后让val前一个结点和后一个节点连接起来。 这时我们就需要3个指针来遍历链表: pcur —— 判断节点的val值是否于给定删除的val值相等 prev ——保存pcur的前…

LCR 023. 相交链表

给定两个单链表的头节点 headA 和 headB ,请找出并返回两个单链表相交的起始节点。如果两个链表没有交点,返回 null 。 图示两个链表在节点 c1 开始相交: 题目数据 保证 整个链式结构中不存在环。 注意,函数返回结果后&#xf…

大话设计模式-装饰器模式

大话设计模式书中,作者举了一个穿衣服的例子来为我们引入装饰器模式。 概念 定义 装饰模式在书中的定义是:动态地给一个对象添加一些额外的职责,就增加功能来说,装饰模式比生成子类更灵活。 这句话直接去理解可能会有点抽象&#…

javase__进阶 day13stream流和方法引用

1.不可变集合 1.1 什么是不可变集合 ​ 是一个长度不可变,内容也无法修改的集合 1.2 使用场景 ​ 如果某个数据不能被修改,把它防御性地拷贝到不可变集合中是个很好的实践。 ​ 当集合对象被不可信的库调用时,不可变形式是安全的。 简单…

java:Java中的抽象类

什么是抽象类: 我们知道,类用来模拟现实的事物,一个类模拟一类事物,某个类的一个实例化对象可以模拟某个属于该类的具体事物。类中描绘了该类所有对象的共同的特性,当一个类中给出的信息足够全面时候,我们就…

如何从零开始创建React应用:简易指南

🌟 前言 欢迎来到我的技术小宇宙!🌌 这里不仅是我记录技术点滴的后花园,也是我分享学习心得和项目经验的乐园。📚 无论你是技术小白还是资深大牛,这里总有一些内容能触动你的好奇心。🔍 &#x…

【大模型应用极简开发入门(1)】LLM概述:LLM在AI中所处位置、NLP技术的演变、Transformer与GPT、以及GPT模型文本生成逻辑

文章目录 一. AI中大语言模型的位置与技术发展1. 从AI到Transformer2. NLP:自然语言处理3. LLM大型语言模型:NLP的一种特定技术3.1. LLM定义3.2. LLM的技术发展3.2.1. n-gram模型3.2.2. RNN与LSTM 二. Transformer在LLM中脱颖而出1. Transformer架构能力…

【Linux】详解进程通信中信号量的本质同步和互斥的概念临界资源和临界区的概念

一、同步和互斥的概念 1.1、同步 访问资源在安全的前提下,具有一定的顺序性,就叫做同步。在多道程序系统中,由于资源有限,进程或线程之间可能产生冲突。同步机制就是为了解决这些冲突,保证进程或线程之间能够按照既定…

sketchup创建3D打印机的模型

查了一下,这玩意有几个版本,其中一个sketchup free是免费的,到官网上看看 下载 SketchUp | 免费试用 3D 建模软件 | SketchUp 是个在线网页版,然后可以再这个网站上注册一个账号 弄个邮箱试试看 创建好进入后,里面就…

解锁多智能体路径规划新境界:结合启发式搜索提升ML本地策略

引言:多智能体路径寻找(MAPF)问题的重要性与挑战 在现代自动化和机器人技术迅速发展的背景下,多智能体路径寻找(Multi-agent path finding,简称MAPF)问题的研究变得日益重要。MAPF问题涉及为一…

《QT实用小工具·二十七》各种炫酷的样式表

1、概述 源码放在文章末尾 该项目实现了各种炫酷的样式表,如单选、多选、按钮、日历、表格、下拉框、滚轮等,下面是项目demo演示: 项目部分代码如下: #include "frmmain.h" #include "ui_frmmain.h" #inc…