KRaft使用SASL_PLAINTEXT进行认证

需要有KRaft相关的基础,才行。可参阅之前学习记录Kafka

一、配置

首先需要了解SASL的含义,SASL全称为Simple Authentication and Security Layer,它主要是用于在客户端和服务器之间提供安全的身份验证机制。

Kafka 支持以下几种 SASL 验证机制如下

  • GSSAPI (Kerberos)
  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512
  • OAUTHBEARER

其中,PLAIN相对来说更简单,本文就记录SASL/PLAIN的配置与使用。

1.1 配置单机SASL/PLAIN认证

安装的过程参考一键安装的脚本,安装好Kafka后,按照如下操作进行配置

1.) 创建kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

user_admin="admin-secret"表示用户名admin,对应的密码为admin-secretuser_alice同理。

usernamepassword表示节点建立集群时,需要验证的身份信息,只有验证通过的节点,方能成功建立集群。

2.)进入到kafka的bin路径中, 复制一份kafka-server-start.sh出来。

cp kafka-server-start.sh kafka-server-start-jaas.sh

并修改复制出来的文件,添加三行。

if [ "x$KAFKA_OPTS" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/root/kafka_server_jaas.conf"
fi

3.) 将config/kraft/server.properties再复制一份。

cp server.properties server.jaas.properties

并修改复制出来的文件,修改参数如下。其中advertised.listeners中的ip要改为实际使用的ip地址。

sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://10.0.0.10:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

其中,listener.security.protocol.map的格式为{别名}:{监听器类型},配置了别名,那么在配置listeners时,就可以通过{别名}://:{端口}来进行配置。前缀为 SASL 的两个参数是新增的,目的是为了配置 SASL 认证,其表示的含义如下

  • sasl.enabled.mechanisms:指定Kafka进行SASL验证的机制。PLAIN表示使用文本验证。
  • sasl.mechanism.inter.broker.protocol:指定broker之间通信所使用的SASL验证的机制。PLAIN表示使用文本验证。

Kafka支持不同的监听器类型,如下

监听器类型数据加密身份验证适用场景
PLAINTEXT————内网环境、安全性有保障
SSL/TLSSSL/TLS——在公网传输敏感数据
SASL_PLAINTEXT——SASL需要身份验证但不需要数据加密的环境
SASL_SSLSSL/TLSSASL安全性要求极高的环境

4.) 启动

格式化集群信息,并启动

/root/kafka_2.13-3.3.1/bin/kafka-storage.sh format -t T1CYXg2DQPmdSYSUI-FNFw -c /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties
/root/kafka_2.13-3.3.1/bin/kafka-server-start-jaas.sh /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties

会了单节点的配置,集群的配置只是简单调下参数,不过还是记录一下。

具体的配置参阅1.2即可。

1.2 配置集群SASL/PLAIN认证

准备三台机器

  • 10.0.0.101
  • 10.0.0.102
  • 10.0.0.102

按照单机Kafka的配置步骤进行配置,但是在第三步时,略有调整。

10.0.0.101的示例配置文件如下。每个节点中,需要单独修改三个参数,分别为node.idcontroller.quorum.votersadvertised.listeners

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@10.0.0.101:9093,2@10.0.0.102:9093,3@10.0.0.103:9093
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://10.0.0.101:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

其中node.id表示节点的编号,controller.quorum.voters表示集群中的所有具有controller角色的节点,其配置格式为{node.id}@{ip}:{port}

在三节点上均执行如下命令

/root/kafka_2.13-3.3.1/bin/kafka-storage.sh format -t T1CYXg2DQPmdSYSUI-FNFw -c /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties
/root/kafka_2.13-3.3.1/bin/kafka-server-start-jaas.sh /root/kafka_2.13-3.3.1/config/kraft/server.jaas.properties

启动后日志如图。

1.3 client接入授权

1.) 如果kafka未开启身份认证

./kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --all-groups

2.) 如果kafka开启了身份认证,则需要创建存储身份认证的文件kafka.properties

security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";

之后再执行命令

./kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --all-groups --command-config ./kafka.properties

1.4 集群为何是奇数

常见的分布式一致性算法

  1. Paxos
  2. Raft

KRaft全称为Kafka Raft,是基于Raft算法实现的分布式副本管理协议。

Raft的多数派原则,规定了集群的建立无需全员存活,只要存活多数(n个节点,存活节点数>n/2)即可视为集群为正常状态。

这个规定解决了集群中出现的脑裂问题。

