kafka 集群 KRaft 模式搭建

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:https://kafka.apache.org/

Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群

本文讲解 Kafka KRaft 模式集群搭建

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、创建 KRaft 集群

4、启动 Kafka KRaft 集群

5、关闭 Kafka KRaft 集群

6、测试 KRaft 集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

下载完成

将kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft

编辑配置文件

vi server.properties

server.properties 配置说明

node.id 是kafka的broker节点id

controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址

advertised.listeners 是节点自己的监听地址

192.168.3.232 节点配置

node.id = 1

192.168.2.90 节点配置

node.id = 2

192.168.2.11节点配置

node.id = 3

3、创建 KRaft 集群

生成集群id

在任意一个节点上执行就行,笔者使用 192.168.3.232 节点

进入bin 目录

cd /usr/local/kafka/kafka_2.13-3.6.0/bin

执行生成集群 id 命令

./kafka-storage.sh random-uuid

生成后保存生成的字符串    82vqfbdSTO2QzS_M0Su1Bw

然后分别在3台机器上执行下面命令

为方便执行命令,先回到 kafka安装目录

cd /usr/local/kafka/kafka_2.13-3.6.0

再执行命令,完成集群元数据配置

bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties

192.168.3.232 节点

192.168.2.90 节点

192.168.2.11节点

上面命令执行完成后,开放防火墙端口

kafka 需要开放 9092 端口和 9093 端口

3台机器上分别开放 9092 和 9093 端口

查看开放端口

firewall-cmd --zone=public --list-ports

 开放9092 端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

  开放9093 端口

firewall-cmd --zone=public --add-port=9093/tcp --permanent

更新防火墙规则(无需断开连接,动态添加规则)

firewall-cmd --reload

4、启动 Kafka KRaft 集群

在3台机器上分别启动

下面2个命令均可启动

bin/kafka-server-start.sh -daemon config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties

笔者使用第二个启动命令 启动,效果看下图

当 3 个节点都出现 Kafka Server started,集群启动成功

5、关闭 Kafka KRaft 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

6、测试 KRaft 集群

新建 maven 项目,添加 Kafka 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wsjzzcbq</groupId>
    <artifactId>kafka-learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Demo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
        //配置序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(properties);

        //topic 名称是demo_topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println(recordMetadata.topic());
        System.out.println(recordMetadata.partition());
        System.out.println(recordMetadata.offset());

    }
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * ConsumerDemo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ConsumerDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");

        // 消费分组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
        // 序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        // 消费者订阅主题
        consumer.subscribe(Arrays.asList("demo_topic"));

        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records) {
                System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
                        record.offset(),record.key(),record.value());
            }
        }
    }
}

运行测试

效果图

消息成功发送并成功消费

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/187835.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【JavaEE初阶】线程安全问题及解决方法

目录 一、多线程带来的风险-线程安全 1、观察线程不安全 2、线程安全的概念 3、线程不安全的原因 4、解决之前的线程不安全问题 5、synchronized 关键字 - 监视器锁 monitor lock 5.1 synchronized 的特性 5.2 synchronized 使用示例 5.3 Java 标准库中的线程安全类…

vscode运行c++程序如何支持c++11

参考https://zhuanlan.zhihu.com/p/269244754 更改setting.json文件

Spring cloud - Feign

Feign的作用 Feign是Netflix公司开发的声明式web客户端组件&#xff0c;Spring对Feign做了无缝集成&#xff1a; Feign is a declarative web service client. It makes writing web service clients easier. To use Feign create an interface and annotate it. It has plugg…

什么是 Jest ? Vue2 如何使用 Jest 进行单元测试?Vue2 使用 Jest 开发单元测试实例

什么是Jest? Jest 是一个流行的 JavaScript 测试框架,由 Facebook 开发并维护,专注于简单性和速度。它通常用于编写 JavaScript 和 TypeScript 应用程序的单元测试、集成测试和端到端测试。 特点: 简单易用: Jest 提供简洁的 API 和易于理解的语法,使得编写测试用例变得…

kolla-ansible 部署OpenStack云计算平台

目录 一、环境 二、安装及部署 三、测试 一、环境 官方文档&#xff1a;https://docs.openstack.org/kolla-ansible/yoga/user/quickstart.html rhel8.6 网络设置&#xff1a; 修改网卡名称 网络IP&#xff1a; 主机名&#xff1a; 网络时间协议 配置软件仓库 vim docke…

模拟退火算法应用——求解二元函数的最小值(极小值)

仅作自己学习使用 一、问题 二、代码 clear clcT1 cputime; xmax 5; xmin -5; ymax 5; ymin -5; L 20; % 马尔科夫链长度 dt 0.998; % 降温系数 S 0.02; % 步长因子 T 200; % 初始温度 TZ 1e-8; % 容差 Tmin 0.01;% 最低温度 P 0; % Metropolis接受…

持续集成交付CICD:GitLabCI 通过trigger触发流水线

