搭建大型分布式服务(四十二)SpringBoot 无代码侵入实现多Kafka数据源整合插件发布

系列文章目录


文章目录

  • 系列文章目录
  • 前言
  • MultiKafkaStarter [V2.2]
    • 一、功能特性
    • 二、快速开始(生产端)
    • 三、快速开始(消费端)
    • 四、其它特性
    • 五、变更记录
    • 六、参考文章


前言

在分布式服务的架构演进中,消息队列作为核心组件之一,承载着解耦、异步、削峰填谷等关键职责。Apache Kafka 作为业界广泛使用的分布式流处理平台,因其高吞吐、低延迟的特性被大量应用在各类大数据场景中。然而,随着业务的复杂度不断提升,如何在 SpringBoot 中高效地整合并管理多个 Kafka 数据源,成为了一个值得探讨的问题。

在过去的一段时间里,我们通过系列文章详细阐述了如何在 SpringBoot 中以零代码或极低的代码侵入方式,实现多 Kafka 数据源的整合。从基础的配置到高级特性如 protobuf 支持、Aware 模式以及亿级消息生产者的优化,我们希望通过这些内容帮助开发者更加便捷地应对复杂的业务场景。

今天,我们将这些内容凝练成一个全新的 SpringBoot 插件——MultiKafkaStarter,旨在进一步降低开发者整合多 Kafka 数据源的门槛,提升系统的可维护性和扩展性。

核心特点

  • 无代码侵入:通过 SpringBoot 的自动配置机制,无需修改业务代码即可实现多 Kafka 数据源的整合。
  • 灵活配置:支持动态配置多个 Kafka 数据源,包括 bootstrap servers、group id、security protocol 等关键参数。
  • 全面特性支持:不仅支持基础的消息消费和生产功能,还提供了对 protobuf 序列化/反序列化的支持,以及对 Aware 模式的适配。
  • 亿级消息处理:针对高并发场景,提供了包括批量发送、分区策略优化等在内的多项性能优化措施,确保系统能够稳定处理亿级消息量。
  • 易用性与可维护性:插件采用模块化的设计思想,易于集成和升级,同时提供了丰富的文档和社区支持

国籍惯例,先上源码:Github源码

MultiKafkaStarter [V2.2]

SpringBoot 零代码方式整合多个kafka数据源,支持任意kafka集群,已封装为一个小模块,集成所有kafka配置,让注意力重新回归业务本身。

一、功能特性

  • SpringBoot无编程方式整合多个kafka数据源
  • 支持批量消费kafka并对单批次消息根据条件去重
  • 支持消费kafka消息类型为pb格式
  • 支持任意数量生产者

1、引入最新依赖包,如果找不到依赖包,请到工程目录mvn clean package install执行一下命令。

<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>multi-kafka-starter</artifactId>
    <version>2.2</version>
</dependency>

二、快速开始(生产端)

2、添加kafka地址等相关配置。


## json消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.count=1 ## 生产者数量,默认为1个
spring.kafka.four.producer.name=fourKafkaSender  ## 设置bean的名称,方便后续引用。如果没有设置,默认值为xxxKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers} ## 必须设置
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

## pb 消息生产者
spring.kafka.five.enabled=true
spring.kafka.five.producer.name=fiveKafkaSender
spring.kafka.five.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.five.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.five.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer

3、根据名称注入生产者MmcKafkaMultiSender,就可以发送kafka消息。


    @Resource(name = "fourKafkaSender")
    private MmcKafkaMultiSender mmcKafkaMultiSender;

    @Resource(name = "fiveKafkaSender")
    private MmcKafkaMultiSender mmcKafkaMultiSender;

    @Resource
    private MmcKafkaOutputContainer mmcKafkaOutputContainer;
    
    // 方式一
    void produceMessage() {

        for (int i = 0; i < 10; i++) {
    
            DemoAwareMsg msg = new DemoAwareMsg();
            msg.setRoutekey("routekey" + i);
            msg.setName("name" + i);
            msg.setTimestamp(System.currentTimeMillis());
        
            String json = JsonUtil.toJsonStr(msg);
        
            mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);
    
    
        }

    }
    
    // 方式二
    void produceMessage() {

            MmcKafkaSender sender = mmcKafkaOutputContainer.getOutputs().get("xxxKafkaSender");

            sender.sendStringMessage(topic, sku.getRoutekey(), message);
    }

