springboot 实现kafka多源配置

文章目录

  • 背景
  • 核心配置
    • 自动化配置类
    • 注册生产者、消费者核心bean到spring
    • 配置spring.factories
    • yml配置
    • 使用
  • 源码仓库

背景

实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。

核心配置

自动化配置类

import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@Configuration(
        proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {
    public MyKafkaAutoConfiguration() {
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        beanFactory.getBean(CustomKafkaDataSourceRegister.class);
    }
}

注册生产者、消费者核心bean到spring

public void afterPropertiesSet() {
        Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();
        if (factories != null && !factories.isEmpty()) {
            factories.forEach((factoryName, consumerConfig) -> {
                KafkaProperties.Listener listener = consumerConfig.getListener();
                Integer concurrency = consumerConfig.getConcurrency();
                // 创建监听容器工厂
                ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
                // 注册到容器
                if (!beanFactory.containsBean(factoryName)) {
                    beanFactory.registerSingleton(factoryName, containerFactory);
                }
            });
        }
        Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();
        if (!ObjectUtils.isEmpty(templates)) {
            templates.forEach((templateName, producerConfig) -> {
                //registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);
                //注册spring bean的两种方式
                registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));
            });
        }
    }

配置spring.factories


org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.MyKafkaAutoConfiguration

yml配置

spring:
  kafka:
    multiple:
      consumer:
        factories:
          test-factory:
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            bootstrap-servers: 192.168.56.112:9092
            group-id: group_a
            concurrency: 25
            fetch-min-size: 1048576
            fetch-max-wait: 3000
            listener:
              type: batch
            properties:
              spring-json-trusted-packages: '*'
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: latest
      producer:
        templates:
          test-template:
            bootstrap-servers: 192.168.56.112:9092
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

使用

在这里插入图片描述

在这里插入图片描述

源码仓库

https://github.com/fafeidou/shield

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/672084.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

建筑企业有闲置资质怎么办?

如果建筑企业拥有闲置资质&#xff0c;可以考虑以下几种方式来充分利用这些资质&#xff1a; 1. 租赁或转让资质&#xff1a; 将闲置的建筑资质租赁给其他企业或个人使用&#xff0c;或者通过转让的方式将资质出售给有需要的企业或个人。 2. 提供咨询服务&#xff1a; 利用建…

导线防碰撞警示灯:高压线路安全保障

导线防碰撞警示灯&#xff1a;高压线路安全保障 在广袤的大地上&#xff0c;高压线路如同血脉般纵横交错&#xff0c;然而&#xff0c;在这看似平静的电力输送背后&#xff0c;却隐藏着不容忽视的安全隐患。特别是在那些输电线路跨越道路、施工等区域的路段&#xff0c;线下超…

哈夫曼树的构造,哈夫曼树的存在意义--求哈夫曼编码

一:哈夫曼树的构造 ①权值,带权路径长度。 ②一组确定权值的叶子节点可以构造多个不同的二叉树,但是带权路径长度min的是哈夫曼树 ③算法基本思想及其实操图片演示 注:存储结构和伪代码 1 初始化: 构造2n-1棵只有一个根节点的二叉树,parent=rchild=lchild=-1; 其中…

编程环境资源汇总

目录 前言 正文 虚拟机模块 常用软件模块&#xff08;同时包含各别好用的小软件&#xff09; 语言模块 尾声 &#x1f52d; Hi,I’m Pleasure1234&#x1f331; I’m currently learning Vue.js,SpringBoot,Computer Security and so on.&#x1f46f; I’m studying in Univer…

人工智能在消化道肿瘤中的最新研究【24年五月|顶刊速递·05-31】

小罗碎碎念 2024-05-31|医学AI顶刊速递 今天分享的六篇文章,主题是AI+结肠癌。但是,并非所有的文章都是直接与结直肠癌相关,比如第一篇研究的就是肝癌。 我其实想关注的是消化道肿瘤的医学AI研究——消化道由口腔、食管、胃、小肠、大肠和直肠组成,而肝脏虽然不直接参与食…

免费企业域名备案手把手教程

走的阿里云的备案服务&#xff0c;全程免费 前提 主办者&#xff1a;你的企业主办者负责人&#xff1a;当前登录的阿里云账户的人&#xff0c;不是企业法人的话&#xff0c;得准备委托书&#xff0c;会有地方提供模板&#xff0c;打印一下&#xff0c;签字扫描上传就行域名的…

牛客网题目--哈夫曼树