比如4台节点组成的集群,其中2台为上海机房,另外2台为北京机房,当上海与北京的网络出现故障,那么一个集群就会分裂为2个集群,这就是脑裂现象。

由于这个规定,导致集群中n台和n+1台节点,他们的容灾能力是一样的(n为奇数),都只能坏一台。使用奇数个节点反而能节省资源。

二、监控

2.1 监控组件比较

常见的Kafka监控如下

  1. CMAK (previously known as Kafka Manager)
    • 优点:监控功能更强大
    • 缺点:a. 重量级 b. 不支持KRaft,参考Kafka 3.3.1 with KRaft Support · Issue #898 · yahoo/CMAK
  2. kafdrop
    • 优点:a. 轻量,开箱即用 b. 支持KRaft
    • 缺点:a. 实时性监控并不好 b. 交互做得并不好,当节点开启身份验证时,会存在严重卡顿情况

2.2 kafdrop使用

首先下载Release 4.0.1 · obsidiandynamics/kafdrop

执行命令,启动监控

java -jar kafdrop-4.0.1.jar --kafka.brokerConnect=10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092

kafdrop本身是一个springboot项目,对于javaer来说是很有好的。其他一些复杂的配置,可以直接下载源码查看application.yml

# 省略以上若干配置。详细内容可自行查看
kafka:
  brokerConnect: localhost:9092
  saslMechanism: "PLAIN"
  securityProtocol: "SASL_PLAINTEXT"
  truststoreFile: "${KAFKA_TRUSTSTORE_FILE:kafka.truststore.jks}"
  propertiesFile : "${KAFKA_PROPERTIES_FILE:kafka.properties}"
  keystoreFile: "${KAFKA_KEYSTORE_FILE:kafka.keystore.jks}"

“${KAFKA_PROPERTIES_FILE:kafka.properties}”

表示获取环境变量KAFKA_PROPERTIES_FILE,如果存在则使用环境变量值,否则使用默认值kafka.properties

比如我开启了授权,那么需要再kafdrop的同级目录下创建kafka.properties,然后再次启动。

security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";

最终kafdrop监控效果如图。

三、使用

3.1 SpringKafka

3.1.1 使用

创建springboot项目,添加依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

生产者application.yml配置

spring:
  kafka:
#    bootstrap-servers: 10.0.0.101:9092
    bootstrap-servers: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092
    # 接入授权
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

消费者application.yml配置

spring:
  kafka:
#    bootstrap-servers: 10.0.0.10:9092
    bootstrap-servers: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092
    # 接入授权
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: group # 默认的分组名,消费者都会有所属组

源码参考kafka-cluster-demo/spring-kafka-demo at master · meethigher/kafka-cluster-demo

3.1.2 重试机制

关于SpringKafka的重试机制,未进行深入探索,只参考了文章Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎。

感觉这篇文章讲得很透彻了。按照该文章的说明,验证了以下两种重试策略

  1. 默认重试策略
  2. 自定义重试策略
默认重试策略

快速重试10次,无间隔时间,如果最后还是失败,则自动commit。

@KafkaListener(topics = "test-retry")
public void test(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
    int i=1/0;
}

不需要额外配置,这是默认的重试策略。

自定义重试策略

Spring单独提供了RetryableTopic注解,及重试后的回调注解DltHandler。底层逻辑是新建了topic对这些失败的数据进行存储,以及监听这些新建的topic再进行消费,细节的话还是参考文章Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎。

@KafkaListener(topics = "test-retry")
@org.springframework.kafka.annotation.RetryableTopic(attempts = "5", backoff = @org.springframework.retry.annotation.Backoff(delay = 5000, maxDelay = 10000, multiplier = 2))
public void test(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
    int i = 1 / 0;
}


@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      @Header(KafkaHeaders.OFFSET) long offset) {
    log.error("DLT Received: {} from {} @ {}", in, topic, offset);
}

更多的案例可以参考spring-kafka/samples at main · spring-projects/spring-kafka

3.2 KafkaClient

添加依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>

生产者示例

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class Producer {

    private static Logger log = LoggerFactory.getLogger(Producer.class);

    public static void main(String[] args) throws Exception {
        // 配置生产者属性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 配置 SASL_PLAINTEXT 认证
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");

        // 创建生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息到指定主题
        String topic = "meethigher";
        String key = "timestamp";

        while (true) {
            // 发送消息
            String value= String.valueOf(System.currentTimeMillis());
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, (metadata, exception) -> log.info("sent key={},value={} to topic={},partition={}", record.key(), record.value(), metadata.topic(), metadata.partition()));
            System.in.read();
        }

        // 关闭生产者
//        producer.close();
    }
}

