【RabbitMQ】工作模式

工作模式概述

简单模式

 简单模式中只存在一个生产者,只存在一个消费者。生产者生产消息,消费者消费消息。消息只能被消费一次,也称为点对点模式。

简单模式适合在消息只能被单个消费者处理的场景下存在。

工作队列模式(Work Queue)

工作队列模式中存在一个消费者,多个生产者。生产者生产消息,消息队列将生产的消息分发给不同的消费者,每个消费者接收到不同的消息后开始进行消费。简单来说,工作模式下,消息不会被重复消费,不同的消费者消费的是不同的消息。

工作模式适合在集群环境中做异步处理。

发布订阅模式

交换机(Exchange)

作用:生产者将消息发送到Broker中,会先经过交换机,由交换机将消息按照一定规则路由到一个或者多个消息队列中(在简单模式和工作队列模式下,由生产者直接将消息投递到队列中,这种情况在RabbitMQ中根本不会出现)。

RabbitMQ交换机有四种类型:fanout、direct、topic、headers。不同类型有着不同的路由策略。AMQP协议其实是有六种交换机类型的(除了上述四种,还有system和自定义),只不过RabbitMQ只使用了其四种而已。

1. Fanout:广播,交换机将从生产者中获取的消息交给与之绑定的全部队列(对应工作模式中的发布订阅模式)。

2. Direct:定向,交换机将从生产者中获取的消息交给与之绑定的符合RoutingKey的队列(对应工作模式中的路由模式)。具体RoutingKey是啥,后面会讲到。

3. Topic:通配符,交换机将从生产者中获取的消息交给与之绑定的符合RoutingKey的队列(对应工作模式中的通配符模式)。定向和通配符中的RoutingKey是略有不同的,具体到工作模式的路由模式和通配符模式就会明白。

4. headers:此类交换器并不依赖于RoutingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。headers类型的交换器性能会很差,而且也不实用,基本看不到它的存在,了解即可。

Exchange只负责转化消息,并不负责存储消息。因此如果没有任何交换机和队列绑定,或者发送的消息没有符合路由规则的队列,消息就会丢失。

RoutingKey,路由键。生产者发送消息给Broker时,指定的一个字符串,用来告诉交换机应该如何处理这个消息。

BindingKey,绑定键。在声明交换机和队列之后,使用一个BindingKey将交换机和队列绑定起来。这样,当生产者将消息发送给Broker之后,交换机接收到消息就能根据消息中的RoutingKey和BindingKey进行对比,从而知道将消息路由到某个或者某几个队列中了。

本质上,BindingKey属于RoutingKey的一种。换句话说,两者的作用并没有什么差别。不同的是,路由键是生产者向Broker发送消息时使用的,绑定键则是交换机和队列绑定时进行绑定,然后再交换机给队列发送消息时使用。当生产者将一个绑定了RoutingKey的消息发送给交换机之后,交换机根据存在的BindingKey来将消息路由给队列。

发布订阅模式存在一个生产者,多个消费者。生产者生产消息,交换机将消息复制多份,每个队列都会接收到相同的消息,每个消费者接收到消息之后开始进行消费。简单来说,消费者发送的消息,所有与之关联的队列都会收到相同的消息。

发布订阅模式适合消费需要被多个消费者同时接收的场景,例如实时播报或者广播消息。

路由模式

 路由模式是发布订阅模式的变种,在发布订阅模式的基础上,增加了路由键。也就是说,消息到达交换机之后,不再是分发给所有关联的队列,而是根据绑定的路由规则来进行分发消息。

路由模式适合需要根据特定规则分发消息的场景。例如,系统日志打印,将不同级别的日志发送到不同的队列,最终输出到不同的文件。

通配符模式

通配符模式又是路由模式的变种,在路由模式的基础上,增加了通配符的功能,使消息分发更加灵活。

总的来说,发布订阅模式是消息到达交换机之后,交换机无条件的将所有消息转发给队列。路由模式是消息到达交换机之后,交换机根据RoutingKey的规则,将数据筛选之后分发给不同的队列。通配符模式也是消息到达交换机之后,交换机根据RoutingKey的规则,将数据筛选之后分发给不同的队列,只不过该RoutingKey不再是一个确定的路由键,而是类似于正则表达式的方式来定义路由键。

通配符模式适合需要灵活匹配和过滤消息的场景。

RPC模式

 RPC模式没有生产者和消费者,比较类似于咋们的RPC远程调用,大概就是通过两个队列实现了一个可回调的过程。

