(七)消息队列-Kafka 序列化avro(传递)

(七)消息队列-Kafka 序列化avro(传递)

客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》

在这里插入图片描述

本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)

前言

多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。

问题背景

在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:

  1. 数据冗余:字段名重复存储,占用带宽;
  2. 兼容性差:新增或删除字段时容易导致上下游解析失败;
  3. 类型安全缺失:动态解析易引发运行时错误。

而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储动态兼容性强类型校验,成为Kafka生态中推荐的序列化方案27。


核心原理
  1. Schema驱动
    Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:

    {
      "type": "record",
      "name": "User",
      "namespace": "com.example.avro",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"}
      ]
    }
    

    然后使用 avro-maven-plugin 生成 Java 类:

    <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.11.0</version>
        <executions>
            <execution>
                <phase>generate-sources</phase>
                <goals>
                    <goal>schema</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    

    执行 mvn clean compile 后,com.example.avro.User 类会被自动生成。

    生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。

  2. 二进制编码
    Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。

  3. Schema Registry
    为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:

    • Schema版本控制与兼容性检查;
    • 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。

实现步骤

以下以Java代码为例,展示Kafka集成Avro的配置方法:

1. 添加依赖
<dependencies>
    <!-- Spring Kafka 依赖-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <!-- Avro 依赖 -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
    </dependency>
    
    <!-- Schema Registry 依赖 -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.2.1</version>
    </dependency>
</dependencies>

运行 HTML

2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");

producer.send(new ProducerRecord<>("user-topic", user));



------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      schema.registry.url: http://localhost:8081
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      
@Service
public class UserProducer {
    private final KafkaTemplate<String, User> kafkaTemplate;
    
    @Value("${kafka.topic.user}")
    private String topic;

    public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendUser(User user) {
        kafkaTemplate.send(topic, user.getId().toString(), user);
    }
}

在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder()
    .setId(1)
    .setName("Alice")
    .build();
userProducer.sendUser(user);

控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");

Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));

while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, GenericRecord> record : records) {
        System.out.println("Received: " + record.value().get("name"));
    }
}

------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific.avro.reader: true

然后编写 Kafka 消费者代码:

@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {

    @KafkaHandler
    public void consume(User user) {
        System.out.println("Received user: " + user.getName());
    }
}



常见问题与解决方案
  1. Schema兼容性错误
    • 现象:生产者更新Schema后消费者无法解析旧数据。
    • 解决:在Schema Registry中配置兼容性策略(如BACKWARD),允许新增字段并设置默认值7。
  2. ClassNotFoundException
    • 现象:反序列化时提示Avro生成的类不存在。
    • 解决:通过Maven插件avro-maven-plugin自动生成Java类,并确保生成路径在编译范围内2。
  3. 性能瓶颈
    • 现象:高吞吐场景下序列化延迟较高。
    • 优化:复用DatumWriterDatumReader实例,避免重复初始化开销7。

总结

Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。


引用说明

  • 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
  • Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。

后续

下期预告,敬请关注:
(八)消息队列-Kafka 生产者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/980292.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker 学习(一)

一、Docker 核心概念 Docker 是一个开源的容器化平台&#xff0c;允许开发者将应用及其所有依赖&#xff08;代码、运行时、系统工具、库等&#xff09;打包成一个轻量级、可移植的“容器”&#xff0c;实现 “一次构建&#xff0c;随处运行”。 1、容器&#xff08;Container…

使用mermaid查看cursor程序生成的流程图

一、得到cursor生成的流程图文本 cursor写的程序正常运行后&#xff0c;在对话框输入框中输入诸如“请生成扫雷的代码流程图”&#xff0c;然后cursor就把流程图给生成了&#xff0c;但是看到的还是文本的样子&#xff0c;保留这部分内容待用 二、注册一个Mermaid绘图账号 …

蜂鸣器使用

1、蜂鸣器原理 无源蜂鸣器模块根据输入的 不同方波信号&#xff08;作为震荡源&#xff09;可以发出不同的声音。驱动电路中三极管电阻一般为1K-4K都行&#xff0c;能够让三极管导通即可。&#xff08;三极管即带箭头的部分&#xff0c;基极和发射机&#xff08;PNP&#xff09…

15. LangChain实战项目2——易速鲜花海报文案生成

你已经制作好了一批鲜花的推广海报&#xff0c;想为每一个海报的内容&#xff0c;写一两句话&#xff0c;然后 post 到社交平台上&#xff0c;以期图文并茂。 下载 Salesforce/blip-image-captioning-large 图生文模型 通过以下几个命令下载该模型 pip install -U huggingfa…

支持IPD项目管理的9大系统,哪款工具能有效提高项目控制能力

本文介绍了以下9大系统: 1.Worktile&#xff1b; 2. 腾讯敏捷开发平台&#xff08;TAPD&#xff09;&#xff1b; 3. 简道云&#xff08;Jiandaoyun&#xff09;&#xff1b; 4. 蓝鲸智云&#xff08;BlueWhale&#xff09;&#xff1b; 5. 轻流&#xff08;Qingflow&#xff0…

创建一个MCP服务器,并在Cline中使用,增强自定义功能。

