动态订阅kafka mq实现(消费者组动态上下线)

和上篇文章 动态订阅rocket mq实现(消费者组动态上下线) 目的一致,直接上代码

    /**
     * Kafka topic container集合
     */
    private static final Map<String, ConcurrentMessageListenerContainer<String, String>> topics = new HashMap<>();

	public void registerKafkaListeners(BinlogPortDatabaseConfig binlogPortDatabaseConfig) {
	/*
		BinlogPortDatabaseConfig是自定义的数据结构,即需要动态注册的kafka配置
		包含topic、sever、client,自定义即可
	*/
        ConsumerFactory<String, String> consumerFactory = binlogPortDatabaseConfig.createConsumerFactory();
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setBatchListener(true);
        if (consumerFactory == null) {
            return;
        }
        factory.setConsumerFactory(consumerFactory);
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(binlogPortDatabaseConfig.getTopic());
        //设置为false,解决client后自动加-0的问题
        container.setAlwaysClientIdSuffix(false);
        container.setupMessageListener((MessageListener<String, String>) record -> {
           //TODO:你的消费逻辑,record即为消息体
                }
            } catch (IllegalArgumentException e) {
                log.error("registerKafkaListeners JSON解析失败", e);
            } catch (NullPointerException e) {
                log.error("registerKafkaListeners 消息为空或部分字段缺失", e);
            } catch (Exception e) {
                log.error("registerKafkaListeners 注册异常", e);
            }
        });
        container.start();
        topics.put(binlogPortDatabaseConfig.getTopic(), container);
    }


    public void factoryDel(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = topics.get(topic);
        if (!topic.isEmpty()) {
            container.stop();
            topics.remove(topic);
        }
    }

    public ConsumerFactory<String, String> createConsumerFactory() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, /*你的kafka server*/);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, /*你的kafka client*/);
        if (SystemEnvUtil.isTest()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, Constant.consumerGroupIdOffline + topic);
        }
        if (SystemEnvUtil.isProd() || SystemEnvUtil.isSandbox()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG,/*你的group id*/);
        }
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(100));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));

        Map<String, Object> configMap = new java.util.HashMap<>();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            configMap.put((String) entry.getKey(), entry.getValue());
        }
        return new DefaultKafkaConsumerFactory<>(configMap);
    }



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/975406.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

常用网络工具分析(ping,tcpdump等)

写在前面 本文看下常用网络工具。 1&#xff1a;ping 1.1&#xff1a;用途 用于检验网络的连通性。 1.2&#xff1a;实战 在Linux环境中执行&#xff1a;ping www.sina.com.cn&#xff1a; [rootlocalhost ~]# ping www.sina.com.cn PING spool.grid.sinaedge.com (111.…

windows系统本地部署DeepSeek-R1全流程指南:Ollama+Docker+OpenWebUI

本文将手把手教您使用OllamaDockerOpenWebUI三件套在本地部署DeepSeek-R1大语言模型&#xff0c;实现私有化AI服务搭建。 一、环境准备 1.1 硬件要求 CPU&#xff1a;推荐Intel i7及以上&#xff08;需支持AVX2指令集&#xff09; 内存&#xff1a;最低16GB&#xff0c;推荐…

计算机网络-面试总结

计算机网络 从输入一个URL到页面加载完成的过程 整体流程 DNS查询过程SSL四次握手HTTP 的长连接与短连接 HTTP 的 GET 和 POST 区别浏览器访问资源没有响应&#xff0c;怎么排查? OSI七层参考模型 TCP/IP四层参考模型比较 TCP/IP 参考模型与 OSI 参考模型 TCP三次握手&四…

AI 编程助手 cursor的系统提示词 prompt

# Role 你是一名极其优秀具有10年经验的产品经理和精通java编程语言的架构师。与你交流的用户是不懂代码的初中生&#xff0c;不善于表达产品和代码需求。你的工作对用户来说非常重要&#xff0c;完成后将获得10000美元奖励。 # Goal 你的目标是帮助用户以他容易理解的…

【服务器与本地互传文件】远端服务器的Linux系统 和 本地Windows系统 互传文件

rz 命令&#xff1a;本地上传到远端 rz 命令&#xff1a;用于从本地主机上传文件到远程服务器 rz 是一个用于在 Linux 系统中通过 串口 或 SSH 上传文件的命令&#xff0c;它实际上是 lrzsz 工具包中的一个命令。rz 命令可以调用一个图形化的上传窗口&#xff0c;方便用户从本…

Unity贴图与模型相关知识

一、贴图 1.贴图的类型与形状 贴图类型 贴图形状 2.在Unity中可使用一张普通贴图来生成对应的法线贴图&#xff08;但并不规范&#xff09; 复制一张该贴图将复制后的贴图类型改为Normal Map 3.贴图的sRGB与Alpha sRGB&#xff1a;勾选此选项代表此贴图存储于Gamma空间中…

Python----数据结构(哈希表:哈希表组成,哈希冲突)

一、哈希表 哈希表(Hash table)是一种常用、重要、高效的数据结构。 哈希表通过哈希函数,可以快速地将键(Key)映射到值(Value)。从而允许在近常数时间内对键关联的值进行插入、删除和查找操作。 哈希表的主要思想是通过哈希函数将键转换为索引&#xff0c;将索引映射到数组中…

java方法学习

java 方法 在Java中&#xff0c;方法是类&#xff08;或对象&#xff09;的行为或功能的实现。&#xff08;一起实现一个功能&#xff09;java的方法类似于其他语言的函数&#xff0c;是一段用来完成特定功能的代码片段。 方法是解决一类问题步骤的有序结合。 方法包含于类或…

