RabbitMQ 教程 | 客户端开发向导

👨🏻‍💻 热爱摄影的程序员
👨🏻‍🎨 喜欢编码的设计师
🧕🏻 擅长设计的剪辑师
🧑🏻‍🏫 一位高冷无情的编码爱好者
大家好,我是 DevOps 工程师
欢迎分享 / 收藏 / 赞 / 在看!

这篇 RabbitMQ 教程为学习者提供了全面的内容,从 RabbitMQ 的简介开始,涵盖了消息中间件的概念、RabbitMQ 的安装与使用,以及交换机、队列、路由键等相关概念的介绍。进一步深入,教程探讨了 AMQP 协议、客户端开发向导,以及消息的发送和消费方式。同时,学习者还可以了解消息传输保障、高级特性如死信队列、延迟队列、优先级队列、RPC 实现等。此外,教程还涵盖了 RabbitMQ 的管理、配置、运维、监控和集群管理等重要主题,帮助学习者充分掌握 RabbitMQ 的应用。整篇教程丰富内容详实,适合初学者和有经验的开发者参考学习。

全篇共 11 章,9 万余字。本文:第3章 客户端开发向导。

第3章 客户端开发向导

3.1 连接 RabbitMQ

在本节中,我们将学习如何使用 RabbitMQ 的客户端库与 RabbitMQ 服务器建立连接。

在 Spring Boot 中,连接 RabbitMQ 可以通过 Spring AMQP 库来实现。Spring AMQP 是 Spring 对 AMQP 协议的封装,使得在 Spring Boot 应用中连接和使用 RabbitMQ 变得非常方便。下面是连接 RabbitMQ 的步骤:

  1. 添加依赖: 在 pom.xml 文件中添加 Spring Boot 和 Spring AMQP 的依赖。确保你已经正确配置了 Maven 或 Gradle 来管理依赖。
<!-- Spring Boot Starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置 RabbitMQ 连接信息: 在 application.properties(或 application.yml)文件中配置 RabbitMQ 的连接信息,包括主机名、端口、用户名、密码等。
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

可以根据实际情况修改上述配置信息,确保连接到正确的 RabbitMQ 服务器。

  1. 创建 RabbitMQ 连接工厂: 在 Spring Boot 中,通过 ConnectionFactory 来创建 RabbitMQ 连接。可以直接使用 CachingConnectionFactory,它是 ConnectionFactory 的实现,会缓存连接,提高连接的复用性和性能。
@Configuration
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
}
  1. 创建 RabbitTemplate: RabbitTemplate 是 Spring AMQP 提供的用于与 RabbitMQ 交互的模板类。它封装了发送和接收消息的方法,简化了与 RabbitMQ 的交互过程。
@Configuration
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        // ...
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
  1. 发送和接收消息: 现在,你可以在应用中使用 RabbitTemplate 来发送和接收消息了。通过调用 convertAndSend 方法发送消息,调用 receiveAndConvert 方法接收消息。
@RestController
public class MessageController {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MessageController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @PostMapping("/send")
    public String sendMessage(@RequestBody String message) {
        rabbitTemplate.convertAndSend("exchange", "routingKey", message);
        return "Message sent successfully!";
    }
}

这是一个简单的示例,通过发送 POST 请求,将消息发送到名为"exchange"的交换机,并使用"routingKey"进行路由。

以上就是在 Spring Boot 中连接 RabbitMQ 的基本步骤。通过 Spring AMQP 的封装,你可以轻松地在应用中与 RabbitMQ 进行交互,实现可靠的消息传递。

3.2 使用交换机和队列

在 RabbitMQ 中,交换机(Exchange)和队列(Queue)是两个重要的组件,它们一起协同工作,实现消息的传递和路由。以下是 RabbitMQ 使用交换机和队列的基本流程:

  1. 声明交换机: 首先,生产者或消费者需要声明一个交换机。交换机是消息的接收和路由中心,负责接收来自生产者的消息,并根据消息的路由键将消息路由到一个或多个队列中。声明交换机时,需要指定交换机的名称、类型和其他相关参数。
  2. 声明队列: 接下来,生产者或消费者需要声明一个队列。队列用于存储消息,生产者发送的消息会被交换机路由到队列中,而消费者从队列中接收消息进行处理。声明队列时,需要指定队列的名称、是否持久化、是否独占等参数。
  3. 绑定交换机和队列: 在声明了交换机和队列之后,需要将队列绑定到交换机上。绑定是指将队列与交换机关联起来,使得交换机可以将消息路由到指定的队列中。在绑定时,需要指定交换机的名称、队列的名称以及绑定键(Binding Key)等参数。
  4. 生产者发送消息: 生产者使用指定的交换机和路由键将消息发送到 RabbitMQ 服务器。消息发送到交换机后,交换机会根据消息的路由键将消息路由到绑定到它的队列中。
  5. 消费者接收消息: 消费者订阅队列,开始接收消息。当队列中有消息到达时,RabbitMQ 服务器将消息传递给消费者的消息回调函数。消费者可以在回调函数中处理收到的消息。

