SpringBoot项目连接,有Kerberos认证的Kafka

在连接Kerberos认证kafka之前,需要了解Kerberos协议
二、什么是Kerberos协议

Kerberos是一种计算机网络认证协议 ,其设计目标是通过密钥系统为网络中通信的客户机(Client)/服务器(Server)应用程序提供严格的身份验证服务,确保通信双方身份的真实性和安全性。不同于其他网络服务,Kerberos协议中不是所有的客户端向想要访问的网络服务发起请求,他就能建立连接然后进行加密通信,而是在发起服务请求后必须先进行一系列的身份认证,包括客户端和服务端两方的双向认证,只有当通信双方都认证通过对方身份之后,才可以互相建立起连接,进行网络通信。即Kerberos协议的侧重在于认证通信双方的身份,客户端需要确认即将访问的网络服务就是自己所想要访问的服务而不是一个伪造的服务器,而服务端需要确认这个客户端是一个身份真实,安全可靠的客户端,而不是一个想要进行恶意网络攻击的用户。

三、Kerberos协议角色组成
Kerberos协议中存在三个角色,分别是:

客户端(Client):发送请求的一方
服务端(Server):接收请求的一方
密钥分发中心(Key distribution KDC)

一,首先需要准备三个文件
(user.keytab,krb5.conf,jass.conf)

其中user.keytab和krb5.conf是两个认证文件,需要厂商提供,就是你连接谁的kafka,让谁提供

jass.conf文件需要自己在本地创建

jass.conf文件内容如下,具体路径和域名需要换成自己的:

debug: true
 
fusioninsight:
  kafka:
    bootstrap-servers: 10.80.10.3:21007,10.80.10.181:21007,10.80.10.52:21007
    security:
      protocol: SASL_PLAINTEXT
    kerberos:
      domain:
        name: hadoop.798687_97_4a2b_9510_00359f31c5ec.com
    sasl:
      kerberos:
        service:
          name: kafka


其中kerberos.domain.name:hadoop.798687_97_4a2b_9510_00359f31c5ec.com

hadoop.798687_97_4a2b_9510_00359f31c5ec.com需要根据现场提供给你的域名

二、文件准备好后可以将三个配置文件,放在自己项目中,也可以放在服务器的某个目录下,只要确保项目启动后能读取到即可
我的目录结构如下:


pom依赖:
我用的是华为云的Kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>kafka-sample-01</artifactId>
    <version>2.3.1.RELEASE</version>
    <packaging>jar</packaging>
 
    <name>kafka-sample-01</name>
    <description>Kafka Sample 1</description>
 
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
 
    <dependencies>
 
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0-hw-ei-302002</version>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
 
 
        <!-- 华为 组件 kafka  start -->
<!--        <dependency>-->
<!--            <groupId>com.huawei</groupId>-->
<!--            <artifactId>kafka-clients</artifactId>-->
<!--            <version>2.4.0</version>-->
<!--            <scope>system</scope>-->
<!--            <systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-302002.jar</systemPath>-->
<!--        </dependency>-->
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
    <repositories>
 
        <repository>
            <id>huaweicloudsdk</id>
            <url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
 
        <repository>
            <id>central</id>
            <name>Mavn Centreal</name>
            <url>https://repo1.maven.org/maven2/</url>
        </repository>
 
    </repositories>
</project>

然后再SpringBoot项目启动类如下:

package com.example;
 
import com.common.Foo1;
 
 
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;
 
import java.io.File;
import java.util.HashMap;
import java.util.Map;
 
/**
 * @author
 */
@SpringBootApplication
public class Application {
 
    private final Logger logger = LoggerFactory.getLogger(Application.class);
 
    @Value("${fusioninsight.kafka.bootstrap-servers}")
    public String boostrapServers;
 
    @Value("${fusioninsight.kafka.security.protocol}")
    public String securityProtocol;
 
    @Value("${fusioninsight.kafka.kerberos.domain.name}")
    public String kerberosDomainName;
 
    @Value("${fusioninsight.kafka.sasl.kerberos.service.name}")
    public String kerberosServiceName;
 
    public static void main(String[] args) {
//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main"
//        String filePath = "D:\\Java\\workspace\\20231123MOSPT4eB\\sample-01\\src\\main\\resources\\";
        String filePath = "/home/yxxt/";
        System.setProperty("java.security.auth.login.config", filePath + "jaas.conf");
        System.setProperty("java.security.krb5.conf", filePath + "krb5.conf");
        SpringApplication.run(Application.class, args);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {
        System.out.println(boostrapServers);
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),
            new FixedBackOff(0L, 2))); // dead-letter after 3 tries
        return factory;
    }
 
    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }
 
    // 指定消费监听,该topic有消息时立刻消费
    @KafkaListener(id = "fooGroup1", topics = "topic_ypgk")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("监听到了消息-----");
        logger.info("Received:消息监听成功! " );
        System.out.println("监听到了-----");
        System.out.println(record);
