搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>multi-kafka-starter</artifactId>
    <version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

## kafka消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer


国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符
  • SpringBoot 支持消费protobuf类型的kafka消息
  • SpringBoot Aware设计模式
  • SpringBoot 获取kafka消息中的topic、offset、partition、header等参数
  • SpringBoot 使用任意生产者发送kafka消息
  • SpringBoot 配置任意数量的kafka生产者

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们整合生产者到这个组件,那么假如我们生产的消息过亿级别,一个生产者不足以支持这么大的buffer发送量级,我们该如何操作?是否支持配置多个数量的生产者?


## 1.配置
spring.kafka.four.enabled=true
spring.kafka.four.producer.count=1 ## 生产者数量,默认为1个
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

## 2.引用
@Resource(name = "fourKafkaSender")
private MmcKafkaSender mmcKafkaMultiSender;

## 3.使用
mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);

答案是可以的、但我们要升级和优化一下。

四、修改项目

1、新增MmcKafkaMultiSender类,用于支持任意数量的发送者,利用负载均衡发送消息。

public class MmcKafkaMultiSender implements MmcKafkaSender {

    private final AtomicLong atomicLong = new AtomicLong(1);

    private final List<KafkaTemplate<String, Object>> templates;

    public MmcKafkaMultiSender(List<KafkaTemplate<String, Object>> templates) {
        this.templates = templates;
    }

    @Override
    public void sendStringMessage(String topic, String partitionKey, String message) {

        KafkaTemplate<String, Object> template = templates.get((int) (atomicLong.getAndIncrement() % templates.size()));
        template.send(topic, partitionKey, message);
    }


    @Override
    public void sendProtobufMessage(String topic, String partitionKey, byte[] message) {

        KafkaTemplate<String, Object> template = templates.get((int) (atomicLong.getAndIncrement() % templates.size()));
        template.send(topic, partitionKey, message);

    }
}

2、修改MmcMultiProducerAutoConfiguration配置类,配置多个数量的Sender;

 @Bean
    public MmcKafkaOutputContainer mmcKafkaOutputContainer() {

        // 初始化一个存储所有生产者的哈希映射
        Map<String, MmcKafkaSender> outputs = new HashMap<>();

        // 获取所有的Kafka配置信息
        Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();

        // 逐个遍历,并生成producer
        for (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {

            // 唯一生产者名称
            String name = entry.getKey();

            // 生产者配置
            MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();

            // 是否开启
            if (properties.isEnabled()
                    && properties.getProducer().isEnabled()
                    && CommonUtil.isNotEmpty(properties.getProducer().getBootstrapServers())) {

                // bean名称
                String beanName = Optional.ofNullable(properties.getProducer().getName())
                        .orElse(name + "KafkaSender");


                // 数量
                List<KafkaTemplate<String, Object>> templates = new ArrayList<>(properties.getProducer().getCount());
                for (int i = 0; i < properties.getProducer().getCount(); i++) {

                    log.info("[pando] init producer {} - {} ", name, i);
                    KafkaTemplate<String, Object> template = mmcKafkaTemplate(properties);
                    templates.add(template);
                }

                // 创建实例
                MmcKafkaSender sender = new MmcKafkaMultiSender(templates);
                outputs.put(beanName, sender);

                // 注册到IOC
                beanDefinitionRegistry.registerSingleton(beanName, sender);
            }

        }

        return new MmcKafkaOutputContainer(outputs);
    }

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>

2、消费者配置保持不变,增加生产者配置。

## json消息消费者
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=oneProcessor
spring.kafka.one.duplicate=false
spring.kafka.one.snakeCase=false
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.container.threshold=2
spring.kafka.one.container.rate=1000
spring.kafka.one.container.parallelism=8

## json消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3、编写测试类。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiProducerAutoConfiguration.class, MmcMultiConsumerAutoConfiguration.class,
        DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application-string.properties")
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},
        topics = {"${spring.kafka.one.topic}"})
class KafkaStringMessageTest {


    @Value("${spring.kafka.one.topic}")
    private String topicOne;

    @Value("${spring.kafka.two.topic}")
    private String topicTwo;

    @Resource(name = "fourKafkaSender")
    private MmcKafkaSender mmcKafkaSender;


