Kafka SASL_SSL双重认证

文章目录

    • 1. 背景
    • 2. 环境
    • 3. 操作步骤
      • 3.1 生成SSL证书
      • 3.2 配置zookeeper认证
      • 3.3 配置kafka安全认证
      • 3.4 使用kafka客户端进行验证
      • 3.5 使用Java端代码进行认证

1. 背景

kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。

  • SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
  • SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。

在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。

因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

2. 环境

  • 操作系统:linux
  • kafka版本:kafka_2.13-2.7.1
  • zookeeper版本:apache-zookeeper-3.7.0
  • 应用程序版本:spring-boot-2.6.7、JDK1.8

3. 操作步骤

  1. 生成SSL证书
  2. 配置zookeeper
  3. 配置kafka
  4. 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
  5. 在业务代码中使用请查看(3.5)

3.1 生成SSL证书

按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥

参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)

3.2 配置zookeeper认证

第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:

Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  user_admin="1qaz@WSX"
  user_kafka="1qaz@WSX";
};

第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证

第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="1qaz@WSX"
  user_admin="1qaz@WSX"
  user_kafka="1qaz@WSX";
};
 
Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="1qaz@WSX";
};

kafka-client-jaas.conf

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="1qaz@WSX";
};

第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf

...

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi

...

**第三步:**修改 kafka 的 server.properties配置文件

#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS 
ssl.truststore.type=JKS 
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname

3.4 使用kafka客户端进行验证

第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。

consumer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

producer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)

#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093  --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>

#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc

3.5 使用Java端代码进行认证

第一步: yaml 配置文件

spring:
  kafka:
    bootstrap-servers: localhost:9093
    properties:
      sasl:
        mechanism: PLAIN
        jaas:
          #此处填写 SASL登录时分配的用户名密码(注意password结尾;)
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
      security:
        protocol: SASL_SSL
    ssl:
      trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jks
      trust-store-password: Q06688
      key-store-type: JKS
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 106384
      acks: -1
      retries: 3
      properties:
        linger-ms: 1
        retry.backoff.ms: 1000
        buffer-memory: 33554432

第二步: 使用 kafkaTemplate 的方式,配置一个 config

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.properties.linger-ms}")
    private int lingerMs;
    @Value("${spring.kafka.producer.properties.buffer-memory}")
    private int bufferMemory;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;
    @Value("${spring.kafka.properties.security.protocol}")
    private String protocol;
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.ssl.trust-store-location}")
    private String trustStoreLocation;
    @Value("${spring.kafka.ssl.trust-store-password}")
    private String trustStorePassword;
    @Value("${spring.kafka.ssl.key-store-type}")
    private String keyStoreType;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * the producer factory config
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put("security.protocol", protocol);
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        return new DefaultKafkaProducerFactory<String, String>(props);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/373529.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络——新型网络架构:SDN/NFV

1. 传统节点与SDN节点 1.1 传统节点(Traditional Node) 这幅图展示了传统网络节点的结构。在这种设置中&#xff0c;控制层和数据层是集成在同一个设备内。 以太网交换机&#xff1a;在传统网络中&#xff0c;交换机包括控制层和数据层&#xff0c;它不仅负责数据包的传输&…

【CSS】margin塌陷和margin合并及其解决方案

【CSS】margin塌陷和margin合并及其解决方案 一、解决margin塌陷的问题二、避免外边距margin重叠&#xff08;margin合并&#xff09; 一、解决margin塌陷的问题 问题&#xff1a;当父元素包裹着一个子元素的时候&#xff0c;当给子元素设置margin-top:100px&#xff0c;此时不…

vue2 自定义指令 v-highlight 文本高亮显示分享

简单分享一个文本高亮显示的自定义指令&#xff0c;主要分两部分&#xff1a; 1、代码实现&#xff1a;在 main.js 文件中添加一个自定义指令&#xff0c;实现搜索时文本高亮显示&#xff0c;代码如下&#xff1a; const highlightText (el, searchText) > {const textCo…

详细关于如何解决mfc140.dll丢失的步骤,有效修复mfc140.dll文件丢失的问题。

mfc140.dll文件是Microsoft Visual Studio 2015程序集之一&#xff0c;它包含用于支持多种功能的代码和库。当这个mfc140.dll文件丢失时&#xff0c;可能会导致相关程序运行出错甚至无法运行。很多用户可能会遇到mfc140.dll丢失的问题&#xff0c;但是这并不是不可解决的困难。…

2024 年十大 Vue.js UI 库

Vue.js 是一个流行的 JavaScript 框架&#xff0c;它在前端开发者中越来越受欢迎&#xff0c;以其简单、灵活和易用性而闻名。 Vue.js 如此受欢迎的原因之一是它拥有庞大的 UI 库生态系统。 这些库为开发人员提供了预构建的组件和工具&#xff0c;帮助他们快速高效地构建漂亮…

PCIE 参考时钟架构

一、PCIe架构组件 首先先看下PCIE架构组件&#xff0c;下图中主要包括&#xff1a; ROOT COMPLEX (RC) (CPU); PCIE PCI/PCI-X Bridge; PCIE SWITCH; PCIE ENDPOINT (EP) (pcie设备); BUFFER; 各个器件的时钟来源都是由100MHz经过Buffer后提供。一个PCIE树上最多可以有256个…

如何在Termux中使用Hexo结合内网穿透工具实现远程访问本地博客站点

