由浅入深Netty简易实现RPC框架

目录

    • 1 准备工作
    • 2 服务器 handler
    • 3 客户端代码第一版
    • 4 客户端 handler 第一版
    • 5 客户端代码 第二版
    • 6 客户端 handler 第二版


1 准备工作

在这里插入图片描述

这些代码可以认为是现成的,无需从头编写练习

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

@Data
public abstract class Message implements Serializable {

    // 省略旧的代码

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
        // ...
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

}

请求消息

@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    /**
     * 调用的接口全限定名,服务端根据它找到实现
     */
    private String interfaceName;
    /**
     * 调用接口中的方法名
     */
    private String methodName;
    /**
     * 方法返回类型
     */
    private Class<?> returnType;
    /**
     * 方法参数类型数组
     */
    private Class[] parameterTypes;
    /**
     * 方法参数值数组
     */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

响应消息

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    /**
     * 返回值
     */
    private Object returnValue;
    /**
     * 异常值
     */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

服务器架子

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc 请求消息处理器,待实现
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

客户端架子

public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc 响应消息处理器,待实现
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

服务器端的 service 获取

public class ServicesFactory {

    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
            Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                if (name.endsWith("Service")) {
                    Class<?> interfaceClass = Class.forName(name);
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    map.put(interfaceClass, instanceClass.newInstance());
                }
            }
        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static <T> T getService(Class<T> interfaceClass) {
        return (T) map.get(interfaceClass);
    }
}

相关配置 application.properties

serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

2 服务器 handler

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage response = new RpcResponseMessage();
        response.setSequenceId(message.getSequenceId());
        try {
            // 获取真正的实现对象
            HelloService service = (HelloService)
                    ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            
            // 获取要调用的方法
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            
            // 调用方法
            Object invoke = method.invoke(service, message.getParameterValue());
            // 调用成功
            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            // 调用异常
            response.setExceptionValue(e);
        }
        // 返回结果
        ctx.writeAndFlush(response);
    }
}

3 客户端代码第一版

只发消息

@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "cn.itcast.server.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            )).addListener(promise -> {
                if (!promise.isSuccess()) {
                    Throwable cause = promise.cause();
                    log.error("error", cause);
                }
            });

            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

4 客户端 handler 第一版

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
    }
}

5 客户端代码 第二版

包括 channel 管理,代理,接收结果

@Slf4j
public class RpcClientManager {


    public static void main(String[] args) {
        HelloService service = getProxyService(HelloService.class);
        System.out.println(service.sayHello("zhangsan"));
//        System.out.println(service.sayHello("lisi"));
//        System.out.println(service.sayHello("wangwu"));
    }

    // 创建代理类
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        //                                                            sayHello  "张三"
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            // 1. 将方法调用转换为 消息对象
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. 将消息对象发送出去
            getChannel().writeAndFlush(msg);

            // 3. 准备一个空 Promise 对象,来接收结果             指定 promise 对象异步接收结果线程
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

//            promise.addListener(future -> {
//                // 线程
//            });

