kafka(一)——简介

简介

Kafka 是一种分布式、支持分区、多副本的消息中间件,支持发布-订阅模式,多用于实时处理大量数据缓存的场景,类似于一个“缓存池”。

架构

在这里插入图片描述

  • Producer:消息生产者;
  • Consumer:消息消费者;
  • Broker:一台kafka服务器也称作一个broker,kafka集群包含多个broker;
  • Topic:一个topic为一个消息队列,生产者、消费者基于topic进行发布-订阅;
  • Partition:消息分区,一个topic可以分为多个partition,每个partition是一个消息队列;
  • Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader;
  • Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader;

安装部署

  1. 下载kafka安装包
# 下载地址,取最新包即可,最新版本为kafka_2.13-3.6.1.tgz
https://kafka.apache.org/downloads
  1. 解压
tar -zxvf kafka_2.13-3.6.1.tgz

mv kafka_2.13-3.6.1 kafka

cd kafka/
  1. 修改配置文件

    1)本次集群配置采用kraft模式,集群节点为192.168.1.12~14;

    2)在3个节点解压后,进入kafka/config/kraft目录,修改server.properties文件:

    • 192.168.1.12节点配置文件修改如下
    "node.id=1"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.12:9092"
    
    • 192.168.1.13节点配置文件修改如下
    "node.id=2"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.13:9092"
    
    • 192.168.1.14节点配置文件修改如下
    "node.id=3"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.14:9092"
    

    节点说明:

    # Broker节点
    Kafka集群中的数据节点(消息队列),它们负责接收客户端的消息和传递消息给客户端,默认情况下,每个Broker节点会监听9092端口,该端口用于与客户端进行通信,客户端可以将消息发送到这个端口,或者从这个端口接收消息,这个端口可以称作客户端通信端口。
    
    # Controller节点
    Kafka集群中的控制器节点,负责管理集群的状态和元数据,Controller节点监听的端口通常是9093,该端口用于集群中其他节点获取元数据或在混合节点选举新的Controller时进行通信,通过该端口,其他节点可以与Controller节点交互,获取集群的元数据信息或参与控制器的选举过程,这个端口可以称作控制器端口。
    
    # 混合节点
    同时担任Broker和Controller角色的节点,这两个端口都会被使用,默认情况下混合节点将监听9092端口接收和传递消息给客户端,并监听9093端口用于与其他节点进行元数据交换和控制器选举通信,可见混合节点会同时使用两个端口分别作为客户端通信端口与控制器端口。
    
  2. 生成集群id

# KRaft模式的kafka集群需要设定一个id,在任意节点执行如下命令:
sh /usr/local/incvs-kafka/bin/kafka-storage.sh random-uuid
  1. 格式化数据目录
# 在集群所有节点执行如下命令:
sh /home/kafka/bin/kafka-storage.sh format -t "步骤4)生成的id" -c /home/kafka/config/kraft/server.properties
  1. 启动Kakfa
# 在3个节点依次执行如下命令:
sh /home/kafka/bin/kafka-server-start.sh /home/kafka/config/kraft/server.properties

测试

  • 在192.168.1.13节点启动消费者脚本订阅“MykafkaTopic”
sh /home/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.13:9092 --topic MykafkaTopic
hello kafka
  • 在192.168.1.12节点启动生产者脚本往topic生产消息
sh /home/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.12:9092 --topic MykafkaTopic
>hello kafka
>

配置文件说明

