kafka 集群 ZooKeeper 模式搭建

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:Apache Kafka

关于ZooKeeper的弃用

根据 Kafka官网信息,随着Apache Kafka 3.5版本的发布,Zookeeper现已被标记为已弃用。未来计划在Apache Kafka(4.0版)的下一个主要版本中删除ZooKeeper,该版本最快将于2024年4月发布。在弃用阶段,ZooKeeper仍然支持用于Kafka集群元数据的管理,但不建议用于新的部署。新的部署方式使用 KRaft 模式,KRaft 模式部署可以看笔者的文章《kafka 集群 KRaft 模式搭建》,考虑到一些公司仍然在使用老版本的 Kafka,故笔者写这篇文章记录 Kafka 集群Zookeeper 模式搭建

官网信息截图

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、启动 Kafka 集群

4、关闭 Kafka 集群

5、使用Kafka 可视化工具查看

6、测试Kafka集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

3.6.0 版本需要至少 java8 及以上版本,笔者使用的是 java8 版本

关于 linux 安装 java,没安装过的朋友可以参考《linux 系统安装 jdk》

下载完成

将 kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config

编辑配置文件 server.properties

vi server.properties

配置 broker.id,advertised.listeners,zookeeper.connect

broker.id 每个节点的id

advertised.listeners 本机的外网访问地址

zookeeper.connect zookeeper 地址

192.168.3.232 节点配置

advertised.listeners 笔者配置为本机地址

192.168.2.90 节点

192.168.2.11 节点

笔者zookeeper 地址是 192.168.2.130:2181

zookeeper 版本是3.8.3

关于zookeeper单机安装和集群安装可以参考:《Linux环境 安装 zookeeper》《windows环境 安装 zookeeper》《linux 使用 nginx 搭建 zookeeper 集群》

3、启动 Kafka 集群

首先启动 zookeeper

然后在3台机器上依次启动 Kafka

进入 kafka 目录

cd /usr/local/kafka/kafka_2.13-3.6.0

下面2个命令皆可

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties

4、关闭 Kafka 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

5、使用Kafka 可视化工具查看

下载地址:https://www.kafkatool.com/download.html

运行效果

6、测试Kafka集群

新建 maven 项目,添加 Kafka 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.wsjzzcbq</groupId>
    <artifactId>kafka-learn</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.0</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
