kafka 3.4.1
elk+filebeat+kafka
实现日志收集
httpd1 mysql1 topic 2.7 3.0
关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
安装 JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
安装 Zookeeper
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper
修改配置文件
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
tickTime=2000 #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
syncLimit=5 #Leader和Follower之间同步通信的超时时间,这里表示如果超过52s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/opt/zookeeper/data ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/opt/zookeeper/logs ●添加,指定存放日志的目录,目录需要单独创建
clientPort=2181 #客户端连接端口
#添加集群信息
server.1=192.168.176.70:3188:3288
server.2=192.168.176.71:3188:3288
server.3=192.168.176.72:3188:3288
在每个节点上创建数据目录和日志目录
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs
在每个节点的dataDir指定的目录下创建一个 myid 的文件
echo 1 > /opt/zookeeper/data/myid
echo 2 > /opt/zookeeper/data/myid
echo 3 > /opt/zookeeper/data/myid
配置 Zookeeper 启动脚本
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
echo "---------- zookeeper 启动 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重启 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 状态 ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
设置开机自启
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
分别启动 Zookeeper
service zookeeper start
查看当前状态
service zookeeper status
部署 kafka 集群
安装:
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 kafka
修改配置文件
cd kafka/config/
cp server.properties server.properties.bak
vim server.properties
改经纪人id
如果修改了id 28行可以不改
broker.id=0 ●21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.80.10:9092 ●31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小
log.dirs=/var/log/kafka #60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量
log.retention.hours=168 #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.segment.bytes=1073741824 #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
#Kafka 以日志文件的形式维护其数据,而这些日志文件被分割成多个日志段。当一个日志段达到指定的大小时,就会创建一个新的日志段。
配置连接Zookeeper集群地址
zookeeper.connect=192.168.176.70:2181,192.168.176.71:2181,192.168.176.72:2181
kafka默认不允许删除主题
每一台配置一下
修改环境变量日志段是主题分区日志文件的一部分。
vim /etc/profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
配置 Zookeeper 启动脚本
vim /etc/init.d/kafka
设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka
分别启动 Kafka
service kafka start
Kafka 命令行操作
创建topic
kafka-topics.sh --create --bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 --replication-factor 2 --partitions 3 --topic cc
查看当前服务器中的所有 topic
kafka-topics.sh --list --bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092
查看某个 topic 的详情
kafka-topics.sh --describe --bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092
发布消息
kafka-console-producer.sh --broker-list 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 --topic cc
消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 --topic cc
kafka-console-consumer.sh --bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 --topic cc --from-beginning
创建单个主题
kafka-topics.sh --create --bootstrap-server 192.168.176.70:9092 --replication-factor 2 --partitions 3 --topic test1
修改分区数
kafka-topics.sh --bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 --alter --topic test1 --partitions 6
删除 topic
kafka-topics.sh --delete --bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 --topic test1
安装 Filebeat
tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64/ filebeat
安装logstash
#上传软件包 logstash-6.7.2.rpm 到/opt目录下
cd /opt
rpm -ivh logstash-6.7.2.rpm
systemctl start logstash.service
systemctl enable logstash.service
ln -s /usr/share/logstash/bin/logstash /usr/local/bin/
安装nginx或者httpd
yum -y install epel-release
yum -y install nginx
或
yum -y install httpd
部署 Zookeeper+Kafka 集群
2.部署 Filebeat
cd /usr/local/filebeat
vim filebeat.yml
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/nginx/error.log
tags: ["nginx"]
fields:
service_name: 192.168.176.10_nginx
log_type: nginx
from: 192.168.176.10
output.kafka:
enabled: true
hosts: ["192.168.233.70:9092","192.168.233.71:9092","192.168.233.72:9092"]
topic: "nginx"
底下output 注释
nohup ./filebeat -e -c filebeat.yml >filebeat.out &
tail -f filebeat.out
部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件
cd /etc/logstash/conf.d/
vim kafka.conf
input {
kafka {
bootstrap_servers => "192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092"
topics => "nginx"
type => "nginx_kafka"
codec => "json"
auto_offset_reset => "earliest"
decorate_events => true
}
}
output {
if "nginx" in [tags] {
elasticsearch {
hosts => ["192.168.176.10:9200","192.168.176.50:9200"]
index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
logstash -f kafka.conf --path.data /opt/test1 &
httpd服务
yum -y install httpd
安装 Filebeat
tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64/ filebeat
安装logstash
#上传软件包 logstash-6.7.2.rpm 到/opt目录下
cd /opt
rpm -ivh logstash-6.7.2.rpm
systemctl start logstash.service
systemctl enable logstash.service
ln -s /usr/share/logstash/bin/logstash /usr/local/bin/
部署 Zookeeper+Kafka 集群
2.部署 Filebeat
cd /usr/local/filebeat
vim filebeat.yml
底下output 注释
nohup ./filebeat -e -c filebeat.yml >filebeat.out &
tail -f filebeat.out
在 Logstash 组件所在节点上新建一个 Logstash 配置文件
cd /etc/logstash/conf.d/
vim kafkahttpd.conf
logstash -f kafkahttpd.conf --path.data /opt/test1 &