RabbitMQ学习总结

目录

一:介绍        

二:应用场景

三:工作原理

组成部分说明

消息发布接收流程

四:下载安装

五:环境搭建

创建Maven工程

生产者

消费者 

 六:工作模式

Work queues

Publish/subscribe

生产者

消费者 

Routing 

生产者

​编辑消费者

 思考

 Topics

生产者

匹配规则 

Header模式

生产者

RPC

说明

七:SpringBoot整合RabbitMQ

一:添加依赖

二:配置文件

三:配置类

四:生产者

五:消费者 

八:高级特性

消费者绑定信息

消息可靠性投递

产生原因

解决方案

代码实现

生产端确认

 备份交换机

 ACK/NACK机制

总结 


一:介绍        

MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发.

RabbitMQ官方地址:http://www.rabbitmq.com/

二:应用场景

1)任务异步处理

2)应用程序解耦合

三:工作原理

组成部分说明

 Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

 Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

 Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

 Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

 Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

消息发布接收流程

 发送消息:

 1、生产者和Broker建立TCP连接。
 2、生产者和Broker建立通道。
 3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
 4、Exchange将消息转发到指定的Queue(队列)

 接收消息:

 1、消费者和Broker建立TCP连接
 2、消费者和Broker建立通道
 3、消费者监听指定的Queue(队列)
 4、当有消息到达Queue时Broker默认将消息推送给消费者。
 5、消费者接收到消息

四:下载安装

 RabbitMQ的下载地址:http://www.rabbitmq.com/download.html

 本项目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。

 下载erlang: http://erlang.org/download/otp_win64_20.3.exe

  erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中 添加%ERLANG_HOME%\bin;

安装erlang直接下一步就好

 安装RabbitMQ: Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server · GitHub

安装完开始菜单会有显示:

RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动

如果没有开始菜单则进入安装目录下sbin目录手动启动:

1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management

3、启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672

初始账号和密码:guest/guest

注:每个虚拟机就相当于一个独立的MQ,默认虚拟机的名字为/

4、如果启动失败找到.erlang.cookie,位于C:\windows\system32\config\systemprofile下,将此处的.erlang.cookie覆盖C:\user\admin.erlang.cookie后重启RabbitMQ即可解决问题

注意事项

1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang搜索RabbitMQ、ErlSrv,将对应的项全部删除。

五:环境搭建

创建Maven工程

创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>

生产者

public class Producer01 {
    //队列名称
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建与RabbitMQ服务的TCP连接
            connection = factory.newConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();

            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:durable是否持久化,如果持久化,mq重启后队列还在
             * param3:exclusive队列是否独占此连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置为true可用于临时队列的创建
             * param4:autoDelete队列不再使用时是否自动删除此队列和exclusive搭配使用
             * param5:队列参数(可以设置队列的扩展参数)
             */
            channel.queueDeclare(QUEUE, true, false, false, null);
            String message = "helloworld小明" + System.currentTimeMillis();
            /**
             * 消息发布方法
             * param1:Exchange的名称,如果没有指定,则使用Default Exchange
             * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,如果使用默认交换机,routingKey设置为队列的名称
             * param3:消息包含的属性
             * param4:消息体
             */

            /**
             * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显
             示绑定或解除绑定
             * 默认的交换机,routingKey等于队列名称
             */
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("Send Message is:'" + message + "'");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

此时显示创建了一个队列,有1条消息待消费, 消息总数为1

 

 注:队列中是可以拿到发送的消息的

 注:Purge Message可以清空该队列的消息

 注:Publish message可以在指定的队列中发消息,因此生产者发送消息时要打印发送的body,消费者端做幂等,一旦出现问题可通过控制台重新发送该消息

消费者 

注:两边都要声明队列,防止消费者启动在生产者之前报错

public class Consumer01 {
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在服务器的ip和端口
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);
        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             * @throws IOException
            }
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body);
                System.out.println("receive message.." + msg);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE, true, consumer);
    }
}

 六:工作模式

