Kafka 的消息格式:了解消息结构与序列化

Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。

1. Kafka 消息结构

Kafka 的消息结构由消息头、消息键、消息值和时间戳等组成。下面是一个典型的 Kafka 消息结构:

----------------------------------------------------------------------------------------------
| Message Header | Key | Value | Timestamp | Optional Headers |
----------------------------------------------------------------------------------------------

1.1 消息头

消息头包含一些元数据信息,例如消息的大小、压缩信息等。消息头的结构可能会根据 Kafka 版本和配置而有所不同。

1.2 消息键与消息值

  • 消息键(Key): 用于标识消息的唯一性,通常用于分区和查找消息。

  • 消息值(Value): 包含实际的消息内容。

1.3 时间戳

时间戳表示消息的产生时间,有两种类型:

  • 创建时间戳: 表示消息被创建的时间。

  • LogAppendTime 时间戳: 表示消息被追加到日志的时间。

2. 消息的序列化与反序列化

Kafka 中的消息在生产者发送和消费者接收时需要进行序列化和反序列化。这是因为 Kafka 是以字节流的形式存储和传输消息的,而实际的消息内容可能是各种不同的数据类型。以下是一些常用的序列化器和反序列化器:

2.1 字符串序列化器

// 生产者端
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "Hello, Kafka!");

// 消费者端
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
});

2.2 Avro 序列化器

Avro 是一种高性能且紧凑的二进制序列化格式,适用于复杂数据结构的消息。

// 生产者端
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("field1", "value1");
avroRecord.put("field2", 42);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("my-topic", "key", avroRecord);

// 消费者端
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    GenericRecord value = record.value();
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

2.3 JSON 序列化器

// 生产者端
JsonNode jsonNode = objectMapper.createObjectNode();
((ObjectNode) jsonNode).put("field1", "value1");
((ObjectNode) jsonNode).put("field2", 42);
ProducerRecord<String, JsonNode> record = new ProducerRecord<>("my-topic", "key", jsonNode);

// 消费者端
ConsumerRecords<String, JsonNode> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    JsonNode value = record.value();
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

3. 自定义消息格式

在某些情况下,你可能需要定义自己的消息格式。Kafka 提供了 ByteArraySerializerByteArrayDeserializer,允许你将消息以字节数组的形式发送和接收,从而实现自定义的序列化和反序列化逻辑。

// 生产者端
byte[] customMessageBytes = serializeCustomMessage(customMessage);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("my-topic", "key", customMessageBytes);

// 消费者端
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    byte[] value = record.value();
    CustomMessage customMessage = deserializeCustomMessage(value);
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), customMessage);
});

4. 消息的压缩与解压

Kafka 支持消息的压缩,以减小网络传输的开销。以下是一些常用的压缩选项:

// 生产者端
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);

// 消费者端
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
Consumer<String, String> consumer = new KafkaConsumer<>(props);

5. 消息的版本控制与兼容性

在实际应用中,系统的演进和变化是不可避免的。因此,考虑到消息的版本控制和兼容性是非常重要的。以下是一些相关的注意事项和最佳实践:

5.1 消息的演进

  • 向后兼容性: 新版本的消费者能够处理旧版本的消息。

  • 向前兼容性: 旧版本的消费者能够处理新版本的消息。

5.2 Schema Registry

Schema Registry 是一个用于存储和管理 Avro、JSON 等消息格式的架构的中心化服务。通过使用 Schema Registry,可以更好地管理消息的演进,并确保向前和向后的兼容性。

// 配置 Schema Registry 地址
props.put("schema.registry.url", "http://schema-registry:8081");

6. 消息的认证与加密

Kafka 提供了安全性特性,包括消息的认证和加密。以下是一些相关的配置选项:

6.1 SSL 加密通信

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

6.2 认证配置

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

7. 消息的追踪与监控

追踪和监控是保障系统稳定性和性能的重要手段。以下是一些常用的追踪和监控工具:

7.1 JMX 监控

Kafka 提供了 JMX 接口,可以通过 JConsole 或其他 JMX 客户端进行监控。

7.2 Kafka Manager

Kafka Manager 是一款开源的 Kafka 集群管理和监控工具,提供了直观的 Web 界面。

7.3 Prometheus 和 Grafana

