netty基础_12.用 Netty 自己实现简单的RPC

用 Netty 自己实现简单的RPC

    • RPC 基本介绍
    • 我们的RPC 调用流程图
    • 己实现 Dubbo RPC(基于 Netty)
      • 需求说明
      • 设计说明
      • 代码
        • 封装的RPC
          • NettyServer
          • NettyServerHandler
          • NettyClientHandler
          • NettyClient
        • 接口
        • 服务端(provider)
          • HelloServiceImpl
          • ServerBootstrap
        • 客户端(消费者)
      • 调用过程
      • 效果

RPC 基本介绍

  1. RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图)

在这里插入图片描述

过程:

  1. 调用者(Caller),调用远程API(Remote API)

  2. 调用远程API会通过一个RPC代理(RpcProxy)

  3. RPC代理再去调用RpcInvoker(这个是PRC的调用者)

  4. RpcInvoker通过RPC连接器(RpcConnector)

  5. RPC连接器用两台机器规定好的PRC协议(RpcProtocol)把数据进行编码

  6. 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)

  7. PRC接收器通过PRC协议进行解码拿到数据

  8. 然后将数据传给RpcProcessor

  9. RpcProcessor再传给RpcInvoker

  10. RpcInvoker调用Remote API

  11. 最后推给被调用者(Callee)

  12. 常见的 RPC 框架有:比较知名的如阿里的 DubboGooglegRPCGo 语言的 rpcxApachethriftSpring 旗下的 SpringCloud

我们的RPC 调用流程图

在这里插入图片描述

RPC 调用流程说明

  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

己实现 Dubbo RPC(基于 Netty)

需求说明

  1. Dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
  2. 模仿 Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20

设计说明

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
  4. 开发的分析图

在这里插入图片描述

代码

封装的RPC

可以把这块代码理解成封装的dubbo

NettyServer
package com.atguigu.netty.dubborpc.netty;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {


    public static void startServer(String hostName, int port) {
        startServer0(hostName,port);
    }

    //编写一个方法,完成对NettyServer的初始化和启动

    private static void startServer0(String hostname, int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //业务处理器

                                      }
                                  }

                    );

            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服务提供方开始提供服务~~");
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

NettyServerHandler
package com.atguigu.netty.dubborpc.netty;


import com.atguigu.netty.dubborpc.customer.ClientBootstrap;
import com.atguigu.netty.dubborpc.provider.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

//服务器这边handler比较简单
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("---服务端开始收到来自客户单的消息---");
        //获取客户端发送的消息,并调用服务
        System.out.println("原始消息:" + msg);

        /*
         1.客户端在调用服务器的api 时,我们需要定义一个协议,比如我们要求 每次发消息是都
         必须以某个字符串开头 "HelloService#hello#你好"
         2.Dubbo注册在Zookeeper里时,这种就是类的全路径字符串,你用IDEA的zookeeper插件
         就可以清楚地看到
         */
        if(msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

NettyClientHandler
package com.atguigu.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数


    //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被调用  ");
        context = ctx; //因为我们在其它方法会使用到 ctx
    }

    //收到服务器的数据后,调用方法 (4)
    //
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        notify(); //唤醒等待的线程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被调用  ");
        context.writeAndFlush(para);
        //进行wait
        wait(); //等待channelRead 方法获取到服务器的结果后,唤醒
        System.out.println(" call2 被调用  ");
        return  result; //服务方返回的结果

    }
    //(2)
    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}

NettyClient
package com.atguigu.netty.dubborpc.netty;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {

    //创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    //编写方法使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");
                    //{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码
                    if (client == null) {
                        initClient();
                    }

                    //设置要发给服务器端的信息
                    //providerName:协议头,args[0]:就是客户端要发送给服务端的数据
                    client.setPara(providerName + args[0]);

                    //
                    return executor.submit(client).get();

                });
    }

    //初始化客户端
    private static void initClient() {
        client = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接口
package com.atguigu.netty.dubborpc.publicinterface;

//这个是接口,是服务提供方和 服务消费方都需要
public interface HelloService {

    String hello(String mes);
}

服务端(provider)
HelloServiceImpl
package com.atguigu.netty.dubborpc.provider;

import com.atguigu.netty.dubborpc.publicinterface.HelloService;

public class HelloServiceImpl implements HelloService{

    private static int count = 0;
    //当有消费方调用该方法时, 就返回一个结果
    @Override
    public String hello(String mes) {
        System.out.println("收到客户端消息=" + mes);
        System.out.println();
        //根据mes 返回不同的结果
        if(mes != null) {
            return "你好客户端, 我已经收到你的消息。消息为:[" + mes + "] ,第" + (++count) + " 次 \n";
        } else {
            return "你好客户端, 我已经收到你的消息 ";
        }
    }
}

ServerBootstrap
package com.atguigu.netty.dubborpc.provider;

import com.atguigu.netty.dubborpc.netty.NettyServer;

//ServerBootstrap 会启动一个服务提供者,就是 NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {

        //代码代填..
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

客户端(消费者)
package com.atguigu.netty.dubborpc.customer;

import com.atguigu.netty.dubborpc.netty.NettyClient;
import com.atguigu.netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {


    //这里定义协议头
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws  Exception{

        //创建一个消费者
        NettyClient customer = new NettyClient();

        //创建代理对象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (;; ) {
            Thread.sleep(2 * 1000);
            //通过代理对象调用服务提供者的方法(服务)
            String res = service.hello("你好 dubbo~");
            System.out.println("调用的结果 res= " + res);
        }
    }
}