消费者示例

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_consumer_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 配置 SASL_PLAINTEXT 认证
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");

        // 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        String topic = "meethigher";
        consumer.subscribe(Collections.singletonList(topic));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                log.info("Consumed record with key {} and value {}", record.key(), record.value());
            });
        }
    }
}

四、参考致谢

Apache Kafka

spring-projects/spring-kafka: Provides Familiar Spring Abstractions for Apache Kafka

Listeners in KAFKA

kafka+Kraft模式集群+安全认证_kafka认证机制_鸢尾の的博客-CSDN博客

我的 Kafka 旅程 - SASL+ACL 认证授权 · 配置 · 创建账号 · 用户授权 · 应用接入 - Sol·wang - 博客园

从k8s集群主节点数量为什么是奇数来聊聊分布式系统 - 知乎

万字长文解析raft算法原理 - 知乎

Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/214123.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

YOLOV7改进:在C5模块不同位置添加SegNext_Attention

1.该文章属于YOLOV5/YOLOV7/YOLOV8改进专栏,包含大量的改进方式,主要以2023年的最新文章和2022年的文章提出改进方式。 2.提供更加详细的改进方法,如将注意力机制添加到网络的不同位置,便于做实验,也可以当做论文的创新点。 3.涨点效果:SegNext_Attention注意力机制,实现…

正是阶段高等数学复习--函数极限的计算

之前在预备阶段中函数极限的解决方式分三步&#xff0c;第一步观察形式并确定用什么方式来解决&#xff0c;第二步化简&#xff0c;化简方式一共有7种&#xff0c;分别是最重要的三种&#xff08;等价替换、拆分极限存在的项、计算非零因子&#xff09;以及次重要的4种&#xf…

DAPP开发【06】nodejs安装与npm路径更换

windows系统在执行用户命令时顺序 windows系统在执行用户命令时&#xff0c;若用户未给出文件的绝对路径&#xff0c; 则 &#xff08;1&#xff09;首先在当前目录下寻找相应的可执行文件、批处理文件等&#xff1b; &#xff08;2&#xff09;若找不到&#xff0c;再依次在系…

数据结构和算法-哈夫曼树以相关代码实现

文章目录 总览带权路径长度哈夫曼树的定义哈夫曼树的构造法1法2 哈夫曼编码英文字母频次总结实验内容&#xff1a; 哈夫曼树一、上机实验的问题和要求&#xff08;需求分析&#xff09;&#xff1a;二、程序设计的基本思想&#xff0c;原理和算法描述&#xff1a;三、调试和运行…

2024美赛数学建模资料---100%获奖资料

很好的教程了 一共二十四章 每一章都是一个模型 并且有matlab编程编码 第一章 线性规划 第二章 整数规划 第三章 非线性规划 第四章 动态规划 第五章 图与网络 第六章 排队论 第七章 对策论 第八章 层次分析法 第九章 插值与拟合 第十章 数据的统计描述和分析 第十一章…

【方案】智慧林业:如何基于EasyCVR视频能力搭建智能林业监控系统

随着人类进程的发展。城市化范围的扩大&#xff0c;森林覆盖率越来越低&#xff0c;为保障地球环境&#xff0c;保护人类生存的净土&#xff0c;森林的保护与监管迫在眉睫。TSINGSEE青犀智慧林业智能视频监控系统方案的设计&#xff0c;旨在利用现代科技手段提高林业管理的效率…

《洛谷深入浅出进阶篇》模意义下的乘法逆元+洛谷P3811

什么是乘法逆元&#xff1f; 算数意义上的乘法逆元指的是倒数&#xff0c;即&#xff1a;a*&#xff08;1/a&#xff09;1 所以 1/a 是 a在算数意义下的乘法逆元&#xff0c;或者可以说二者互为逆元。 这有什么用呢&#xff1f; 除以a就等于乘上a的乘法逆元&#xff0c;乘以…

JS不同运算符下的隐式类型转换

目录 运算符 逻辑运算符&#xff08;&&、||、!&#xff09;和 条件表达式&#xff08;if、三元表达式&#xff09; 逻辑运算符 条件表达式 算数运算符&#xff08;*、/、- %、&#xff09;和 关系运算符&#xff08;>、<、、!&#xff09; 算数运算符 关系…

solidity实现ERC721代币标准发布NFT

文章目录 1、非同质化货币&#xff08;NFT&#xff09;- 维基百科2、IERC1653、IERC7214、IERC721Receiver5、IERC721Metadata6、ERC7217、ERC721 NFT 的实现8、编译部署 1、非同质化货币&#xff08;NFT&#xff09;- 维基百科 非同质化代币&#xff08;英语&#xff1a;Non-F…