网络运维学习笔记 015网工初级(HCIA-Datacom与CCNA-EI)NAT网络地址转换

文章目录 NAT(Network Address Translation&#xff0c;网络地址转换)思科&#xff1a;1&#xff09;PAT2&#xff09;静态端口转换 华为&#xff1a;1&#xff09;EasyIP2&#xff09;NAT Server静态NAT&#xff1a;动态NAT&#xff1a;实验1&#xff1a;在R1上配置NAPT让内网…

强化学习的数学原理-六、随机近似与随机梯度下降

代码来自up主【强化学习的数学原理-作业】GridWorld示例代码&#xff08;已更新至DQN、REINFORCE、A2C&#xff09;_哔哩哔哩_bilibili SGD、GD、MGD举例&#xff1a; # 先初始化一个列表&#xff0c;未来要在这100个样本里面再sample出来 np.random.seed(0) X np.linspace(-…

问卷数据分析|SPSS实操之相关分析

皮尔逊还是斯皮尔曼的选取主要看数据的分布 当数据满足正态分布且具有线性关系时&#xff0c;用皮尔逊相关系数 当有一个不满住时&#xff0c;用斯皮尔曼相关系数 1. 选择分析--相关--双变量 2. 将Z1-Y2加入到变量中&#xff0c;选择皮尔逊 3. 此处为结果&#xff0c;可看我案…

jsherp importItemExcel接口存在SQL注入

一、漏洞简介 很多人说管伊佳ERP&#xff08;原名&#xff1a;华夏ERP&#xff0c;英文名&#xff1a;jshERP&#xff09;是目前人气领先的国产ERP系统虽然目前只有进销存财务生产的功能&#xff0c;但后面将会推出ERP的全部功能&#xff0c;有兴趣请帮点一下 二、漏洞影响 …

解决华硕主板的Boot界面无法设置M.2的系统启动盘问题

一、问题描述 当我们的华硕主板电脑开机后&#xff0c;发现电脑无法正常进入Windows系统界面&#xff0c;直接显示PXE网络网络信息&#xff1b;且知道我们进入到BIOS界面也无法找到选择系统盘&#xff0c;界面只显示【UEFI:PXE IP4 Intel(R) Ethernet】、【UEFI:PXE IP6 Intel(…

BuildFarm Worker 简要分析

更多BuildFarm/Bazel/Remote Execution API的文章见我的个人博客&#xff1a; Bazel 报错&#xff1a;/tmp/external/gcc_toolchain_x86_64_files/bin/x86_64-linux-gcc&#xff1a; No such file or directory 记录Bazel 编译 java 代码为独立运行的 jar 包的方法BuildFarm S…

docker修改镜像默认存储路径(基于页面迁移)

文章目录 1、停止服务2、拷贝镜像3、docker界面设置路径4、重新启动服务5、重启电脑 1、停止服务 桌面底部右键打开任务管理器 停止docker服务 2、拷贝镜像 从原目录拷贝到新的目录下&#xff0c;新的目录自己定&#xff0c;如果没有权限&#xff0c;需要先对原文件添加权限…

基于ffmpeg+openGL ES实现的视频编辑工具-opengl相关逻辑(五)

在我们的项目中,OpenGL ES 扮演着至关重要的角色,其主要功能是获取图像数据,经过一系列修饰后将处理结果展示到屏幕上,以此实现各种丰富多样的视觉效果。为了让大家更好地理解后续知识,本文将详细介绍 OpenGL 相关代码。需要注意的是,当前方案将对 OpenGL 的所有操作都集…

机器学习实战(7):聚类算法——发现数据中的隐藏模式

第7集&#xff1a;聚类算法——发现数据中的隐藏模式 在机器学习中&#xff0c;聚类&#xff08;Clustering&#xff09; 是一种无监督学习方法&#xff0c;用于发现数据中的隐藏模式或分组。与分类任务不同&#xff0c;聚类不需要标签&#xff0c;而是根据数据的相似性将其划…

七星棋牌顶级运营产品全开源修复版源码教程:6端支持,200+子游戏玩法,完整搭建指南(含代码解析)

棋牌游戏一直是移动端游戏市场中极具竞争力和受欢迎的品类&#xff0c;而七星棋牌源码修复版无疑是当前行业内不可多得的高质量棋牌项目之一。该项目支持 6大省区版本&#xff08;湖南、湖北、山西、江苏、贵州&#xff09;&#xff0c;拥有 200多种子游戏玩法&#xff0c;同时…

uniapp邪门事件

很久之前在这篇《THREEJS 在 uni-app 中使用&#xff08;微信小程序&#xff09;》&#xff1a;THREEJS 在 uni-app 中使用&#xff08;微信小程序&#xff09;_uni-app_帶刺的小葡萄-华为开发者空间 中学到了如何在uniapp的微信小程序里接入three.js的3d模型 由于小程序自身很…

【OS安装与使用】part6-ubuntu 22.04+CUDA 12.4运行MARL算法(多智能体强化学习)

文章目录 一、待解决问题1.1 问题描述1.2 解决方法 二、方法详述2.1 必要说明2.2 应用步骤2.2.1 下载源码并安装2.2.2 安装缺失的依赖项2.2.3 训练执行MAPPO算法实例 三、疑问四、总结 一、待解决问题 1.1 问题描述 已配置好基础的运行环境&#xff0c;尝试运行MARL算法。 1…