搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>multi-kafka-consumer-starter</artifactId>
    <version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符
  • Springboot 支持消费protobuf类型的kafka消息
  • Springboot Aware设计模式
  • Springboot 获取kafka消息中的topic、offset、partition、header等参数

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们修改了抽象父类,并下沉了kafka相关依赖,使得可以支持消费pb类型的格式数据,根据自己的需求解析出实体类。但也有小伙伴反馈,有时候需要获取kafka消息中的partition、offset、topic、header等参数,这个怎样办?


@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {

	// 需要在DemoMsg获取partition、offset、topic、header等参数
	// 但不又不想重写父类的doParse()方法,这样显得太笨重
	// 因为需要获取kafka消息中的参数毕竟场景不算太多
    @Override
    protected void dealMessage(List<DemoMsg> datas) {

        datas.forEach(x -> {
            log.info("dealMessage one: {}", x);
        });

    }


}

2、在Spring Boot中,Aware接口是一种特殊的接口,它允许我们在Spring容器初始化时获取特定的bean或属性。Spring Boot提供了多种Aware接口,用于获取不同类型的bean或属性。这些接口的实现原理主要依赖于Spring框架的生命周期回调机制。所以,我们是否可以参考这种方式,让子类按需选择需要哪些参数能力,类似下面这样呢?

// 获取消息去重能力
@Data
class DemoMsg implements MmcMsgDistinctAware{

    private String routekey;

    private String name;

    private Long timestamp;

}

// 获取kafka消息的topic、offset参数
@Data
class DemoMsg implements MmcMsgKafkaAware {

    private String routekey;

    private String name;

    private Long timestamp;

    private String topic;

    private long offset;
}

答案是可以的、但我们要升级和优化一下。

四、修改项目

1、重命名MmcKafkaMsg类为MmcMsgDistinctAware,使得更加符合规范,只要子类实体类实现了本接口,那么就可以具备消息去重的能力。

public interface MmcMsgDistinctAware {

    /**
     * 代表kafka消息的唯一键,用于批次内分组.

     * @return 唯一键
     */
    String getRoutekey();

    /**
     * kafka消息生产或接收时间,用于批次内分组,根据时间去重,取最新的消息.
     *
     * @return 消息时间
     */
    Long getTimestamp();
}

2、新增MmcMsgKafkaAware,只要子类的实体类实现本接口,就可以方便获取kafka消息中的topic、offset等参数。

public interface MmcMsgKafkaAware {

    /**
     * 注入topic.
     *
     * @param topic topic名称
     */
    void setTopic(String topic);

    /**
     * 注入offset.
     *
     * @param offset offset
     */
    void setOffset(long offset);
}

3、修改KafkaAbastrctProcessor抽象父类,重写解析消息方法,使得可以根据实体类的Aware接口标记来获取对应的能力;

@Slf4j
@Setter
abstract class KafkaAbstractProcessor<T> implements MmcInputer {
   
    // 重写解析消息方法,使得可以根据实体类的Aware接口标记来获取对应的能力
	/**
     * 将kafka消息解析为实体,支持json对象或者json数组.
     *
     * @param map kafka消息对象,包含key、value、topic、partition、offset等
     * @return 实体类
     */
    protected Stream<T> doParse(ConsumerRecord<String, Object> map) {

        // 消息对象
        Object record = map.value();

        // 如果是pb格式
        if (record instanceof byte[]) {

            return doParseProtobuf((byte[]) record);

        } else if (record instanceof String) {

            // 普通kafka消息
            String json = record.toString();

            if (json.startsWith("[")) {

                // 数组
                List<T> datas = doParseJsonArray(json);
                if (CommonUtil.isEmpty(datas)) {

                    log.warn("{} doParse error, json={} is error.", name, json);
                    return Stream.empty();
                }

                // 反序列对象后,做一些初始化操作
                datas = datas.stream().peek(x -> doKafkaAware(x, map)).peek(this::doAfterParse).collect(Collectors.toList());

                return datas.stream();

            } else {

                // 对象
                T data = doParseJsonObject(json);
                if (null == data) {

                    log.warn("{} doParse error, json={} is error.", name, json);
                    return Stream.empty();
                }

                // 注入kafka相关
                doKafkaAware(data, map);

                // 反序列对象后,做一些初始化操作
                doAfterParse(data);

                return Stream.of(data);
            }

        } else if (record instanceof MmcKafkaMsg) {

            // 如果本身就是MmcKafkaMsg对象,直接返回
            //noinspection unchecked
            return Stream.of((T) record);

        } else {


            throw new UnsupportedForMessageFormatException("not support message type");
        }

    }

