Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。

证书处理:

  • KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
  • TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:

cat your_cert.pem your_key.key > test.pem

  1. 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 PKCS12 文件导入到 Java KeyStore 中:

keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS

要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。

下面是生成 truststore.jks 的步骤:

  1. 获取服务器的根证书或证书链。您可以使用之前提到的 openssl s_client 命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts

  2. 将根证书或证书链保存为 .pem 文件。

  3. 使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:

    keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

 

项目集成:

maven集成:

  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.5.RELEASE</version>
        </dependency>

nacos配置:

spring:
  kafka:
    bootstrap-servers: SSL://connectedca.com:443  ##换成你自己的连接
    ssl:
      protocol: TLS
###3这三个密码是你证书配置的时候设置的密码
      trust-store-password: a123456
      key-store-password: a123456
      key-password: a123456
    consumer:
      group-id: 
    producer:
      topic: *.event  ##换成你自己的topic

核心配置:


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;


import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {

    @Autowired
    C3ConfigProperties c3ConfigProperties;
    @Autowired
    private KafkaConfig kafkaProperties;
    @Autowired
    private ResourceLoader resourceLoader;
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map <String, Object> configs = new HashMap <>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        return new KafkaAdmin(configs);
    }

    @Bean
    public DefaultKafkaConsumerFactory <String, String> consumerFactory() {
        Map <String, Object> consumerConfig = new HashMap <>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");
        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");

        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息
        consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息
        consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名


        String pemUrl = "";
        String csrUrl = "";
        if (c3ConfigProperties.getEnvironment().equals("uat")) {
            pemUrl = "file/uat/kafka/client.jks";
            csrUrl = "file/uat/kafka/truststore.jks";
        } else if (c3ConfigProperties.getEnvironment().equals("pre")) {
            pemUrl = "file/pre/kafka/client.jks";
            csrUrl = "file/pre/kafka/truststore.jks";

        } else if (c3ConfigProperties.getEnvironment().equals("prod")) {
            pemUrl = "file/prod/kafka/client.jks";
            csrUrl = "file/prod/kafka/truststore.jks";
        }

       try {
          
           // 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载
           Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);
           Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径
           String keyStorePath = pemResource.getFile().getAbsolutePath();
           String trustStorePath = csrResource.getFile().getAbsolutePath();
           consumerConfig.put("ssl.keystore.location", keyStorePath);
           consumerConfig.put("ssl.truststore.location", trustStorePath);

       }catch (Exception e){
           log.error("Resource file error:{}",e.getMessage());
       }
        consumerConfig.put("security.protocol", "SSL");
   consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());
        consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());
        consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());

        return new DefaultKafkaConsumerFactory <>(consumerConfig);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // 设置并发消费者数量
        factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器

        return factory;
    }

    @Bean
    public KafkaC3MsgListener kafkaC3MsgListener() {
        return new KafkaC3MsgListener();
    }


}

注入配置:


import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.producer.topic}")
    private String topic;

    @Value("${spring.kafka.ssl.trust-store-password}")
    private String trustStorePassword;

    @Value("${spring.kafka.ssl.key-store-password}")
    private String keyStorePassword;

    @Value("${spring.kafka.ssl.key-password}")
    private String keyPassword;

}

能够看到这个配置就成功了表示:

然后在监听处理消息即可

 ————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/446963.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java多线程实战-实现多线程文件下载,支持断点续传、日志记录等功能

&#x1f3f7;️个人主页&#xff1a;牵着猫散步的鼠鼠 &#x1f3f7;️系列专栏&#xff1a;Java全栈-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 目录 前言 1 基础知识回顾 1.1 线程的创建和启动 1.2 线程池的使用 2.运行环境说…

k8s架构浅析

Node 节点&#xff08;物理主机或虚拟机&#xff09;&#xff0c;它们共同组成一个分布式集群&#xff0c;并且这些节点中会有一个 Master 节点&#xff0c;由它来统一管理 Node 节点。 Pod &#xff0c;在 K8S 中&#xff0c;Pod 是最基本的操作单元&#xff0c;它与 docker …

Linux之selinux详解

华子目录 概念作用selinux与传统的权限区别selinux工作原理名词解释主体&#xff08;subject&#xff09;目标&#xff08;object&#xff09;策略&#xff08;policy&#xff09;&#xff08;多个规则的集合&#xff09;安全上下文&#xff08;security context&#xff09; 文…

三栏布局的实现方法

1. 什么是三栏布局 常见的一种页面布局方式&#xff0c;将页面分为左栏、中栏和右栏左右两侧的盒子宽度固定&#xff0c;中间的盒子会随屏幕自适应一般中间放主体内容&#xff0c;左右两边放辅助内容 2. 如何实现三栏布局 2.1 弹性布局 将最外层盒子设为弹性布局&#xff0…

练习题-14

问题&#xff1a;已知函数 f : R → R f: \mathbb{R} \to \mathbb{R} f:R→R满足 f ( x y ) − f ( x − y ) f ( x ) f ( y ) , ∀ x , y ∈ R . f(xy)-f(x-y)f(x)f(y), \forall x, y \in \mathbb{R}. f(xy)−f(x−y)f(x)f(y),∀x,y∈R. 求 f f f. 提示&#xff1a;如果 f …

基于PBS向超算服务器队列提交任务的脚本模板与常用命令

