基于Kafka的日志采集

目录

前言

架构图

资源列表

基础环境

关闭防护墙

关闭内核安全机制

修改主机名

添加hosts映射

一、部署elasticsearch

修改limit限制

部署elasticsearch

修改配置文件

启动

二、部署filebeat

部署filebeat

添加配置文件

启动

三、部署kibana

部署kibana

修改配置文件

启动

四、部署Kafka

安装java

安装kafka

配置环境变量

创建数据存储目录和日志存储目录

修改zk配置文件

修改Kafka配置文件

启动zk

启动Kafka

测试

五、部署logstash

部署logstash

添加配置文件

启动


前言

        当日志量变得非常大时,传统的日志收集平台可能会遇到性能瓶颈、单点故障或扩展性问题。在这种情况下,引入消息队列(如Kafka)可以显著增强日志收集系统的健壮性、可扩展性和实时性。

以下是当在日志收集平台中加入Kafka时,可以带来的优势和改进:

  1. 缓冲和异步处理
    Kafka作为一个消息队列,可以充当Filebeat(或其他日志收集器)和Logstash(或其他日志处理组件)之间的缓冲层。Filebeat可以将日志数据异步地发送到Kafka,而不需要等待Logstash的即时响应。这样,即使Logstash暂时无法处理数据,Kafka也可以暂时存储数据,直到Logstash恢复处理能力。

  2. 水平扩展
    随着日志量的增长,Kafka可以通过添加更多的节点(brokers)来实现水平扩展。这种扩展方式使得Kafka能够处理更多的并发写入和读取操作,而不会遇到单点故障或性能瓶颈。此外,Kafka的分布式架构还允许数据在多个节点之间进行复制,以提高数据的可靠性和容错性。

  3. 实时数据处理
    Kafka支持实时数据流处理,使得日志数据可以立即被消费和处理。这意味着一旦日志数据被写入Kafka,就可以立即被Logstash(或其他流处理工具)读取和处理,以满足实时分析、监控和告警的需求。

  4. 数据持久化
    Kafka将数据持久化到磁盘上,以确保即使在系统崩溃或重启的情况下,数据也不会丢失。这种持久化机制使得Kafka成为了一个可靠的数据传输和存储平台,特别适用于对日志数据进行长期存储和分析的场景。

  5. 多消费者支持
    Kafka允许多个消费者(如Logstash、其他数据分析工具或应用)从同一个主题(topic)中消费数据。这意味着您可以同时运行多个消费者来处理和分析日志数据,以满足不同的业务需求和数据使用场景。

  6. 可定制性和灵活性
    Kafka提供了丰富的API和工具,使得您可以轻松地定制和扩展日志收集系统。例如,您可以编写自定义的Kafka生产者来收集特定格式的日志数据,或者编写自定义的Kafka消费者来处理和分析日志数据。

  7. 与其他系统的集成
    Kafka是一个广泛使用的消息队列系统,它支持与其他各种系统和工具进行集成。这意味着您可以将Kafka轻松地集成到现有的日志收集、处理、存储和分析系统中,以构建一个更加健壮、可扩展和灵活的日志收集平台。

        综上所述,当日志量变得非常大时,在日志收集平台中加入Kafka可以显著提高系统的性能、可靠性和可扩展性。通过利用Kafka的缓冲、异步处理、水平扩展、实时数据处理、数据持久化、多消费者支持、可定制性和与其他系统的集成能力,您可以构建一个更加健壮、高效和灵活的日志收集系统。

        有需要本次实验软件包的评论区可以找我要,无偿提供。

架构图

资源列表

操作系统配置主机名IP
CentOS7.3.16112C4Ges01192.168.207.131
CentOS7.3.16112C4Gkibana192.168.207.165
CentOS7.3.16112C4Gfilebeat192.168.207.166
CentOS7.3.16112C4Gkafka192.168.207.167
CentOS7.3.16112C4Glogstash192.168.207.168

基础环境

关闭防护墙

systemctl stop firewalld
systemctl disable firewalld

关闭内核安全机制

sed -i "s/.*SELINUX=.*/SELINUX=disabled/g" /etc/selinux/config
reboot

修改主机名

hostnamectl set-hostname es01
hostnamectl set-hostname kibana
hostnamectl set-hostname filebeat
hostnamectl set-hostname kafka
hostnamectl set-hostname logstash

添加hosts映射

cat >> /etc/hosts << EOF
192.168.207.131 es01
192.168.207.165 kibana
192.168.207.166 filebeat
192.168.207.167 kafka
192.168.207.168 logstash
EOF

一、部署elasticsearch

修改limit限制

