基于Docker的Kafka分布式集群

目录

1. 说明

2. 服务器规划

3. docker-compose文件

kafka{i}.yaml

 kafka-ui.yaml

4. kafka-ui配置集群监控

5. 参数表

6. 测试脚本

生产者-异步生产: AsyncKafkaProducer1.py

消费者-异步消费: AsyncKafkaConsumer1.py

7. 参考


1. 说明

  • 创建一个本地开发环境所需的kafka集群
  • 分布在3个虚拟机上,以docker容器方式互联互通

2. 服务器规划

Host端口备注

host001.dev.sb

9092, 9093, 9081

kafka ui 访问

kafka0 节点

host002.dev.sb9092, 9093kafka1 节点
host003.dev.sb9092, 9093kafka2 节点

3. docker-compose文件

kafka{i}.yaml

- 其中 {i} 对应0,1,2

- 用户密码都配在文件里面

services:
  kafka:
    image: 'bitnami/kafka:3.6.2'
    container_name: kafka{i}
    hostname: kafka{i}
    restart: always
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID={i}
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka0:9094,CLIENT://:9095,EXTERNAL://kafka0:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # Log
      - KAFKA_CFG_LOG_RETENTION_HOURS = 72
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CONTROLLER_USER=kfkuser
      - KAFKA_CONTROLLER_PASSWORD=youknow
      - KAFKA_INTER_BROKER_USER=kfkuser
      - KAFKA_INTER_BROKER_PASSWORD=youknow
      - KAFKA_CLIENT_USERS=kfkuser
      - KAFKA_CLIENT_PASSWORDS=youknow
      # Others
      - TZ=Asia/Shanghai
    volumes:
      - '/data0/Server/Db/kafka0:/bitnami/kafka'
    extra_hosts: 
      - "kafka0:172.16.20.60"
      - "kafka1:172.16.20.61"
      - "kafka2:172.16.20.62"
 kafka-ui.yaml
services:
  kafka-ui:
    image: 'provectuslabs/kafka-ui:master'
    container_name: kafka-ui
    restart: always
    ports:
      - 9081:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - DYNAMIC_CONFIG_ENABLED=true
      - AUTH_TYPE=LOGIN_FORM
      - SPRING_SECURITY_USER_NAME=admin
      - SPRING_SECURITY_USER_PASSWORD=youknow
    extra_hosts: 
      - "kafka0:172.16.20.60"
      - "kafka1:172.16.20.61"
      - "kafka2:172.16.20.62"

4. kafka-ui配置集群监控

5. 参数表

参数说明
KAFKA_CFG_PROCESS_ROLES

kafka角色,做broker, controller

示例:
KAFKA_CFG_PROCESS_ROLES=controller,broker

KAFKA_KRAFT_CLUSTER_ID集群id, 同属节点需一样
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS投票选举列表
KAFKA_CFG_CONTROLLER_LISTENER_NAMES控制器名称
KAFKA_CFG_NUM_PARTITIONS默认分区数
KAFKA_CFG_LISTENERS监听器的地址和端口
KAFKA_CFG_ADVERTISED_LISTENERS发布监听器的地址和端口
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP监听器的协议 这里sasl_plain表示   仅认证加密 传输不加密
KAFKA_CLIENT_USERS加密客户端账号
KAFKA_CLIENT_PASSWORDS加密客户端密码
#Clustering
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORKafka 内部使用的 __consumer_offsets 主题的复制因子。这个主题是用来存储消费者偏移
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTORKafka 内部使用的 __transaction_state 主题的复制因子。这个主题是用来存储事务日志
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISRKafka 内部使用的 __transaction_state 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与
leader 保持同步的副本集合
#Log
KAFKA_CFG_LOG_DIRS日志目录
KAFKA_CFG_LOG_RETENTION_HOURS数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理,默认168小时,一周时间

6. 测试脚本

生产者-异步生产: AsyncKafkaProducer1.py
from confluent_kafka import Producer
import json


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)


def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)

        # Asynchronously produce a message, the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        producer.produce(
            topic, json.dumps(message).encode("utf-8"), callback=delivery_report
        )

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == "__main__":
    # Kafka configuration
    # Replace these with your server's configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "client.id": "PythonProducer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }

    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)

    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]

    # Produce messages
    produce_messages(async_producer, "zx001.msg.user", messages_to_send)
