在k8s中部署Kafka高可用集群超详细讲解

🐇明明跟你说过:个人主页

🏅个人专栏:《数据流专家:Kafka探索》🏅

🔖行路有良友,便是天堂🔖

目录

一、引言

1、Kafka简介

2、为什么在Kubernetes中部署Kafka

二、Kubernetes基础 

1、Kubernetes概述

2、Pods、Services、Deployments等基本概念

三、Kafka集群架构

1、Kafka集群的组成与工作原理

2、Kafka集群的高可用性设计

四、准备部署环境 

1、准备k8s集群

2、准备StorageClass 

3、准备部署Kafka集群所需的镜像 

五、部署Kafka集群

1、部署Zookeeper集群

2、部署Kafka集群


一、引言

1、Kafka简介

阿帕奇Kafka(Apache Kafka)是一个开源的分布式流处理平台,由LinkedIn开发并于2011年开源,现在是Apache软件基金会的一部分。Kafka的设计初衷是为了处理实时数据流,具备高吞吐量、低延迟、高容错性和可扩展性的特点。

Kafka的核心概念


1. Producer(生产者):

  • 生产者是向Kafka主题(topic)发送消息的应用程序。它们负责将数据推送到Kafka集群。

2. Consumer(消费者):

  • 消费者是从Kafka主题中读取消息的应用程序。它们可以是单个消费者或消费者组(consumer group),每个消费者组中的消费者可以并行地读取数据。

3. Topic(主题):

  • 主题是Kafka中的一个逻辑分类,用于将消息进行分组。每个主题可以细分为多个分区(partition),每个分区内的消息是有序的,但不同分区之间没有顺序保证。

4. Partition(分区):

  • 分区是主题的一个子单元,每个分区可以存储大量消息,并且每个分区可以被多个消费者读取。分区使得Kafka能够水平扩展。

5. Broker(代理):

  • Broker是Kafka集群中的一个服务器节点,负责接收、存储和发送消息。一个Kafka集群可以包含多个broker,以提供高可用性和容错能力。

6. Zookeeper:

  • Kafka使用Zookeeper来进行分布式协调和管理集群状态。Zookeeper负责维护配置信息、分区的leader选举以及消费组的offset管理等。

Kafka的主要功能


1. 消息持久化:

  • Kafka将消息持久化到磁盘,并且允许设置保留策略,确保数据的可靠性。

2. 高吞吐量:

  • Kafka能够处理高吞吐量的数据流,适合处理大规模的数据流应用。

3. 低延迟:

  • Kafka的设计优化了数据传输路径,使得消息传输具有低延迟。

4. 扩展性:

  • Kafka的分区机制允许在集群中增加更多的broker和分区,从而实现水平扩展。

5. 容错性:

  • 通过复制机制,Kafka确保了数据的高可用性,即使个别节点故障,数据仍然能够恢复。

Kafka的应用场景


1. 日志收集和聚合:

  • 作为一种高效的日志收集系统,Kafka可以收集和聚合分布式系统的日志数据。

2. 实时数据流处理:

  • Kafka与流处理框架(如Apache Flink、Apache Storm等)结合,可以进行实时数据分析和处理。

3. 消息队列:

  • Kafka可以充当高吞吐量的消息队列系统,用于不同应用程序之间的解耦和异步通信。

4. 事件源:

  • Kafka可以用作事件源系统,捕获和存储所有变化事件,以便后续处理和回放。

2、为什么在Kubernetes中部署Kafka

  1. 弹性和可伸缩性:Kubernetes提供了强大的自动伸缩和调度功能,可以根据负载情况自动扩展Kafka集群的节点数量,以满足不同工作负载的需求。这种弹性能力使得Kafka能够更好地应对数据量的变化和突发性负载。
  2. 易于管理:Kubernetes提供了统一的管理接口和工具,简化了Kafka集群的部署、扩展、更新和监控。通过Kubernetes的控制面板,管理员可以轻松地管理Kafka集群的生命周期,而无需深入了解底层的部署细节。
  3. 高可用性:Kubernetes具有自动故障检测和恢复机制,可以在节点故障时自动重新调度Kafka的副本,确保数据的高可用性和持久性。
  4. 资源隔离:通过Kubernetes的命名空间和资源限制功能,可以实现Kafka集群与其他应用程序之间的资源隔离,避免因资源竞争导致的性能问题。

 
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==编辑