/**
 * Demo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ProducerDemo {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
        //配置序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
 
        Producer<String, String> producer = new KafkaProducer<>(properties);
 
        //topic 名称是demo_topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println(recordMetadata.topic());
        System.out.println(recordMetadata.partition());
        System.out.println(recordMetadata.offset());
 
    }
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
 
/**
 * ConsumerDemo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ConsumerDemo {
 
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
 
        // 消费分组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
        // 序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        // 消费者订阅主题
        consumer.subscribe(Arrays.asList("demo_topic"));
 
        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records) {
                System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
                        record.offset(),record.key(),record.value());
            }
        }
    }
}

运行测试

效果图

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/209073.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

TCP_握手+挥手过程状态变化分析

TCP状态解读 握手挥手过程状态变化 同时握手 双发同时发起syn请求&#xff0c;状态变化过程如下&#xff1a; 图片来源&#xff1a;http://www.tcpipguide.com/free/t_TCPConnectionEstablishmentProcessTheThreeWayHandsh-4.htm 同时挥手 4次挥手&#xff0c;可以理解为2…

基于相关性的四种机器学习聚类方法

在这篇文章中&#xff0c;基于20家公司的股票价格时间序列数据。根据股票价格之间的相关性&#xff0c;看一下对这些公司进行分类的四种不同方式。 苹果&#xff08;AAPL&#xff09;&#xff0c;亚马逊&#xff08;AMZN&#xff09;&#xff0c;Facebook&#xff08;META&…

Public Keys为constant size的accountable multi-signature

1. 引言 见Dan Boneh等人2023年论文《Accountable Multi-Signatures with Constant Size Public Keys》。 多签方案用于&#xff0c;将多方对同一消息 m m m的多个签名&#xff0c;聚合为对 m m m的单个短签名。 多签方案应用广泛&#xff0c;尤其是在proof-of-stake共识协议…

高并发下缓存失效问题-缓存穿透、缓存击穿、缓存雪崩、Redis分布式锁简单实现、Redisson实现分布式锁

文章目录 缓存基本使用范式暴露的几个问题缓存失效问题---缓存穿透缓存失效问题---缓存击穿一、单机锁正确的锁粒度不正确的锁粒度无法保证查询数据库次数是唯一 二、分布式锁getCatalogJsonData()分布式锁演进---基本原理分布式锁(加锁)演进一&#xff1a;删锁失败导致死锁分布…

zookeeper心跳检测 (实操课程)

本系列是zookeeper相关的实操课程&#xff0c;课程测试环环相扣&#xff0c;请按照顺序阅读来学习和测试zookeeper。 阅读本文之前&#xff0c;请先阅读----​​​​​​zookeeper 单机伪集群搭建简单记录&#xff08;实操课程系列&#xff09;zookeeper 客户端常用命令简单记录…

nodejs微信小程序+python+PHP学科建设管理信息系统的设计与实现-计算机毕业设计推荐

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

11.28~11.29基本二叉树的性质、定义、复习;排序算法;堆

完全二叉树&#xff08;Complete Binary Tree&#xff09;是一种特殊的二叉树结构&#xff0c;它具有以下特点&#xff1a; 所有的叶子节点都集中在树的最后两层&#xff1b;最后一层的叶子节点都靠左排列&#xff1b;除了最后一层&#xff0c;其他层的节点数都达到最大值。 …

如何快速看懂市场行情?

一、看大盘指数 咱们平时所说的大盘其实指的就是上证指数&#xff0c;它是整个市场的晴雨表。大盘涨了&#xff0c;个股跟着上涨的概率就大&#xff0c;大盘跌了&#xff0c;个股被拖累下跌的概率也大。所以&#xff0c;要想在股市中尝到甜头&#xff0c;大盘分析是少不了滴&am…

Django HMAC 请求签名校验与 Vue.js 实现安全通信

概要 在 Web 应用的开发过程中&#xff0c;确保数据传输的安全性和完整性是一个不容忽视的问题。使用 HMAC&#xff08;Hash-based Message Authentication Code&#xff09;算法对请求内容进行签名校验&#xff0c;是一种常见且有效的安全策略。本文将详细介绍如何在 Django …

[1] AR Tag 在ros中的使用

1.定义 AR Tag 是一种用于增强现实&#xff08;AR&#xff09;应用中的视觉标记&#xff0c;用于跟踪和定位虚拟物体在现实世界中的位置。 AR Tag由黑白正方形图像表示&#xff0c;图像内部有黑色边框中的某些图案。它与我们经常用到的二维码长得类似&#xff0c;原理其实也一…

STM32内部温度传感器使用方法详解

STM32内部温度传感器使用方法详解 前言 STM32内部集成了一个片上温度传感器&#xff0c;可以用来测量MCU及周围的温度。测量范围&#xff1a;-40~125&#xff0c;精度1.5℃。虽然精度不高&#xff0c;但在某些应用场景下是够了的&#xff0c;相比于外部接入传感器&#xff0c…

nodejs微信小程序+python+PHP金融产品销售系统的设计与实现-计算机毕业设计推荐

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

Windows server 2016 FTP服务器的搭建

FTP&#xff08;File Transfer Protocol&#xff09;是一个用来在两台计算机之间传输文件的通信协议。这两台计算机中&#xff0c;一台是FTP服务器&#xff0c;另一台是FTP 客户端。 1.安装FTP服务与建立FTP站点 1.1 打开服务器管理器——单击仪表盘的添加角色和功能 1.2 持续…

【计算机网络笔记】PPP协议

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

C语言:写一个函数,输入一个十六进制数,输出相应的十进制数

分析&#xff1a; 当用户运行该程序时&#xff0c;程序会提示用户输入一个十六进制数。用户需要在命令行中输入一个有效的十六进制数&#xff0c;例如&#xff1a;"1A3F"。 接下来&#xff0c;程序调用了名为 xbed 的函数&#xff0c;并将用户输入的十六进制数作…

Linux 权限管理

1 Linux 安全模型 AAA认证资源分派&#xff1a; 当用户登录时&#xff0c;系统会自动分配令牌 token&#xff0c;包括用户标识和组成员等等信息 1.1 用户 Linux 中每个用户是通过 User ID&#xff08;UID&#xff09;来唯一标识的。 1.2 用户组 Linux 中可以将一个或者多个…

limit 10和limit 10000 10效率相同吗

先说结论&#xff1a;不相同&#xff0c;差异很大。 set profiling 1;select * from xiatui order by id limit 10000,10;select * from xiatui order by id limit 10;show profiles; select * from xiatui order by name limit 90000,10;select * from xiatui order by name…

使用 NRF24L01 无线收发模块进行远程控制

NRF24L01 是一款基于 2.4GHz 射频通信的低功耗无线收发模块&#xff0c;具有高性能和稳定性&#xff0c;适用于远程控制和数据传输应用。本文将介绍如何使用 NRF24L01 模块进行远程控制&#xff0c;包括硬件的连接和配置&#xff0c;以及相应的代码示例。 一、引言 NRF24L01 是…

k8s报错

报错&#xff1a; 这个错误信息表明你的容器运行时&#xff08;container runtime&#xff09;没有正常运行&#xff0c;具体是因为CRI&#xff08;容器运行时接口&#xff09;v1版本的API没有为特定的端点实现。这通常发生在使用containerd作为容器运行时时。错误信息中提到的…

@RequestMapping处理请求异常

使用RequestMapping不指定请求方式&#xff0c;多种请求方式都支持。 Get格式FORM_URLENCODED Content-Typeapplication/x-www-form-urlencoded URL形式传参&#xff0c;请求体里面的内容是&#xff1a;usernamejohnexample.com&passwordsecretpassword&grant_type…