rabbitmq五种模式的总结——附java-se实现(详细)

rabbitmq五种模式的总结

完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning

在这里插入图片描述

一、简单模式

(一)简单模式概述

RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色:

  1. 生产者:负责发送消息到队列。
  2. 消费者:负责从队列中接收并处理消息。

在简单模式中,消息的传递是单向的,生产者将消息发送到队列,消费者从队列中接收消息。

image-20250216063036914


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._01_hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("ip(要换成真实的ip哦)");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明队列
        /**
         * 参数说明:
         * 1. 队列名称:01-hello2
         * 2. 是否持久化:true(重启后队列仍然存在)
         * 3. 是否独占队列:false(允许多个消费者连接)
         * 4. 是否自动删除:false(队列不会自动删除)
         * 5. 额外参数:null
         */
        channel.queueDeclare("01-hello2", true, false, false, null);

        // 6. 发送消息
        /**
         * 参数说明:
         * 1. 交换机名称:空字符串(使用默认交换机)
         * 2. 路由键:队列名称(01-hello2)
         * 3. 额外属性:null
         * 4. 消息内容:字节数组
         */
        channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());
        System.out.println("消息发送成功");

        // 7. 关闭资源
        channel.close();
        connection.close();
    }
}
结果

image-20250216063335314


(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._01_hello_c;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("ip(要换成真实的ip哦");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明队列(需与生产者保持一致)
        channel.queueDeclare("01-hello2", false, false, false, null);

        // 6. 接收消息
        /**
         * 参数说明:
         * 1. 队列名称:01-hello2
         * 2. 是否自动确认:true(消息被消费后自动确认)
         * 3. 消息处理回调:DeliverCallback
         * 4. 消息取消回调:CancelCallback
         */
        channel.basicConsume("01-hello2", true, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                System.out.println("接收到消息:" + new String(delivery.getBody()));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("消息被取消");
            }
        });
    }
}
结果

image-20250216063418597

在mq中查看

image-20250216063443454


(四)总结

  1. 简单模式:适用于一对一的简单消息传递场景。
  2. 生产者:负责创建队列并发送消息。
  3. 消费者:负责从队列中接收并处理消息。
  4. 注意事项
    • 队列名称需保持一致,不然一定会报错!
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用完资源后需显式关闭 ChannelConnection

二、工作模式

(一)工作模式概述

工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个消费者。它的特点是:

  1. 一个生产者:负责发送消息到队列。
  2. 多个消费者:共同消费同一个队列中的消息。
  3. 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给消费者。

工作模式适用于任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

image-20250216065036476


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._02_work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("你的ip!别忘了改");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明队列
        /**
         * 参数说明:
         * 1. 队列名称:02-work1
         * 2. 是否持久化:true(重启后队列仍然存在)
         * 3. 是否独占队列:false(允许多个消费者连接)
         * 4. 是否自动删除:false(队列不会自动删除)
         * 5. 额外参数:null
         */
        channel.queueDeclare("02-work1", true, false, false, null);

        // 6. 发送消息
        for (int i = 0; i < 20; i++) {
            String message = "hello work:" + i;
            channel.basicPublish("", "02-work1", null, message.getBytes());
        }
        System.out.println("消息发送成功");

        // 7. 关闭资源
        channel.close();
        connection.close();
    }
}
关键点:
  1. 队列声明(queueDeclare):创建队列并设置队列属性。
  2. 消息发送(basicPublish):通过循环发送多条消息到队列。
  3. 持久化队列:设置为 true,确保队列在 RabbitMQ 重启后仍然存在。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._02_work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("你的ip!别忘了改");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明队列(需与生产者保持一致)
        channel.queueDeclare("02-work1", true, false, false, null);

        // 6. 设置每次只接收一条消息
        channel.basicQos(1);

        // 7. 接收消息
        /**
         * 参数说明:
         * 1. 队列名称:02-work1
         * 2. 是否自动确认:false(手动确认消息)
         * 3. 消息处理回调:DeliverCallback
         * 4. 消息取消回调:CancelCallback
         */
        channel.basicConsume("02-work1", false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                try {
                    // 模拟消息处理耗时
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));
                // 手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("消息被取消");
            }
        });
    }
}
关键点:
  1. 队列声明(queueDeclare):确保队列存在,需与生产者保持一致。
  2. 消息预取(basicQos):设置每次只接收一条消息,避免某个消费者处理过多消息。
  3. 手动确认(basicAck):消息处理完成后手动确认,确保消息不会丢失。
  4. 消息处理耗时:通过 Thread.sleep(1000) 模拟消息处理耗时。