通过交换机和队列的灵活组合,RabbitMQ 可以实现不同类型的消息传递模式,如点对点(Point-to-Point)和发布-订阅(Publish-Subscribe)。生产者将消息发送到交换机,交换机根据消息的路由键将消息路由到相应的队列中,消费者订阅队列并接收消息进行处理。这种灵活的消息传递机制使得 RabbitMQ 非常适用于构建可靠的消息传递系统。

3.2.1 exchangeDeclare 方法详解

exchangeDeclare 方法是用于在 RabbitMQ 中声明交换机(Exchange)的方法。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。exchangeDeclare 方法的详细参数和含义如下:

void exchangeDeclare(
    String exchangeName,    // 交换机名称
    String exchangeType,    // 交换机类型
    boolean durable,        // 是否持久化
    boolean autoDelete,     // 是否自动删除
    boolean internal,       // 是否是内部使用的交换机
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. exchangeName(String):要声明的交换机的名称,是一个字符串。交换机名称在 RabbitMQ 中必须是唯一的。
  2. exchangeType(String):交换机的类型,是一个字符串。RabbitMQ 支持不同类型的交换机,常用的类型包括"direct"、"fanout"、"topic"和"headers"。不同类型的交换机有不同的消息路由规则。
  3. durable(boolean):是否持久化交换机。如果设置为 true,交换机会在 RabbitMQ 服务器重启后仍然存在,否则会在服务器重启时删除。
  4. autoDelete(boolean):是否自动删除交换机。如果设置为 true,当交换机不再被使用时(没有队列绑定到该交换机),RabbitMQ 会自动删除该交换机。
  5. internal(boolean):是否是内部使用的交换机。如果设置为 true,该交换机只能被 RabbitMQ 内部使用,客户端无法直接发送消息到该交换机。
  6. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如交换机的备份参数、TTL 参数等。

示例使用代码:

public class ExchangeDeclareExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String exchangeName = "myExchange";
            String exchangeType = "fanout";
            boolean durable = true;
            boolean autoDelete = false;
            boolean internal = false;
            Map<String, Object> arguments = new HashMap<>();

            channel.exchangeDeclare(exchangeName, exchangeType, durable, autoDelete, internal, arguments);
            System.out.println("Exchange declared successfully!");
        }
    }
}

上述示例中,创建了一个名为"myExchange"的 fanout 类型的持久化交换机。你可以根据需要选择不同类型的交换机,并根据业务需求设置其他参数。成功执行 exchangeDeclare 方法后,交换机将在 RabbitMQ 服务器中声明并可用于消息的路由和传递。

3.2.2 queueDeclare 方法详解

queueDeclare 方法是用于在 RabbitMQ 中声明队列(Queue)的方法。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。queueDeclare 方法的详细参数和含义如下:

AMQP.Queue.DeclareOk queueDeclare(
    String queueName,        // 队列名称
    boolean durable,         // 是否持久化
    boolean exclusive,       // 是否独占队列
    boolean autoDelete,      // 是否自动删除
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. queueName(String):要声明的队列的名称,是一个字符串。队列名称在 RabbitMQ 中必须是唯一的。
  2. durable(boolean):是否持久化队列。如果设置为 true,队列会在 RabbitMQ 服务器重启后仍然存在,否则会在服务器重启时删除。
  3. exclusive(boolean):是否独占队列。如果设置为 true,该队列只能被当前连接使用,其他连接无法访问。通常用于临时队列。
  4. autoDelete(boolean):是否自动删除队列。如果设置为 true,当队列不再被使用时(没有消费者订阅该队列),RabbitMQ 会自动删除该队列。
  5. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如队列的 TTL 参数、死信队列参数等。

示例使用代码:

public class QueueDeclareExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String queueName = "myQueue";
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            Map<String, Object> arguments = new HashMap<>();

            channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);
            System.out.println("Queue declared successfully!");
        }
    }
}

上述示例中,创建了一个名为"myQueue"的持久化队列。你可以根据需要设置队列的持久性、独占性、自动删除和其他参数。成功执行 queueDeclare 方法后,队列将在 RabbitMQ 服务器中声明并可用于消息的接收和处理。