1. 客户端发送消息到一个指定队列,并在消息属性中设置replyTo字段,这个字段指定了一个回调队列,用于接收服务器的响应,并且还设置了correctionId字段,用来确定响应是否为服务器所期望的。

2. 服务器接收到请求之后,处理请求并将响应消息发送到replyTo指定的回调队列中。

3. 客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息的correctionId属性,以确定它是所期望的响应。

发布确认模式(Publisher Confirms)

发布确认模式是RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,生产者可以等待RabbitMQ服务器确认收到消息的通知,以确保消息已经被服务器所接收并进行了处理。

1. 生产者将channel设置为confirm模式(通过调用channel.confirmSelect(),发布的每一条消息都会获得一个唯一的ID,生产者可以将这些序列号与消息关联起来,以便跟踪消息的状态)。

2. 当消息被RabbitMQ接收并处理后,服务器会异步地向生产者发送一个确认(ACK)给生产者(内容包含了唯一ID),表示消息已经送达。

通过发布确认模式,生产者可以确保消息被RabbitMQ服务器接收并处理,从而避免了消息丢失的问题。

发布确认模式适合对数据安全性要求较高的场景,比如金融交易、订单处理。

SDK工作模式代码案例

简单模式

生产者代码

// 简单模式

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("43.138.108.125"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明队列
        channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);

        // TODO 声明交换机,使用内置交换机,无需声明

        // TODO 发送消息
        String msg = "hello simple";
        channel.basicPublish("", Constants.SIMPLE_QUEUE, null, msg.getBytes());
        System.out.println("简单模式生产者发送消息!");

        // TODO 关闭资源
        channel.close();
        connection.close();

    }

}

上述代码运行之后,在RabbitMQ的开源界面和IDEA终端上会有如下结果:

消费者代码

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("43.138.108.125"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明队列
        channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);

        // TODO 声明交换机,使用内置交换机,无需声明

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.SIMPLE_QUEUE, true, consumer);

        // TODO 关闭资源
        channel.close();
        connection.close();

    }

}

 上述代码运行之后,队列中的消息被该消费者接收,控制台输出下述内容:

工作队列模式

由于在接下来的代码中,创建连接工厂,创建连接,开启信道,释放资源都要存在。因此为了简化开发,将这些步骤封装成方法,方便后续调用。

public class Common {

    private static Connection connection;
    private static Channel channel;

    // 获取信道
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("43.138.108.125"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        connection = connectionFactory.newConnection();

        // TODO 获取信道
        channel = connection.createChannel();
        return channel;
    }

    // 释放资源
    public static void close() throws IOException, TimeoutException {
        channel.close();
        connection.close();
    }

}

生产者代码

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 获取信道
        Channel channel = Common.getChannel();

        // TODO 声明交换机,使用内置交换机,因此无需声明

        // TODO 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // TODO 发送消息
        /**
         * 工作队列的模式是一个队列,多个消费者。
         * 当存在多个消息时,不同的消费者会接收不同的消息,消息并不会重复消费
         * 因此为了检验这个模式,发送多条消息
         */
        String msg = "hello work queue";
        for (int i = 0; i < 15; i++) {
            channel.basicPublish("", Constants.WORK_QUEUE, null, (msg + ":" + i).getBytes());
        }
        System.out.println("工作队列模式消息发送成功!");

        // TODO 释放资源
        Common.close();
    }

}

 消费者代码

public class Consumer1 {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // TODO 获取信道
        Channel channel = Common.getChannel();

        // TODO 声明交换机,使用内置交换机,因此无需声明

        // TODO 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        // TODO 释放资源
//        Common.close();
    }

}
public class Consumer2 {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // TODO 获取信道
        Channel channel = Common.getChannel();

        // TODO 声明交换机,使用内置交换机,因此无需声明

        // TODO 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        // TODO 释放资源
//        Common.close();
    }

}

 在上述代码中,不要释放资源。将生产者的代码重新启动一次之后,就会发现如下内容。从消费者消费消息的输出情况来看,很容易得到工作模式最主要的内容:消费者消费的消息都是不同的消息,消息并不会被重复消费。

发布订阅模式

生产者代码

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = Common.getChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

        // TODO 发送消息
        String msg = "hello 发布订阅模式";
        channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());
        System.out.println("发布订阅模式发送消息成功!");

        // TODO 释放资源
        Common.close();
    }

}

