RabbitMQ二、RabbitMQ的六种模式

一、RabbitMQ的六种模式

  1. RabbitMQ共有六种工作模式:
    • 简单模式(Simple)
    • 工作队列模式(Work Queue)
    • 发布订阅模式(Publish/Subscribe)
    • 路由模式(Routing)
    • 通配符模式(Topics)
    • 远程调用模式(RPC,不常用,不对此模式进行讲解)

1、RabbitMQ的简单模式

在这里插入图片描述

  1. rabbitmq简单模式的特点:
      1. 一个生产者对应一个消费者,通过队列进行消息传递。
      1. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

JMS

  1. 由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。
  2. JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。
  3. JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ等产品。
    • RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

java操作rabbitmq的简单模式

  1. 操作之前记得启动rabbitmq服务(rabbitmq的客户端不用开启也行)
docker start rabbitmq
  1. 注意:启动rabbitmq的web客户端是为了能访问它的客户端界面
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
第一步:创建项目(使用简单的maven项目即可)并添加RabbitMQ依赖

在这里插入图片描述

  • 依赖
<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>
第二步:生产者和消费者代码的编写
  • 创建生产者(producer)代码
package com.knife.demo01.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.70.130"); // 虚拟机ip
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        // 2.创建连接
        Connection connection = connectionFactory.newConnection();

        // 3.建立信道
        Channel channel = connection.createChannel();

        // 4.创建队列,如果队列已存在,则使用该队列
        /**
         * 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数
         */
        channel.queueDeclare("simple_queue",false,false,false,null);

        // 5.发送消息
        String message = "hello!rabbitmq!";
        /**
         * 参数1:交换机名,""表示默认交换机direct
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simple_queue",null,message.getBytes());

        // 6.关闭信道和连接
        channel.close();
        connection.close();
        System.out.println("===发送成功===");
    }
}


运行生产者代码结果:
在这里插入图片描述
在这里插入图片描述

  • 创建消费者(consumer)代码
package com.knife.demo01.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.70.130"); // 虚拟机ip
        connectionFactory.setPort(5672); // 端口号
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        // 2.创建连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.监听队列
        /**
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
        channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                接收消息
                String message = new String(body, "UTF-8");
                System.out.println("接受消息,消息为:"+message);
            }
        });
    }
}


运行消费者代码结果:
在这里插入图片描述

2、RabbitMQ的工作队列模式(相比简单模式,处理消息的消费者增多了)

在这里插入图片描述

  1. 工作队列模式(Work Queue)与简单模式相比,多了一些消费者,该模式也使用direct交换机(默认使用的交换机),应用于处理消息较多的情况。特点如下:

      1. 一个队列对应多个消费者。
      1. 一条消息只会被一个消费者消费。
      1. 消息队列默认采用轮询的方式将消息平均发送给消费者。
      1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
  2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可
    在这里插入图片描述

java操作rabbitmq的工作队列模式

  1. Work Queues 的入门程序与上面简单模式的代码几乎是一样的。可以完全复制,只是比简单模式的消费者多几个而已,可以让多个消费者同时对消费消息的测试。
  2. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
  • 消息生产者代码:
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
//        创建工厂连接
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130"); //虚拟机地址
        cf.setPort(5672); //rabbitmq的端口号
        cf.setUsername("admin"); // 用户名
        cf.setPassword("admin"); // 密码
        cf.setVirtualHost("/"); 

//        创建连接
        Connection connection = cf.newConnection();

//        创建信道
        Channel channel = connection.createChannel();

//        创建队列
        /**
         * 参数1:队列名称
         * 参数二:是否持久化
         * 参数三:是否私有化
         * 参数4:队列使用完毕后是否自动删除
         */
        channel.queueDeclare("work_queue",true,false,false,null);

