RabbitMQ快速入门

文章目录

      • 1、RabbitMQ的概述
        • 1.1、什么是消息队列?
        • 1.2、为什么要使用消息队列?
        • 1.3、RabbitMQ的特点:
      • 2、RabbitMQ的安装
        • 2.1 下载与安装
        • 2.2 常用命令
      • 3、RabbitMQ消息发送和接受
        • 3.1 消息发送和接受机制
        • 3.2 AMQP的消息路由
        • 3.3 Exchange(交换机)的类型
        • 3.4 Java发送和接收 Queue的消息
        • 3.5 Java绑定Exchange 发送和接受消息
          • Direct类型的交换机:
          • Fanout类型的交换机:
          • Topic类型的交换机:
        • 3.6 消息的事务
        • 3.6 消息的发送者确认模式
          • 方式一:channel.waitForConfirms()普通发送方确认模式:
          • 方式二:channel.waitForConfirmsOrDie() 批量确认模式:
          • 方式三:channel.addConfirmListener() 异步监听发送确认模式:
        • 3.7 消息的消费者确认模式
          • 手动确认消息:
          • 开启事务,必须提交事务,消息才会被确认:
          • 消息的防重复确认:
      • 4、SpringBoot继承RabbitMQ
        • 发送者配置:分别对direct、fanout、topic类型的交换机做测试
        • 接受者配置:
      • 5、RabbitMQ集群

1、RabbitMQ的概述

1.1、什么是消息队列?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kIKdMY5p-1690987919598)(/1604661953014.png)]

1.2、为什么要使用消息队列?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Dw5KyBhN-1690987919599)(/1604662274592.png)]

1.3、RabbitMQ的特点:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kju4Ycoh-1690987919599)(/1604662728604.png)]在这里插入图片描述

2、RabbitMQ的安装

2.1 下载与安装

2.2 常用命令

3、RabbitMQ消息发送和接受

3.1 消息发送和接受机制

在这里插入图片描述

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U2XTZive-1690987919601)(/1604665228403.png)]

在这里插入图片描述

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cbXo7mAn-1690987919602)(/1604665331633.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pw5qFGSo-1690987919603)(/1604665350382.png)]

3.2 AMQP的消息路由

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MpJ63S8A-1690987919603)(/1604665523333.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e6rdzoh6-1690987919604)(/1604665496642.png)]

3.3 Exchange(交换机)的类型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NagKZr6i-1690987919604)(/1604665614326.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TidGWjrH-1690987919604)(/1604665635312.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oTmxo3yn-1690987919604)(/1604667447351.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LqXq6jlN-1690987919605)(/1604666581680.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-grkHbM7H-1690987919605)(/1604667273821.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cCKeKePV-1690987919606)(/1604667492011.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KxyHkzXz-1690987919606)(/1604667248224.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WVl48T8J-1690987919606)(/1604668208133.png)]

3.4 Java发送和接收 Queue的消息

导入依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>

发送消息模块:

/**
 * 消息生产者,发送者
 */
public class SendMsg {
	//队列的名称
    final static String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置mq的连接信息
        factory.setHost("192.168.65.128");  //主机ip
        factory.setPort(5672);  //端口,此端口是
        factory.setUsername("root"); //账号
        factory.setPassword("root"); //密码
        factory.setVirtualHost("/mq");  //虚拟主机,没有设置可以不写

        //定义连接
        Connection conn = null;
        //定义通道
        Channel channel = null;

