消息队列篇--基础篇(消息队列特点,应用场景、点对点和发布订阅工作模式,RabbmitMQ和Kafka代码示例等)

1、消息队列的介绍

消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue,简称MQ)是一种异步通信机制,允许应用程序之间的数据交换通过“消息”进行。生产者将消息发送到消息队列中,无需关心谁来获取消息。消费者从消息队列中读取消息并处理,也无需关心消息的来源。消息队列的核心思想是解耦生产者和消费者,使得它们可以独立运行,而不需要直接交互。

消息队列的特点:

  • 解耦:生产者和消费者不需要同时在线,生产者发送消息后可以立即返回,消费者可以在任何时间消费消息。
  • 异步通信:生产者和消费者之间是异步的,生产者不需要等待消费者的响应,提高了系统的响应速度。
  • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况。
  • 负载均衡:多个消费者可以同时从消息队列中读取消息,分担处理任务,实现负载均衡。
  • 高可用性:消息队列通常支持持久化存储,即使系统发生故障,消息也不会丢失。
  • 扩展性:消息队列可以通过增加更多的生产者或消费者来扩展系统的处理能力。

2、消息队列的应用场景

(1)、日志收集与分析

  • 应用场景:在分布式系统中,各个服务会产生大量的日志数据。使用消息队列可以将日志数据异步传输到集中式日志分析平台(如ELK Stack、Splunk),避免日志写入对业务系统的影响。
  • 优点:降低日志写入的延迟,提高系统的吞吐量。

(2)、异步任务处理

  • 应用场景:某些任务(如发送邮件、短信、生成报表等)不需要立即执行,可以将其放入消息队列,由后台进程异步处理。
  • 优点:减少主业务流程的阻塞,提升用户体验。
    同步处理示例图:
    在这里插入图片描述
    使用消息队列异步处理示例图:
    在这里插入图片描述

(3)、系统解耦

  • 应用场景:在微服务架构中,不同服务之间可以通过消息队列进行通信,而不是直接调用对方的接口。这样可以降低服务之间的耦合度,提高系统的可维护性和扩展性。
  • 优点:服务之间的依赖关系减弱,便于独立开发和部署。

示例图:
在这里插入图片描述

(4)、流量削峰

  • 应用场景:在高并发场景下,前端请求可能会超出后端系统的处理能力。使用消息队列可以将请求暂存,逐步处理,避免系统过载。
  • 优点:平滑流量,防止系统崩溃。

示例图:
在这里插入图片描述

(5)、事件驱动架构

  • 应用场景:在事件驱动架构中,系统中的各个组件通过事件(消息)进行通信。例如,用户注册后触发一系列事件(如发送欢迎邮件、创建用户档案、记录日志等)。
  • 优点:提高系统的灵活性和响应速度。

示例图:
在这里插入图片描述
解释:
第一个生产者推送消息到Topic1中。
消费者1消费Topic1中的消息,同时在作为生产者2将结果消息推送到Topic2中。
第二个消费者在消费Topic2中的消息。
简单说:
事件驱动就是同一个消息通过不同主题的发送和消费,实现各个组件在不同阶段对该任务做出正确的处理。

3、消息队列的两种模式

消息队列的主要功能是通过“消息”来解耦生产者和消费者,使得它们可以异步通信。根据消息传递的方式不同,消息队列通常支持两种主要的模式:点对点(Point-to-Point,P2P)模式和发布/订阅(Publish/Subscribe,Pub/Sub)模式。这两种模式在消息传递的行为、适用场景和系统设计上有着显著的区别。

(1)、点对点(Point-to-Point,P2P)模式

点对点模式是一种消息传递方式,其中每个消息只能被一个消费者消费,并且消息一旦被消费,就会从队列中移除。生产者将消息发送到队列中,消费者从队列中取出消息并处理。每个队列可以有多个消费者,但每个消息只能被其中一个消费者消费。