cat > /etc/security/limits.d/es.conf << EOF
* soft nproc 655360
* hard nproc 655360
* soft nofile 655360
* hard nofile 655360
EOF
​
cat >> /etc/sysctl.conf << EOF
vm.max_map_count=655360
EOF
sysctl -p

部署elasticsearch

mkdir -p /data/elasticsearch
tar zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz -C /data/elasticsearch

修改配置文件

mkdir /data/elasticsearch/{data,logs}

[root@es01 elasticsearch-7.14.0]# grep -v "^#" /data/elasticsearch/elasticsearch-7.14.0/config/elasticsearch.yml
cluster.name: my-application
node.name: es01
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["es01"]

启动

useradd es 
chown -R es:es /data/
su - es
/data/elasticsearch/elasticsearch-7.14.0/bin/elasticsearch -d

二、部署filebeat

部署filebeat

mkdir -p /data/filebeat
tar zxvf filebeat-7.14.0-linux-x86_64.tar.gz -C /data/filebeat/

添加配置文件

这里提供了两份filebeat配置文件的参考

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages         ###要监控的日志文件
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  #version:0.10.2             ### 根据不同 CKafka 实例开源版本配置
  hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口
  topic: 'topic_test1'       ###topic实例名
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: none
  max_message_bytes: 10000000

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log         ###要监控的日志文件
  fields:
    kafka_topic: httpd_access
- type: log
  enabled: true
  paths:
    - /var/log/httpd/error_log         ###要监控的日志文件
  fields:
    kafka_topic: httpd_error
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  #version:0.10.2             ### 根据不同 CKafka 实例开源版本配置
  hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口
  topic: '%{[fields.kafka_topic]}'       ###topic实例名
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: none
  max_message_bytes: 10000000

启动

/data/filebeat/filebeat-7.14.0-linux-x86_64/filebeat -e -c filebeat.yml

三、部署kibana

部署kibana

mkdir -p /data/kibana
tar zxvf kibana-7.14.0-linux-x86_64.tar.gz -C /data/kibana/

修改配置文件

grep -v "^#" /data/kibana/kibana-7.14.0-linux-x86_64/config/kibana.yml  | grep -v "^$"
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.207.131:9200"]
kibana.index: ".kibana"

启动

useradd kibana
chown -R kibana:kibana /data 
su - kibana
/data/kibana/kibana-7.14.0-linux-x86_64/bin/kibana

四、部署Kafka

安装java

# 安装java环境
yum -y install java-1.8.0-openjdk

安装kafka

tar zxvf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/kafka

配置环境变量

# 配置环境变量
cat > /etc/profile.d/zookeeper.sh << 'EOF'
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$ZOOKEEPER_HOME/bin:$PATH
EOF

cat > /etc/profile.d/kafka.sh << 'EOF'
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF

source /etc/profile

创建数据存储目录和日志存储目录

mkdir -p /usr/local/kafka/zookeeper
mkdir -p /usr/local/kafka/log/zookeeper
mkdir -p /usr/local/kafka/log/kafka

# 创建zk需要的myid文件
echo 0 > /usr/local/kafka/zookeeper/myid

修改zk配置文件

# 注意Kafka安装目录下的config目录里
server.properties             #是Kafka的配置文件
zookeeper.properties          #是zookeeper的配置文件
cat >> /usr/local/kafka/config/zookeeper.properties << EOF
dataLogDir=/usr/local/kafka/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.207.167:2888:3888
EOF

sed -i "s/dataDir\=\/tmp\/zookeeper/dataDir\=\/usr\/local\/kafka\/zookeeper/g" /usr/local/kafka/config/zookeeper.properties

修改Kafka配置文件

# /usr/local/kafka/config/server.properties修改

listeners=PLAINTEXT://192.168.207.167:9092
advertised.listeners=PLAINTEXT://192.168.207.167:9092
log.dirs=/usr/local/kafka/log/kafka
delete.topic.enable=true
zookeeper.connect=192.168.207.167:2181

启动zk

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

测试

