【Linux】Kafka部署

1、部署

# 解压
[appuser@localhost app]$ tar -zxvf kafka_2.13-3.0.0.tgz

# 修改配置文件(具体配置见下文)
[appuser@localhost app]$ cd kafka_2.13-3.0.0
[appuser@localhost kafka_2.13-3.0.0]$ vim config/server.properties 

# 在所有集群节点启动Kafka
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties

# 创建topic
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-topics.sh --bootstrap-server 10.20.10.1:9092 --create --topic test --partitions 3 --replication-factor 3

# 查询Topic
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-topics.sh --bootstrap-server [2106:410:a10::911]:9092 --list

# 查询消费组
#[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-consumer-groups.sh --bootstrap-server [2106:410:a10::911]:9092 --list

2、server.properties

kafka_2.13-3.0.0/config/server.properties

############################# Server Basics #############################
#必须为每个代理设置一个唯一的整数
broker.id=1	

############################# Socket Server Settings #############################
#套接字服务器侦听的地址。如果未配置,它将从 java.net.InetAddress.getCanonicalHostName()获取。
#listeners=PLAINTEXT://10.200.34.5:9092
listeners=PLAINTEXT://[2106:410:a10::911]:9092  

#代理生产和消费的主机端口。如果未设置,则在配置时使用“listeners”的值。否则,它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。
# advertised.listeners=PLAINTEXT://10.20.10.1:9092	
advertised.listeners=PLAINTEXT://[2106:410:a10::911]:9092   

#服务器用于从网络接收请求并向网络发送响应的线程数(默认是3),接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3

#消息从内存中写入磁盘是时候使用的线程数量(默认是8)
num.io.threads=8                                               

#套接字服务器使用的发送缓冲区(SO_SNDBUF)       
socket.send.buffer.bytes=102400

#套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.receive.buffer.bytes=102400

#套接字服务器将接受的请求的最大大小(防止OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################
#日志
log.dirs=/app/kafka_2.13-3.0.0/logs

#每个topic的默认日志分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件,建议broker少的话,默认就几个broker 就设置成几个分区
num.partitions=1 

#在启动时用于日志恢复和在关闭时刷新的每个数据目录的线程数。对于数据目录位于RAID阵列中的安装,建议增加此值。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
#“__consumer_offsets”和“__transaction_state” topic内部的组元数据的复制因子。对于除开发测试之外的任何其他内容,建议使用大于1的值以确保可用性
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

#是否允许自动创建 Topic。
# auto.create.topics.enable=false
#是否允许 Unclean Leader 选举。
# unclean.leader.election.enable=false
#是否允许定期进行 Leader 选举。
auto.leader.rebalance.enable=false

############################# Log Flush Policy (日志刷新政策) #############################  
#消息立即写入文件系统,但默认情况下我们只有fsync()才能同步
#懒惰的操作系统缓存。以下配置控制将数据刷新到磁盘。
#这里有一些重要的权衡:
#1。持久性:如果您不使用复制,则可能会丢失未刷新的数据。
#2。延迟:当刷新确实发生时,非常大的刷新间隔可能会导致延迟峰值,因为会有大量数据需要刷新。
#3。吞吐量:冲洗通常是最昂贵的操作,并且小的冲洗间隔可能导致过多的搜索。
#以下设置允许配置刷新策略以在一段时间后刷新数据或每N条消息(或两者)。这可以在全局范围内完成,并在每个主题的基础上进行覆盖。

#强制刷新数据到磁盘之前要接受的消息数
#log.flush.interval.messages=10000

#强制刷新之前消息可以在日志中停留的最长时间
#log.flush.interval.ms=1000

############################# Log Retention Policy(日志保留政策) #############################
#以下配置控制日志段的处理。政策可以设置为在一段时间后或在累积给定大小后删除段。只要满足这些条件*either*,就会删除一个段。删除总是发生从日志的末尾开始。

# segment文件保留的最长时间,默认保留7天(168小时),超时将被删除,也就是说7天之前的数据将被清理掉
log.retention.hours=168

# 日志的基于大小的保留策略。消息数量大于改值将删除最旧的数据。函数独立于log.retenation.h。
log.retention.bytes=1073741824

#Kafka 调用cleanupLogs的时间间隔 ,该函数对所有 Logs 执行清理操作,(目前不确定 Logs 对应的是 Topic 还是 Partition,目测应当是 Partition)
log.cleanup.interval.mins=2

# 日志段文件的最大大小。达到此大小时,将创建新的日志段。
log.segment.bytes=104857600