1、P2P工作原理
  • 生产者:生产者将消息发送到一个特定的队列中。
  • 队列:队列是一个存储消息的容器,消息按顺序排列,遵循先进先出(FIFO)的原则。
  • 消费者:消费者从队列中取出消息并处理。每个消息只能被一个消费者消费,消费后该消息会从队列中移除。
2、P2P特点
  • 单次消费:每个消息只能被一个消费者消费,确保消息不会被重复处理。
  • 负载均衡:如果队列中有多个消费者,消息会自动分发给不同的消费者,实现负载均衡。每个消费者处理的消息数量取决于其处理速度。
  • 消息持久化:消息在被消费之前会一直保存在队列中,即使消费者暂时不可用,消息也不会丢失。
  • 可靠性:通常支持消息确认机制,消费者处理完消息后需要向队列发送确认,确保消息不会因为消费者故障而丢失。
3、P2P适用场景
  • 任务分配:适用于需要将任务分配给多个工作者的场景。例如,订单处理系统中,多个订单处理服务可以从同一个队列中获取订单进行处理。
  • 异步任务处理:适用于需要异步执行的任务,如发送邮件、生成报表等。生产者将任务放入队列,消费者在后台逐步处理。
  • 负载均衡:适用于需要将请求分发给多个处理节点的场景,避免某个节点过载。
4、P2P消息队列
  • RabbitMQ的队列模式:RabbitMQ支持P2P模式,生产者将消息发送到队列中,消费者从队列中取出消息并处理。每个消息只能被一个消费者消费。
  • ActiveMQ的点对点队列:ActiveMQ也支持P2P模式,生产者将消息发送到队列中,消费者从队列中取出消息并处理。
5、RabbitMQ代码示例(P2P)
(1)、注入依赖
<!-- RabbitMQ 依赖 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)、生产者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class P2PProducer {

    private static final String QUEUE_NAME = "p2p-queue";  // 队列名称

    public static void main(String[] args) throws Exception {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 2、建立连接
        try (Connection connection = factory.newConnection();
              // 3、声明队列(如果队列不存在则创建)
            Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 4、发送多条消息到队列
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

解释:

  • ConnectionFactory用于创建连接,并通过Channel发送消息到指定的队列p2p-queue。
  • channel.queueDeclare()用于声明队列,确保队列存在。如果队列已经存在,则不会重复创建。
  • channel.basicPublish()将消息发送到队列中,""表示不使用交换机(Exchange),直接将消息发送到队列。
(3)、消费者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class P2PConsumer {

    private static final String QUEUE_NAME = "p2p-queue";  // 队列名称

    public static void main(String[] args) throws Exception {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 2、建立连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3、声明队列(确保队列存在)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 4、声明消息处理回调方法。这里仅是声明回调方法,不会直接运行,需要basicConsume方法消费完成后触发回调方法,才会被执行
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            // message为消息,这里写消费消息的具体逻辑
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(" [x] Done");
            // 手动确认消息已处理
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        // 5、开始消费消息,basicConsume为处理消费入口,本例方法体内不做任何处理,直接 触发调用deliverCallback 回调方法
        System.out.println(" [] Waiting for messages. To exit press CTRL+C");
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }
}

解释:

  • 消费者同样使用ConnectionFactory创建连接,并通过Channel订阅队列p2p-queue。
  • channel.queueDeclare()确保队列存在。
  • channel.basicConsume()开始消费消息,false参数表示手动确认消息已处理。
  • DeliverCallback是一个回调函数,当消息被basicConsume()消费后会触发该回调。消费者处理完消息后,调用channel.basicAck()确认消息已处理,防止消息丢失。
  • 如果消费者暂时不可用,消息会继续保存在队列中,直到消费者重新连接并处理。

说明:
在RabbitMQ中,队列是实现P2P模式的关键。每个消息只会被一个消费者消费,即使有多个消费者订阅了同一个队列,RabbitMQ会自动将消息分发给不同的消费者,实现负载均衡。
如果你启动多个P2PConsumer实例,它们会自动分担消息的处理任务,确保每个消息只被一个消费者处理。

6、kafka代码示例
(1)、注入依赖
<!-- Kafka 依赖 -->
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
(2)、生产者示例
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class P2PProducer {

    public static void main(String[] args) {
        // 1、配置生产者
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2、创建生产者
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3、拼装消息(指定主题,消息的key以及消息的value)
            for (int i = 0; i < 10; i++) {
                String key = "key-" + i;
                String value = "message-" + i;
                ProducerRecord<String, String> record = new ProducerRecord<>("p2p-topic", key, value);

                // 4、异步发送消息,并添加回调函数
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("Message sent to partition: " + metadata.partition() +
                                    ", offset: " + metadata.offset());
                        } else {
                            System.err.println("Error sending message: " + exception.getMessage());
                        }
                    }
                });
            }

            // 5、确保所有消息都已发送完毕
            producer.flush();
        }
    }
}