文章目录 前言 1.安装 Hexo2.安装cpolar3.远程访问4.固定公网地址 前言 Hexo 是一个用 Nodejs 编写的快速、简洁且高效的博客框架。Hexo 使用 Markdown 解析文章&#xff0c;在几秒内&#xff0c;即可利用靓丽的主题生成静态网页。 下面介绍在Termux中安装个人hexo博客并结合…

考研数据结构笔记(1)

数据结构&#xff08;1&#xff09; 数据结构在学什么&#xff1f;数据结构的基本概念基本概念三要素逻辑结构集合线性结构树形结构图结构 物理结构&#xff08;存储结构&#xff09;顺序存储链式存储索引存储散列存储重点 数据的运算 算法的基本概念什么是算法算法的五个特性有…

LeetCode、198. 打家劫舍【中等,一维线性DP】

文章目录 前言LeetCode、198. 打家劫舍【中等&#xff0c;一维线性DP】题目及分类思路线性DP&#xff08;一维&#xff09; 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质作者、专注…

记一次页面接口502问题:“502 Bad Gateway”

接收别人的项目进行迭代&#xff0c;项目部署到服务器上之后&#xff0c;有一个接口数据刷不出来&#xff0c;一直502 后来联想到网关的问题&#xff0c;想通过设置白名单的方式解决&#xff0c;设置之后依旧不行。 查看nginx日志发现报错&#xff1a; *169 connect() failed …

vue - 指令(一)

看文章可以得到什么&#xff1f; 1.可以快速的了解并会使用vue的指令 2.可以加深你对vue指令的理解&#xff0c;知道每个指令代表什么功能​​​​​​​ 目录 什么是vue的指令&#xff1f;​​​​​​​ vue常见指令的使用 v-html v-show v-if v-else 和v-else-…

Redis(三)(实战篇)

查漏补缺 1.spring 事务失效 有时候我们需要在某个 Service 类的某个方法中&#xff0c;调用另外一个事务方法&#xff0c;比如&#xff1a; Service public class UserService {Autowiredprivate UserMapper userMapper;public void add(UserModel userModel) {userMapper.…

QMUI_Android:提升Android开发效率与质量的利器

QMUI_Android&#xff1a;提升Android开发效率与质量的利器 在Android应用开发过程中&#xff0c;开发者常常面临着重复编写基础组件和处理兼容性问题的挑战&#xff0c;这不仅耗费时间&#xff0c;也降低了开发效率。为了解决这一问题&#xff0c;Tencent推出了QMUI_Android框…

Linux-3 进程概念(三)

1.环境变量 1.1基本概念 环境变量(environment variables)一般是指在操作系统中用来指定操作系统运行环境的一些参数 如&#xff1a;我们在编写C/C代码的时候&#xff0c;在链接的时候&#xff0c;从来不知道我们的所链接的动态静态库在哪里&#xff0c;但是照样可以链接成功…

【教学类-46-05】吉祥字门贴5.0(华光彩云_CNKI 文本框 空心字涂色 ,繁简都可以,建议简体)

作品展示 背景需求&#xff1a; 1、制作了空心字的第1款 华光通心圆_CNKI &#xff0c;发现它不能识别某些简体字&#xff0c;但可以识别他们的繁体字&#xff08;繁体为准&#xff09; 【教学类-46-01】吉祥字门贴1.0&#xff08;华光通心圆_CNKI 文本框 空心字涂色&#xf…

怎么把几百M大小的视频做成二维码?扫码播放视频在线教程

怎么把几百M大小的视频做成一个二维码展示呢&#xff1f;通过二维码来作为视频的载体是现在很常用的一种手段&#xff0c;通过这种方式不仅成本比较低&#xff0c;而且传播速度也比较快&#xff0c;通过访问云端数据就可以播放视频。 视频二维码生成的方法一般会通过二维码生成…

蓝桥杯----凑算式

这个算式中A~I代表1~9的数字,不同的字母代表不同的数字。 比如: 68/3952/714 就是一种解法, 53/1972/486 是另一种解法. 这个算式一共有多少种解法? 注意:你提交应该是个整数,不要填写任何多余的内容或说明性文字。

RPA财务机器人之UiPath实战 - 自动化操作Excel进行财务数据汇总与分析之流程建立与数据读取、处理、汇总、分析

一、案例介绍&#xff1a; A公司共有13个开在不同银行的帐户&#xff0c;分别用于不同的业务分部或地区分部收付款。公司总部为了核算每月的收支情况&#xff0c;查看银行在哪个月交易量频繁&#xff0c;需要每月汇总各个银行的帐户借方和贷方金额&#xff0c;并将其净收支&am…

springboot 拦截器

定义 拦截器类似于javaweb中filter 功能 注意: 只能拦截器controller相关的请求 作用 举一个例子&#xff0c;例如我们在Controller中都有一段业务逻辑&#xff0c;这样我们就可以都统一放在拦截器中 因此拦截器的作用就是将controller中共有代码放入到拦截器中执行,减少co…

Leetcode02.05:链表求和

一、题目描述 给定两个用链表表示的整数&#xff0c;每个节点包含一个数位。 这些数位是反向存放的&#xff0c;也就是个位排在链表首部。 编写函数对这两个整数求和&#xff0c;并用链表形式返回结果。 示例&#xff1a; 输入&#xff1a;(7 -> 1 -> 6) (5 -> 9 -…