当上述代码启动之后,在开源界面中发生了如下变化。队列列表中新增了两个队列,交换机列表中新增了一条声明的交换机。

消费者代码

public class Consumer1 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = Common.getChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);

        // TODO 释放资源
        Common.close();
    }

}
public class Consumer2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = Common.getChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);

        // TODO 释放资源
        Common.close();
    }

}

路由模式

路由模式实现的代码案例按照此图的需求来做。根据此图可以看出,当生产者发送消息时的路由键为error时,两个队列都能收到消息;但是当生产者发送消息时的路由键为info或者warn时,只有队列二可以收到消息。

生产者代码

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = Common.getChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "error");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "info");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "warn");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "error");

        // TODO 发送消息
        // error的消息进入两个队列
        // info和warn只会进入队列2
        String[] msg = {"info", "error", "warn"};
        for (String s : msg) {
            channel.basicPublish(Constants.DIRECT_EXCHANGE, s, null, s.getBytes());
        }
        System.out.println("路由模式发送消息成功!");

        // TODO 释放资源
        Common.close();

    }

}

 当运行上述代码之后,发现队列中的结果和预想结果一致。

消费者代码

public class Consumer1 {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = Common.getChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "error");

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
        Thread.sleep(20000); // 阻塞等待消息接收完成

        // TODO 释放资源
        Common.close();
    }

}
public class Consumer2 {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = Common.getChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants. DIRECT_EXCHANGE, "error");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants. DIRECT_EXCHANGE, "info");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants. DIRECT_EXCHANGE, "warn");

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
        Thread.sleep(20000); // 阻塞等待消息接收完成

        // TODO 释放资源
        Common.close();
    }

}

 上述代码启动之后,在控制台输出消息如下:

通配符模式

根据上述图片为需求来写代码。#表示可以一次匹配多个单词,*则表示一次只能匹配一个单词。 

生产者代码

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取信道
        Channel channel = Common.getChannel();

        // 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

        // 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        // 绑定交换机和队列
        /**
         * #表示匹配一个或者多个词
         * *表示只能匹配一个词
         */
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.orange.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.rabbit");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "lazy.#");

        // 发送消息
        /**
         * 根据匹配规则,第二个将不会匹配成功
         */
        String[] msg = new String[]{"a.orange.a", "a.b.orange.b.c", "c.c.rabbit", "lazy.a.b.v.c"};
        for (String s : msg) {
            channel.basicPublish(Constants.TOPIC_EXCHANGE, s, null, s.getBytes());
        }
        System.out.println("通配符模式消息发送成功!");

        // 释放资源
        channel.close();
    }

}

 消费者代码

public class Consumer1 {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取信道
        Channel channel = Common.getChannel();

        // 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

        // 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);

        // 绑定交换机和队列
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.orange.*");

        // 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);

        // 释放资源
        channel.close();
    }

}
public class Consumer2 {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 获取信道
        Channel channel = Common.getChannel();

        // 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

        // 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        // 绑定交换机和队列
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.rabbit");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "lazy.#");

        // 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
        Thread.sleep(2000);

        // 释放资源
        channel.close();
    }

}

下面分别为消费者1消费的内容和消费者2消费的内容: 

 

RPC模式

客户端代码

public class Client {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取信道
        Channel channel = Common.getChannel();

        // 声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

        // 发送消息
        String uuid = UUID.randomUUID().toString().replace("-", "");
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .replyTo(Constants.RPC_RESPONSE_QUEUE) // 响应队列
                .correlationId(uuid) // 唯一id,用来确认接收的响应
                .build();
        String msg = "hello rpc";
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, msg.getBytes());

        // 接收响应
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("获取到的信息是:" + new String(body));
                System.out.println("发送的id和接收的id:" + uuid + '\t' + properties.getCorrelationId());
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);

        // 释放资源

    }

}

 服务器代码

public class Server {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取信道
        Channel channel = Common.getChannel();

        // 声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);

        // 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("从客户端接收的消息为:" + new String(body));
                System.out.println("客户端要求响应的队列:" + properties.getReplyTo());
                // 处理客户端发送过来的消息并返回给客户端消息
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                String msg = "服务端返回消息";
                channel.basicPublish("", properties.getReplyTo(), basicProperties, msg.getBytes());
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, true, consumer);
    }

}

当两个程序都启动时,客户端和服务器输出的结果分别是:

SpringBoot工作模式代码案例

模板

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
@Component
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

}

工作队列模式

声明队列

@Configuration
public class WorkConfig {