3.2.3 queueBind 方法详解

queueBind 方法用于在 RabbitMQ 中将队列(Queue)绑定到交换机(Exchange)。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。queueBind 方法的详细参数和含义如下:

void queueBind(
    String queueName,        // 队列名称
    String exchangeName,     // 交换机名称
    String routingKey,       // 绑定键(Binding Key)
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. queueName(String):要绑定的队列的名称,是一个字符串。
  2. exchangeName(String):要绑定的交换机的名称,是一个字符串。
  3. routingKey(String):绑定键(Binding Key),用于指定交换机将消息路由到队列的规则。不同类型的交换机对绑定键的匹配规则有所不同。
  4. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如绑定的头信息(Headers)等。

示例使用代码:

public class QueueBindExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String queueName = "myQueue";
            String exchangeName = "myExchange";
            String routingKey = "myRoutingKey";
            Map<String, Object> arguments = new HashMap<>();

            channel.queueBind(queueName, exchangeName, routingKey, arguments);
            System.out.println("Queue bound to exchange successfully!");
        }
    }
}

上述示例中,将名为"myQueue"的队列绑定到名为"myExchange"的交换机,绑定键为"myRoutingKey"。根据实际情况,你需要设置正确的队列名称、交换机名称和绑定键。成功执行 queueBind 方法后,交换机将根据绑定键的规则将消息路由到队列中,从而实现消息的传递和处理。

3.2.4 exchangeBind 方法详解

exchangeBind 方法用于在 RabbitMQ 中将一个交换机(Exchange)绑定到另一个交换机。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。exchangeBind 方法的详细参数和含义如下:

void exchangeBind(
    String destinationExchange,   // 目标交换机名称
    String sourceExchange,        // 源交换机名称
    String routingKey,            // 绑定键(Binding Key)
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. destinationExchange(String):要绑定到的目标交换机的名称,是一个字符串。
  2. sourceExchange(String):要绑定的源交换机的名称,是一个字符串。
  3. routingKey(String):绑定键(Binding Key),用于指定交换机将消息从源交换机路由到目标交换机的规则。不同类型的交换机对绑定键的匹配规则有所不同。
  4. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如绑定的头信息(Headers)等。

示例使用代码:

public class ExchangeBindExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String destinationExchange = "destinationExchange";
            String sourceExchange = "sourceExchange";
            String routingKey = "myRoutingKey";
            Map<String, Object> arguments = new HashMap<>();

            channel.exchangeBind(destinationExchange, sourceExchange, routingKey, arguments);
            System.out.println("Exchange bound successfully!");
        }
    }
}

上述示例中,将名为"destinationExchange"的交换机绑定到名为"sourceExchange"的交换机,绑定键为"myRoutingKey"。根据实际情况,你需要设置正确的目标交换机名称、源交换机名称和绑定键。成功执行 exchangeBind 方法后,消息从源交换机会被路由到目标交换机,进而实现更灵活的消息传递和路由。

3.2.5 何时创建

在 RabbitMQ 中,创建交换机和队列的时机取决于你的应用需求和消息传递模式。不同的场景可能需要不同的处理方式。以下是一些常见的场景和处理建议:

  1. 确定消息传递模式: 在创建交换机和队列之前,首先要明确你的消息传递模式。是点对点传输还是发布-订阅模式?根据消息传递模式,选择合适的交换机类型和绑定方式。
  2. 静态创建交换机和队列: 如果你的交换机和队列在应用启动时就已经确定,且不会动态变化,可以在应用启动时静态创建它们。这样做可以确保交换机和队列在应用运行期间一直可用。
  3. 动态创建交换机和队列: 如果你的交换机和队列是根据实际情况动态变化的,可以在需要时动态创建它们。例如,根据用户的订阅行为,动态创建队列用于订阅特定类型的消息。这样做可以节省资源,避免不必要的队列和交换机占用空间。
  4. 创建时机:
    • 对于持久化的交换机和队列,建议在应用启动时创建,确保它们在服务器重启后仍然存在。
    • 对于临时的交换机和队列,可以在需要时创建,使用完毕后再自动删除,节省资源。
  1. 错误处理:
    • 如果创建交换机或队列失败,应该处理创建失败的情况,例如记录日志、重试或通知管理员。
    • 在动态创建交换机和队列时,还需要考虑并发访问和资源竞争的情况,确保创建过程的线程安全性。
  1. 防止重复创建:
    • 在动态创建交换机和队列时,需要注意防止重复创建相同名称的交换机和队列。可以使用缓存或数据库记录已创建的交换机和队列,避免重复创建。