三、快速开始(消费端)

2、添加kafka地址等相关配置。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=你的处理类bean名称(例如:oneProcessor)
spring.kafka.one.dupicate=true   ## 如果为true表示对批次内的kafka消息去重,需要实现MmcKafkaMsg接口,默认为false
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=你的处理类bean名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## protobuf类型的消息的kafka配置
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

3、新建kafka消息对应的实体类,可以选择实现MmcMsgDistinctAware接口,例如

@Data
class DemoMsg implements MmcMsgDistinctAware {

    private String routekey;

    private String name;

    private Long timestamp;

}

如果你配置了spring.kafka.xxx.duplicate=fale,则不需要实现MmcMsgDistinctAware接口。

        PS:如果实现MmcMsgDistinctAware接口,就自动具备了消息去重能力


4、新建kafka消息处理类,要求继承MmcKafkaKafkaAbastrctProcessor,然后就可以愉快地编写你的业务逻辑了。

@Slf4j
@Service("oneProcessor") // 你的处理类bean名称,如果没有定义bean名称,那么默认就是首字母缩写的类名称
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {
    

    @Override
    protected void dealMessage(List<DemoMsg> datas) {

        // 下面开始编写你的业务代码
    }


}

@Slf4j
@Service("pbProcessor")
public class PbProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {

    @Override
    protected Stream<DemoMsg> doParseProtobuf(byte[] record) {


        try {

            DemoPb.PbMsg msg = DemoPb.PbMsg.parseFrom(record);
            DemoMsg demo = new DemoMsg();
            BeanUtils.copyProperties(msg, demo);

            return Stream.of(demo);

        } catch (InvalidProtocolBufferException e) {

            log.error("parssPbError", e);
            return Stream.empty();
        }

    }

    @Override
    protected void dealMessage(List<DemoMsg> datas) {

        System.out.println("PBdatas: " + datas);

    }
}


四、其它特性

1、支持单次拉取kafka的batch消息里去重,需要实现MmcMsgDistinctAware的getRoutekey()和getTimestamp()方法;如果为false,则不要实现MmcMsgDistinctAware接口。

spring.kafka.xxx.duplicate=true

2、支持字符串kafka消息,json是驼峰或者下划线

# 默认为支持驼峰的kafka消息,为ture则支持下划线的消息
spring.kafka.xxx.snakeCase=false

3、支持pb的kafka消息,需要自行重写父类的doParseProtobuf方法;

    @Override
    protected Stream<DemoMsg> doParseProtobuf(byte[] record) {
    
            try {
    
                DemoMsg msg = new DemoMsg();
                DemoPb.PbMsg pb = DemoPb.PbMsg.parseFrom(record);
                BeanUtils.copyProperties(pb, msg);
        
                return Stream.of(msg);
        
                } catch (InvalidProtocolBufferException e) {
        
                log.error("doParseProtobuf error: {}", e.getMessage());
        
                return Stream.empty();
            }

        }

4、支持获取kafka的topic、offset属性,注入到实体类中,需要实现MmcMsgKafkaAware接口

@Data
class DemoAwareMsg implements MmcKafkaAware {
    
    private String routekey;

    private String name;

    private Long timestamp;

    private String topic;

    private long offset;

}


五、变更记录

  • 20240623 v2.2 支持Kafka生产者,并对MultiKafkaConsumerStarter项目重命名为MultiKafkaStarter
  • 20240602 v2.1 支持获取kafka消息中topic、offset属性
  • 20240602 v2.0 支持protobuf、json格式
  • 20240430 v1.1 取消限定符
  • 20231111 v1.0 初始化

六、参考文章

  • 《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
  • 《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
  • 《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
  • 《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》
  • 《搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者》
  • 《搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者》
  • 《搭建大型分布式服务(四十二)SpringBoot 无代码侵入实现多Kafka数据源整合插件发布》
  • 《搭建大型分布式服务(四十三)SpringBoot 多Kafka数据源发布到Maven中央仓库:让世界看到你的作品!》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/757175.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