消费者-异步消费: AsyncKafkaConsumer1.py
from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json
import logging
from datetime import datetime

# 设置日志格式,'%()'表示日志参数
log_format = "%(message)s"
logging.basicConfig(
    filename="logs/kafka_messages1.log", format=log_format, level=logging.INFO
)


async def consume_loop(consumer, topics):
    try:
        # 订阅主题
        consumer.subscribe(topics)

        while True:
            # 轮询消息
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(
                        "%% %s [%d] reached end at offset %d\n"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                # 正常消息
                raw_message = msg.value()
                # print(f"Raw message: {raw_message}")
                str_msg = raw_message.decode("utf-8")
                parsed_message = json.loads(str_msg)
                parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
                json_data = json.dumps(parsed_message, ensure_ascii=False)
                logging.info("{}".format(json_data))
            await asyncio.sleep(0.01)  # 小睡片刻,让出控制权
    finally:
        # 关闭消费者
        consumer.close()


async def consume():
    # 消费者配置
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "group.id": "MsgGroup2",
        "auto.offset.reset": "earliest",
        "client.id" :  "PythonConsumer",
        "security.protocol" :  "SASL_PLAINTEXT",
        "sasl.mechanisms" :  "PLAIN",
        "sasl.username" :  "kfkuser",
        "sasl.password" :  "youknow"
    }

    # 创建消费者
    consumer = Consumer(conf)
    await consume_loop(consumer, ["zx001.msg.user"])


if __name__ == "__main__":
    asyncio.run(consume())

7. 参考

- Apache Kafka® Quick Start - Local Install With Docker

- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub

- https://juejin.cn/post/7187301063832109112

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/958862.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux系统 C/C++编程基础——基于Qt的图形用户界面编程

ℹ️大家好,我是练小杰,今天周四了,距离除夕只有4天了,各位今年卫生都搞完了吗!😆 本文是接着昨天Linux 系统C/C编程的知识继续讲,基于Qt的图形用户界面编程概念及其命令,后续会不断…

C++11(二)

目录 左值引用与右值引用 左值引用 右值引用 右值与左值交叉引用 移动语义 移动构造 移动赋值 完美转发 本期我们将学习C11中比较重要的一个知识点------右值引用。 左值引用与右值引用 在学习左值引用和右值引用之前,我们得先知道什么是左值&#xff0…

【python】四帧差法实现运动目标检测

四帧差法是一种运动目标检测技术,它通过比较连续四帧图像之间的差异来检测运动物体。这种方法可以在一定的程度上提高检测的准确性。 目录 1 方案 2 实践 ① 代码 ② 效果图 1 方案 具体的步骤如下: ① 读取视频流:使用cv2.VideoCapture…

SpringBoot开发(二)Spring Boot项目构建、Bootstrap基础知识

1. Spring Boot项目构建 1.1. 简介 基于官方网站https://start.spring.io进行项目的创建. 1.1.1. 简介 Spring Boot是基于Spring4框架开发的全新框架,设计目的是简化搭建及开发过程,并不是对Spring功能上的增强,而是提供了一种快速使用Spr…

PMP–一、二、三模–分类–12.采购管理

文章目录 技巧十二、采购管理 一模12.采购管理--3.控制采购--输出--风险登记册--每个被选中的卖方都会带来特殊的风险。随着早期风险的过时以及新风险的出现,在项目执行期间对风险登记册进行变更。 供应商还未开始做,是一个风险,当做风险进行…

栈和队列(C语言)

目录 数据结构之栈 定义 实现方式 基本功能实现 1)定义,初始化栈 2)入栈 3)出栈 4)获得栈顶元素 5)获得栈中有效元素个数 6)检测栈是否为空 7)销毁栈 数据结构之队列 定义 实现方…

B站pwn教程笔记-1

因为没有垃圾处理机制,适合做编译,不会有堵塞 c语言市场占有率还是比较高的。 Windows根据后缀识别文件,linux根据文件头识别 55:16 编译过程 一步:直接gcc编译.c文件 这只是其中的一些步骤 gcc -S 转变为汇编。但其实这时候还…

jQuery小游戏

jQuery小游戏(一) 嘻嘻,今天我们来写个jquery小游戏吧 首先,我们准备一下写小游戏需要准备的佩饰,如果:图片、音乐、搞怪的小表情 这里我准备了一些游戏中需要涉及到的图片 游戏中使用到的方法 eval() 函…