调用过程

  1. ClientBootstrap#main发起调用
  2. 走到下面这一行代码后
 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
  1. 调用NettyClient#getBean,在此方法里与服务端建立链接。

  2. 于是就执行NettyClientHandler#channelActive

  3. 接着回到NettyClient#getBean调用NettyClientHandler#setPara,调用完之后再回到NettyClient#getBean,用线程池提交任务

  4. 因为用线程池提交了任务,就准备执行NettyClientHandler#call线程任务

  5. NettyClientHandler#call中发送数据给服务提供者

    context.writeAndFlush(para);
    

    由于还没收到服务提供者的数据结果,所以wait住

  6. 来到了服务提供者这边,从Socket通道中收到了数据,所以执行NettyServerHandler#channelRead,然后因为此方法中执行了

    String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
    
  7. 就去HelloServiceImpl#hello中执行业务逻辑,返回数据给NettyServerHandler#channelReadNettyServerHandler#channelRead再把数据发给客户端

  8. NettyClientHandler#channelRead收到服务提供者发来的数据,唤醒之前wait的线程

  9. 所以之前wait的线程从NettyClientHandler#call苏醒,返回result给NettyClient#getBean

  10. NettyClient#getBeanget()到数据,ClientBootstrap#main中的此函数调用返回,得到服务端提供的数据。

     String res = service.hello("你好 dubbo~");
    

13.至此,一次RPC调用结束。

效果

ClientBootstrap打印

(proxy, method, args) 进入....1 次
 setPara  
 channelActive 被调用  
 call1 被调用  
 channelRead 被调用  
 call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第1(proxy, method, args) 进入....2 次
 setPara  
 call1 被调用  
 channelRead 被调用  
 call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第2(proxy, method, args) 进入....3 次
 setPara  
 call1 被调用  
 channelRead 被调用  
 call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第3(proxy, method, args) 进入....4 次
 setPara  
 call1 被调用  
 channelRead 被调用  
 call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第4(proxy, method, args) 进入....5 次
 setPara  
 call1 被调用  
 channelRead 被调用  
 call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第5

ServerBootstrap打印

服务提供方开始提供服务~~
---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~

---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~

---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~

---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~

---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/472749.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

概率基础——逻辑回归多分类法

概率基础——逻辑回归多分类法 逻辑回归是一种经典的分类算法&#xff0c;通常用于解决二分类问题。然而&#xff0c;在实际应用中&#xff0c;我们经常会遇到多分类任务。本文将简单介绍逻辑回归的理论、多分类方法以及优缺点&#xff0c;并提供一个Python实现的示例。 逻辑…

【MySQL】图形化界面工具DataGrip安装&配置&使用

前言 大家好吖&#xff0c;欢迎来到 YY 滴MySQL系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C Linux的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的…

npm出现内部错误,重新设置镜像

问题&#xff1a; 报错解释&#xff1a; 这个错误表明你尝试从一个指定的npm镜像源的响应时失败了。可能的原因包括网络问题、镜像源不可用、DNS解析问题或者镜像源的确已经下线或更改。 1.重新设置镜像源 设置淘宝镜像源&#xff1a; npm config set registry https://re…

网络面试——http 和 https 的区别

区别&#xff1a; 1. HTTP 是超文本传输协议&#xff0c;信息是明文传输&#xff0c;HTTPS 是具有安全性的 SSL 加密传输协议。HTTPS 是由 SSL HTTP 协议构建的可进行加密传输、身份认证的网络协议&#xff0c;比 HTTP 协议安全 2. 端口号&#xff1a;http 使用 80 端口&#…

ros小问题之差速轮式机器人轮子不显示(rviz gazebo)

在rviz及gazebo练习差速轮式机器人时&#xff0c;很奇怪&#xff0c;只有个机器人的底板及底部的两个万向轮&#xff0c;如下图&#xff0c; 后来查看相关.xacro文件&#xff0c;里面是引用包含了轮子的xacro文件&#xff0c;只需传入不同的参数即可调用生成不同位置的轮子&…

代码学习第24天----回溯算法

随想录日记part24 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.10 主要内容&#xff1a;回溯算法在代码学习中尤其重要&#xff0c;所以今天继续加深对其的理解&#xff1a;1&#xff1a;递增子序列 &#xff1b;2.全排列 &#xff1b;3.全排列II 491.递…

MySQL基础-----事务(下)

