RabbitMQ学习整理————基于RabbitMQ实现RPC

基于RabbitMQ实现RPC

  • 前言
  • 什么是RPC
  • RabbitMQ如何实现RPC
  • RPC简单示例
  • 通过Spring AMQP实现RPC

前言

这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。
代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq

什么是RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。

RabbitMQ如何实现RPC

官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:

// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。

  1. contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
  2. replyTo 这个就是上面指定的回调队列。
  3. correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。

首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程:
在这里插入图片描述
实现RPC的具体工作流程:

  1. 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
  2. 这个消息是发送到指定的rpc_queue这个队列上面。
  3. 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
  4. 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。

RPC简单示例

我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下:
首先服务端:

/**
 * RPC服务端
 *
 * @author yuanzhihao
 * @since 2020/11/21
 */
public class RPCServer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 首先还是正常获得connection以及channel对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.108");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 定义一个rpc的队列
        String queueName = "test_rpc";
        channel.queueDeclare(queueName, false, false, false, null);

        Object monitor = new Object();
        // 具体的消费代码里面实现
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(delivery.getProperties().getCorrelationId())
                    .build();

            String response = "";
            try {
                // 提供一个大小写转换的方法
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("toUpperCase(" + message + ")");
                response = toUpperCase(message);
            } catch (RuntimeException e) {
                System.out.println(e.toString());
            } finally {
                // 将响应传回replyTo队列
                channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
                // 设置了手动应答 需要手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                // 执行完成会释放主线程的锁
                // RabbitMq consumer worker thread notifies the RPC server owner thread
                synchronized (monitor) {
                    monitor.notify();
                }
            }
        };

        // 监听"test_rpc"队列
        channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));
        // 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程
        // Wait and be prepared to consume the message from RPC client.
        while (true) {
            synchronized (monitor) {
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // 提供一个大小写转换的方法
    private static String toUpperCase(String msg) {
        return msg.toUpperCase();
    }
}

客户端:

/**
 * RPC客户端
 *
 * @author yuanzhihao
 * @since 2020/11/21
 */

public class RPCClient {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 创建connection以及channel对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.108");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        try ( Connection connection = connectionFactory.newConnection();
              Channel channel = connection.createChannel()) {
            // 声明一个队列
            String queueName = "test_rpc";

            // 请求消息中需要带一个唯一标识ID 
            String corrId = UUID.randomUUID().toString();
            // 声明一个回调队列
            String replayQueueName = channel.queueDeclare().getQueue();
            // 将correlationId以及回调队列设置在消息的属性中
            AMQP.BasicProperties properties = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replayQueueName)
                    .build();
            // 具体消息内容
            String msg = "hello rpc";
            // 发送请求消息
            channel.basicPublish("",queueName,properties,msg.getBytes());
            // 设置一个阻塞队列  等待服务端的响应
            final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

            String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {
                // 注意 这边根据correlationId进行下判断
                if (message.getProperties().getCorrelationId().equals(corrId)) {
                    response.offer(new String(message.getBody(), StandardCharsets.UTF_8));
                }
            }, consumerTag -> {});

            // 获取响应结果
            String take = response.take();
            System.out.println("rpc result is "+ take);
            channel.basicCancel(ctag);
        }
    }
}

执行代码,具体的客户端与服务端运行结果在这里插入图片描述 在这里插入图片描述

通过Spring AMQP实现RPC

通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。
首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~

/**
 * 主配置类
 *
 * @author yuanzhihao
 * @since 2021/1/9
 */
@Configuration
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    // 注入connectionFactory对象
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.1.108:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    // 声明队列
    @Bean
    public Queue rpcQueue() {
        return new Queue("test_rpc",false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    // 创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // 消息监听器
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        // 监听的队列
        container.setQueues(rpcQueue());
        MessageListener messageListener = message -> {
            String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("Receive a message message is {}", receiveMsg);
            // 执行对应逻辑
            String responseMsg = toUpperCase(receiveMsg);
            MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().
                    setCorrelationId(message.getMessageProperties().getCorrelationId()).
                    build();
            // 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面
            rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));
        };
        // 设置监听器
        container.setMessageListener(messageListener);
        return container;
    }

    // 提供一个大小写转换的方法
    private String toUpperCase(String msg) {
        return msg.toUpperCase();
    }
}

