RabbitMQ的6种工作模式

RabbitMQ的6种工作模式

官方文档:

http://www.rabbitmq.com/

https://www.rabbitmq.com/getstarted.html

RabbitMQ 常见的 6 种工作模式:
在这里插入图片描述

1、simple简单模式

在这里插入图片描述

1)、消息产生后将消息放入队列。

2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

4)、应用场景:聊天(中间有一个过度的服务器)。

5)、代码实现:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-java</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>

</project>

工具类

package com.example;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

    // 连接rabbitmq服务,共享一个工厂对象
    private static ConnectionFactory factory;

    static {
        factory=new ConnectionFactory();
        //设置rabbitmq属性
        factory.setHost("127.0.0.1");
        factory.setUsername("zsx242030");
        factory.setPassword("zsx242030");
        factory.setVirtualHost("/");
        factory.setPort(5672);
    }
    public static Connection getConnection(){
        Connection connection=null;
        try {
            //获取连接对象
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}

消息提供者

package com.example.simple;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)
            channel.queueDeclare("queue1", false, false, false, null);
            //向队列中发送消息
            channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者

package com.example.simple;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息(消费的是队列,而不是交换机)
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者获得消息为:Hello RabbitMQ!!!

2、work工作模式(资源的竞争)

在这里插入图片描述

1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,

C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关

(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。

3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到

消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4)、代码实现:

消息提供者

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列
            channel.queueDeclare("queue1", false, false, false, null);
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            // connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            // connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10

3、publish/subscribe发布订阅(共享资源)

在这里插入图片描述

1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消

息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定 routing key 的队列。

  • Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者

没有符合路由规则的队列,那么消息会丢失。

2)相关场景:邮件群发,群聊天,广播(广告)。

3)、代码实现:

消息提供者

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
            // 1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("fanout_exchange", "fanout");
            //通过通道创建队列
            //channel.queueDeclare("queue1",false,false,false,null);
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue1", false, false, false, null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue1", "fanout_exchange", "");
            //监听队列中的消息
            channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue2", false, false, false, null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue2", "fanout_exchange", "");
            //监听队列中的消息
            channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10

4、routing路由模式

在这里插入图片描述

1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路

由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意

绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的

RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列

的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

2)、根据业务功能定义路由字符串。

3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

5)、代码实现:

消息提供者

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
            // 1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("direct_exchange", "direct");
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("direct_exchange",
                        //设置路由键,符合路由键的队列,才能拿到消息
                        "insert",
                        null,
                        ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue1", false, false, false, null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue1", "direct_exchange", "select");
            channel.queueBind("direct_queue1", "direct_exchange", "insert");
            //监听队列中的消息
            channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue2", false, false, false, null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue2", "direct_exchange", "delete");
            channel.queueBind("direct_queue2", "direct_exchange", "select");
            //监听队列中的消息
            channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

5、topic 主题模式(路由模式的一种)

在这里插入图片描述

1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。

通配符规则:

# :匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配item.insert.abc或者item.insert

item.* :只能匹配 item.insert

usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

3)、路由功能添加模糊匹配。

4)、消息产生者产生消息,把消息交给交换机。

5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

6)、代码实现:

消息提供者

package com.example.topic;


import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("topic_exchange", "topic");
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("topic_exchange",
                        // #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)
                        "emp.hello world",
                        null,
                        ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.topic;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue1", false, false, false, null);
            //绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue1", "topic_exchange", "emp.#");
            //监听队列中的消息
            channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.topic;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue2", false, false, false, null);
            //绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue2", "topic_exchange", "emp.*");
            //监听队列中的消息
            channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

6、RPC

在这里插入图片描述

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3)、服务端将RPC方法 的结果发送到RPC响应队列。

4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5)、代码实现:

Client端

package com.example.rpc;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Client {

    public static void main(String[] argv) throws IOException, InterruptedException {
        String message = "Hello World!!!";
        // 建立一个连接和一个通道,并为回调声明一个唯一的回调队列
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 定义一个临时变量的接受队列名
        String replyQueueName = channel.queueDeclare().getQueue();
        // 生成一个唯一的字符串作为回调队列的编号
        String corrId = UUID.randomUUID().toString();
        // 发送请求消息,消息使用了两个属性:replyTo和correlationId
        // 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
                .build();
        // 发布一个消息,rpc_queue路由规则
        channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));
        // 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。
        // 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        // String basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                //检查它的correlationId是否是我们所要找的那个
                if (properties.getCorrelationId().equals(corrId)) {
                    //如果是,则响应BlockingQueue
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        System.out.println(" 客户端请求的结果:" + response.take());
    }
}