效果

image-20250216065155794

image-20250216065214992


(四)工作模式的特点

  1. 消息分发机制
    • 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个消费者。
    • 可以通过 basicQos 设置每次只接收一条消息,避免某个消费者处理过多消息。
  2. 消息确认机制
    • 设置为手动确认(autoAck=false),确保消息处理完成后才确认。(防止业务处理失败的情况下丢失消息)
    • 如果消费者在处理消息时崩溃,未确认的消息会重新分发给其他消费者。
  3. 适用场景
    • 任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

(五)总结

  1. 工作模式:适用于任务分发场景,多个消费者共同消费同一个队列中的消息。
  2. 生产者:负责发送消息到队列。
  3. 消费者:负责接收并处理消息,支持手动确认和消息预取。
  4. 注意事项
    • 队列名称需保持一致。
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用 basicQos 控制消息分发,避免某个消费者处理过多消息。

三、发布订阅模式

(一)发布订阅模式概述

发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个消费者。它的特点是:

  1. 一个生产者:将消息发送到交换机(Exchange)。
  2. 多个消费者:每个消费者都有自己的队列,并与交换机绑定。
  3. 消息广播:交换机将消息广播给所有绑定的队列。

发布订阅模式适用于消息广播场景,例如日志系统、通知系统等。

image-20250216071658856


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的交换机中。

package top.miqiu._03_pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("用自己的ip!!");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明交换机
        /**
         * 参数说明:
         * 1. 交换机名称:03-pubsub
         * 2. 交换机类型:fanout(广播模式)
         */
        channel.exchangeDeclare("03-pubsub", "fanout");

        // 6. 发送消息
        for (int i = 0; i < 20; i++) {
            String message = "hello work:" + i;
            /**
             * 参数说明:
             * 1. 交换机名称:03-pubsub
             * 2. 路由键:空字符串(fanout 模式忽略路由键)
             * 3. 消息属性:MessageProperties.TEXT_PLAIN
             * 4. 消息内容:字节数组
             */
            channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());
        }
        System.out.println("消息发送成功");

        // 7. 关闭资源
        channel.close();
        connection.close();
    }
}
关键点:
  1. 交换机声明(exchangeDeclare):创建交换机并设置类型为 fanout(广播模式)。
  2. 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  3. 消息广播:消息会被广播到所有绑定到该交换机的队列。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._03_pubsub;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("用自己的ip!!");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明交换机
        channel.exchangeDeclare("03-pubsub", "fanout");

        // 6. 创建临时队列
        String queue = channel.queueDeclare().getQueue();

        // 7. 绑定队列到交换机
        /**
         * 参数说明:
         * 1. 队列名称:queue
         * 2. 交换机名称:03-pubsub
         * 3. 路由键:空字符串(fanout 模式忽略路由键)
         */
        channel.queueBind(queue, "03-pubsub", "");

        // 8. 接收消息
        /**
         * 参数说明:
         * 1. 队列名称:queue
         * 2. 是否自动确认:true(自动确认消息)
         * 3. 消息处理回调:DeliverCallback
         * 4. 消息取消回调:CancelCallback
         */
        channel.basicConsume(queue, true, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                try {
                    // 模拟消息处理耗时
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("消息被取消");
            }
        });
    }
}
关键点:
  1. 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
结果

image-20250216071734904

image-20250216071749761

可以看到两个consumer都消费了相同的消息


(四)发布订阅模式的特点

  1. 消息广播:交换机将消息广播给所有绑定的队列。
  2. 临时队列:消费者可以创建临时队列,队列名称由 RabbitMQ 自动生成。
  3. 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。

