aws msk加密方式和问控制连接方式

msk加密方式

msk提供了两种加密方式

  • 静态加密
  • 传输中加密

创建集群时可以指定加密方式,参数如下

aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --kafka-version "{YOUR MSK VERSION}" --number-of-broker-nodes 3

// encryptioninfo.json
{
   "EncryptionAtRest": {
       "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/abcdabcd-1234-abcd-1234-abcd123e8e8e"
    },
   "EncryptionInTransit": {
        "InCluster": true,
        "ClientBroker": "TLS"
    }
}

查看证书位置

$ pwd
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.amzn2.0.1.x86_64/jre/lib/security
$ ls -al ../../../../../../../etc/pki/java/cacerts
lrwxrwxrwx 1 root root 40 May  8 09:44 ../../../../../../../etc/pki/java/cacerts -> /etc/pki/ca-trust/extracted/java/cacerts
$ cp /etc/pki/ca-trust/extracted/java/cacerts /tmp/kafka.client.truststore.jks

测试tls加密,创建client.properties

security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

列出端点

$ aws kafka get-bootstrap-brokers --cluster-arn arn:aws-cn:kafka:cn-north-1:037047667284:cluster/mytest/93d5cf51-9e82-4049-a4bc-cefb6bd61716-3
{
    "BootstrapBrokerString": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092",
    "BootstrapBrokerStringTls": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094",
    "BootstrapBrokerStringSaslScram": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096",
    "BootstrapBrokerStringSaslIam": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098"
}

测试tls连接

$ ./kafka-topics.sh --bootstrap-server b-2.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
__amazon_msk_canary
__consumer_offsets
first

# 连接string端点报错
$ ./kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --command-config client.properties --list

[2023-07-20 12:40:04,944] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)

指定iam的客户端配置,之后连接tls端口会报错

  • 可见这个tls broker连接仅仅是给Unauthenticated用的,并且如果开了iam认证会失败
./kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list

[2023-07-20 12:26:31,401] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.14.174:9094) failed authentication due to: Unexpected handshake request with client mechanism AWS_MSK_IAM, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
[2023-07-20 12:26:31,403] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)

msk访问控制

https://docs.amazonaws.cn/msk/latest/developerguide/kafka_apis_iam.html

kafka客户端配置,https://kafka.apache.org/documentation/#security_configclients

msk可选的访问控制/加密组合如下,访问控制方式决定了能够选择的加密方式

AuthenticationClient-broker encryption optionsBroker-broker encryption
UnauthenticatedTLS, PLAINTEXT, TLS_PLAINTEXTCan be on or off
mTLSTLS, TLS_PLAINTEXTMust be on
SASL/SCRAMTLSMust be on
SASL/IAMTLSMust be on

集群完毕后提供了多种连接终端节点

端口信息,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/port-info.html

在这里插入图片描述

plaintext

采取Unauthenticated方式,客户端使用PLAINTEXT

在这里插入图片描述

查找bootstrap-server端点

在这里插入图片描述

(可选)在bin/client.properties中加入客户端配置

security.protocol=PLAINTEXT

测试连接,不需要特意配置tls连接

./bin/kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --list

java客户端开启ssl连接

// 开启 tls 连接
properties.put("security.protocol", "SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512");

// 创建kafka对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

IAM认证

msk对kafka的源码进行了修改,允许使用iam进行认证,访问事件会发送到cloudtrail中。注意事项

  • 不适用于zk节点

  • 开启iam认证后,allow.everyone.if.no.acl.found配置无效

  • 使用iam认证后创建的kafka acl(存储在zk中),对iam认证无效

  • client和broker之间必须启用tls加密

  • 和连接kafka相关的权限以kafka-cluster作为前缀,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/iam-access-control.html

  • 需要使用9098和9198端口

shell连接

需要在客户端配置如下参数

# config/client.properties
# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE> # if don't specify a value for ssl.truststore.location, the Java process uses the default certificate.
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
awsProfileName="admin";

下载客户端依赖jar到/libs目录下

https://github.com/aws/aws-msk-iam-auth/releases

aws s3 cp s3://zhaojiew/software/aws-msk-iam-auth-1.1.7-all.jar .

测试连接

./bin/kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list

可能出现以下报错

./bin/kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list                     Error while executing topic command : Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[2023-07-20 10:57:39,293] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics
 (kafka.admin.TopicCommand$)
[2023-07-20 10:57:39,316] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)
        at java.lang.Thread.run(Thread.java:750)

指定client配置后成功连接

  • .aws/config中的profile需要写成[profile prod]
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --list
[2023-07-20 11:21:06,517] WARN The configuration 'awsProfileName' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
__amazon_msk_canary
__consumer_offsets

创建topic

./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --topic first --create --partitions 2 --replication-factor 2

发送消息

./kafka-console-producer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --producer.config client.properties --topic first

消费信息

./kafka-console-consumer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --consumer.config client.properties --from-beginning --topic first

java代码连接

加入依赖

