查看已经创建的topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
创建topic
创建分区和副本数为1的topic
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic acltest --partitions 1 --replication-factor 1
创建kafka用户
创建writer用户
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=pwd],SCRAM-SHA-512=[password=pwd]' --entity-type users --entity-name kafkawriter
创建reader用户
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=pwd],SCRAM-SHA-512=[password=pwd]' --entity-type users --entity-name kafkareader
删除指定用户
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin
查看指定用户
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name kafkawriter
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name kafkareader
给topic授权
./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.54:2181 --add --allow-principal User:kafkawriter --operation Write --topic bigData
./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.54:2181 --add --allow-principal User:admin --operation Read --topic bigData
查看topic权限
./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.54:2181 --list --topic bigData
修改server.properties,开启sasl
listeners=SASL_PLAINTEXT://192.168.1.214:9092
advertised.listeners=SASL_PLAINTEXT://192.168.1.214:9092
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
# 启用SCRAM机制,采用SCRAM-SHA-256算法
sasl.enabled.mechanisms=SCRAM-SHA-256
# 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-256算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# broker间通讯使用SASL_PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT
重启kafka服务
新建生产者配置
config/producer.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafkawriter" password="pwd";
给生产者授权
./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.1.214:2181 --add --allow-principal User:kafkawriter --operation Write --topic bigData
生产者发送消息
./bin/kafka-console-producer.sh --broker-list 192.168.1.214:9092 --topic bigData --producer.config ./config/producer.conf
新建消费者配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafkareader" password="pwd";
cat config/consumer.conf
给消费者授权
./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.1.214:2181 --add --allow-principal User:kafkareader --operation Read --topic bigData
授权group分组
./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.1.214:2181 --add --allow-principal User:kafkareader --operation Read --topic bigData --group testgroup
消费者消费消息
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.214:9092 --topic bigData --consumer.config ./config/consumer.conf --from-beginning --group testgroup
以上就是配置SASL的全过程。本文使用的Kafka版本为kafka_2.13-3.7.0。文章中的IP地址经过变动,前后可能不一致,测试时请将所有命令和配置中的IP地址统一替换为自己环境中的实际地址。