Kafka之【生产消息】

消息(Record)

在kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型:
在这里插入图片描述

在这里插入图片描述

相关属性必须在构建数据模型时指定,其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。value就是我们需要真正传递的数据了,而在未指定分区器或者未指定分区得情况下,Key可以用于数据的分区定位


根据前面提供的配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者消费者模式。

生产者(Producer)是一个关键组件,负责将消息发送到Kafka集群。Kafka生产者主要由三个核心部分组成:

  • KafkaProducer
  • RecordAccumulator
  • Sender

在这里插入图片描述


数据生产者(KafkaProducer)

作用:

KafkaProducer是生产者客户端的核心接口,为生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者,负责提供向Kafka集群发布消息的功能。

组成:

KafkaProducer由以下关键部分组成:

  • 配置(ProducerConfig):用于初始化和配置生产者客户端的参数。
  • 拦截器(Interceptor):用于在消息发送前后进行自定义处理。如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
  • 序列化器(Serializer):因为发送的数据为KV数据,所以需要根据配置信息中的序列化对象对数据中Key和Value分别进行序列化处理。
  • 分区器(Partitioner):计算数据所发送的分区位置。

工作流程:

  • 初始化:根据配置参数初始化客户端,包括序列化器、分区器等。
  • 消息发送:消息经过拦截器处理、序列化、分区选择后,放入RecordAccumulator中。
  • 事务支持:处理事务消息的发送和事务边界(如果配置了事务,如幂等)。

数据收集器(RecordAccumulator)

作用:

RecordAccumulator用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。

组成:

RecordAccumulator由以下部分组成:

  • 内存缓冲区(BufferPool):管理消息缓冲区的内存分配。
  • 消息队列(Deque):每个分区对应一个消息队列,用于存储批次消息。
  • 批次(ProducerBatch):用于存储一批待发送的消息。

内部工作:

  • 默认情况下,一个发送批次的数据容量为16K,这个可以通过参数batch.size进行改善。
  • 批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
  • 将消息追加到对应分区的批次中,如果当前批次已满达到时间限制,创建新的批次。
  • 这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送

重要参数:

  • batch.size:每个批次的大小,默认16K。
  • linger.ms:发送前的等待时间限制,默认0s。
  • buffer.memory:内存缓冲区的总大小默认32M。

数据发送器(Sender)

作用:

Sender是一个后台线程,负责从RecordAccumulator中取出消息批次,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中

组成:

Sender由以下部分组成:

  • 网络客户端(NetworkClient):负责与Kafka Broker进行网络通信。
  • 元数据管理(Metadata):获取和更新Kafka集群的元数据信息。
  • 请求管理(ClientRequest/ClientResponse):管理发送的请求和接收的响应。

内部工作:

  • 因为数据真正发送的地方是Broker节点,不是分区。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。
  • 将组合后的<节点,List<批次>>的数据封装成客户端请求发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。
  • Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。

重要参数:

  • retries:重试次数。
  • retry.backoff.ms:重试间隔时间。
  • request.timeout.ms:请求超时时间。

协作机制

  1. 消息发送流程

    • 用户通过KafkaProducer.send()方法发送消息。
    • 消息经过序列化、分区选择和拦截器处理后,进入RecordAccumulator
    • RecordAccumulator将消息存入对应分区的消息队列中,形成批次。
  2. 消息传输流程

    • Sender后台线程不断从RecordAccumulator中取出已准备好的消息批次。
    • Sender通过NetworkClient将消息批次发送到Kafka Broker。
    • 如果发送成功,Sender接收响应并通知KafkaProducer;如果发送失败,根据重试策略进行重试。

通过这种协作机制,Kafka生产者实现了高效、可靠的消息发送。KafkaProducer负责接口和配置管理,RecordAccumulator负责消息缓存和批量处理,Sender负责消息的实际传输和重试逻辑。


生产者代码

// TODO 配置属性集合
Map<String, Object> configMap = new HashMap<>();
// TODO 配置属性:Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
// TODO 创建Kafka生产者对象,建立Kafka连接
//      构造对象时,需要传递配置参数
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 准备数据,定义泛型
//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
        "test", "key1", "value1"
);
// TODO 生产(发送)数据
producer.send(record);
// TODO 关闭生产者连接
producer.close();