MCP介绍 MCP 是一个开放协议&#xff0c;它标准化了应用程序如何向LLMs提供上下文。可以将 MCP 视为 AI 应用程序的 USB-C 端口。正如 USB-C 提供了一种标准化的方法来将您的设备连接到各种外围设备和配件一样&#xff0c;MCP 提供了一种标准化的方法来将 AI 模型连接到不同的…

Linux之yum详解

—— 小 峰 编 程 目录 1、Linux软件的安装方式 2、什么是yum 3、配置网络yum源 4、yum命令 【语法】 【yum常用命令】 1、Linux软件的安装方式 在CentOS系统中&#xff0c;软件管理方式通常有三种方式&#xff1a; rpm安装 、 yum安装 以及 编译安装 。 2、什么是yum…

2025国家护网HVV高频面试题总结来了01(题目+回答)

网络安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 0x1 高频面试题第一套 0x2 高频面试题第二套 0x3 高频面试题第三套 0x4 高频面试题第四套 0x5 高频面…

leetcode 59. 螺旋矩阵 II 中等

给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]]示例 2&#xff1a; 输入&#xff1a;n 1 输出&am…

基于Rook的Ceph云原生存储部署与实践指南(下)

#作者&#xff1a;任少近 文章目录 6Ceph资源对像管理6.1查看services6.2查看Jobs6.3 查看deployments.apps6.4查看daemonsets.apps6.5查看configmaps6.6查看clusterroles.rbac.authorization.k8s.io6.7查看clusterrolebindings.rbac.authorization.k8s.io6.8通过cephclusters…

深入浅出 Go 语言:协程(Goroutine)详解

深入浅出 Go 语言&#xff1a;协程(Goroutine)详解 引言 Go 语言的协程&#xff08;goroutine&#xff09;是其并发模型的核心特性之一。协程允许你轻松地编写并发代码&#xff0c;而不需要复杂的线程管理和锁机制。通过协程&#xff0c;你可以同时执行多个任务&#xff0c;并…

常用的api测试软件

我们在写完后端API接口的时候&#xff0c;前端工程师可能还没有写完前端的页面&#xff0c;这时候后端工程师需要测试接口&#xff0c;因此后端开发通常需要api测试软件来测试接口&#xff0c;同时通过测试软件把定义好的接口格式分享文档。 这里推荐两款api测试软件软件&…

深入理解并实现自定义 unordered_map 和 unordered_set

亲爱的读者朋友们&#x1f603;&#xff0c;此文开启知识盛宴与思想碰撞&#x1f389;。 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 在 C 的标准模板库&#xff08;STL&#xff09;中&#xff0c;unorder…

可商用街头文化艺术海报封面手写涂鸦标题LOGO排版英文字体 FS163 TYPE FACE

Freestyle 163 &#xff08;FS163&#xff09;是一个受街头文化和城市艺术启发的视觉宣言。该字体旨在突出我们的文化和创意根源&#xff0c;反映了街头运动、城市艺术以及来自社会和边缘的故事。 FS163与面临挑战、质疑规范、放大被忽视声音的品牌和个人联系在一起&#xff0c…

【教程】可视化配置多台主机通过交换机实现互联通信

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 目录 场景定义 配置步骤 1. 确认网络接口名称 2. 配置静态 IP 地址 3. 验证连接 主机间速率测试 场景定义 有两台主机通过网线与交换机相连。主…

HTTP/2 服务器端推送:FastAPI实现与前端集成指南

HTTP/2 服务器端推送&#xff1a;FastAPI实现与前端集成指南 注意&#xff1a;本文末尾附有完整示例代码&#xff0c;文中仅展示核心关键代码。完整代码可在GitHub仓库获取。 本文将会讲解HTTP2协议和相关配置实践。但是不要混淆&#xff0c;SSE的实现完全基于HTTP/1.1的持久连…

【CSS—前端快速入门】CSS 常用样式

CSS 常用 CSS 样式 1. 前端样式查询网站&#xff1a; MDN Web Docs (mozilla.org) w3school 2. border 2.1 借助 MDN 了解 border 我们借助 MDN 网站来学习 border 样式的使用&#xff1a; 2.2 border 常见属性 保存代码&#xff0c;打开页面&#xff1a; 对于标签不同样式的…

Autosar精华

应用层(APP) 目标:掌握如何设计软件组件(SWC)及其交互。 软件组件(SWC): 原子级SWC:独立的功能模块(如控制算法、传感器处理)。 端口(Ports):Sender-Receiver(数据传递)、Client-Server(服务调用)。 接口(Interface):定义组件间通信的数据类型(如Autosar …

【AD】AD软件中工具栏无IPC封装向导

问题&#xff1a;点击AD工具栏&#xff0c;无IPC封装向导 解决&#xff1a; 点击后点击install&#xff0c;关闭重启软件

【Python 初级函数详解】—— 参数沙漠与作用域丛林的求生指南

欢迎来到ZyyOvO的博客✨&#xff0c;一个关于探索技术的角落&#xff0c;记录学习的点滴&#x1f4d6;&#xff0c;分享实用的技巧&#x1f6e0;️&#xff0c;偶尔还有一些奇思妙想&#x1f4a1; 本文由ZyyOvO原创✍️&#xff0c;感谢支持❤️&#xff01;请尊重原创&#x1…