(五)总结

  1. 发布订阅模式:适用于消息广播场景,多个消费者各自接收相同的消息。
  2. 生产者:负责将消息发送到交换机。
  3. 消费者:负责创建队列并绑定到交换机,接收并处理消息。
  4. 注意事项
    • 交换机类型需设置为 fanout
    • 队列绑定到交换机时,路由键为空字符串。
    • 临时队列的名称由 RabbitMQ 自动生成。

(六)RabbitMQ 交换机类型总结

交换机类型描述路由行为适用场景
Fanout广播模式,将消息发送到所有绑定到该交换机的队列。忽略路由键(Routing Key),消息会被广播到所有绑定的队列。日志系统、通知系统等需要广播消息的场景。
Direct直接模式,根据路由键将消息发送到匹配的队列。消息的路由键必须与队列绑定的路由键完全匹配。任务分发、点对点通信等需要精确路由的场景。
Topic主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。消息分类、多条件路由等需要灵活匹配的场景。
Headers头部模式,根据消息的头部属性(Headers)进行匹配。不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。复杂的路由逻辑,例如根据消息的元数据进行路由。

详细说明

1. Fanout 交换机(广播,常用)
  • 特点
    • 消息会被广播到所有绑定到该交换机的队列。
    • 忽略路由键(Routing Key)。
  • 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。
2. Direct 交换机
  • 特点
    • 消息的路由键必须与队列绑定的路由键完全匹配。
    • 支持一对一或一对多的精确路由。
  • 适用场景
    • 任务分发:将特定任务路由到特定的 Worker。
    • 点对点通信:将消息发送到特定的接收者。
3. Topic 交换机
  • 特点
    • 支持通配符匹配:
      • * 匹配一个单词。
      • # 匹配零个或多个单词。
    • 路由键的格式通常是点分字符串(如 user.create)。
  • 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。
4. Headers 交换机
  • 特点
    • 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
    • 支持复杂的匹配规则(如 x-match 参数)。
  • 适用场景
    • 复杂的路由逻辑:根据消息的元数据进行路由。
    • 需要高度灵活性的场景。

对比
场景FanoutDirectTopicHeaders
日志广播所有消费者接收相同的日志消息。不适用。不适用。不适用。
任务分发不适用。将任务路由到特定的 Worker。将任务分类路由到不同的 Worker。根据任务的元数据进行路由。
通知系统所有用户接收相同的通知。特定用户接收特定通知。根据通知类型路由到不同用户。根据通知的元数据进行路由。
消息分类不适用。不适用。根据消息主题进行路由。根据消息的头部属性进行路由。

总结
  • Fanout:适用于广播场景。
  • Direct:适用于精确路由场景。
  • Topic:适用于灵活的路由场景。
  • Headers:适用于复杂的路由逻辑。

四、路由模式

(一)路由模式概述

路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:

  1. 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
  3. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。

路由模式适用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。

image-20250216073521308


(二)生产者代码解析

生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("你的ip!!!");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明 Direct 交换机
        /**
         * 参数说明:
         * 1. 交换机名称:04-routing
         * 2. 交换机类型:direct
         */
        channel.exchangeDeclare("04-routing", "direct");

        // 6. 发送消息
        for (int i = 0; i < 20; i++) {
            String message = "hello work:" + i;
            /**
             * 参数说明:
             * 1. 交换机名称:04-routing
             * 2. 路由键:err(消息将发送到绑定 err 路由键的队列)
             * 3. 消息属性:MessageProperties.TEXT_PLAIN
             * 4. 消息内容:字节数组
             */
            channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());
        }
        System.out.println("消息发送成功");

        // 7. 关闭资源
        channel.close();
        connection.close();
    }
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Direct 交换机,类型为 direct
  2. 消息发送(basicPublish):指定路由键(如 err),消息会被发送到绑定该路由键的队列。
  3. 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("你的ip!!!");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明 Direct 交换机
        channel.exchangeDeclare("04-routing", "direct");

        // 6. 创建临时队列
        String queue = channel.queueDeclare().getQueue();

        // 7. 绑定队列到交换机,并指定路由键
        /**
         * 参数说明:
         * 1. 队列名称:queue
         * 2. 交换机名称:04-routing
         * 3. 路由键:info、err、waring
         */
        channel.queueBind(queue, "04-routing", "info");
        channel.queueBind(queue, "04-routing", "err");
        channel.queueBind(queue, "04-routing", "waring");

        // 8. 接收消息
        /**
         * 参数说明:
         * 1. 队列名称:queue
         * 2. 是否自动确认:true(自动确认消息)
         * 3. 消息处理回调:DeliverCallback
         * 4. 消息取消回调:CancelCallback
         */
        channel.basicConsume(queue, true, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                try {
                    // 模拟消息处理耗时
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("消息被取消");
            }
        });
    }
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如 infoerrwaring)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