<dependency>
 <groupId>software.amazon.msk</groupId>
 <artifactId>aws-msk-iam-auth</artifactId>
 <version>1.0.0</version>
</dependency>
// 完整配置
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "AWS_MSK_IAM");
properties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
properties.put("sasl.client.callback.handler.class",IAMClientCallbackHandler.class.getName());
properties.put("awsProfileName","admin");

相关报错

// 没有导上面包的报错如下
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModule
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
        ... 4 more

// 如果没有找到凭证
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.DefaultAWSCredentialsProviderChain@7e8d5309: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: To use assume role profiles the aws-java-sdk-sts module must be on the class path.,

mTLS

目前中国区不可用,需要依赖于Private CA

SASL/SCRAM

https://docs.amazonaws.cn/en_us/msk/latest/developerguide/msk-password.html

使用secret manager保存username和password

在这里插入图片描述

创建secret

  • 名称必须以AmazonMSK_开头

  • 不能使用默认kms加密secret

    在这里插入图片描述

  • 密钥内容必须为以下格式

    {
      "username": "alice",
      "password": "alice-secret"
    }
    

    在这里插入图片描述

shell连接

创建配置文件users_jaas.conf,导出为环境变量

# KafkaClient首字母大写
cat > /tmp/users_jaas.conf << EOF
KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="alice"
   password="alice-secret";
};
EOF

export KAFKA_OPTS=-Djava.security.auth.login.config=/tmp/users_jaas.conf

bin目录下创建客户端配置文件

cat > client_sasl.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
EOF

链接集群

./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096  --command-config client_sasl.properties --list

相关报错

# 密码错误
[2023-07-21 09:40:44,467] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.23.61:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512

java代码连接

java代码连接配置

System.setProperty("java.security.auth.login.config", "/tmp/users_jaas.conf");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512"); //仅支持SCRAM-SHA-512

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/643282.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ASP+ACCESS公司门户网站建设

【摘 要】随着计算机科学的发展&#xff0c;数据库技术在Internet中的应用越来越广泛&#xff0c;为广大网络用户提供了更加周到和人性化的服务。本文讲解了一个公司的网站的建设&#xff0c;它基于数据关联规则的公司个性化页面及动态数据生成案例&#xff0c;在网页方面&…

Kubeadm安装部署k8s集群、踩坑日常

背景 ​ Docker是一个非常流行的容器化平台&#xff0c;它可以让我们方便构建、打包、发布和运行容器化应用程序。但是&#xff0c;在生产环境中&#xff0c;我们可能需要处理成百上千个容器&#xff0c;需要更好的管理这些容器&#xff0c;这就是Kubernetes(K8S)的用武之地。…

利用大语言模型增强网络抓取:一种现代化的方法

本文将探讨大语言模型(LLMs)与网络抓取的集成&#xff0c;以及如何利用LLMs高效地将复杂的HTML转换为结构化的JSON。 作为一名数据工程师&#xff0c;我的职业生涯可以追溯到2016年。那时&#xff0c;我的主要职责是利用自动化工具从不同网站上获取海量数据&#xff0c;这个过…

网络模型-策略路由配置

在实际网络应用中&#xff0c;策略路由也是一种重要的技术手段。尽管在考试并不注重策略路由&#xff0c;但是实际上应用较多建议考生除了掌握基本的静态路由协议IP route-static&#xff0c;动态路由协议RIP、还要掌握如何配置策略路由。策略路由的基本原理:根据ACL定义的不同…

云界洞见:移动云服务开启技术创新与问题解决的新篇章

一、什么是移动云 移动云以“央企保障、安全智慧、算网一体、属地服务”为品牌支撑&#xff0c;聚焦智能算力建设&#xff0c;打造一朵智能、智慧、安全可信可控的云&#xff0c;提供更优质的算力服务&#xff0c;引领云计算产业发展。 那么下面博主带领大家了解移动云的优势所…

Golang单元测试

文章目录 传统测试方法基本介绍主要缺点 单元测试基本介绍测试函数基准测试示例函数 传统测试方法 基本介绍 基本介绍 代码测试是软件开发中的一项重要实践&#xff0c;用于验证代码的正确性、可靠性和预期行为。通过代码测试&#xff0c;开发者可以发现和修复潜在的错误、确保…

【vue-cli搭建vue项目的过程2.x】

