kafka学习笔记 @by_TWJ

目录

  • 1. 消息重复消费怎么解决
    • 1.1. 确保相同的消息不会被重复发送(消费幂等性)
    • 1.2. 消息去重
    • 1.3. 消息重试机制
    • 1.4. kafka怎么保证消息的顺序性
      • 1.4.1. 利用分区的特征:
      • 1.4.2. 解决办法:
      • 1.4.3. 分区分配策略
        • 1.4.3.1. RangeAssignor (每组(Topic)里的分区(partition)都依次消费,可能导致第一个消费者负重,最后的消费者无消息可消费)
        • 1.4.3.2. RoundRobinAssignor (所有分区(partition)依次消费,不在区分有没有组(Topic)。所有资源都尽量就尽量均衡。)
        • 1.4.3.3. StickyAssignor (不改变原来的分配上,把关联消费者宕机的分区,重新分配给其他消费者,尽量达到均衡)
    • 1.5. kafka 怎么知道消费成功
    • 1.6. 死信(消息处理失败后怎么处理)
      • 1.6.1. 死信配置(这里存放到特定的主题)
    • 1.7. 事务
      • 1.7.1. 配置参数开启事务:
      • 1.7.2. Producer 事务
        • 1.7.2.1. javaSDK例子
        • 1.7.2.2. springboot例子
      • 1.7.3. Consumer 事务
        • 1.7.3.1. 隔离级别:
        • 1.7.3.2. 提交事务和回滚事务
        • 1.7.3.3. 总结:
  • 2. kafka文档
    • 2.1. Topic、Group、Partition、消费者的关系
    • 2.2. 管理topic
    • 2.3. 发送producer消息
    • 2.4. consumer - 消费producer消息
    • 2.5. 部署kafka
    • 2.6. 部署zookeeper
    • 2.7. 学习用的demo

1. 消息重复消费怎么解决

消息重复消费的问题可以通过多种方法解决,主要包括消费幂等性、消息去重、消息确认机制、消息重试机制、保证消息的顺序性以及将消息进行持久化存储。

  • 消费幂等性:确保在同一条消息被重复消费时,系统不会产生副作用或影响系统的正确性。这可以通过在消费端使用唯一标识来判断消息是否已经被消费过,例如使用数据库的唯一索引、使用分布式锁等方式来保证幂等性。

  • 消息去重:针对同一条消息的多次消费,只保留其中一次消费结果。可以在消费者端进行去重,使用缓存或数据库来记录已经处理过的消息,避免重复消费。

  • 消息确认机制:MQ一般提供消息确认机制,如ACK机制。消费者在成功处理一条消息后,发送ACK给MQ,表示该消息已经被成功消费。如果消费者在处理消息时发生异常或失败,可以不发送ACK,MQ会将该消息重新发送给其他消费者进行处理。

  • 消息重试机制:当消息处理失败时,可以将消息重新发送给MQ,由MQ重新投递给消费者进行处理。可以在消息的header中添加重试次数的标记,当达到最大重试次数后,可以将消息发送到死信队列进行处理,以避免消息的无限重试。

  • 保证消息的顺序性:如果消息的顺序性很重要,可以将相关消息发送到同一个分区或同一个队列中,以保证消息的顺序性。(kafka分区)

  • 持久化机制:为了避免消息丢失,可以将消息进行持久化存储,例如将消息存储到数据库或文件系统中。即使MQ发生故障或重启,也可以通过持久化的消息进行恢复。

这些方法可以单独或结合使用,以有效解决消息重复消费的问题,确保系统的稳定性和数据的准确性。

1.1. 确保相同的消息不会被重复发送(消费幂等性)

Kafka事务是怎么实现的?Kafka事务消息原理详解
Kafka生产者可以配置为幂等,确保相同的消息不会被重复发送。
保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。(可以理解为在应用端做了幂等处理。即使重复消息发送过来了,也会判断是否已在处理,从而达到一条消息只会被一个处理)

消息在 MQ 中的传递,大致可以归类为下面三种:

  • At most once: 至多一次。消息在传递时,最多会被送达一次。是不安全的,可能会丢数据。

  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。

  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

大部分消息队列满足的都是At least once,也就是可以允许重复的消息出现。

1.2. 消息去重