解释:

  • 使用KafkaProducer发送消息到p2p-topic主题。
  • 每条消息都有一个键(key)和值(value),并使用StringSerializer进行序列化。
  • 通过producer.send()方法异步发送消息,并提供回调函数来处理发送结果。
  • producer.flush()确保所有消息都已发送完毕。
(3)、消费者示例
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class P2PConsumer {

    public static void main(String[] args) {
        // 1、配置消费者
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "p2p-group");  // 消费者组ID
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // 从最早的消息开始消费
        // 2、创建消费者
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 3、订阅主题
            consumer.subscribe(Collections.singletonList("p2p-topic"));

            // 4、持续接收消费消息
            while (true) {
                // 拉取消息,等待最多1秒
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                // 5、处理拉取到的消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key = %s, value = %s, partition = %d, offset = %d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }

                // 6、提交偏移量
                consumer.commitSync();
            }
        }
    }
}

解释:

  • 使用KafkaConsumer订阅p2p-topic主题。
  • 消费者组ID为p2p-group,确保同一组内的多个消费者可以自动进行负载均衡。
  • consumer.poll()方法用于拉取消息,每次最多等待1秒。
  • 消费者处理完消息后,调用consumer.commitSync()提交偏移量,确保消息不会被重复消费。

说明:
在Kafka中,消费者组是实现P2P模式的关键。当多个消费者属于同一个消费者组时,Kafka会自动将消息分配给不同的消费者,确保每个消息只会被一个消费者消费。
如果你启动多个P2PConsumer实例,它们会自动分担消息的处理任务,实现负载均衡。

(2)、发布/订阅(Publish/Subscribe,Pub/Sub)模式

发布/订阅模式是一种消息传递方式,其中每个消息可以被多个消费者消费。生产者将消息发布到一个主题(Topic),所有订阅了该主题的消费者都可以接收到该消息。每个消费者都可以独立地消费消息,互不干扰。

1、工作原理
  • 生产者:生产者将消息发布到一个主题(Topic)中,而不是直接发送到队列。
  • 主题:主题是一个逻辑上的消息通道,生产者将消息发布到主题中,消费者订阅该主题以接收消息。
  • 消费者:消费者订阅主题后,每当有新消息发布到该主题时,所有订阅了该主题的消费者都会收到一份副本。每个消费者可以独立地处理消息,互不干扰。
2、特点
  • 多消费者:每个消息可以被多个消费者消费,适合广播消息或通知类场景。
  • 松耦合:生产者和消费者之间是完全解耦的,生产者不需要知道有多少消费者订阅了主题,消费者也不需要知道是谁发布了消息。
  • 消息复制:每个订阅了主题的消费者都会收到一份消息副本,因此消息会被复制给所有消费者。
  • 消息保留:通常支持消息保留策略,即使消费者暂时离线,消息也会保留一段时间,直到消费者重新连接并消费。
  • 灵活性:消费者可以根据需要动态订阅或取消订阅主题,灵活应对不同的业务需求。