    // 声明队列
    @Bean("workQueue")
    public Queue workQueue() {
        return QueueBuilder.durable(Constants.WORK_QUEUE).build();
    }

}

生产者代码

@RestController
@RequestMapping("/work")
public class WorkController {

    @Resource
    public RabbitTemplate rabbitTemplate;

    @RequestMapping
    public String workQueue() {
        this.rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello work spring");
        return "成功";
    }

}

消费者代码

@Configuration
public class WorkListener {

    /**
     * @RabbitListener 是 Spring 框架中用于监听 RabbitMQ 队列的注解。
     * 通过这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息
     * 该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息
     *
     * 常用的参数类型:
     * 1. String :返回消息的内容
     * 2. Message:SpringAMQP 的 Message 类,返回原始的消息体以及消息的属性,如果消息ID、内容和队列信息等等
     * 3. Channel:RabbitMQ 的通道对象,可以用于进行更高级的操作,如手动确认消息
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void workListener1(String msg) {
        System.out.println("消费者1消费的代码:" + msg);
    }

    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void workListener2(String msg) {
        System.out.println("消费者2消费的代码:" + msg);
    }

}

当生产者发送6条消息之后,消费者消费消息如下输出结果:

 

发布确认模式

声明队列、交换机、绑定关系

@Configuration
public class FanoutConfig {

    // 声明队列
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
    }

    @Bean("fanoutQueue2")
    public Queue fanoutQueue2() {
        return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
    }

    // 声明交换机
    @Bean("fanoutExchange")
    public Exchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
    }

    // 声明绑定关系
    @Bean("fanoutQueueBind1")
    public Binding fanoutQueueBind1(@Qualifier("fanoutQueue1") Queue queue,
                                                  @Qualifier("fanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean("fanoutQueueBind2")
    public Binding fanoutQueueBind2(@Qualifier("fanoutQueue2") Queue queue,
                                                  @Qualifier("fanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

}

生产者代码

@RestController
@RequestMapping("/fanout")
public class FanoutController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public String fanoutQueue() {
        this.rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello fanout");
        return "成功";
    }

}

 启动程序之后,使用127.0.0.1:8080/fanout发送一条消息,得到如下结果:

消费者代码

@Configuration
public class FanoutListener {

    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void fanoutListener1(String msg) {
        System.out.println("消费者1获取消息为:" + msg);
    }

    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void fanoutListener2(String msg) {
        System.out.println("消费者2获取消息为:" + msg);
    }

}

 当生产者发送3条消息之后,消费者获取的消息内容如下:

路由模式

声明队列、交换机、绑定关系

@Configuration
public class DirectConfig {

    @Bean("directQueue1")
    public Queue directQueue1() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    }

    @Bean("directQueue2")
    public Queue directQueue2() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    }

    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
    }

    @Bean("directQueueBind1")
    public Binding directQueueBind1(@Qualifier("directQueue1") Queue queue,
                                    @Qualifier("directExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("error");
    }

    @Bean("directQueueBind2")
    public Binding directQueueBind2(@Qualifier("directQueue2") Queue queue,
                                    @Qualifier("directExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("error");
    }

    @Bean("directQueueBind3")
    public Binding directQueueBind3(@Qualifier("directQueue2") Queue queue,
                                    @Qualifier("directExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("info");
    }

    @Bean("directQueueBind4")
    public Binding directQueueBind4(@Qualifier("directQueue2") Queue queue,
                                    @Qualifier("directExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("warn");
    }

}

 生产者代码

@RestController
@RequestMapping("/direct")
public class DirectController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public String directQueue() {
        String[] msg = new String[]{"error", "info", "warn"};
        for (String s : msg) {
            this.rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, s, "hello direct " + s);
        }
        return "成功";
    }

}

消费者代码

@Configuration
public class DirectListener {

    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void directListener1(String msg) {
        System.out.println("消费者1获取到的消息:" + msg);
    }

    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void directListener2(String msg) {
        System.out.println("消费者2获取到的消息:" + msg);
    }

}

通配符模式

声明队列、交换机、绑定关系

@Configuration
public class TopicConfig {

    @Bean("topicQueue1")
    public Queue topicQueue1() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    }

    @Bean("topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    }

    @Bean("topicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
    }

    /**
     * *表示一个词
     * #表示多个词
     */