拦截器

生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能,所以可以根据实际的情况自行选择使用。

但是要注意,这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。

在这里插入图片描述

自定义拦截器

要想自定义拦截器,只需要创建一个类,然后实现Kafka提供的分区类接口ProducerInterceptor,接下来重写方法。这里我们只关注onSend方法即可。


import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * TODO 自定义数据拦截器
 *      1. 实现Kafka提供的生产者接口ProducerInterceptor
 *      2. 定义数据泛型 <K, V>
 *      3. 重写方法
 *         onSend
 *         onAcknowledgement
 *         close
 *         configure
 */
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {
    
    @Override
    // 数据发送前,会执行此方法,进行数据发送前的预处理
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override	
    // 数据发送后,获取应答时,会执行此方法
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    // 生产者关闭时,会执行此方法,完成一些资源回收和释放的操作
    public void close() {
    }

    @Override
    // 创建生产者对象的时候,会执行此方法,可以根据场景对生产者对象的配置进行统一修改或转换。
    public void configure(Map<String, ?> configs) {
    }
}

使用拦截器

// 仅需在配置properties的时候,指定自定义拦截器器即可
configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());

同步发送和异步发送

1. 异步发送

如果Kafka通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送一下条数据的场合,我们就称之为异步发送。
在这里插入图片描述

import org.apache.kafka.clients.producer.*;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerASynTest {
    public static void main(String[] args) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configMap.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configMap.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // TODO 循环生产数据
        for ( int i = 0; i < 10; i++ ) {
            // TODO 创建数据
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
            // TODO 发送数据
            producer.send(record, new Callback() {
                // TODO 回调对象
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // TODO 当数据发送成功后,会回调此方法
                    System.out.println("数据发送成功:" + recordMetadata.timestamp());
                }
            });
            // TODO 发送当前数据
            System.out.println("发送数据");
        }
        producer.close();
    }
}

2. 同步发送

如果Kafka通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送一下条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。

在这里插入图片描述

import org.apache.kafka.clients.producer.*;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerASynTest {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configMap.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configMap.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // TODO 循环生产数据
        for ( int i = 0; i < 10; i++ ) {
            // TODO 创建数据
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
            // TODO 发送数据
            producer.send(record, new Callback() {
                // TODO 回调对象
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // TODO 当数据发送成功后,会回调此方法
                    System.out.println("数据发送成功:" + recordMetadata.timestamp());
                }
            }).get();
            // TODO 发送当前数据
            System.out.println("发送数据");
        }
        producer.close();
    }
}