客户端我采用test单元测试的形式

/**
 * spring amqp rpc 测试类
 *
 * @author yuanzhihao
 * @since 2021/1/9
 */
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 测试RPC客户端
    @Test
    public void testRpcClient() {
        // 设置correlationId
        String corrId = UUID.randomUUID().toString();
        String msg = "hello rpc";
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();
        // 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列
        // 格式默认 "amq.rabbitmq.reply-to" 前缀
        Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));
        log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));

    }
}

具体实现可以看下代码的注释
代码执行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404375.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++【继承】

继承 继承的概念 继承是面向对象语言得三大特性之一&#xff08;继承、封装、多态&#xff09;&#xff0c;注&#xff1a;继承并不是C的特性&#xff0c;是面向对象语言的特性。 继承的概念&#xff1a; 继承(inheritance)机制是面向对象程序设计使代码可以复用的最重要的手段…

U盘故障频发?了解原因,掌握正确使用方法!

U盘文件夹变打不开的文件是一种常见的故障&#xff0c;表现为用户无法访问存储在U盘中的文件或文件夹。这种故障通常是由于各种原因引起的&#xff0c;包括物理损坏、文件系统错误、病毒感染等。当遇到这种情况时&#xff0c;用户需要采取一些方法来恢复文件。 U盘故障频发&…

CentOS使用Docker搭建Halo网站并实现无公网ip远程访问

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&…

【前端素材】推荐优质后台管理系统Protable平台模板(附源码)

一、需求分析 后台管理系统是一种用于管理和监控网站、应用程序或系统的在线工具。它通常是通过网页界面进行访问和操作&#xff0c;用于管理网站内容、用户权限、数据分析等。当我们从多个层次来详细分析后台管理系统时&#xff0c;可以将其功能和定义进一步细分&#xff0c;…

微信小程序 --- wx.request网络请求封装

网络请求封装 网络请求模块难度较大&#xff0c;如果学习起来感觉吃力&#xff0c;可以直接学习 [请求封装-使用 npm 包发送请求] 以后的模块 01. 为什么要封装 wx.request 小程序大多数 API 都是异步 API&#xff0c;如 wx.request()&#xff0c;wx.login() 等。这类 API 接口…

crmeb多门店商城系统二次开发 增加车辆车牌搜索功能、车辆公里数