server.properties配置示例:

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
  • process.roles:服务器角色,包含broker和controller。如示例中使用kraft模式使用的混合模式,即“process.roles=broker,controller”;
  • node.id:节点ID,一个集群中的node.id不能重复;
  • controller.quorum.voters:控制选举的投票节点,格式为“node-id@host:port”;
  • listeners:服务器监听的地址,格式为“listener_name://host_name:port”;
  • inter.broker.listener.name:用于broker之间通信的监听器名称;
  • advertised.listeners:服务器向客户端宣告的监听器名称、主机名和端口;
  • controller.listener.names:控制器使用的监听器名称列表;
  • listener.security.protocol.map:监听器名称到安全协议的映射。默认情况下,它们是相同的;
  • num.network.threads:服务器用于从网络接收请求和向网络发送响应的线程数;
  • num.io.threads:服务器用于处理请求(可能包括磁盘 I/O)的线程数;
  • socket.send.buffer.bytes:服务器用于发送数据的缓冲区大小;
  • socket.receive.buffer.bytes:服务器用于接收数据的缓冲区大小;
  • socket.request.max.bytes:服务器接受的请求的最大大小(用于防止内存溢出);
  • log.dirs:用于存储日志文件的目录列表;(重要,需清楚日志存储的原理)
  • num.partitions:每个主题的默认日志分区数;
  • num.recovery.threads.per.data.dir:每个数据目录在启动时用于日志恢复和关闭时用于刷新的线程数;
  • offsets.topic.replication.factor:内部主题 “__consumer_offsets” 和 “__transaction_state” 的复制因子;
  • transaction.state.log.replication.factor:事务状态日志的复制因子;
  • transaction.state.log.min.isr:事务状态日志的最小同步副本数;
  • log.flush.interval.messages:强制将数据刷新到磁盘之前接受的消息数;
  • log.flush.interval.ms:消息在日志中停留的最大时间,超过这个时间就会强制刷新到磁盘;
  • log.retention.hours:由于年龄而使日志文件有资格被删除的最小年龄;
  • log.retention.bytes:基于大小的日志保留策略,默认1G;
  • log.segment.bytes:日志段文件的最大大小;
  • log.retention.check.interval.ms:检查日志段是否可以根据保留策略被删除的间隔;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/336512.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Go】rune和byte类型的认识与使用

【Go】rune和byte类型的认识与使用 大家好 我是寸铁👊 总结了一篇rune和byte类型的认识与使用的文章✨ 喜欢的小伙伴可以点点关注 💝 byte和rune类型定义 byte,占用1个字节,共8个比特位,所以它实际上和uint8没什么本质区别,它表示…

Windows11开启SSH服务

文章目录 环境背景安装SSH服务配置SSH服务启动/停止SSH服务登录接下来参考 环境 Windows 11 家庭中文版 背景 在我的Windows电脑上,有些网站无法直接访问,所以需要通过Mac来代理。为此,需要创建一个位于Windows和Mac之间的SSH tunnel。 在…

【C++】vector容器接口要点的补充

接口缩容 在VS编译器的模式下&#xff0c;类似于erase和insert接口的函数通常会进行缩容&#xff0c;因此&#xff0c;insert和erase行参中的迭代器可能会失效。下图中以erase为例&#xff1a; 代码如下&#xff1a; #include <iostream> #include <vector> #inclu…

如何本地部署虚VideoReTalking

环境&#xff1a; Win10专业版 VideoReTalking 问题描述&#xff1a; 如何本地部署虚VideoReTalking 解决方案&#xff1a; VideoReTalking是一个强大的开源AI对嘴型工具&#xff0c;它是我目前使用过的AI对嘴型工具中效果最好的一个&#xff01;它是由西安电子科技大学、…

MATLAB聚类工具箱

本文借鉴了数学建模清风老师的课件与思路&#xff0c;可以点击查看链接查看清风老师视频讲解&#xff1a;【1】MATLAB聚类工具箱&#xff1a;提前预览工具箱的核心功能_哔哩哔哩_bilibili 关于工具箱的获取&#xff0c;在数学建模学习交流公众号里发送&#xff1a; 567891 %% …

idea 安装免费Ai工具 codeium

目录 概述 ide安装 使用 chat问答 自动写代码 除此外小功能 概述 这已经是我目前用的最好免费的Ai工具了&#xff0c;当然你要是有钱最好还是用点花钱的&#xff0c;比如copilot&#xff0c;他可以在idea全家桶包括vs&#xff0c;还有c/c的vs上运行&#xff0c;还贼强&am…

数据加密-mysql

想要实现数据加密可以在mysql数据库表实现数据加密&#xff0c;来确保数据安全。下面就是加密算法AES_ENCRYPT&#xff0c;其他加密算法类似。 1 创建一张临时表test_table CREATE TABLE test_table (name varchar(20) DEFAULT NULL,id blob ) ENGINEInnoDB DEFAULT CHARSETu…

【JavaEE】_基于UDP实现网络通信

目录 1. 服务器 1.1 实现逻辑 1.2 代码 1.3 部分代码解释 2. 客户端 2.1 实现逻辑 2.2 代码 2.3 客户端部分代码解释 3. 程序运行结果 4. 服务器客户端交互逻辑 此篇内容为实现UDP版本的回显服务器echo server&#xff1b; 普通服务器&#xff1a;收到请求&#xff…

领略指针之妙

&#x1d649;&#x1d65e;&#x1d658;&#x1d65a;!!&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦ &#x1f44f;&#x1f3fb;‧✧̣̥̇:Solitary-walk ⸝⋆ ━━━┓ - 个性标签 - &#xff1a;来于“云”的“羽球人”。…

