rabbitMQ介绍及使用方法

目录

一、MQ概述

二、RabbitMQ简介

三、RabbitMQ的五种工作模式

1、简单模式

2、work queues工作队列模式

3、Pub/Sub 订阅模式

4、Routing 路由模式

5、Topics 通配符模式


一、MQ概述

MQ全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统

之间进行通信。

总结:

MQ,消息队列,存储消息的中间件

分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信

发送方称为生产者,接收方称为消费者

MQ的优点:

应用解耦:提高系统容错性和可维护性

异步提速:提升用户体验和系统吞吐量

削峰填谷:提高系统稳定性

MQ的缺点:

1、系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。

2、系统复杂度提高

MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调

用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

3、一致性问题

A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?

使用MQ的条件

1、生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为

空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

2、容许短暂的不一致性。

3、确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

常见的MQ

二、RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用

层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消

息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。

类比HTTP。

2007年,Rabbit 技术公司基于AMQP标准开发的RabbitMQ1.0发布。RabbitMQ采用Erlang语言开

发。Erlang 语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广

泛。

RabbitMQ中的相关概念:

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue 等

Connection:publisher / consumer和broker之间的TCP连接

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候TCPConnection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别 channel,所以 channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection 的开销。

Exchange:message到达 broker的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待consumer取走

Binding:exchange和queue之间的虚拟连接,binding 中可以包含 routing key。Binding信息被保存到exchange 中的查询表中,用于message 的分发依据

RabbitMQ提供了6种工作模式:

简单模式、work queues、Publish/Subscribe发布与订阅模式、

Routing路由模式、Topics主题模式、RPC远程调用模式((远程调用,不太算MQ;暂不作介绍)。

官网对应模式介绍: https://www.rabbitmq.com


JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间

件的API

JMS是JavaEE规范中的一种,类比JDBC

很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是

开源社区有

三、RabbitMQ的五种工作模式

首先创建一个maven工程,里面创建两个模块(一个生产者,一个消费者)

2、添加依赖(生产者消费者都要依赖)

<dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

1、简单模式

P:生产者,也就是要发送消息的程序
C:消费者:消息的接收者,会一直等待消息到来
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从甚中取出消息

1、创建生产者

public class HelloRabbitmq {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        //factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("admin");//用户名 默认 guest
        factory.setPassword("admin");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);

        String body = "Hello rabbitmq....";
        /*
        * basicPublish(String exchange,String routingKey, BasicProperties props, byte[ ] body)参数:
        1. exchange:交换机名称。简单模式下交换机会使用黑认的“"
        *2. routingKey:路由名称,默认交换机和队列名一样就可以绑定
        3. props :配置信息
        4. body:发送消息数据
        * */
        //发送消息
        channel.basicPublish("","hello_world",null,body.getBytes());

        channel.close();
        connection.close();

    }
}

2、创建消费者

public class ConsumerHelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);
        //关闭资源?不要
    }
}

2、work queues工作队列模式

Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个

队列中的消息。应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

1、创建生产者

public class ProducerWorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        //factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("admin");//用户名 默认 guest
        factory.setPassword("admin");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_workQueues",true,false,false,null);
        //创建消息
        for (int i = 1; i <= 10; i++) {
            String body = i+"hello rabbitmq~~~";

            //6. 发送消息
            channel.basicPublish("","hello_workQueues",null,body.getBytes());
        }
        /*
        * basicPublish(String exchange,String routingKey, BasicProperties props, byte[ ] body)参数:
        1. exchange:交换机名称。简单模式下交换机会使用黑认的“"
        2. routingKey:路由名称
        3. props :配置信息
        4. body:发送消息数据
        * */
        //7.释放资源
        channel.close();
        connection.close();

    }
}

2、创建消费者:这里要创建两个消费者才能看到效果

public class ConsumerWorkQueues1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_workQueues",true,false,false,null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume("hello_workQueues",true,consumer);
        //关闭资源?不要
    }
}

3、Pub/Sub 订阅模式

在订阅模型中,多了一个Exchange角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机 (X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
>Fanout:广播,将消息交给所有绑定到交换机的队列
>Direct:定向,把消息交给符合指定routing key 的队列
>Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

1、创建生产者

public class ProducerPubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        //factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("admin");//用户名 默认 guest
        factory.setPassword("admin");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*

       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配

        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */

        String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);

        //6、创建队列
        String queueName1 = "test_fanout_queue1";
        String queueName2 = "test_fanout_queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");

        String body = "发送消息。。。。";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

2、创建消费者:这里要创建两个消费者才能看到效果

public class ConsumerPubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();

        //队列名称
        String queueName1 = "test_fanout_queue1";
        String queueName2 = "test_fanout_queue2";
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            }
        };
        channel.basicConsume(queueName1,true,consumer);
    }
}

 执行消费者结果

4、Routing 路由模式

模式说明:

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在向Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