3、适用场景
  • 事件通知:适用于需要将事件广播给多个接收者的场景。例如,用户注册后触发一系列事件(如发送欢迎邮件、创建用户档案、记录日志等),每个事件处理服务都可以订阅相应的主题。
  • 日志收集:适用于需要将日志数据广播给多个分析系统的场景。例如,多个日志分析平台可以订阅同一个日志主题,各自处理不同的日志数据。
  • 实时推送:适用于需要实时推送消息的场景,如股票行情更新、社交媒体通知等。每个用户可以订阅感兴趣的主题,实时接收相关消息。
4、适用的消息队列
  • Kafka的主题模式:Kafka是典型的Pub/Sub模型,生产者将消息发布到主题中,消费者组中的每个消费者都可以订阅该主题并消费消息。Kafka还支持分区,允许多个消费者并行处理同一主题的不同分区。
  • Pulsar的主题模式:Pulsar也支持Pub/Sub模式,生产者将消息发布到主题中,消费者可以订阅该主题并消费消息。Pulsar还支持层级存储,允许将冷数据存储到低成本存储介质中。
  • RabbitMQ的交换机(Exchange)模式:RabbitMQ支持多种交换机类型,包括Fanout、Direct、Topic等,可以实现Pub/Sub模式。生产者将消息发送到交换机,交换机会根据路由规则将消息转发给多个队列,消费者可以从这些队列中消费消息。
5、RabbitMQ代码示例
(1)、注入依赖
<!-- RabbitMQ 依赖 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)、生产者示例
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PubSubProducer {

    private static final String EXCHANGE_NAME = "pubsub-exchange";

    public static void main(String[] args) throws Exception {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 2、建立连接
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 3、声明交换机(类型为 Fanout,即广播)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            // 4、发送多条消息到交换机
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

解释:

  • 使用ConnectionFactory创建连接,并通过Channel将消息发布到交换机pubsub-exchange。
  • channel.exchangeDeclare()用于声明交换机,类型为Fanout。Fanout交换机会将消息广播给所有绑定到该交换机的队列。
  • channel.basicPublish()将消息发布到交换机,""表示不指定路由键(Routing Key),因为Fanout交换机会将消息广播给所有绑定的队列。
(3)、消费者示例
import com.rabbitmq.client.*;

public class PubSubConsumer {

    private static final String EXCHANGE_NAME = "pubsub-exchange";

    public static void main(String[] args) throws Exception {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 2、建立连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3、声明交换机(类型为 Fanout)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // 4、声明临时队列(随机生成队列名称)
        String queueName = channel.queueDeclare().getQueue();
        System.out.println(" [] Queue name: " + queueName);

        // 5、将队列绑定到交换机
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        // 6、声明消息处理回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            // 真实消费数据逻辑,message为报文数据
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(" [x] Done");
            // 自动确认消息已处理
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        // 7、开始消费消息,处理完成会触发回调deliverCallback 方法
        System.out.println(" [] Waiting for messages. To exit press CTRL+C");
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
    }
}

解释:

  • 消费者使用ConnectionFactory创建连接,并通过Channel订阅交换机 pubsub-exchange。
  • channel.exchangeDeclare()确保交换机存在。
  • channel.queueDeclare()声明一个临时队列,RabbitMQ会为每个消费者生成一个唯一的队列名称。
  • channel.queueBind()将临时队列绑定到交换机,确保该队列可以接收来自交换机的消息。
  • channel.basicConsume()开始消费消息,false参数表示手动确认消息已处理。
  • DeliverCallback是一个回调函数,当有新消息到达时会触发该回调。消费者处理完消息后,调用channel.basicAck()确认消息已处理。

说明:

  • 在RabbitMQ中,交换机(Exchange)是实现Pub/Sub模式的关键。生产者将消息发布到交换机,所有绑定到该交换机的队列都会接收到消息副本。
  • 每个消费者可以订阅不同的队列,或者多个消费者可以共享同一个队列。通过Fanout交换机,所有绑定到该交换机的队列都会收到相同的消息副本,实现广播效果。
  • 如果你启动多个PubSubConsumer实例,每个实例都会创建一个独立的队列,并绑定到交换机。因此,每个消费者都会收到相同的消息副本,互不干扰。
6、kafka代码示例
(1)、注入依赖
<!-- Kafka 依赖 -->
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
(2)、生产者示例
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class PubSubProducer {

    public static void main(String[] args) {
        // 1、配置生产者
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2、创建生产者
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3、组装发送消息到主题
            for (int i = 0; i < 10; i++) {
                String key = "key-" + i;
                String value = "message-" + i;
                ProducerRecord<String, String> record = new ProducerRecord<>("pubsub-topic", key, value);

                // 4、异步发送消息,并添加回调函数
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("Message sent to partition: " + metadata.partition() +
                                    ", offset: " + metadata.offset());
                        } else {
                            System.err.println("Error sending message: " + exception.getMessage());
                        }
                    }
                });
            }

            // 5、确保所有消息都已发送完毕
            producer.flush();
        }
    }
}