二、Kubernetes基础 

1、Kubernetes概述

Kubernetes(K8s)是一个开源的容器编排平台,最初由Google开发并于2014年发布,现已成为Cloud Native Computing Foundation(CNCF)的一部分。Kubernetes旨在简化容器化应用程序的部署、管理和扩展,提供了丰富的功能和工具,使得用户可以更轻松地构建和运行分布式系统。

Kubernetes的主要功能


1. 自动化部署和扩展:

  • Kubernetes提供了丰富的调度和自动伸缩功能,可以根据负载情况自动调度和扩展应用程序的副本数量,以适应不同的工作负载需求。

2. 服务发现和负载均衡:

  • Kubernetes通过Service对象提供了统一的服务发现和负载均衡功能,使得应用程序可以通过简单的域名访问其他服务,并自动实现负载均衡。

3. 存储管理:

  • Kubernetes支持多种存储后端和存储卷类型,可以为应用程序提供持久化存储和共享存储功能,如PersistentVolume、PersistentVolumeClaim等。

4. 自我修复:

  • Kubernetes具有自我修复和健康检查机制,可以监控容器和节点的健康状态,并在出现故障时自动进行修复和恢复。

   

2、Pods、Services、Deployments等基本概念

1. Pods(容器组)


Pod是Kubernetes中最小的可部署单元,它可以包含一个或多个紧密相关的容器。这些容器共享相同的网络命名空间和存储卷,并且通常在同一节点上运行。Pod提供了一种逻辑主机的概念,使得容器之间可以共享资源和通信。Pod通常用于部署一个应用程序的一组相关容器。

2. Services(服务)


服务是Kubernetes中一种抽象,用于定义一组Pod的逻辑集合,并为它们提供统一的访问入口。服务通过标签选择器(Selector)将请求路由到对应的Pod,并实现负载均衡和服务发现功能。Kubernetes支持多种类型的服务,如ClusterIP、NodePort、LoadBalancer和ExternalName。

3. Deployments(部署)


部署是Kubernetes中用于管理Pod副本数量和应用程序版本更新的对象。部署定义了应用程序的期望状态,包括所需的副本数量、容器镜像和其他配置信息。部署控制器负责根据部署的定义创建、更新和删除Pod实例,同时提供滚动更新和回滚的功能。

三、Kafka集群架构

1、Kafka集群的组成与工作原理

Kafka集群由多个服务器节点(Broker)组成,每个Broker负责存储和处理一部分数据。Kafka的工作原理基于发布/订阅模式,其中生产者(Producer)将消息发布到一个或多个主题(Topic),而消费者(Consumer)订阅这些主题并从Broker拉取消息进行处理。

工作原理


1. 消息存储:

  • 生产者将消息发布到主题,并根据主题的分区策略选择将消息存储到对应的分区中。每个分区的消息按顺序存储,并根据消息的偏移量(Offset)进行标识。

2. 消息复制:

  • Kafka通过副本集机制实现数据的高可用性和持久性。每个分区都有一个主副本(Leader Replica)和多个副本(Follower Replica),主副本负责处理读写请求,而副本用于备份数据。当主副本故障时,Kafka会自动选举一个新的主副本来接管工作。

3. 消息传输:

  • 生产者将消息发送到Broker的主副本,Broker负责将消息复制到副本并进行持久化存储。消费者从Broker的主副本拉取消息进行处理,可以选择从不同的副本读取消息以实现负载均衡和故障恢复。

4. 消息保留策略:

  • Kafka支持设置消息的保留策略,包括基于时间、基于大小或基于日志段的策略。一旦消息达到保留期限或超过指定的大小限制,Kafka将自动删除过期消息,以释放存储空间。

   

2、Kafka集群的高可用性设计

1. 多副本复制

  • Kafka通过在多个Broker之间复制分区的副本来实现数据的冗余和高可用性。每个分区通常有多个副本,其中一个是主副本(Leader Replica),负责处理读写请求;其他副本是从副本(Follower Replica),用于备份数据。当主副本发生故障时,Kafka会自动选举一个新的主副本来接管工作,确保数据的可用性和一致性。

