Kafka消费全流程

Kafka消费全流程

1.Kafka一条消息发送和消费的流程图(非集群)

image.png

2.三种发送方式

  • 准备工作

创建maven工程,引入依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
  • 消费者
/**
 * 类说明:消费者入门
 */
public class HelloKafkaConsumer {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","192.168.140.103:9092");
        // 设置String的反序列化
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        // 设置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"llp_consumerA");
        // 构建kafka消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            //指定消费者订阅的主题
            consumer.subscribe(Collections.singletonList("llp-topic"));
            // 调用消费者拉取消息
            while(true){
                // 设置1秒的超时时间
                ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, String> record:records){
                    String key = record.key();
                    String value = record.value();
                    //分区
                    int partition = record.partition();
                    System.out.println("接收到消息: partition="+partition+", key = " + key + ", value = " + value);
                }
            }
        } finally {
            // 释放连接
            consumer.close();
        }

    }
}

1.1 发送并忘记

忽略send方法的返回值,不做任何处理。大多数情况下,消息会正常到达,而且生产者会自动重试,但有时会丢失消息。

消费者

/**
 * 类说明:kafak生产者
 */
public class HelloKafkaProducer {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址,9092kafka默认端口
        properties.put("bootstrap.servers","192.168.140.103:9092");
        //补充一下: 配置多台的服务  用,分割, 其中一个宕机,生产者 依然可以连上(集群)
        // 设置String的序列化 (对象-》二进制字节数组 : 能够在网络上传输 )
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);

        // 构建kafka生产者对象
        KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);
        try {
            ProducerRecord<String,String> record;
            try {
                // 构建消息
                record = new ProducerRecord<String,String>("llp-topic", "nickname","llp0");
                // 发送消息
                producer.send(record);
                System.out.println("消息发送成功");
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // 释放连接
            producer.close();
        }
    }

}

测试结果

image-20240114161541282

image-20240114161554133

1.2同步发送

/**
 * 类说明:发送消息--同步模式
 */
public class SynProducer {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","192.168.140.103:9092");
        // 设置String的序列化
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);



        // 构建kafka生产者对象
        KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);

        try {
            ProducerRecord<String,String> record;
            try {
                // 构建消息
                record = new ProducerRecord<String,String>("llp-topic", "llp","同步发送");
                // 发送消息
                Future<RecordMetadata> future =producer.send(record);
                RecordMetadata recordMetadata = future.get();
                if(null!=recordMetadata){
                    System.out.println("偏移量offset:"+recordMetadata.offset()+","
                            +"分区partition:"+recordMetadata.partition());
                }
                System.out.println("消息发送成功");
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // 释放连接
            producer.close();
        }
    }

}

测试结果

image-20240114162213572

image-20240114162239690

1.3 异步发送

/**
 * 类说明:发送消息--异步模式
 */
public class AsynProducer {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","192.168.140.103:9092");
        // 设置String的序列化
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);

        // 构建kafka生产者对象
        KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);

        try {
            ProducerRecord<String,String> record;
            try {
                // 构建消息
                record = new ProducerRecord<String,String>("llp-topic", "nickname","孙悟空");
                // 发送消息
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null){
                            // 没有异常,输出信息到控制台
                            System.out.println("偏移量offset:"+recordMetadata.offset()+"," +"分区partition:"+recordMetadata.partition());
                        } else {
                            // 出现异常打印
                            e.printStackTrace();
                        }
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // 释放连接
            producer.close();
        }
    }
}

测试结果

image-20240114162541172

image-20240114162600616

3. 消费者群组

Kafka里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。

image-20240114170048830


image.png

如上图,主题T有4个分区,群组中只有一个消费者,则该消费者将收到主题T1全部4个分区的消息。


image.png

如上图,在群组中增加一个消费者2,那么每个消费者将分别从两个分区接收消息,上图中就表现为消费者1接收分区1和分区3的消息,消费者2接收分区2和分区4的消息。


image.png

如上图,在群组中有4个消费者,那么每个消费者将分别从1个分区接收消息。


image.png

但是,当我们增加更多的消费者,超过了主题的分区数量,就会有一部分的消费者被闲置,不会接收到任何消息。