        try {
            conn = factory.newConnection(); //获取连接
            channel = conn.createChannel(); //创建通道
            /**
             * 声明一个队列
             * 参数1:队列的名称
             * 参数2:是否为持久化的队列
             * 参数3:是否为排外,如果是排外,则该队列只允许一个消费者监听
             * 参数4:是否自动删除队列,如果队列中没有消息,也没有消费者连接时,就会删除该队列
             * 参数5:为队列的一些其他属性设置,一般为null即可
             * 注意事项:
             * 1、队列存在,则不创建声明,队列不存在,则会创建一个新的队列
             * 2、队列名可以是任意值,但接收消息必须保持和队列名一致,否则消息不知道发送到哪去了
             * 3、下面这行代码可有可无,但是必须确保队列是存在的,否则消息不知道发送到哪去了
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            //要发送的消息
            String message = "hello RabbitMQ, this is my first use1111111";
            /**
             * 发送消息到MQ
             * 参数1:交换机的名称,为空字符串表示不使用交换机
             * 参数2:为队列或者routingKey,当指定了交换机,这个值就是RoutingKey
             * 参数3:为消息属性信息,一般为null即可
             * 参数4:为具体的消息数据的字节数组
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf8"));
            System.out.println("消息发送完毕......");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接收消息模块:

/**
 * 接收消息
 */
public class ReceiveMsg {

    final static String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        try {
            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            /**
             * 接收消息
             * 参数1:当前监听的队列名称,此名字必须与发送时的队列名称保持一致,不然接收不到消息
             * 参数2:是否自动确认消息,true:会自动确认消息,并将消息从消息队列中移除
             * 参数3:消息接收者的标签,用于当多个消费者同时监听一个队列时,用于区分不同的消费者,通常为空串即可
             * 参数4:消息接收的回调函数,用于对接收的消息进行处理
             * 注意:使用basicConsume方法后,会自动启动一个线程持续监听队列,如果队列中有消息会自动接收,所以不能关闭连接和通道对象
             */
            channel.basicConsume(QUEUE_NAME, true, "",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body,"utf8");
                    System.out.println("接收的消息: => " + msg);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PSNi1jRb-1690987919607)(/1604718939009.png)]

3.5 Java绑定Exchange 发送和接受消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f5cY6Sqt-1690987919607)(/1604719059849.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cfIex08q-1690987919607)(/1604719157272.png)]

Direct类型的交换机:

发送消息:

/**
 * 发送消息到direct类型的交换机,交换机与队列绑定通过routingKey,然后交换机把消息发送给队列
 */
public class DirectExchangeSendMsg {

    //队列的名称
    final static String QUEUE_NAME = "myDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "directExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "directRoutingKey";

    public static void main(String[] args) {
        send();
    }


    public static void send(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;

        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            //声明一个队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            /**
             * 声明一个交换机
             * 参数1:交换机的名称
             * 参数2:交换机的类型
             * 参数3:是否为持久化交换机
             * 注意:声明一个交换机,存在则不声明,不存在则声明,可有可无
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

            /**
             * 将队列绑定到交换机
             * 参数1:队列名称
             * 参数2:交换机名称
             * 参数3:消息的routingKey,就是BindingKey,虽然参数名为routingKey
             */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            /**
             * 发送消息到指定的队列
             * 参数1:交换机的名称
             * 参数2:routingKey,如果这个routingKey与某个队列绑定的交换机的routingKey一致,则消息就会被发送到这个队列中
             */
            String msg = "要发送的消息...";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));

            System.out.println("消息发送完毕....");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接收消息:

/**
 * 消费者消费队列中的消息
 */
public class DirectExchangeReceiveMsg {

    //队列的名称
    final static String QUEUE_NAME = "myDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "directExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "directRoutingKey";

    public static void main(String[] args) {
       receive();
    }