目录 一、理论 1.GitLabCI 二、实验 1.搭建共享库项目 2.GitLabCI 通过trigger触发流水线 三、问题 1.项目app02未触发项目app01 2.GitLab 报502网关错误 一、理论 1.GitLabCI (1) 概念 GitLab CI&#xff08;Continuous Integration&#xff09;是一种持续集成工具…

为什么要隐藏id地址?使用IP代理技术可以实现吗?

随着网络技术的不断发展&#xff0c;越来越多的人开始意识到保护个人隐私的重要性。其中&#xff0c;隐藏自己的IP地址已经成为了一种常见的保护措施。那么&#xff0c;为什么要隐藏IP地址&#xff1f;使用IP代理技术可以实现吗&#xff1f;下面就一起来探讨这些问题。 首先&am…

蓝桥杯每日一题2023.11.24

题目描述 #include <stdio.h> #define N 100int connected(int* m, int p, int q) {return m[p]m[q]? 1 : 0; }void link(int* m, int p, int q) {int i;if(connected(m,p,q)) return;int pID m[p];int qID m[q];for(i0; i<N; i) ________________________________…

没搞懂二维差分是什么怎么办???

摸鱼的时候画的&#xff0c;根据公式反推 一维差分倒是懂了 a[10]{1,2,6,9,11,12,17,21,32,67}; c[10]{1,1,4,3,2,1,5,4,11,35}; 现要把[3,7]的值都增加3 c[10]{1,1,7,3,2,1,5,1,11,35}; 要查询的时候再用for循环相加 结论&#xff1a;成立且适用于多次修改 不知道为什么这个…

跨域攻击分析和防御(上)

点击星标&#xff0c;即时接收最新推文 跨域攻击 在大型公司或大型跨国企业中都拥有自己的内网&#xff0c;跨国公司都有各个子公司一般以建立域林进行共享资源。根据不同职能区分的部门&#xff0c;从逻辑上以主域和子域进行划分以方便统一管理。在物理层使用防火墙将各个子公…

『亚马逊云科技产品测评』活动征文|EC2 实例安装 docker 与配套软件部署前后端分离的医疗管理后台系统

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 目录 一、AWS 产品类别选择 &#xff08;1&#xff09;应用服务器选择…

浅谈现代化城市建设中智慧消防的研究与应用

安科瑞 华楠 【摘要】随着城市现代化发展&#xff0c;城市居住密度愈来愈大&#xff0c;城市建筑结构复杂多样化&#xff0c;高层建筑火灾发生率在不断地升高。对现代化城市面临的消防问题展开讨论&#xff0c;针对智慧消防在现代化城市建设中的现状进行了分析&#xff0c;并提…

探索 Vue 中的 bus.$emit:实现组件通信的强大工具

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

快手AI布局:从直播电商到大模型,如何打造智能生态?

快手科技在2023年第三季度业绩中&#xff0c;首次披露了关于AI业务的一些重要信息&#xff0c;显示出其对AI的重视和投入。快手AI的核心业务和竞争优势是什么&#xff1f;AI的发展&#xff0c;对快手业绩带来了哪些方面的提振&#xff1f; 快手AI业务板块&#xff1a;直播电商…

华为云人工智能入门级开发者认证学习笔记

人工智能入门级开发者认证 人工智能定义 定义 人工智能 (Artificial Intelligence) 是研究、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。 强人工智能 vs 弱人工智能 强人工智能&#xff1a;强人工智能观点认为有可能制造出真正能推理&#xff08…

【React】打包体积分析 source-map-explorer

通过分析打包体积&#xff0c;才能知道项目中的哪部分内容体积过大&#xff0c;方便知道哪些包需要进一步优化。 使用步骤 安装分析打包体积的包&#xff1a;npm i source-map-explorer在 package.json 中的 scripts 标签中&#xff0c;添加分析打包体积的命令对项目打包&…

【Java并发】聊聊创建线程池的几种方式以及实际生产如何应用

上一篇文章&#xff0c;主要讲述了如果通过线程池进行执行任务&#xff0c;以及相关的核心流程&#xff0c;线程执行框架本身提供了一系列的类&#xff0c;封装了线程创建、关闭、执行、管理等跟业务逻辑无关的代码逻辑&#xff0c;一方面将业务和非业务逻辑进行解耦合&#xf…

Docker简介与安装

Docker简介 1.什么是docker docker是解决了运行环境和配置问题的软件容器。是一种方便做持续集成并有助于整体发布的容器虚拟化技术。docker官网&#xff1a;http://www.docker.com 2.与传统虚拟技术的区别 传统虚拟机技术是虚拟出一套硬件后&#xff0c;在其上运行一个完整…

关于pytorch以及相关包的安装教程

一.查看自己电脑的配置 首先查看自己电脑的cuda的版本&#xff0c;WinR,敲入cmd打开终端 输入nvidia-smi&#xff0c;查看自己电脑的显卡等配置 这里要说明一下关于这个CUDA,它具有向后兼容性&#xff0c;这意味着支持较低版本的 CUDA 的应用程序通常也可以在较高版本的 CUD…