图解:

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
X: Exchange (交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为error 的消息
C2:消费者,其所在队列指定了需要routing key为info、error、warning 的消息
 

1、创建生产者

public class ProducerRouting {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        //factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("admin");//用户名 默认 guest
        factory.setPassword("admin");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*

       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配

        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */

        String exchangeName = "test_direct";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. 创建队列
        String queueName1 = "test_direct_queue1";
        String queueName2 = "test_direct_queue2";

        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        //队列1绑定 error
        channel.queueBind(queueName1,exchangeName,"error");
        //队列2绑定 info  error  warning
        channel.queueBind(queueName2,exchangeName,"info");
        channel.queueBind(queueName2,exchangeName,"error");
        channel.queueBind(queueName2,exchangeName,"warning");

        String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"info",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

 

 

2、创建消费者:这里要创建两个消费者才能看到效果

public class ConsumerRouting1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //队列名称
        String queueName1 = "test_direct_queue1";
        String queueName2 = "test_direct_queue2";
        
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            }
        };
        channel.basicConsume(queueName1,true,consumer);
        
        //关闭资源?不要
    }
}

 

5、Topics 通配符模式

Topic主题模式可以实现 Pub/Sub 发布与订阅模式和Routing 路由模式的功能,只是Topic在配置routing key的时候可以使用通配符,显得更加灵活。

1、创建生产者

public class ProducerTopics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        //factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("admin");//用户名 默认 guest
        factory.setPassword("admin");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
        String exchangeName = "test_topic";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6. 创建队列
        String queueName1 = "test_topic_queue1";
        String queueName2 = "test_topic_queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        // routing key  系统的名称.日志的级别。
        //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queueName1,exchangeName,"#.error");
        channel.queueBind(queueName1,exchangeName,"order.*");
        channel.queueBind(queueName2,exchangeName,"*.*");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"order.error",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();
    }
}

2、创建消费者:这里要创建两个消费者才能看到效果


public class ConsumerTopic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.138.129");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //队列名称
        String queueName1 = "test_topic_queue1";
        String queueName2 = "test_topic_queue2";
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存入数据库.......");
            }
        };
        channel.basicConsume(queueName1,true,consumer);
        //关闭资源?不要
    }
}

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/2560.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电商项目后端框架SpringBoot、MybatisPlus

后端框架基础 1.代码自动生成工具 mybatis-plus &#xff08;1&#xff09;首先需要添加依赖文件 <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.2</version></dependency><de…

【linux】进程信号——信号的产生

进程信号一、信号概念1.1 信号理解二、产生信号2.1 通过键盘产生信号2.2 捕捉信号自定义signal2.3 系统调用接口产生信号2.3.1 向任意进程发送任意信号kill2.3.2 给自己发送任意信号raise2.3.3 给自己发送指定信号abort2.3.4 理解2.4 硬件异常产生信号2.4.1 除0异常2.4.2 野指针…

蓝桥杯刷题冲刺 | 倒计时17天

作者&#xff1a;指针不指南吗 专栏&#xff1a;蓝桥杯倒计时冲刺 &#x1f43e;马上就要蓝桥杯了&#xff0c;最后的这几天尤为重要&#xff0c;不可懈怠哦&#x1f43e; 文章目录1.长草2.分考场1.长草 题目 链接&#xff1a; 长草 - 蓝桥云课 (lanqiao.cn) 题目描述 小明有一…

Feign远程调用

之前在一篇博客中写了利用RestTemplate发起远程调用的代码&#xff0c;但是存在一下问题&#xff1a;代码可读性差&#xff0c;编程体验不统一&#xff1b;如果参数特别多的话&#xff0c;参数复杂URL难以维护。Feign官方地址&#xff1a;https://github.com/OpenFeign/feignFe…

行业观察 | 来了解一下AI加速器

本文参考网上可查询到的资料简要总结AI加速器的概念和应用等信息 1。 未完待续 更新&#xff1a; 2023 / 3 / 22 行业观察 | 来了解一下AI加速器前言加速器处理器处理器是什么&#xff1f;处理器进化史加速器架构指令集 ISA特定领域的指令集 ISA超长指令字&#xff08;VLIW&a…

如何使用子项目管理方案?

在项目进行中经常发生这样的情况&#xff1a;当你开始为一个项目制定时间表时&#xff0c;你会发现任务的数量太多。你需要把它们全部分组到一些摘要任务中。但随后你看到一堆摘要任务&#xff0c;也想把它们再分组。 这样一来&#xff0c;该项目变得很麻烦&#xff0c;甚至项目…

Matlab进阶绘图第10期—带填充纹理的柱状图

带填充纹理的柱状图是通过在原始柱状图的基础上添加不同的纹理得到的&#xff0c;可以很好地解决由于颜色区分不足而导致的对象识别困难问题。 由于Matlab中未提供纹理填充选项&#xff0c;因此需要大家自行设法解决。 本文使用Kesh Ikuma制作的hatchfill2工具&#xff08;Ma…

gin框架使用websocket实现进入容器内部执行命令