往消费者群组里增加消费者是进行横向伸缩能力的主要方式。所以我们有必要为主题设定合适规模的分区,在负载均衡的时候可以加入更多的消费者。但是要记住,一个群组里消费者数量超过了主题的分区数量,多出来的消费者是没有用处的。

4.序列化

创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景。我们完全可以自定义序列化器。只要实现org.apache.kafka.common.serialization.Serializer接口即可。

  • 举个例子,比如我现在需要传输一个user类的实体对象

user实体类

/**
 * 类说明:实体类
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private int id;
    private String name;

    public User(int id) {
        this.id = id;
    }
}

自定义序列化器,在生产者端进行指定

/**
 * 类说明:自定义序列化器
 */
public class UserSerializer implements Serializer<User> {
    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    public byte[] serialize(String topic, User data) {
        try {
            byte[] name;
            int nameSize;
            if(data==null){
                return null;
            }
            if(data.getName()!=null){
                name = data.getName().getBytes("UTF-8");
                //字符串的长度
                nameSize = name.length;
            }else{
                name = new byte[0];
                nameSize = 0;
            }
            /*id的长度4个字节,字符串的长度描述4个字节,
            字符串本身的长度nameSize个字节*/
            ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
            buffer.putInt(data.getId());//4
            buffer.putInt(nameSize);//4
            buffer.put(name);//nameSize
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error serialize User:"+e);
        }
    }

    public void close() {
        //do nothing
    }
}

反序列化器,在消费者端指定

/**
 * 类说明:自定义反序列化器
 */
public class UserDeserializer implements Deserializer<User> {


    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    public User deserialize(String topic, byte[] data) {
        try {
            if(data==null){
                return null;
            }
            if(data.length<8){
                throw new SerializationException("Error data size.");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id;
            String name;
            int nameSize;
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameByte = new byte[nameSize];
            buffer.get(nameByte);
            name = new String(nameByte,"UTF-8");
            return new User(id,name);
        } catch (Exception e) {
            throw new SerializationException("Error Deserializer DemoUser."+e);
        }

    }

    public void close() {
        //do nothing
    }
}

生产者

/**
 * 类说明:发送消息--value使用自定义序列化
 */
public class ProducerUser {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","127.0.0.1:9092");
        // 设置String的序列化
        properties.put("key.serializer", StringSerializer.class);
        // 设置value的自定义序列化
        properties.put("value.serializer", UserSerializer.class);

        // 构建kafka生产者对象
        KafkaProducer<String,User> producer  = new KafkaProducer<String, User>(properties);
        try {
            ProducerRecord<String,User> record;
            try {
                // 构建消息
                record = new ProducerRecord<String,User>("llp-user", "teacher",new User(1,"孙悟空"));
                // 发送消息
                producer.send(record);
                System.out.println("message is sent.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // 释放连接
            producer.close();
        }
    }
}

消费者

/**
 * 类说明:
 */
public class ConsumerUser {


    public static void main(String[] args) {

        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","127.0.0.1:9092");
        // 设置String的反序列化
        properties.put("key.deserializer", StringDeserializer.class);
        // 设置自定义的反序列化
        properties.put("value.deserializer", UserDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ConsumerOffsets");
        // 构建kafka消费者对象
        KafkaConsumer<String,User> consumer = new KafkaConsumer<String, User>(properties);
        try {
            consumer.subscribe(Collections.singletonList("llp-user"));
            // 调用消费者拉取消息
            while(true){
                // 每隔1秒拉取一次消息
                ConsumerRecords<String, User> records= consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, User> record:records){
                    String key = record.key();
                    User user = record.value();
                    System.out.println("接收到消息: key = " + key + ", value = " + user.toString());
                }
            }
        } finally {
            // 释放连接
            consumer.close();
        }
    }
}

测试结果

image-20240114172048655

image-20240114172058088

自定义序列化容易导致程序的脆弱性。举例,在我们上面的实现里,我们有多种类型的消费者,每个消费者对实体字段都有各自的需求,比如,有的将字段变更为long型,有的会增加字段,这样会出现新旧消息的兼容性问题。特别是在系统升级的时候,经常会出现一部分系统升级,其余系统被迫跟着升级的情况。

解决这个问题,可以考虑使用自带格式描述以及语言无关的序列化框架。比如Protobuf,Kafka官方推荐的Apache Avro

5.分区

因为在Kafka中一个topic可以有多个partition,所以当一个生产发送消息,这条消息应该发送到哪个partition,这个过程就叫做分区。

当然,我们在新建消息的时候,我们可以指定partition,只要指定partition,那么分区器的策略则失效。

image-20240114172357181

5.1 Kafka3种分区分配策略

在我们的代码中可以看到,生产者参数中是可以选择分区器的。

注意,在测试之前我修改了server.properties中每个主题默认的分区数

# 配置每个主题默认的分区数
num.partitions=4
1.DefaultPartitioner 默认分区策略

全路径类名:org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • 如果消息中指定了分区,则使用它
  • 如果未指定分区但存在key,则根据序列化key使用murmur2哈希算法对分区数取模。
  • 如果不存在分区或key,则会使用粘性分区策略

采用默认分区的方式,键的主要用途有两个:

一,用来决定消息被写往主题的哪个分区,拥有相同键的消息将被写往同一个分区。

二,还可以作为消息的附加消息。

2.RoundRobinPartitioner 分区策略

全路径类名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner

  • 如果消息中指定了分区,则使用它
  • 将消息平均的分配到每个分区中。

即key为null,那么这个时候一般也会采用RoundRobinPartitioner

image-20240114181401389

3.UniformStickyPartitioner 纯粹的粘性分区策略

全路径类名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner

他跟DefaultPartitioner 分区策略的唯一区别就是。

DefaultPartitionerd 如果有key的话,那么它是按照key来决定分区的,这个时候并不会使用粘性分区
UniformStickyPartitioner 是不管你有没有key, 统一都用粘性分区来分配

另外关于粘性分区策略

image-20240114174631916

在producer.properties中配置

# 配置生产者发送消息之前延迟多长时间在进行发送,默认0
#linger.ms=

# 对发送到分区的多个记录进行批处理时的默认批处理大小(以字节为单位)默认16K
#batch.size=

# 配置缓冲区的总大小,默认32M
#buffer.memory=

当linger.ms 和 batch.size有一个条件满足时,kafka客户端就会立即推送消息到kafka服务

https://bbs.huaweicloud.com/blogs/348729?utm_source=oschina&utm_medium=bbs-ex&utm_campaign=other&utm_content=content

5.2自定义分区器

在kafka中,我们可以通过实现Partitioner接口来自定义分区规则

/**
 * 类说明:自定义分区器,以value值进行分区
 */
public class SelfPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int num = partitionInfos.size();
        // 来自DefaultPartitioner的处理,在存在key值时是根据key值去计算分区的Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        // 我们定义一个分区器,根据value值去做计算消息所属分区
        int parId = Utils.toPositive(Utils.murmur2(valueBytes)) % num;
        return parId;
    }

    public void close() {
        //do nothing
    }

    public void configure(Map<String, ?> configs) {
        //do nothing
    }

}

生产者,指定自定义分区器

/**
 * 类说明:使用value值的分区器
 */
public class SelfPartitionProducer {

    private static KafkaProducer<String,String> producer = null;

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","127.0.0.1:9092");
        // 设置String的序列化
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        // 设置自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SelfPartitioner.class);


        // 构建kafka生产者对象
        KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);
        try {
            ProducerRecord<String,String> record;
            try {
                 for (int i =0;i<10;i++){
                     // 构建消息
                     record = new ProducerRecord<String,String>("llp-topic1", "teacher","孙悟空"+i);
                     // 发送消息
                     Future<RecordMetadata> future =producer.send(record);
                     RecordMetadata recordMetadata = future.get();
                     if(null!=recordMetadata){
                         System.out.println(i+","+"offset:"+recordMetadata.offset()+","
                                 +"partition:"+recordMetadata.partition());
                     }
                 }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // 释放连接
            producer.close();
        }
    }

}

消费者

public class GroupAConusmer1 {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","127.0.0.1:9092");
        // 设置String的反序列化
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"groupA");
        // 构建kafka消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Collections.singletonList("llp-topic1"));
            // 调用消费者拉取消息
            while(true){
                // 每隔1秒拉取一次消息
                ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, String> record:records){
                    String key = record.key();
                    String value = record.value();
                    int partition =  record.partition();
                    System.out.println("接收到消息:"+",partition ="+partition +", key = " + key + ", value = " + value);
                }
            }
        } finally {
            // 释放连接
            consumer.close();

        }

    }

}