    protected void doKafkaAware(T x, ConsumerRecord<String, Object> record) {
		// 根据自己诉求去扩展,可以增加无限xxxAware,获取任意record的参数
        if (x instanceof MmcMsgKafkaAware) {
            ((MmcMsgKafkaAware) x).setOffset(record.offset());
            ((MmcMsgKafkaAware) x).setTopic(record.topic());
        }

    }

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>

2、修改DemoMsg,让它实现MmcMsgKafkaAware接口,用来获取topic和offset参数,Processor不用修改,保持不变;

@Data
class DemoMsg implements MmcMsgKafkaAware {

    private String routekey;

    private String name;

    private Long timestamp;

    private String topic;

    private long offset;
}

3、消费者配置保持不变。

## 消费者配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4、编写测试类,测试类保持不变。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiConsumerAutoConfiguration.class, DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application.properties")
@DirtiesContext
@EmbeddedKafka(topics = {"${spring.kafka.one.topic}"})
class AppTest {


    @Resource
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.one.topic}")
    private String topicOne;

    @Value("${spring.kafka.two.topic}")
    private String topicTwo;

    @Test
    void testDealMessage() throws Exception {

        Thread.sleep(2 * 1000);
        
        // 模拟生产数据
        produceMessage();

        Thread.sleep(10 * 1000);
    }

    void produceMessage() {

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        for (int i = 0; i < 10; i++) {

            DemoMsg msg = new DemoMsg();
            msg.setRoutekey("routekey" + i);
            msg.setName("name" + i);
            msg.setTimestamp(System.currentTimeMillis());

            String json = JsonUtil.toJsonStr(msg);
            producer.send(new ProducerRecord<>(topicOne, "my-aggregate-id", json));
            producer.send(new ProducerRecord<>(topicTwo, "my-aggregate-id", json));
            producer.flush();

        }
    }
}


5、运行一下,测试通过,可以看到已经打印topic和offset。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/682299.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv8+PyQt5苹果叶病害检测(可以重新训练,yolov8模型,从图像、视频和摄像头三种路径识别检测)

效果视频&#xff1a;YOLOv8PyQt5苹果叶病害检测系统完整资源集合 资源包含可视化的苹果叶病害检测系统&#xff0c;基于最新的YOLOv8训练的苹果叶病害检测模型&#xff0c;和基于PyQt5制作的可视苹果叶病害系统&#xff0c;包含登陆页面和检测页面&#xff0c;该系统可自动检…

上位机图像处理和嵌入式模块部署(f407 mcu中的单独上位机烧录方法)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们说过&#xff0c;stm32有三种烧录方法&#xff0c;一种是st-link v2&#xff0c;一种是dap&#xff0c;一种是j-link。不过我们在实际操作…

电阻、电容和电感测试仪设计

在现代化生产、学习、实验当中,往往需要对某个元器件的具体参数进行测量,在这之中万用表以其简单易用,功耗低等优点被大多数人所选择使用。然而万用表有一定的局限性,比如:不能够测量电感,而且容量稍大的电容也显得无能为力。所以制作一个简单易用的电抗元器件测量仪是很…

17K star,一款开源免费的手机电脑无缝同屏软件

导读&#xff1a;白茶清欢无别事&#xff0c;我在等风也等你。 作为程序员&#xff0c;在我们的工作中经常需要把手机投票到电脑进行调试工作&#xff0c;选择一款功能强大的投屏软件是一件很必要的事情。今天给大家介绍一款开源且免费的投屏软件&#xff0c;极限投屏&#xff…

json和axion结合

目录 java中使用JSON对象 在pom.xml中导入依赖 使用 public static String toJSONString(Object object)把自定义对象变成JSON对象 json和axios综合案例 使用的过滤器 前端代码 响应和请求都是普通字符串 和 请求时普通字符串&#xff0c;响应是json字符串 响应的数据是…

postman教程-14-生成随机数

领取资料&#xff0c;咨询答疑&#xff0c;请➕wei: June__Go 上一小节我们学习了Postman关联接口的调用方法&#xff0c;本小节我们讲解一下Postman生成随机数的方法。 在接口测试中&#xff0c;经常需要向接口发送不同的输入数据&#xff0c;以确保接口的健壮性和可靠性。…

Vue3实战笔记(51)—Vue 3封装带均线的k线图

文章目录 前言带均线的k线图总结 前言 继续封装一个封装带均线的k线图 带均线的k线图 EChartsCandlestickSh.vue&#xff1a; <template><div ref"chartContainer" style"width: 100%; height: 500px"></div></template><scr…

HackTheBox-Machines--SolidState

SolidState 测试过程 1 信息收集 NMAP ┌──(root㉿serven)-[~] └─# nmap -p 0-65535 -A 10.129.224.177 Starting Nmap 7.92 ( https://nmap.org ) at 2024-06-05 00:52 CST Host is up (0.063s latency). Not shown: 65530 closed tcp ports (reset) PORT STATE SE…

如何解决 Zabbix模板同步超时:解决运维技术领域的BugFailed to sync Zabbix template due to timeout