创建交换机和队列的时机和处理方式应该根据你的业务需求和消息传递模式来确定。灵活地根据实际情况选择静态或动态创建,以及持久化或临时创建,确保消息传递的高效性和可靠性。同时,还要注意错误处理和资源竞争等情况,保证应用的稳定性和可靠性。

3.3 发送消息

使用 RabbitMQ 的客户端库向交换机发送消息通常涉及以下步骤:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是发送和接收消息的基础。
  2. 声明交换机: 在发送消息之前,你需要先声明要使用的交换机(Exchange)。交换机负责将消息路由到队列。
  3. 发布消息: 使用 basicPublish 方法将消息发布到交换机。在这里,你需要指定交换机的名称、路由键(Routing Key)以及要发送的消息内容。
  4. 关闭连接和通道: 当发送完所有消息后,记得及时关闭通道和连接,以释放资源。

下面是一个使用 RabbitMQ 的 Java 客户端库实现向交换机发送消息的简单示例:

public class MessagePublisher {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String ROUTING_KEY = "myRoutingKey";
    private static final String MESSAGE = "Hello RabbitMQ!";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 发布消息
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
            System.out.println("Message sent successfully!");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意事项:

  1. 确保交换机已经存在: 在发送消息之前,确保你要使用的交换机已经在 RabbitMQ 服务器上声明过。否则,消息将无法正确路由到队列。
  2. 确认路由键和交换机类型匹配: 确保发送消息时指定的路由键和交换机类型匹配,否则消息可能无法正确路由到队列。
  3. 处理异常: 在发送消息时,可能会出现网络异常或其他错误。在实际应用中,建议捕获异常并处理,例如记录日志或重试。
  4. 避免阻塞: 在发送消息时,不要在主线程中执行阻塞操作。如果发送大量消息,考虑使用异步发送消息,避免主线程阻塞。
  5. 注意消息序列化: 在实际应用中,你可能需要将复杂对象转换为字节流进行传输。在这种情况下,需要考虑消息的序列化和反序列化。

使用 RabbitMQ 的客户端库向交换机发送消息是一个相对简单的过程。需要注意交换机和队列的声明,正确设置路由键和交换机类型,处理异常情况,以及避免阻塞操作。合理地使用异步发送消息可以提高系统性能和吞吐量。同时,根据实际需求进行消息序列化和反序列化,确保消息的正确传递和处理。

3.4 消费消息

使用 RabbitMQ 的客户端库从队列中消费消息通常涉及以下步骤:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是接收和处理消息的基础。
  2. 声明队列: 在消费消息之前,你需要先声明要消费的队列(Queue)。如果队列不存在,RabbitMQ 会自动创建一个新的队列。
  3. 创建消费者: 使用 DefaultConsumer 类创建一个消费者,该类是 RabbitMQ 客户端库中提供的默认消费者实现。
  4. 注册消费者: 使用 basicConsume 方法将消费者注册到队列上,从而开始接收和处理消息。
  5. 处理消息: 在消费者的 handleDelivery 方法中,你可以自定义对接收到的消息进行处理的逻辑。
  6. 关闭连接和通道: 当消费消息的任务完成后,记得及时关闭通道和连接,以释放资源。

下面是一个使用 RabbitMQ 的 Java 客户端库实现从队列中消费消息的简单示例:

public class MessageConsumer {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 创建消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                }
            };

            // 注册消费者
            channel.basicConsume(QUEUE_NAME, true, consumer);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

不同消费模式的选择:

  1. 自动确认模式(Automatic Acknowledgment):
    • 设置 autoAck 参数为 true,表示自动确认消息。
    • 适用于简单、不需要保证消息处理一次性的场景。
    • 只要消息被消费者接收,RabbitMQ 就会将消息从队列中删除,不管消费者是否处理成功。
  1. 手动确认模式(Manual Acknowledgment):
    • 设置 autoAck 参数为 false,表示手动确认消息。
    • 需要在消费者处理完消息后,调用 basicAck 方法手动确认消息,告知 RabbitMQ 消息已被处理。
    • 适用于需要确保消息处理的可靠性和一次性处理的场景。
  1. 消费者预取(Consumer Prefetch):
    • 使用 basicQos 方法设置 prefetchCount 参数,表示消费者一次性从队列中预取的消息数量。
    • 通过合理设置预取数量,可以提高消息处理的吞吐量和性能。

注意事项:

  1. 防止消息丢失:
    • 确保在处理消息时,发生异常时不要丢失消息。可以捕获异常后重新处理或记录日志。
  1. 防止消息阻塞:
    • 避免在消费者的 handleDelivery 方法中执行耗时的操作,避免阻塞其他消息的处理。
  1. 并发处理:
    • 在多线程环境中,需要确保消费者的线程安全性,避免并发问题。