针对同一条消息的多次消费,只保留其中一次消费结果。可以在消费者端进行去重,使用缓存或数据库来记录已经处理过的消息,避免重复消费。
例如,使用redis缓存去重:

 // 去重
  public void removeRepeatProcessMessage(FsConsumer fsConsumer){

      RSet<String> set = redisson.getSet("local:fs:downloadFileMsg");

      if (set.add(fsConsumer.getMsgId())) {// 成功则处理,失败则抛出异常,记录到死信队列里
          // 只有当消息ID被成功添加到集合时才处理消息
          processMessage(fsConsumer);
      } else {
          // 去重处理
          // 如果消息ID已存在于集合中,则表示该消息已处理,跳过
          System.out.println("Message with ID " + fsConsumer.getMsgId() + " has already been processed.");
      }
  }

1.3. 消息重试机制

当消息处理失败时,可以将消息重新发送给MQ,由MQ重新投递给消费者进行处理。可以在消息的header中添加重试次数的标记,当达到最大重试次数后,可以将消息发送到死信队列进行处理,以避免消息的无限重试。

@Configuration
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        //最大重试三次
        ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
//        设置重试间隔 10秒, 重试次数为 1 次
        BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
        // 失败进入死信,topic和group,都变成${topic}.DLT和${group}.DLT ,如下是进入死信的例子:
        /*
        # bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
        GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
        myGroup.DLT     myTopic.DLT     0          4               4               4               test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3    test-0

        GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
        myGroup         myTopic         0          800020          800020          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
        myGroup         myTopic         1          400012          400012          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
        myGroup         myTopic         2          300000          300000          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
         */
        factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer,backOff));
        return factory;
    }
}

1.4. kafka怎么保证消息的顺序性

1.4.1. 利用分区的特征:

  1. 分区是最小的并列单位;
  2. 一个消费者可以消费多个分区;
  3. 一个分区可以被多个消费者组里的消费者消费;
  4. 但是,一个分区不能同时被同一个消费者组里的多个消费者消费;(意思是:在组里,一个分区同时消费的只能有一个消费者)

1.4.2. 解决办法:

  1. 把全部有序消息放到同一个分区里
  2. 把有需要有序的消息,放到同一个分区里。

1.4.3. 分区分配策略

Kafka的消费者分区策略
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费。Kafka提供了3种消费者分区分配策略:RangeAssigorRoundRobinAssignorStickyAssignor

术语:
partition - 分区

1.4.3.1. RangeAssignor (每组(Topic)里的分区(partition)都依次消费,可能导致第一个消费者负重,最后的消费者无消息可消费)

RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。分配示意图如下:

在这里插入图片描述

T0 是Topic-0
T1 是Topic-1
在这里插入图片描述

1.4.3.2. RoundRobinAssignor (所有分区(partition)依次消费,不在区分有没有组(Topic)。所有资源都尽量就尽量均衡。)

RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
在这里插入图片描述

1.4.3.3. StickyAssignor (不改变原来的分配上,把关联消费者宕机的分区,重新分配给其他消费者,尽量达到均衡)

StickyAssignor分区分配算法,目的是在执行一次新的分配时,能在上一次分配的结果的基础上,尽量少的调整分区分配的变动,节省因分区分配变化带来的开销。Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动。其目标有两点:

分区的分配尽量的均衡。
每一次重分配的结果尽量与上一次分配结果保持一致。
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。

StickyAssignor算法比较复杂,下面举例来说明分配的效果(对比RoundRobinAssignor),前提条件:

有4个Topic:T0、T1、T2、T3,每个Topic有2个分区。
有3个Consumer:C0、C1、C2,所有Consumer都订阅了这4个分区。
在这里插入图片描述

1.5. kafka 怎么知道消费成功

Kafka 不直接提供一种机制来确认消费者是否成功消费了消息。但是,它提供了几种策略来保证消息的成功处理:

  • 使用 Kafka 的自动提交功能:可以配置消费者自动定期提交消息的偏移量。

  • 手动提交偏移量:消费者可以在处理完消息后手动提交偏移量,表明该消息已被成功处理。

  • 使用 Kafka 事务:在支持事务的 Kafka 集群上,可以开启事务来保证消费者处理消息的全部成功或失败。

以下是一个简单的示例,展示如何在消费者中手动提交偏移量:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
 