时间复杂度为O (nlogn)的排序算法

归并排序 归并排序遵循分治的思想&#xff1a;将原问题分解为几个规模较小但类似于原问题的子问题&#xff0c;递归地求解这些子问题&#xff0c;然后合并这些子问题的解来建立原问题的解&#xff0c;归并排序的步骤如下&#xff1a; 划分&#xff1a;分解待排序的 n 个元素的…

MySQL进阶部分

存储引擎 MySQL体系结构图&#xff1a; 连接层&#xff1a; 最上层是一些客户端连接服务&#xff0c;主要完成一些类似于连接处理 &#xff0c;授权认证及相关的安全方案。服务器也会为安全接入的每个用户端验证它所具有的操作权限。 服务层&#xff1a; 第二层架构主要完成大…

Visual Studio 2022+Python3.11实现C++调用python接口

大家好&#xff01;我是编码小哥&#xff0c;欢迎关注&#xff0c;持续分享更多实用的编程经验和开发技巧&#xff0c;共同进步。 查了一些资料&#xff0c;不是报这个错&#xff0c;就是报哪个错&#xff0c;没有找到和我安装的环境的一致的案例&#xff0c;于是将自己的摸索分…

贪心 55. 跳跃游戏 45.跳跃游戏 II

55. 跳跃游戏 题目&#xff1a; 给定非负数组&#xff0c;初始位置在数组第一格&#xff0c;数组值是可以选择的最大跳跃步数&#xff0c;判断能不能达到数组末尾。 示例 1: * 输入: [2,3,1,1,4] * 输出: true * 解释: 我们可以先跳 1 步&#xff0c;从位置 0 到达 位置 1,…

在Windows 11中,把iPhone照片和视频导出来又快又简单,无需第三方软件

如果你想将照片和视频从iPhone传输到Windows 11 PC&#xff0c;最快、最简单的方法是插入手机并执行自动导入。以下是操作方法。 如何将照片和视频从iPhone导入Windows 如果你用USB数据线将iPhone插入Windows PC&#xff0c;Windows 11可以像标准数码相机一样连接到它&#x…

JavaWeb 带条件的分页查询

最终效果图 注意&#xff1a;没有带条件的时候 默认的是第一页数据 条件是组合的 sql->sql的动态变换 注意第二次查询的时候回显问题 就是填完条件后显示完当页数据ok 但是我点击第二页的时候条件还存在着 此时ListSerevlet不仅要拿到页码 页容量 还要拿到三个条件参数 封装…

IDEA中的Postman!

Postman是大家最常用的API调试工具&#xff0c;那么有没有一种方法可以不用手动写入接口到Postman&#xff0c;即可进行接口调试操作&#xff1f;今天给大家推荐一款IDEA插件&#xff1a;Apipost Helper&#xff0c;写完代码就可以调试接口并一键生成接口文档&#xff01;而且还…

Excel 删除空白行

目录 一. 方式一: 筛选删除二. 方式二: 定位条件三. 方式三: 隐藏非空白行&#xff0c;删除空白行 一. 方式一: 筛选删除 选中空白行对应的列&#xff0c;按下Ctrl Shift L&#xff0c;给列添加过滤条件。过滤出空白行&#xff0c;然后删除即可。 二. 方式二: 定位条件 按下…

Excel 分列功能

一. 需求 ⏹有一段文本&#xff0c;文本一共有7列。这7列文本之间的分隔符不相同 有一个空格的有多个空格的有Tab的jmw_state 和 method 之间用 & 连接 现在要求&#xff0c;将这段文本粘贴到Excel中&#xff0c;进行分列。并且需要将 jmw_state 和 method 也进行分列 也…

使用求2个字符串最短编辑距离动态规划算法实现 git diff 算法 java 实现

测试类 MyDiffTest.java&#xff1a; import java.io.BufferedReader; import java.io.FileReader; import java.util.ArrayList; import java.util.List;public class MyDiffTest {private static String path "\\xxx\\";private static List<String> lines…

HBase整合Phoenix

文章目录 一、简介1、Phoenix定义2、Phoenix架构 二、安装Phoenix1、安装 三、Phoenix操作1、Phoenix 数据映射2、Phoenix Shell操作3、Phoenix JDBC操作3.1 胖客户端3.2 瘦客户端 四、Phoenix二级索引1、为什么需要二级索引2、全局索引&#xff08;global index&#xff09;3、…