2. 自动故障检测与恢复

  • Kafka集群内置了自动故障检测和恢复机制,可以监控Broker和分区的健康状态,并在发现故障时自动进行故障转移和数据恢复。当Broker或分区发生故障时,Kafka会触发重新选举过程,选举出新的主副本并将数据复制到新的副本中,以实现快速的故障恢复。

3. 副本分布和均衡

  • Kafka会将分区的副本分布在不同的Broker上,以确保数据的冗余和均衡。通过将副本分布在不同的机架、数据中心或云区域中,可以提高系统的容错能力,即使整个机架或数据中心发生故障,系统仍然能够继续运行。

4. 水平扩展和动态伸缩

  • Kafka支持水平扩展和动态伸缩,可以根据负载情况和性能需求来增加或减少Broker的数量和容量。通过添加更多的Broker和分区,可以提高系统的吞吐量和容量,同时降低单点故障的风险,实现更高的可用性和可伸缩性。

   

四、准备部署环境 

1、准备k8s集群

这里我们使用的k8s集群版本为 1.23,也可以使用其他的版本,只是镜像导入命令不通,如果还未搭建k8s集群,请参考《在Centos中搭建 K8s 1.23 集群超详细讲解》这篇文章

2、准备StorageClass 

因为我们要对Kafka中的数据进行持久化,避免Pod漂移后数据丢失,保证数据的完整性与可用性

如果还未创建存储类,请参考《k8s 存储类(StorageClass)创建与动态生成PV解析,(附带镜像)》这篇文件。

3、准备部署Kafka集群所需的镜像 

 在k8s Node节点上执行以下命令

[root@node1 ~]# docker pull zookeeper:3.8
[root@node1 ~]# docker pull kafka:3.1.0
[root@node2 ~]# docker pull zookeeper:3.8
[root@node2 ~]# docker pull kafka:3.1.0
[root@node3 ~]# docker pull zookeeper:3.8
[root@node3 ~]# docker pull kafka:3.1.0

五、部署Kafka集群

1、部署Zookeeper集群

为什么部署kafka前要部署zookeeper?

Kafka依赖Zookeeper来实现分布式协调和配置管理。在Kafka集群中,Zookeeper扮演着多种角色,包括:

  1. 配置管理:Kafka集群的配置信息和元数据存储在Zookeeper中,包括主题(topics)、分区(partitions)、副本(replicas)等配置信息。
  2. Leader选举:Kafka的分区(partitions)被分布式存储在集群中的多个Broker上,每个分区都有一个Leader和多个Follower。Zookeeper负责Leader选举,确保每个分区都有一个活跃的Leader。
  3. Broker注册:Kafka Broker在启动时会向Zookeeper注册自己的信息,包括地址、ID等,以便其他Broker和客户端发现和连接。
  4. 健康监测:Zookeeper监控Kafka集群中各个节点的健康状态,并在节点出现故障或宕机时触发相应的处理操作。

因此,在部署Kafka之前,需要先部署Zookeeper,确保Kafka集群正常运行所需的分布式协调和配置管理功能可用。没有Zookeeper,Kafka无法正常运行,并且无法实现高可用性、数据一致性和故障容错等特性。

编写部署Zookeeper集群的YAML文件

[root@master ~]# vim zook.yaml
# 添加如下内容
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系
  namespace: kafka
  labels:
    app: zookeeper
spec:
  ports:
    - port: 2181
      name: zookeeper
    - port: 2188
      name: cluster1
    - port: 3888
      name: cluster2
  clusterIP: None
  selector:
    app: zookeeper
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-0
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32181   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-1
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-1
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32182   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-2
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-2
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32183   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: zookeeper-config
  namespace: kafka
  labels:
    app: zookeeper