//        发送消息
        /**
         * 参数1:交换机名,""表示默认交换机direct
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    ("你好,这是今天的第"+i+"条消息").getBytes());
        }

//        关闭资源
        channel.close();
        connection.close();
        System.out.println("消息发送成功====");
    }
}

  • 消息消费者代码(多个消费者可以代码复用,复用下面的代码,多创几个类)
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();

        //3.创建信道
        Channel channel = conn.createChannel();

//        4. 接收消息
        channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("消费者1 = " + msg);
            }
        });

        channel.close();
        conn.close();
    }

}

代码运行的效果和上面简单模式的类似。

3、RabbitMQ的发布订阅模式(相比工作队列模式,交换机使用的类型是Fanout(广播类型))

在这里插入图片描述

  • 图解说明
    • P:生产者,向 Exchange 发送消息
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与交换机绑定的队列
    • C1、C2:消费者,其所在队列要与交换机进行绑定
  1. 在开发过程中,有一些消息需要不同的消费者进行不同的处理
    • 如电商网站的同一条促销信息需要短信发送邮件发送站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
  2. 在订阅模型中,多了一个Exchange角色,而且过程略有变化:
    • Producer:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Consumer:消费者,消息的接收者,会一直等待消息到来
    • Queue:消息队列,接收消息、缓存消息
    • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
      • Fanout:广播类型,将消息交给所有绑定到交换机的队列
      • Direct:定向类型,把消息交给符合指定routing key 的队列
      • Topic:通配符类型,把消息交给符合routing pattern(路由模式) 的队列,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
  3. 特点:
      1. 生产者(Producer)将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中(即可以转发到多个队列中)。
      1. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机

java操作rabbitmq的发布订阅模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是使用到的交换机不一样了而已。
  • Producer
public class Producer {
    // 生产者
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.70.130");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        // 2.创建连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.创建交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
//        BuiltinExchangeType.FANOUT:表示广播模式
        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT, true);

        // 5.创建队列
//        队列1
        channel.queueDeclare("SEND_MAIL", true, false, false, null);
//        队列2
        channel.queueDeclare("SEND_MESSAGE", true, false, false, null);
//        队列3
        channel.queueDeclare("SEND_STATION", true, false, false, null);

        // 6.交换机绑定队列
        /**
         * 参数1:队列的名称
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind("SEND_MAIL", "exchange_fanout", "");
        channel.queueBind("SEND_MESSAGE", "exchange_fanout", "");
        channel.queueBind("SEND_STATION", "exchange_fanout", "");

        // 7.发送消息
        channel.basicPublish("exchange_fanout", "", null,
                ("618商品开抢了!").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }

        // 8.关闭资源
        channel.close();
        connection.close();
    }
}

  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();

        //4.监听队列
        channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送邮件消息 = " + msg);
            }
        });
    }
}

  • ConsumerMesage
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.监听队列
        channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送短信消息 = " + msg);
            }
        });
    }
}

  • ConsumerStation
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.监听队列
        channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送站內信 = " + msg);
            }
        });
    }
}

4、RabbitMQ的路由模式(相比发布订阅模式,多了个Route Key)

  1. 使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
    在这里插入图片描述
  • 图解说明
    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
  1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey,Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

java操作rabbitmq的路由模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述

其实代码还是差不多的,只是使用到的交换机不一样了,多了一个route Key。

  • Producer
public class Producer {
    // 生产者
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.70.130");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        // 2.创建连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.创建交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
//      BuiltinExchangeType.DIRECT
        channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT, true);

        // 5.创建队列
//        队列1
        channel.queueDeclare("SEND_MAILRoute", true, false, false, null);
//        队列2
        channel.queueDeclare("SEND_MESSAGERoute", true, false, false, null);
//        队列3
        channel.queueDeclare("SEND_STATIONRoute", true, false, false, null);

        // 6.交换机绑定队列
        /**
         * 参数1:队列的名称
         * 参数2:交换机名
         * 参数3:路由关键字(路由名称)
         */
        channel.queueBind("SEND_MAILRoute", "routing_exchange", "import");
        channel.queueBind("SEND_MESSAGERoute", "routing_exchange", "import");
        channel.queueBind("SEND_STATIONRoute", "routing_exchange", "import");
        channel.queueBind("SEND_STATIONRoute", "routing_exchange", "normal");

        // 7.发送消息
        channel.basicPublish("routing_exchange", "import", null,
                ("618商品开抢了!").getBytes(StandardCharsets.UTF_8));

        channel.basicPublish("routing_exchange", "normal", null,
                ("normal路由的息").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }

        // 8.关闭资源
        channel.close();
        connection.close();
    }
}

  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();

        //4.监听队列
        channel.basicConsume("SEND_MAILRoute",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送邮件消息 = " + msg);
            }
        });
    }
}
  • ConsumerMesage
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.监听队列
        channel.basicConsume("SEND_MESSAGERoute",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送短信消息 = " + msg);
            }
        });
    }
}
  • ConsumerStation
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.70.130");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.监听队列
        channel.basicConsume("SEND_STATIONRoute",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送站內信 = " + msg);
            }
        });
    }
}

5、RabbitMQ的通配符模式(相比路由模式,路由的组成中带上了通配符)

在这里插入图片描述

  1. 通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机
  2. 通配符规则:
      1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
      1. 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
        在这里插入图片描述
  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

