利用netty手写rpc框架

前言:利用netty异步事件驱动的网络通信模型,来实现rpc通信

一、大致目录结构:

二、两个端:服务端(发布),客户端(订阅消费),上代码:

1.服务端(发布):

RPCServer:

代码:

public class RpcServer {


    private Map<String, Object> registryMap = new HashMap<>();
    private List<String> classCache = new ArrayList<>();

    // 1.实现发布;实现方式:
    // 1.1查找指定目录下的所有接口,放入一个集合中
    // 1.2遍历所有接口,放入一个map中存放接口路径及接口名称
    // 1.3发送方法相关信息


    public void publish(String providerPackage) throws Exception {

        getProviderClass(providerPackage);
        doRegister();

        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RpcServerHandler(registryMap));
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(9999).sync();
            System.out.println("服务端监听9999端口,启动成功。。。");
            future.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }

    private void doRegister() throws Exception {

        if (classCache.size() > 0) {
            for (String className :classCache){

                Class<?> clazz = Class.forName(className);
                String interfaceName = clazz.getInterfaces()[0].getName();
                registryMap.put(interfaceName, clazz.newInstance());
            }
        }


    }


    /**
     * 获取当前目录下的所有接口,汇总到一个集合中
     * @param providerPackage
     */
    private void getProviderClass(String providerPackage) {

        URL resource =         this.getClass().getClassLoader().getResource(providerPackage.replaceAll("\\.", "/"));

        File file = null;
        if (resource != null) {
            file = new File(resource.getFile());
        }

        if (file != null) {
            for (File f : Objects.requireNonNull(file.listFiles())) {
                if (f.isDirectory()) {
                    getProviderClass(providerPackage + "." + f.getName());
                } else if (f.getName().endsWith(".class")) {
                    String fileName = f.getName().replace(".class", "").trim();
                    classCache.add(providerPackage + "." + fileName);
                }
            }
        }
    }


}

服务端处理器:用于处理消费端发送过来的接口数据

代码:

public class RpcServerHandler extends ChannelInboundHandlerAdapter {

    private Map<String, Object> registryMap;

    public RpcServerHandler(Map<String, Object> registryMap) {
        this.registryMap = registryMap;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("rpc-server收到客户端消息:" + msg);

        if (msg instanceof InvokeMessage) {
            InvokeMessage method = (InvokeMessage) msg;
            Object result = "rpc-server端没有该方法";

            // 判断是否存在该方法
            if (registryMap.containsKey(method.getClassName())) {
                Object provider = registryMap.get(method.getClassName());
                result = provider.getClass()
                        .getMethod(method.getMethodName(), method.getParamTypes())
                        .invoke(provider, method.getParamValues());
            }
            // 把方法返回结果,发给订阅者
            ctx.channel().writeAndFlush(result);
            ctx.close();
        }

    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

接口的实现类:这里的接口是客户端的接口Api

代码:

public class SsenServiceApiImpl implements SsenServiceApi {

    @Override
    public String hellRpc(String name) {
        return name + "实现类方法";
    }
}

2.客户端:(订阅消费)

这里采用JDK原生的基于接口的动态代理f发

public class RpcProxy {


    // JDK基于接口的动态代理,用于创建代理对象
    public static <T> T create(Class<?> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
                new Class[]{clazz},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        if (Object.class.equals(method.getDeclaringClass())) {
                            return method.invoke(proxy, method, args);
                        }

                        // 通过netty将接口信息发送给提供者,获取指定方法
                        return RpcInvoke(clazz, method, args);
                    }
                });
    }

    private static Object RpcInvoke(Class<?> clazz, Method method, Object[] args) {
        NioEventLoopGroup eventGroup = new NioEventLoopGroup();

        RpcClientHandler rpcClientHandler = new RpcClientHandler();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(rpcClientHandler);
                        }
                    });

            // 绑定指定服务地址
            ChannelFuture future = bootstrap.connect("localhost", 9999).sync();

            // 指定接口信息发送给提供者
            InvokeMessage invokeMessage = new InvokeMessage();
            invokeMessage.setClassName(clazz.getName());
            invokeMessage.setMethodName(method.getName());
            invokeMessage.setParamTypes(method.getParameterTypes());
            invokeMessage.setParamValues(args);
            future.channel().writeAndFlush(invokeMessage).sync();

            future.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventGroup.shutdownGracefully();
        }
        return rpcClientHandler.getResult();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404868.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入学习TS的高阶语法(泛型、类型检测、内置工具)