【高等数学之极限】

一、引言 我们先思考一下&#xff0c;上面三个表达式&#xff0c;是否可以将极限值直接代入求值&#xff0c;我们在计算之前需要先分析一下&#xff0c;如果将极限值代入&#xff0c;那么表达式将会变成什么形式? 经过上面的分析&#xff0c;我们发现第一个式子可以直接带入&a…

微电网优化MATLAB:遗传算法(Genetic Algorithm,GA)求解微电网优化(提供MATLAB代码)

一、微网系统运行优化模型 微电网优化是指通过对微电网系统中各个组件的运行状态进行监测和调节&#xff0c;以实现微电网系统的高效运行和能源利用的最大化。微电网是由多种能源资源&#xff08;如太阳能、风能、储能等&#xff09;和负载&#xff08;如建筑、工业设备等&…

通用人工智能的能力评估框架-Levels of AGI Operationalizing Progress on the Path to AGI

通用人工智能的能力评估框架-Levels of AGI: Operationalizing Progress on the Path to AGI 译自’Levels of AGI: Operationalizing Progress on the Path to AGI’&#xff0c;有所删节.笔者能力有限&#xff0c;敬请勘误。 摘要 Google DeepMind提出一种针对通用人工智能 …

端口映射的定义、特点、场景、实例、常见问题回答(Port Mapping)

目 录 一、端口映射&#xff08;Port Mapping&#xff09; 二、端口映射应用场景&#xff08;什么时候用到端口映射&#xff09; &#xff08;一&#xff09;、使用端口映射的条件 &#xff08;二&#xff09;使用端口映射的具体场景 三、端口映射技术的特点 …

LLM之RAG实战(十七)| 高级RAG:通过使用LlamaIndex重新排序来提高检索效率

基本RAG的检索是静态的&#xff0c;会检索到固定数字&#xff08;k&#xff09;个相关文档&#xff0c;而如果查询需要更多的上下文&#xff08;例如摘要&#xff09;或更少的上下文&#xff0c;该怎么办&#xff1f; 可以通过在以下两个阶段来实现动态检索&#xff1a; 预检索…

Python对Excel文件中不在指定区间内的数据加以去除的方法

本文介绍基于Python语言&#xff0c;读取Excel表格文件&#xff0c;基于我们给定的规则&#xff0c;对其中的数据加以筛选&#xff0c;将不在指定数据范围内的数据剔除&#xff0c;保留符合我们需要的数据的方法。 首先&#xff0c;我们来明确一下本文的具体需求。现有一个Exc…

Centos 7 单机部署 consul

一、下载安装 参考官网文档 Install | Consul | HashiCorp Developer 进入Centos 执行下面命令 sudo yum install -y yum-utils sudo yum-config-manager --add-repo https://rpm.releases.hashicorp.com/RHEL/hashicorp.repo sudo yum -y install consul 这种方法安装完成…

Javascript简介(全部是基础)

js初识 js是一种解释性语言&#xff0c;不需要编译&#xff0c;直接由浏览器解析执行 组成 ECMAScript是一种开放的&#xff0c;被国际上广为接收的&#xff0c;标准的脚本语言规范&#xff0c;主要描述&#xff1a;语法&#xff0c;变量&#xff0c;数据类型&#xff0c;运算…

什么是DOM?(JavaScript DOM是什么?)

1、DOM简洁 DOM是js中最重要的一部分&#xff0c;没有DOM就不会通过js实现和用户之间的交互。 window是最大的浏览器对象&#xff0c;在它的下面还有很多子对象&#xff0c;我们要学习的DOM就是window对象下面的document对象 DOM&#xff08;Document Object Model&#xff09…

C++ 学习系列 -- std::function 与 std::bind

一 std::function 与 std::bind 的介绍 1. std::function std::function 是 c 11 的新特性 &#xff0c;包含在头文件<functional>中&#xff0c;为了更方便的调用函数而引入。 std::function 是一个函数包装器&#xff08;function wrapper&#xff09;&#xff0c;…

Cmake(1)——Cmake的基本介绍和原理、Cmake的安装、如何使用Cmake构建项目

Cmake的基本介绍和原理、Cmake的安装、如何使用Cmake构建项目 插播&#xff01;插播&#xff01;插播&#xff01;亲爱的朋友们&#xff0c;我们的Cmake课程上线啦&#xff01;感兴趣的小伙伴可以去下面的链接学习哦~ https://edu.csdn.net/course/detail/39261 1、Cmake的基…