import java.util.Arrays;
import java.util.Properties;
 
public class ManualOffsetCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic"));
 
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
 
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
                System.out.println(record.value());
 
                // 处理完后提交当前偏移量
                consumer.commitSync();
            }
        }
    }
}

1.6. 死信(消息处理失败后怎么处理)

死信,即Dead Letter,是指在Kafka中无法消费的消息。当消息因为以下原因而无法被消费时,可能会变成死信:

  • 消费者故障,无法处理消息。
  • 消息处理时发生异常。
  • 消息达到设定的消息Level再次尝试消费的次数限制。

为了处理死信,Kafka提供了几种策略:

  • 将死信发送到一个特定的主题(Topic)。
  • 将死信保存到一个文件中。

以下是一个示例,演示如何设置Kafka消费者,以便将死信消息发送到一个特定的主题:

1.6.1. 死信配置(这里存放到特定的主题)

    @Bean
    DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // 失败进入死信,topic和group,都变成${topic}.DLT和${group}.DLT ,如下是进入死信的例子:
        /*
        # bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
        GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
        myGroup.DLT     myTopic.DLT     0          4               4               0               test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3    test-0

        GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
        myGroup         myTopic         0          800020          800020          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
        myGroup         myTopic         1          400012          400012          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
        myGroup         myTopic         2          300000          300000          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
        */
//        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
//                (r, e) -> {
//                       //死信的结果发送到特定的主题
//                    return new TopicPartition(r.topic()+".DLT", r.partition());
//
//                });
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
//        ErrorHandler errorHandler = new FallbackBatchErrorHandler(recoverer, new FixedBackOff(0L, 2L));
        return new DefaultErrorHandler(recoverer,new FixedBackOff(10*1000L, 2L));
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        kafkaListenerContainerFactory(DefaultErrorHandler defaultErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);
        // 当使用批量监听器时需要设置为true
        factory.setBatchListener(true);
        factory.setCommonErrorHandler(defaultErrorHandler);
        return factory;
    }

触发死信的业务代码

 @Service
public class UserConsumerService {
    @KafkaListener(topics = {"myUser"},groupId = "myUserGroup", containerFactory="kafkaListenerContainerFactory")
//    public void kafkaListener(String message){
    public void kafkaListener(List<ConsumerRecord<String, String>> recordList){
        System.out.println("消费列表:"+recordList.size());
        for (ConsumerRecord<String, String> consumerRecord : recordList) {
            kafkaListener(consumerRecord);
        }

    }
    public void kafkaListener(ConsumerRecord<String, String> record){
        String key = record.key().toString();
        String value = record.value().toString();
        System.out.println(record.offset()+" \t "+key+" \t "+ value);
        if(value.contains("abc")){
            System.out.println("存在消费abc");
            throw new RuntimeException("存在消费abc");
        }
    }

}

1.7. 事务

1.7.1. 配置参数开启事务:

// transactional.id = transactionId
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionId");

1.7.2. Producer 事务

保证事务原子性操作

事务期间,会发送produce消息到kafka服务器上的Topic,这是未提交状态的消息,你可以看到Topic上多了几条消息。
事务回滚后,消息还是会存在的。所以comsumer需要使用读已提交的方式获取。

1.7.2.1. javaSDK例子

以下是 Producer 事务使用示例:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");

KafkaProducer producer = new KafkaProducer(props);

producer.initTransactions();

try {

    String msg = "matt test";
    producer.beginTransaction();
    producer.send(new ProducerRecord(topic, "0", msg.toString()));
    producer.send(new ProducerRecord(topic, "1", msg.toString()));
    producer.send(new ProducerRecord(topic, "2", msg.toString()));
    producer.commitTransaction();

} catch (ProducerFencedException e1) {
    e1.printStackTrace();
    producer.close();
} catch (KafkaException e2) {
    e2.printStackTrace();
    producer.abortTransaction();
}
producer.close();
1.7.2.2. springboot例子
@Transactional
public String sendForTransaction(String userNo, String jsonString) {
    System.out.println("sendForTransaction方法中,是否开启事务中:"+kafkaTemplate.inTransaction());
    kafkaTemplate.send("myUser","key",jsonString);
    return "success";
}

1.7.3. Consumer 事务

通过使用隔离级别能查看到未提交的消息。

1.7.3.1. 隔离级别:

隔离级别:

  • isolation.level=read_uncommitted 读未提交
  • isolation.level=read_committed 读取已提交

配置方式:

propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));//# 读取已提交的消息
propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT));//# 读取已提交的消息


Springboot中kafka默认隔离级别是 读未提交

public class ConsumerConfig{
    ....
    public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
    ....
}    

1.7.3.2. 提交事务和回滚事务

使用 Kafka 事务:在支持事务的 Kafka 集群上,可以开启事务来保证消费者处理消息的全部成功或失败。

// 自动提交事务
    @Transactional
    public String sendForTransaction1(String userNo, String jsonString) {
        System.out.println("sendForTransaction方法中,是否开启事务中:"+kafkaTemplate.inTransaction());
        kafkaTemplate.send("myUser","key",jsonString);
        return "success";
    }
// 回滚事务
   @Transactional
    public String sendForTransaction(String userNo, String jsonString) {
        System.out.println("sendForTransaction方法中,是否开启事务中:"+kafkaTemplate.inTransaction());
        kafkaTemplate.send("myUser","key",jsonString);
        throw new RuntimeException();
    }
1.7.3.3. 总结:

初步:因为分区只会被一个消费者消费,没消费完就不会发送下个消息,所以发送到消费者端,实际上就同时进行事务的就只有一个,相当于同步消费。

进阶:但这样性能会很差,所以后面可以把业务操作区间的,进行隔离归类,就像id余数为1的放1分区,id余数为2的放2分区,这样大大提高了消费速度,可自定义程度高,受制于本人设计,它的瓶颈就是设计人员,如果归类好,可以不存在资源抢占问题。

瓶颈:但消息有个问题,就是一个分区每次只能消费一条记录,所以它并行处理严重依赖于分区数量,但分区数量也不是乱加的,设计不合理,会存在资源操作冲突的问题。

对比其他事务的瓶颈:

  • 消息事务:瓶颈在于设计人员设计,合理分配分区数量,资源抢占问题在设计层面就已经解决了。
  • seata:瓶颈在于是否存在资源抢占。
    • seata的XA、AT、TCC等 的瓶颈在于是否存在资源抢占,如果在修改仓库那功能的话,因为存在资源抢占的话,相当于同步处理,其他线程被锁,只有一个线程在处理,这么多服务都只会有一个业务在处理,直接导致seta性能拖慢。

个人建议:

  • 如果业务存在资源抢占的话,使用消息事务会更优。
  • 如果业务不存在资源抢占的话,使用seta会更优。

消息事务就像是基金一样,稳定处理,
seata就像是股票一样,上下波动,一时快,一时慢。(若使用在合适场景会很快。)

应用场景:如果不停的区分seata和消息事务会很累的,而且因为费用问题,还有业务一定会有资源抢占问题,所以一般都使用消息事务的方式处理。这是我个人分析。

2. kafka文档

https://kafka.apache.org/documentation/#producerconfigs

2.1. Topic、Group、Partition、消费者的关系

大体上分为Topic和Group,代表的是发布者与订阅者。

Topic 就像是一个数据库表,Partition就是数据库表中的分表存储的记录。
Group指的是消费者组,消费者组里有很多个消费者,消费Topic的Partition表里的消息

例如:

Topics
    myOrder
    myOrder.DLT
    myTopic
    myTopic.DLT
    myUser
    myUser.DLT
Consumer Groups
    myGroup
    myGroup.DLT
    myUserGroup
    myUserGroup.DLT

Topic的消息只记录数据,不记录消费记录
Group只记录消息消费情况。

Topic 与 消费组 的关系,就像是聊天室,topic是发布消息A,其他消费组都能收到消息A,消息是一对多的关系,每次增加一组消费者,都会从0offset开始读取Topic

2.2. 管理topic

https://blog.51cto.com/u_16213637/9850264

# 查看所有topic


bin/kafka-topics.sh --bootstrap-server localhost:9092  --list
--------------------
__consumer_offsets
myTopic
test-javasdk
--------------------
# 查看topic详情
> bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092
--------------------
Topic: myTopic	TopicId: Y_t8v2snRgmirrGykcLYmg	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: myTopic	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001
	Topic: myTopic	Partition: 1	Leader: 1001	Replicas: 1001	Isr: 1001
	Topic: myTopic	Partition: 2	Leader: 1001	Replicas: 1001	Isr: 1001

