1. 生产者简介
不管是把 Kafka 作为消息队列系统、还是数据存储平台,总是需要一个可以向 Kafka 写入数据的生产者和一个可以从 Kafka 读取数据的消费者,或者是一个兼具两种角色的应用程序。
使用 Kafka 的场景很多,诉求也各有不同,主要有:是否允许丢失消息?是否接受重复消息?是否有严格的延迟和吞吐量要求?
不同的场景对于 Kafka 生产者 API 的使用和配置会有直接的影响。
1.1. 生产者传输实体
Kafka Producer 发送的数据对象叫做 ProducerRecord
,它有 4 个关键参数:
Topic
- 主题Partition
- 分区(非必填)timestamp
: 消息的时间戳。(非必填)Key
- 键(非必填)Value
- 值headers
: 消息头(键值对列表)。(非必填)
核心字段
-
Topic:
- Kafka 的主题,用于分类消息。
- 类型:
String
- 示例:
"my-topic"
-
Partition:
- 消息的目标分区。
- 类型:
Integer
(可为null
,表示让分区器选择分区)。 - 示例:
0
(指定分区)或null
(自动分配)。
-
Key:
- 消息的键,通常用于确定分区。
- 类型: 泛型
K
,如String
。 - 示例:
"user123"
-
Value:
- 消息的内容。
- 类型: 泛型
V
,如String
或byte[]
。 - 示例:
"Hello, Kafka!"
-
Timestamp:
- 消息的时间戳,用于记录消息的生成时间。
- 类型:
Long
(毫秒时间戳,可为null
)。 - 示例:
System.currentTimeMillis()
-
Headers:
- 可选的消息头,包含额外的元数据。
- 类型:
Iterable<Header>
,键值对形式。 - 示例:
new RecordHeaders().add("header-key", "header-value".getBytes())
注意事项
-
序列化:
- 键和值必须可序列化。
- 在创建
KafkaProducer
时,需要配置键和值的序列化器(如StringSerializer
或自定义序列化器)。
-
时间戳:
- 默认使用 Kafka 服务器时间戳(日志时间)。
- 如果提供时间戳,Kafka 会以该时间戳为准。
-
消息头:
- 消息头常用于携带附加信息,如跟踪 ID、元数据等。
1.2. 生产者发送流程
Kafka 生产者发送消息流程:
( 0 ) 构建 ProducerRecord
生产者构建一条消息,封装成一个 ProducerRecord
对象,包括以下主要内容:
- 主题(Topic)
- 分区(Partition,可选)
- 消息键(Key,可选)
- 消息值(Value)
- 时间戳(Timestamp,可选)
- 消息头(Headers,可选)
(1)序列化 - 发送前,生产者要先把键和值序列化。生产者通过配置的 键序列化器 和 值序列化器 完成该步骤。
- 常用序列化器:
StringSerializer
、ByteArraySerializer
或自定义序列化器。
(2)分区 - 数据被传给分区器。如果在 ProducerRecord
中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord
的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。
使用以下逻辑确定消息的目标分区:
- 直接指定分区: 如果
ProducerRecord
指定了分区,消息将直接发送到该分区。 - 通过键计算分区:
- 如果未指定分区但提供了键,Kafka 的默认分区器(
DefaultPartitioner
)会根据键的哈希值和主题的分区数计算分区。
- 如果未指定分区但提供了键,Kafka 的默认分区器(
- 随机选择分区: 如果既未指定分区也未提供键,Kafka 将随机选择一个分区。
(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。
- 批次,就是一组消息,这些消息属于同一个主题和分区。
- 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。
- Kafka 生产者使用一个内部的缓冲区(
RecordAccumulator
)来批量存储即将发送的消息。 - 同一主题和分区的消息会被聚合成批次(Batch)。
- 批量发送可以显著提高网络传输效率和吞吐量。
(4)消息压缩
- 如果启用了消息压缩(通过配置
compression.type
),Kafka 会在批次级别对消息进行压缩。- 支持的压缩算法:
gzip
、snappy
、lz4
、zstd
。
- 支持的压缩算法:
- 压缩后的消息占用更少的网络带宽和存储空间。
(5) 发送消息到 Kafka Broker
- 消息被发送到 Kafka 集群的对应 Broker 节点。
- Kafka 生产者使用异步 IO,通过
Selector
组件管理网络连接,并将消息发送到 Broker。 - Broker 收到消息后,将其存储到对应分区的日志中。
(6) Broker 确认(ACK)处理
生产者等待 Broker 的确认(ACK),根据配置的 acks
参数决定消息确认的级别:
acks=0
: 不等待确认,最快,但可能导致消息丢失。acks=1
: 等待分区的 Leader 确认,可能导致少量数据丢失。acks=all
(或-1
): 等待所有副本节点确认,最高可靠性。
(7) 重试机制
- 如果消息发送失败(如网络超时或 Leader 不可用),生产者会根据
retries
参数配置重试次数。 - 重试间隔由
retry.backoff.ms
控制。
(8) 回调(Callback)机制
- 如果生产者在发送消息时指定了回调函数,会在消息发送成功或失败后触发。
- 回调函数可用于记录日志、处理异常或更新统计数据。
(9) 异常处理
- 如果消息发送失败且重试耗尽,Kafka 生产者会抛出异常,开发者需要在代码中捕获并处理这些异常。
- 常见异常:
TimeoutException
: 消息发送超时。SerializationException
: 键或值的序列化失败。RetriableException
: 临时性错误,可重试。
生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?
Kafka 生产者在发送消息时,需要确定消息将发送到哪个 Broker 和哪个分区。这个过程依赖于主题的元数据信息和分区选择逻辑。以下是具体的流程:
1. 获取主题的元数据信息
- 元数据包含的信息:
- 主题的分区数量。
- 每个分区的 Leader Broker(负责接收写入请求)。
- 每个分区的副本信息。
- 元数据来源:
- 当生产者启动时,会向 Kafka 集群的任意一个 Broker 请求元数据信息。
- 元数据会定期更新(由配置
metadata.max.age.ms
决定),或在发生异常(如 Leader 变更)时重新获取。
2. 分区选择逻辑
Kafka 使用分区选择器(默认是 DefaultPartitioner
)来决定消息发送到哪个分区。其逻辑如下:
(1)如果指定了分区
- 直接发送到指定的分区,无需计算。
-
上述代码将消息直接发送到new ProducerRecord<>("my-topic", 0, "key", "value");
my-topic
的分区 0。
(2)如果未指定分区但提供了键
- 使用键的哈希值来计算目标分区:
partition = hash(key) % partitionCount
hash
函数:Kafka 默认使用 Java 的hashCode
方法。- 这种方式确保同一个键的消息始终发送到同一个分区,方便有序消费。
(3)如果既未指定分区也未提供键
- 生产者会随机选择一个分区,确保消息负载均衡:
- 通过循环的方式从可用分区中依次选择(轮询算法)。
3. 确定目标 Broker
- 每个分区都有一个 Leader Broker 负责处理写入请求。
- 生产者根据主题元数据确定目标分区的 Leader Broker,直接与其建立连接并发送消息。
4. 处理分区不可用情况
- 如果目标分区的 Leader Broker 不可用:
- 生产者会尝试刷新元数据以获取新的分区 Leader 信息。
- 如果刷新失败或超过配置的重试次数(
retries
),消息发送失败。
流程总结
1. 获取主题元数据(包括分区及其 Leader Broker)。
2. 使用分区选择逻辑确定目标分区: - 指定分区 → 直接使用。 - 提供键 → 哈希计算。 - 无键无分区 → 轮询分区。
3. 确定目标分区的 Leader Broker。
4. 向 Leader Broker 发送消息。
5. 如果发送失败,尝试重试或刷新元数据。
生产者配置相关参数
参数名称 | 作用 | 默认值 |
---|---|---|
metadata.max.age.ms | 元数据缓存时间,超过该时间会刷新元数据 | 300000(5分钟) |
partitioner.class | 自定义分区器类,用于分区选择逻辑 | 默认分区器 |
retries | 发送失败时的重试次数 | 0(不重试) |
retry.backoff.ms | 重试前的等待时间 | 100 |
max.in.flight.requests | 最大允许的未确认请求数 | 5 |
性能优化建议
- 批量发送: 增大
batch.size
和linger.ms
提高吞吐量。 - 压缩: 开启合适的消息压缩(
compression.type
)。 - 异步发送: 使用回调处理结果,避免阻塞主线程。
- 分区策略: 根据业务需求优化分区分配。
2. 生产者 API
Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer
。通常我们开发一个生产者的步骤有 4 步。
- 构造生产者对象所需的参数对象。
- 利用第 1 步的参数对象,创建
KafkaProducer
对象实例。 - 使用
KafkaProducer
的send
方法发送消息。 - 调用
KafkaProducer
的close
方法关闭生产者并释放各种系统资源。
2.1. 创建生产者
Kafka 生产者核心配置:
1. 基础配置
配置项 | 作用 | 默认值 |
---|---|---|
bootstrap.servers | Kafka 集群的地址列表,格式为 host1:port,host2:port | 必须配置 |
key.serializer | 键(Key)的序列化器,负责将键序列化为字节数组 | 必须配置 |
value.serializer | 值(Value)的序列化器,负责将值序列化为字节数组 | 必须配置 |
2. 消息确认和可靠性配置
配置项 | 作用 | 默认值 |
---|---|---|
acks | 确认机制: | 1 |
- 0 : 不等待 Broker 确认,消息可能丢失,但性能高。 | ||
- 1 : 仅等待 Leader 确认,性能和可靠性折中。 | ||
- all (或 -1 ): 等待所有副本确认,最高可靠性。 | ||
retries | 消息发送失败时的重试次数。 | 0 |
retry.backoff.ms | 每次重试的时间间隔(毫秒)。 | 100 |
enable.idempotence | 是否启用幂等性,确保消息不重复发送(需要 acks=all 和 max.in.flight.requests.per.connection <= 5 )。 | false |
3. 性能优化配置
配置项 | 作用 | 默认值 |
---|---|---|
batch.size | 每个分区的批次大小(字节),消息会被聚合成批次发送以提高吞吐量。 | 16384 (16 KB) |
linger.ms | 生产者等待更多消息加入批次的时间,增加延迟可以提升批量发送效率。 | 0 |
compression.type | 消息压缩算法,可选值:none 、gzip 、snappy 、lz4 、zstd 。 | none |
buffer.memory | 生产者内存缓冲区的大小(字节),用于暂存消息。 | 33554432 (32 MB) |
max.in.flight.requests.per.connection | 单个连接上允许未确认的请求数量。降低此值可避免乱序(特别是在启用幂等性的情况下)。 | 5 |
4. 分区和消息路由配置
配置项 | 作用 | 默认值 |
---|---|---|
partitioner.class | 自定义分区器类名,控制消息发送到哪个分区。 | 默认分区器 |
max.block.ms | 生产者在缓冲区满或元数据不可用时阻塞的最大时间(毫秒)。 | 60000 (1分钟) |
5. 超时和网络配置
配置项 | 作用 | 默认值 |
---|---|---|
request.timeout.ms | 等待 Broker 响应的超时时间(毫秒)。 | 30000 (30秒) |
delivery.timeout.ms | 整个消息发送的超时时间,包括重试时间。 | 120000 (2分钟) |
connections.max.idle.ms | 生产者连接空闲多久后关闭(毫秒)。 | 540000 (9分钟) |
socket.send.buffer.bytes | 生产者发送套接字缓冲区的大小。 | 131072 (128 KB) |
socket.receive.buffer.bytes | 生产者接收套接字缓冲区的大小。 | 32768 (32 KB) |
6. 幂等性和事务配置
配置项 | 作用 | 默认值 |
---|---|---|
enable.idempotence | 启用幂等性,防止消息重复(需要与 acks=all 配合)。 | false |
transactional.id | 设置事务 ID,启用事务性生产者,支持精确一次(EOS)。 | 无 |
transaction.timeout.ms | 事务的超时时间(毫秒)。 | 60000 |
示例配置
基础配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
高可靠性配置
props.put("acks", "all");
props.put("retries", 3);
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);
高性能配置
props.put("batch.size", 32768); // 增大批次大小
props.put("linger.ms", 10); // 等待更多消息
props.put("compression.type", "gzip"); // 开启压缩
事务性生产者配置
props.put("transactional.id", "my-transactional-id");
常用场景的配置推荐
场景 | 推荐配置 |
---|---|
高吞吐 | batch.size=32768 、linger.ms=10 、compression.type=gzip |
高可靠性 | acks=all 、retries=3 、enable.idempotence=true 、max.in.flight.requests.per.connection=5 |
事务支持 | enable.idempotence=true 、transactional.id=my-transactional-id |
通过调整这些核心配置,Kafka 生产者可以根据不同的应用场景在性能、可靠性和复杂性之间进行权衡。
2.2. 异步发送
直接发送消息,不关心消息是否到达。
这种方式吞吐量最高,但有小概率会丢失消息。
【示例】异步发送
producer.send(new ProducerRecord<>("HelloWorld", "key", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Sent out :" + msg);
}
}
});
2.3. 同步发送
返回一个 Future
对象,调用 get()
方法,会一直阻塞等待 Broker
返回结果。
这是一种可靠传输方式,但吞吐量最差。
【示例】同步发送
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
2.4. 异步响应发送
代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如:抛出异常、记录错误日志。
这是一个折中的方案,即兼顾吞吐量,也保证消息不丢失。
【示例】异步响应发送
首先,定义一个 callback:
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
然后,使用这个 callback:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
2.5. 关闭连接
调用 producer.close()
方法可以关闭 Kafka 生产者连接。
Producer<String, String> producer = new KafkaProducer<>(properties);
try {
producer.send(new ProducerRecord<>(topic, msg));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
producer.close();
}
3. 生产者的连接
Apache Kafka 的所有通信都是基于 TCP 的。无论是生产者、消费者,还是 Broker 之间的通信都是如此。
选用 TCP 连接是由于 TCP 本身提供的一些高级功能,如多路复用请求以及同时轮询多个连接的能力。
3.1. 何时创建 TCP 连接
Kafka 生产者创建连接有三个时机:
(1)在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,首先会创建与 bootstrap.servers
中所有 Broker 的 TCP 连接。
(2)当 Producer 更新集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二:Producer 通过
metadata.max.age.ms
参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
(3)当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。
3.2. 何时关闭 TCP 连接
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。
主动关闭是指调用 producer.close()
方法来关闭生产者连接;甚至包括用户调用 kill -9
主动“杀掉”Producer 应用。
如果设置 Producer 端 connections.max.idle.ms
参数大于 0(默认为 9 分钟),意味着,在 connections.max.idle.ms
指定时间内,如果没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置该参数为 -1
,TCP 连接将成为永久长连接。
值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。
4. 序列化
Kafka 内置了常用 Java 基础类型的序列化器,如:StringSerializer
、IntegerSerializer
、DoubleSerializer
等。
但如果要传输较为复杂的对象,推荐使用序列化性能更高的工具,如:Avro、Thrift、Protobuf 等。
使用方式是通过实现 org.apache.kafka.common.serialization.Serializer
接口来引入自定义的序列化器。
Kafka 内置的序列化器
以下是 Kafka 内置的序列化器及其对应的类:
数据类型 | 序列化器类名 | 说明 |
---|---|---|
字符串(String) | org.apache.kafka.common.serialization.StringSerializer | 将字符串编码为 UTF-8 字节数组。 |
字节数组(Bytes) | org.apache.kafka.common.serialization.ByteArraySerializer | 直接发送字节数组,无需额外转换。 |
整数(Integer) | org.apache.kafka.common.serialization.IntegerSerializer | 将整数编码为 4 字节的字节数组(大端序)。 |
长整型(Long) | org.apache.kafka.common.serialization.LongSerializer | 将长整型编码为 8 字节的字节数组(大端序)。 |
双精度(Double) | org.apache.kafka.common.serialization.DoubleSerializer | 将双精度浮点数编码为 8 字节字节数组。 |
空值(Null) | 无需序列化类(支持键或值为 null )。 | 不发送任何字节。 |
序列化器的配置
在 Kafka 生产者中,序列化器通过以下配置指定:
- Key 序列化器:
key.serializer
- Value 序列化器:
value.serializer
示例配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
上述配置中:
- 键和值都被序列化为 UTF-8 格式的字符串。
序列化器的使用场景
-
字符串序列化:
- 常用于键或值是文本数据的场景。
- 例如,发送 JSON 格式的消息时,值通常会序列化为字符串。
-
字节数组序列化:
- 如果数据已经是二进制格式(如图像、文件等),直接使用
ByteArraySerializer
发送。 - 避免重复序列化,提高性能。
- 如果数据已经是二进制格式(如图像、文件等),直接使用
-
整数或长整型序列化:
- 适合键是简单数字的场景(如用户 ID、订单编号)。
- 在分区计算时,整数键的哈希值更稳定。
-
双精度序列化:
- 适用于传递科学计算或金融领域的精确浮点数值。
自定义序列化器
如果内置的序列化器无法满足需求(例如,自定义对象),可以实现 Kafka 提供的序列化接口:org.apache.kafka.common.serialization.Serializer
。
自定义序列化器实现:
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomObjectSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, T data) {
try {
return objectMapper.writeValueAsBytes(data); // 使用 Jackson 序列化对象
} catch (Exception e) {
throw new RuntimeException("Failed to serialize object", e);
}
}
}
使用自定义序列化器:
props.put("value.serializer", "com.example.CustomObjectSerializer");
总结
Kafka 提供的内置序列化器已经涵盖了大多数常见的数据类型:
- 文本数据推荐使用
StringSerializer
。 - 二进制数据直接使用
ByteArraySerializer
。 - 数值类型使用
IntegerSerializer
或LongSerializer
。
对于复杂的对象或自定义需求,可以自行实现序列化逻辑。选择合适的序列化器是确保消息高效传输和处理的关键。
5. 分区
5.1. 什么是分区
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:
每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。
5.2. 为什么要分区
为什么 Kafka 的数据结构采用三级结构?
分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。
以下是 Kafka 使用分区的几个主要原因:
1. 提高并发性和吞吐量
- 负载均衡:Kafka 将主题拆分为多个分区,使得生产者和消费者可以并行操作不同的分区。每个分区可以独立地接收和处理消息,因此可以在集群中分布负载,提升整体吞吐量。
- 并行写入和读取:生产者可以同时将消息写入不同的分区,消费者也可以并行地从多个分区读取数据,这样可以有效地提高系统的吞吐量。
2. 水平扩展性
- 扩展集群:Kafka 的分区机制使得它能够方便地横向扩展。当数据量增大时,可以通过增加更多的分区和 Broker 来处理更多的负载。每个分区可以分配给不同的 Broker,从而实现数据的分布式存储和处理。
- 分布式存储:每个分区的数据可以分布在集群的多个 Broker 上,不同 Broker 负责不同的分区。随着集群扩展,分区数量可以增加,进而提升集群的容量和性能。
3. 容错性和高可用性
- 副本机制:Kafka 通过将分区数据复制到多个 Broker 来实现高可用性。每个分区有一个 Leader 和多个 Follower 副本,Leader 负责处理读写请求,Follower 副本则负责数据同步。在发生故障时,副本可以作为备份,确保数据的持久性和高可用性。
- 分区级别的容错:即使某个 Broker 出现故障,只要该 Broker 上的分区有副本存在,数据依然可以在集群中的其他 Broker 上访问。Kafka 会自动进行副本的切换,保证消息不丢失。
4. 分区带来的顺序性
- 保证分区内顺序性:Kafka 保证同一分区内的消息是有顺序的。生产者向分区写入的消息会按照发送顺序存储,消费者读取分区中的消息时也会按顺序消费。这对于需要保证数据顺序的应用(例如,事件日志、事务处理等)非常重要。
- 跨分区无顺序性保证:Kafka 不保证跨分区的消息顺序,因此,如果对全局顺序有严格要求,需要通过键(key)来确保同一键的消息发送到同一分区,从而保持顺序。
5. 灵活的分区策略
- 分区键控制数据分布:Kafka 通过分区键(通常是消息的 key)来决定消息发送到哪个分区。例如,同一类型的数据或属于同一用户的数据可以通过相同的键发送到同一个分区,从而保证这些消息的顺序性。
- 分区的数量可配置:Kafka 允许在创建主题时指定分区的数量。根据业务需求,可以灵活地调整分区数,确保系统在不同负载下的性能需求。
6. 实现更高效的消费
- 并行消费:分区使得 Kafka 支持多个消费者并行消费消息。当消费者组中的多个消费者分配到不同的分区时,每个消费者可以独立地消费各自分配的分区,进一步提高消费的效率。
- 消费者组:Kafka 支持消费者组(consumer groups)来实现多进程或多线程消费。每个消费者组中的每个消费者负责消费不同的分区,保证同一分区的消息只被一个消费者消费。
分区的工作原理
-
生产者将消息发送到分区:
- 生产者根据配置或分区键决定将消息发送到哪个分区。
- 如果消息没有指定分区,则生产者可以选择使用轮询或其他策略选择目标分区。
- 分区选择通常通过生产者的分区器(
Partitioner
)来完成。
-
Broker 存储消息:
- 每个分区都由一个或多个 Broker 负责存储。每个分区在 Kafka 集群中的一个 Broker 上有一个主副本(Leader),其他副本(Follower)存储相同的数据。
- 消息写入时由 Leader 负责,Follower 副本进行同步。
-
消费者消费消息:
- 消费者根据消费组和分区分配策略,从不同分区读取消息。
- Kafka 保证每个消费者组内每个分区只有一个消费者进行消费,避免重复消费。
总结
Kafka 分区机制的核心优势包括:
- 提供高并发的消息生产和消费能力,提升系统吞吐量。
- 支持水平扩展,能够处理大规模数据。
- 提供数据副本和故障恢复机制,确保数据可靠性。
- 保证分区内消息顺序,但不保证跨分区的顺序。
因此,分区是 Kafka 实现高可用、高性能和高扩展性的关键因素。
5.3. 分区策略
1. 默认分区策略(轮询)
如果生产者没有显式指定分区,Kafka 默认使用 轮询(Round-robin)策略将消息均匀地分配到可用的分区。即使生产者发送的消息没有指定 key
,Kafka 也会按顺序将消息轮流写入各个分区。
- 场景:当没有特别的需求来保证消息顺序时,轮询策略能够确保消息负载均衡,适合需要高吞吐量的场景。
示例:
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "message value");
// 没有指定分区,Kafka 使用默认的轮询分区策略
2. 基于键的分区策略
生产者可以使用 消息的 Key 来决定将消息发送到哪个分区。Kafka 默认使用该键的哈希值来进行分区计算。具体来说,Kafka 会计算 key.hashCode()
,然后使用这个哈希值除以分区数,得到一个目标分区的编号。
- 场景:这种策略适用于需要保证相同
key
的消息顺序的场景。比如,同一用户的所有操作需要保证顺序,或者所有有关某个会话的消息必须按顺序消费。
计算公式:
partition = key.hashCode() % numberOfPartitions
- 优点:
- 保证同一
key
的消息始终发送到同一个分区,保持顺序。 - 分区分配更加稳定,避免了不必要的负载均衡和数据重新分布。
- 保证同一
示例:
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "user123", "message value");
// 这里的 "user123" 是 key,Kafka 会根据其哈希值选择目标分区
3. 自定义分区策略
Kafka 允许生产者自定义分区器。通过实现 org.apache.kafka.clients.producer.Partitioner
接口,可以根据业务需求来决定消息的分区。自定义分区器可以根据更复杂的规则来确定目标分区,如按时间、消息内容等自定义逻辑进行分区。
- 场景:如果内置的分区策略不能满足需求(比如需要根据业务逻辑或消息的内容来决定分区),则可以实现自定义分区策略。
自定义分区器示例:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
// 可选:初始化分区器,获取配置等
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 这里可以根据自定义逻辑来决定分区
if (key instanceof String && ((String) key).equals("special_key")) {
return 0; // 将 "special_key" 的消息定向到分区 0
}
// 默认情况下按照 key 的 hash 计算分区
return Math.abs(keyBytes.hashCode()) % cluster.partitionCountForTopic(topic);
}
@Override
public void close() {
// 可选:关闭资源
}
}
在生产者配置中,使用自定义分区器:
props.put("partitioner.class", "com.example.CustomPartitioner");
4. 选择分区的策略(Producer API)
Kafka 的生产者客户端通过以下几种方式来选择分区:
- 未指定键时的分区选择:
- 当生产者没有提供消息的
key
,Kafka 默认使用 轮询 策略(Round-robin),按顺序选择一个分区来发送消息。
- 当生产者没有提供消息的
- 指定键时的分区选择:
- 当生产者提供了消息的
key
时,Kafka 会使用哈希算法根据该key
来选择分区,这保证了相同key
的消息发送到同一个分区。
- 当生产者提供了消息的
5. 分区分配(Consumer Group)
消费者组(Consumer Group)中的每个消费者被分配到一个或多个分区来进行并行消费,确保每个分区在消费者组内只有一个消费者。Kafka 的分区分配是动态的,分区可以在消费者数量变化时重新分配。
- 默认分配策略:
- Range 分配器:将分区按顺序分配给消费者,每个消费者尽可能地均匀分配到多个连续的分区。
- Round-robin 分配器:按消费者的数量轮流分配分区,使得每个消费者分配到相对均等的分区数。
消费者在启动时可以配置分配策略:
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
总结:Kafka 分区策略
-
轮询(Round-robin)策略:
- 默认策略,适用于负载均衡但不要求消息顺序的情况。
-
键控制(Hashing)策略:
- 适用于需要保证相同
key
的消息发送到同一个分区,并保持顺序的场景。
- 适用于需要保证相同
-
自定义分区器:
- 当内置策略无法满足需求时,可以通过实现
Partitioner
接口来定义更复杂的分区策略。
- 当内置策略无法满足需求时,可以通过实现
-
消费者分区分配策略:
- 消费者通过分区分配策略决定如何从多个分区中分配任务,支持并行消费。
通过合理选择和定制分区策略,Kafka 可以高效地处理大量数据,保持高吞吐量、低延迟,并能在不同的消费场景下提供灵活的数据分发。
6. 压缩
6.1. Kafka 的消息格式
Kafka 的消息格式是高度优化和简化的,以便在分布式环境中高效传输、存储和检索。Kafka 的消息结构由几个关键的元素组成,包括消息头、消息体以及相关的元数据。了解 Kafka 的消息格式对于开发者在处理消息时非常重要,特别是在对消息的生产、消费以及存储管理等方面。
Kafka 消息的核心结构
Kafka 的消息由以下几个部分组成:
- 消息头(Message Header)
- 消息体(Message Body)
- 消息元数据(Metadata)
1. 消息结构概览
Kafka 中的每条消息是由以下几个主要部分组成:
- Key(可选):消息的键,用于分区。用于确定消息写入哪个分区。
- Value(必填):消息的实际内容或负载,生产者发送的数据。
- Timestamp:消息的时间戳,表示消息生产的时间。
- Headers(可选):附加的键值对,允许开发者携带自定义的元数据。
- Message Metadata:包含分区和偏移量等其他重要信息。
2. Kafka 消息格式的详细说明
a. Key 和 Value
- Key:消息的键,通常用于控制消息的分区。当生产者向 Kafka 发送消息时,如果指定了
key
,Kafka 会基于key
的哈希值来确定该消息属于哪个分区。如果没有指定key
,Kafka 会使用默认的分区策略(例如轮询)来决定消息的分区。 - Value:消息的实际内容,即生产者发送的消息数据。通常是字节数组,可以是任何类型的数据,如文本、JSON、二进制数据等。
b. 消息体(Payload)
Kafka 的每条消息都是字节数组(byte array)。key
和 value
都是字节数组形式,因此消息体的数据可以是任意格式,取决于应用程序的需求。Kafka 本身并不对消息内容做任何限制或约束,数据格式完全由生产者和消费者的应用程序决定。
c. Timestamp
每条消息在 Kafka 中都会包含一个时间戳,通常是消息生产的时间。时间戳有两种类型:
- Create Time:消息创建的时间戳,表示消息被生产者发送的时间。这个时间戳通常由生产者在发送消息时设置。
- Log Append Time:消息被 Kafka 服务器写入日志的时间戳。这个时间戳由 Kafka 集群在写入时自动生成。
Kafka 可以根据配置来使用这两种时间戳。
d. Headers
Kafka 消息可以包含任意数量的键值对形式的 Headers,这是 Kafka 2.0 引入的新特性。消息头可以用于携带元数据,开发者可以自由定义这些头信息。这些头信息在 Kafka 消息中是可选的。
- 用途:Headers 可以用来传递一些额外的元数据,例如跟踪标识符、消息类型等,而不必将它们包含在
key
或value
中。
e. 消息元数据(Metadata)
- Partition:Kafka 中的每条消息都包含目标分区的编号。在消息被生产者发送时,Kafka 会根据分区策略确定目标分区,消息被写入该分区。
- Offset:每条消息都有一个唯一的 偏移量,这是 Kafka 中每个分区内消息的唯一标识符。Kafka 会根据偏移量来确保消费者能准确地读取消息。
- Leader 和 Replica:Kafka 在每个分区中维护一个 Leader 副本,其他副本为 Replica。消费者从 Leader 分区读取数据。
3. 消息结构示意
以下是 Kafka 消息的简化结构:
| Key (Optional) | Value (Required) | Timestamp | Headers (Optional) | Metadata (Partition, Offset) |
|-----------------|------------------|-----------|--------------------|-----------------------------|
| User ID | User Message | 1629384 | { 'header1': 'value1'} | Partition: 0, Offset: 1234 | | Order ID | Order Details | 1629385 | { 'header2': 'value2'} | Partition: 1, Offset: 1235 |
4. 消息的物理存储结构
在 Kafka 中,消息是按分区存储的,每个分区都是一个 日志。消息存储在磁盘上,格式如下:
- Kafka 将每个分区的数据存储为多个 日志段文件(Log Segments),每个日志段包含一组连续的消息。
- 每条消息在存储时会附加一个 偏移量(Offset)。偏移量是分区内的唯一标识符,帮助消费者跟踪已消费的消息。
5. 消息的传输和存储流程
Kafka 消息的传输和存储大致流程如下:
- 生产者发送消息:生产者使用 Kafka 提供的 Producer API 发送消息,并根据分区策略将消息写入到目标分区。
- 消息存储:Kafka 将消息存储到对应分区的日志中。每个分区有多个 日志文件,消息会附带一个偏移量。
- 消息消费:消费者通过 Consumer API 消费消息,通过偏移量进行跟踪。
6. 例子:发送消息的结构
假设我们要发送一条消息,消息包含:
- Key:
user123
- Value:
User message content
- Timestamp: 当前时间
- Headers:
{'header1': 'value1'}
使用 Kafka 的生产者发送这条消息时,Kafka 会将消息的 Key 和 Value 转换为字节数组,然后存储和传输。Kafka 会计算 key.hashCode()
并决定该消息应写入哪个分区。消息的时间戳和其他元数据(如分区、偏移量)将与消息一起存储。
总结
Kafka 的消息格式主要由 Key、Value、Timestamp、Headers 和 Metadata 组成。消息本身是一个字节数组,Kafka 通过分区和偏移量来管理消息的顺序和存储。Kafka 提供了灵活的消息格式,可以根据需要传输任何类型的数据,并且在分布式系统中保持高效的读写性能。
6.2. Kafka 的压缩流程
Kafka 的压缩流程,一言以概之——Producer 端压缩、Broker 端保持、Consumer 端解压缩。
压缩过程
在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
生产者程序中配置 compression.type
参数即表示启用指定类型的压缩算法。
【示例】开启 GZIP 的 Producer 对象
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
通常,Broker 从 Producer 端接收到消息后,不做任何处理。以下两种情况除外:
-
情况一:Broker 端指定了和 Producer 端不同的压缩算法。显然,应该尽量避免这种情况。
-
情况二:Broker 端发生了消息格式转换。所谓的消息格式转换,主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
所谓零拷贝,说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此如果 Kafka 享受不到这个特性的话,性能必然有所损失,所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。
解压缩的过程
通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
压缩算法
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。
在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
如果客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩,这样能极大地节省网络资源消耗。
Kafka 生产者压缩流程
当生产者向 Kafka 发送消息时,生产者会根据配置的压缩类型进行压缩。具体流程如下:
-
消息批量化:
- Kafka 的生产者客户端会将多条消息聚集成一个消息批次(batch)。Kafka 的消息通常是批量发送的,而不是一条一条发送的。通过批量发送,Kafka 可以减少网络开销,提高吞吐量。
-
压缩消息批次:
- 一旦消息被打包成一个批次,生产者会根据配置的压缩方式对整个批次进行压缩。压缩是在消息批次级别进行的,而不是单条消息。
- 如果
compression.type
配置为gzip
,则生产者会使用gzip
算法对该消息批次进行压缩。压缩后的批次将以压缩格式存储。
-
发送压缩批次:
- 压缩后的消息批次被发送到 Kafka 的 Broker。生产者会根据分区策略选择目标分区并将消息发送到对应的分区。
Kafka Broker 存储压缩消息
在 Kafka 的 Broker 端,消息存储在日志文件中。当消息批次被传输到 Broker 时,压缩后的批次会被直接存储在日志文件中。
- 磁盘存储:Kafka 会将压缩后的批次直接写入磁盘。由于压缩减少了数据的存储空间,Kafka 可以在磁盘上存储更多的消息。
- 副本同步:Kafka 会将压缩后的消息副本同步到其他 Broker 中的副本。压缩后的消息会以相同的压缩格式存储在各个副本中,保持数据一致性。
Kafka 消费者解压缩流程
消费者从 Kafka 中读取消息时,Kafka 会执行解压缩操作。具体流程如下:
-
拉取压缩消息:
- 消费者通过
KafkaConsumer
拉取消息。如果消息批次是压缩格式的,消费者会发现批次数据是压缩的。
- 消费者通过
-
解压缩消息:
- Kafka 会在消费者端自动解压缩这些压缩的消息批次。解压缩是按批次进行的,消费者会根据压缩类型(gzip、snappy、lz4 或 zstd)自动选择相应的解压缩算法。
- 解压后的消息会被传递给消费者应用程序,消费者可以按正常的方式处理这些消息。
-
消费未压缩消息:
- 如果消息没有经过压缩,消费者将直接读取和消费未压缩的消息。
压缩的优势和应用场景
a. 降低存储空间:
- 压缩大大减少了 Kafka 中消息的存储空间,尤其是在存储大量数据时,这可以显著降低磁盘空间的使用。
b. 提高吞吐量:
- 通过压缩消息,Kafka 减少了网络传输中的数据量,从而提高了吞吐量。在高流量情况下,压缩能够减少网络延迟和带宽使用。
c. 降低网络带宽使用:
- 在消息通过网络传输时,压缩可以显著减少需要传输的数据量,降低网络带宽的消耗,特别是在高负载的分布式环境中。
d. 数据存储和传输优化:
- Kafka 的压缩能够优化存储和传输过程,尤其是当消息内容重复或有一定的模式时,压缩效果尤为显著。
选择压缩算法的考量
- gzip:适合需要更高压缩比,但对延迟敏感度较低的场景。由于其压缩比高,但速度相对较慢,因此适合批量处理或存储密集型任务。
- snappy:适合对延迟敏感且数据量较大的场景。它提供了较快的压缩和解压速度,但压缩比略低。
- lz4:适用于实时性要求较高的系统,提供快速的压缩和解压速度,压缩比适中。
- zstd:提供高压缩比和较快的压缩速度,适用于大规模数据处理任务。
6.3. 何时启用压缩
何时启用压缩是比较合适的时机呢?
压缩是在 Producer 端完成的工作,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
如果环境中带宽资源有限,那么也建议开启压缩。
7. 幂等性
7.1. 什么是幂等性
幂等性(Idempotence) 是指在多次执行同一操作时,操作的结果与执行的次数无关,只有第一次执行会产生效果,后续相同的操作不会对系统的状态造成任何额外的影响。在编程和分布式系统中,幂等性通常用于确保在网络问题或重复请求的情况下,操作的结果不会重复或产生不一致。
幂等性的基本特点:
- 多次执行,结果不变:即使同一个操作被多次执行,结果依然是相同的,系统的状态不会发生变化。
- 系统容错性:在分布式系统中,网络中断或超时可能导致请求被重复发送。幂等性确保即使操作被多次执行,也不会导致错误的状态。
7.2. Kafka Producer 的幂等性
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)
,
或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
。
enable.idempotence
被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。
我们必须要了解幂等性 Producer 的作用范围:
- 首先,
enable.idempotence
只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。 - 其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!
Kafka Producer 的幂等性实现原理
Kafka 生产者的幂等性是通过以下机制实现的:
-
生产者配置
enable.idempotence=true
: Kafka 生产者通过设置enable.idempotence=true
来启用幂等性功能。在启用此功能时,生产者在发送消息时,会自动为每个请求分配一个 消息序列号,Kafka 会根据该序列号来判断消息是否是重复的。 -
消息序列号(Producer Sequence Number): Kafka 在生产者发送每条消息时,会生成一个 消息序列号(即每个 Producer 实例的消息计数器)。每个消息会携带一个序列号,在每个分区的消息队列中,消息会按照其序列号进行排序。Kafka Broker 会根据这些序列号来判断消息是否重复。如果一个消息被发送多次,Kafka 会根据序列号识别并丢弃重复的消息。
-
事务性(Transaction): 在 Kafka 中,幂等性和 事务 是密切相关的。Kafka 通过对每个生产者分配一个 Producer ID 和 事务 ID 来确保幂等性。每次发送消息时,生产者会带上事务 ID。如果该事务的消息由于某些原因(如网络失败)没有成功发送并发生重试,Kafka 会根据事务 ID 确保重复消息不会被写入。即使生产者重试发送同样的消息,Kafka 会确保这些重复的消息不会再次被存储。
-
分区级别的去重: Kafka 生产者的幂等性是分区级别的。即生产者为每个分区维护一个 单独的序列号。当生产者发送消息时,Kafka 使用分区级别的序列号来判断该消息是否已经被写入分区。如果已经写入,则该消息会被丢弃。
-
Kafka 的请求和响应机制: Kafka 在进行消息写入时,使用一个 确认机制(acknowledgement),它保证了消息的成功写入。在幂等性启用的情况下,即使由于网络问题或超时,生产者会自动重试发送消息,但只会将消息写入 Kafka 一次。
Kafka Producer 的幂等性工作流程
-
生产者发送消息: 当生产者发送消息时,它会为每个消息分配一个序列号,并使用这个序列号来追踪消息。
-
消息到达 Broker: 消息到达 Broker 后,Broker 会检查消息的序列号和
Producer ID
。如果 Broker 已经接收到该消息,或者序列号不符合要求(例如,序列号过小),它会忽略该消息。 -
消息写入分区: Broker 会将消息写入分区,并确保消息的顺序和唯一性。如果消息已经存在(例如,重复的消息),Kafka 会自动丢弃这些消息,而不会重复写入。
-
确认消息写入: 如果消息成功写入分区,Kafka 会向生产者返回成功的响应。生产者收到确认后才会认为消息被成功写入。
-
重试机制: 如果由于网络问题等原因,生产者没有收到消息写入成功的确认,它会重试发送相同的消息。由于启用了幂等性,Kafka 会确保即使发生重试,消息也不会被重复写入。
Kafka 的幂等性如何保证精确一次交付
启用幂等性后,Kafka 生产者能够保证消息的精确一次交付(Exactly Once Semantics,EOS)。这意味着:
- 重复消息不会写入:即使网络延迟或其他问题导致消息重试,Kafka 会通过序列号和事务 ID 来确保消息不会重复写入。
- 保证顺序:在单个分区内,消息会按照生产者发送的顺序严格写入,并且不会出现乱序问题。
- 容错性:即使在高网络延迟或 Kafka Broker 宕机的情况下,Kafka 也能保证消息的幂等性和精确一次交付。
幂等性与事务性
- 幂等性 主要保证单条消息的可靠性,确保在生产者重试时不会重复发送消息。
- 事务性 进一步提供了跨多个消息或多个分区的精确一次交付语义。Kafka 的 事务性 确保一组消息(跨多个分区)要么全部成功,要么全部失败。
通过启用 Kafka 事务(acks=all
, transactional.id
),生产者可以确保整个事务内的消息要么全部被成功提交,要么全部被回滚。事务性提供了更强的保障,尤其适用于需要跨多个分区发送消息的应用场景。
7.3. PID 和 Sequence Number
为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。
- PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
- 作用:Producer ID 用于区分不同的生产者,并且帮助 Kafka Broker 确定来自同一个生产者实例的消息是否已经被处理。每个生产者在启动时,Kafka 会为其分配一个唯一的 Producer ID。这个 ID 保证了即使在网络或其他故障的情况下,生产者的消息能够正确地被识别并去重。
- 分配方式:Kafka 生成 Producer ID 时,使用了一个全局唯一的标识符,并在 Kafka Broker 中为每个生产者维护一个 Producer ID。每个生产者实例只有一个 Producer ID,它在整个生命周期内都不会改变。
- Sequence Numbler。对于每个 PID,该 Producer 发送数据的每个
<Topic, Partition>
都对应一个从 0 开始单调递增的 Sequence Number。 -
作用:Sequence Number 主要用于保证消息的顺序性和幂等性。Kafka 通过检查消息的 Sequence Number 来确定是否为重复消息。如果生产者发送的消息序列号与已经接收到的消息序列号相同,Kafka 会丢弃重复的消息,避免重复写入。
-
工作原理:每个生产者(通过 Producer ID 标识)都维护一个递增的序列号,每发送一条消息,序列号就加 1。这样,Kafka 可以通过该序列号判断一条消息是否是重复的。对于同一个生产者 ID 和分区,Kafka 会严格按照序列号顺序处理消息。
Broker 端在缓存中保存了这 seq number,对于接收的每条消息,如果其序号比 Broker 缓存中序号大于 1 则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个 Producer 对于同一个 <Topic, Partition>
的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 partion 幂等。
如何通过 PID 和 Sequence Number 实现幂等性
-
Producer ID 和 Sequence Number 的结合:当 Kafka 生产者发送一条消息时,消息携带着生产者的 Producer ID 和消息的 Sequence Number。这样,Kafka 就能够知道这条消息来自哪个生产者,并且可以检查该生产者发送的消息是否已经处理过。通过对比 Producer ID 和 Sequence Number,Kafka 可以判断消息是否重复。
-
重复消息的判断:如果生产者发送的消息与之前已经存储在 Kafka 中的消息具有相同的 Producer ID 和 Sequence Number,Kafka 会认为这条消息是重复的并丢弃它。因此,即使在网络超时或其他故障导致生产者重试发送同一条消息的情况下,Kafka 也能通过 PID 和 Sequence Number 确保该消息只会被写入一次。
Kafka Producer 的幂等性工作流程
-
生产者发送消息:当生产者向 Kafka 发送消息时,Kafka 为每个生产者分配一个唯一的 Producer ID。每次生产者发送消息时,都会附带一个递增的 Sequence Number。这个消息就包含了 Producer ID 和 Sequence Number。
-
Kafka Broker 接收到消息:Kafka Broker 会通过 Producer ID 和 Sequence Number 来判断消息是否是重复的。
- 如果同一个生产者(由 Producer ID 标识)在短时间内发送了多次相同的消息(例如,由于网络问题导致生产者重试),Kafka 会通过 Sequence Number 来判断消息是否已经存在。如果消息的 Sequence Number 已经处理过,那么 Kafka 会丢弃这条消息。
-
消息写入:如果消息是新的(即其 Sequence Number 是递增的),Kafka 会将消息写入分区。Kafka 确保每个生产者的消息按照其 Sequence Number 顺序写入。
-
生产者重试:在生产者发送消息时,如果没有收到确认(例如,由于网络问题),生产者会重试发送相同的消息。因为每条消息都包含 Producer ID 和 Sequence Number,即使生产者重试发送相同的消息,Kafka 也会根据消息的序列号来确认消息是否已经处理,避免重复插入。
-
消息确认:一旦消息成功写入 Kafka,Broker 会返回一个成功的确认响应给生产者。如果生产者没有收到确认,会进行重试;否则,消息就被认为已经成功写入。
Kafka 生产者的幂等性配置
为了启用生产者的幂等性功能,必须在生产者的配置中启用 enable.idempotence=true
。同时,生产者的以下配置参数也与幂等性密切相关:
-
acks=all
:确保所有副本都收到消息。这是启用幂等性的关键配置之一,确保生产者发送的消息能在所有副本上确认并最终一致。 -
retries=Integer.MAX_VALUE
:配置生产者的重试次数为无限次,这样在网络波动或 Broker 故障的情况下,生产者会不断尝试重新发送消息,直到消息成功写入。 -
linger.ms
和batch.size
:这些配置帮助生产者将更多消息合并成一个批次,提升吞吐量。
配置示例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 启用幂等性
props.put("acks", "all"); // 确保所有副本确认收到消息
props.put("retries", Integer.MAX_VALUE); // 设置重试次数为无限
props.put("linger.ms", 1); // 最小延迟,更多消息打包在一起发送
props.put("batch.size", 16384); // 批量大小
// 启用幂等性
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
7.4. 幂等性的应用实例
(1)配置属性
需要设置:
enable.idempotence
,需要设置为 ture,此时就会默认把 acks 设置为 all,所以不需要再设置 acks 属性了。
// 指定生产者的配置
final Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
// 设置 key 的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置 value 的序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启幂等性
properties.put("enable.idempotence", true);
// 设置重试次数
properties.put("retries", 3);
//Reduce the no of requests less than 0
properties.put("linger.ms", 1);
// buffer.memory 控制生产者可用于缓冲的内存总量
properties.put("buffer.memory", 33554432);
// 使用配置初始化 Kafka 生产者
producer = new KafkaProducer<>(properties);
(2)发送消息
跟一般生产者一样,如下
public void produceIdempotMessage(String topic, String message) {
// 创建Producer
Producer producer = buildIdempotProducer();
// 发送消息
producer.send(new ProducerRecord<String, String>(topic, message));
producer.flush();
}
此时,因为我们并没有配置 transaction.id
属性,所以不能使用事务相关 API,如下
producer.initTransactions();
否则会出现如下错误:
Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.
at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)
at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)
at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)
8. Kafka 事务
Kafka 的事务是为了提供 精确一次交付(Exactly Once Semantics, EOS) 保证而设计的,确保消息的 原子性 和 一致性。通过 Kafka 的事务机制,生产者可以确保消息在多分区和多主题的情况下要么完全成功提交,要么完全回滚,从而避免了数据丢失和重复消息的情况。
8.1. 事务
Kafka 事务的关键概念
-
事务 ID(Transactional ID): 每个 Kafka 生产者在启用事务时都会指定一个 事务 ID,它是一个唯一的标识符,用来追踪该生产者发出的所有事务消息。这个 ID 允许 Kafka 区分不同的生产者,并确保同一生产者的消息按顺序提交或回滚。
-
事务隔离级别: Kafka 提供两种事务隔离级别:
read_committed
:消费者只会读取已提交的消息,不会读取事务中的未提交消息。默认的隔离级别是read_committed
。read_uncommitted
:消费者可以读取未提交的消息(事务内的消息)。这种模式下,消费者可能会读取到部分提交的事务数据。
-
事务日志: Kafka 使用事务日志来管理事务的状态。消息首先写入事务日志,在事务提交时才会正式写入日志文件。如果事务回滚,则所有相关消息会被丢弃。
-
生产者 API: Kafka 为生产者提供了事务控制的 API,允许生产者开始事务、提交事务或回滚事务。这些操作保证了事务内所有消息的原子性。
beginTransaction()
:开始一个事务。commitTransaction()
:提交事务,表示所有消息成功写入。abortTransaction()
:回滚事务,表示所有消息无效并丢弃。
Kafka 事务的工作流程
Kafka 事务的工作流程可以简化为以下几个步骤:
-
生产者启动事务: 当 Kafka 生产者启用事务时,Kafka 会为其分配一个唯一的 事务 ID,生产者可以通过调用
beginTransaction()
开始事务。 -
发送消息: 在事务开始后,生产者可以向 Kafka Broker 发送消息。消息会被标记为事务消息,并且先写入事务日志,而不是立即提交到日志文件。
-
提交事务: 如果生产者确认事务内的所有消息都成功发送,可以调用
commitTransaction()
来提交事务。这时,所有消息会正式写入 Kafka 中。 -
回滚事务: 如果生产者在事务处理中遇到错误,或者想要取消事务,可以调用
abortTransaction()
来回滚事务。回滚时,事务内的所有消息会被丢弃。 -
事务日志与消息提交: Kafka 会将事务消息写入事务日志,直到事务被提交或回滚。在提交时,消息会正式写入 Kafka 的日志文件;如果回滚,事务中的消息会被丢弃。
-
消费者读取消息: 消费者默认只读取已提交的消息(
read_committed
隔离级别)。因此,消费者不会读取到正在进行中的事务消息,保证了数据的一致性。
Kafka 事务的使用场景
Kafka 事务非常适合以下场景:
-
跨多个分区或主题的消息发送: Kafka 事务能够保证在跨多个分区或多个主题发送消息时,所有消息要么都成功,要么都失败。
-
精确一次交付: Kafka 的事务功能能够确保消息的精确一次交付,避免消息丢失或重复消费,适用于需要严格一致性保证的场景。
-
防止重复消费: Kafka 事务可以防止在消费者端读取到重复的消息,确保消费者不会因为生产者的重试机制而多次处理同一条消息。
Kafka 事务的限制
-
性能开销: 启用事务会导致性能开销,尤其是在写入和提交事务时,Kafka 需要处理事务日志和事务的提交状态。
-
事务提交延迟: 由于事务需要等待所有副本确认并写入事务日志,因此事务提交的延迟可能较高,尤其是在网络延迟较大的情况下。
-
事务性消费的复杂性: 在消费端,如果需要处理跨事务的消息,可能需要一些额外的处理逻辑,例如处理事务失败和重新消费的场景。
总结
Kafka 事务提供了 精确一次交付 和 跨分区、跨主题的原子操作,确保消息的处理既可靠又一致。通过 transactional.id
,Kafka 能够确保生产者的消息处理具有一致性,消费者也能够根据事务隔离级别读取正确的消息。尽管事务机制引入了一些性能开销,但它对于需要严格一致性保证的应用场景,尤其是分布式系统中,提供了强大的支持。
8.2. 事务型 Producer
生产者端配置
transactional.id
:指定生产者的事务 ID,生产者将通过这个 ID 来标识它的事务。acks=all
:确保所有副本都确认消息,保证消息的持久性和一致性。retries=Integer.MAX_VALUE
:设置生产者的重试次数为无限次,以确保即使发生网络波动,消息也能最终提交。
配置示例(生产者端):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 启用事务
props.put("acks", "all"); // 确保所有副本都收到消息
props.put("retries", Integer.MAX_VALUE); // 重试次数设为无限次
props.put("transactional.id", "my-transaction-id"); // 事务 ID,用于标识事务
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 开始事务
producer.beginTransaction();
try {
// 发送消息
producer.send(new ProducerRecord<>(topic, key, value));
// 提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfMemoryError | KafkaException e) {
// 事务失败,回滚
producer.abortTransaction();
} finally {
producer.close();
}
消费者端配置
消费者端的配置决定了是否能够读取未提交的消息。默认情况下,消费者读取已提交的消息。消费者可以通过 isolation.level
配置来指定事务的隔离级别:
read_committed
:只读取已提交的消息。read_uncommitted
:读取所有消息,包括未提交的事务消息。
配置示例(消费者端):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置事务隔离级别为 read_committed,表示只读取已提交的消息
props.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
}
8.3. 事务操作的 API
Producer
提供了 initTransactions
, beginTransaction
, sendOffsets
, commitTransaction
, abortTransaction
五个事务方法。
/**
* 初始化事务。需要注意的有:
* 1、前提
* 需要保证transation.id属性被配置。
* 2、这个方法执行逻辑是:
* (1)Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* (2)Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
*/
public void initTransactions();
/**
* 开启事务
*/
public void beginTransaction() throws ProducerFencedException ;
/**
* 为消费者提供的在事务内提交偏移量的操作
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException ;
/**
* 提交事务
*/
public void commitTransaction() throws ProducerFencedException;
/**
* 放弃事务,类似回滚事务的操作
*/
public void abortTransaction() throws ProducerFencedException ;
8.4. Kafka 事务相关配置
Kafka 事务相关配置
-
transactional.id
- 描述:设置 Kafka 生产者的事务 ID,这是每个事务的唯一标识符。它用于标识属于同一生产者实例的所有事务操作。
- 类型:
String
- 默认值:无(必须手动设置)
- 配置示例:
props.put("transactional.id", "my-transaction-id");
- 作用:确保 Kafka 能够在一个事务内追踪所有的消息,并且通过事务 ID 来处理事务的提交或回滚。
-
acks
- 描述:控制消息写入的确认级别,确保副本同步并写入磁盘。
- 类型:
String
- 可选值:
0
:生产者不等待确认。1
:生产者等待领导者确认。all
:生产者等待所有副本确认,确保数据的高可靠性。
- 默认值:
1
- 配置示例:
props.put("acks", "all"); // 确保所有副本都确认
- 作用:在启用事务时,
acks
设置为all
是推荐配置,它确保消息在所有副本都被确认后才算提交,从而保证数据一致性。
-
retries
- 描述:指定在发送失败时的重试次数,通常需要将其设置为较大的值,以确保消息最终被成功发送。
- 类型:
int
- 默认值:
0
- 配置示例:
props.put("retries", Integer.MAX_VALUE); // 设置为最大值,确保尽可能多的重试
- 作用:为了支持事务,
retries
配置应设置为Integer.MAX_VALUE
,确保 Kafka 生产者在发生暂时的网络或其他故障时能够进行重试。
-
acks
和retries
配置的配合使用 在生产者配置中,acks
和retries
配置是密切配合的。为了确保事务的可靠性,通常建议:acks
设置为all
,以确保消息在所有副本都被确认后才算完成。retries
设置为较大的值(如Integer.MAX_VALUE
),以保证消息发送的可靠性。
-
max.in.flight.requests.per.connection
- 描述:控制单个连接上最多可以有多少个未完成的请求(请求指消息的发送)。
- 类型:
int
- 默认值:
5
- 配置示例:
props.put("max.in.flight.requests.per.connection", "1");
- 作用:为了确保事务的幂等性,
max.in.flight.requests.per.connection
应该设置为1
。这可以防止在发生重试时出现乱序问题。
-
transaction.timeout.ms
- 描述:指定事务超时时间。如果事务在该时间内没有被提交或回滚,Kafka 会自动回滚事务。
- 类型:
long
- 默认值:
60000
(60秒) - 配置示例:
props.put("transaction.timeout.ms", "300000"); // 设置超时时间为 5 分钟
- 作用:防止事务长时间未提交,设置一个合理的超时时间可以避免系统因未完成的事务而阻塞。
-
isolation.level
- 描述:控制消费者读取事务消息的隔离级别。可以选择:
read_committed
:只读取已提交的消息(默认设置)。read_uncommitted
:读取所有消息,包括未提交的消息。
- 类型:
String
- 默认值:
read_committed
- 配置示例:
props.put("isolation.level", "read_committed"); // 只读取已提交的消息
- 作用:确保消费者读取的是已经完全提交的事务消息。设置为
read_committed
使得消费者不会读取到正在进行中的事务消息。
- 描述:控制消费者读取事务消息的隔离级别。可以选择:
使用 kafka 的事务 api 时的一些注意事项:
- 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行
consumer#commitSync
或者consumer#commitAsyc
- 设置 Producer 端参数
transctional.id
。最好为其设置一个有意义的名字。 - 和幂等性 Producer 一样,开启
enable.idempotence = true
。如果配置了transaction.id
,则此时enable.idempotence
会被设置为 true - 消费者需要配置事务隔离级别
isolation.level
。在consume-trnasform-produce
模式下使用事务时,必须设置为READ_COMMITTED
。read_uncommitted
:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed
:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
在使用 Kafka 事务 API 时,有几个关键的注意事项:
-
事务 ID 唯一性
- 确保
transactional.id
对每个生产者是唯一的。多个生产者不能使用相同的事务 ID,否则会造成冲突。 - Kafka 使用事务 ID 来追踪每个生产者的事务,因此每个生产者实例应该具有唯一的事务 ID,尤其是在分布式环境中。
- 确保
-
处理事务失败
- 事务操作可能会失败,尤其是在生产者发送消息时遇到网络问题或 Kafka Broker 停止服务的情况下。必须通过适当的异常捕获来处理这些失败,例如
ProducerFencedException
、OutOfMemoryError
、KafkaException
等。 - 在发生错误时,要调用
abortTransaction()
来回滚事务,确保不将部分消息提交。
- 事务操作可能会失败,尤其是在生产者发送消息时遇到网络问题或 Kafka Broker 停止服务的情况下。必须通过适当的异常捕获来处理这些失败,例如
-
事务提交与回滚
- 在正常的消息发送流程中,一旦所有消息都成功发送,就应该调用
commitTransaction()
来提交事务。如果发送消息时遇到任何问题,应调用abortTransaction()
来回滚事务。 - 一定要确保事务的提交和回滚操作正确,否则可能会导致数据的不一致。
- 在正常的消息发送流程中,一旦所有消息都成功发送,就应该调用
-
事务超时
- 事务超时(
transaction.timeout.ms
)是一个重要的配置项,应该根据业务需求进行适当配置。如果一个事务在规定时间内未完成,Kafka 会自动回滚该事务。 - 注意,事务超时的设置应该与 Kafka Broker 的设置保持一致。
- 事务超时(
-
消息的幂等性与事务的结合
- Kafka 在启用事务时,自动开启了生产者的幂等性机制。这意味着即使发生消息重试,也不会造成重复消息。因此,为了确保事务的可靠性,必须同时启用幂等性 (
acks=all
和retries=Integer.MAX_VALUE
)。
- Kafka 在启用事务时,自动开启了生产者的幂等性机制。这意味着即使发生消息重试,也不会造成重复消息。因此,为了确保事务的可靠性,必须同时启用幂等性 (
-
高吞吐量与事务性能开销
- 事务性操作会增加一定的性能开销,因为每次消息的发送都需要记录事务日志,并且 Kafka 需要处理消息的提交和回滚。高吞吐量的场景下,可能会导致延迟增加,因此在设计系统时需要权衡事务的一致性保证与性能之间的平衡。
-
事务的重试与消息顺序
- Kafka 在发送事务消息时提供了重试机制,但如果事务提交期间出现网络故障或 Kafka Broker 故障,消息可能会发生重试。在启用事务时,必须确保
max.in.flight.requests.per.connection
配置为1
,以防止消息在重新发送时乱序。
- Kafka 在发送事务消息时提供了重试机制,但如果事务提交期间出现网络故障或 Kafka Broker 故障,消息可能会发生重试。在启用事务时,必须确保
-
事务日志和消息提交
- Kafka 会将事务内的所有消息写入事务日志,确保消息的原子性。只有在调用
commitTransaction()
时,消息才会被正式提交到分区。如果发生回滚,事务内的消息会被丢弃,确保不会将不一致的数据提交到消费者。
- Kafka 会将事务内的所有消息写入事务日志,确保消息的原子性。只有在调用
8.5. Kafka 事务应用示例
Kafka 事务功能能够确保跨多个分区和多个主题的消息处理具有 原子性,保证消息要么完全提交,要么完全回滚,从而避免了数据丢失和重复的情况。以下是一个 Kafka 事务应用的示例,展示如何使用 Kafka 的事务功能来保证数据一致性。
Kafka 事务应用场景
假设我们有一个订单系统,该系统需要将订单消息发送到多个主题(例如 order-topic
和 inventory-topic
),在这种情况下,确保两个主题的数据一致性是非常重要的。如果订单创建消息已经成功发送到 order-topic
,但由于某种原因,库存更新失败并没有成功发送到 inventory-topic
,就会导致数据不一致。因此,使用 Kafka 事务可以确保两个消息要么同时成功,要么都失败。
Kafka 生产者事务应用示例
假设我们正在创建一个订单,并需要同时更新 order-topic
和 inventory-topic
。如果所有消息都成功,我们将提交事务;如果出现任何错误,我们将回滚事务。
1. 配置 Kafka 生产者
首先,我们需要配置 Kafka 生产者的事务参数:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaTransactionProducer {
public static void main(String[] args) {
// 配置生产者
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 启用事务
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-inventory-transaction"); // 事务 ID
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 确保所有副本都确认
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 设置重试次数为无限
// 创建 Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// 开始事务
producer.beginTransaction();
// 发送订单创建消息
ProducerRecord<String, String> orderMessage = new ProducerRecord<>("order-topic", "order1", "Order created: order1");
producer.send(orderMessage);
// 发送库存更新消息
ProducerRecord<String, String> inventoryMessage = new ProducerRecord<>("inventory-topic", "order1", "Inventory updated for order1");
producer.send(inventoryMessage);
// 如果两条消息都成功,提交事务
producer.commitTransaction();
System.out.println("Transaction committed successfully.");
} catch (Exception e) {
// 如果发生任何异常,回滚事务
producer.abortTransaction();
System.out.println("Transaction aborted due to an error.");
} finally {
// 关闭生产者
producer.close();
}
}
}
2. 消费者配置和读取消息
在这个示例中,我们有两个主题 order-topic
和 inventory-topic
,消费者需要确保只能读取已提交的消息。
消费者配置示例
消费者会在 order-topic
和 inventory-topic
上订阅消息,并根据事务隔离级别读取已提交的消息:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaTransactionConsumer {
public static void main(String[] args) {
// 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
// 设置事务隔离级别,确保只读取已提交的消息
props.put("isolation.level", "read_committed");
// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("order-topic"));
consumer.subscribe(Collections.singletonList("inventory-topic"));
while (true) {
// 拉取消息
consumer.poll(1000).forEach(record -> {
System.out.println("Consumed record: " + record.value());
});
}
}
}
Kafka 事务机制工作原理
生产者端事务流程:
- 开始事务: 使用
beginTransaction()
开始事务。 - 发送消息: 发送消息到多个分区和主题。在事务内,消息不会立即被写入到 Kafka 日志。
- 提交事务: 如果所有消息都成功发送并且没有出现错误,调用
commitTransaction()
来提交事务。 - 回滚事务: 如果在发送消息过程中发生任何错误,调用
abortTransaction()
来回滚事务,所有消息都会丢失。
消费者端事务隔离:
- 消费者使用
isolation.level=read_committed
配置来确保它们只会读取已经提交的消息。消费者会忽略正在进行中的事务消息,避免读取到部分提交的消息。
4. Kafka 事务的异常处理
事务在 Kafka 中可能会遇到一些异常,尤其是在生产者端。以下是一些可能的异常和对应的处理方法:
ProducerFencedException
:当生产者尝试进行操作时,发现它已经不再拥有事务 ID 时,Kafka 会抛出此异常。此时应该终止当前生产者并重新启动。OutOfMemoryError
:如果生产者遇到内存问题,事务会被回滚,并且可能需要重试。KafkaException
:Kafka 通用异常,表示发生了未知错误。如果捕获到此异常,应回滚事务。
5. 生产者和消费者的性能考量
- 事务的性能开销:启用事务会增加一定的性能开销,特别是当涉及到跨多个分区和主题时。事务的提交需要等待所有副本的确认,这可能导致较长的延迟。
- 消费者的性能影响:在启用
read_committed
时,消费者需要等待 Kafka 中的事务完成才能读取数据,这也可能会影响消费的延迟。
9. 生产者的配置
更详尽的生产者配置可以参考:Kafka 生产者官方配置说明(opens new window)
以下为生产者主要配置参数清单:
acks
:指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。默认为acks=1
acks=0
如果设置为 0,则 Producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries
也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。acks=1
如果设置为 1,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。acks=all
如果设置为 all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1 与 acks=all 是等效的。
buffer.memory
:用来设置 Producer 缓冲区大小。compression.type
:Producer 生成数据时可使用的压缩类型。默认值是 none(即不压缩)。可配置的压缩类型包括:none
、gzip
、snappy
、lz4
或zstd
。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。retries
:用来设置发送失败的重试次数。batch.size
:用来设置一个批次可占用的内存大小。linger.ms
:用来设置 Producer 在发送批次前的等待时间。client.id
:Kafka 服务器用它来识别消息源,可以是任意字符串。max.in.flight.requests.per.connection
:用来设置 Producer 在收到服务器响应前可以发送多少个消息。timeout.ms
:用来设置 Broker 等待同步副本返回消息确认的时间,与acks
的配置相匹配。request.timeout.ms
:Producer 在发送数据时等待服务器返回响应的时间。metadata.fetch.timeout.ms
:Producer 在获取元数据时(如:分区的 Leader 是谁)等待服务器返回响应的时间。max.block.ms
:该配置控制KafkaProducer.send()
和KafkaProducer.partitionsFor()
允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。max.request.size
:请求的最大字节数。receieve.buffer.bytes
:TCP 接收缓冲区的大小。send.buffer.bytes
:TCP 发送缓冲区的大小。
在 Kafka 中确保消息的顺序性,可以采用以下方案,具体选择取决于你的业务需求和场景:
10. 利用 Kafka 的分区特性
Kafka 保证每个分区内的消息是严格按照发送顺序存储和消费的。因此,可以通过以下方式确保消息的先后顺序:
方法1:使用相同的 Key
- 将具有顺序要求的消息发送到同一个分区。
- 生产者在发送消息时使用相同的 Key(例如,订单 ID、用户 ID)。
- Kafka 根据 Key 的哈希值决定消息路由到哪个分区。
实现步骤:
- 确保 Kafka 的 Topic 设置为多分区(可以有多个分区,但顺序要求的 Key 的消息始终发往同一分区)。
- 在生产者端,发送消息时设置 Key:
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", key, message);
-
producer.send(record);
- 消费端按分区消费消息,顺序处理。
方法2: 单分区(Single Partition)设计
如果全局消息都需要严格顺序,可以使用单分区的 Topic。
优点:
- 全局顺序性完全保证。
缺点:
- 单分区限制了吞吐量,所有消息的生产和消费都会受限于单分区的处理速度。
实现:
- 创建 Topic 时,指定
partitions=1
:kafka-topics.sh --create --topic topicName --partitions 1 --replication-factor 1 --zookeeper localhost:2181
- 生产者直接发送消息,不需要指定 Key。
方法3. 严格控制消费者的并发度
如果使用多分区,但某些分区的消息需要顺序处理,可以控制消费者的并发度:
方法:消费者每次只消费一个分区
- 确保每个分区只有一个消费者。
- 消费者按顺序处理分区内的消息。
Kafka Consumer Group 配置:
- 每个 Consumer Group 内的消费者数量不超过分区数量。
- 消费逻辑中保证单线程处理。
方法4: 使用事务(Kafka Transactional API)
如果需要同时保证顺序性和多操作的原子性,可以利用 Kafka 的事务 API:
- Kafka 支持生产者在事务中发送消息,确保消息的顺序性和一致性。
- 在消费端也可以启用事务模式,确保处理完一批消息后再提交偏移量。
实现:
- 生产者开启事务:
producer.initTransactions();
-
producer.beginTransaction();
-
producer.send(record);
-
producer.commitTransaction();
- 消费者使用事务逻辑处理批量消息。