使用 RabbitMQ 的客户端库从队列中消费消息是一个相对简单的过程。根据实际需求选择自动确认模式或手动确认模式,注意消息的处理可靠性和防止阻塞,合理设置消费者预取参数以提高性能。在实际应用中,还需要考虑异常处理、并发问题和消息丢失等情况,确保消息的可靠传递和处理。

3.4.1 推模式

推模式是一种消息消费方式,其中消费者主动从队列中取出消息并进行处理。相比于拉模式,推模式更加主动和实时。

推模式的消费方式可以通过以下步骤实现:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是发送和接收消息的基础。
  2. 声明队列: 在消费消息之前,你需要先声明要消费的队列(Queue)。如果队列不存在,RabbitMQ 会自动创建一个新的队列。
  3. 注册消费者: 使用 basicConsume 方法将消费者注册到队列上,从而开始接收和处理消息。需要设置autoAck参数为false,以使用手动确认模式。
  4. 获取消息: 使用 basicGet 方法主动从队列中获取一条消息。该方法会立即返回,无论队列中是否有消息。如果队列为空,返回的消息对象为 null。
  5. 处理消息: 对于获取的消息对象,你可以进行相应的处理操作,例如解析消息内容、执行业务逻辑等。
  6. 手动确认: 在消息处理完成后,调用 basicAck 方法手动确认消息。通过向 RabbitMQ 发送确认信息,告知它消息已被处理。
  7. 重复获取和处理消息: 重复执行步骤 4 至步骤 6,即循环获取和处理队列中的消息。

下面是一个使用 RabbitMQ 的 Java 客户端库实现推模式消费消息的简单示例:

public class PushConsumer {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 注册消费者
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);

                    // 处理消息

                    // 手动确认消息
                    try {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });

            // 不断循环获取和处理消息
            while (true) {
                // 获取消息
                GetResponse response = channel.basicGet(QUEUE_NAME, false);

                if (response != null) {
                    // 处理消息
                    String message = new String(response.getBody(), "UTF-8");
                    System.out.println("Received message: " + message);

                    // 手动确认消息
                    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意事项:

  1. 避免阻塞: 在获取消息和处理消息的过程中,避免阻塞操作,以允许消费者能够及时获取到消息并进行处理。
  2. 处理异常: 在实际应用中,可能会出现网络异常或其他错误。在处理消息时,建议捕获异常并进行适当的处理,例如记录日志、重试或通知管理员。
  3. 控制消费速度: 使用合适的方式控制消费速度,避免消费者处理消息的速度过快或过慢。
  4. 注意消息处理的幂等性: 由于消息的推送和处理是异步的,确保消息处理的幂等性,以防止重复处理相同的消息。

推模式的消费方式可以更实时地获取消息并进行处理,适用于需要快速响应和实时性要求较高的场景。然而,需要注意消费者的阻塞和异常处理,以及消息处理的幂等性问题。根据具体的业务需求,选择合适的消费模式,推模式和拉模式都有各自的适用场景。

3.4.2 拉模式

拉模式是一种消息消费方式,其中消费者根据需要主动从队列中拉取消息进行处理。相比于推模式,拉模式更加灵活,消费者可以根据自身的处理能力和需求主动控制消息获取的频率。

拉模式的消费方式可以通过以下步骤实现:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是发送和接收消息的基础。
  2. 声明队列: 在消费消息之前,你需要先声明要消费的队列(Queue)。如果队列不存在,RabbitMQ 会自动创建一个新的队列。
  3. 获取消息: 使用 basicGet 方法主动从队列中获取一条消息。该方法会立即返回,无论队列中是否有消息。如果队列为空,返回的消息对象为 null。
  4. 处理消息: 对于获取的消息对象,你可以进行相应的处理操作,例如解析消息内容、执行业务逻辑等。
  5. 手动确认: 在消息处理完成后,调用 basicAck 方法手动确认消息。通过向 RabbitMQ 发送确认信息,告知它消息已被处理。
  6. 重复获取和处理消息: 通过循环不断地执行步骤 3 至步骤 5,即可实现拉模式的消息消费。

下面是一个使用 RabbitMQ 的 Java 客户端库实现拉模式消费消息的简单示例:

public class PullConsumer {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 循环获取和处理消息
            while (true) {
                // 获取消息
                GetResponse response = channel.basicGet(QUEUE_NAME, true);

                if (response != null) {
                    // 处理消息
                    String message = new String(response.getBody(), "UTF-8");
                    System.out.println("Received message: " + message);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意事项:

  1. 控制消息获取频率: 在拉模式下,消费者可以根据自身的处理能力和需求主动控制消息获取的频率。可以使用合适的等待时间,以避免过于频繁地获取消息。
  2. 处理异常: 在实际应用中,可能会出现网络异常或其他错误。在处理消息时,建议捕获异常并进行适当的处理,例如记录日志、重试或通知管理员。
  3. 注意消息处理的幂等性: 由于消息的拉取和处理是异步的,确保消息处理的幂等性,以防止重复处理相同的消息。

拉模式的消费方式相对于推模式更加灵活,适用于需要根据消费者自身需求主动控制消息获取的场景。然而,需要注意控制消息获取频率和异常处理,以及消息处理的幂等性问题。根据具体的业务需求,选择合适的消费模式,拉模式和推模式都有各自的适用场景。

3.5 消费端的确认与拒绝

在 RabbitMQ 中,消费者可以通过手动确认和拒绝消息来确保消息的处理可靠性。当消费者成功处理了一条消息时,可以发送确认消息给 RabbitMQ,告知它该消息已经被处理。如果在处理消息时出现异常或处理失败,消费者可以发送拒绝消息给 RabbitMQ,要求重新投递或将消息转移到死信队列。以下是手动确认和拒绝消息的方法:

  1. 手动确认消息: 在消费者处理消息成功后,调用 basicAck 方法手动确认消息。这会告诉 RabbitMQ 该消息已被成功处理,可以从队列中删除。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        try {
            // 处理消息
            processMessage(body);

            // 手动确认消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理异常
            e.printStackTrace();
            // 如果处理消息失败,可以选择拒绝消息并重新投递或转移到死信队列
            // channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
});
  1. 手动拒绝消息: 在消费者处理消息失败时,可以调用 basicReject 方法手动拒绝消息。第三个参数 requeue 指定是否重新将消息放回队列。如果 requeue 为 false,则消息将会被删除或进入死信队列;如果为 true,则消息会重新投递到队列中。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        try {
            // 处理消息(假设出现异常)
            throw new Exception("Something went wrong!");

        } catch (Exception e) {
            // 处理异常
            e.printStackTrace();

            // 手动拒绝消息,并不重新投递
            channel.basicReject(envelope.getDeliveryTag(), false);
            // 或手动拒绝消息,并重新投递
            // channel.basicReject(envelope.getDeliveryTag(), true);
        }
    }
});

在消费端出现异常时的处理方式取决于业务需求和消息处理策略:

  • 如果消息处理失败,且不希望重新处理该消息,可以使用 basicReject 方法拒绝消息,并设置 requeue 参数为 false,告诉 RabbitMQ 将该消息丢弃或转移到死信队列。
  • 如果消息处理失败,但希望重新处理该消息,可以使用 basicReject 方法拒绝消息,并设置 requeue 参数为 true,将消息重新放回队列。
  • 如果消息处理失败,但希望等待一段时间后再重新处理该消息,可以使用 basicNack 方法拒绝消息,并设置 requeue 参数为 true,并结合消息的过期时间或延迟队列来实现延迟重试。
  • 如果消息处理失败,但希望将该消息保存起来以供稍后处理,可以使用 basicReject 方法拒绝消息,并将消息内容持久化到数据库或其他存储介质中。
  • 在处理异常时,建议记录日志,以便后续排查问题和分析失败原因。

总之,手动确认和拒绝消息能够确保消息的处理可靠性,并根据业务需求和消息处理策略选择适当的处理方式。在消费端出现异常时,可以选择拒绝消息并重新投递、拒绝消息并将其丢弃或转移到死信队列,或者将消息持久化到数据库中等方式来处理异常情况。

3.6 关闭连接

正确地关闭与 RabbitMQ 服务器的连接是很重要的,这样可以释放资源并避免可能的内存泄漏或其他问题。在关闭连接时,需要注意以下事项:

  1. 关闭通道(Channel): 在关闭连接之前,先关闭所有的通道。通道是进行消息传递的实际渠道,关闭通道可以确保所有的消息都已被处理或传递。
  2. 停止消费者: 如果存在消费者,确保在关闭连接之前先停止消费者。这样可以防止消费者在连接关闭后继续尝试消费消息,从而导致资源浪费或错误。
  3. 关闭连接: 最后,关闭与 RabbitMQ 服务器的连接。

在Java客户端库中,关闭连接的操作如下所示:

public class ConnectionManager {
    private Connection connection;

    // 建立连接
    public void connect() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 关闭连接
    public void closeConnection() {
        try {
            if (connection != null) {
                // 关闭所有通道
                for (Channel channel : connection.getChannels()) {
                    channel.close();
                }
                // 关闭连接
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

需要注意的事项:

  1. 在关闭连接前确保所有的通道都已关闭。忽略关闭通道的步骤可能导致资源泄漏。
  2. 在关闭连接时,要确保所有需要处理的消息都已被消费或确认。如果有未处理的消息,它们可能会在连接关闭后重新投递给其他消费者,或者进入死信队列,具体取决于消费端的处理策略。
  3. 在连接关闭后,不要再试图使用已关闭的连接或通道。这可能会导致意外错误。
  4. 如果在连接关闭前出现异常,要进行适当的异常处理,例如记录日志或尝试重新关闭连接。

总结起来,正确地关闭与 RabbitMQ 服务器的连接是确保应用程序稳定性和性能的重要一步。遵循上述步骤,先关闭通道,再停止消费者,最后关闭连接。同时,注意异常处理,以及在连接关闭后不再使用已关闭的连接或通道。这样可以避免资源泄漏和其他可能的问题,并保证应用程序的正常运行。

3.7 小结

本章介绍了 RabbitMQ 客户端的开发向导,包括连接 RabbitMQ 服务器、使用交换机和队列、发送和消费消息等操作。在下一章中,我们将进一步学习 RabbitMQ 的高级特性,包括消息何去何从、过期时间、死信队列、延迟队列等功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/51402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JMeter常用内置对象:vars、ctx、prev

在前文 Beanshell Sampler 与 Beanshell 断言 中&#xff0c;初步阐述了JMeter beanshell的使用&#xff0c;接下来归集整理了JMeter beanshell 中常用的内置对象及其使用。 注&#xff1a;示例使用JMeter版本为5.1 1. vars 如 API 文档 所言&#xff0c;这是定义变量的类&a…

【点云处理教程】04 Python 中的点云过滤

一、说明 这是我的“点云处理”教程的第 4 篇文章。“点云处理”教程对初学者友好&#xff0c;我们将在其中简单地介绍从数据准备到数据分割和分类的点云处理管道。 在本教程中&#xff0c;我们将学习如何使用 Open3D 在 python 中过滤点云以进行下采样和异常值去除。使用 Open…

Python将COCO格式实例分割数据集转换为YOLO格式实例分割数据集

Python将COCO格式实例分割数据集转换为YOLO格式实例分割数据集 前言相关介绍COCO格式实例分割数据集转换为YOLO格式实例分割数据集coco格式对应的json文件&#xff0c;以test.json为例格式转换代码&#xff0c;内容如下 前言 由于本人水平有限&#xff0c;难免出现错漏&#xf…

【JAVASE】什么是方法

⭐ 作者&#xff1a;小胡_不糊涂 &#x1f331; 作者主页&#xff1a;小胡_不糊涂的个人主页 &#x1f4c0; 收录专栏&#xff1a;浅谈Java &#x1f496; 持续更文&#xff0c;关注博主少走弯路&#xff0c;谢谢大家支持 &#x1f496; 方法 1. 方法概念及使用1.1 什么是方法1…

Vue『卡片拖拽式课程表』

Vue『卡片拖拽式课程表』 概述 在本篇技术博客中&#xff0c;我们将介绍一个使用Vue实现的『卡片拖拽式课程表』。这个课程表允许用户通过拖拽课程卡片来安排不同的课程在时间表上的位置。我们将逐步讲解代码实现&#xff0c;包括课程表的布局、拖拽功能的实现&#xff0c;以…

6G内存运行Llama2-Chinese-7B-chat模型

6G内存运行Llama2-Chinese-7B-chat模型 Llama2-Chinese中文社区 第一步&#xff1a; 从huggingface下载 Llama2-Chinese-7b-Chat-GGML模型放到本地的某一目录。 第二步&#xff1a; 执行python程序 git clone https://github.com/Rayrtfr/llama2-webui.gitcd llama2-web…

QtC++ 技术分析3 - IOStream

目录 iostreamscanf/printfiostream 整体架构流相关类流缓冲区 模板特化后整体结构文件流文件流对象创建常见文件流操作输出格式设定文件流状态 字符串流字符串流内部缓冲区字符串流使用 流缓冲区用户自定义 IO iostream scanf/printf 几种常见的输入输出流函数 scanf 从键盘…

操作系统4

文件管理 文件的逻辑结构 文件的目录 文件的物理结构 文件存储空间管理 文件的基本操作

【深度学习】以图搜索- 2021sota repVgg来抽取向量 + facebook的faiss的做特征检索, 从环境搭建到运行案例从0到1

文章目录 前言安装小试牛刀用repVgg抽取向量构建Faiss索引进行相似性搜索项目延伸总结 前言 Faiss的全称是Facebook AI Similarity Search。 这是一个开源库&#xff0c;针对高维空间中的海量数据&#xff0c;提供了高效且可靠的检索方法。 暴力检索耗时巨大&#xff0c;对于…

Mac下certificate verify failed: unable to get local issuer certificate

出现这个问题&#xff0c;可以安装证书 在finder中查找 Install Certificates.command找到后双击&#xff0c;或者使用其他终端打开 安装完即可

tcp三次握手python实现和结果

下载抓包工具 安装 使用1 使用2 结果 红色笔为想要发送的数据。 代码 from scapy.all import * import logginglogging.getLogger(scapy.runtime).setLevel(logging.ERROR)target_ip = 172.20.211.4 target_port = 80 data = GET / HTTP/1.0 \r\n\r\ndef start_tcp(target_…

Mac代码编辑器sublime text 4中文注册版下载

Sublime Text 4 for Mac简单实用功能强大&#xff0c;是程序员敲代码必备的代码编辑器&#xff0c;sublime text 4中文注册版支持多种编程语言&#xff0c;包括C、Java、Python、Ruby等&#xff0c;可以帮助程序员快速编写代码。Sublime Text的界面简洁、美观&#xff0c;支持多…

上传图片到腾讯云对象存储桶cos 【腾讯云对象存储桶】【cos】【el-upload】【vue3】【上传头像】【删除】

1、首先登录腾讯云官网控制台 进入对象存储页面 2、找到跨越访问CIRS设置 配置规则 点击添加规则 填写信息 3、书写代码 这里用VUE3书写 第一种用按钮出发事件形式 <template><div><input type"file" change"handleFileChange" /><…

【设计模式】详解观察者模式

文章目录 1、简介2、观察者模式简单实现抽象主题&#xff08;Subject&#xff09;具体主题&#xff08;ConcreteSubject&#xff09;抽象观察者&#xff08;Observer&#xff09;具体观察者&#xff08;ConcrereObserver&#xff09;测试&#xff1a; 观察者设计模式优缺点观察…

DataEase开源BI工具安装_数据全量_增量同步_大屏拖拽自动生成_多数据源支持_数据血缘分析---大数据工作笔记0183

我这里用的是Centos7.9安装的 可以通过uname -p来查看一下我们的电脑架构,可以看到是x86_64架构的 我们下第一个,这个是x86架构的,第二个arm架构的 然后解压到/opt/module中 然后再去重命名一下文件夹. 推荐200G 本地模式的功能比较多 推荐100G

【FPGA IP系列】FIFO的通俗理解

FPGA厂商提供了丰富的IP核&#xff0c;基础性IP核都是可以直接免费调用的&#xff0c;比如FIFO、RAM等等。 本文主要介绍FIFO的一些基础知识&#xff0c;帮助大家能够理解FIFO的基础概念。 一、FIFO介绍 FIFO全称是First In First Out&#xff0c;即先进先出。 FIFO是一个数…

C语言第十一课--------操作符的使用与分类-------基本操作

作者前言 作者介绍&#xff1a; 作者id&#xff1a;老秦包你会&#xff0c; 简单介绍&#xff1a; 喜欢学习C语言和python等编程语言&#xff0c;是一位爱分享的博主&#xff0c;有兴趣的小可爱可以来互讨 个人主页::小小页面 gitee页面:秦大大 一个爱分享的小博主 欢迎小可爱们…

7.27 作业 QT

要求&#xff1a; 结果图&#xff1a; clock.pro: QT core gui QT texttospeechgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated …

算法与数据结构(四)--排序算法

一.冒泡排序 原理图&#xff1a; 实现代码&#xff1a; /* 冒泡排序或者是沉底排序 *//* int arr[]: 排序目标数组,这里元素类型以整型为例; int len: 元素个数 */ void bubbleSort (elemType arr[], int len) {//为什么外循环小于len-1次&#xff1f;//考虑临界情况&#xf…

Power BI-云端报表定时刷新--ODBC、MySQL、Oracle等其他本地数据源的刷新(二)

ODBC数据源 一些小众的数据源无法直接连接&#xff0c;需要通过微软系统自带的应用“ODBC数据源”连接。 1.首次使用应安装对应数据库的ODBC驱动程序&#xff0c;Mysql的ODBC驱动需要手动安装 2.在web服务中进行数据源的配置 Mysql数据源 1.Powerbi与Gateway第一次连SQL…