--------------------
# 创建topic
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \
        --partitions 20 --replication-factor 1 --config x=y
简写:> bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092        
        
# 修改topic
> bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic
//> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name myTopic --partitions 20 --alter 

# 修改topic 的分区
bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic

# 删除topic
 > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

# 添加topic配置,例如修改  partitions 40      
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
        --partitions 40
        
# 删除topic配置
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y

# 消费者列表
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning
--------------------
test-99988
test-99989
test-99990
test-99991
test-99992
test-99993
test-99994
test-99995
--------------------
# 查看消费组里的成员
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup  --members
--------------------
GROUP           CONSUMER-ID                                        HOST            CLIENT-ID       #PARTITIONS     
myGroup         ConsumerTest-1a87e781-b3ce-43c8-ac4a-76c8ccdec5aa  /192.168.3.3    ConsumerTest    2               
myGroup         ConsumerTest2-ed1ddf24-7a68-4136-ad39-d774f531d765 /192.168.3.3    ConsumerTest2   1  
--------------------

# 查看消息堆积情况(还可以看出是哪台机器有)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
--------------------

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                        HOST            CLIENT-ID
myGroup         myTopic         0          664075          700001          35926           ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb  /192.168.3.3    ConsumerTest
myGroup         myTopic         1          284334          300000          15666           ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb  /192.168.3.3    ConsumerTest
myGroup         myTopic         2          200000          200000          0               ConsumerTest2-4ae32ff0-e67f-4d09-8185-005bf61f3b1f /192.168.3.3    ConsumerTest2


其中:LAG 是待消费记录
--------------------

  • replication-factor

复制因素控制有多少服务器将复制写入的每条消息。如果您的复制因子为3,那么在您失去对数据的访问权限之前,最多可以有2台服务器出现故障。我们建议您使用2或3的复制因子,这样您就可以在不中断数据消耗的情况下透明地跳转机器。

  • partitions

分区计数控制主题将被切分成多少个日志。分区计数有几个影响。首先,每个分区必须完全适合单个服务器。因此,如果您有20个分区,那么整个数据集(以及读写负载)将由不超过20台服务器处理(不包括副本)。最后,分区计数会影响消费者的最大并行性。这将在概念部分进行更详细的讨论。

每个分片的分区日志都放在Kafka日志目录下自己的文件夹中。这类文件夹的名称由主题名称、加上破折号(-)和分区id组成。由于典型的文件夹名称长度不能超过255个字符,因此主题名称的长度将受到限制。我们假设分区的数量永远不会超过100,000。因此,主题名称不能超过249个字符。这在文件夹名称中留下了足够的空间来放置破折号和可能的5位数长的分区id。

  • config

与主题相关的配置既有服务器默认值,也有可选的每个主题覆盖。如果没有给出每个主题的配置,则使用服务器默认值。可以在创建主题时通过提供一个或多个——config选项来设置覆盖。下面的例子创建了一个名为my-topic的主题,并自定义了最大消息大小和刷新速率:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
  --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

以后还可以使用alter configs命令更改或设置覆盖。下面的例子更新了my-topic的最大消息大小:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
  --alter --add-config max.message.bytes=128000

要检查主题上设置的覆盖,您可以执行以下操作

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

要移除覆盖,您可以这样做

> bin/kafka-configs.sh --bootstrap-server localhost:9092  --entity-type topics --entity-name my-topic
  --alter --delete-config max.message.bytes

以下是主题级配置。服务器对此属性的默认配置在服务器默认属性标题下给出。给定的服务器默认配置值仅适用于没有显式主题配置覆盖的主题。

2.3. 发送producer消息

# 发送消息
> bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl-C 终止

2.4. consumer - 消费producer消息

# 消费消息
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

数量统计:普通消息和死信消息
死信是应用端生成的,非kafka本身自带。
myGroup.DLT 是死信
myGroup 是普通消息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
---------------------------------
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
myGroup.DLT     myTopic.DLT     0          4               4               0               test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3    test-0

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
myGroup         myTopic         0          800020          800020          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
myGroup         myTopic         1          400012          400012          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
myGroup         myTopic         2          300000          300000          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0

---------------------------------

2.5. 部署kafka

