2.部署kafka:9092

官方文档:http://kafka.apache.org/documentation.html

(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群)

Kafka3台集群搭建环境:

操作系统: centos7

防火墙:全关

3台zookeeper集群内的机器,1台logstash

软件版本: zookeeper-3.4.12.tar.gz

软件版本kafka_2.12-2.1.0.tgz

安装软件

(3台zookeeper集群的机器)

# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/

# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka

创建数据目录(3台)

# mkdir /data/kafka-logs

修改第一台配置文件

(注意不同颜色标记的部分)

# egrep -v "^$|^#" /usr/local/kafka/config/server.properties

broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样

listeners=PLAINTEXT://192.168.148.141:9092 #监听套接字

num.network.threads=3 #这个是borker进行网络处理的线程数

num.io.threads=8 #这个是borker进行I/O处理的线程数

socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数

#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中

num.partitions=1 #默认的分区数,一个topic默认1个分区数

num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1

offsets.topic.replication.factor=2

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本数

replica.fetch.max.bytes=5242880 #取消息的最大字节数

log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除

zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

修改另外两台配置文件

#scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/

broker.id=2

listeners=PLAINTEXT://192.168.148.142:9092

# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/

broker.id=3

listeners=PLAINTEXT://192.168.148.143:9092

启动kafka(3台)