java操作rabbitmq的通配符模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是route Key的组成多了通配符。
  • Producer
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.126.10");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.创建交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC, true);
        /**
         * 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数
         */

        //5.创建队列(举例订单给手机发送,邮箱,站内)发送消息,3个队列
        channel.queueDeclare("SEND_MAIL3", true, false, false, null);
        channel.queueDeclare("SEND_MESSAGE3", true, false, false, null);
        channel.queueDeclare("SEND_STATION3", true, false, false, null);
        //6.将队列和交换机绑定
        /**
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind("SEND_MAIL3", "exchange_topic", "#.mail.#");
        channel.queueBind("SEND_MESSAGE3", "exchange_topic", "#.message.#");
        channel.queueBind("SEND_STATION3", "exchange_topic", "#.station.#");
        //8.发送消息
        channel.basicPublish("exchange_topic", "mail.message.station", null, "618大促销活动".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("exchange_topic", "station", null, "618小促销活动".getBytes(StandardCharsets.UTF_8));
        //9.关闭资源
        channel.close();
        conn.close();
        System.out.println("发送消息成功");
    }
}

  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.126.10");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.监听队列
        channel.basicConsume("SEND_MAIL3",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送邮件消息 = " + msg);
            }
        });
    }
}
  • ConsumerMesage
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.126.10");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.监听队列
        channel.basicConsume("SEND_MESSAGE3",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送短信消息 = " + msg);
            }
        });
    }
}
  • ConsumerStation
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工程
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.126.10");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        cf.setVirtualHost("/");
        //2.创建连接
        Connection conn = cf.newConnection();
        //3.创建信道
        Channel channel = conn.createChannel();
        //4.监听队列
        channel.basicConsume("SEND_STATION3",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送站內信 = " + msg);
            }
        });
    }
}

RabbitMQ一、RabbitMQ的介绍与安装(docker)
RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/671794.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ThinkPHP5发送邮件如何配置?有哪些技巧?

ThinkPHP5发送邮件的性能怎么优化&#xff1f;批量发信的方法&#xff1f; 邮件发送功能是许多应用程序的关键组成部分&#xff0c;尤其是在用户注册、密码重置和通知等功能中尤为重要。AokSend将详细介绍如何在thinkphp5中配置和使用邮件发送功能&#xff0c;并确保你可以轻松…

DPDK基础组件二(igb_uio、kni、rcu)

The Linux driver implementer’s API guide — The Linux Kernel documentation 一、igb_uid驱动 参考博客:https://zhuanlan.zhihu.com/p/543217445 UIO(Userspace I/O)是运行在用户空间的I/O技术 代码位置:dpdk----/kernel/linux/igb_uio目录 igb_uio 是 dpdk 内部实…

从0开发一个Chrome插件:搭建开发环境

前言 这是《从0开发一个Chrome插件》系列的第三篇文章&#xff0c;本系列教你如何从0去开发一个Chrome插件&#xff0c;每篇文章都会好好打磨&#xff0c;写清楚我在开发过程遇到的问题&#xff0c;还有开发经验和技巧。 《从0开发一个Chrome插件》专栏&#xff1a; 从0开发一…

文章解读与仿真程序复现思路——电力系统自动化EI\CSCD\北大核心《考虑动态定价的新能源汽车能源站优化运行》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

Linux网络-守护进程版字典翻译服务器

文章目录 前言一、pid_t setsid(void);二、守护进程翻译字典服务器&#xff08;守护线程版&#xff09;效果图 前言 根据上章所讲的后台进程组和session会话&#xff0c;我们知道如果可以将一个进程放入一个独立的session&#xff0c;可以一定程度上守护该进程。 一、pid_t se…

Vite项目构建chrome extension,实现多入口

本项目使用Vite5 Vue3进行构建。 要使用vite工程构建浏览器插件&#xff0c;无非就是要实现popup页面和options页面。这就需要在项目中用到多入口打包&#xff08;生成多个html文件&#xff09;。 实现思路&#xff1a; 通过配置vite工程&#xff0c;使得项目打包后有两个h…

项目3 构建移动电商服务器集群

项目引入 经过前期加班加点地忙碌&#xff0c;我们的网站顺利上线了&#xff01;年中促销活动也如约而至&#xff0c;虽然公司全体对这次活动进行多方面地准备和“布防”&#xff0c;可是意外还是发生了。就在促销优惠购物活动的当天&#xff0c;猛然增加的用户访问量直接导致浏…

SpringBoot-SchedulingConfigurer源码初识:理解定时任务抛异常终止本次调度,但不会影响下一次执行调度

SchedulingConfigurer源码初识&#xff1a;理解定时任务抛异常终止本次调度&#xff0c;但不会影响下一次执行调度 EnableSchedulingScheduledAnnotationBeanPostProcessor进入finishRegistration方法 ScheduledTaskRegistrar处理触发器任务&#xff08;TriggerTask&#xff09…

回溯算法之电话号码字母组合

题目&#xff1a; 给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff1a; 输入&#xff1a;digits "2…

【python】多线程(3)queue队列之不同延时时长的参数调用问题

链接1&#xff1a;【python】多线程&#xff08;笔记&#xff09;&#xff08;1&#xff09; 链接2&#xff1a;【python】多线程&#xff08;笔记&#xff09;&#xff08;2&#xff09;Queue队列 0.问题描述 两个线程&#xff0c;但是不同延时时长&#xff0c;导致数据输出…

vue 引用第三方库 Swpier轮播图

本文全程干货&#xff0c;没有废话 1.使用 npm 安装 swiper&#xff0c;使用 save 保存到 packjson 中 npm install --save swiper 2、把 swiper看成是第三方库或者是组件&#xff0c;然后按照&#xff0c;引用&#xff0c;挂载组件&#xff0c;使用组件三步法。 3、在 script…

overleaf 写参考文献引用

目录 1、 新建.bib 文件 2、导入引用 3、在文档中引用参考文献 4、生成参考文献列表 1、 新建.bib 文件 在Overleaf项目中&#xff0c;你可以选择导入现有的 .bib 文件或在项目中创建一个新的 .bib 文件来管理你的参考文献。 导入.bib 文件&#xff1a; 在项目文件树中点击…

1985-2020 年阿拉斯加和育空地区按植物功能类型划分的模型表层覆盖率

ABoVE: Modeled Top Cover by Plant Functional Type over Alaska and Yukon, 1985-2020 1985-2020 年阿拉斯加和育空地区按植物功能类型划分的模型表层覆盖率 简介 文件修订日期&#xff1a;2022-05-31 数据集版本: 1.1 本数据集包含阿拉斯加和育空地区北极和北方地区按…

C语言| 输出菱形*

C语言| 输出*三角形-CSDN博客 输出菱形。 【分析思路】 学会输出*的三角形之后输出菱形就很简单了。我们分析一下&#xff0c;菱形是由两个对称的三角形组成的&#xff0c;也因为是对称的&#xff0c;所以输出的菱形的行数肯定是一个奇数。 1 我们在编程的时候&#xff0c;要…

网络空间安全数学基础·循环群、群的结构

3.1 循环群&#xff08;重要&#xff09; 3.2 剩余类群&#xff08;掌握&#xff09; 3.3 子群的陪集&#xff08;掌握&#xff09; 3.4 正规子群、商群&#xff08;重要&#xff09; 3.1 循环群 定义&#xff1a;如果一个群G里的元素都是某一个元素g的幂&#xff0c;则G称为…

【SpringBoot】四种读取 Spring Boot 项目中 jar 包中的 resources 目录下的文件

本文摘要&#xff1a;四种读取 Spring Boot 项目中 jar 包中的 resources 目录下的文件 &#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主。公粽号&#xf…

python dlib 面部特征点检测

运行环境macos m2芯片 Python 3.11.7&#xff0c;python3.9都能通过&#xff0c;windows系统应该也是一样的效果 import dlib import cv2 import matplotlib.pyplot as plt# Load the image image_path path_to_your_image.jpg # Replace with the path to your image image…

React常见的一些坑

文章目录 两个基础知识1. react的更新问题, react更新会重新执行react函数组件方法本身,并且子组件也会一起更新2. useCallback和useMemo滥用useCallback和useMemo要解决什么3. react的state有个经典的闭包,导致拿不到最新数据的问题.常见于useEffect, useMemo, useCallback4. …

LLM——深入探索 ChatGPT在代码解释方面的应用研究

1.概述 OpenAI在自然语言处理&#xff08;NLP&#xff09;的征途上取得了令人瞩目的进展&#xff0c;这一切得益于大型语言模型&#xff08;LLM&#xff09;的诞生与成长。这些先进的模型不仅是技术创新的典范&#xff0c;更是驱动着如GitHub Copilot编程助手和Bing搜索引擎等广…

基于SpringBoot+Vue的公园管理系统的详细设计和实现(源码+lw+部署文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝1W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f; 感兴趣的可以先收藏起来&#xff0c;还…