创建docker-compose.yml

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
# 证实可以
docker-compose up -d


2.6. 部署zookeeper

后面在找了一个
创建docker-compose.yml


version: '2'
services:
  zookeeper: # 注意,在我的archlinux系统的机子会有内存泄露问题
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"2.6.   kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
# 启动
docker-compose up -d

2.7. 学习用的demo

https://gitee.com/alvis128/springboot-kafka-demo

官网:https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#using-kafkatransactionmanager

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/684428.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

水电表自动抄表系统

1.简述 水电表自动抄表系统是一种现代化智能化管理系统&#xff0c;它利用先进的物联网&#xff0c;完成了远程控制、即时、零接触的水电表读值收集&#xff0c;大大提升了公共事业服务项目的效率和准确性。该系统不仅减少了人工抄表工作量&#xff0c;还避免了人为失误&#…

Mac环境下,简单反编译APK

一、下载jadx包 https://github.com/skylot/jadx/releases/tag/v1.4.7 下载里面的这个&#xff1a;下载后&#xff0c;找个干净的目录解压&#xff0c;我是放在Downloads下面 二、安装及启动 下载和解压 jadx&#xff1a; 下载 jadx-1.4.7.zip 压缩包。将其解压到你希望的目…

Flink的简单学习四

一 有状态计算 1.1 概念 1.状态;上一次计算的结果 2.需要基于上一个结果来进行计算&#xff0c;被称为有状态计算 1.2 未使用有状态计算 1.下面这个代码将相同的key发送到同一个task任务里面计算。就是因为这个导致了&#xff0c;明明之前没有输入b&#xff0c;但是输入b之…

【鸿蒙】开发之页面跳转组件—实现页面跳转方法汇总!

①不同 Slice 间跳转&#xff0c;同一个 Ability 中&#xff0c;优点是方便&#xff0c;高效&#xff0c;缺点是业务逻辑复杂度受限。 button.setClickedListener(listener -> present(new SecondAbilitySlice(), new Intent()) );②使用 Intent 借助于 ElementName&#x…

【管理咨询宝藏126】德勤咨询为某大型机械集人才体系发展思路方案

本报告首发于公号“管理咨询宝藏”&#xff0c;如需阅读完整版报告内容&#xff0c;请查阅公号“管理咨询宝藏”。 【管理咨询宝藏126】德勤咨询为某大型机械集人才体系发展思路方案 【格式】PDF版本 【关键词】人力咨询、人才体系、人才盘点 【核心观点】 - 中国整车与零部件…

七天进阶elasticsearch[two]

批量保存 批量保存是通过_bulk API来实现的 请求方式 post 请求地址 _bulk 通过_bulk操作文档,一般至少有两行参数 第一行用于确定要干什么(插入,修改还是删除) 第二行才是操作的数据; 当然以上是标准操作,也可以不遵循标准操作,使用不同的请求方式来完成 批量保存demo…

故障预警 vs 故障分类:哪个更有意义,哪个更具挑战性?

故障预警 vs 故障分类&#xff1a;哪个更有意义&#xff0c;哪个更具挑战性&#xff1f; 在现代工业系统中&#xff0c;风力发电机、制造设备等关键装置的可靠性和稳定性对生产效率至关重要。为此&#xff0c;故障预警和故障分类成为保障设备正常运行的重要手段。那么&#xf…

备份和恢复realme智能手机:综合指南

realme自2018年成立至今&#xff0c;一直秉持着“敢于超越”的品牌精神&#xff0c;专注于为全球年轻用户提供性能卓越、设计新颖的高品质手机。对于如何备份和恢复realme手机&#xff0c;本文将介绍多种不同的方法。 第1部分&#xff1a;使用Coolmuster Android Backup Mana…

怎解ESP-ADF组件 error: unknown type name ‘xSemaphoreHandle‘

没有定义&#xff0c;看一下最上面的头文件。 通过看最上面的头文件引入&#xff0c;可以看到信号量头文件已经有了&#xff0c;那很明显就是类型的兼容的问题&#xff0c;打开MenuConfig开启向后兼容API的选项。 问题解决。

CVE-2024-2961:将phpfilter任意文件读取提升为远程代码执行(RCE)