            // 4. 等待 promise 结果
            promise.await();
            if(promise.isSuccess()) {
                // 调用正常
                return promise.getNow();
            } else {
                // 调用失败
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    private static Channel channel = null;
    private static final Object LOCK = new Object();

    // 获取唯一的 channel 对象
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) { //  t2
            if (channel != null) { // t1
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // 初始化 channel 方法
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

6 客户端 handler 第二版

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    //                       序号      用来接收结果的 promise 对象
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        // 拿到空的 promise
        Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
        if (promise != null) {
            Object returnValue = msg.getReturnValue();
            Exception exceptionValue = msg.getExceptionValue();
            if(exceptionValue != null) {
                promise.setFailure(exceptionValue);
            } else {
                promise.setSuccess(returnValue);
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/21544.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

chatgpt赋能Python-pythonendswith

Python endswith方法&#xff1a;介绍、用法和示例 在编程中&#xff0c;经常需要查找字符串是否以特定字符结尾。Python提供了一个方便易用的方法——endswith()。 什么是Python endswith()方法&#xff1f; Python endswith()方法是用于检查字符串是否以特定子字符串结尾的…

【经验分享】一种高内聚低耦合的消息分发模块的设计与实现

【经验分享】一种高内聚低耦合的消息分发模块的设计与实现 又到了每天的open话题&#xff1a;【代码面对面】时刻&#xff0c;让我们一起在摸鱼中学习技术吧。今天的话题是嵌入式的消息分发模块&#xff0c;你会怎么设计和实现&#xff1f; 1 写在前面 老套路&#xff0c;我先…

GitHub Copilot:神一样的代码助手

我肝肯定&#xff0c;很多很多小伙伴还不了解 Copilot 是什么&#xff0c;尤其是初学计算机的小伙伴&#xff0c;我这里普及一下吧&#xff01; GitHub Copilot 是一个基于 AI 的代码自动完成工具&#xff0c;由 GitHub 和 OpenAI 共同开发。 GitHub 和 OpenAI 想必大家都很清楚…

从零制作操作系统——环境搭建以及HelloWorld

从零制作操作系统——环境搭建以及HelloWorld 起因 最近在学习操作系统&#xff0c;尝试自己照着书搓一个出来。 环境搭建 基础环境 我们的操作系统在x86平台的Linux下进行编写和运行。编辑器用的VIM。 我的系统是Fedora 36&#xff0c;当然你也可以使用Ubuntu或者其他Li…

IBM 创新方案+SNP数据转型助一汽大众实现数据平稳、高效迁移

近日&#xff0c;IBM 采用基于SNP Bluefield技术迁移的IBM Rapid Move创新数据迁移方案, 成功为一汽-大众实施了企业运营数据系统从 ECC 到 S/4 的升级项目。该项目系统切换耗时仅三天&#xff0c;不仅助客户高效、平稳迁移了系统数据&#xff0c;升级了数据底座&#xff0c;还…

SpringBoot项目打包部署到Nginx【无需配置Nginx】

0.前置知识 springboot打包的项目共分为jar和war两种类型 jar包 jar类型项目使用SpringBoot打包插件打包时&#xff0c;会在打成的jar中 内置一个tomcat 的jar 所以我们可以使用jdk直接运行&#xff0c;将功能代码放到其内置的tomcat中运行。 war包 在打包时需要将 内置的tom…

关于单目视觉 SLAM 的空间感知定位技术的讨论

尝试关于单目视觉 SLAM 的空间感知定位技术的学习&#xff0c;做以调查。SLAM算法最早在机器人领域中提出&#xff0c;视觉SLAM又可以分为单目、双目和深度相机三种传感器模式&#xff0c;在AR应用中通常使用轻便、价格低廉的单目相机设备。仅使用一个摄像头作为传感器完成同步…

prettier 使用详细介绍

prettier 使用详细介绍 prettier是一个代码格式化工具&#xff0c;可以通过自定义规则来重新规范项目中的代码&#xff0c;去掉原始的代码风格&#xff0c;确保团队的代码使用统一相同的格式。 安装 npm i prettier -Dyarn add prettier --dev创建一个prettierrc.*配置文件&…

六级备考28天|CET-6|听力第二讲|长对话满分技巧|听写技巧|2022年6月考题|14:30~16:00

目录 1. 听力策略 2. 第一二讲笔记 3. 听力原文复现 (5)第五小题 (6)第六小题 (7)第七小题 (8)第八小题 扩展业务 expand business 4. 重点词汇 1. 听力策略 2. 第一二讲笔记 3. 听力原文复现 (5)第五小题 our guest is Molly Sundas, a university stud…

learn C++ NO.5 ——类和对象(3)

日期类的实现 在前面类和对象的学习中&#xff0c;由于知识多比较多和碎&#xff0c;需要一个能够将之前所学知识融会贯通的东西。下面就通过实现日期类来对类和对象已经所学的知识进行巩固。 日期类的基本功能&#xff08;.h文件&#xff09; //Date.h//头文件内容 #includ…

【数据结构】广度优先遍历(BFS)模板及其讲解

&#x1f38a;专栏【数据结构】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【勋章】 大一同学小吉&#xff0c;欢迎并且感谢大家指出我的问题&#x1f970; 目录 &#x1f381;定义 &#x1f381;遍历方法 &#x1f381;根…

什么是边缘计算盒子?边缘计算盒子可以做什么?一文带你了解边缘计算云服务器 ECS

上文&#xff0c;我们已经为大家介绍了什么是边缘计算、边缘计算的诞生、以及边缘计算与CDN之间的关系&#xff0c;感兴趣的小伙伴欢迎阅读以往文章&#xff1a; 边缘计算节点是啥&#xff1f;边缘计算与CDN有什么关系&#xff1f;一文带你了解边缘计算节点BEC&#xff08;1&am…

nest笔记十一:一个完整的nestjs示例工程(nestjs_template)

概述 链接&#xff1a;nestjs_template 相关文章列表 nestjs系列笔记 示例工程说明 这个工程是我使用nestjs多个项目后&#xff0c;总结出来的模板。这是一个完整的工程&#xff0c;使用了yaml做为配置&#xff0c;使用了log4js和redis和typeorm&#xff0c;sawgger&#…

Elasticsearch 集群部署插件管理及副本分片概念介绍

Elasticsearch 集群配置版本均为8以上 安装前准备 CPU 2C 内存4G或更多 操作系统: Ubuntu20.04,Ubuntu18.04,Rocky8.X,Centos 7.X 操作系统盘50G 主机名设置规则为nodeX.qingtong.org 生产环境建议准备单独的数据磁盘主机名 #各自服务器配置自己的主机名 hostnamectl set-ho…

STM32实现基于RS485的简单的Modbus协议

背景 我这里用STM32实现&#xff0c;其实可以搬移到其他MCU&#xff0c;之前有项目使用STM32实现Modbus协议 这个场景比较正常&#xff0c;很多时候都能碰到 这里主要是Modbus和变频器通信 最常见的是使用Modbus实现传感器数据的采集&#xff0c;我记得之前用过一些传感器都…

学习RHCSA的day.03

目录 2.6 Linux系统的目录结构 2.7 目录操作命令 2.8 文件操作命令 2.6 Linux系统的目录结构 1、Linux目录结构的特点 分区加载于目录结构&#xff1a; 使用树形目录结构来组织和管理文件。整个系统只有一个位于根分区的一个根目录&#xff08;树根&#xff09;、一棵树。…

盘点 | 10大类企业管理系统有哪些

人类的发展史也是一部工具的进化史&#xff0c;企业管理手段同样不例外。移动互联网时代给了传统低下的手工操作方式致命一击&#xff0c;应运而生的各类企业管理系统工具为企业管理插上腾飞的翅膀&#xff0c;彻底颠覆了手动低效率的历史&#xff0c;变得更加移动化、智能化。…

求最小生成树(Prim算法与Kruskal算法与并查集)

目录 1、案例要求2、算法设计与实现2.1 Prim算法2.1.1 构造无向图2.1.2 编写Prim算法函数2.1.3 实现代码2.1.4 运行结果截图 2.2 Kruskal算法2.2.1 构造无向图2.2.2 编写并查集UnionFind类2.2.3 编写Kruskal算法2.2.4 实现代码2.2.5 运行结果截图 3、总结 1、案例要求 利用贪心…

低代码与其拓荒,不如颠覆开发行业

目录 一、前言 二、低代码是一个值得信赖的“黑盒子” 粗略总结&#xff0c;开发者对低代码平台所见即所得设计器有两种反应&#xff1a; 三、人人都爱黑盒子 四、用“低代码平台”来开发是什么样的感受&#xff1f; 五、结论 一、前言 在科幻电影中&#xff0c;我们看到…

【OpenCV】C++红绿灯轮廓识别+ROS话题实现

目录 前言 一、背景知识 Opencv轮廓检测 ROS相关知识 二、环境依赖 三、具体实现 Step1&#xff1a;初始化ROS&#xff0c;订阅话题 Step2&#xff1a;接收话题&#xff0c;进入回调 1. 帧处理 2. 膨胀腐蚀处理 Step3&#xff1a;红绿特征处理 1. 提取绘制轮廓 2…