//        if (foo.getFoo().startsWith("fail")) {
//            // 触发83行的 ErrorHandler,将异常数据写入 topic名称+.DLT的新topic中
//            throw new RuntimeException("failed");
//        }
    }
 
    // 创建topic,指定分区数、副本数
//    @Bean
//    public NewTopic topic() {
//        return new NewTopic("topic1", 1, (short) 1);
//    }
 
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
        configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        configs.put("sasl.kerberos.service.name", kerberosServiceName);
        configs.put("kerberos.domain.name", kerberosDomainName);
        return new KafkaAdmin(configs);
    }
 
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("security.protocol", securityProtocol);
        configs.put("kerberos.domain.name", kerberosDomainName);
        configs.put("bootstrap.servers", boostrapServers);
        configs.put("sasl.kerberos.service.name", kerberosServiceName);
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new DefaultKafkaConsumerFactory<>(configs);
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("security.protocol", securityProtocol);
        configs.put("kerberos.domain.name", kerberosDomainName);
        configs.put("bootstrap.servers", boostrapServers);
        configs.put("sasl.kerberos.service.name", kerberosServiceName);
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
        return new KafkaTemplate<>(producerFactory);
    }
}

生产者:通过发送请求进行向主题里发送消息

package com.example;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
import com.common.Foo1;
 
/**
 * @author haosuwei
 *
 */
@RestController
public class Controller {
 
    @Autowired
    private KafkaTemplate<String, String> template;
 
    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        Foo1 foo1 = new Foo1(what);
        this.template.send("topic1", foo1.toString());
    }
 
}

运行成功,就可以监听到主题消息了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/183834.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Binlog 太大导致无法解析怎么办?

由于业务写入了一条大事务&#xff0c;导致 MySQL 的 binlog 膨胀。在解析大的 binlog 时&#xff0c;经常会遇到这个问题&#xff0c;导致无法解析&#xff0c;没有其他工具的情况下&#xff0c;很难分析问题。 作者&#xff1a;孙绪宗&#xff0c;新浪微博 DBA 团队工程师&am…

深度剖析Python错误类型的全面解读

更多Python学习内容&#xff1a;ipengtao.com 在Python编程过程中&#xff0c;经常会遇到各种错误。了解这些错误的类型以及如何处理它们是成为一位优秀的Python开发者所必备的技能之一。本文将介绍Python中常见的错误类型&#xff0c;并通过丰富的示例代码演示如何识别、捕获和…

error: ‘for‘ loop initial declarations are only allowed in C99 or C11 mode

在使用for循环时&#xff0c;在循环内定义变量&#xff0c;出现如下错误 [Error] ‘for’ loop initial declarations are only allowed in C99 or C11 mode [Note] use option -stdc99&#xff0c;-stdgnu99&#xff0c;-stdc11 or-stdgnu11 to compile your code 出现这个错误…

FreeSWITCH学习笔记:EventSocket

Event Socket命令最后需带有两个换行符。 api 执行API命令。阻塞。 1api API [ARG [ ...]] auth 内连模式下身份验证。需要第一个发送。 1auth PASSWORD bgapi 后台执行API命令。不阻塞。 1bgapi API [ARG [ ...]]2[Job-UUID: UUID] 返回带有Job-UUID。当API命令执行完成…

Spring Security(安全框架,必须登录成功才能访问指定资源)

一、背景知识 1、Spring Security 是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架。它提供了一组可以在Spring应用上下文中配置的Bean&#xff0c;充分利用了Spring IoC&#xff0c;DI&#xff08;IOC: 控制反转Inversion of Control ,DI:D…

文心一言 VS 讯飞星火 VS chatgpt (142)-- 算法导论12.1 2题

二、用go语言&#xff0c;二叉搜索树性质与最小堆性质(见 6.1 节)之间有什么不同?能使用最小堆性质在 O(n)时间内按序输出一棵有 n 个结点树的关键字吗?可以的话&#xff0c;请说明如何做&#xff0c;否则解释理由。 文心一言&#xff1a; 在二叉搜索树&#xff08;Binary …

微信小程序完整实现微信支付功能(SpringBoot和小程序)

1.前言 不久前给公司实现支付功能&#xff0c;折腾了一阵子&#xff0c;终于实现了&#xff0c;微信支付对于小白来说真的很困难&#xff0c;特别是没有接触过企业级别开发的大学生更不用说&#xff0c;因此尝试写一篇我如何从小白实现微信小程序支付功能的吧&#xff0c;使用…

rabbitMQ发布确认-交换机不存在或者无法抵达队列的缓存处理

rabbitMQ在发送消息时&#xff0c;会出现交换机不存在&#xff08;交换机名字写错等消息&#xff09;&#xff0c;这种情况如何会退给生产者重新处理&#xff1f;【交换机层】 生产者发送消息时&#xff0c;消息未送达到指定的队列&#xff0c;如何消息回退&#xff1f; 核心&…