Batch Normalization学习笔记

文章目录 一、为何引入 Batch Normalization二、具体步骤1、训练阶段2、预测阶段 三、关键代码实现四、补充五、参考文献 一、为何引入 Batch Normalization 现在主流的卷积神经网络几乎都使用了批量归一化(Batch Normalization,BN)1&#xf…

JavaSec系列 | 动态加载字节码

视频教程在我主页简介或专栏里 目录: 动态加载字节码 字节码 加载远程/本地文件 利用defineClass()直接加载字节码 利用TemplatesImpl加载字节码 动态加载字节码 字节码 Java字节码指的是JVM执行使用的一类指令,通常被存储在.class文件中。 加载远程…

第十四讲 JDBC数据库

1. 什么是JDBC JDBC(Java Database Connectivity,Java数据库连接),它是一套用于执行SQL语句的Java API。应用程序可通过这套API连接到关系型数据库,并使用SQL语句来完成对数据库中数据的查询、新增、更新和删除等操作…

JVM面试题解,垃圾回收之“分代回收理论”剖析

一、什么是分代回收 我们会把堆内存中的对象间隔一段时间做一次GC(即垃圾回收),但是堆内存很大一块,内存布局分为新生代和老年代、其对象的特点不一样,所以回收的策略也应该各不相同 对于“刚出生”的新对象&#xf…

电脑如何访问手机文件?

手机和电脑已经深深融入了我们的日常生活,无时无刻不在为我们提供服务。除了电脑远程操控电脑外,我们还可以在电脑上轻松地访问Android或iPhone手机上的文件。那么,如何使用电脑远程访问手机上的文件呢? 如何使用电脑访问手机文件…

ThinkPHP 8模型与数据的插入、更新、删除

【图书介绍】《ThinkPHP 8高效构建Web应用》-CSDN博客 《2025新书 ThinkPHP 8高效构建Web应用 编程与应用开发丛书 夏磊 清华大学出版社教材书籍 9787302678236 ThinkPHP 8高效构建Web应用》【摘要 书评 试读】- 京东图书 使用VS Code开发ThinkPHP项目-CSDN博客 编程与应用开…

【MySQL】数据库基础知识

欢迎拜访:雾里看山-CSDN博客 本篇主题:【MySQL】数据库基础知识 发布时间:2025.1.21 隶属专栏:MySQL 目录 什么是数据库为什么要有数据库数据库的概念 主流数据库mysql的安装mysql登录使用一下mysql显示数据库内容创建一个数据库创…

【线性代数】基础版本的高斯消元法

[精确算法] 高斯消元法求线性方程组 线性方程组 考虑线性方程组, 已知 A ∈ R n , n , b ∈ R n A\in \mathbb{R}^{n,n},b\in \mathbb{R}^n A∈Rn,n,b∈Rn, 求未知 x ∈ R n x\in \mathbb{R}^n x∈Rn A 1 , 1 x 1 A 1 , 2 x 2 ⋯ A 1 , n x n b 1…

高等数学学习笔记 ☞ 微分方程

1. 微分方程的基本概念 1. 微分方程的基本概念: (1)微分方程:含有未知函数及其导数或微分的方程。 举例说明微分方程:;。 (2)微分方程的阶:指微分方程中未知函数的导数…

HarmonyOS基于ArkTS卡片服务

卡片服务 前言 Form Kit(卡片开发框架)提供了一种在桌面、锁屏等系统入口嵌入显示应用信息的开发框架和API,可以将应用内用户关注的重要信息或常用操作抽取到服务卡片(以下简称“卡片”)上,通过将卡片添加…

Java复习第四天

一、代码题 1.相同的树 (1)题目 给你两棵二叉树的根节点p和q,编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同,并且节点具有相同的值,则认为它们是相同的。 示例 1: 输入:p[1,2,3],q[1,2,3] 输出:true示例 2: 输…

全面了解 Web3 AIGC 和 AI Agent 的创新先锋 MelodAI

不管是在传统领域还是 Crypto,AI 都是公认的最有前景的赛道。随着数字内容需求的爆炸式增长和技术的快速迭代,Web3 AIGC(AI生成内容)和 AI Agent(人工智能代理)正成为两大关键赛道。 AIGC 通过 AI 技术生成…