vue-cli搭建vue项目 vue-cli搭建vue项目安装node安装vue-cli脚手架并创建项目安装 Ant Design Vue或element-ui(笔者使用Ant-design-vue组件&#xff0c;并全局引入)开发安装三方库包1、Package.json文件---引入如下package.json文件执行npm i或npm install命令即可下载如下依赖…

Cloudflare Worker 部署bingai

Cloudflare Worker 部署 1. 注册 Cloudflare 账号 2. 一键部署 登录账户后, 点击下面链接 https://deploy.workers.cloudflare.com/?urlhttps://github.com/Harry-zklcdc/go-proxy-bingai 点击「Authorize Workers」, 登录 Github 账号授权 Cloudflare 点击「I have a ac…

输入输出(3)——C++的标准输入流

目录 一、cin 流 二、成员函数 get 获取一个字符 (一)无参数的get函数。 (二)有一个参数的get函数。 (三&#xff09;有3个参数的get函数 (四&#xff09;用成员函数 getline 函数读取一行字符 (五&#xff09;用成员函数 read 读取一串字符 (六&#xff09;istream 类…

【Basic】Upload-Labs-Linux

文章目录 前言Pass-01Pass-02Pass-03Pass-04Pass-05Pass-06Pass-07Pass-08Pass-09Pass-10Pass-11Pass-12Pass-13Pass-14Pass-15Pass-16解题感悟 前言 美好的一天从刷题开始 Pass-01 我淦20道题&#xff1f;&#xff1f;&#xff1f;一道一道来吧 先看第一道题 先在home里搞一…

蜂窝物联四情监测:助力农业升级,科技赋能打造丰收新篇章!

农业四情指的是田间的虫情、作物的苗情、气候的灾情和土壤墒情。“四情”监测预警系统的组成包括管式土壤墒情监测站、虫情测报灯、气象站、农情监测摄像机&#xff0c;可实时监测基地状况,可以提高监测的效率和准确性&#xff0c;为农业生产提供及时、科学的数据支持&#xff…

碌时刻必备!微信自动回复让你告别消息堆积

在忙碌的时候&#xff0c;我们往往会面临消息堆积如山的情况。无法及时回复消息不仅容易造成交流障碍&#xff0c;还可能错过重要的机会。 但是现在&#xff0c;有一个神奇的工具——个微管理系统&#xff0c;可以帮助我们轻松应对这个问题 &#xff0c;实现微信自动回复。 首…

基于YOLOv8+PySide6的快递分类管理系统

1、背景 随着电子商务的飞速发展&#xff0c;快递行业所承受的数据处理需求愈发庞大。在这样的背景下&#xff0c;传统的手工分类方法已经显得力不从心&#xff0c;因其不仅耗时耗力&#xff0c;还存在着易出错的隐患。因此&#xff0c;迫切需要研发出一套高效而准确的自动化系…

解决问题:Collecting package metadata (current_repodata.json)--faile

目录 解决步骤&#xff1a; 1、创建pip.ini文件&#xff1a;winR对话框中复制输入&#xff1a;%APPDATA%&#xff0c;然后回车。 2、conda添加清华源&#xff1a; 3、这些创建完&#xff0c;重启电脑就可以创建你的虚拟环境了 4、利用镜像源下载库&#xff1a; 5、查看to…

F.费用报销【蓝桥杯】/01背包

费用报销 01背包 思路&#xff1a;f[i][j]表示前i个票据在容量为j的背包中能占的最大值。 #include<iostream> #include<algorithm> using namespace std; int day[13]{0,31,28,31,30,31,30,31,31,30,31,30,31}; int dp[1005][5005]; int s[13]; int last[1005];…

Vue从入门到实战Day07

一、vuex概述 目标&#xff1a;明确vuex是什么&#xff0c;应用场景&#xff0c;优势 1. 是什么&#xff1a; vuex是一个vue的状态管理工具&#xff0c;状态就是数据。 大白话&#xff1a;vuex是一个插件&#xff0c;可以帮助我们管理vue通用的数据&#xff08;多组件共享的…

01.msf

文章目录 永恒之蓝下载msfconsolemsfvenom 永恒之蓝 下载 msdn.itellyou.cn msfconsole M e t a s p l o i t C y b e r M i s s i l e C o m m a n d Metasploit Cyber Missile Command MetasploitCyberMissileCommand 的简称 search ms17_010 use 0 或者 use exploit/wind…

WordPress国外超人气主题Vikinger汉化版

WordPress国外超人气主题Vikinger汉化版 前言效果图安装教程领取主题下期更新预报 前言 我们在上一个教程已经学过如何安装WordPress&#xff0c;所以现在不用多说。 效果图 安装教程 下载后先本地解压&#xff0c;找到vikinger.zip文件&#xff0c;上传安装并启用主题。 访…

什么是SSL证书?如何选择SSL证书?

在浏览网站的时候&#xff0c;你会不会有这样一些疑问。 为什么有的网站是http://开头&#xff0c;有的却是https://&#xff1f;它们有什么区别吗&#xff1f; 经常访问的网站&#xff0c;浏览器突然提示“安全证书过期”&#xff0c;提醒你不要浏览该网址&#xff1f; 这一切…

算法打卡 Day13(栈与队列)-滑动窗口最大值 + 前 K 个高频元素 + 总结

文章目录 Leetcode 239-滑动窗口最大值题目描述解题思路 Leetcode 347-前 K 个高频元素题目描述解题思路 栈与队列总结 Leetcode 239-滑动窗口最大值 题目描述 https://leetcode.cn/problems/sliding-window-maximum/description/ 解题思路 在本题中我们使用自定义的单调队列…