#检查日志段以查看是否可以删除日志段的时间间隔
log.retention.check.interval.ms=300000


############################# Zookeeper #############################
#zookeeper集群的地址,可以是多个,多个之间用逗号分割
zookeeper.connect=10.20.11.1:2181,10.20.11.2:2181,10.20.11.3:2181 

#设置连接Zeekeeper超时时间
zookeeper.connection.timeout.ms=6000 


############################# Group Coordinator Settings(组协调员设置) #############################   
#以下配置指定GroupCoordinator将延迟初始消费者重新平衡的时间(以毫秒为单位)。当新成员加入组时,重新平衡将进一步延迟group.initial.rebalance.delay.ms的值,最多为max.poll.interval.ms。默认值为3秒。我们将此覆盖为0,因为它为开发和测试提供了更好的开箱即用体验。但是,在生产环境中,默认值3秒更合适,因为这有助于避免在应用程序启动期间不必要且可能很昂贵的重新平衡。
group.initial.rebalance.delay.ms = 0s

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/904733.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

微服务设计模式 - 断路器模式 (Circuit Breaker Pattern)

微服务设计模式 - 断路器模式 (Circuit Breaker Pattern) 定义 断路器模式(Circuit Breaker Pattern)是云计算和微服务架构中的一种保护性设计模式,其目的是避免系统中的调用链出现故障时,导致系统瘫痪。通过断路器模式&#xff…

Yelp 数据集进行用户画像, 使用聚类做推荐

使用 Yelp 数据集进行用户画像(User Profiling)是一项有趣的任务,可以理解用户的偏好、行为和特征。以下是总结的一个基本的步骤,帮助构建用户画像 pandas 加载数据: import pandas as pd# 加载数据 users pd.read_…

DDRPHY数字IC后端设计实现系列专题之后端设计导入,IO Ring设计

本章详细分析和论述了 LPDDR3 物理层接口模块的布图和布局规划的设计和实 现过程,包括设计环境的建立,布图规划包括模块尺寸的确定,IO 单元、宏单元以及 特殊单元的摆放。由于布图规划中的电源规划环节较为重要, 影响芯片的布线资…

前端路由如何从0开始配置?vue-router 的使用

在 Web 开发中,路由是指根据 URL 的不同部分将请求分发到不同的处理函数或页面的过程。路由是单页应用(SPA, Single Page Application)和服务器端渲染(SSR, Server-Side Rendering)应用中的一个重要概念。 在开发中如何…

强化学习的数学原理-06随即近似理论和随机梯度下降

文章目录 Robbins-Monro algorithmStochastic gradient descentBGD、MBGD、 and SGDSummary Robbins-Monro algorithm 迭代式求平均数的算法 S t o c h a s t i c a p p r o x i m a t i o n ( S A ) Stochastic \; approximation \;(SA) Stochasticapproximation(SA)&#xf…

Apache Hive 通过Docker快速入门

QuickStarted 介绍 在伪分布式模式下在 docker 容器内运行 Apache Hive,以便为 Hive 提供以下快速启动/调试/准备测试环境 快速入门 步骤 1:拉取镜像 从 DockerHub 拉取镜像:https://hub.docker.com/r/apache/hive/tags。以下是最新的镜像…

【K8S系列】Kubernetes 中 NodePort 类型的 Service 无法访问的问题【已解决】

在 Kubernetes 中,NodePort 类型的 Service 允许用户通过每个节点的 IP 地址和指定的端口访问应用程序。如果 NodePort 类型的 Service 无法通过节点的 IP 地址和指定端口进行访问,可能会导致用户无法访问应用。本文将详细分析该问题的常见原因及其解决方…

逻辑卷动态扩容与缩容-----

一、创建逻辑卷 需求:创建一个2.5G大小的逻辑卷 思路: 1. 物理的设备 2. 将物理设备做成物理卷 pv 3. 创建卷组并将物理卷加入其中 vg 4. 创建逻辑卷 lv 5. 格式化逻辑卷 mkfs.ext4 6. 挂载使用 mount 步骤: 1. 物理设备【如何来分区】…

开关灯问题(c语言)

样例&#xff1a;10 10 &#xff0c;输出&#xff1a;1&#xff0c;4&#xff0c;9 5 5 &#xff0c;输出&#xff1a;1&#xff0c;4 代码如下 #include<stdio.h> //引入bool值的概念 #include<stdbool.h> int main() {int n 0;//n为灯的数量int m 0;…