测试结果

image-20240114180846856

image-20240114180930552

6.生产缓冲机制

客户端发送消息给kafka服务器的时候、消息会先写入一个内存缓冲中,然后直到多条消息组成了一个Batch,才会一次网络通信把Batch发送过去。producer.properties主要有以下参数:

buffer.memory

设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向broker发送的速度,导致生产者空间不足,producer会阻塞或者抛出异常。缺省33554432 (32M)

buffer.memory: 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略batch.size和linger.ms的限制。
buffer.memory的默认数值是32 MB,对于单个 Producer 来说,可以保证足够的性能。 需要注意的是,如果您在同一个JVM中启动多个 Producer,那么每个 Producer 都有可能占用 32 MB缓存空间,此时便有可能触发 OOM。

batch.size

当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送(linger.ms控制)。缺省16384(16k) ,如果一条消息超过了批次的大小,会写不进去。

linger.ms

指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比batch.size设置要小的多,我们需要“linger”特定的时间以获取更多的消息。这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟,但也会提升消息的吞吐量。

为何要设计缓冲机制

1、减少IO的开销(单个 ->批次)但是这种情况基本上也只是linger.ms配置>0的情况下才会有,因为默认inger.ms=0的,所以基本上有消息进来了就发送了,跟单条发送是差不多!!

2、减少Kafka中Java客户端的GC。

比如缓冲池大小是32MB。然后把32MB划分为N多个内存块,比如说一个内存块是16KB(batch.size),这样的话这个缓冲池里就会有很多的内存块。

你需要创建一个新的Batch,就从缓冲池里取一个16KB的内存块就可以了,然后这个Batch就不断的写入消息

下次别人再要构建一个Batch的时候,再次使用缓冲池里的内存块就好了。这样就可以利用有限的内存,对他不停的反复重复的利用。因为如果你的Batch使用完了以后是把内存块还回到缓冲池中去,那么就不涉及到垃圾回收了。

image.png

7.消费者偏移量提交

一般情况下,我们调用poll方法的时候,broker返回的是生产者写入Kafka同时kafka的消费者提交偏移量,这样可以确保消费者消息消费不丢失也不重复,所以一般情况下Kafka提供的原生的消费者是安全的,但是事情会这么完美吗?

自动提交

最简单的提交方式是让消费者自动提交偏移量。 如果enable.auto.commit被设为 true,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms控制,默认值是5s。

自动提交是在轮询里进行的,消费者每次在进行轮询时会检査是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。

假设我们仍然使用默认的5s提交时间间隔, 在最近一次提交之后的3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量, 减小可能出现重复消息的时间窗, 不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的最大偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(enable.auto.comnit被设为 true时,在调用 close()方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

/**
 * 类说明:消费者 手动提交:  自定义的方式
 */
public class ConsumerSpecial {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","127.0.0.1:9092");
        // 设置String的反序列化
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ConsumerOffsets");
        /*取消自动提交*/
        properties.put("enable.auto.commit",false);

        // 构建kafka消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        //当前偏移量
        Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        int count = 0;
        try {
            consumer.subscribe(Collections.singletonList("llp-topic1"));
            // 调用消费者拉取消息
            while(true){
                // 每隔1秒拉取一次消息
                ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, String> record:records){
                    String key = record.key();
                    String value = record.value();
                    System.out.println("接收到消息: key = " + key + ", value = " + value);
                    currOffsets.put(new TopicPartition(record.topic(),record.partition()),
                            new OffsetAndMetadata(record.offset()+1,"no meta"));
                    count++;
                    if(count%10==0){
                        //每10条提交一次(特定的偏移量),下一次10条数据会接着在上一次提交的偏移量后进行消费
                        consumer.commitAsync(currOffsets,null);
                    }

                }
            }
        }catch (CommitFailedException e) {
            System.out.println("Commit failed:");
            e.printStackTrace();
        }finally {
            try {
                consumer.commitSync(currOffsets);//同步提交(这里也可以)
            } finally {
                consumer.close();
            }
        }

    }

}
消费者的配置参数