data:             #具体挂载的配置文件
  zoo.cfg: |+
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data
    dataLogDir=/datalog
    clientPort=2181
    server.1=zookeeper-0.zookeeper-cluster.kafka:2188:3888
    server.2=zookeeper-1.zookeeper-cluster.kafka:2188:3888
    server.3=zookeeper-2.zookeeper-cluster.kafka:2188:3888
    4lw.commands.whitelist=*
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: kafka
spec:
  serviceName: "zookeeper-cluster"   #填写无头服务的名称
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      initContainers:
      - name: set-zk-id
        image: busybox:latest
        command: ['sh', '-c', "hostname | cut -d '-' -f 2 | awk '{print $0 + 1}' > /data/myid"]
        volumeMounts:
        - name: data
          mountPath: /data
      containers:
      - name: zookeeper
        image: zookeeper:3.8
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "1000m"
        ports:
        - containerPort: 2181
          name: zookeeper
        - containerPort: 2188
          name: cluster1
        - containerPort: 3888
          name: cluster2
        volumeMounts:
          - name: zook-config            #挂载配置
            mountPath: /conf/zoo.cfg
            subPath: zoo.cfg
          - name: data
            mountPath: /data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
      volumes:
      - name: zook-config
        configMap:                                #configMap挂载
          name: zookeeper-config
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi

创建Zookeeper集群

[root@master ~]# kubectl apply -f  zook.yaml

查看Pod状态  

2、部署Kafka集群

编写部署Kafka集群的YAML文件

[root@master ~]# vim kafka.yaml
# 输入如下内容 
apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系
  namespace: kafka
  labels:
    app: kafka
spec:
  ports:
    - port: 9092
      name: kafka
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka0-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30092   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-1
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka1-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30093   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-2
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka2-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30094   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka0
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka0
  template:
    metadata:
      labels:
        app: kafka0
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=0 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30092 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data0
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data0
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka1
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=1 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30093 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data1
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data1
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka2
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=2 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30094 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data2
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data2
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi

创建Kafka集群

[root@master ~]# kubectl apply -f  kafka.yaml 

 查看Pod状态

  

至此,Kafka集群部署完成

💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于Kafka的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/683347.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

决策树Decision Tree

目录 一、介绍发展优点缺点基本原理 二、熵1、熵2、条件熵3、信息增益4、信息增益率 三、基尼系数四、ID3算法1、建树过程2、优点3、缺点 五、C4.51、二分法处理连续变量1、流程:2、示例 2、缺点 六、CART1、连续数据处理2、离散数据处理3、CART回归原理1、均方误差…

医学编码系统说明

简介 流程说明 登录系统 在浏览器中访问FNEHR的站点,输入医院编号、用户和密码,选择“Other”,点击“Login”按钮,登录系统: 登录后,在左边显示系统的菜单: 系统设置 医院设置 点击左侧的“Acc…

尚硅谷2024新版3小时速通Docker教程

尚硅谷2024新版3小时速通Docker教程 百度网盘:https://pan.baidu.com/s/1SncgHbdJehvZspjcrrbLSw?pwd6c27

【C语言】详解函数(下)(庖丁解牛版)

文章目录 1. 前言2. 数组做函数形参3. 函数嵌套调用和链式访问3.1 嵌套调用3.2 链式访问 1. 前言 详解C语言函数(上)的链接:http://t.csdnimg.cn/EGsfe 经过对函数的初步了解之后,相信大家已经对C语言标准库里的函数已经有初步的认知了,并且还学会了如…

【工具箱】嵌入式系统存储芯片——CS创世 SD NAND

大家都知道MCU是一种"麻雀"虽小,却"五脏俱全"的主控。它的应用领域非常广泛,小到手机手表,大到航空航天的设备上都会用到MCU.市面上目前几个主流厂商有意法半导体(其中最经典的一款就是STM32系列)…

足球俱乐部管理系统的设计

管理员账户功能包括:系统首页,个人中心,管理员管理,教练管理,用户管理,合同信息管理,赛事管理 前台账户功能包括:系统首页,个人中心,公告信息,赛事…

C++ STL - 容器

C STL(标准模板库)中的容器是一组通用的、可复用的数据结构,用于存储和管理不同类型的数据。 目录 零. 简介: 一 . vector(动态数组) 二. list(双向链表) 三. deque&#xff08…

Ansible部署 之 zookeeper集群