容易涨粉的视频素材有哪些?容易涨粉的爆款短素材库网站分享

如何挑选社交媒体视频素材&#xff1a;顶级视频库推荐 在社交媒体上脱颖而出&#xff0c;视频素材的选择至关重要。以下是一些顶级的视频素材网站推荐&#xff0c;不仅可以提升视频质量&#xff0c;还能帮助你吸引更多粉丝。 蛙学网&#xff1a;创意的源泉 作为创意和独特性的…

携程二面测开—中核

4.12 35min面试经验 自我介绍 在面试的开始&#xff0c;我简洁明了地进行了自我介绍&#xff0c;突出了我的教育背景、技能特长以及实习经历&#xff0c;为后续的面试内容打下了良好的基础。 实习的具体工作内容 在谈及实习经历时&#xff0c;我详细阐述了在实习期间所承担…

NodeJs 使用中间件实现日志生成功能

写在前面 今天我们实现一个记录 nodejs 服务请求日志的功能&#xff0c;大概的功能包括请求拦截&#xff0c;将请求的信息作为日志文件的内容写入到 txt 文件中&#xff0c;然后输出到指定的日志到当天日期目录中&#xff0c;从而实现后续查找用户请求信息的功能&#xff0c;下…

Ubuntu 20.04安装中文输入法出错:gnome-user-docs-zh-hans安装失败

问题&#xff1a;Ubuntu20.04安装中文输入法出错&#xff1a;gnome-user-docs-zh-hans安装失败 现象&#xff1a; 打开language Support页面的时候&#xff0c;提示install依赖的文件 这个过程中会弹窗提示: The following packages have unmet dependencies:gnome-user-doc…

Lombok的使用

IntelliJ 安装 Lombok Lombok 注解大全说明 NonNull&#xff1a;给方法参数增加这个注解&#xff0c;会自动在方法内对该参数进行是否为空的校验&#xff0c;如果为空&#xff0c;则抛出 NPE&#xff08;NullPointerException&#xff09; Getter/Setter&#xff1a;用在属性上…

Python_Socket

Python Socket socket 是通讯中的一种方式&#xff0c;主要用来处理客户端与伺服器端之串连&#xff0c;只需要protocol、IP、Port三项目即可进行网路串连。 Python套件 import socketsocket 常用函式 socket.socket([family], [type] , [proto] ) family: 串接的类型可分为…

Rpc服务的提供方(Rpcprovider)的调用流程

首先&#xff0c;服务的提供方&#xff0c;会通过rpcprovider向rpc服务方注册rpc服务对象和服务方法&#xff0c; 那么&#xff0c;我们通过protobuf提供的抽象层的service和method&#xff0c;将服务对象和它所对应的服务方法记录在map表中&#xff0c; 当它启动以后&#xff…

隐藏Python运行产生的缓存文件(__pycache__)

不少同学使用VScode 提交或运行python代码的时候&#xff0c;出现一些缓存文件 类似于(__pycache__) 这种&#xff0c;对于我这种有一丢丢强迫症的人来说&#xff0c;运行一次就得删除一次&#xff0c;那有没有什么办法将其隐藏的&#xff1f; 在vscode编辑器中打开设置&#…

权限维持-域环境单机版---粘滞键屏保登录

免责声明;本文仅做技术交流与学习,,, 目录 粘滞键: 粘滞键位置&#xff1a; 屏保&登录: 1、WinLogon配合无文件落地上线 结合ps命令: 2、屏幕保护生效后执行后门 粘滞键: Windows维权之粘滞键项维权-腾讯云开发者社区-腾讯云 (tencent.com) 系统自带的辅助功能进行替…

密码学基础之ASN.1编码

简介 ASN.1(Abstract Syntax Notation One)&#xff0c;抽象语法标记。ASN.1是一种国际标准的正式语言&#xff0c;由国际标准化组织&#xff08;ISO&#xff09;和国际电信联盟&#xff08;ITU-T&#xff09;共同制定&#xff0c;用于定义数据结构的抽象语法。它的设计目标是…