consumer1绑定了[info,err,waring],所以在producer绑定了info时发送消息的情况下,consumer1可以接收到信息

image-20250216073403669

由于consumer2绑定的是trace,所以consumer2是接收不到消息的

image-20250216073447112


(四)路由模式的特点

  1. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
  2. 多路由键支持:一个队列可以绑定多个路由键,接收多种类型的消息。
  3. 适用场景
    • 日志级别分类:将不同级别的日志(如 infoerr)路由到不同的队列。
    • 任务分发:将特定任务路由到特定的 Worker。

(五)总结

  1. 路由模式:适用于需要根据路由键精确路由消息的场景。
  2. 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
  4. 注意事项
    • 路由键必须完全匹配。
    • 一个队列可以绑定多个路由键,接收多种类型的消息。

五、Topic 模式

(一)Topic 模式概述

Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:

  1. 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
  3. 灵活的路由:支持通配符匹配:
    • * 匹配一个单词。
    • # 匹配零个或多个单词。

Topic 模式适用于需要根据复杂条件灵活路由消息的场景,例如消息分类、多条件路由等。

image-20250216075115591


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。

package top.miqiu._05_topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("用自己的ip!!");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明 Topic 交换机
        /**
         * 参数说明:
         * 1. 交换机名称:05-topic
         * 2. 交换机类型:topic
         */
        channel.exchangeDeclare("05-topic", "topic");

        // 6. 发送消息
        for (int i = 0; i < 20; i++) {
            String message = "hello work:" + i;
            /**
             * 参数说明:
             * 1. 交换机名称:05-topic
             * 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)
             * 3. 消息属性:MessageProperties.TEXT_PLAIN
             * 4. 消息内容:字节数组
             */
            channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());
        }
        System.out.println("消息发送成功");

        // 7. 关闭资源
        channel.close();
        connection.close();
    }
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Topic 交换机,类型为 topic
  2. 消息发送(basicPublish):指定路由键(如 user.hi),消息会被发送到匹配的队列。
  3. 通配符匹配
    • * 匹配一个单词(如 user.* 匹配 user.hi,但不匹配 user.hi.there)。
    • # 匹配零个或多个单词(如 user.# 匹配 user.hiuser.hi.there)。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。

package top.miqiu._05_topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
        connectionFactory.setHost("用自己的ip!!");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3. 创建连接对象
        Connection connection = connectionFactory.newConnection();
        // 4. 创建 Channel
        Channel channel = connection.createChannel();

        // 5. 声明 Topic 交换机
        channel.exchangeDeclare("05-topic", "topic");

        // 6. 创建临时队列
        String queue = channel.queueDeclare().getQueue();

        // 7. 绑定队列到交换机,并指定路由键模式
        /**
         * 参数说明:
         * 1. 队列名称:queue
         * 2. 交换机名称:05-topic
         * 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)
         */
        channel.queueBind(queue, "05-topic", "user.*");

        // 8. 接收消息
        /**
         * 参数说明:
         * 1. 队列名称:queue
         * 2. 是否自动确认:true(自动确认消息)
         * 3. 消息处理回调:DeliverCallback
         * 4. 消息取消回调:CancelCallback
         */
        channel.basicConsume(queue, true, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                try {
                    // 模拟消息处理耗时
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("消息被取消");
            }
        });
    }
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如 user.*)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

当我在producer使用“employee.hi”作为路由key的时候,绑定了“employee.*”的consumer1可以消费这个消息

image-20250216075306371


(四)Topic 模式的特点

  1. 灵活的路由:支持通配符匹配,可以根据复杂的条件路由消息。
  2. 多路由键支持:一个队列可以绑定多个路由键模式,接收多种类型的消息。
  3. 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。