文章目录 概要一.TS的类型检测1.鸭子类型2.严格的字面量类型检测 二.TS的泛型1.基本使用2.传递多个参数3.泛型接口4.泛型类5.泛型约束6.映射类型&#xff08;了解&#xff09; 三.TS的知识扩展1.模块的使用-- 内置类型导入 2.类型的查找3.第三方库的类型导入4.declare 声明文件…

Javase-数组

文章目录 1.1 为什么要使用数组1.2 数组的定义及初始化1.3 数组的使用1.4 遍历数组1.5 数组在内存中的存储分析1.6 数组的传参1.7 数组的拷贝 1.1 为什么要使用数组 假设现在有一个任务,要你存储5个同学的学习成绩(double类型),这时候我们可以写出来 double score1 90.4…等五…

谷粒商城-nginx搭建域名访问环境性能压测

nginx搭建域名访问环境 正向代理与反向代理 正向代理&#xff1a;客户端向代理服务器发请求并指定目标服务器&#xff0c;代理向目标服务器转交请求并将获得的内容返回给客户端。 反向代理&#xff1a;用户直接访问反向代理服务器就可以获得目标服务器的资源。反向代理服务器…

Redis能保证数据不丢失吗?

引言 大家即使没用过Redis&#xff0c;也应该都听说过Redis的威名。 Redis是一种Nosql类型的数据存储&#xff0c;全称Remote Dictionary Server&#xff0c;也就是远程字典服务器&#xff0c;用过Dictionary的应该都知道它是一种键值对&#xff08;Key-Value&#xff09;的数…

基于机器学习、遥感和Penman-Monteith方程的农田蒸散发混合模型研究_刘燕_2022

基于机器学习、遥感和Penman-Monteith方程的农田蒸散发混合模型研究_刘燕_2022 摘要关键词 1 绪论2 数据与方法2.1 数据2.2 机器学习算法2.3 Penman-Monteith方程2.4 Medlyn公式2.5 模型性能评估 3 基于机器学习算法的混合模型估算农田蒸散量的评价与比较4 利用人工神经网络算法…

js使用import到本js文件中的函数时报错 Error [ERR_MODULE_NOT_FOUND]: Cannot find module