RabbitMQ有以下几种工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC

Work queues

工作队列模式

work queues与入门程序相比:多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。

结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。

 设置idea同时启用多个客户端

Publish/subscribe

发布订阅模式

发布订阅模式
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息 

案例:
用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。

生产者

1、声明exchange_fanout_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
3、发送消息时不需要指定routingkey

public class Producer02_publish {
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //创建一个与MQ的连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建一个连接
            connection = factory.newConnection();
            //创建与交换机的通道,每个通道代表一个会话
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");
            //声明队列
            // (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
            /**
             * 参数明细:
             * 1、队列名称
             * 2、是否持久化
             * 3、是否独占此队列
             * 4、队列不用是否自动删除
             * 5、参数
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             * 参数明细
             * 1、队列名称
             * 2、交换机名称
             * 3、路由key
             */
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
            //发送消息
            for (int i = 0; i < 10; i++) {
                String message = "inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细
                 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange
                 * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列
                 * 3、消息属性
                 * 4、消息内容
                 */
                channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 可以看到新声明的交换机exchange_fanout_inform

 新生成两个队列,每个队列接收到10条消息

消费者 

邮件发送消费者

public class Consumer02_subscribe_email {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_FANOUT_INFORM = "inform_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个与MQ的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建与交换机的通道,每个通道代表一个会话
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");
        //声明队列
        // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 参数明细:
         * 1、队列名称
         * 2、是否持久化
         * 3、是否独占此队列
         * 4、队列不用是否自动删除
         * 5、参数
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        //交换机和队列绑定String queue, String exchange, String routingKey
        /**
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
        //定义消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                String exchange = envelope.getExchange();
                //消息内容
                String message = new String(body);
                System.out.println(message);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
    }

}

可以看到邮件的信息被消费了,短信的还在

可以看到该交换机绑定了两个队列 

Routing 

路由模式
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
 

生产者

1、声明exchange_routing_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
3、发送消息时需要指定routingkey

public class Producer03_routing {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //创建一个与MQ的连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建一个连接
            connection = factory.newConnection();
            //创建与交换机的通道,每个通道代表一个会话
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");
            //声明队列
            // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            /**
             * 参数明细:
             * 1、队列名称
             * 2、是否持久化
             * 3、是否独占此队列
             * 4、队列不用是否自动删除
             * 5、参数
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             * 参数明细
             * 1、队列名称
             * 2、交换机名称
             * 3、路由key
             */
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);
            //发送邮件消息
            for (int i = 0; i < 10; i++) {
                String message = "email inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细
                 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange
                 * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消
                 息将发到此队列
                 * 3、消息属性
                 * 4、消息内容
                 */
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
            //发送短信消息
            for (int i = 0; i < 10; i++) {
                String message = "sms inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

可以看到创建了exchange_routing_inform交换机,并绑定了两个队列

可以看到对列绑定了Routing_key

消费者

邮件发送消费者

public class Consumer03_routing_email {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个与MQ的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建与交换机的通道,每个通道代表一个会话
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");
        //声明队列
        // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 参数明细:
         * 1、队列名称
         * 2、是否持久化
         * 3、是否独占此队列
         * 4、队列不用是否自动删除
         * 5、参数
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        //交换机和队列绑定String queue, String exchange, String routingKey
        /**
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
        //定义消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                String exchange = envelope.getExchange();
                //消息内容
                String message = new String(body);
                System.out.println(message);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
    }

}

 思考

1、Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。

 Topics

路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。 

案例:

根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

生产者

public class Producer04_topics {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email_topics";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms_topics";
    private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //创建一个与MQ的连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建一个连接
            connection = factory.newConnection();
            //创建与交换机的通道,每个通道代表一个会话
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, "topic");
            //声明队列
            /**
             * 参数明细:
             * 1、队列名称
             * 2、是否持久化
             * 3、是否独占此队列
             * 4、队列不用是否自动删除
             * 5、参数
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

            //绑定email通知队列
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");
            //绑定sms通知队列
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");

            //发送邮件消息
            for (int i = 0; i < 10; i++) {
                String message = "email inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细
                 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange
                 * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消
                 息将发到此队列
                 * 3、消息属性
                 * 4、消息内容
                 */
                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
            //发送短信消息
            for (int i = 0; i < 10; i++) {
                String message = "sms inform to user" + i;
                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
            //发送短信和邮件消息
            for (int i = 0; i < 10; i++) {
                String message = "sms and email inform to user" + i;
                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

可以看到exchange_topics_inform绑定了两个Routing_key,进行通配符匹配

匹配规则 

统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。

对列绑定的Routing_key为:

inform.#.email.#

inform.#.sms.#

发送消息指定的Routing_key为:

inform.email 匹配 inform.#.email.#

inform.sms 匹配 inform.#.sms.#

inform.sms.email 匹配 inform.#.email.# 和 inform.#.sms.# 两个队列都能发送

(即使什么都没有,#也可以匹配个空)

Header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效

生产者

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

通知

String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

发送邮件消费者 

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消费队列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

RPC

说明

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

七:SpringBoot整合RabbitMQ

一:添加依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>3.3.0</version>
</dependency>

二:配置文件

server:
  port: 44000
spring:
  application:
    name: test-SpringBoot-RabbitMQ-product
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

三:配置类

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

    /**
     * 交换机配置
     * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
     *
     * @return the exchange
     */
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        //durable(true)持久化,消息队列重启后交换机仍然存在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //声明队列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS() {
        Queue queue = new Queue(QUEUE_INFORM_SMS);
        return queue;
    }

    //声明队列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL() {
        Queue queue = new Queue(QUEUE_INFORM_EMAIL);
        return queue;
    }

    /**
     * channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
     * 绑定队列到交换机 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                            @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
    }

    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
    }
}

四:生产者

@Test
    public void testSendByTopics(){
        for (int i=0;i<5;i++){
            String message = "sms email inform to user"+i;
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);
            System.out.println("Send Message is:'" + message + "'");
        }
    }

五:消费者 

@Component
public class ReceiveHandler {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
    //监听email队列
    @RabbitListener(queues = {ReceiveHandler.QUEUE_INFORM_EMAIL})
    public void receive_email(String msg, Message message, Channel channel){
        System.out.println(msg);
    }
    //监听sms队列
    @RabbitListener(queues = {ReceiveHandler.QUEUE_INFORM_SMS})
    public void receive_sms(String msg,Message message,Channel channel){
        System.out.println(msg);
    }
}

八:高级特性

消费者绑定信息

1)通过@RabbitListener注解启动消费端也可以声明队列,交换机,路由信息...等等,如下:

2)尽量不要这么写,如果前置的绑定关系已经创建,只需要 @RabbitListener(queues = {ReceiveHandler.QUEUE_INFORM_SMS})用这种写法(推荐)

3)甚至直接在rabbitMQ图形化界面创建

@Component
public class ReceiveHandlerV2 {
    public static final String ROUTING_KEY = "order";
    public static final String QUEUE_NAME = "queue.order";
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";


    //监听email队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT),
            key = {ROUTING_KEY}
    ))
    public void processMessage(String msg, Message message, Channel channel){
        System.out.println(msg);
    }
}

消息可靠性投递

产生原因

1)消息没有发送到消息对列上

2)消息成功存入消息队列,但是消息队列服务器宕机,原本保存在内存中的消息丢失

3)消息成功存入消息队列,但是消费端服务宕机,抛异常等等...

后果:消费者拿不到消息,业务功能缺失,数据错误

解决方案

1) 消息没有发送到消息队列:

  • 解决思路A:在生产端进行确认,具体操作中我们会针对交换机队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
  • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递到备份交换机

2) 宕机导致内存中消息丢失

  • 解决思路:消息持久化到硬盘

3) 但是消费端服务宕机,抛异常

  • 消费端消费成功,给服务器返回ACK信息,然后消息队列删除该消息
  • 消费端消费失败,给服务器返回NACK信息,同时把消息恢复为待消费状态,这样就可以再次取回消息,重试一次(消费者支持幂等)

代码实现

生产端确认

注意:必须加如下两个配置,否则不生效

创建配置类:Why?首先我们需要声明一个回调函数来接受RabbitMQ服务器返回的确认信息

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到对列ReturnsCallbackRabbitTemplate
  • 然后就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate
  • 原本RabbitTemplate对象并没有生产端消息确认功能,要给它设置对应的组件才可以
  • 而设置对应的组件需要调用RabbitTemplate对象下面的两个方法
调用组件的方法所需对象类型
setConfirmCallback()ConfirmCallback接口类型
setReturnsCallback()ReturnsCallback接口类型

 配置类信息代码,如下:

package org.example.testspringbootrabbitmqproduct;

import jakarta.annotation.PostConstruct;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 只放在IOC中是不够的,需要设置在RabbitTemplate中才可以
     */
    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * @param correlationData correlation data for the callback.
     * @param ack true for ack, false for nack
     * @param cause An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息发送到交换机成功或者失败都会回调这个方法
        System.out.println(correlationData);//成功后返回 null
        System.out.println(ack);//成功后返回 true
        System.out.println(cause);//成功后返回 null
    }

    /**
     * @param returnedMessage the returned message and metadata.
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 只有发送到队列上失败才会回调该方法
        System.out.println("消息主体: " + new String(returnedMessage.getMessage().getBody()));
        System.out.println("应答码: " + returnedMessage.getReplyCode());
        System.out.println("描述: " + returnedMessage.getReplyText());
        System.out.println("消息使用的交换器是: " + returnedMessage.getExchange());
        System.out.println("消息使用的路由键是: " + returnedMessage.getRoutingKey());

//        消息主体: 11
//        应答码: 312
//        描述: NO_ROUTE
//        消息使用的交换器是: exchange.direct.order
//        消息使用的路由键是: inform.sms.email1111
    }
}
 备份交换机

原理图

注意:备份交换机的类型必须是fanout(因为是不带路由键的)

注意:备份交换机只能是创建的时候指定

 模拟发送一个不存在的routingKey,查看备份队列已经有一条消息

 

 ACK/NACK机制

代码中开启机制的支持

思考:更新购物车的微服务消费了消息返回的ACK确认信息,然后Broker删除了信息,进而导致更新库存,更新积分的功能拿不到消息----这种情况会发生吗?  

答案:会的,因此需要广播到不同的队列中

  • 消费端代码
package org.example.testspringbootrabbitmqconsumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class ReceiveHandlerV2 {
    public static final String ROUTING_KEY = "order";
    public static final String QUEUE_NAME = "queue.order";
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";


    //监听队列
    @RabbitListener(queues = {QUEUE_NAME})
    public void processMessage(String msg, Message message, Channel channel) throws IOException {
        try {
            // 成功了返回ACK
            System.out.println("消费端接收到了消息: " + msg);
            /**
             * deliveryTag: 交付标签(对列中每条消息的唯一标识)是以为64位整数
             *              会从生产者带过来
             *              提问: 如果交换机是Fanout模式,同一个消息广播到不同对列,deliveryTag会重复吗?
             *                   不会,deliveryTag在Broker范围内唯一
             * multiple: 为true时 指定某个deliveryTag,为ACK时全部删除,为NACK时全部重新投递
             *            为false时,只做单个的处理
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 失败了返回NACK
            // 获取当前消息是否是重复投递
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if (redelivered) {
                // 就不放回对列了,说明此前重复过了
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } else {
                /**
                 * requeue: 控制消息是否重新放回队列 true:重新放回,进行投递  false:不重新投递, broker会丢弃
                 */
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            throw new RuntimeException(e);
        }
    }
}
  •  ACK情况演示

 发送一条消息未执行basicAck()时,队列中消息还再存在

 

执行完basicAck()后队列中消息消失

  • NACK情况演示

 

 第一次是false

 第二次是true

速率显示重新投递

 分别向两个队列添加100条信息

强制让一个队列进行NACK 

 

启动消费者 ,可以看到NACK的队列一直处于Runing状态中,什么都不做的队列消费完了

结论: 配置文件配置的 acknowledge-mode: manual #把消息确认模式改为手动确认,只是起一个开启功能的作用,并不是每个队列都需要手动ack,如果不写手动ack的代码会自动进行ack

 

// 表示拒绝, 和basicNack基本一样,区别是 是否能控制批量操作 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);  

 

总结 

三个部分的操作相互配合总体上实现消息的可靠性投递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/731976.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

openppp2 命令行接口详解

openppp2 是一个工作在 OSI/3 Layer 网络通信层的虚拟以太网工具链的开源软件&#xff0c;在查阅本文之前&#xff0c;人们可以查阅以下资料。 开源仓库&#xff1a; liulilittle/openppp2: PPP PRIVATE NETWORK™ 2 VPN Next Generation Reliable and Secure Virtual Etherne…

JavaWeb——SQL简介

1. SQL的介绍 SQL是一门结构化查询语言&#xff0c;就是一门用来操作关系型数据库的数据库语言&#xff1b; 使用SQL语句&#xff0c;可以操作所有的关系数据库&#xff1b; 但是&#xff0c;不同的关系型数据库的SQL操作略有不同&#xff0c;称为“方言”&#xff1b; 2. S…

大模型网信办备案全网最详细说明(付附件)

根据目前公开的国内大模型算法备案统计来看&#xff0c;首批境内深度合成服务算法备案清单&#xff0c;总共通过41家&#xff0c;14家互联网大厂和独角兽企业成功申报算法备案32个&#xff0c;6家新兴互联网公司成功申报算法备案9个&#xff0c;仅占比21.9%。 第二批境内…

springboot学习-图灵课堂-最详细学习

springboot-repeat springBoot学习代码说明为什么java -jar springJar包后项目就可以启动 配置文件介绍 springBoot学习 依赖引入 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.target>8</mav…

嵌入式开发二十一:定时器之通用定时器

通用定时器是在基本定时器的基础上扩展而来&#xff0c;增加了输入捕获与输出比较等功能。高级定时器又是在通用定时器基础上扩展而来&#xff0c;增加了可编程死区互补输出、重复计数器、带刹车(断路)功能&#xff0c;这些功能主要针对工业电机控制方面。 本篇博客我们主要来学…

leetcode 二分查找·系统掌握 猜数字大小

题目&#xff1a; 题解&#xff1a; 使用最经典普通二分即可 int guessNumber(int n) {long l0,rn,mid;while(l<r){mid(rl)>>1;if(guess(mid)0)return mid;else if(guess(mid)-1)rmid-1;else lmid1;}return 0;}

MySQL的DML语句

文章目录 ☃️概述☃️DML☃️添加数据☃️更新和删除数据☃️DML的重要性 ☃️概述 MySQL 通用语法分类 ● DDL: 数据定义语言&#xff0c;用来 定义数据库对象&#xff08;数据库、表、字段&#xff09; ● DML: 数据操作语言&#xff0c;用来对数据库表中的数据进行增删改 …

如何有效管理信息技术课堂

有效管理信息技术课堂是确保学生学习效果、维护课堂秩序和提升学生兴趣的关键。以下是一些详细的方法和策略&#xff0c;旨在帮助教师更好地管理信息技术课堂&#xff1a; 一、制定明确的课堂规则 强调课堂纪律&#xff1a;确保学生明确了解并遵守课堂纪律&#xff0c;如准时…

【详细】一步一步实现一个BP神经网络-逐行代码解说

本文来自《老饼讲解-BP神经网络》https://www.bbbdata.com/ ​ 要如何使用代码实现一个BP神经网络呢&#xff1f; 下面跟随笔者&#xff0c;一步一步详细来实现&#xff0c;再对代码进行详细解说。 通过本文可以详细掌握怎么使用matlab来实现一个BP神经网络。 一、一步一步实…

去水印助手,小熊录屏,OldRoll复古胶片相机

我们将带大家了解三款特色应用,软件是经过大佬处理过的&#xff0c;都非常的好用&#xff01;今天分享给大家&#xff01;如果你也喜欢这几款软件不要忘记给博主点击点赞和再看哦&#xff01; 软件获取链接在链接的底部&#xff1a; 一键去水印助手 无论您是在各大社交平台上…

区块链技术:金融市场监管的新篇章

一、引言 随着金融科技的迅猛发展&#xff0c;区块链技术作为其中的佼佼者&#xff0c;正逐渐改变着金融市场的格局。在金融市场监管领域&#xff0c;区块链技术以其独特的优势&#xff0c;为监管机构提供了新的监管思路和手段。本文将深入探讨区块链技术在金融市场监管中的作用…

【机器学习 复习】第3章 K-近邻算法

一、概念 1.K-近邻算法&#xff1a;也叫KNN 分类 算法&#xff0c;其中的N是 邻近邻居NearestNeighbor的首字母。 &#xff08;1&#xff09;其中K是特征值&#xff0c;就是选择离某个预测的值&#xff08;例如预测的是苹果&#xff0c;就找个苹果&#xff09;最近的几个值&am…

18.cobra框架了解

目录 概述举例安装实践实践 概述 github cobra cobra 快速的实现一个命令行客户端&#xff0c;命令行解析工具。 cobra 中的主要概念 -Commands 表示执行运作-Args 执行参数-Flags 这些运作的标识符 举例 git clone 命令 git clone https://github.com/spf13/cobra.git -…

有玩家在2011年的MacBook上成功运行了Windows XP 还安装了触摸屏

我们已经在许多不同的设备上看到过 Windows XP 正在运行。这个古老的操作系统于 2001 年正式推出&#xff0c;现在已经老到其最后一次软件更新是在近十年前。一位好奇的玩家试图在 2011 年的触摸屏 MacBook 上为 Windows XP 打造了一个新家&#xff0c;复古技术探索者 Michael …

绽放光彩的小程序 UI 风格

绽放光彩的小程序 UI 风格

深度揭秘:深度学习框架下的神经网络架构进化

深度学习框架下的神经网络架构经历了从基础到复杂的显著进化&#xff0c;这一进程不仅推动了人工智能领域的突破性进展&#xff0c;还极大地影响了诸多行业应用。本文旨在深入浅出地揭示这一进化历程&#xff0c;探讨关键架构的创新点及其对现实世界的影响。 引言&#xff1a;…

反激开关电源输出假负载

1、为何需要假负载&#xff1f; 开关电源芯片的占空比最小不可能做到0%&#xff0c;都有一个最小导通时间&#xff0c;不过最小导通时间&#xff0c;在规格书中&#xff0c;不一定给出来 注意&#xff1a;如果没有最小导通时间&#xff0c;就相当于芯片都停止输出了&#xff…

Emacs之保存时删除行尾空格(一百四十二)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

024基于SSM+Jsp的超市管理系统

开发语言&#xff1a;Java框架&#xff1a;ssm技术&#xff1a;JSPJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包…

PCB行业迈入数字化新时代,智能工厂引领未来制造

在传统的PCB生产过程中&#xff0c;人工操作、纸质记录、经验判断等方式占据了主导地位。然而&#xff0c;这种方式不仅效率低下&#xff0c;而且容易出现误差&#xff0c;导致产品质量不稳定。同时&#xff0c;随着市场竞争的加剧&#xff0c;客户对产品的交期、质量、成本等方…