1、增加的数据库 ALTER TABLE eb_store_order ADD cart_number VARCHAR(255) NOT NULL DEFAULT COMMENT 车牌 AFTER erp_order_id, ADD curmileage VARCHAR(255) NOT NULL DEFAULT COMMENT 当前里程 AFTER cart_number; ALTER TABLE eb_store_cart ADD cart_number VARCHAR(…

go使用trpc案例

1.go下载trpc go install trpc.group/trpc-go/trpc-cmdline/trpclatest 有报错的话尝试配置一些代理&#xff08;选一个&#xff09; go env -w GOPROXYhttps://goproxy.cn,direct go env -w GOPROXYhttps://goproxy.io,direct go env -w GOPROXYhttps://goproxy.baidu.com/…

【机器学习的主要任务和应用领域】

曾梦想执剑走天涯&#xff0c;我是程序猿【AK】 目录 简述概要知识图谱 简述概要 了解机器学习的主要任务和应用领域 知识图谱 机器学习的主要任务可以分为监督学习、无监督学习和半监督学习。 监督学习&#xff1a;这是机器学习中最为常见的一类任务&#xff0c;基于已知类…

如何设置离线启动NVIDIA Chat with RTX

第一步 先上图C:\Users\test\AppData\Local\NVIDIA\ChatWithRTX\RAG\trt-llm-rag-windows-main\ui 默认安装的地址 用户名可能会有差异&#xff0c;需要注意下 增加如图中深色的地方 shareTrue, 第二步 科学上网 把下面的都下载下来 保存到自己的磁盘中 修改路径 如下图 如…

对Redis锁延期的一些讨论与思考

上一篇文章提到使用针对不同的业务场景如何合理使用Redis分布式锁&#xff0c;并引入了一个新的问题 若定义锁的过期时间是10s&#xff0c;此时A线程获取了锁然后执行业务代码&#xff0c;但是业务代码消耗时间花费了15s。这就会导致A线程还没有执行完业务代码&#xff0c;A线程…

2024年【起重机司机(限桥式起重机)】找解析及起重机司机(限桥式起重机)考试总结

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年【起重机司机(限桥式起重机)】找解析及起重机司机(限桥式起重机)考试总结&#xff0c;包含起重机司机(限桥式起重机)找解析答案和解析及起重机司机(限桥式起重机)考试总结练习。安全生产模拟考试一点通结合国家…

游戏平台如何定制开发?

随着科技的飞速发展和互联网的普及&#xff0c;游戏平台已成为人们休闲娱乐的重要选择。为了满足用户多样化的需求&#xff0c;游戏平台的定制开发显得尤为重要。本文将探讨游戏平台定制开发的过程、关键要素以及注意事项&#xff0c;为有志于涉足此领域的开发者提供参考。 一、…

springboot自定义starter,实现自动装配

1、SpringBoot Starter介绍 随着Spring的日渐臃肿&#xff0c;为了简化配置、开箱即用、快速集成&#xff0c;Spring Boot 横空出世。 目前已经成为 Java 目前最火热的框架了。平常我们用Spring Boot开发web应用。Spring mvc 默认使用tomcat servlet容器&#xff0c; 因为Sprin…

ChatGPT 4 教你完成学生表,教师表,课程表,选课表之间的SQL学习

数据源准备&#xff1a; # 学生表 create table student( sno varchar(10) primary key, #学号sname varchar(20), #姓名sage int(2), #年龄ssex varchar(5) #性别 ); #教师表 create table teacher( tno varchar(10) primary …

git切换仓库地址

已有git仓库&#xff0c;要切换提交的仓库地址&#xff0c;用以下命令 git remote set-url origin 自己的仓库地址 用以下命令&#xff0c;查看当前仓库地址&#xff1a; git remote show origin 切换仓库后&#xff0c;用以下命令初始化提交仓库&#xff1a; git push -u o…

ES6 | (一)ES6 新特性(上) | 尚硅谷Web前端ES6教程

文章目录 &#x1f4da;ES6新特性&#x1f4da;let关键字&#x1f4da;const关键字&#x1f4da;变量的解构赋值&#x1f4da;模板字符串&#x1f4da;简化对象写法&#x1f4da;箭头函数&#x1f4da;函数参数默认值设定&#x1f4da;rest参数&#x1f4da;spread扩展运算符&a…

Java-Arrays工具类的常见方法总结

在Java中&#xff0c;提供了一个专门用于操作数组的工具类&#xff0c;即Arrays类&#xff0c;该类提供了一些方法对数组进行排序&#xff0c;打印&#xff0c;复制等操作。下面是一些该工具类方法总结。 1.Arrays.aslist() 作用&#xff1a;当我们想直接将数组中的全部内容(…

【计算机网络】数据链路层--以太网/MTU/ARP/RARP协议

文章目录 一、以太网1.以太网帧格式2.MAC地址3.局域网的转发原理 二、MTU1.什么是MTU2.MTU对IP协议的影响3.MTU对UDP影响4.MTU对于TCP协议的影响 三、ARP协议1.ARP协议的作用2.ARP数据报的格式3.ARP协议的工作流程 一、以太网 “以太网” 不是一种具体的网络, 而是一种技术标准…

命令执行 [WUSTCTF2020]朴实无华1

做题&#xff1a; 打开题目 我们用dirsearch扫描一下看看 扫描到有robots.txt&#xff0c;访问一下看看 提示我们 /fAke_f1agggg.php 那就访问一下&#xff0c;不是真的flag bp抓包一下 得到提示&#xff0c; /fl4g.php&#xff0c;访问一下看看 按alt&#xff0c;点击修复文…