node:internal/process/esm_loader:97internalBinding(errors).triggerUncaughtException(^Error [ERR_MODULE_NOT_FOUND]: Cannot find module D:\桌面\Pagesizedetection\lib\screensize imported from D:\桌面\Pagesizedetection\index.js Did you mean to import ../lib/sc…

文件上传漏洞--Upload-labs--Pass09(在某些版本的靶场里是Pass10)--点+空格+点 绕过

一、什么是 点空格点 绕过 顾名思义&#xff0c;将 test.php 改为 test.php. . &#xff0c;观察到后缀名php后多出了 点空格点。那么 点空格点 是如何进行绕过的&#xff0c;在什么情况下可以使用&#xff0c;让我们结合题目讲解。 二、代码审计 1、查看题目源代码上半部分&…

Nginx 配置前端工程项目二级目录

前提&#xff1a; 前端工程技术框架: vue 后端工程技术工程&#xff1a;spring boot 需求&#xff1a;需要通过二级目录访问前端工程&#xff1a; 如之前&#xff1a;http://127.0.0.1:80/ 改成 http://127.0.0.1/secondDirectory:80/ 一.前端工程支持二级目录 1.编译文…

CrossOver虚拟机软件2024有哪些功能?最新版本支持哪些游戏?

CrossOver由codewaver公司开发的类虚拟机软件&#xff0c;目的是使linux和Mac OS X操作系统和window系统兼容。CrossOver不像Parallels或VMware的模拟器&#xff0c;而是实实在在Mac OS X系统上运行的一个软件。CrossOvers能够直接在Mac上运行Windows软件与游戏&#xff0c;而不…

链表之“无头单向非循环链表”

目录 ​编辑 1.顺序表的问题及思考 2.链表 2.1链表的概念及结构 2.2无头单向非循环链表的实现 1.创建结构体 2.单链表打印 3.动态申请一个节点 3.单链表尾插 4.单链表头插 5.单链表尾删 6.单链表头删 7.单链表查找 8.单链表在pos位置之前插入x 9.单链表删除pos位…

力扣645. 错误的集合(排序,哈希表)

Problem: 645. 错误的集合 文章目录 题目描述思路复杂度Code 题目描述 思路 1.排序 1.对nums数组按从小到大的顺序排序; 2.遍历数组时若判断两个相邻的元素则找到重复元素&#xff1b; 3.记录一个整形变量prev一次置换当前位置元素并与其作差&#xff0c;若差等于2着说明缺失的…

一篇文章搞懂CDN加速原理

目录 一、什么是CDN CDN对网络的优化作用主要体现在以下几个方面&#xff1a; 二、CDN工作原理 CDN网络的组成元素&#xff1a; 三、名词解释 3.1 CNAME记录&#xff08;CNAME record&#xff09; 3.2 CNAME域名 3.3 DNS 3.4 回源host 3.5 协议回源 一、什么是CDN CD…

linux增加物理磁盘并挂载到文件系统

centos7增加物理磁盘并挂载到文件系统 1、查看所有磁盘情况 fdisk -l2、创建挂载路径 mkdir /data3、格式化磁盘 #磁盘filesystem(上图标红处) mkfs.xfs -f /dev/sda建议 与其它磁盘文件系统保持一致&#xff0c;我这里是xfs 可通过 cat /dev/sda查看 4、挂载 mount /dev/…

今日必读的9篇大模型论文

1.Customize-A-Video&#xff1a;文生视频&#xff0c;可以自由定制了 图像定制在文本到图像&#xff08;T2I&#xff09;扩散模型中已经得到了广泛的研究&#xff0c;并取得了令人印象深刻的成果和应用。随着文本到视频&#xff08;T2V&#xff09;扩散模型的兴起&#xff0c…

从零开始手写mmo游戏从框架到爆炸(二十一)— 战斗系统二

导航&#xff1a;从零开始手写mmo游戏从框架到爆炸&#xff08;零&#xff09;—— 导航-CSDN博客 上一章&#xff08;从零开始手写mmo游戏从框架到爆炸&#xff08;二十&#xff09;— 战斗系统一-CSDN博客&#xff09;我们只是完成了基本的战斗&#xff0c;速度属性并没有…

前端数据可视化:ECharts使用

可视化介绍 ​  ​  应对现在数据可视化的趋势&#xff0c;越来越多企业需要在很多场景(营销数据&#xff0c;生产数据&#xff0c;用户数据)下使用&#xff0c;可视化图表来展示体现数据&#xff0c;让数据更加直观&#xff0c;数据特点更加突出。   ​  数据可视化主要目…

读取7400MB/s!华为发布eKitStor Xtreme M.2闪存条

今日&#xff0c;华为举行数据存储新春新品发布会&#xff0c;不仅发布全新数据湖解决方案&#xff0c;华为还针对商业市场与分销市场发布了全闪存存储新品。 据介绍&#xff0c;面向游戏加速、影视编辑、户外作业等场景&#xff0c;华为发布eKitStor Xtreme系列高性能M.2闪存条…

Leetcode3035. 回文字符串的最大数量

Every day a Leetcode 题目来源&#xff1a;3035. 回文字符串的最大数量 解法1&#xff1a;哈希 排序 由于可以随意交换字母&#xff0c;先把所有字母都取出来&#xff0c;然后考虑如何填入各个字符串。 如果一个奇数长度字符串最终是回文串&#xff0c;那么它正中间的那…

一文读懂Linux内核中的Device mapper映射机制

一、 简介 本文总结Device mapper的映射机制。Device mapper是Linux2.6内核中提供的一种逻辑设备到物理设备的映射框架机制&#xff0c;在该机制下&#xff0c;用户可以很方便的根据自己的需要指定实现存储资源的管理策略&#xff0c;当前比较流行的Linux的逻辑卷管理器比如&a…

轻松打造智能化性能测试监控平台:【JMeter+Grafana+Influxdb】的优化整合方案

在当前激烈的市场竞争中&#xff0c;创新和效率成为企业发展的核心要素之一。在这种背景下&#xff0c;如何保证产品和服务的稳定性、可靠性以及高效性就显得尤为重要。 而在软件开发过程中&#xff0c;性能测试是一项不可或缺的环节&#xff0c;它可以有效的评估一个系统、应…