auto.offset.reset

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

只要group.Id不变,不管auto.offset.reset 设置成什么值,都从上一次的消费结束的地方开始消费。

/**
 * 类说明:消费者入门:自动提交的消费模式
 */
public class GroupBConusmer1 {

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","127.0.0.1:9092");
        // 设置String的反序列化
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"groupB");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");  //最早:从头开始消费

        //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // 已提交的offset开始消费


        // 构建kafka消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Collections.singletonList("llp-topic1"));
            // 调用消费者拉取消息
            while(true){
                // 每隔1秒拉取一次消息
                ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, String> record:records){
                    String key = record.key();
                    String value = record.value();
                    int partition =  record.partition();
                    System.out.println("接收到消息:"+",partition ="+partition +", key = " + key + ", value = " + value);
                }
            }
        } finally {
            // 释放连接
            consumer.close();

        }

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/321220.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

UDS 诊断通讯

UDS有哪些车型支持 UDS(统一诊断服务)协议被广泛应用于汽车行业中,支持多种车型。具体来说,UDS协议被用于汽车电子控制单元(ECU)之间的通讯,以实现故障诊断、标定、编程和监控等功能。 支持UDS协议的车型包括但不限于以下几种: 奥迪(Audi)车型:包括A3、A4、A5、A6…

C++I/O流——(4)文件输入/输出(第一节)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 含泪播种的人一定能含笑收获&#xff…

外部晶振、复位按键、唤醒按键、扩展排针原理图详解

前言&#xff1a;本文对外部晶振、复位按键、唤醒按键、扩展排针原理图详解。本文使用的MCU是GD32F103C8T6 目录 外部晶振原理图 复位按键、唤醒按键原理图 扩展排针部分原理图 ​外部晶振原理图 如下图&#xff0c;两个外部晶振&#xff0c;分别是8M&#xff08;主晶振&a…

git的三种状态概念

git的三种状态 Git 有三种状态&#xff0c;你的文件可能处于其中之一&#xff1a; 已提交&#xff08;committed&#xff09;、已修改&#xff08;modified&#xff09; 和 已暂存&#xff08;staged&#xff09;。 已修改表示修改了文件&#xff0c;但还没保存到数据库中。 …

Rust-函数

简介 Rust的函数使用关键字fn开头。 函数可以有一系列的输入参数&#xff0c;还有一个返回类型。 函数体包含一系列的语句(或者表达式)。 函数返回可以使用return语句&#xff0c;也可以使用表达式。 Rust编写的可执行程序的入口就是fn main()函数。 以下是一个函数的示例…

案例121:基于微信小程序的作品集展示系统设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

解决ERROR 24680 --- [ main] o.a.catalina.core.AprLifecycleListener 报错:

1.报错全称&#xff1a; ERROR 24680 --- [ main] o.a.catalina.core.AprLifecycleListener : An incompatible version [1.2.32] of the Apache Tomcat Native library is installed, while Tomcat requires version [1.2.34] 2.解决方案&#xff1a; 步骤一 在…

高创新!EI论文复现+改进:聚合温度调控策略的综合能源系统/微电网/虚拟电厂多目标优化调度程序代码!

程序考虑供热的热惯性&#xff0c;并根据室内供热效果进行柔性供热&#xff0c;发挥热温度负荷的“储能”能力&#xff1b;针对普适性参数的室内空调进行集群研究&#xff0c;深入剖析温度设定值调整导致负荷波动的机理&#xff0c;并提出一种新的温度调整方法&#xff0c;平抑…

「 典型安全漏洞系列 」03.跨站请求伪造CSRF详解

引言&#xff1a;CSRF&#xff08;Cross-Site Request Forgery&#xff0c;跨站请求伪造&#xff09;是一种攻击技术&#xff0c;通过使用用户的身份进行不诚实地操作&#xff0c;恶意用户可以在受害者&#xff08;目标&#xff09;的机器上执行一些未授权的操作。这可能会危及…

行业分享----dbaplus174期:美团基于Orchestrator的MySQL高可用实践