[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

查看启动情况(3台)

[root@host1 ~]# jps

10754 QuorumPeerMain

11911 Kafka

12287 Jps

创建topic来验证

[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien

出现Created topic "cien"验证成功运行

在一台服务器上创建一个发布者

[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien

> hello kafka

> ni hao ya

>

在另一台服务器上创建一个订阅者

[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning

...

hello kafka

ni hao ya

如果都能接收到,说明kafka部署成功!

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息

Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:

Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic

Topic qianfeng is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

配置elfk集群订阅和zookeeper和kafka

配置第一台logstash生产消息输出到kafka

yum -y install wget

wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm

yum localinstall jdk-8u341-linux-x64.rpm -y

java -version

1.安装logstash

tar xf logstash-6.4.1.tar.gz -C /usr/local

ln -s /usr/local/logstash-6.4.1 /usr/local/logstash

2.修改配置文件

cd /usr/local/logstash/config/

vim logstash.yml

http.host: "0.0.0.0"

3.编写配置文件

不要过滤, logstash会将message内容写入到队列中

# cd /usr/local/logstash/config/

# vim logstash-kafka.conf

input {
    file {
      type => "sys-log"
      path => "/var/log/messages"
      start_position => beginning
    }
}
output {
    kafka {
      bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"     #输出到kafka集群
      topic_id => "sys-log-messages"         #主题名称
      compression_type => "snappy"         #压缩类型
      codec =>  "json"
    }
}

启动logstash

# /usr/local/logstash/bin/logstash -f logstash-kafka.conf

在kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list

__consumer_offsets

qianfeng

sys-log-messages

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages

Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:

Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2

配置第二台logstash,订阅kafka日志,输出到es集群

# cat kafka-es.conf

input {
	kafka {
		bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" 
    	topics => "sys-log-messages"          #kafka主题名称
      codec => "json"
    	auto_offset_reset => "earliest"
	}
}

output {
    elasticsearch {
       hosts => ["192.168.148.131:9200","192.168.148.132:9200"]
       index => "kafka-%{type}-%{+YYYY.MM.dd}"
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/978424.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VUE向外暴露文件,并通过本地接口调用获取,前端自己生成接口获取public目录里面的文件

VUE中,如果我们想对外暴露一个文件,可以在打包之后也能事实对其进行替换,我们只需要把相关文件放置在public目录下即可,可以放置JSON,Excel等文件 比如我在这里放置一个other文件 我们可以直接在VUE中使用axios去获取…

ubuntu-24.04.1-desktop 中安装 QT6.7

ubuntu-24.04.1-desktop 中安装 QT6.7 1 环境准备1.1 安装 GCC 和必要的开发包:1.2 Xshell 连接 Ubuntu2 安装 Qt 和 Qt Creator:2.1 下载在线安装器2.2 在虚拟机中为文件添加可执行权限2.3 配置镜像地址运行安装器2.4 错误:libxcb-xinerama.so.0: cannot open shared objec…

应用的负载均衡

概述 负载均衡(Load Balancing) 调度后方的多台机器,以统一的接口对外提供服务,承担此职责的技术组件被称为“负载均衡”。 负载均衡器将传入的请求分发到应用服务器和数据库等计算资源。负载均衡是计算机网络中一种用于优化资源利…

Linux: 已占用接口

Linux: 已占用接口 1. netstat(适用于旧系统)1.1 书中对该命令的介绍 2. ss(适用于新系统,替代 netstat)3. lsof(查看详细进程信息)4. fuser(快速查找占用端口的进程)5. …

组件注册方式、传递数据

组件注册 一个vue组件要先被注册,这样vue才能在渲染模版时找到其对应的实现。有两种注册方式:全局注册和局部注册。(组件的引入方式) 以下这种属于局部引用。 组件传递数据 注意:props传递数据,只能从父…

使用DeepSeek/chatgpt等AI工具辅助网络协议流量数据包分析

随着deepseek,chatgpt等大模型的能力越来越强大,本文将介绍一下deepseek等LLM在分数流量数据包这方面的能力。为需要借助LLM等大模型辅助分析流量数据包的同学提供参考,也了解一下目前是否有必要继续学习wireshark工具以及复杂的协议知识。 pcap格式 目…

蓝桥杯嵌入式客观题以及解释

第十一届省赛(大学组) 1.稳压二极管时利用PN节的反向击穿特性制作而成 2.STM32嵌套向量终端控制器NVIC具有可编程的优先等级 16 个 3.一个功能简单但是需要频繁调用的函数,比较适用内联函数 4.模拟/数字转换器的分辨率可以通过输出二进制…

《Mycat核心技术》第17章:实现MySQL的读写分离

作者:冰河 星球:http://m6z.cn/6aeFbs 博客:https://binghe.gitcode.host 文章汇总:https://binghe.gitcode.host/md/all/all.html 星球项目地址:https://binghe.gitcode.host/md/zsxq/introduce.html 沉淀&#xff0c…

虚拟机 | Ubuntu 安装流程以及界面太小问题解决

文章目录 前言一、Ubuntu初识二、使用步骤1.下载ubuntu镜像2.创建虚拟机1、使用典型(节省空间)2、稍后安装方便配置3、优选Linux版本符合4、浏览位置,选择空间大的磁盘 6、 配置信息,选择镜像7、 启动虚拟机,执行以下步…

2025系统架构师(一考就过):案例之三:架构风格总结

软件架构风格是描述某一特定应用领域中系统组织方式的惯用模式,按照软件架构风格,物联网系统属于( )软件架构风格。 A:层次型 B:事件系统 C:数据线 D:C2 答案:A 解析: 物联网分为多个层次&#xff0…

ubuntu离线安装Ollama并部署Llama3.1 70B INT4

文章目录 1.下载Ollama2. 下载安装Ollama的安装命令文件install.sh3.安装并验证Ollama4.下载所需要的大模型文件4.1 加载.GGUF文件(推荐、更容易)4.2 加载.Safetensors文件(不建议使用) 5.配置大模型文件 参考: 1、 如…

算法-数据结构(图)-DFS深度优先遍历

深度优先遍历(DFS)是一种用于遍历或搜索图的算法。以下是对它的详细介绍: 1. 定义 基本思想:从图中某个起始顶点出发,沿着一条路径尽可能深地访问图中的顶点,直到无法继续前进(即到达一个没…

uni-app集成sqlite

Sqlite SQLite 是一种轻量级的关系型数据库管理系统(RDBMS),广泛应用于各种应用程序中,特别是那些需要嵌入式数据库解决方案的场景。它不需要单独的服务器进程或系统配置,所有数据都存储在一个单一的普通磁盘文件中&am…

python文件的基本操作,文件读写

1.文件 1.1文件就是存储在某种长期存储设备上的一段数据 1.2文件操作 打开文件-->读写文件-->关闭文件 注意:可以只打开和关闭文件不进行任何操作 1.3文件对象的方法 1.open():创建一个file对象,默认以只读模式打开 2.read(n):n表示从文件中…

半导体晶圆精控:ethercat转profient网关数据提升制造精度

数据采集系统通过网关连接离子注入机,精细控制半导体晶圆制造过程中的关键参数。 在半导体制造中,晶圆制造设备的精密控制是决定产品性能的关键因素。某半导体工厂采用耐达讯Profinet转EtherCAT协议网关NY-PN-ECATM,将其数据采集系统与离子注…

双臂机器人的动力学建模

双臂机器人的动力学建模是研究机器人在运动过程中的力学行为和动力学特性,主要目的是确定在给定的控制指令下,机器人各个关节或末端执行器所受的力与加速度之间的关系。建立动力学模型通常涉及以下几个步骤: 1. 定义机器人坐标系和关节空间 双…

驱动开发系列39 - Linux Graphics 3D 绘制流程(二)- 设置渲染管线

一:概述 Intel 的 Iris 驱动是 Mesa 中的 Gallium 驱动,主要用于 Intel Gen8+ GPU(Broadwell 及更新架构)。它负责与 i915 内核 DRM 驱动交互,并通过 Vulkan(ANV)、OpenGL(Iris Gallium)、或 OpenCL(Clover)来提供 3D 加速。在 Iris 驱动中,GPU Pipeline 设置 涉及…

中国的Cursor! 字节跳动推出Trae,开放Windows版(附资源),开发自己的网站,内置 GPT-4o 强大Al模型!

Trae是什么 Trae 是字节跳动推出的免费 AI IDE,通过 AI 技术提升开发效率。支持中文,集成了 Claude 3.5 和 GPT-4 等主流 AI 模型,完全免费使用。Trae 的主要功能包括 Builder 模式和 Chat 模式,其中 Builder 模式可帮助开发者从…

【洛谷排序算法】P1012拼数-详细讲解

洛谷 P1012 拼数这道题本身并非单纯考察某种经典排序算法(如冒泡排序、选择排序、插入排序、快速排序、归并排序等)的实现,而是在排序的基础上,自定义了排序的比较规则,属于自定义排序类型的题目。不过它借助了标准库中…

阿里云可观测全面拥抱 OpenTelemetry 社区

作者:古琦 在云计算、微服务、容器化等技术重塑 IT 架构的今天,系统复杂度呈指数级增长。在此背景下,开源可观测性技术已从辅助工具演变为现代 IT 系统的"数字神经系统",为企业提供故障预警、性能优化和成本治理的全方…