目录 前言 一、并发事务问题 1.赃读 2.不可重复读 3.幻读 二、事务隔离级别 1.相关操作 2.案例演示 前言 本期我们继续上一期事务的内容&#xff0c;本期的主要讲解的是并发事务的相关问题以及解决方式&#xff0c;内容可能会比较难去理解&#xff0c;不过我会尽量详细说…

C++ UML类图

参考文章&#xff1a; &#xff08;1&#xff09;C UML类图详解 &#xff08;2&#xff09;C基础——用C实例理解UML类图 &#xff08;3&#xff09;C设计模式——UML类图 &#xff08;4&#xff09;[UML] 类图介绍 —— 程序员&#xff08;灵魂画手&#xff09;必备画图技能之…

31.HarmonyOS App(JAVA)鸿蒙系统app Service服务的用法

鸿蒙系统app Service服务的用法 后台任务调度和管控 HarmonyOS将应用的资源使用生命周期划分为前台、后台和挂起三个阶段。前台运行不受资源调度的约束&#xff0c;后台会根据应用业务的具体任务情况进行资源使用管理&#xff0c;在挂起状态时&#xff0c;会对应用的资源使用进…

《C语言深度剖析》---------关键字(1)

1.双击实质--->加载内存 windows系统里面&#xff0c;双击的本质就是运行程序&#xff0c;把程序加载到内存里面&#xff1b; 任何程序运行的时候都必须加载到内存里面&#xff1b; 程序没有运行之前在硬盘里面&#xff0c;为什么程序运行之前必须加载到内存里面呢&#…

Spring Web MVC入门(5)

响应 在我们前面的代码例子中, 都已经设置了响应数据Http响应结果可以是数据, 也可以是静态页面, 也可以针对响应设置状态码, Header信息等. 返回静态页面 创建前端页面index.html(注意路径) html代码如下: <!DOCTYPE html> <html lang"en"> <hea…

Nutanix 国产化替代|一文了解 SmartX 超融合替代可行性与迁移方案

2022 年 8 月 19 日&#xff0c;Nutanix&#xff08;路坦力&#xff09;宣布中国市场自 2023 财年起将转型为合作伙伴销售主导模式&#xff0c;引起了广泛关注&#xff1b;同时结合当前 IT 基础架构的国产化趋势背景&#xff0c;不少正在使用和考虑使用 Nutanix 产品的企业开始…

Vue 中预加载组件

在 Vue 中&#xff0c;利用 VueRouter 可以轻松的实现两个组件&#xff08;页面&#xff09;之间的切换&#xff0c;有个常用的设计就是需要在登录页登录后跳转至一个内容页&#xff0c;通常的做法是在登录校验完成之后立即切换路由至内容页&#xff0c;接着内容页发送网络请求…

极简生活|2024年让自己越来越好的18个极简好习惯

哈喽&#xff0c;你好啊&#xff0c;我是雷工&#xff01; 转眼间已经进入了2024年&#xff0c;新的一年&#xff0c;新的开始。 俗话说&#xff1a;百尺高台起于垒土&#xff0c;千里之堤毁于蚁穴。 好习惯积累的越多&#xff0c;坏习惯越来越少&#xff0c;我们的生活才能越…

Iterator对象功能学习

package config;import java.util.Iterator; import java.util.Properties; import java.util.Set;/*** 这个类演示了如何使用Properties类来存储和访问键值对。* Properties类继承自Hashtable&#xff0c;因此它可以用来存储键值对数据&#xff0c;且支持同步。*/ public clas…

MySQL介绍

一、MySQL数据库介绍 1、发展史 1996年 MySQL1.0 2008年1月16日 Sun公司收购了 MySQL 2009年4月20日 Oracle收购了Sun公司 MySQL是一种开放源代码的关系型数据库管理系统 使用最常用的数据库管理语言 SQL&#xff08;结构化查询语言&#xff09; MySQL是开放源代码的 因此所有…

基于Springboot的员工健康管理系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的员工健康管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构…

【c++】c++基本语法知识-命名空间-输入输出-缺省参数

主页&#xff1a;醋溜马桶圈-CSDN博客 专栏&#xff1a;c_醋溜马桶圈的博客-CSDN博客 gitee&#xff1a;mnxcc (mnxcc) - Gitee.com 目录 1.命名空间 1.2 命名空间定义 1.3 命名空间使用 命名空间的三种使用方式 2.C输入&输出 std命名空间的使用惯例 3.缺省参数 3…

Win11初始化系统遇一文解决

这个是目录 一、设置内的初始化无法使用时&#xff0c;使用以下工具二、将桌面移动到D盘三、解决win11桌面右键创建只有一个带盾牌的文件夹问题四、win11 系统停止更新五、office安装1、使用的是 Office Tool plus2、使用WPS 六、D盘有感叹号七、打开组策略编辑器(gpedit.msc)失…

【Docker】-- 如何安装docker

一、安装docker 首先要安装一个yum工具 yum install -y yum-utils 安装成功后&#xff0c;执行命令&#xff0c;配置Docker的yum源&#xff1a; yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo 最后&#xff0c;执行命令&#x…