(五)总结

  1. Topic 模式:适用于需要根据复杂条件灵活路由消息的场景。
  2. 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
  4. 注意事项
    • 路由键模式支持通配符 *#
    • 一个队列可以绑定多个路由键模式,接收多种类型的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/971217.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构 day02

3. 线性表 3.1. 顺序表 3.1.3. 顺序表编程实现 操作&#xff1a;增删改查 .h 文件 #ifndef __SEQLIST_H__ #define __SEQLIST_H__ #define N 10 typedef struct seqlist {int data[N];int last; //代表数组中最后一个有效元素的下标 } seqlist_t;//1.创建一个空的顺序表 seq…

STM32的HAL库开发---ADC

一、ADC简介 1、ADC&#xff0c;全称&#xff1a;Analog-to-Digital Converter&#xff0c;指模拟/数字转换器 把一些传感器的物理量转换成电压&#xff0c;使用ADC采集电压&#xff0c;然后转换成数字量&#xff0c;经过单片机处理&#xff0c;进行控制和显示。 2、常见的AD…

25/2/16 <算法笔记> DirectPose

DirectPose 是一种直接从图像中预测物体的 6DoF&#xff08;位姿&#xff1a;6 Degrees of Freedom&#xff09;姿态 的方法&#xff0c;包括平移和平面旋转。它在目标检测、机器人视觉、增强现实&#xff08;AR&#xff09;和自动驾驶等领域中具有广泛应用。相比于传统的位姿估…

企业级API集成方案:基于阿里云函数计算调用DeepSeek全解析

解决方案链接&#xff1a;https://www.aliyun.com/solution/tech-solution/deepseek-r1-for-platforms?utm_contentg_1000401616 何为DeepSeek R1 DeepSeek R1模型有诸多技术优势。高效架构设计使其能更高效提取特征&#xff0c;减少冗余计算&#xff0c;提升数据处理速度、…

137,【4】 buuctf web [SCTF2019]Flag Shop

进入靶场 都点击看看 发现点击work会增加&#xffe5; 但肯定不能一直点下去 抓包看看 这看起来是一个 JWT&#xff08;JSON Web Token&#xff09;字符串。JWT 通常由三部分组成&#xff0c;通过点&#xff08;.&#xff09;分隔&#xff0c;分别是头部&#xff08;Header&…

ThinkPHP8视图赋值与渲染

【图书介绍】《ThinkPHP 8高效构建Web应用》-CSDN博客 《2025新书 ThinkPHP 8高效构建Web应用 编程与应用开发丛书 夏磊 清华大学出版社教材书籍 9787302678236 ThinkPHP 8高效构建Web应用》【摘要 书评 试读】- 京东图书 在控制器操作中&#xff0c;使用view函数可以传入视图…

渗透利器:YAKIT 工具-基础实战教程.

YAKIT 工具-基础实战教程. YAKIT&#xff08;Yak Integrated Toolkit&#xff09;是一款基于Yak语言开发的集成化网络安全单兵工具&#xff0c;旨在覆盖渗透测试全流程&#xff0c;提供从信息收集、漏洞扫描到攻击实施的自动化支持。其核心目标是通过GUI界面降低Yak语言的使用…

Fiori APP配置中的Semantic object 小bug

在配置自开发程序的Fiori Tile时&#xff0c;需要填入Semantic Object。正常来说&#xff0c;是需要通过事务代码/N/UI2/SEMOBJ来提前新建的。 但是在S4 2022中&#xff0c;似乎存在一个bug&#xff0c;即无需新建也能输入自定义的Semantic Object。 如下&#xff0c;当我们任…

shell——分支语句

文章目录 基本语法常用判断条件(1)两个整数之间比较&#xff08;2&#xff09;按照文件权限进行判断&#xff08;3&#xff09;按照文件类型进行判断&#xff08;4&#xff09;多条件判断&#xff08;&& 表示前一条命令执行成功时&#xff0c;才执行后一条命令&#xf…

Ubuntu 连接 air pods

&#xff11;&#xff0e; sudo vim /etc/bluetooth/main.conf , 修改蓝牙模式为blder &#xff12;&#xff0e;sudo /etc/init.d/bluetooth restart, 重启蓝牙&#xff0c;即可连接成功