鸿蒙开发设备管理:【@ohos.multimodalInput.inputConsumer (组合按键)】

组合按键 InputConsumer模块提供对按键事件的监听。 说明&#xff1a; 本模块首批接口从API version 8开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。本模块接口均为系统接口&#xff0c;三方应用不支持调用。 导入模块 import inputConsumer …

EfficientNet-V2论文阅读笔记

目录 EfficientNetV2: Smaller Models and Faster Training摘要Introduction—简介Related work—相关工作EfficientNetV2 Architecture Design—高效EfficientNetV2架构设计Understanding Training Efficiency—了解训练效率Training-Aware NAS and Scaling—训练感知NAS和缩放…

Leetcode3190. 使所有元素都可以被 3 整除的最少操作数

Every day a Leetcode 题目来源&#xff1a;3190. 使所有元素都可以被 3 整除的最少操作数 解法1&#xff1a;遍历 遍历数组&#xff0c;累加最少操作数&#xff0c;即 min(num % 3, 3 - num % 3)。 代码&#xff1a; /** lc appleetcode.cn id3190 langcpp** [3190] 使所…

自媒体常用的高清素材网站有哪些?自媒体必备的素材网站库分享

在自媒体时代&#xff0c;拥有高质量的素材库对创作者来说至关重要。素材的高清晰度、多样性和易用性可以显著提升你的内容吸引力和专业感。今天&#xff0c;我们就来探讨一些对自媒体创作者非常有用的高清素材网站。 蛙学网&#xff1a;自媒体创作者的理想选择 蛙学网为自媒体…

五、Spring IoCDI ★ ✔

5. Spring IoC&DI 1. IoC & DI ⼊⻔1.1 Spring 是什么&#xff1f;★ &#xff08;Spring 是包含了众多⼯具⽅法的 IoC 容器&#xff09;1.1.1 什么是容器&#xff1f;1.1.2 什么是 IoC&#xff1f;★ &#xff08;IoC: Inversion of Control (控制反转)&#xff09;总…

将深度相机的实时三维坐标数据保存为excel文档

一、如何将数据保存为excel文档 1.excel文件库与相关使用 &#xff08;1&#xff09;导入相应的excel文件库&#xff0c;导入前先要进行pip安装&#xff0c;pip install xlwt import xlwt # 导入用于创建和写入Excel文件的库 (2) 建立一个excel文档&#xff0c;并在第0行写…

51单片机STC89C52RC——12.1 数据存储芯片AT24C02

目的/效果 利用存储芯片AT24C02存储数据&#xff0c;LCD1602显示存储的数据。 一&#xff0c;STC单片机模块 二&#xff0c;AT24C02存储芯片 2.1 介绍 AT24C02是一个2K位串行CMOS E2PROM&#xff0c;内部含有256个8位字节&#xff0c;采用先进CMOS技术实质上减少了器件的功…

什么是中断?---STM32篇

目录 一&#xff0c;中断的概念 二&#xff0c;中断的意义 三&#xff0c;中断的优先级 四&#xff0c;中断的嵌套 如果一个高优先级的中断发生&#xff0c;它会立即打断当前正在处理的中断&#xff08;如果其优先级较低&#xff09;&#xff0c;并首先处理这个高优…

【SGX系列教程】(五)Intel-SGX 官方示例分析(SampleCode)——RemoteAttestation

文章目录 一.RemoteAttestation原理介绍1.1 远程认证原理1.2 远程认证步骤1.3 远程认证基本流程1.4 IAS通过以下步骤验证报告的签名1.5 关键术语1.6 总结二.源码分析2.1 README2.1.1 README给出的编译流程2.2 重点代码分析2.2.0 主要代码模块交互流程分析2.2.1 isv_app文件夹2.…

RAG 基本流程及处理技巧 with LangChain

LLM 主要存在两个问题&#xff1a;幻想和缺乏领域知识。领域知识缺乏的原因是因为训练 LLM 本身的知识更新慢&#xff0c;对特定领域的知识也没有太细致的输入。 RAG 主要是解决 LLM 缺乏领域知识的问题。底层的逻辑是&#xff1a;把 LLM 作为逻辑推理引擎&#xff0c;而不是信…