解释:

  • 与P2P模式的生产者代码几乎相同,唯一的区别是消息发送到的主题名称不同(pubsub-topic)。
  • 生产者将消息发布到pubsub-topic,所有订阅了该主题的消费者都可以接收到这些消息。
(3)、消费者示例
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PubSubConsumer {

    public static void main(String[] args) {
        // 1、配置消费者
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pubsub-group-" + Thread.currentThread().getId());  // 每个消费者使用不同的组 ID
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // 从最早的消息开始消费
        // 2、创建消费者
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 3、订阅主题
            consumer.subscribe(Collections.singletonList("pubsub-topic"));

            // 4、持续消费消息
            while (true) {
                // 拉取消息,等待最多 1 秒
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                // 5、处理拉取到的消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumer %d received message: key = %s, value = %s, partition = %d, offset = %d%n",
                            Thread.currentThread().getId(), record.key(), record.value(), record.partition(), record.offset());
                }

                // 6、提交偏移量
                consumer.commitSync();
            }
        }
    }
}

解释:

  • 每个消费者使用不同的消费者组ID(pubsub-group-<线程ID>),确保每个消费者都能独立地消费消息。
  • consumer.subscribe()订阅pubsub-topic,所有订阅了该主题的消费者都会收到相同的消息副本。
  • 消费者处理完消息后,调用consumer.commitSync()提交偏移量,确保消息不会被重复消费。

说明:

  • 在Kafka中,每个消费者组是独立的,即使多个消费者订阅了同一个主题,它们也不会共享消息。每个消费者组中的消费者只会消费属于该组的消息。
  • 通过为每个消费者分配不同的消费者组ID,我们可以实现Pub/Sub模式,即每个消息会被广播给所有订阅了该主题的消费者。
  • 如果你启动多个PubSubConsumer实例,每个实例都会使用不同的消费者组ID,因此它们会各自独立地消费消息。每个消费者都会收到相同的消息副本,实现广播效果。

(3)、两种模式对比总结

在这里插入图片描述

  • 点对点(P2P)模式:适用于需要确保每个消息只被处理一次的场景,例如任务分配、异步任务处理和负载均衡。它具有较高的可靠性和负载均衡能力,适合处理任务队列和批处理任务。

  • 发布/订阅(Pub/Sub)模式:适用于需要将消息广播给多个接收者的场景,例如事件通知、日志收集和实时推送。它具有更高的灵活性和解耦性,适合构建事件驱动架构和实时数据流处理系统。

选择哪种模式取决于你的具体需求:

  • 如果你需要确保每个消息只被处理一次,并且希望实现负载均衡,P2P模式是更好的选择。
  • 如果你需要将消息广播给多个接收者,并且希望生产者和消费者之间完全解耦,Pub/Sub模式更加合适。

乘风破浪会有时,直挂云帆济沧海!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/958429.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

状态模式——C++实现