分区器

  1. 如果指定了分区,直接使用
  2. 如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用
  3. 如果指定了数据Key,且使用Key选择分区的场合,采用murmur2非加密散列算法(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号。hash(key)%numPartitions = 分区号
  4. 如果未指定数据Key,或不使用Key选择分区,那么Kafka会自动分区

自定义分区器

只需要创建一个类,然后实现Kafka提供的分区类接口Partitioner,接下来重写方法。这里我们只关注partition方法即可,因为此方法的返回结果就是需要的分区编号。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * TODO 自定义分区器实现步骤:
 *      1. 实现Partitioner接口
 *      2. 重写方法
 *         partition : 返回分区编号,从0开始
 *         close
 *         configure
 */
public class KafkaPartitionerMock implements Partitioner {
    /**
     * 分区算法 - 根据业务自行定义即可
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     * @return 分区编号,从0开始
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

使用分区器

// 仅需在配置properties的时候,指定自定义分区器即可
configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName());

消息可靠性(Acknowledgement)

ACK = 0

当生产数据时,生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka就对当前的数据请求进行了响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

ACK = 1

当生产数据时,Kafka Leader副本将数据接收到并写入到了日志文件后,就会对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

ACK = -1/all (默认) 最可靠

当生产数据时,Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后,再对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。


数据重试导致的数据重复

由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,所以我们才会设置ACK应答机制,尽可能提高数据的可靠性。

  1. 在某些场景中,数据并不是真正地丢失,比如将ACK应答设置为1,Leader副本将数据写入文件后,Kafka就可以对请求进行响应。
  2. 此时,假设网络故障的原因,Kafka并没有成功将ACK应答信息发送给Producer,那么此时对于Producer来讲,以为kafka没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,也就是说在Kafka Producer眼里,数据已经丢了
  3. 所以在这种情况下,kafka Producer会尝试对超时的请求数据进行重试(retry)操作。通过重试操作尝试将数据再次发送给Kafka。
  4. 如果此时发送成功,Kafka就又收到了数据,两条数据一样,也就是说数据的重复。

数据乱序

数据重试(retry)功能除了可能会导致数据重复以外,还可能会导致数据乱序。

  1. 假设需要将编号为1,2,3的三条连续数据发送给Kafka。
  2. 如果在发送过程中,1因为网络原因发送失败,2、3发生成功
  3. 则此时在Broker的缓存中,为消息2、3
  4. 生产者重发消息1
  5. 则此时在Broker的缓存中,为消息2、3、1

这就产生了数据的乱序


幂等

为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作,所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据。

// 幂等需要手动开启
enable.idempotence配置为true
  1. 开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识,kafka中,这个标识采用的是连续的序列号数字sequencenum,但是不同的生产者Producer可能序列号是一样的,所以仅仅靠seqnum还无法唯一标记数据,所以还需要同时对生产者进行区分,所以Kafka采用申请生产者ID(producerid)的方式对生产者进行区分。这样,在发送数据前,我们就需要提前申请producerid以及序列号sequencenum

  2. Broker中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。

  3. 如果Borker当前新的请求批次数据在缓存的5个旧的批次中存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。
    在这里插入图片描述

  4. 如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果是,说明是连续的,顺序没乱。那么继续,如果不是,那么说明数据已经乱了,发生异常。
    在这里插入图片描述

  5. Broker根据异常返回响应,通知Producer进行重试。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后。再进行重试即可。
    在这里插入图片描述

  6. 如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。

从上面的流程可以看出,Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是通过原理,咱们也能明白,这种幂等性还是有缺陷的

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
  • 只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性(也就是说如果一个producer挂掉再重启,那么重启前和重启后的producer对象会被当成两个独立的生产者,从而获取两个不同的独立的生产者ID,导致broker端无法获取之前的状态信息,所以无法实现跨会话的幂等)
  • 要想解决这个问题,就需要采用事务功能。

什么是生产者事务

在Kafka中,生产者事务(Producer Transactions)允许生产者以原子方式向一个或多个主题写入消息。事务可以确保消息要么全部成功写入,要么全部失败,从而保证数据的一致性。下面详细解释Kafka中生产者事务的原理:

事务组件

在Kafka中,事务涉及以下几个组件:

  • Transactional Producer(事务生产者):负责在事务中写入消息。
  • Transaction Coordinator(事务协调器):管理事务的生命周期,包括开始事务、提交事务和中止事务。
  • Broker(代理):Kafka集群中的服务器,存储消息并协助协调事务。

事务处理的详细步骤

  1. 初始化事务:事务生产者向事务协调器注册,事务协调器为该生产者分配一个唯一的Transactional IDProducer Epoch
  2. 开始事务:生产者调用beginTransaction时,事务协调器记录事务开始状态。
  3. 发送消息:生产者发送消息时,消息会标记为“待处理”状态并包含事务ID和epoch。
  4. 提交事务:生产者调用commitTransaction时,事务协调器将事务状态更新为“提交中”,然后通知所有相关分区代理提交消息。
  5. 完成提交:所有分区代理确认消息已写入日志后,事务协调器更新事务状态为“已提交”,通知生产者事务完成。
  6. 中止事务:在任何步骤发生错误时,生产者可以调用abortTransaction,事务协调器将事务状态更新为“中止中”,通知相关分区代理丢弃消息,最后更新事务状态为“已中止”。

实现事务的要点

  • 幂等性:确保消息的幂等性,以避免重复写入。Kafka通过Producer IDSequence Number实现幂等性。
  • 协调和日志:事务协调器负责管理事务状态,事务日志记录事务的所有状态变化。
  • 原子提交:确保事务提交的原子性,通过两阶段提交协议(2PC)实现。
    1. 第一阶段:预提交

      • 生产者发送消息时,代理记录Producer ID和Sequence Number。
      • 消息标记为“待提交”。
    2. 第二阶段:提交或中止

      • 当生产者提交事务时,事务协调器通知代理将消息标记为“已提交”。
      • 如果事务中止,代理丢弃消息,不进行处理。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

public class ProducerTransactionTest {
    public static void main(String[] args) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // TODO 配置幂等性
        configMap.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // TODO 配置事务ID
        configMap.put( ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");
        // TODO 配置事务超时时间
        configMap.put( ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5);
        // TODO 创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // TODO 初始化事务
        producer.initTransactions();
        try {
            // TODO 启动事务
            producer.beginTransaction();
            // TODO 生产数据
            for ( int i = 0; i < 10; i++ ) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
                final Future<RecordMetadata> send = producer.send(record);
            }
            // TODO 提交事务
            producer.commitTransaction();
        } catch ( Exception e ) {
            e.printStackTrace();
            // TODO 终止事务
            producer.abortTransaction();
        }
        // TODO 关闭生产者对象
        producer.close();

    }
}

如何确保跨会话中的幂等(生产者崩溃后,事务恢复生产者,即为跨会话)

确保幂等性的过程主要依赖于Kafka的Producer ID、Producer Epoch和Sequence Number机制。以下是详细的过程描述:

1. 初始生产者启动和消息发送

  1. 生产者初始化

    • 生产者在初始化时,Kafka为其分配一个唯一的Producer ID (PID)。
    • Producer ID标识当前生产者实例。
  2. 开始发送消息

    • 生产者发送消息时,每个消息附带一个Sequence Number。
    • Sequence Number在每个分区内是递增的,用于标识消息的顺序。

2. 记录Producer ID和Sequence Number

  • Kafka代理(Broker)会记录每个分区的Producer ID和最新的Sequence Number。
  • 每条消息发送时,代理会检查当前Producer ID和Sequence Number,确保消息的顺序和唯一性。

3. 生产者崩溃和重启

  1. 生产者崩溃

    • 假设生产者在事务过程中崩溃。
  2. 生产者重启

    • 生产者重启后,使用相同的Transactional ID重新初始化。
    • Kafka为重启的生产者分配一个新的Producer ID。
    • 同时,Producer Epoch递增,标识这是生产者的一个新的会话。

4. 恢复未完成的事务

  • 事务协调器:负责管理事务状态,恢复未完成的事务。
  • 当生产者重启并调用initTransactions时,事务协调器会:
    • 检查与Transactional ID相关的未完成事务。
    • 根据事务日志记录,决定是提交还是中止这些事务。

5. 幂等性保障机制

  1. 新的Producer ID和递增的Producer Epoch

    • Kafka代理识别新的Producer ID和递增的Producer Epoch,确保重启后的生产者实例与之前的实例区分开。
  2. 更新记录

    • 代理更新记录新的Producer ID和Sequence Number。
    • 新的Producer ID和递增的Sequence Number确保消息不重复处理。
  3. 消息去重

    • 代理通过检查Producer ID和Sequence Number,确保同一个Producer ID的消息不会被处理两次。
    • 即使生产者在重启后重新发送消息,代理能够识别并忽略重复的消息。

6. 事务性消息处理

  1. 第一阶段:预提交

    • 生产者发送消息时,代理记录Producer ID和Sequence Number。
    • 消息标记为“待提交”。
  2. 第二阶段:提交或中止

    • 当生产者提交事务时,事务协调器通知代理将消息标记为“已提交”。
    • 如果事务中止,代理丢弃消息,不进行处理。

总结

通过以上机制,Kafka确保生产者在不同会话中的幂等性:

  • Producer ID和Producer Epoch:标识生产者实例和会话阶段。
  • Sequence Number:确保每个分区中的消息顺序和唯一性。
  • 事务协调器:管理事务状态,确保事务的一致性和原子性。

即使生产者崩溃并重启,通过这些机制,Kafka能够保证消息的幂等性和事务一致性,避免重复处理消息,确保数据可靠性和一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/638747.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

近临算法(个人总结版)

背景 近邻算法&#xff08;Nearest Neighbor Algorithm&#xff09;是一种基本但非常有效的分类和回归方法。最早由Fix和Hodges在1951年提出&#xff0c;经过几十年的发展和改进&#xff0c;已成为数据挖掘、模式识别和机器学习领域的重要工具。近邻算法基于相似性原则&#x…

get和post的区别,二者是幂等的吗?

一、什么是幂等 所谓幂等性通俗的将就是一次请求和多次请求同一个资源产生相同的副作用。 维基百科定义&#xff1a;幂等&#xff08;idempotent、idempotence&#xff09;是一个数学与计算机学概念&#xff0c;常见于抽象代数中。 在编程中一个幂等操作的特点是其任意多次执…

git分支常用命令

最近在用git提交代码的时候&#xff0c;发现有些命令不是很会&#xff0c;先记录几个常用分支命令&#xff0c;后续再补充&#xff0c;在执行git push命令提交代码的时候遇到报错&#xff0c;一并记录下。 1.git常用命令 新建分支&#xff1a; git branch <分支名称> 比…

day16|二叉树的属性

相关题目 ● 104.二叉树的最大深度 559.n叉树的最大深度 ● 111.二叉树的最小深度 ● 222.完全二叉树的节点个数 二叉树的深度与高度 如图&#xff0c; 二叉树的深度表示&#xff1a;任意一个叶子节点到根节点的距离&#xff0c;是从上往下计数的&#xff0c;因此使用前序遍历…

Transformer详解(2)-位置编码

位置编码公式 偶数位置用sin,奇数位置用cos. d_model 表示token的维度&#xff1b;pos表示token在序列中的位置&#xff1b;i表示每个token编码的第i个位置&#xff0c;属于[0,d_model)。 torch实现 import math import torch from torch import nn from torch.autograd im…

blender 烘焙渲染图片,已经导出fbx,导出贴图。插件生成图片

1.新建一个模型。选择资产浏览器的材质&#xff0c;并拖动到模型身上&#xff0c;如下图。资产浏览器的材质可以网上找。 2.打开着色器面板。正下方着色器窗口中&#xff0c;点击空白取消选择&#xff0c;然后右击-添加-着色器-原理化BSDF&#xff0c;右击-添加-纹理-图像纹理。…

初阶数据结构之双向链表详解

目录 一&#xff1a;双向链表的概念 1.什么是双向链表&#xff1f; 2.双向链表的优点 3.双向链表的结构 二&#xff1a;双向链表的实现 1.定义链表结点 2.初始化双向链表 3.添加结点 4.尾插 5.头插 6.打印双向链表 7.查找链表结点 8.在指定结点后插入新结点 9.删…

力扣:92. 反转链表 II(Java)

目录 题目描述&#xff1a;示例 1&#xff1a;示例 2&#xff1a;代码实现&#xff1a; 题目描述&#xff1a; 给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的…

TypeScript-搭建编译环境

搭建编译环境 TypeScript 编写的代码是无法直接在js引擎( 浏览器 / Nodejs )中运行的&#xff0c;最终还需要经过编译成js代码才可以正常运行 搭建手动编译环境 1️⃣ 全局安装 typescript 包&#xff08;编译引擎&#xff09; -> 注册 tsc 命令 npm i -g typescript 2…

如何解决vcruntime140.dll丢失问题,详细介绍5种靠谱的解决方法

vcruntime140.dll是Microsoft Visual C Redistributable Package的一部分&#xff0c;它为使用Visual C编译器开发的应用程序提供必要的运行时环境。该DLL文件包含了大量应用程序运行时需要调用的库函数&#xff0c;这些函数是实现C标准库、异常处理机制、RTTI&#xff08;运行…

2461. 长度为 K 子数组中的最大和(c++)

给你一个整数数组 nums 和一个整数 k 。请你从 nums 中满足下述条件的全部子数组中找出最大子数组和&#xff1a; 子数组的长度是 k&#xff0c;且子数组中的所有元素 各不相同 。 返回满足题面要求的最大子数组和。如果不存在子数组满足这些条件&#xff0c;返回 0 。 子数…

2024电工杯数学建模A题Matlab代码+结果表数据教学

2024电工杯A题保姆级分析完整思路代码数据教学 A题题目&#xff1a;园区微电网风光储协调优化配置 以下仅展示部分&#xff0c;完整版看文末的文章 %A_1_1_A % 清除工作区 clear;clc;close all;warning off; %读取参数%正常读取 % P_LOADxlsread(附件1&#xff1a;各园区典…

如何创建 Gala Games 账户:解决 Cloudflare 验证指南 2024

Gala Games 站在数字娱乐新时代的前沿&#xff0c;将区块链技术与游戏相结合&#xff0c;重新定义了所有权和奖励。本文将引导您创建 Gala Games 账户并使用 CapSolver 解决 Cloudflare 验证难题&#xff0c;确保您顺利进入这一创新的生态系统。 什么是 Gala Games&#xff1f…

debian nginx upsync consul 实现动态负载

1. consul 安装 wget -O- https://apt.releases.hashicorp.com/gpg | sudo gpg --dearmor -o /usr/share/keyrings/hashicorp-archive-keyring.gpg echo "deb [signed-by/usr/share/keyrings/hashicorp-archive-keyring.gpg] https://apt.releases.hashicorp.com $(lsb_r…

微信小程序使用input标签遇到的问题

场景1&#xff1a;多个input标签切换无法聚焦问题 解决方案1&#xff1a; 在网上搜的用官方给的always-embed属性&#xff0c;但是也明确标注了只有ios可用 解决方案2&#xff1a; 使用focus属性&#xff1a;每次点击input标签都重新设置 wxml: <input adjust-position…

【YOLOv5/v7改进系列】替换激活函数为SiLU、ReLU、LeakyReLU、FReLU、PReLU、Hardswish、Mish、ELU等

一、导言 激活函数在目标检测中的作用至关重要&#xff0c;它们主要服务于以下几个关键目的&#xff1a; 引入非线性&#xff1a;神经网络的基本构建块&#xff08;如卷积层、全连接层等&#xff09;本质上是线性变换&#xff0c;而激活函数通过引入非线性&#xff0c;使得网络…

画图工具之PlantUML插件使用

文章目录 1 PlantUML插件1.1 引言1.2 什么是PlantUML1.3 PlantUML插件1.3.1 IntelliJ IDEA中插件1.3.2 VS Code中插件1.3.3 使用例子 1.4 PlantUML时序图语法1.4.1 声明参与者1.4.2 消息传递1.4.2.1 同步消息1.4.2.2 异步消息1.4.2.3 返回消息1.4.2.4 自调用 1.4.3 生命线&…

在Windows10中重命名文件和文件夹的6种方法,有你熟悉和不熟悉的

序言 你可以通过多种方式在Windows 10上重命名文件。如果每次你想更改文件名时仍右键单击并选择“重命名”,那么我们有一些技巧可以加快更改速度。 使用文件资源管理器重命名文件和文件夹 Windows 10的文件资源管理器是一个功能强大的工具。你知道吗,有四种不同的方法可以…

理解大语言模型(二)——从零开始实现GPT-2

相关说明 这篇文章的大部分内容参考自我的新书《解构大语言模型&#xff1a;从线性回归到通用人工智能》&#xff0c;欢迎有兴趣的读者多多支持。 本文涉及到的代码链接如下&#xff1a;regression2chatgpt/ch11_llm/char_gpt.ipynb1 本文将讨论如何利用PyTorch从零开始搭建G…

【Linux网络】端口及UDP

文章目录 1.再看四层2.端口号2.1引入linux端口号和进程pid的区别端口号是如何生成的传输层有了pid还设置端口号端口号划分 2.2问题2.3netstat 3.UDP协议3.0每学一个协议 都要讨论一下问题3.1UDP协议3.2谈udp/tcp实际上是在讨论什么&#xff1f; 1.再看四层 2.端口号 端口号(Po…