0x00 前言 前几天p牛师傅在星球发了一个帖子&#xff1a;PHP利用glibc iconv()中的一个缓冲区溢出漏洞CVE-2024-2961&#xff0c;实现将文件读取提升为任意命令执行漏洞&#xff0c;当时觉得这个漏洞蛮有意思&#xff0c;就想研究一下。于是web狗开启了一次二进制漏洞的学习之…

输出有10个元素的整型数组各元素的值

&#xff08;1&#xff09;下标法 编写程序&#xff1a; &#xff08;2&#xff09;指针法&#xff1a; 将上面程序第7行和第10行的a[i]改为"*(ai)"。 &#xff08;3&#xff09;用指针变量指向数组元素 编写程序&#xff1a; 运行结果&#xff1a; 对3种方法的比…

家政上门按摩小程序源码 仿东郊到家小程序源码

家政上门按摩小程序源码 仿东郊到家小程序源码 实用行业 适用于&#xff1a;预约私教&#xff0c;预约瑜伽/健身、预约美容/美发/美甲、预约理疗/足疗/推拿、预约清洁/保洁/保安、预约洗车/维修/安装、预约保姆/月嫂/护工/洗衣/烧饭、钟点工等暖心服务。在家政市场上&#xf…

数据结构复习

基本概念和术语&#xff1a; 数据&#xff1a;是描述客观事物的符号&#xff0c;是计算机中可以操作的对象&#xff0c;是能被计算机识别&#xff0c;并输入给计算机处理的符号集合。 数据元素&#xff1a;是组成数据的&#xff0c;具有一定意义的基本单位&#xff0c;在计算机…

ChatGPT Prompt技术全攻略-入门篇:AI提示工程基础

系列篇章&#x1f4a5; No.文章1ChatGPT Prompt技术全攻略-入门篇&#xff1a;AI提示工程基础2ChatGPT Prompt技术全攻略-进阶篇&#xff1a;深入Prompt工程技术3ChatGPT Prompt技术全攻略-高级篇&#xff1a;掌握高级Prompt工程技术4ChatGPT Prompt技术全攻略-应用篇&#xf…

nvm详细安装使用教程(nvm-node多版本管理工具),详细命令

nvm是什么 NVM 是 Node Version Manager 的缩写&#xff0c;它是一个用于管理 Node.js 版本的命令行工具。通过NVM&#xff0c;你可以在同一台机器上安装和切换多个 Node.js 版本&#xff0c;对于开发和测试在不同 Node.js 版本上运行的应用程序非常有用。 1、卸载node&#…

【C++课程学习】:C++入门(输入输出,缺省参数)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 &#x1f369;1.关于C输入输出&#xff1a; &#x1f369;2.缺省参数函数&#xff1a; 缺省参数的概…

Allegro-开店指南

开店指南 Allegro企业账户注册流程 Allegro注册流程分成两个主要阶段: 第一创建您的账户&#xff0c;第二激活您账户的销售功能。完成两个阶段&#xff0c;才能在Allegro进行销售。 中国企业应该入驻Business account&#xff08;企业账户&#xff09;。 第二阶段&#xff…

通用高电子迁移率晶体管(HEMT)的差分微变解算方案及分析型模型

来源&#xff1a;A Difference-Microvariation Solution and Analytical Model for Generic HEMTs&#xff08;TED 22年&#xff09; 摘要 这篇论文提出了一种AlGaN/GaN和AlGaAs/GaAs基高电子迁移率晶体管(HEMT)的分析型直流模型。该模型考虑了高栅偏压下势垒层中积累的电荷。…

Vue2项目错误提示:Vue: <template v-for> key should be placed on the <template> tag.

1. 场景还原 升级了最新的Webstorm后打开Vue2项目提示以下波浪线错误&#xff1a; Vue: <template v-for> key should be placed on the <template> tag. 该错误不会影响正常运行和构建&#xff0c;但我们看到了会不舒服。 2. 错误原因 Vue2中key不能放在temp…

iOS 之homebrew ruby cocoapods 安装

cocoapods安装需要ruby&#xff0c;更新ruby需要rvm&#xff0c;下载rvm需要gpg&#xff0c;下载gpg需要homebrew&#xff0c;所以安装顺序是homebrew->gpg->rvm->ruby-cocoapods Rvm 官网&#xff1a; RVM: Ruby Version Manager - RVM Ruby Version Manager - Docum…