简介 Ansible是近年来越来越火的一款轻量级运维自动化工具,主要功能为帮助运维实现运维工作的自动化、降低手动操作的失误、提升运维工作效率。常用于自动化部署软件、自动化配置、自动化管理,支持playbook编排。配置简单,无需安装客户端&am…

LNMP网络架构的搭建

操作准备:准备三台虚拟机 安装 MySQL 服务 (1)准备好mysql目录上传软件压缩包并解压 cd /opt mkdir mysql tar xf mysql-boost-5.7.44.tar.gz (2)安装mysql环境依赖包 yum -y install ncurses ncurses-devel bison…

idea 中:运行 Application 时出错。命令行过长

一、问题描述: idea 导入新项目,在编译后,运行项目时,报以下错误: 14:47 运行 Application 时出错运行 Application 时出错。命令行过长。通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行。二、问题…

在Oracle VM virtual box 中复制 CentOS 7虚拟机更改IP地址的操作

最近玩Redis主从复制的时候,我装了一个虚拟机,但主从复制需要准备3个虚拟机,这个时候,我又不想一个一个去装,我看到Oracle VM virtual box提供了一个虚拟机复制操作,于是就用了一下这个功能,发现…

LabVIEW传感器虚拟综合实验系统

LabVIEW传感器虚拟综合实验系统 开发了一个基于LabVIEW的传感器虚拟综合实验系统,该系统集成了NIELVIS和CSY系列传感器实验平台,通过图形化编程语言进行数据处理和实验管理。系统允许用户进行多种传感器参数的测量和实验报告的自动生成,支持…

SqlServer2016企业版安装

前言 好久没有知识的累积,最近工作上遇到新的SqlServer2016安装,记录一下 参考文章 SQL Server 2016软件安装包和安装教程 - 哔哩哔哩 (bilibili.com) 安装包准备 需要提前准备软件安装包如下 cn_sql_server_2016_enterprise_x64_dvd_8699450&…

(免费领源码)Java#springboot#MySQL书法社团管理系统36200-计算机毕业设计项目选题推荐

目 录 摘要 Abstract 1 绪论 1.1 研究背景 1.2 研究意义 1.3论文结构与章节安排 2 书法社团管理系统系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据增加流程 2.2.2 数据修改流程 2.2.3 数据删除流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析…

重邮803计网概述

目录 一.计算机网络向用户提供的最重要的功能 二.互联网概述 1.网络的网络 2.计算机网络的概念 3. 互联网发展的三个阶段 4.制订互联网的正式标准要经过以下的四个阶段 5.互联网的组成(功能) 6.互联网功能 7.互联网的组成(物理&#…

Vitis HLS 学习笔记--static RAM/ROM

目录 1. 简介 2. static RAM 2.1 无 reset 的情形 2.2 含 reset 的情形 3. static ROM 4. 总结 1. 简介 本文仍然是讨论阵列的初始化与复位问题,区别于《Vitis HLS 学习笔记--global_array_RAM初始化及复位-CSDN博客》,本文讨论的对象是静态阵列&…

ARM的异常处理

目录 异常的概念 ARM的异常源 异常优先级 异常模式 异常处理的过程 状态寄存器CPSR 异常向量表 异常的概念 异常是计算机系统中一种突发事件或错误情况,它打破了正常的程序执行流程,需要特殊处理。异常可能由硬件错误、软件错误或外部事件引发&am…

09-spring的bean创建流程(一)

文章目录 spring中bean的创建流程finishBeanFactoryInitialization(beanFactory)beanFactory.preInstantiateSingletons();getMergedLocalBeanDefinition(beanName);流程实现FactoryBean接口,里面的对象实例化过程 spring中bean的创建流程 finishBeanFactoryInitialization(be…

8990890

作者主页:作者主页 数据结构专栏:数据结构 创作时间 :2024年5月18日

Mac OS 用户开启 8080 端口

开启端口 sudo vim /etc/pf.conf # 开放对应端口 pass out proto tcp from any to any port 8080 # 刷新配置文件 sudo pfctl -f /etc/pf.conf sudo pfctl -e获取本机ip地址 ifconfig en0 | grep inet | grep -v inet6 | awk {print $2}访问指定端口