# 创建一个topic
[root@kafka kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.207.167:9092 --replication-factor 1   --partitions 1 --topic Hello-Kafka
Created topic Hello-Kafka.

# 往topic里面输入消息
[root@kafka kafka]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.207.167:9092 --topic Hello-Kafka

# 从topic里面消费消息
[root@kafka ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka --from-beginning

# 查看topic列表
[root@kafka kafka]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.207.167:9092 --list
Hello-Kafka

# 删除topic
[root@kafka kafka]# bin/kafka-topics.sh --delete --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka

五、部署logstash

部署logstash

mkdir -p /data/logstash
tar zxvf logstash-7.14.0-linux-x86_64.tar.gz -C /data/logstash/

添加配置文件

mkdir /data/logstash/logstash-7.14.0/conf.d

cat > /data/logstash/logstash-7.14.0/conf.d/system.conf << 'EOF'
input { 
  kafka{ 
    bootstrap_servers =>"192.168.207.167:9092" 
    topics =>"topic_test1" 
    type =>"topic_test1"
    codec =>"json" 
  } 
}
output { 
  if [type] == "topic_test1" {
  elasticsearch { 
    hosts => ["192.168.207.131:9200"] 
    index =>"kafka-system-%{+YYYY.MM.dd}" 
  } 
  }
}

EOF
cat > /data/logstash/logstash-7.14.0/conf.d/httpd.conf << 'EOF'
input { 
  kafka{ 
    bootstrap_servers =>"192.168.207.167:9092" 
    topics =>"httpd_access" 
    type =>"httpd_access"
    codec =>"json" 
  } 
  kafka{ 
    bootstrap_servers =>"192.168.207.167:9092" 
    topics =>"httpd_error" 
    type =>"httpd_error"
    codec =>"json" 
  }
}
output { 
  if [type] == "httpd_access" {
  elasticsearch { 
    hosts => ["192.168.207.131:9200"] 
    index =>"httpd-access-%{+YYYY.MM.dd}" 
  } 
  }
  if [type] == "httpd_error" {
  elasticsearch { 
    hosts => ["192.168.207.131:9200"] 
    index =>"httpd-error-%{+YYYY.MM.dd}" 
  } 
  }
}

EOF

启动

/data/logstash/logstash-7.14.0/bin/logstash -f /data/logstash/logstash-7.14.0/conf.d/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/636862.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

实现底部表情评价,聚焦改变颜色,点击坏评价自动变成好评价

如下&#xff0c;非常简单的一个小玩意。 废话不多说直接上代码&#xff1a; <!--* 轮子的作者: 轮子哥* Date: 2024-05-22 10:43:45* LastEditTime: 2024-05-24 10:28:53 --> <!DOCTYPE html> <html lang"en"><head><meta charset"…

51-指针_野指针,指针运算

51-1 野指针 51-1-1 什么是野指针 概念&#xff1a;野指针就是指针指向的位置是不可知的&#xff08;随机的、不正确的、没有明确限制的) 没有初始化 int main() {int* p;//p没有初始化&#xff0c;就意味着没有明确的指向//一个局部变量不初始化的话&#xff0c;放的是随机…

clickhouse 中的数组(array)和元组(Tuple)—— clickhouse 基础篇(二)

文章目录 数组判断是否为空计算数组长度获取数组元素判断某个元素是否存在数组切片数组元素展开数组元素去重删除连续重复元素连接多个数组数组倒序数组拍平数组元素映射数组元素过滤数组聚合分析计算数组交集计算数组并集计算数组差集SQL 子查询进行集合操作 元组创建元组获取…

八国多语言微盘微交易所系统源码 单控点控 K线完好

安装环境linux NGMySQL5.6PHP7.2&#xff08;函数全删&#xff09;pm2管理器&#xff08;node版本选择v12.20.0&#xff09; config/ database.php 修改数据库链接 设置运行目录 public 伪静态thinkphp

最有效的企业数据防泄漏手段 | 数据泄漏防护系统推荐

随意信息安全意识不断提高&#xff0c;企业纷纷寻求高效的数据防泄漏手段。在众多解决方案中&#xff0c;这五款软件各具特色&#xff0c;但它们的共同目标都是确保企业数据的安全性和保密性。 接下来&#xff0c;我们将逐一介绍这五款软件的特点和优势。 1、Ping 32 Ping32…

C中十进制转十六进制示例

uint8_t QR_code_RxBfr[255]{0}; uint8_t TouchCode[100];memcpy (&Sys.TouchCode[0], &QR_code_RxBfr[0], Sys.QR_code_Len);Str &Sys.TouchCode[TmpVble];Sys.Card_ID 0; while(0 ! isdigit(*Str)){Sys.Card_ID Sys.Card_ID*10 *Str - 0;Str;} 最后在通过以下…

【淘宝商品API接口】淘宝一件代发,如何找到便宜又靠谱的货源商品铺货到自己店铺?

电商商品API接口 对没有自己货源的淘宝小卖家来说&#xff0c;从1688等货源平台拿货做一件代发是最好的选择。不需要囤货&#xff0c;也不需要自己打包发货&#xff0c;投入小、风险低。 在大部分卖家眼里&#xff0c;1688上面的商品很贵&#xff0c;甚至比淘宝同行卖的都贵&…

Ceph集群RBD块存储:快照与Copy-on-Write克隆的基本操作

文章目录 1.RBD块存储镜像克隆概念2.copy-on-write克隆的基本使用2.1.在块存储中创建一个快照2.2.将快照配置成保护模式2.3.基于快照克隆出镜像2.4.使用克隆的镜像2.5.查看一个快照下有哪些克隆的镜像 1.RBD块存储镜像克隆概念 镜像克隆官方文档&#xff1a;https://docs.ceph…

Qt for android 添加自己的java包

java 包 目录 .pro 中添加 OTHER_FILES $$PWD/android/src/ScytheStudio \$$PWD/android/src/Serial 或 DISTFILES android/src/ScytheStudio \android/src/Serial \或(可以在Qt Creator中显示) DISTFILES android/src/ScytheStudio/*.java \android/src/Serial/*.java \

QQ沐个人引导页html源码

好看的QQ沐个人引导页html源码&#xff0c;鼠标移动滑出美丽的线条收缩特效&#xff0c;界面美观大气&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面 …

【分享】3种方法取消PPT的“限制保护”

PPT如果设置了有密码的“只读方式”&#xff0c;每次打开PPT&#xff0c;都会出现对话框&#xff0c;提示需要输入密码才能修改文件&#xff0c;否则只能以“只读方式”打开。 以“只读方式”打开的PPT就会被限制&#xff0c;无法进行编辑修改等操作。那如果后续不需要“限制保…

Springboot阶段项目---《书城项目》

一 项目介绍 本项目采用集成开发平台IntelliJ IDEA开发了在线作业成绩统计系统的设计与实现&#xff0c;实现了图书商城系统的综合功能和图形界面的显示&#xff0c;可以根据每个用户登录系统后&#xff0c;动态展示书城首页图书&#xff0c;实现了分类还有分页查询&#xff0c…

MySQL -- 相关知识点

1.数据库相关介绍 数据库的选择通常取决于具体的应用需求&#xff0c;如性能、扩展性、数据一致性和易用性等因素。 1. 关系型数据库&#xff08;RDBMS&#xff09; MySQL&#xff1a; 广泛使用的开源数据库&#xff0c;支持大多数操作系统。强调易用性、灵活性和广泛的社区支…

Flutter Text导致A RenderFlex overflowed by xxx pixels on the right.

使用Row用来展示两个Text的时候页面出现如下异常,提示"A RenderFlex overflowed by xxx pixels on the right." The following assertion was thrown during layout: A RenderFlex overflowed by 4.8 pixels on the right.The relevant error-causing widget was:…

ClickHouse课件

列式存储数据库&#xff1a;hbase clickhouse 简介 ClickHouse入门 ClickHouse是俄罗斯的Yandex于2016年开源的列式存储数据库&#xff08;DBMS&#xff09;&#xff0c;使用C语言编写&#xff0c;主要用于在线分析处理查询&#xff08;OLAP&#xff09;&#xff0c;能够使用…

K8S中YAML案例

目录 案例&#xff1a;自主式创建service并关联上面的pod 案例&#xff1a;部署redis 案例&#xff1a;部署myapp 案例&#xff1a;部署MySQL数据库 总结 1.K8S集群中访问流向 K8S集群外部&#xff1a;客户端——nodeIP&#xff1a;nodeport——通过target port——podIP…

【排序算法】堆排序(Heapsort)

✨✨✨专栏&#xff1a;排序算法 &#x1f9d1;‍&#x1f393;个人主页&#xff1a;SWsunlight 目录 ​编辑 前言&#xff1a; 一、堆排序&#xff1a; 时间复杂度&#xff1a; 空间复杂度&#xff1a; 算法稳定性&#xff1a; 二、升序的实现&#xff1a;通过建大堆实…

find 几招在 Linux 中高效地查找目录

1. 介绍 在 Linux 操作系统中&#xff0c;查找目录是一项常见的任务。无论是系统管理员还是普通用户&#xff0c;都可能需要查找特定的目录以执行各种操作&#xff0c;如导航文件系统、备份数据、删除文件等。Linux 提供了多种命令和工具来帮助我们在文件系统中快速找到目标目…

百度软件测试面试经历,期望薪资27K

一面 1、 请为百度搜索框设计测试用例&#xff1f; 2、百度设计框上线前需要进行那些测试&#xff1f; 界面测试&#xff0c;功能测试&#xff0c;性能测试&#xff0c;安全性测试&#xff0c;易用性测试&#xff0c;兼容性测试&#xff0c;UI测试。 3、如何查看http状态码…

高通 Android 12/13冻结屏幕

冻结屏幕很多第一次听到以为是Android一种异常现象&#xff0c;实则不然&#xff0c;就是防止用户在做一些非法操作导致问题防止安全漏洞问题。 1、主要通过用户行为比如禁止下拉状态栏和按键以及onTouch事件拦截等&#xff0c;不知道请看这篇文章&#xff08;Touch事件传递流…