    @Test
    void testDealMessage() throws Exception {

        Thread.sleep(2 * 1000);

        // 模拟生产数据
        produceMessage();

        Thread.sleep(10 * 1000);
    }

    void produceMessage() {


        for (int i = 0; i < 10; i++) {

            DemoMsg msg = new DemoMsg();
            msg.setRoutekey("routekey" + i);
            msg.setName("name" + i);
            msg.setTimestamp(System.currentTimeMillis());

            String json = JsonUtil.toJsonStr(msg);

            mmcKafkaSender.sendStringMessage(topicOne, "aaa", json);


        }
    }
}



5、运行一下,测试通过,可以看到能正常发送消息和消费。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》
搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者
搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/735794.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PD虚拟机和VMware有什么区别?PD虚拟机和VMware谁更好用?

随着电脑硬件设备的飞快发展&#xff0c;一些高端的技术已经不再遥不可及&#xff0c;比如虚拟化&#xff0c;虚拟机技术已经成为IT领域和个人用户不可或缺的工具。特别是PD虚拟机&#xff08;Parallels Desktop&#xff09;和VMware&#xff0c;作为市场上两个主流的虚拟机软件…

智能优化算法改进策略之局部搜索算子(四)--梯度搜索法

2、仿真实验 以海洋捕食者算法&#xff08;MPA&#xff09;为基本算法。考察基于梯度搜索的改进海洋捕食者算法&#xff08;命名为GBSMPA&#xff09; vs. 海洋捕食者算法&#xff08;MPA&#xff09; 在Sphere函数上的比较 在Penalized1函数上的比较 在CEC2017-1上的比较 在C…

本地离线模型搭建指南-本地运行显卡选择

搭建一个本地中文大语言模型&#xff08;LLM&#xff09;涉及多个关键步骤&#xff0c;从选择模型底座&#xff0c;到运行机器和框架&#xff0c;再到具体的架构实现和训练方式。以下是一个详细的指南&#xff0c;帮助你从零开始构建和运行一个中文大语言模型。 本地离线模型搭…

WordPress视频主题Qinmei 2.0

WordPress视频主题Qinmei 2.0&#xff0c;简单漂亮的WP视频站源码 主题功能 可以根据豆瓣ID直接获取到其他详细信息&#xff0c;省去慢慢填写的痛苦&#xff1b;播放器支持直链&#xff0c;解析&#xff0c;m3u8格式&#xff0c;同时解析可匹配正则自动更改&#xff1b;新增动…

独立看门狗窗口开门狗

独立看门狗 接线图&#xff1a;按键用于阻塞喂狗。独立看门狗&窗口开门狗接线一样。 第一步&#xff0c;是开启时钟了&#xff0c;只有这个LSI时钟开启了独立看门狗才能运行&#xff0c;所以初始化独立看门狗之前&#xff0c;LSI必须得开启&#xff0c;但是这个开启LSI的…

【机器学习300问】128、简述什么Word2Vec?

一、一句话说明Word2Vec是什么&#xff1f; Word2Vec是一种常见的词嵌入技术。Word2Vec的目标是将每个词表示为一个向量&#xff0c;使得这些向量能够反映出词语之间的相似性和关联性。 word2vec算法通过预测中心词和上下文词的共现概率来学习词向量&#xff0c;能够捕捉词语之…

WordPress软件下载主题Inpandora

Inpandora&#xff08;中文名为潘多拉&#xff09;是一款基于软件下载站定制的WordPress主题&#xff0c;帮助站长使用WordPress快速搭建一个专业的WordPress软件博客。Inpandora这款WordPress主题可以说是因软件而生&#xff0c;从UI设计到后台设置功能&#xff0c;都充分体现…

高德地图轨迹回放/轨迹播放

前言 本篇文章主要介绍高德地图的轨迹回放或播放的实现过程&#xff0c;是基于vue2实现的功能&#xff0c;同时做一些改动也是能够适配vue3的。其中播放条是用的是element UI中的el-slider组件&#xff0c;包括使用到的图标也是element UI自带的。可以实现轨迹的播放、暂停、停…

导入别人的net文件报红问题sdk

1. 使用cmd命令 dotnet --info 查看自己使用的SDK版本 2.直接找到项目中的 global.json 文件&#xff0c;右键打开&#xff0c;直接修改版本为本机的SDK版本&#xff0c;就可以用了