便携式工业RFID读写器怎么选?

便携式工业RFID读写器在物流、零售、制造等行业都有着极为广泛的应用。企业利用RFID手持终端设备&#xff0c;可以将采集到的物品信息自动传输到中央信息系统&#xff0c;实现数据的实时交换和共享。目前市面上RFID手持终端品牌、型号众多&#xff0c;ANDEAWELL作为国内物联网产…

数十亿美元商机!英国数字基础设施公司Equinix与法国量子计算公司Alice Bob 合作

​&#xff08;图片来源&#xff1a;网络&#xff09; 近日&#xff0c;全球数字基础设施公司Equinix宣布与全球领先的法国量子计算公司Alice & Bob合作&#xff0c;旨在共同开发市场上最为可靠的量子处理器之一。此次合作将使Equinix公司的客户通过使用Equinix Metal和Eq…

2023软件应用类下载系统平台源码/手机软件应用、新闻资讯下载站/软件库网站源码

源码简介&#xff1a; 这个是最新软件应用类平台源码、手机应用下载系统源码、软件应用市场下载站源码、新闻资讯软件下载。2023软件应用类平台源码/手机软件应用、新闻资讯下载站&#xff0c;它是软件库网站源码。 最新软件应用类平台源码 手机应用下载系统源码 软件应用市场…

Vue基础入门(二):Vue3的创建与分析

Vue3的创建 ​ vue3 是基于 es6 的一些新特性的支持而从 vue2 升级上来的版本&#xff0c;但是 vue3 是兼容 vue2 的。 一、Vue的使用 1.1 通过CDN使用Vue ​ 你可以借助 script 标签直接通过 CDN 来使用 Vue&#xff1a; <script src"https://unpkg.com/vue3/dist…

用友BIP与用友BIP对接集成销售出库列表查询连通销售出库单个保存((红字)销售出库审核-v)

用友BIP与用友BIP对接集成销售出库列表查询连通销售出库单个保存(&#xff08;红字&#xff09;销售出库审核-v) 源系统:用友BIP 面向数智化市场&#xff0c;用友倾力打造了全球领先的数智商业创新平台——用友BIP&#xff0c;定位为数智商业的应用级基础设施、企业服务产业的共…

2023-11-23 LeetCode每日一题(HTML 实体解析器)

2023-11-23每日一题 一、题目编号 1410. HTML 实体解析器二、题目链接 点击跳转到题目位置 三、题目描述 「HTML 实体解析器」 是一种特殊的解析器&#xff0c;它将 HTML 代码作为输入&#xff0c;并用字符本身替换掉所有这些特殊的字符实体。 HTML 里这些特殊字符和它们…

新苹果手机如何导入旧手机数据?解决方案来了,记得收藏!

为了保持其竞争优势&#xff0c;苹果公司不断推出新的产品和服务&#xff0c;因此苹果手机的更新换代速度是比较快的。正巧最近刚出了iPhone15&#xff0c;相信很多小伙伴已经换上了期待已久的新手机。 更换新手机后&#xff0c;大家都会面临一个问题&#xff1a;新苹果手机如…

深入了解接口测试:方法、工具和关键考虑因素(一)

接口测试是软件测试中的一项重要工作&#xff0c;它涉及到系统与系统之间的交互点。接口可以是外部接口&#xff0c;也可以是内部接口&#xff0c;包括上层服务与下层服务接口以及同级接口。在接口测试中&#xff0c;我们需要确保接口能够按照预期的方式进行通信和交互&#xf…

BMS实战: BMS产品介绍,电池外观分析,电芯种类分析,焊接方式分析,充电方式,电压平台,电芯型号分析。

快速入门的办法就是了解产品,了解现在市面上正在流通的成熟产品方案。光看基础知识是没有效果的。 首先我们找到了一张市面上正在出售的电池pack包。 图片来源网上,侵权删 电池外观分析 外壳: 一般是金属外壳,大部分都是铁壳加喷漆,特殊材质可以定制。 提手 一般是…

内衣专用洗衣机怎么样?口碑最好的小型洗衣机

随着人们的生活水平的提升&#xff0c;越来越多小伙伴来开始追求更高的生活水平&#xff0c;一些智能化的小家电就被发明出来&#xff0c;而且内衣洗衣机是其中一个。现在通过内衣裤感染到细菌真的是越来越多&#xff0c;所以我们对内衣裤的清洗频次会高于普通衣服&#xff0c;…

案例018:基于微信小程序的实习记录系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

iview table 默认排序字段不高亮解决办法

iview treeSelect 组件封装 1、表格增加排序时触发的方法2、定义三个变量&#xff0c;sortColumnDefaultStyle存放默认的样式&#xff0c;定义页面默认的列以及顺序3、显示的列加上 sortable, 和样式4、使用下面这块代表默认选中5、点击时清除掉默认的排序6、把排序的字段查询时…