本文介绍在Linux服务器中&#xff0c;通过PBS&#xff08;Portable Batch System&#xff09;作业管理系统脚本的方式&#xff0c;提交任务到服务器队列&#xff0c;并执行任务的方法。 最近&#xff0c;需要在学校公用的超算中执行代码任务&#xff1b;而和多数超算设备一样&a…

基于美洲狮优化算法(Puma Optimizar Algorithm ,POA)的无人机三维路径规划(提供MATLAB代码)

一、无人机路径规划模型介绍 无人机三维路径规划是指在三维空间中为无人机规划一条合理的飞行路径&#xff0c;使其能够安全、高效地完成任务。路径规划是无人机自主飞行的关键技术之一&#xff0c;它可以通过算法和模型来确定无人机的航迹&#xff0c;以避开障碍物、优化飞行…

第十五届蓝桥杯模拟考试III_物联网设计与开发

编程题 一、基本要求 使用大赛组委会提供的四梯/国信长天物联网省赛套装&#xff08;基于STM32L071KBU微控制器设计&#xff09;&#xff0c;完成本试题的程序设计与调试。程序编写、调试完成后&#xff0c;选手需提交两个LoRa终端对应的hex文件&#xff0c;LoRa终端A对应的文…

【Week Y1】调用官方权重进行检测

YOLO白皮书之调用官方权重进行检测 一、下载yolo-v5s源码&#xff0c;并配置编译环境二、输入本地图片查看检测结果三、输入本地视频查看检测结果 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项…

C++学习随笔(2)——引用与函数

经过上章对C有了一个初步认识后&#xff0c;本章我们来学习一下C的一些与C语言不同的新玩样引用&#xff0c;还有C的函数规则。 目录 1. 引用 1.1 引用概念 1.2 引用特性 1.3 常引用 1.4 使用场景 &#xff08;1&#xff09; 做参数 &#xff08;2&#xff09; 做返回值…

基于YOLOv8深度学习的路面坑洞检测与分割系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标分割

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

计算机设计大赛 目标检测-行人车辆检测流量计数

文章目录 前言1\. 目标检测概况1.1 什么是目标检测&#xff1f;1.2 发展阶段 2\. 行人检测2.1 行人检测简介2.2 行人检测技术难点2.3 行人检测实现效果2.4 关键代码-训练过程 最后 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 行人车辆目标检测计数系统 …

服务器-->网站制作-->接口开发,一篇文章一条龙服务(2)

作者&#xff1a;q: 1416279170v: lyj_txd前述&#xff1a;本人非专业&#xff0c;兴趣爱好自学自研&#xff0c;很多没有说清楚的地方见谅&#xff0c;欢迎一起讨论的小伙伴~ 上期回顾&#xff0c;了解 服务器&#xff0c;网站制作&#xff0c;接口开发之见的关系&#xff0c…

【C#语言入门】17. 事件详解(上)

【C#语言入门】17. 事件详解&#xff08;上&#xff09; 一、初步了解事件 定义&#xff1a;单词Event&#xff0c;译为“事件” 通顺的解释就是**“能够发生的什么事情”**&#xff0c;例如&#xff0c;“苹果”不能发生&#xff0c;但是“公司上市”这件事能发生。在C#中事…

Android Gradle 开发与应用 (五) : 基于Gradle 8.2,创建Gradle插件

1. 前言 本文介绍在Android中&#xff0c;如何基于Gradle 8.2&#xff0c;创建Gradle插件。 1.1 本文环境 Android Studio 版本 : Android Studio Hedgehog | 2023.1.1Gralde版本 : gradle 8.2 使用 Android Gradle 插件升级助理 Android Gradle 插件版本说明 1.2 为什么要写…

机器学习(五) -- 监督学习(1) -- 线性回归

系列文章目录 机器学习&#xff08;一&#xff09; -- 概述 机器学习&#xff08;二&#xff09; -- 数据预处理&#xff08;1-3&#xff09; 机器学习&#xff08;三&#xff09; -- 特征工程&#xff08;1-2&#xff09; 机器学习&#xff08;四&#xff09; -- 模型评估…

批量提取PDF指定区域内容到 Excel 以及根据PDF里面第一页的标题来批量重命名-附思路和代码实现

首先说明下&#xff0c;PDF需要是电子版本的&#xff0c;不能是图片或者无法选中的那种。 需求1&#xff1a;假如我有一批数量比较多的同样格式的PDF电子文档&#xff0c;需要把特定多个区域的数字或者文字提取出来 需求2&#xff1a;我有一批PDF文档&#xff0c;但是文件的名…

使用VBA快速梳理多层级族谱(组织架构)

实例需求&#xff1a;族谱&#xff08;或者公司组织架构等&#xff09;都是典型的带有层级关系数据&#xff0c;例如下图中左侧表格所示。 A列为层级&#xff08;准确的讲是B列成员的层级&#xff09;&#xff0c;从一开始递增B列和C列为成员直接的父&#xff08;/母&#xff…

美术馆预约小程序|基于微信小程序的美术馆预约平台设计与实现(源码+数据库+文档)

美术馆预约小程序目录 目录 基于微信小程序的美术馆预约平台设计与实现 一、前言 二、系统设计 三、系统功能设计 1、用户信息管理 2、展品信息管理 3、美术馆信息管理 4、论坛信息管理 四、数据库设计 五、核心代码 七、最新计算机毕设选题推荐 八、源码获取&am…

谷歌BigQuery推出新玩意儿,向量搜索登场啦!

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…