Server端

package com.example.rpc;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Server {

    public static void main(String[] args) {
        Connection connection = null;
        try {
            connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("rpc_queue", false, false, false, null);
            channel.basicQos(1);
            System.out.println("Awaiting RPC requests:");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();
                    String response = "";
                    try {
                        response = new String(body, "UTF-8");
                        System.out.println("response (" + response + ")");
                    } catch (RuntimeException e) {
                        System.out.println("错误信息 " + e.toString());
                    } finally {
                        // 返回处理结果队列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        // 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            // 取消自动确认
            boolean autoAck = false;
            channel.basicConsume("rpc_queue", autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)

# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!

7、发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使

用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将

队列绑定到默认的交换机 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/63299.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

rust-异步学习

rust获取future中的结果 两种主要的方法使用 async: async fn 和 async 块 async 体以及其他 future 类型是惰性的&#xff1a;除非它们运行起来&#xff0c;否则它们什么都不做。 运行 Future 最常见的方法是 .await 它。 当 .await 在 Future 上调用时&#xff0c;它会尝试把…

测试岗?从功能测试进阶自动化测试开发,测试之路不迷茫...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 测试新人在想什么…

GD32F103VET输出PWM波形

GD32F103VET将TIMER0_CH3映射到PE14引脚&#xff0c;使其输出PWM波形。测试时&#xff0c;使用示波器看PE14引脚输出的波形&#xff0c;效果更直观。 TIMER0之PWM输出引脚映射如下: TIMER0_REMAP[1:0]"00"(没有映射): TIMER0_CH0默认被映射到PA8引脚 TIMER0_CH1默认…

【51单片机】晨启科技,酷黑版,音乐播放器

四、音乐播放器 任务要求&#xff1a; 设计制作一个简易音乐播放器&#xff08;通过手柄板上的蜂鸣器发声&#xff0c;播放2到4首音乐&#xff09;&#xff0c;同时LED模块闪烁&#xff0c;给人视、听觉美的感受。 评分细则&#xff1a; 按下播放按键A6开始播放音乐&#xff0…

243. 一个简单的整数问题2(树状数组)

输入样例&#xff1a; 10 5 1 2 3 4 5 6 7 8 9 10 Q 4 4 Q 1 10 Q 2 4 C 3 6 3 Q 2 4输出样例&#xff1a; 4 55 9 15 解析&#xff1a; 一般树状数组都是单点修改、区间查询或者单点查询、区间修改。这道题都是区间操作。 1. 区间修改用数组数组维护差分数组 2. 区间查询&am…

Spring事务(声明式事务)(Spring的事务,Spring隔离级别,事务传播机制)

目录 一、什么是事务&#xff0c;为什么要用事务 二、Spring声明式事务 &#x1f345; 1、Transactional的使用 &#x1f388; 事务回滚 &#x1f388;注意&#xff1a;异常被捕获&#xff0c;不会发生事务回滚 &#x1f345; 2、Transactional 作⽤范围 &#x1f345; …

跨隔离网文件交换,IT部门和业务部门难以兼顾怎么办?

网络隔离技术作为有效的网络安全和数据安全的管理手段&#xff0c;现在已经被充分运用在企业网络建设中。但企业进行网络隔离是基于安全考虑&#xff0c;被隔离的网络间的数据交换需求不会因网络隔离而消失&#xff0c;因此&#xff0c;企业就需要进行隔离网间的数据和文件交换…

element表格+表单+表单验证结合运用

目录​​​​​​​ 一、结果展示 二、实现代码 一、结果展示 1、图片 2、描述 table中放form表单&#xff0c;放输入框或下拉框或多选框等&#xff1b; 点击添加按钮&#xff0c;首先验证表单&#xff0c;如果存在没填的就验证提醒&#xff0c;都填了就向下添加一行表单表…

力扣:54. 螺旋矩阵(Python3)

题目&#xff1a; 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;力扣 示例&#xff1a; 示例 1&#xff1a; 输入&#xff1a;matrix [[1,…

过滤器和拦截器的六大区别

平时觉得简单的知识点&#xff0c;但通常都不会太关注细节&#xff0c;一旦被别人问起来&#xff0c;反倒说不出个所以然来。真的就是一看就会一说就废。下面带大家一起结合实践来区分过滤器和拦截器吧~ 通俗理解&#xff1a; &#xff08;1&#xff09;过滤器&#xff08;Fil…

vue-cli

vue-cli脚手架 案例一&#xff1a; 案例二&#xff1a; 案例三&#xff1a; ​ 一、脚手架简介 Vue脚手架是Vue官方提供的标准化开发工具&#xff08;开发平台&#xff09;&#xff0c;它提供命令行和UI界面&#xff0c;方便创建vue工程、配置第三方依赖、编译vue工程 1. …

2023年华数杯数学建模C题思路 - 母亲身心健康对婴儿成长的影响

# 1 赛题 C 题 母亲身心健康对婴儿成长的影响 母亲是婴儿生命中最重要的人之一&#xff0c;她不仅为婴儿提供营养物质和身体保护&#xff0c; 还为婴儿提供情感支持和安全感。母亲心理健康状态的不良状况&#xff0c;如抑郁、焦虑、 压力等&#xff0c;可能会对婴儿的认知、情…

Centos更换网卡名称为eth0

Centos更换网卡名称为eth0 已安装好系统后需要修改网卡名称为eth0 编辑配置文件将ens33信息替换为eth0,可在vim命令模式输入%s/ens33/eth0/g替换相关内容 修改内核文件,添加内容:net.ifnames=0 biosdevname=0 [root@nova3 ~]# vim /etc/default/grub 使用命令重新生成g…

VLE基于预训练文本和图像编码器的图像-文本多模态理解模型:支持视觉问答、图文匹配、图片分类、常识推理等

项目设计集合&#xff08;人工智能方向&#xff09;&#xff1a;助力新人快速实战掌握技能、自主完成项目设计升级&#xff0c;提升自身的硬实力&#xff08;不仅限NLP、知识图谱、计算机视觉等领域&#xff09;&#xff1a;汇总有意义的项目设计集合&#xff0c;助力新人快速实…

OBS视频视频人物实时扣图方法(四种方式)

图片擦除一些杂乱图像 参考&#xff1a;https://www.bilibili.com/video/BV1va411G7be https://github.com/Sanster/lama-cleaner第一种&#xff1a;色度键选项 第二种&#xff1a;浏览器建立窗口选项 参考视频&#xff1a;https://www.bilibili.com/video/BV1WS4y1C7QY http…

git报错:Error merging: refusing to merge unrelated histories

碰对了情人&#xff0c;相思一辈子。 打命令&#xff1a;git pull origin master --allow-unrelated-histories 然后等一会 再push 切记不要有冲突的代码 需要改掉~

Redis BigKey案例

面试题&#xff1a; 阿里广告平台&#xff0c;海量数据里查询某一固定前缀的key小红书&#xff0c;你如何生产上限制keys*/flushdb/flushall等危险命令以防止误删误用&#xff1f;美团&#xff0c;MEMORY USAGE命令你用过吗&#xff1f;BigKey问题&#xff0c;多大算big&#…

密码攻击与ADSelfService Plus的保护

密码攻击是当前网络安全面临的严峻挑战之一。黑客通过不断演进的技术手段&#xff0c;试图入侵用户账户&#xff0c;窃取敏感信息&#xff0c;从而对个人和组织造成严重损害。为了应对密码攻击的威胁&#xff0c;ManageEngine推出了ADSelfService Plus&#xff0c;这是一款功能…

Clion开发Stm32之存储模块(W25Q64)驱动编写

前言 涵盖之前文章: Clion开发STM32之HAL库SPI封装(基础库) W25Q64驱动 头文件 #ifndef F1XX_TEMPLATE_MODULE_W25Q64_H #define F1XX_TEMPLATE_MODULE_W25Q64_H#include "sys_core.h" /* Private typedef ---------------------------------------------------…

国联易安网页防篡改保护系统“渠道招募”启动啦!

作为业内专注于保密与非密领域的分级保护、等级保护、业务连续性安全和大数据安全的领军企业&#xff0c;国联易安网页防篡改保护系统基于“高效同步”、“安全传输”两项技术&#xff0c;具备了独特的“五重防护”新特性&#xff0c;支持网页的全自动发布、网页监控、报警和自…