文章目录1. 先决条件2. gin框架实现3. 测试用html文件4. 需要完善1. 先决条件 docker开放远程API端口 2. gin框架实现 type GetCommandResultRequire struct {IpAddr string json:"ip_addr" //传入要控制容器的ip地址ContainerUuid string json:"cont…

对堆题的总体思路

浅说一下pwn堆并用一个简单的例子具体说明给刚入坑堆的小朋友说的一些思路说一下堆是什么堆你可以看成一个结构体数组&#xff0c;然后数组里每个元素都会开辟一块内存来存储数据那么这块用来存储数据的内存就是堆。结构体数组在BSS段上&#xff0c;其内容就是堆的地址&#xf…

动态SQL必知必会

动态SQL必知必会1、什么是动态SQL2、为什么使用动态SQL3、动态SQL的标签4、if 标签-单标签判断5、choose标签-多条件分支判断6、set 标签-修改语句7、foreach标签7.1 批量查询7.2 批量删除7.3 批量添加8、模糊分页查询1、什么是动态SQL 动态 SQL 是 MyBatis 的强大特性之一。如…

阿里巴巴2017实习生笔试题(二)——总结

具体题目来自阿里巴巴2017实习生笔试题&#xff0c;本文仅为整理与汇总。 本题应该往C的多态性进行理解&#xff0c;多态中的动态链接在执行时进行&#xff0c;静态链接在编译时进行。其中A、C、D 都是动态链接的优点&#xff0c;B 时静态链接的优点。 减少页面交换可从如下角…

nginx-动静分离-防盗链-location-4

动静分离 为了加快网站的解析速度&#xff0c;可以把动态页面和静态页面有不同的服务器来解析&#xff0c;加快机械速度。降低原来单个服务器的压力。在动静分离的tomcat时候比较明显&#xff0c;因为tomcat解析静态很慢&#xff0c;其实这些原理的话很好理解&#xff0c;简单…

Baumer工业相机堡盟万兆网相机如何使用千兆网网卡环境保持帧率不变(C++)

项目场景 Baumer工业相机堡盟相机是一种高性能、高质量的工业相机&#xff0c;可用于各种应用场景&#xff0c;如物体检测、计数和识别、运动分析和图像处理。 Baumer的万兆网相机拥有出色的图像处理性能&#xff0c;可以实时传输高分辨率图像。此外&#xff0c;该相机还具…

IP、MAC和端口

IP&#xff0c;MAC和端口的概念MAC地址也叫物理地址、硬件地址&#xff0c;由网络设备厂家直接烧录在网卡上的&#xff0c;理论上Mac地址是唯一-的。 但因为Mac地址可以通过程序修改&#xff0c;所以也有可能会重复。IP地址是互联网上的每台设备都规定了-一个唯一的地址, 这个地…

网络安全之认识勒索病毒

一、什么是勒索病毒 勒索病毒&#xff0c;是一种新型电脑病毒&#xff0c;伴随数字货币兴起&#xff0c;主要以邮件、程序木马、网页挂马、服务器入侵、捆绑软件等多种形式进行传播&#xff0c;一旦感染将给用户带来无法估量的损失。如果遭受勒索病毒攻击&#xff0c;将会使绝…

如何用C语言实现渣男通讯录

注意&#xff1a;纯属玩笑&#xff0c;博大家一乐&#xff0c;切勿当真&#x1f4d6;首先我们要知道一个渣男通讯录有哪些信息要包含哪些功能1.你的通讯录要装多少个女朋友你得规定吧&#xff1b;2.每个女朋友的姓名&#xff0c;年龄&#xff0c;电话&#xff0c;爱好这些要有吧…

第29次CCFCSP认证经验总结

鄙人有幸参加了由中国计算机学会举办的第29次计算机软件能力认证考试&#xff0c;在此进行一些考试细节和经验的总结。 如果没有仔细了解过的小白去网上搜索CCFCSP&#xff0c;可能出现的是CSP-J/S&#xff0c;但是详细了解会发现&#xff0c;首先CSP-J/S分初试和复试&#xff…

.NET/C#/GC与内存管理(含深度解析)

详情请看参考文章&#xff1a;.NET面试题解析(06)-GC与内存管理 - 不灬赖 - 博客园 (cnblogs.com)一、对象创建及生命周期一个对象的生命周期简单概括就是&#xff1a;创建>使用>释放&#xff0c;在.NET中一个对象的生命周期&#xff1a;new创建对象并分配内存对象初始化…

【Linux】浅谈shell命令以及运行原理

前言&#xff1a;上篇博文把linux下的基本指令讲解完了。本期我们聊聊Linux下【shell】命令及其运行原理。 目录 Shell的基本概念与作用 原理图展示 shell命令执行原理 Shell的基本概念与作用 Linux严格意义上说的是一个操作系统&#xff0c;我们称之为“核心&#xff08;ker…

文心一言 VS ChatGPT,国产大模型和国外的差距有多大?

3月16号&#xff0c;百度正式发布了『文心一言』&#xff0c;这是国内公司第一次发布类ChatGPT的产品。大家一定非常好奇文心一言和chatgpt之间的差距有多大&#xff1f;国产大模型还有多少路可走&#xff1f;本文就全面测评这两款产品&#xff01; 目录 体验网址 1、旅游攻…