    public static void receive(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            //将队列绑定到交换机上
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            //接收消息
            channel.basicConsume(QUEUE_NAME, true, "", new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf8");
                    System.out.println("消费 ---> " + msg);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Fanout类型的交换机:

发送消息:

/**
 * 发送消息到fanout类型的交换机
 */
public class FanoutExchangeSendMsg {

    //交换机的名称
    final static String EXCHANGE_NAME = "fanOutExchange";

    public static void main(String[] args) {
        send();
    }


    public static void send(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;

        try {
            conn = factory.newConnection();
            channel = conn.createChannel();

            //channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            /**
             * 由于是fanout类型的交换机,所以存在多个消费者,因此不建议在发送消息的时候创建队列,以及绑定交换机
             * 建议在消费者中创建队列并绑定交换机,但必须在发送消息时确保队列存在,所以上面的代码不写
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
            String msg = "要发送的消息...";
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf8"));


            System.out.println("消息发送完毕....");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接收消息:

/**
 * 消费者消费队列中的消息
 */
public class FanOutExchangeReceiveMsg {

    //交换机的名称
    final static String EXCHANGE_NAME = "fanOutExchange";

    public static void main(String[] args) {
       receive();
    }

    public static void receive(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            /**
             * 声明队列,由于fanout类型的交换机类似于广播的模式,会有多个消费者来接收交换机中的数据,
             * 所以创建一个随机的队列名称
             *
             * channel.queueDeclare():创建一个随机名称的队列,不是持久化,排外的,自动删除的
             * getQueue():获取队列的名称
             */
            String queueName = channel.queueDeclare().getQueue();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
            //将队列绑定到交换机上,fanout类型的交换机没有routingKey
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            //接收消息
            channel.basicConsume(queueName, true, "", new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf8");
                    System.out.println("消费 ---> " + msg);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Topic类型的交换机:

发送消息:

/**
 * 发送消息到topic类型的交换机
 */
public class TopicExchangeSendMsg {

    //交换机的名称
    final static String EXCHANGE_NAME = "topicExchange";

    public static void main(String[] args) {
        send();
    }


    public static void send(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;

        try {
            conn = factory.newConnection();
            channel = conn.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
            String msg = "要发送的消息...";
            channel.basicPublish(EXCHANGE_NAME, "aa.bb.cc", null, msg.getBytes("utf8"));

            System.out.println("消息发送完毕....");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接收消息:

/**
 * 消费者消费队列中的消息
 */
public class TopicExchangeReceiveMsg01 {

    //交换机的名称
    final static String EXCHANGE_NAME = "topicExchange";
    //队列的名称
    final static String QUEUE_NAME = "topicQueue01";//取topicQueue01、topicQueue02、topicQueue03
    //topic交换机的routingKey可以包含通配符, .分割单词   # 匹配0个或者多个单词  * 匹配恰好一个单词
    final static String ROUTING_KEY = "aa"; //取aa、aa.*、aa.# 进行测试

    public static void main(String[] args) {
       receive();
    }

    public static void receive(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();

            channel.queueDeclare(QUEUE_NAME,true,false,false, null);
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
            //将队列绑定到交换机上,fanout类型的交换机没有routingKey
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            //接收消息
            channel.basicConsume(QUEUE_NAME, true, "", new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf8");
                    System.out.println(QUEUE_NAME + "消费 ---> "+ROUTING_KEY+" --> " + msg);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

总结:

Topic类型的交换机也是消息一对多的一种交换机类型,它和fanout都能实现一个消息同时发送给多个队列;

fanout更适用于使用在一个功能不同的进程来获取数据,例如,手机App中的消息推送,一个App可能会还有很多个用户来进行安装,然后他们都会启动一个随机的队列来接收者自己的数据;

Topic更适用于不同功能模块来接收同一个消息,例如商城下单成功后需要发送消息到队列中,例如:RoutingKey为 order.success, 物流系统监听订单 order.*, 发票系统监听 order. *

Topic可以使用农随机的队列名也可以使用一个明确的队列名,但是如果应用在和订单有关的功能中,建议是有个名明确的队列 并且要求持久化的队列。

3.6 消息的事务

事务消息与数据库的事务类似,只是MQ中的消息是要保证消息是否会全部发送成功,防止丢失消息的一种策略。
RabbitMQ有两种方式来解决这个问题:

  1. 通过AMQP提供的事务机制实现
  2. 使用发送者确认模式实现

事务的使用:
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

  1. channel.txSelect()声明启动事务模式
  2. channel.txCommit()提交事务
  3. channel.txRollback()回滚事务

发送者在声明事务后没有提交,队列中不会有消息存在,只有事务提交后,才会将内存中的消息写入队列。

消费者在声明事务后,即使没有提交,也可以获取队列中的消息,并将消息从队列中移除。

回滚事务,就是将消息事务中的操作,进行撤销到事务开始前的状态。

发送者:

/**
 * 消息生产者,发送者
 */
public class TransactionSendMsg {

    final static String QUEUE_NAME = "myTransactionQueue";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置mq的连接信息
        factory.setHost("192.168.65.128");  //主机ip
        factory.setPort(5672);  //端口
        factory.setUsername("root"); //账号
        factory.setPassword("root"); //密码
        factory.setVirtualHost("/mq");  //虚拟主机

        //定义连接
        Connection conn = null;
        //定义通道
        Channel channel = null;

        try {
            conn = factory.newConnection(); //获取连接
            channel = conn.createChannel(); //创建通道
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //要发送的消息
            String message = "hello RabbitMQ, this is my first use1111111";
            //声明启动事务模式,只声明启动,如果没有提交事务,则事务中要发送的消息不会写入到队列中
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf8"));
            //int i = 10/0;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf8"));
            channel.txCommit();
            System.out.println("消息发送完毕......");
        } catch (Exception e){
            channel.txRollback();   //事务回滚,放弃当前事务中所有要提交的消息,释放内存
        } finally{
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接受者:

/**
 * 接收消息
 */
public class TransactionReceiveMsg {

    final static String QUEUE_NAME = "myTransactionQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        try {
            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            //消费端声明开启事务,不需要提交,也可以获取消息
            channel.txSelect();
            channel.basicConsume(QUEUE_NAME, true, "",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body,"utf8");
                    System.out.println("接收的消息: => " + msg);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

3.6 消息的发送者确认模式

事务适用于经常发生异常的情况下,当发生异常,会拒绝提交事务,而消息的发送者确认模式,适用于发生少量异常的情况,当发送消息发生异常或者消息的丢失,它会补发消息,即重新发送消息,来保证消息的一致性和正确性。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rZ544hd5-1690987919608)(/1604748519941.png)]

方式一:channel.waitForConfirms()普通发送方确认模式:
/**
 * 发送消息到direct类型的交换机------发送者普通确认模式
 */
public class ConfirmDirectExchangeSendMsg {

    //队列的名称
    final static String QUEUE_NAME = "myConfirmDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "confirmDirectExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "confirmDirectRoutingKey";

    public static void main(String[] args) {
        send();
    }


    public static void send(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;

        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            //启动发送者确认模式
            channel.confirmSelect();
            String msg = "要发送的消息...";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));
            //阻塞线程等待服务返回响应,用于判断消息是否发送成功,如果服务端确认消息发送完成,则返回true
            //还可以为这个方法指定毫秒值,用于确认我们需要等待服务端响应的超时时间,
            //如果消息超过时间,则会抛出异常InterruptedException,表示服务器出现问题,需要补发消息
            //或将消息缓存到redis中,稍后利用定时任务补发。
            //无论是返回false,还是抛出异常,消息都有可能发送成功和失败

            //如果我们的消息一定要发送到队列中,例如:订单数据,那么我们可以采用消息补发
            //就是重新发送一次消息,可以使用递归或者redis + 定时任务完成补发
            boolean b = channel.waitForConfirms();
            System.out.println("消息发送完毕...." + b);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
方式二:channel.waitForConfirmsOrDie() 批量确认模式:
/**
 * 发送消息到direct类型的交换机------发送者批量确认模式
 */
public class BatchConfirmDirectExchangeSendMsg {

    //队列的名称
    final static String QUEUE_NAME = "myBatchConfirmDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "batchConfirmDirectExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "batchConfirmDirectRoutingKey";

    public static void main(String[] args) {
        send();
    }


    public static void send(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;

        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            //启动发送者确认模式
            channel.confirmSelect();
            String msg = "要发送的消息...";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));

            //waitForConfirmsOrDie:批量消息确认,它会同时向服务中确认之前通道中发送的所有消息是否已经全部成功写入
            //这个方法没有返回值,如果服务器中有一条消息没有能够成功或向服务器发送确认时不可访问,都被认定为消息确认失败
            //可能有消息没有发送成功,我们需要进行消息的补发。
            //如果无法向服务器获取确认消息,那么方法抛出异常InterruptedException,这是就需要补发消息到队列中
            //waitForConfirmsOrDie可以指定一个参数timeout,用于等服务器的确认时间,如果超过了这个时间也会抛出异常,表示消息需要补发。
            //注意;
            //批量消息的确认速度比普通的消息确认要快,但是如果一旦出现了消息补发的情况,我们不能确定具体是哪条消息没有完成发送,
            //需要将本次发送的所有消息全部都进行补发。
            channel.waitForConfirmsOrDie();
            System.out.println("消息发送完毕....");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
方式三:channel.addConfirmListener() 异步监听发送确认模式:
/**
 * 发送消息到direct类型的交换机------发送者异步监听确认模式
 */
public class AsyBatchConfirmDirectExchangeSendMsg {

    //队列的名称
    final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";

    public static void main(String[] args) {
        send();
    }


    public static void send(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;

        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            //启动发送者确认模式
            channel.confirmSelect();
            //添加监听器
            channel.addConfirmListener(new ConfirmListener() {
                //消息确认后的回调函数
                //deliveryTag:确认的消息编号,从1开始,依次递增
                //multiple: 消息是否同时确认多个,true:同时确认多条  false:确认当前一条信息
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息 -- " + deliveryTag + " --- " + multiple);
                }

                //消息没有别确认的回调函数
                //deliveryTag: 没没有确认的消息编号,从1开始,依次递增
                //multiple: 消息是否同时没有确认多个
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息 -- " + deliveryTag + " --- " + multiple);
                }
            });

            String msg = "要发送的消息...";
            //批量发送消息
            for (int i = 0; i < 10000; i++) {
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));
            }

            System.out.println("消息发送完毕....");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }  finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(conn != null){
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.7 消息的消费者确认模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-G6UpBwu7-1690987919609)(/1604816062591.png)]

手动确认消息:
/**
 * 消费者消费队列中的消息,
 */
public class AsyBatchConfirmDirectExchangeReceiveMsg {

    //队列的名称
    final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";

    public static void main(String[] args) {
       receive();
    }

    public static void receive(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            //消息的手动确认
            //参数2:false:消息的手动确认 true:消息自动确认
            //自动确认,如果在处理消息的过程中出现错误,会导致消息的丢失,所以改为手动确认
            //判断消息是否手动确认,如果手动确认了,表示消息被处理了,需要从队列中移除
            channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf8");
                    System.out.println("消费 ---> " + msg);
                    //获取channel对象
                    Channel c = this.getChannel();
                    //获取消息的编号
                    long tag = envelope.getDeliveryTag();
                    //消息的手动确认
                    //参数1:消息的编号
                    //参数2:true:确认多条消息,表示 编号<=tag的消息都已经被确认了
                    c.basicAck(tag, true);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
开启事务,必须提交事务,消息才会被确认:
/**
 * 消费者消费队列中的消息,
 */
public class AsyBatchConfirmDirectExchangeReceiveMsg2 {

    //队列的名称
    final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";

    public static void main(String[] args) {
       receive();
    }

    public static void receive(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            //声明事务
            channel.txSelect();
            //消息的手动确认
            //参数2:false:消息的手动确认 true:消息自动确认
            //自动确认,如果在处理消息的过程中出现错误,会导致消息的丢失,所以改为手动确认
            //判断消息是否手动确认,如果手动确认了,表示消息被处理了,需要从队列中移除
            channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf8");
                    System.out.println("消费 ---> " + msg);
                    //获取channel对象
                    Channel c = this.getChannel();
                    //获取消息的编号
                    long tag = envelope.getDeliveryTag();
                    //消息的手动确认
                    //参数1:消息的编号
                    //参数2:true:确认多条消息,表示 编号<=tag的消息都已经被确认了
                    c.basicAck(tag, true);

                    //当确认消息时,开启了事务,如果没有提交,那么消息不会被确认,也就不会从队列中移除,所以需要手动提交事务
                    c.txCommit();
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
消息的防重复确认:

接收和处理完成两个状态。

/**
 * 消费者消费队列中的消息,
 */
public class AsyBatchConfirmDirectExchangeReceiveMsg3 {

    //队列的名称
    final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
    //交换机的名称
    final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
    //消息的routingKey
    final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";

    public static void main(String[] args) {
       receive();
    }

    public static void receive(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/mq");

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            //声明事务
            channel.txSelect();
            //消息的手动确认
            //参数2:false:消息的手动确认 true:消息自动确认
            //自动确认,如果在处理消息的过程中出现错误,会导致消息的丢失,所以改为手动确认
            //判断消息是否手动确认,如果手动确认了,表示消息被处理了,需要从队列中移除
            channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //获取channel对象
                    Channel c = this.getChannel();
                    //获取消息的编号
                    long tag = envelope.getDeliveryTag();
                    //false:表示消息没有被接收过
                    //true:表示消息被接收过了,或者被处理完成了,因此需要进行消息的防重复确认
                    boolean redeliver = envelope.isRedeliver();
                    if(!redeliver){ //消息没有被处理过
                        String msg = new String(body, "utf8");
                        System.out.println("消费 ---> " + msg);
                        //消息的手动确认
                        //参数1:消息的编号
                        //参数2:true:确认多条消息,表示 编号<=tag的消息都已经被确认了,false:表示确认当前一条消息
                        //c.basicAck(tag, true);

                        //当确认消息时,开启了事务,如果没有提交,那么消息不会被确认,也就不会从队列中移除,所以需要手动提交事务
                        //c.txCommit();
                    }else {
                        //消息被接收或处理过,则防止重复确认
                        //如果查询数据库是否已经添加了或者修改了记录,即表示消息是否被接收过一次
                        //如果经确认,该消息被接收过,但没被处理完成,则需要重新处理该消息,并确认该消息
                        //如果该消息已经处理完成,则不需要进行其他处理操作,直接调用下面的代码
                        //c.basicAck(tag, false);
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

4、SpringBoot继承RabbitMQ

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=192.168.65.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/mq

发送者配置:分别对direct、fanout、topic类型的交换机做测试

配置:

@Configuration
public class RabbitMQConfig {

    /**
     * 配置一个direct类型的交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("bootDirectExchange");
    }
    /**
     * 配置一个fanout类型的交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("bootFanoutExchange");
    }

    /**
     * 配置一个topic类型的交换机
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("bootTopicExchange");
    }

    /**
     * 配置一个队列
     * @return
     */
    @Bean
    public Queue queue(){
        return new Queue("bootQueue");
    }

    /**
     * 配置一个队列和交换机绑定
     * @param queue 队列
     * @param directExchange 交换机
     * @return
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange directExchange){
        //绑定一个队列到交换机
        return BindingBuilder.bind(queue).to(directExchange).with("bootRoutingKey");
    }
}

serviceIMpl:

@Service("sendService")
public class SendMsgImpl implements SendService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 发送消息到direct类型的交换机
     * @param message
     */
    @Override
    public void sendMsg(String message) {
        /**
         * 参数1:交换机名称
         * 参数2:routingKey
         * 参数3:发送的消息对象
         */
        amqpTemplate.convertSendAndReceive("bootDirectExchange","bootRoutingKey", message);
        System.out.println("消息发送完毕....");
    }

    /**
     * 发送消息到fanout类型的交换机
     * @param message
     */
    @Override
    public void sendFanoutMsg(String message) {
        amqpTemplate.convertAndSend("bootFanoutExchange","",message);
        System.out.println("消息发送完毕....");
    }

    /**
     * 发送消息到topic类型的交换机
     * @param message
     */
    @Override
    public void sendTopicMsg(String message, String key) {
        amqpTemplate.convertAndSend("bootTopicExchange",key,message);
        System.out.println("消息发送完毕....");
    }
}

启动类:

@SpringBootApplication
public class SpringbootMqSendApplication {

    public static void main(String[] args) {
        ApplicationContext  context = SpringApplication.run(SpringbootMqSendApplication.class, args);
        SendService sendService = context.getBean(SendService.class);
        //sendService.sendMsg("springboot继承rabbitMq发送消息、.....");
        //sendService.sendFanoutMsg("springboot继承rabbitMq发送消息、.....");
        sendService.sendTopicMsg("springboot继承rabbitMq发送消息","aa.bb.cc");
    }
}

接受者配置:

@Configuration
public class RabbitMQConfig {

    /**
     * 配置一个direct类型的交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("bootDirectExchange");
    }

    /**
     * 配置一个队列
     * @return
     */
    @Bean
    public Queue queue(){
        return new Queue("bootQueue");
    }

    /**
     * 配置一个队列和交换机绑定
     * @param queue 队列
     * @param directExchange 交换机
     * @return
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange directExchange){
        //绑定一个队列到交换机
        return BindingBuilder.bind(queue).to(directExchange).with("bootRoutingKey");
    }
}

serviceIMpl:

@Service("receiveService")
public class ReceiveMsgImpl implements ReceiveService {

    @Autowired
    private AmqpTemplate amqpTemplate;


    /**
     * 注意,此方法不是持续接收消息的,每启动一次就接收一次消息,因此不使用这种方式
     */
/*    @Override
    public void receiveMsg() {
        String msg = (String) amqpTemplate.receiveAndConvert("bootQueue");
        System.out.println("接收到的消息 --- > " + msg);
    }*/


    /**
     * direct类型的交换机
     * @RabbitListener注解:作用是标记这是一个RabbitMQ的消息监听方法,用于持续的接收消息
     * 被标记的方法不需要手动调用,由spring自动调用。
     * queues:指定监听的消息队列
     * message:接收到的具体的消息
     *
     * 注意:这个方法如果发生异常,能接受到消息,但是消息没有别确认,需要进行消息的防重复确认
     *      如果正常,spring会确认消息,将消息从队列中移除
     */
    @RabbitListener(queues = "bootQueue")
    public void receiveMsg(String message) {
        System.out.println("接收到的消息 --- > " + message);
        //int i = 1/0;
    }



    //接收fanout类型绑定队列的消息,有两个,fanout类型的交换机为一对多
    //fanout类型的交换机,@QueueBinding完成队列和交换机的绑定,@Queue:用于生成一个随机的队列,@Exchange:创建一个交换机
    @RabbitListener(bindings = @QueueBinding(value = @Queue(),
                                            exchange = @Exchange(name = "bootFanoutExchange",type = ExchangeTypes.FANOUT)))
    public void fanoutReceiveMsg01(String message) {
        System.out.println("fanoutReceiveMsg01==》接收到的消息 --- > " + message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(),
                                             exchange = @Exchange(name = "bootFanoutExchange",type = ExchangeTypes.FANOUT)))
    public void fanoutReceiveMsg02(String message) {
        System.out.println("fanoutReceiveMsg02==》接收到的消息 --- > " + message);
    }



    //接收topic类型的绑定的队列的消息
    @RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg01"),key = "aa",
                                                exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
    public void topicReceiveMsg01(String message){
        System.out.println("topicReceiveMsg01==》接收到的消息 aa --- > " + message);
    }

    @RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg02"),key = "aa.*",
            exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
    public void topicReceiveMsg02(String message){
        System.out.println("topicReceiveMsg02==》接收到的消息 aa.* --- 1 > " + message);
    }

    @RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg03"),key = "aa.#",
            exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
    public void topicReceiveMsg03(String message){
        System.out.println("topicReceiveMsg03==》接收到的消息 aa.# --- 0/n > " + message);
    }
}

5、RabbitMQ集群

=》接收到的消息 — > " + message);
}

@RabbitListener(bindings = @QueueBinding(value = @Queue(),
                                         exchange = @Exchange(name = "bootFanoutExchange",type = ExchangeTypes.FANOUT)))
public void fanoutReceiveMsg02(String message) {
    System.out.println("fanoutReceiveMsg02==》接收到的消息 --- > " + message);
}



//接收topic类型的绑定的队列的消息
@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg01"),key = "aa",
                                            exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg01(String message){
    System.out.println("topicReceiveMsg01==》接收到的消息 aa --- > " + message);
}

@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg02"),key = "aa.*",
        exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg02(String message){
    System.out.println("topicReceiveMsg02==》接收到的消息 aa.* --- 1 > " + message);
}

@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg03"),key = "aa.#",
        exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg03(String message){
    System.out.println("topicReceiveMsg03==》接收到的消息 aa.# --- 0/n > " + message);
}

}


### 5、RabbitMQ集群















本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/57244.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

该选择WPF 还是 Winform?

WPF和WinForms都是.NET平台下的桌面应用程序开发框架&#xff0c;它们各有特点&#xff0c;适用于不同的场景和需求。下面是对WPF和WinForms的一些比较和优劣势&#xff1a;WPF&#xff08;Windows Presentation Foundation&#xff09;&#xff1a;WPF具有强大的图形渲染能力&…

UML—用例图的那些事

目录 背景: 1.用例图的发展史 过程: 1.用例图中的元素和关系 2.应用中的例子 总结&#xff1a; 背景: 1.用例图的发展史 用例图是一种常用的软件工程工具&#xff0c;用于描述系统的功能需求和用户与系统的交互。它在软件开发过程中起到了重要的作用&#xff0c;并且经历了…

使用Vue+CSS实现汉堡图标过渡为叉号图标,有点意思

前言 本文给大家分享三个具有过渡效果的汉堡图标&#xff0c;当点击汉堡图标时&#xff0c;过渡为叉号图标。这种具有过渡特效的图标挺炫酷的&#xff0c;感觉一下子给网页增加一点新颖特色。早在2015年左右&#xff0c;国外挺多优秀门户网站都有使用类似的图标&#xff0c;那…

学无止境·运维高阶③(Mysqldump脚本)

Mysqldump脚本 1、详细脚本2、执行 1、详细脚本 #!/bin/bash mysql_cmd‘-uroot -pRedHat123’ exclude_db‘information_schema|performance_schema|sys’ bak_path/backup/db mysql m y s q l c m d − e ′ s h o w d a t a b a s e s ′ − N ∣ e g r e p − v " {m…

uniapp小程序console.log在微信开发者工具中不打印问题

最近在开发一款uniapp小程序&#xff0c;发现console.log在微信开发者工具中不打印&#xff0c;但在H5页面就能够有打印输出&#xff0c;于是在网上寻找原因… 主要是由于vue.config.js文件中有设置发布时删除console的配置&#xff0c;如下&#xff1a; 官网参考地址&#x…

如何知道企业是否办理过等保备案?哪里可以查询?

对于等保政策细节&#xff0c;大家还存在很多疑问&#xff0c;例如有人在问&#xff0c;如何知道企业是否办理过等保备案&#xff1f;哪里可以查询&#xff1f;今天我们就来简单聊聊&#xff0c;仅供参考。 如何知道企业是否办理过等保备案&#xff1f; 一般企业办理过等保备案…

MySQL的索引使用的数据结构,事务知识

一、索引的数据结构&#x1f338; 索引的数据结构&#xff08;非常重要&#xff09; mysql的索引的数据结构&#xff0c;并非定式&#xff01;&#xff01;&#xff01;取决于MySQL使用哪个存储引擎 数据库这块组织数据使用的数据结构是在硬盘上的。我们平时写的代码是存在内存…

计算机毕设 深度学习猫狗分类 - python opencv cnn

文章目录 0 前言1 课题背景2 使用CNN进行猫狗分类3 数据集处理4 神经网络的编写5 Tensorflow计算图的构建6 模型的训练和测试7 预测效果8 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&#xff0c;往…

Elasticsearch搜索引擎系统入门

目录 【认识Elasticsearch】 Elasticsearch主要应用场景 Elasticsearch的版本与升级 【Elastic Stack全家桶】 Logstash Kibana Beats Elasticsearch在日志场景的应用 Elasticsearch与数据库的集成 【安装Elasticsearch】 安装插件 安装Kibana 安装Logstash 【认…

@Transactional详解(作用、失效场景与解决方法)

一、了解Transactional注解&#xff0c;先要知道事务是什么&#xff0c;但既然知道了这个注解&#xff0c;不知道事务是什么&#xff0c;那就重新再去学习一遍数据库吧&#xff0c;这里讲解开发中代码实现事务的方式 1、编程式事务&#xff08;开发用的很少了&#xff09; 基于…

【Linux】网络基础——宏观认识计算机网络

1 计算机网络背景 网络发展 独立模式: 计算机之间相互独立; 一开始&#xff0c;计算机发明出来之后&#xff0c;一台计算机处理完的数据&#xff0c;数据会保存在软盘&#xff08;物理&#xff09;&#xff0c;通过人之间的相互通信&#xff0c;把计算机A处理完的数据存储到软…

与传统的学生宿舍供电系统相比预付费安全用电管理系统优势-安科瑞黄安南

摘 要&#xff1a;为消除高校学生因违章使用大功率电器&#xff0c;导致宿舍用电线路过载&#xff0c;从而引发火灾的隐患&#xff0c;文章将安全用电与用电管理统一考虑&#xff0c;设计并实施了安全用电智能控制与管理系统。对该系统的工作原理和功能进行了详细介绍&#xff…

python中数据可视化

1.掷一个D6和一个D10 50000次的结果 die.py from random import randintclass Die:def __init__(self, num_sides6):self.num_sides num_sidesdef roll(self):return randint(1, self.num_sides) die_visual.py from die import Die from plotly.graph_objs import Bar, L…

PoseiSwap:基于 Nautilus Chain ,构建全新价值体系

在 DeFi Summer 后&#xff0c;以太坊自身的弊端不断凸显&#xff0c;而以 Layer2 的方式为其扩容成为了行业很长一段时间的叙事方向之一。虽然以太坊已经顺利的从 PoW 的 1.0 迈向了 PoS 的 2.0 时代&#xff0c;但以太坊创始人 Vitalik Buterin 表示&#xff0c; Layer2 未来…

Kafka-消费者组消费流程

消费者向kafka集群发送消费请求&#xff0c;消费者客户端默认每次从kafka集群拉取50M数据&#xff0c;放到缓冲队列中&#xff0c;消费者从缓冲队列中每次拉取500条数据进行消费。

opencv36-形态学操作-膨胀 cv2.dilate()

膨胀操作是形态学中另外一种基本的操作。膨胀操作和腐蚀操作的作用是相反的&#xff0c;膨胀操作能对图像的边界进行扩张。膨胀操作将与当前对象&#xff08;前景&#xff09;接触到的背景点合并到当前对象内&#xff0c;从而实现将图像的边界点向外扩张。如果图像内两个对象的…

C 语言高级1-内存分区,多级指针,位运算

目录 1. 内存分区 1.1 数据类型 1.1.1 数据类型概念 1.1.2 数据类型别名 1.1.3 void数据类型 1.1.4 sizeof操作符 1.1.5 数据类型总结 1.2 变量 1.1.1 变量的概念 3.1.2 变量名的本质 1.3 程序的内存分区模型 1.3.1 内存分区 1.3.1.1 运行之前 1.3.1.2运行之后 1…

目标检测与跟踪 (2)- YOLO V8配置与测试

系列文章目录 第一章 目标检测与跟踪 &#xff08;1&#xff09;- 机器人视觉与YOLO V8 目标检测与跟踪 &#xff08;1&#xff09;- 机器人视觉与YOLO V8_Techblog of HaoWANG的博客-CSDN博客3D物体实时检测、三维目标识别、6D位姿估计一直是机器人视觉领域的核心研究课题&a…

苹果Vision Pro正式发布,下一个iPhone诞生了?

在库克即将退休之际&#xff0c;苹果开启了下一个十年。 2023年6月6日&#xff0c;在苹果WWDC开发者大会上&#xff0c;苹果发布了15寸的MacBook Air&#xff0c;以及一众iOS 17、iPad OS 17、Mac OS等系统的更新。当我们觉得这些常规更新有点不痛不痒&#xff0c;甚至想大呼“…

【uniapp 样式】使用setStorageSync存储历史搜索记录

<template><view><view class"zhuangbox u-flex"><u--inputplaceholder"请输入关键字搜索"border"surround"shapecircleprefixIcon"search"prefixIconStyle"font-size: 22px;color: #909399"v-model&q…