如何解决 Zabbix模板同步超时&#xff1a;解决运维技术领域的BugFailed to sync Zabbix template due to timeout 原创作者&#xff1a; 猫头虎 作者微信号&#xff1a; Libin9iOak 作者公众号&#xff1a; 猫头虎技术团队 更新日期&#xff1a; 2024年6月6日 博主猫头虎…

python学习笔记-05

函数 基本上所有的高级语言都支持函数&#xff0c;函数就是一种代码抽象的方式。之前所使用的len、print等都是python的内置函数。 1.初识函数 在编写程序过程中&#xff0c;如果一段代码经常出现&#xff0c;为了提高编写效率&#xff0c;将这类实现某个功能的代码作为一个…

Linux: ubi rootfs 加载故障案例

文章目录 1. 前言2. ubi rootfs 加载故障现场3. 故障分析与解决4. 参考资料 1. 前言 限于作者能力水平&#xff0c;本文可能存在谬误&#xff0c;因此而给读者带来的损失&#xff0c;作者不做任何承诺。 2. ubi rootfs 加载故障现场 问题故障内核日志如下&#xff1a; Star…

前后端分离与实现 ajax 异步请求 和动态网页局部生成

前端 <!DOCTYPE html><!-- 来源 --> <!-- https://cloud.tencent.com/developer/article/1705089 --> <!-- https://geek-docs.com/ajax/ajax-questions/19_ajax_javascript_send_json_object_with_ajax.html --> <!-- 配合java后端可以监听 --&…

Jmeter的几种参数化方式

1.为什么要做参数化&#xff1f; 在用jmeter脚本进行压测的时候&#xff0c;为了更真实的模拟起到更好的效果&#xff0c;我们需要让参数动态变化起来&#xff0c;也就是参数化。通过参数化我们也可以更好、更灵活的维护我们的测试脚本。 2.参数化的方式 能够实现参数化的方式有…

Linux 磁盘分区步骤

1.lsblk用于查看磁盘分区情况&#xff0c;lsblk -f用于查看uuid字符串以及挂载点。 以下是虚拟机部分添加磁盘的步骤。 其余没展示的都按照默认设置进入下一步即可。 2.添加完成后使用reboot重新进入后再使用lsblk就会发现磁盘sdb已经有了&#xff0c;但是没有分区。现在添加分…

LitCTF2024部分wp

litctf wp 第一次ak了web和misc&#xff0c;非常激动&#xff0c;感谢lictf给我这个机会 最终成果 全靠队里的密码逆向✌带飞。一个人就砍了近一半的分数 这里是我们队的wp web exx 题目名反过来就是xxe&#xff0c;考察xxe&#xff0c;查看登录的数据包 发现传的就是xml…

华为高斯数据库招聘

西安华为&#xff0c;部门直招数据库开发&#xff0c;测试&#xff0c;维护&#xff0c;hc充足&#xff0c;流程快。 语言不限&#xff0c;专业不限&#xff0c;与业内数据库大佬共事&#xff0c;致力于做数据库行业领军者。 一、岗位职责&#xff1a; 1.负责数据库系统内核模…

Qt信号槽与函数直接调用性能对比

1. 测试方法 定义一个类Recv&#xff0c;其中包含一个成员变量num和一个成员函数add()&#xff0c;add()实现num的递增。 另一个类Send通过信号槽或直接调用的方法调用Recv的add函数。 单独开一个线程Watcher&#xff0c;每秒计算num变量的增长数值&#xff0c;作为add函数被调…

SpaceX 首席火箭着陆工程师 MIT论文详解:非凸软着陆最优控制问题的控制边界和指向约束的无损凸化

上一篇blog翻译了 Lars Blackmore(Lars Blackmore is principal rocket landing engineer at SpaceX)的文章&#xff0c;SpaceX 使用 CVXGEN 生成定制飞行代码,实现超高速机载凸优化。利用地形相对导航实现了数十米量级的导航精度,着陆器在着陆过程中成像行星表面并将特征与机载…

Spring自带定时任务@Scheduled注解

文章目录 1. cron表达式生成器2. 简单定时任务代码示例&#xff1a;每隔两秒打印一次字符3. Scheduled注解的参数3.1 cron3.2 fixedDelay3.3 fixedRate3.4 initialDelay3.5 fixedDelayString、fixedRateString、initialDelayString等是String类型&#xff0c;支持占位符3.6 tim…

用幻灯片讲解C++手动内存管理

用幻灯片讲解C手动内存管理 1.栈内存的基本元素 2.栈内存的聚合对象 3.手动分配内存和释放内存 注意&#xff1a;手动分配内存&#xff0c;指的是在堆内存中。 除非实现自己的数据结构&#xff0c;否则永远不要手动分配内存! 即使这样&#xff0c;您也应该通过std::allocator…