    @Bean("topicQueueBind1")
    public Binding topicQueueBind1(@Qualifier("topicQueue1") Queue queue,
                                   @Qualifier("topicExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("*.orange.*");
    }

    @Bean("topicQueueBind2")
    public Binding topicQueueBind2(@Qualifier("topicQueue2") Queue queue,
                                   @Qualifier("topicExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit");
    }

    @Bean("topicQueueBind3")
    public Binding topicQueueBind3(@Qualifier("topicQueue2") Queue queue,
                                   @Qualifier("topicExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("lazy.#");
    }

}

生产者代码

@RestController
@RequestMapping("/topic")
public class TopicController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public String topicQueue() {
        String[] msg = new String[]{"a.orange.a", "a.b.orange.b.c", "c.c.rabbit", "lazy.a.b.v.c"};
        for (String s : msg) {
            this.rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, s, "hello topic" + s);
        }
        return "成功";
    }

}

消费者代码

@Configuration
public class TopicListener {

    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void topicListener1(String msg) {
        System.out.println("消费者1获取到的消息" + msg);
    }

    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void topicListener2(String msg) {
        System.out.println("消费者2获取到的消息" + msg);
    }

}

该篇文章中,对MQ的常用工作模式以及对应RabbitMQ的SDK示例和SpringBoot示例进行了简单表示。接下来就进入对RabbitMQ一些特性的文章上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/879356.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Apache SeaTunnel Zeta引擎源码解析(三) Server端接收任务的执行流程

作者&#xff1a;刘乃杰 编辑整理&#xff1a;曾辉 引入 本系列文章是基于 Apache SeaTunnel 2.3.6版本&#xff0c;围绕Zeta引擎给大家介绍其任务是如何从提交到运行的全流程&#xff0c;希望通过这篇文档&#xff0c;对刚刚上手SeaTunnel的朋友提供一些帮助。 我们整体的文…

linux文件系统权限详解

注:目录的执行权限代表是否可以进入。 一、文件权限控制对文件的访问: 可以针对文件所属用户、所属组和其他用户可以设置不同的权限 权限具有优先级。user权限覆盖group权限,后者覆盖other权限。 有三种权限类别:读取、写入和执行 读权限:对文件:可读取文件…

[SAP ABAP] 修改内表数据

1.利用关键字修改数据 语法格式 MODIFY TABLE <itab> FTOM <wa> [TRANSPORTING f1 f2...].<itab>&#xff1a;代表内表 <wa>&#xff1a;代表工作区 示例1 内表修改前的数据 将上述数据行中的AGE字段值更改为25&#xff0c;SEX字段值更改为女 输出结…

5.基础漏洞——文件上传漏洞

目录 一.文件上传漏洞原理 二.文件上传漏洞条件&#xff1a; 三.上传限制手段分为两大类 (1)客户端校验 (2)服务端校验 四.具体实现 1.文件上传漏洞——绕过JS检测 2.文件上传漏洞——绕过MIME类型检测 3.文件上传漏洞——绕过黑名单检测 绕过方式:(1) 绕过方式:(2) …

城市脉络下的空间句法:整合度与选择度的深度解析

上回写过一篇&#xff0c;基于空间句法的路网整合度、选择度分析&#xff0c;当时碍于篇幅和侧重点&#xff0c;主要讲了如何安装sDNA这个插件来实现路网的整合度、选择度分析&#xff0c;并且分析部分也只是画了几条简单的线段&#xff0c;这次我们深化一下原理和指标的解析&a…

二十种编程语言庆祝中秋节

二十种编程语言庆祝中秋节 文章目录 二十种编程语言庆祝中秋节中秋快乐&#xff01;家人们 &#x1f973;一 Python二 C三 C四 Java五 C#六 Perl七 Go八 Asp九 PHP十 JavaScript十一 JavaScript HTML十二 Visual Basic十三 早期 VB十四 Visual C十五 Delphi十六 Shell十七 Cobo…

Codeforces practice C++ 2024/9/11 - 2024/9/18

D. Mathematical Problem Codeforces Round 954 (Div. 3) 原题链接&#xff1a;https://codeforces.com/contest/1986/problem/D 题目标签分类&#xff1a;brute force&#xff0c;dp&#xff0c;greedy&#xff0c;implementation&#xff0c;math&#xff0c;two pointers…

svn回退到以前历史版本修改并上传

svn回退到以前版本&#xff0c;并在以前版本上修改代码后&#xff0c;上传到svn库当中&#xff0c;如下步骤&#xff1a; 3、 以回退到版本号4为例&#xff1a;选中版本号4&#xff0c;右键->Revert to this version,在出现的对话框中 点击yes&#xff01; 4、 5、