使用 Prometheus 进行指标采集,结合 Grafana 进行可视化展示,可以更全面地监控 Kafka 集群的性能和健康状况。

总结

在深入探讨Kafka消息格式、版本控制、安全性和监控等关键主题后,对构建高效、灵活的消息系统有了更为全面的认识。了解消息结构、序列化与反序列化、自定义消息格式,以及消息的压缩与解压,是确保消息传递的基础。随后,版本控制与兼容性的重要性得到了强调,Schema Registry成为管理Avro、JSON等消息格式的利器。在保障消息传递安全方面,SSL加密通信和认证配置提供了可靠的手段。最后,通过JMX监控、Kafka Manager、以及Prometheus和Grafana的运用,能够实时追踪和监控Kafka集群的健康状态。

这篇文章旨在为大家提供全方位的Kafka消息系统知识,使其能够在实际应用中根据业务需求构建稳健、高效的消息处理系统。深入理解这些关键概念,将有助于确保消息系统的可维护性、稳定性和安全性,为实际业务场景中的挑战提供可行的解决方案。继续关注更多Kafka相关的技术内容,将使大家能够不断深化对消息系统的认识,应对日益复杂的数据处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/224407.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Quasar】暗黑主题随系统切换部分组件无法随系统切换

问题描述 Quasar部分组件无法随系统切换主题 。 假如系统、Quasar主题为白天模式。Quasar设置主题随系统切换&#xff0c;当系统切换暗黑模式时&#xff0c;Quasar导航栏无法正常切换为暗黑模式&#xff0c;此时背景还是白天模式&#xff0c;如图 正常切换参考图 正常暗黑…

【musl-pwn】msul-pwn 刷题记录 -- musl libc 1.2.2

前言 本文不分析 musl libc 相关源码&#xff0c;仅仅为刷题记录&#xff0c;请读者自行学习相关知识&#xff08;看看源码就行了&#xff0c;代码量也不大&#xff09; starCTF2022_babynote 保护&#xff1a;保护全开 程序与漏洞分析&#xff1a; 程序实现了一个菜单堆&…

第3章:知识表示:概述、符号知识表示、向量知识表示

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

Python 从入门到精通 学习笔记 Day01

Python 从入门到精通 第一天 今日目标 计算机组成原理、编程语言、Python环境安装 第一个Python程序、PyCharm的安装与使用 Python的基础语法、Python的基本数据类型 一、计算机组成原理 计算机的组成 计算机硬件通常由以下几个部分组成: 1.中央处理器(CPU):负责执行计算机…

HarmonyOS4.0从零开始的开发教程03初识ArkTS开发语言(中)

HarmonyOS&#xff08;二&#xff09;初识ArkTS开发语言&#xff08;中&#xff09;之TypeScript入门 浅析ArkTS的起源和演进 1 引言 Mozilla创造了JS&#xff0c;Microsoft创建了TS&#xff0c;Huawei进一步推出了ArkTS。 从最初的基础的逻辑交互能力&#xff0c;到具备类…

Docker-多容器应用

一、概述 到目前为止&#xff0c;你一直在使用单个容器应用。但是&#xff0c;现在您将 MySQL 添加到 应用程序堆栈。经常会出现以下问题 - “MySQL将在哪里运行&#xff1f;将其安装在同一个 容器还是单独运行&#xff1f;一般来说&#xff0c;每个容器都应该做一件事&#x…

题目分析,高度理解一维二维数组的申请和[]是什么运算符

第0题: 动态申请二维数组并输出非负数和 和负数出现次数 思路:输入数组大小,然后申请内存并不对其初始化,提高速度,传入数据到申请的数组中,判断如果数组中有元素小于0对其进行计数,否则加上非0数最后输出答案,释放内存 第一题: 解答: 运行结果: 思路分析: 创建长度为20的…

聚观早报 |东方甄选将上架文旅产品;IBM首台模块化量子计算机

【聚观365】12月6日消息 东方甄选将上架文旅产品 IBM首台模块化量子计算机 新思科技携手三星上新兴领域 英伟达与软银推动人工智能研发 苹果对Vision Pro供应商做出调整 东方甄选将上架文旅产品 东方甄选宣布12月10日将在东方甄选APP上线文旅产品&#xff0c;受这一消息影…