记录 MySQL高可用方案-MMM、MHA、MGR、PXC https://blog.csdn.net/jycjyc/article/details/119731980 美团数据库高可用架构的演进与设想 https://tech.meituan.com/2017/06/29/database-availability-architecture.html

推荐一款通过ssh连接linux服务的开源工具WindTerm

文章目录 前言WindTerm介绍WindTerm使用主密码和锁屏总结 前言 工作一入门便是游戏服务器开发&#xff0c;所以常常有连接Linux服务器的需求&#xff0c;之前用的最多的是Xshell&#xff0c;最近这个软件个人版只能免费使用一个月了&#xff0c;超过时间会提示更新无法正常使用…

NLP论文阅读记录 - 2022 WOS | 语义提取文本摘要的新方法

文章目录 前言0、论文摘要一、Introduction1.1目标问题1.2相关的尝试1.3本文贡献 二.背景三.本文方法四 实验效果4.1数据集4.2 对比模型4.3实施细节4.4评估指标4.5 实验结果4.6 细粒度分析 五 总结思考 前言 A Novel Approach for Semantic Extractive Text Summarization&…

面向对象三大特征之三:多态--java学习笔记

什么是多态 多态是在继承/实现情况下的一种现象&#xff0c;表现为&#xff1a;对象多态、行为多态 对象多态&#xff1a;举个栗子&#xff0c;比如一个人&#xff0c;他可以是一个老师&#xff0c;也可以是一个歌手&#xff0c;也可以是一个丈夫...... 行为多态&#xff1a;举…

空间计算时代催生新一波巨大算力市场需求

什么是空间计算&#xff1f; 空间计算是一种整合虚拟现实&#xff08;VR&#xff09;、增强现实&#xff08;AR&#xff09;、混合现实&#xff08;MR&#xff09;等技术的计算模式&#xff0c;旨在将数字信息与真实世界融合在一起。这种融合创造了一个全新的计算环境&#xff…

优惠券兑换码生成需求——事务失效问题分析

前段时间收到一个优惠券兑换码的需求&#xff1a;管理后台针对一个优惠券发起批量生成兑换码&#xff0c;这些兑换码可以导出分发到各个合作渠道&#xff08;比如&#xff1a;抖音、京东等&#xff09;&#xff0c;用户通过这些渠道获取到兑换码之后&#xff0c;再登录到我司研…

将Android应用修改为鸿蒙应用的工作

将Android应用修改为鸿蒙&#xff08;HarmonyOS&#xff09;应用需要进行一系列主要的工作。以下是在进行这一转换过程中可能需要进行的主要工作&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.项目…

Vue2x的自定义指令和render函数使用自定义指令

在某些情况下&#xff0c;我们需要对底层DOM进行操作&#xff0c;而内置的指令不能满足需求&#xff0c;就需要自定义指令。一个自定义指令由一个包含类似组件的生命周期的钩子的对象来定义&#xff0c;钩子函数会接收到指令所绑定的元素作为参数。 定义指令 常用两种方式进行…

【天龙怀旧服】攻略day7

关键字&#xff1a; 新星1.49、金针渡劫、10灵 1】新星&#xff08;苍山破煞&#xff09; 周三周六限定副本&#xff0c;19.00-24.00 通常刷1.49w&#xff0c;刷149点元佑碎金 boss选择通常为狂鬼难度&#xff0c;八风不动即放大不选&#xff0c;第二排第一个也不选&#xf…

SAP SD-DN-MM 交货单相关物料凭证的视图的日期问题

眼下有个需求 获取交货单对应的物料凭证的过账日期BLDAT。 同步BW数据过去 新增一个数据库视图 但是实际使用时&#xff0c;有效部分仅本月&#xff0c;再选择条件里面要加上 MATdoc-bldat > sy-datum - sydatum6(2). 于是使用ST05 跟踪了一下&#xff0c;发现在DD28S…

算法通关村第十五关—继续研究超大规模数据场景的问题(黄金)

继续研究超大规模数据场景的问题 一、对20GB文件进行排序 题目要求&#xff1a;假设你有一个20GB的文件&#xff0c;每行一个字符串&#xff0c;请说明如何对这个文件进行排序&#xff1f;  分析&#xff1a;这里给出大小是20GB,其实面试官就在暗示你不要将所有的文件都装入到…