【C++ Primer Plus习题】16.8

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: main.cpp #include <iostream> #include <set> #includ…

矩阵分析 学习笔记3 多项式矩阵 jordan标准型

多项式矩阵 就是说这个矩阵里面的各个元素都是多项式&#xff0c;多项式的主角是类目大&#xff08;自变量&#xff09;。 多项式矩阵的秩 0多项式就是完全0的那种&#xff0c;就一个0&#xff0c;类目大都没有了。 多项式矩阵的秩和带一个类目大进去变成普通矩阵的秩不是一回…

深度学习|损失函数:网络参数优化基准

文章目录 引言均方误差计算示例矩阵形式代码实现 交叉熵误差计算示例代码实现 绝对误差计算示例代码实现 Hinge Loss计算示例代码实现 Kullback-Leibler Divergence计算示例代码实现 结语 引言 在上文「深度学习&#xff5c;模型训练&#xff1a;手写 SimpleNet」中&#xff0…

十款主流的供应链管理系统盘点,优缺点一目了然!

本文将盘点十款供应链管理系统&#xff0c;为企业选型提供参考&#xff01; 想象一下&#xff0c;一家企业在生产和销售产品的过程中&#xff0c;原材料供应不及时、库存积压严重、物流配送混乱。这时&#xff0c;供应链管理系统就如同一位高效的指挥家&#xff0c;将各个环节紧…

离散制造 vs 流程制造:锚定精准制造未来,从装配线到化学反应,实时数据集成在制造业案例中的多维应用

使用 TapData&#xff0c;化繁为简&#xff0c;摆脱手动搭建、维护数据管道的诸多烦扰&#xff0c;轻量替代 OGG, Kettle 等同步工具&#xff0c;以及基于 Kafka 的 ETL 解决方案&#xff0c;「CDC 流处理 数据集成」组合拳&#xff0c;加速仓内数据流转&#xff0c;帮助企业…

Linux权限理解【Shell的理解】【linux权限的概念、管理、切换】【粘滞位理解】

目录 Linux权限理解1.Xshell命令以及运行原理2.linux权限的学习2.1linux权限的切换2.2linux权限的概念2.3linux权限管理2.3.1linux中文件访问者的分类2.3.2文件类型和访问权限(文件属性)2.3.2.1文件类型2.3.2.2文件权限拓展—文件的起始权限 2.3.3文件权限管理2.3.4文件权限的应…

腾讯云Ubuntu系统安装宝塔,配置Java环境,运行spring boot项目

致谢 本次学习宝塔部署spring boot项目&#xff0c;参考如下资料 https://www.cnblogs.com/daen/p/15997872.html 系统安装宝塔 直接用的腾讯云云服务器面板上的登录&#xff0c;你可以换成 xshell 进入宝塔官网&#xff1a; https://www.bt.cn/new/download.html 我们采…

跟《经济学人》学英文:2024年09月14日这期 Volunteering has big benefits for the elderly

Volunteering has big benefits for the elderly But those Britons who would most benefit are least likely to do it 原文&#xff1a; THE CROSSNESS Pumping Station is not what you’d expect of a sewage works. With its spiral staircases, colourful tiling and…

Sapiens——人类视觉大模型的基础

引言 大规模预训练以及随后针对特定任务的语言建模微调取得了显著成功&#xff0c;已将这种方法确立为标准做法。同样&#xff0c; 计算机视觉方法正逐步采用大规模数据进行预训练。LAION5B、Instagram-3.5B、JFT-300M、LVD142M、Visual Genome 和 YFCC100M 等大型数据集的出现…

计算机毕业设计 健身房管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

Linux--守护进程与会话

进程组 概念 进程组就是一个或多个进程的集合。 一个进程组可以包含多个进程。 下面我们通过一句简单的命令行来展示&#xff1a; 为什么会有进程组&#xff1f; 批量操作&#xff1a;进程组允许将多个进程组织在一起&#xff0c;形成一个逻辑上的整体。当需要对多个进程…

Matlab如何配置小波工具(Wavelet Toolbox)

1、发现问题 因为实验要使用小波工具函数&#xff0c;运行时报错如下&#xff1a; 查看对应文件夹发现没有小波工具&#xff08;也可在控制台输入ver&#xff09;&#xff0c;检查是否有该工具&#xff0c;输入后回车返回如下&#xff1a; 2、下载工具包 没有这个工具就要去下…