目录 1. 状态模式简介 2. 代码示例 3. 单例状态对象 4. 状态模式与策略模式的辨析 1. 状态模式简介 状态模式是一种行为型模式。 状态模式的定义&#xff1a;状态模式允许对象在内部状态改变时改变它的行为&#xff0c;对象看起来好像修改了它的类。 通俗的说就是一个对象…

GESP202309 三级【进制判断】题解(AC)

》》》点我查看「视频」详解》》》 [GESP202309 三级] 进制判断 题目描述 N N N 进制数指的是逢 N N N 进一的计数制。例如&#xff0c;人们日常生活中大多使用十进制计数&#xff0c;而计算机底层则一般使用二进制。除此之外&#xff0c;八进制和十六进制在一些场合也是常用…

汽车敏捷开发:项目经理如何精准跟进项目流程

在敏捷开发环境中&#xff0c;项目经理身兼协调者、推动者、决策者等关键角色。 作为协调者&#xff0c;需在团队及部门间搭建沟通桥梁&#xff0c;确保信息流畅。 作为推动者&#xff0c;面对迭代中的技术难题、资源短缺等阻碍&#xff0c;要主动寻找解决方案&#xff0c;为…

数据从前端传到后端入库过程分析

数据从前端传到后端入库过程分析 概述 积累了一些项目经验&#xff0c;成长为一个老程序员了&#xff0c;自认为对各种业务和技术都能得心应手的应对了&#xff0c;殊不知很多时候我们借助了搜索引擎的能力&#xff0c;当然现在大家都是通过AI来武装自己。 今天要分析的话题是…

Netty 实战

Netty实践 1 Netty 版本选择2 Netty 模版代码2.1 Server2.2 Client 3 组件3.1 EventLoop、EventLoopGroup3.1.1 EventLoop3.1.2 EventLoopGroup 3.2 Channel3.2.1 ChannelFuture3.2.2 CloseFuture 3.3 ChannelHandler3.2.1 常用的 ChannelInboundHandlerAdapter3.2.1.1 LineBas…

Triton:内存高效注意力机制的实现与解析

Triton:内存高效注意力机制的实现与解析 引言 在深度学习领域&#xff0c;特别是自然语言处理&#xff08;NLP&#xff09;任务中&#xff0c;注意力机制是模型理解序列数据的关键组成部分。然而&#xff0c;随着模型规模和输入长度的增长&#xff0c;传统的注意力机制面临着…

微信小程序使用上拉加载onReachBottom。页面拖不动。一直无法触发上拉的事件。

1&#xff0c;可能是原因是你使用了scroll-view的标签&#xff0c;用onReachBottom触发加载事件。这两个是有冲突的。没办法一起使用。如果页面的样式是滚动的是无法去触发页面的onReachBottom的函数的。因此&#xff0c;你使用overflow:auto.来使用页面的某些元素滚动&#xf…

机器学习2 (笔记)(朴素贝叶斯,集成学习,KNN和matlab运用)

朴素贝叶斯模型 贝叶斯定理&#xff1a; 常见类型 算法流程 优缺点 集成学习算法 基本原理 常见方法 KNN&#xff08;聚类模型&#xff09; 算法性质&#xff1a; 核心原理&#xff1a; 算法流程 优缺点 matlab中的运用 朴素贝叶斯模型 朴素贝叶斯模型是基于贝叶斯…

【2024年华为OD机试】(B卷,100分)- 非严格递增连续数字序列 (JavaScriptJava PythonC/C++)

一、问题描述 题目描述 给定一个仅包含大小写字母和数字的字符串&#xff0c;要求找出其中最长的非严格递增连续数字序列的长度。非严格递增连续数字序列指的是序列中的数字从左到右依次递增或保持不变&#xff0c;例如 12234 就是一个非严格递增连续数字序列。 输入描述 输…

Android中Service在新进程中的启动流程2

目录 1、Service在客户端的启动入口 2、Service启动在AMS的处理 3、Service在新进程中的启动 4、Service与AMS的关系再续 上一篇文章中我们了解了Service在新进程中启动的大致流程&#xff0c;同时认识了与客户端进程交互的接口IApplicationThread以及与AMS交互的接口IActi…