【STM32】STM32通过I2C实现温湿度采集与显示

目录 一、I2C总线通信协议 1.I2C通信特征 2.I2C总线协议 3.软件I2C和硬件I2C 二、stm32通过I2C实现温湿度&#xff08;AHT20&#xff09;采集 1.stm32cube配置 RCC配置&#xff1a; SYS配置&#xff1a; I2C1配置&#xff1a; USART1配置&#xff1a; GPIO配置&#…

智慧校园综合管理系统的优点有哪些

在当今这个信息化飞速发展的时代&#xff0c;智慧校园综合管理系统正逐步成为教育领域的一股革新力量&#xff0c;它悄然改变着我们对传统校园管理的认知。这套系统如同一个无形的桥梁&#xff0c;将先进的信息技术与学校的日常运作紧密相连&#xff0c;展现出多维度的优势。 …

网络技术原理需要解决的5个问题

解决世界上任意两台设备时如何通讯的&#xff1f;&#xff1f; 第一个问题&#xff0c;pc1和pc3是怎么通讯的&#xff1f; 这俩属于同一个网段&#xff0c;那么同网段的是怎么通讯的&#xff1f; pc1和pc2属于不同的网段&#xff0c;第二个问题&#xff0c;不同网段的设备是…

国企:2024年6月中国移动相关招聘信息 二

在线营销服务中心-中国移动通信有限公司在线营销服务中心 硬件工程师 工作地点:河南省-郑州市 发布时间 :2024-06-18 截至时间: 2024-06-30 学历要求:本科及以上 招聘人数:1人 工作经验:3年 岗位描述 1.负责公司拾音器等音视频智能硬件产品全过程管理,包括但…

【Java】Java基础语法

一、注释详解 1.1 注释的语法&#xff1a; // 单行注释/*多行注释 *//**文档注释 */ 1.2 注释的特点&#xff1a; 注释不影响程序的执行&#xff0c;在Javac命令进行编译后会将注释去掉 1.3 注释的快捷键 二、字面量详解 2.1 字面量的概念&#xff1a; 计算机是用来处理…

干货 | 2024生成式AI产业落地路径研究报告(免费下载)

【1】关注本公众号&#xff0c;转发当前文章到微信朋友圈 【2】私信发送 2024生成式AI产业落地路径研究报告 【3】获取本方案PDF下载链接&#xff0c;直接下载即可。 如需下载本方案PPT/WORD原格式&#xff0c;诚挚邀请您微信扫描以下二维码加入方案驿站知识星球&#xff0c;…

HMI 的 UI 风格,超凡脱俗

HMI 的 UI 风格&#xff0c;超凡脱俗

什么是孪生素数猜想

什么是孪生素数猜想 素数p与素数p2有无穷多对 孪生素数的公式&#xff08;详见百度百科&#xff1a;孪生素数公式&#xff09; 利用素数的判定法则&#xff0c;可以得到以下的结论&#xff1a;“若自然数q与q2都不能被任何不大于的素数 整除&#xff0c;则q与q 2都是素数”…

Redis预备知识

一.预备知识 1.基本全局命令 set key value 将key的值设置成value get key 得到key的值 keys [pattern] 查看匹配pattern的所有key 比如h?llo匹配hallo&#xff0c;hbllo&#xff0c;hcllo……只要用一个符号将&#xff1f;代替即可 比如h*llo匹配hllo&#xff0c;heeeello…

Java预约家政5.0服务本地服务源码(APP+小程序+公众号+H5)

预约家政本地服务平台系统&#xff1a;一站式解决家居需求&#x1f3e0;&#x1f4bc; 一、引言&#xff1a;开启便捷家居新时代 在快节奏的现代生活中&#xff0c;我们渴望拥有更多的时间和精力去享受生活&#xff0c;而不是被繁琐的家务所困扰。预约家政本地服务平台系统应…

远程医疗软件到底哪个好用?

随着科技进步的不断推进&#xff0c;远程医疗已经成为现代医疗体系的一个重要支柱。远程医疗软件&#xff0c;通过网络通信技术的运用&#xff0c;打破了地理限制&#xff0c;实现了医疗资源的有效整合与共享&#xff0c;为民众提供了前所未有的便捷高效的医疗服务体验。那么&a…