关于哈夫曼编码与哈夫曼树的介绍,可以看这个视频 题目链接 以3,4,5,6为例构造哈夫曼树 import java.util.*;public class Main {public static void main(String[] args) {Scanner in new Scanner(System.in);int n in.nextInt();PriorityQueue<Long> heap new Pr…

Django 创建项目及应用

1&#xff0c;安装 Django pip install Django3.1.5 2&#xff0c;创建 Django项目 django-admin startproject myshop 3&#xff0c;创建 Django应用 python manage.py startapp app1 4&#xff0c;启动 Django项目 python .\manage.py runserver 到这里项目及应用创建…

鸿蒙ArkTS声明式开发:跨平台支持列表【禁用控制】 通用属性

禁用控制 组件是否可交互&#xff0c;可交互状态下响应[点击事件]、[触摸事件]、[拖拽事件]、[按键事件]、[焦点事件]和[鼠标事件]。 说明&#xff1a; 开发前请熟悉鸿蒙开发指导文档&#xff1a; gitee.com/li-shizhen-skin/harmony-os/blob/master/README.md点击或者复制转到…

【人工智能】第二部分:ChatGPT的架构设计和训练过程

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

521源码-网站源码-Thinkphp聊天室H5实时聊天室群聊聊天室自动分配账户完群组/私聊/禁言等功能/全开源运营版本

全开源运营版本聊天室H5实时聊天室群聊聊天室自动分配账户完群组/私聊/禁言等功能 都是去年买的&#xff0c;很多买的源码基本都下架了&#xff0c;详情还是套已经老站的&#xff0c;可能网上已经流传了点&#xff0c;不过还是不影响这个源码的牛逼所在 运营版本的聊天室&…

CV技术指南 | 中科院又一创举 SecViT | 多功能视觉 Backbone 网络,图像分类、目标检测、实例分割和语义分割都性能起飞!

本文来源公众号“CV技术指南”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;中科院又一创举 SecViT | 多功能视觉 Backbone 网络&#xff0c;图像分类、目标检测、实例分割和语义分割都性能起飞&#xff01; 前言 视觉 Transfor…

美颜相机与美图秀秀的非会员图片保存技巧畅享专业级图像处理探索

美颜相机与美图秀秀的非会员图片保存技巧畅享专业级图像处理探索 今日对美颜相机和美图秀秀的深入使用中&#xff0c;我遇到了一些功能限制&#xff0c;特别是在尝试保存特定处理后的图片时&#xff0c;发现通常需要开通VIP会员才能享受完整服务。作为一名热衷于技术探索的爱好…

HCIP的学习(28)

第九章&#xff0c;链路聚合和VRRP 链路聚合 ​ 目的&#xff1a;备份链路以及提高链路带宽。 ​ 链路聚合技术&#xff08;Eth-Trunk&#xff09;&#xff1a;将多个物理接口捆绑成一个逻辑接口&#xff0c;将N条物理链路逻辑上聚合为一条逻辑链路。 正常情况下&#xff0c;…

D课堂 | DDoS、CC,网站被攻击怎么办?

在前面两期《D课堂》中&#xff0c;D妹和大家分享了网站搭建好之后无法访问&#xff0c;应该如何排查&#xff0c;不知道大家学会了吗&#xff1f; 然而&#xff0c;想要维持网站长久健康的运行&#xff0c;我们还需要关注网站的安全问题。 不少站长们苦心经营自己的网站&#…

操作系统真象还原:保护模式入门

第4章-保护模式入门 这是一个网站有所有小节的代码实现&#xff0c;同时也包含了Bochs等文件 Intel8086CPU由于自身设计存在诸多缺点&#xff0c;最致命的有两条&#xff1a;1、仅能寻址1MB内存空间&#xff1b;2、用户程序可以通过自由修改段基址来访问所有内存空间而引出的…

leetCode.89. 格雷编码

leetCode.89. 格雷编码 题目思路 代码 class Solution { public:vector<int> grayCode(int n) {vector<int> res(1,0); // n 0时&#xff0c;之后一位0while (n--) {// 想要实现对象超下来&#xff0c;就从末尾开始&#xff0c;让vector里面 加 元素for (int i …

js切割数组的两种方法slice(),splice()

slice() 返回一个索引和另一个索引之间的数据(不改变原数组),slice(start,end)有两个参数(start必需,end选填),都是索引,返回值不包括end 用法和截取字符串一样 splice() 用来添加或者删除数组的数据,只返回被删除的数据,类型为数组(改变原数组) var heroes["李白&q…

乡村振兴与文化传承:挖掘乡村历史文化资源,传承乡村优秀传统,打造具有地方特色的美丽乡村文化品牌

目录 一、引言 二、乡村历史文化资源的挖掘与保护 &#xff08;一&#xff09;乡村历史文化资源的内涵 &#xff08;二&#xff09;乡村历史文化资源的挖掘 &#xff08;三&#xff09;乡村历史文化资源的保护 三、乡村优秀传统的传承与创新 &#xff08;一&#xff09;…

十_信号14 - system()

意思是 应在在调用 system() 函数前 阻塞 SIGCHLD 信号&#xff0c;否则&#xff0c;子进程结束的时候&#xff0c;系统会向该进程(父)发送 SIGCHLD信号&#xff0c;则该进程认为是自己的一个子进程结束了&#xff0c;于是调用 wait函数获取子进程的终止状态。这本来是正常的操…