扫雷游戏(C语言详解)

扫雷游戏&#xff08;C语言详解&#xff09; 放在最前面的1、前言&#xff08;扫雷游戏的简介&#xff09;2、扫雷游戏的规则&#xff08;简易版&#xff09;3、代码实现&#xff08;3.1&#xff09;提醒一下&#xff1a;( i ) 提醒1&#xff1a;( ii ) 提醒2&#xff1a; &…

在面试了些外包以后,我有了些自己的思考

大家好&#xff0c;我是洋子&#xff0c;最近公司在降本增效&#xff0c;需要把外包从北京迁移到陕西的某新一线城市&#xff0c;其实就是变相裁员&#xff0c;减少外包的成本&#xff0c;裁掉现有的员工&#xff0c;重新招聘新人 在整个测试行业&#xff0c;外包测试的比重是…

论文 | Ignore Previous Prompt: Attack Techniques For Language Models

这篇论文探讨了针对大型语言模型&#xff08;LLM&#xff09;的“提示注入”攻击&#xff0c;并提出了一种名为 PROMPTINJECT 的框架来研究这类攻击。 论文的主要内容包括&#xff1a;1. 提示注入攻击&#xff1a; 论文定义了“提示注入”的概念&#xff0c;即通过在用…

Django-中间件

定义&#xff1a; 编写中间件&#xff1a; 注册中间件&#xff1a; 添加中间件&#xff1a; 1.在项目目录下添加一个文件夹&#xff08;名字随意&#xff09;&#xff0c;然后文件夹下创建.py文件 2.将中间件添加到setting文件中 MIDDLEWARE [django.middleware.security.Se…

MBR20100CT-ASEMI半塑封肖特基二极管MBR20100CT

编辑&#xff1a;ll MBR20100CT-ASEMI半塑封肖特基二极管MBR20100CT 型号&#xff1a;MBR20100CT 品牌&#xff1a;ASEMI 封装&#xff1a;TO-220 安装方式&#xff1a;插件 批号&#xff1a;最新 最大平均正向电流&#xff08;IF&#xff09;&#xff1a;20A 最大循环…

操作数据表

创建表 创建表语法&#xff1a; CREATE TABLE table_name ( field1 datatype [COMMENT 注释内容], field2 datatype [COMMENT 注释内容], field3 datatype ); 注意&#xff1a; 1. 蓝色字体为关键字 2. CREATE TABLE 是创建数据表的固定关键字&#xff0c;表…

一、ARMv8寄存器之通用、状态、特殊寄存器

ARMV8核心寄存器数量是非常大的&#xff0c;为了更好的学习&#xff0c;可以划分为以下几大类&#xff1a; 通用寄存器。这类寄存器主要是用来暂存数据和参与运算。通过load\store指令操作。状态寄存器。AArch64体系结构使用PSTATE寄存器表示当前处理器状态。特殊寄存器。有专门…

WPF+MVVM案例实战(六)- 自定义分页控件实现

文章目录 1、项目准备2、功能实现1、分页控件 DataPager 实现2、分页控件数据模型与查询行为3、数据界面实现 3、运行效果4、源代码获取 1、项目准备 打开项目 Wpf_Examples&#xff0c;新建 PageBarWindow.xaml 界面、PageBarViewModel.cs ,在用户控件库 UserControlLib中创建…

【Docker】构建Linux云桌面环境

目录 一、说明 二、离线安装Docker 1&#xff09;将下载的包上传到服务器上去 2&#xff09;安装docker 3) 启动docker 4&#xff09;配置加速器 三、安装云桌面镜像 四、启动云桌面 方式一&#xff1a;docker命令直接运行 方式二&#xff1a;docker-compose方式 五…

Easysearch 与 LLM 融合打造知识库系统

文章目录 一、LangChain 简介二、RAG 产生的背景及其局限性三、RAG 工作流程四、 Easysearch 结合 LLM 实现 RAG&#xff08;1&#xff09;Easysearch 简介&#xff08;2&#xff09;结合实现RAG 五、 Easysearch 结合 LLM 实现 RAG 的优势&#xff08;1&#xff09;提高检索准…

驱动-----adc

在key1.c的基础上进行对adc1.c进行编写 首先将文件里面的key全部改为adc 再修改一下设备号 按键和adc的区别是什么,按键只需要按一下就触发了,并且不需要返回一个值出来, adc要初始化,启动,返回值 以下是裸机adc的代码: #include <s3c2440.h> #include "ad…