机器学习:k近邻

所有代码和文档均在golitter/Decoding-ML-Top10: 使用 Python 优雅地实现机器学习十大经典算法。 (github.com)&#xff0c;欢迎查看。 K 邻近算法&#xff08;K-Nearest Neighbors&#xff0c;简称 KNN&#xff09;是一种经典的机器学习算法&#xff0c;主要用于分类和回归任务…

低空经济:开启未来空中生活的全新蓝海

引言 随着科技的进步&#xff0c;我们不再仅仅依赖地面交通和传统物流。你是否曾幻想过&#xff0c;未来的某一天&#xff0c;快递、外卖可以像魔法一样直接从空中送到你手中&#xff1f;或者&#xff0c;你能乘坐小型飞行器&#xff0c;快速穿梭于城市之间&#xff0c;告别拥堵…

DeepSeek核心算法解析:如何打造比肩ChatGPT的国产大模型

注&#xff1a;此文章内容均节选自充电了么创始人&#xff0c;CEO兼CTO陈敬雷老师的新书《自然语言处理原理与实战》&#xff08;人工智能科学与技术丛书&#xff09;【陈敬雷编著】【清华大学出版社】 文章目录 DeepSeek大模型技术系列一DeepSeek核心算法解析&#xff1a;如何…

苍穹外卖day4 redis相关简单知识 店铺营业状态设置

内存存储 键值对 key-value 一般用于处理突发性大量请求数据操作&#xff08;暂时浅显理解&#xff09; 读写速度极快&#xff0c;常用于缓存数据&#xff0c;减少对数据库的访问压力&#xff0c;提高系统性能。例如&#xff0c;可以缓存用户会话、商品信息、页面数据 设置默…

API 接口自动化

HTTP协议 - 白月黑羽 HTTP协议简介 如果客户端是浏览器&#xff0c;如何在chrome浏览器中查看 请求和响应的HTTP消息&#xff1f;按f12-》network 清除当前信息 响应的消息体在Response里看 点preview&#xff0c;可以看响应的消息体展开的格式 HTTP请求消息 请求头 reques…

Oracle序列(基础操作)

序列概念 序列是用于生成唯一、连续序号的对象。 序列可以是升序的&#xff0c;也可以是降序的。 使用CREATE SEQUENCE语句创建序列。 start with 1 指定第一个序号从1开始 increment by 1 指定序号之间的间隔为1 increment by -1 降序1000 999 998这样 maxvalue 2000 表…

【pytorch】weight_norm和spectral_norm

apply_parametrization_norm 和spectral_norm是 PyTorch 中用于对模型参数进行规范化的方法&#xff0c;但它们在实现和使用上有显著的区别。以下是它们的主要区别和对比&#xff1a; 实现方式 weight_norm&#xff1a; weight_norm 是一种参数重参数化技术&#xff0c;将权…

unity学习44:学习Animator 的一个动作捕捉网站,实测好用

目录 1 动作捕捉网站 2 注册和下载 3 比如首页的内容&#xff0c;可以直接下载为fbx模型文件 4 上传并修改 5 在 unity里使用 5.1 下载的fbx文件直接拖入到unity 5.2 动画修改 5.3 游戏里播放 1 动作捕捉网站 一个动作捕捉网站 AI神器集合网站 千面视频动捕 | AI神器…

云原生(五十五) | ECS中自建数据库迁移到RDS

文章目录 ECS中自建数据库迁移到RDS 一、场景说明 二、ECS中自建数据库迁移到RDS实现步骤 三、 创建wordpress数据库 四、登录ECS导出wordpress数据库 五、返回RDS数据库管理控制台 六、开启外网地址并设置白名单 七、获取RDS外网访问地址 八、重新设置wordpress的wp-…

【NLP 22、语言模型 language model】

有时候我也想听听&#xff0c;我在你心里&#xff0c;是什么样子 —— 25.1.12 一、什么是语言模型 语言是灵活的&#xff0c;也是有规律的 了解一门语言的人可以判断一句话是否“合理” 通俗来讲&#xff0c;语言模型用来评价一句话(句子可以看作是字的组合)是否“合理”或…