【算法】算法题-20231207

这里写目录标题 一、共同路径二、数字列表排序三、给定两个整数 n 和 k&#xff0c;返回 1 … n 中所有可能的 k 个数的组合。 一、共同路径 给你一个完整文件名组成的列表&#xff0c;请编写一个函数&#xff0c;返回他们的共同目录路径。 # nums[/hogwarts/assets/style.cs…

1-Tornado的介绍

1 tornado的介绍 **Tornado**是一个用Python编写的可扩展的、无阻塞的**Web应用程序框架**和**Web服务器**。 它是由FriendFeed开发使用的&#xff1b;该公司于2009年被Facebook收购&#xff0c;而Tornado很快就开源了龙卷风以其高性能着称。它的设计允许处理大量并发连接&…

4.Java程序设计-基于springboot得在线考试系统

编程技术交流、源码分享、模板分享、网课分享 企鹅&#x1f427;裙&#xff1a;772162324 摘要&#xff1a; 本文设计并实现了一款基于Spring Boot框架的在线考试系统小程序。随着远程学习和在线教育的普及&#xff0c;对于灵活、便捷的在线考试系统的需求逐渐增加。该小程序…

零基础小白怎么准备蓝桥杯-蓝桥杯竞赛经验分享

零基础小白怎么准备蓝桥杯-蓝桥杯竞赛经验分享 前言竞赛简介竞赛目的如何备战1.基础学习2.实战训练&#xff08;非常重要&#xff09; 资料分享 前言 博主在蓝桥杯中获得过十四届Java B 组的省一国二&#xff0c;本文为大家介绍一下蓝桥杯并分享一下自己的参赛经验。 竞赛简介…

流量异常-挂马造成百度收录异常关键词之解决方案(虚拟主机)

一.异常现象&#xff1a;流量突然暴涨&#xff0c;达到平时流量几倍乃至几十倍&#xff0c;大多数情况下因流量超标网站被停止。 二.排查原因&#xff1a; 1.首先分析web日志&#xff1a;访问量明显的成倍、几十倍的增加&#xff1b;访问页面不同&#xff1b;访问IP分散并不固…

阿里云上传文件出现的问题解决(跨域设置)

跨域设置引起的问题 起因&#xff1a;开通对象存储服务后&#xff0c;上传文件限制在5M 大小&#xff0c;无法上传大文件。 1.查看报错信息 2.分析阿里云服务端响应内容 <?xml version"1.0" encoding"UTF-8"?> <Error><Code>Invali…

27. 深度学习进阶 - 为什么RNN

文章目录 一个柯基的例子为什么RNN or CNN Hi&#xff0c;你好。我是茶桁。 这节课开始&#xff0c;我们将会讲一个比较重要的一种神经网络&#xff0c;它对应了咱们整个生活中很多类型的一种问题结构&#xff0c;它就是咱们的RNN网络。 咱们首先回忆一下&#xff0c;上节课咱…

zookeeper集群介绍

一个leader&#xff0c;多个follower&#xff0c;组成的集群 集群中只要有半数以上得节点存活&#xff0c;zookeeper集群就能正常服务 顺序一致性&#xff1a; 来自同一个client的更新请求按其发送顺序依次执行 原子性&#xff1a; 更新操作要么成功要么失败&#xff0c; 没有…

zookeeper1==zookeeper源码阅读,源码启动ZK集群

下载源码 Tags apache/zookeeper GitHub https://codeload.github.com/apache/zookeeper/zip/refs/tags/release-3.9.1 JDK8 MAVEN3.8.6 mvn -DskipTeststrue package 配置ZK1 zkServer.cmd中指出了启动类是 QuorumPeerMain QuorumPeer翻译成集群成员比较合理&#xf…

一文读懂MySQL基础知识文集

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

实战演练 | 在 Navicat 中格式化日期和时间

Navicat 支持团队收到来自用户常问的一个问题是&#xff0c;如何将网格和表单视图中的日期和时间进行格式化。其实这个很简单。今天&#xff0c;我们将介绍在 Navicat Premium 中进行全局修改日期和时间格式的步骤。 如果你想边学边用&#xff0c;欢迎点击 这里 下载免费全功能…