Three城市引擎地图插件Geo-3d

一、简介 基于Three开发&#xff0c;为Three 3D场景提供GIS能力和城市底座渲染能力。支持Web墨卡托、WGS84、GCJ02等坐标系&#xff0c;支持坐标转换&#xff0c;支持影像、地形、geojson建筑、道路&#xff0c;植被等渲染。支持自定义主题。 二、效果 三、代码 //插件初始化…

Ubuntu环境 nginx 源码 编译安装

ubuntu 终端 使用 wget 下载源码 sudo wget http://nginx.org/download/nginx-1.24.0.tar.gz解压刚下载的源码压缩包 nginx-1.24.0.tar.gz sudo tar -zxvf nginx-1.24.0.tar.gz 解压完成 产生 nginx-1.24.0 目录 进入该目录 cd ./nginx-1.24.0 目录下有一个可执行文件 con…

【深度学习】神经网络实战分类与回归任务

第一步 读取数据 ①导入torch import torch ②使用魔法命令&#xff0c;使它使得生成的图形直接嵌入到 Notebook 的单元格输出中&#xff0c;而不是弹出新的窗口来显示图形 %matplotlib inline③读取文件 from pathlib import Path import requestsDATA_PATHPath("dat…

60,【1】BUUCF web [RCTF2015]EasySQL1

先查看源码 1&#xff0c;changepwd&#xff08;修改密码&#xff09; <?php // 开启会话&#xff0c;以便使用会话变量 session_start();// 设置页面的内容类型为 HTML 并使用 UTF-8 编码 header("Content-Type: text/html; charsetUTF-8");// 引入配置文件&…

Chrome插件:图片缩放为头像(128*128)

前置条件&#xff1a; 安装有chrome谷歌浏览器的电脑 使用步骤&#xff1a; 1.打开chrome扩展插件 2.点击管理扩展程序 3.加载已解压的扩展程序 4.选择对应文件夹 5.成功后会出现一个扩展小程序 6.点击对应小程序 7.使用小程序 8.拖拽成功后会自动保存到下载 代码&#xf…

machine learning knn算法之使用KNN对鸢尾花数据集进行分类

通过导入必要的scikit-learn导入必要的库&#xff0c;加载给定的数据&#xff0c;划分测试集和训练集之后训练预测和评估即可 具体代码如下&#xff1a; import numpy as np from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split f…

电子应用设计方案102:智能家庭AI鱼缸系统设计

智能家庭 AI 鱼缸系统设计 一、引言 智能家庭 AI 鱼缸系统旨在为鱼类提供一个健康、舒适的生活环境&#xff0c;同时为用户提供便捷的管理和观赏体验。 二、系统概述 1. 系统目标 - 自动维持水质稳定&#xff0c;包括水温、酸碱度、硬度和溶氧量等关键指标。 - 智能投食&…

【C语言系列】深入理解指针(3)

深入理解指针&#xff08;3&#xff09; 一、字符指针变量二、数组指针变量2.1数组指针变量是什么&#xff1f;2.2数组指针变量怎么初始化&#xff1f; 三、二维数组传参的本质四、函数指针变量4.1函数指针变量的创建4.2函数指针变量的使用4.3两段有趣的代码4.4 typedef关键字 …

2024年度总结-CSDN

2024年CSDN年度总结 Author&#xff1a;OnceDay Date&#xff1a;2025年1月21日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 文章目录 2024年CSDN年度总结1. 整体回顾2…

Linux下php8安装phpredis扩展的方法

Linux下php8安装phpredis扩展的方法 下载redis扩展执行安装编辑php.ini文件重启php-fpmphpinfo 查看 下载redis扩展 前提是已经安装好redis服务了 php-redis下载地址 https://github.com/phpredis/phpredis 执行命令 git clone https://github.com/phpredis/phpredis.git执行…