手动实现简易版RPC(下)
前言
什么是RPC?它的原理是什么?它有什么特点?如果让你实现一个RPC框架,你会如何是实现?带着这些问题,开始今天的学习。
接上一篇博客 手动实现简易版RPC(上) 我们得到了最简易RPC框架的架构图与运行图,本文主要介绍简易版RPC 的简易实现。
架构
最简易的 RPC 框架架构图:
整个简易RPC的调用过程图
下面我们白手起家,从0实现简易版的RPC框架
项目准备
1、创建项目
优先创建一个名为ape-rpc的空项目,然后使用idea 依次创建几个maven模块
整个项目包结构也如上图所示,分别介绍一下上述的包结构
- example-common 整个项目中所用到的公共类,例如一些实体类,公共接口等
- example-producer 示例服务生产者代码包
- example-consumer 示例服务消费者代码包
- ape-rpc-easy 简易RPC框架实现代码包
在示例项目中,我们将以一个最简单的猫咪服务为例,演示整个服务调用过程。下面我们依次实现上述的几个模块。
2、common模块代码实现
公共模块需要同时被服务消费者
和服务提供者
引入,主要是编写和服务相关的接口和数据实体
common模块结构如下图
2.1 引入lombok 简化开发
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
2.2 编写🐱实体
package com.jerry.common.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 15:33
* @注释 实体 🐱
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Cat implements Serializable {
private int id;
/***
* 名字
*/
private String name;
/***
* 颜色
*/
private String color;
}
注意,对象需要实现序列化接口,为后续网络传输序列化提供支持
2.3编写猫咪服务接口
编写猫咪服务接口CatService,提供两个获取猫咪的方法
package com.jerry.common.service;
import com.jerry.common.model.Cat;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 15:38
* @注释 🐱服务接口
*/
public interface CatService {
/***
* 获取猫咪信息
* @param cat
* @return
*/
Cat getCat(Cat cat);
/***
* 按照id获取猫咪信息
* @param id
* @return
*/
Cat getCatById(int id);
//....other
}
3、服务生产者(producer)
作为服务生产者,是真正实现服务接口的模块,需要调用简单RPC模块
3.1添加pom依赖
<dependencies>
<!--common-->
<dependency>
<groupId>com.jerry</groupId>
<artifactId>example-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--rpc-easy-->
<dependency>
<groupId>com.jerry</groupId>
<artifactId>ape-rpc-easy</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--hutool工具包-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
3.2 编写服务实现类
编写代码实现类,实现公共模块中定义的🐱服务接口
功能是打印🐱的信息
package com.jerry.producer.serviceImpl;
import com.jerry.common.model.Cat;
import com.jerry.common.service.CatService;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 15:45
* @注释 数据提供者实现
*/
public class CatServiceImpl implements CatService {
@Override
public Cat getCat(Cat cat) {
System.out.println("调用到了服务提供者:" + cat.toString());
return cat;
}
@Override
public Cat getCatById(int id) {
Cat tom = new Cat(id, "TOM", "#FFFFFF");
System.out.println("调用到了服务提供者:" + tom.toString());
return tom;
}
}
3.3实现生产者主类
编写服务提供者启动类EasyProducer ,之后会在该类的 main 方法中编写提供服务的代码。
package com.jerry.producer;
import com.jerry.aperpc.localregcenter.LocalRegCenter;
import com.jerry.aperpc.server.VertxHttpServer;
import com.jerry.common.service.CatService;
import com.jerry.producer.serviceImpl.CatServiceImpl;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 15:47
* @注释 简单的生产者
*/
public class EasyProducer {
public static void main(String[] args) {
//提供服务....
}
}
完成后代码生产者的结构大概如下图所示
4、服务消费者(Consumer)
服务消费者需要调用简单rpc实现模块
4.1pom依赖配置
<dependencies>
<dependency>
<groupId>com.jerry</groupId>
<artifactId>example-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jerry</groupId>
<artifactId>ape-rpc-easy</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
4.2创建服务消费者启动类
创建服务消费者启动类EasyConsumer,编写调用接口的代码
package com.jerry.consumer;
import com.jerry.aperpc.proxy.ServiceProxyFactory;
import com.jerry.common.model.Cat;
import com.jerry.common.service.CatService;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 17:14
* @注释
*/
public class EasyConsumer {
public static void main(String[] args) {
//todo: 需要获取 CatService 的实现类对象
CatService catService = null;
//调用
Cat newCat = catService.getCatById(1);
if (newCat != null) {
System.out.println("消费者获取到的 cat :" + newCat.toString());
} else {
System.out.println("no cat");
}
}
}
值得注意的是:现在是无法获取到 CatService实例的,所以预留为 null。我们之后的目标是,能够通过 RPC 框架快速得到一个支持远程调用服务提供者的代理对象,像调用本地方法一样调用 CatService的方法。
5、简单rpc业务实现
5.1 web服务器
我们要先让服务提供者提供可远程访问
的服务。那么,就需要一个 web 服务器,能够接受处理请求、并返回响应。
web 服务器的选择有很多,比如 Spring Boot 内嵌的 Tomcat、NIO 框架 Netty 和 Vert.x等等
此处我们使用高性能的 NIO 框架 Vert.x 来作为 RPC 框架的 web 服务器。
想了解更多,请参考 Vert.x官方文档🌐 || Vert.x官方文档中文版🌐
5.1.1 rpc-easy引入pom依赖
<dependencies>
<!-- 高性能的 NIO 框架 Vert.x 来作为 RPC 框架的 web 服务器。-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
5.1.2编写httpServer做web服务器接口
编写一个 web 服务器的接口 HttpServer,定义统一的启动服务器方法,便于后续的扩展,比如实现多种不同的
web 服务器。
package com.jerry.aperpc.server;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:03
* @注释
*/
public interface HttpServer {
/***
* 启动器
* @param port
*/
void exec(int port);
}
5.1.3基于vert.x实现请求处理
编写基于 Vert.x 实现的 web 服务器 VertxHttpServer,能够监听指定端口并处理请求。
package com.jerry.aperpc.server;
import io.vertx.core.Vertx;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:06
* @注释
*/
public class VertxHttpServer implements HttpServer {
@Override
public void exec(int port) {
//1-创建Vertx 实例
Vertx vertx = Vertx.vertx();
//2-创建http服务器
io.vertx.core.http.HttpServer httpServer = vertx.createHttpServer();
//3-监听端口并处理请求
httpServer.requestHandler(httpServerRequest -> {
//处理里HTTP 请求
System.out.println("Received request :"
+ httpServerRequest.method() + " " +
httpServerRequest.uri());
//发送http响应
httpServerRequest.response()
.putHeader("content-type", "text/plain")
.end("Hello from Vert.x HTTP server!");
});
//4-启动 HTTP 服务器并监听指定端口
httpServer.listen(port, result -> {
if (result.succeeded()) {
System.out.println("Server is now listening on port:" + port);
} else
System.err.println("Failed to start server:" + result.cause());
});
}
}
5.1.4验证web服务器
验证 web 服务器能否启动成功并接受请求。
修改服务提供者(example-producer)模块的 EasyProducer类,编写启动 web 服务的代码,如下:
public class EasyProducer {
public static void main(String[] args) {
//提供服务
VertxHttpServer vertxHttpServer = new VertxHttpServer();
vertxHttpServer.exec(8080);
}
}
浏览器访问localhost:8080
,查看能否正常访问并看到输出的文字。
如果能够正常访问,浏览器窗口以及控制台输出如下图所示
5.2 本地服务注册器
我们现在做的简易 RPC 框架主要是跑通流程,所以暂时先不用第三方注册中心,直接把服务注册到服务提供者本地即可。
在 rpc-easy 模块中新建文件夹 localregcenter ,创建本地服务注册器 LocalRegCenter,使用线程安全的 ConcurrentHashMap 存储服务注册信息,key 为服务名称、value 为服务的实现类。之后就可以根据要调用的服务名称获取到对应的实现类,然后通过反射进行方法调用了。
具体代码如下:
package com.jerry.aperpc.localregcenter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:23
* @注释 本地服务注册器
*/
public class LocalRegCenter {
/***
* 本地注册中心存储列表
*/
private static Map<String, Class<?>> map = new ConcurrentHashMap<String, Class<?>>();
/***
* 注册
* @param serviceName
* @param impl
*/
public static void add(String serviceName, Class<?> impl) {
map.put(serviceName, impl);
}
/***
* 获取
* @param serviceName
* @return
*/
public static Class<?> get(String serviceName) {
return map.get(serviceName);
}
/***
* 移除
* @param serviceName
*/
public static void remove(String serviceName) {
map.remove(serviceName);
}
}
注意:本地服务注册器和注册中心的作用是有区别的。
注册中心的作用侧重于管理注册的服务、提供服务信息给消费者;
而本地服务注册器的作用是根据服务名获取到对应的实现类,是完成调用必不可少的模块。
当服务提供者(example-producer)启动时,需要将自己注册服务到注册器中,修改 EasyProducer代码如下
package com.jerry.producer;
import com.jerry.aperpc.localregcenter.LocalRegCenter;
import com.jerry.aperpc.server.VertxHttpServer;
import com.jerry.common.service.CatService;
import com.jerry.producer.serviceImpl.CatServiceImpl;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 15:47
* @注释 简单的生产者
*/
public class EasyProducer {
public static void main(String[] args) {
//注册服务
LocalRegCenter.add(CatService.class.getName(), CatServiceImpl.class);
//提供服务
VertxHttpServer vertxHttpServer = new VertxHttpServer();
vertxHttpServer.exec(8080);
}
}
5.3 序列化器
服务在本地注册后,我们就可以根据请求信息取出实现类并通过反射技术调用实现类的方法了。
在编写处理请求的逻辑前,我们要先实现序列化器模块。
5.3.1 什么是序列化
无论是请求或响应,都会涉及参数的传输。而 Java对象是存活在JVM
虚拟机中的,如果想在其他位置存储并访问、或者在网络中进行传输,就需要进行序列化和反席列化。
简单理解:
**序列化:**将数据结构或对象转换成二进制字节流的过程
**反序列化:**将在序列化过程中所生成的二进制字节流转换成数据结构或者对象的过程
5.3.2 序列化处理要素
序列化的处理要素
- **解析效率:**序列化协议应该首要考虑的因素,像xml/json解析起来比较耗时,需要解析doom树,二进制自定义协议解析起来效率要快很多。
- **压缩率:**同样一个对象,xml/json传输起来有大量的标签冗余信息,信息有效性低,二进制自定义协议占用的空间相对来说会小很多。
- **扩展性与兼容性:**是否能够利于信息的扩展,并且增加字段后旧版客户端是否需要强制升级,这都是需要考虑的问题,在自定义二进制协议时候,要做好充分考虑设计。
- **可读性与可调试性:**xml/json的可读性会比二进制协议好很多,并且通过网络抓包是可以直接读取,二进制则需要反序列化才能查看其内容
- **跨语言:**有些序列化协议是与开发语言紧密相关的,例如dubbo的Hessian序列化协议就只能支持Java的RPC调用。
- **通用性:**xml/json非常通用,都有很好的第三方解析库,各个语言解析起来都十分方便,二进制数据的处理方面也有Protobu和和Hessian等插件,在做设计的时候尽量做到较好的通用性。
5.3.3 序列化器
- JDK原生序列化,通过实现Serializable接口。通过ObjectOutPutSream和ObjectlnputStream对象进行序列化及反序列化.
- JSON序列化。一般在HTTP协议的RPC框架通信中,会选择JSON方式。JSON具有较好的扩展性、可读性和通用性。但JSON序列化占用空间开销较大,没有JAVA的强类型区分,需要通过反射解决,解析效率和压缩率都较差。如果对并发和性能要求较高,或者是传输数据量较大的场景,不建议采用JSON序列化方式。
- Hessian2序列化。Hessian 是一个动态类型,二进制序列化,并且支持跨语言特性的序列化框架。Hessian 性能上要比JDK、JSON 序列化高效很多,并且生成的字节数也更小。有非常好的兼容性和稳定性,所以 Hessian 更加适合作为 RPC 框架远程通信的序列化协议。
- kryo序列化。高性能的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的字节码体积
- Protobuf序列化。Protobuf 出自于 Google,性能还比较优秀,也支持多种语言,同时还是跨平台的。就是在使用中过于繁琐,因为你需要自己定义 IDL 文件和生成对应的序列化代码。这样虽然不灵活,但是,另一方面导致 protobuf 没有序列化漏洞的风险。
- 其他序列化方式
在此我们实现简单的kryo序列化
- 首先在rpc模块pom中引入kryo系列化依赖
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.1</version>
</dependency>
由于其底层依赖于ASM技术,与Spring等框架可能会发生ASM依赖的版本冲突(文档中表示这个冲突还挺容易出现)所以提供了另外一个依赖以供解决此问题
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>4.0.1</version>
</dependency>
同时引入异常管理依赖
<dependency>
<groupId>com.nimbusds</groupId>
<artifactId>oauth2-oidc-sdk</artifactId>
<version>8.36</version>
</dependency>
- 然后在rpc模块新建序列化接口Serializer,提供序列化和反序列化两个接口,方便后续拓展
package com.jerry.aperpc.serializer;
import java.io.IOException;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:33
* @注释
*/
public interface Serializer {
/***
* 序列化
* @param object
* @return
* @param <T>
* @throws IOException
*/
<T> byte[] serialize(T object) throws IOException;
/***
* 反序列化器
* @param bytes
* @param type
* @return
* @param <T>
* @throws IOException
*/
<T> T deserialize(byte[]bytes, Class<T> type) throws IOException;
}
- 基于kryo实现KryoSerializer
package com.jerry.aperpc.serializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.jerry.aperpc.model.RpcRequest;
import com.jerry.aperpc.model.RpcResponse;
import com.nimbusds.oauth2.sdk.SerializeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/11 9:08
* @注释 基于kryo的序列化器
*/
public class KryoSerializer implements Serializer {
/**
* Because Kryo is not thread safe. So, use ThreadLocal to store Kryo objects
*/
private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
return kryo;
});
@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
e.printStackTrace();
throw new SerializeException("Serialization failed");
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对象
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (Exception e) {
e.printStackTrace();
throw new SerializeException("Deserialization failed");
}
}
}
至此序列化器完成
5.4 请求处理器 (生产者处理调用)
请求处理器是RPC框架的核心,其主要功能就是:
根据接收到的请求,并根据请求参数找到对应的服务和方法,通过反射实现调用,最后封装返回结果并响应请求。
5.4.1 rpc模块请求响应实体封装
rpc模块中进行请求以及响应的实体封装
rpc请求体(RpcRequest)
请求类 RpcRequest 的作用是封装调用所需的信息,比如服务名称、方法名称、调用参数的类型列表、参数列表。这些都是 Java 反射机制所需的参数。
package com.jerry.aperpc.model;
import lombok.*;
import java.io.Serializable;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:43
* @注释 rpc 请求封装
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Builder
public class RpcRequest implements Serializable {
/***
服务名称
*/
private String serviceName;
/***
*方法名称
*/
private String methodName;
/***
参数类型列表
*/
private Class<?> [] parameterTypes;
/***
*参数列表
*/
private Object[] args;
}
rpc响应体(RpcResponse)
响应体 RpcResponse 的作用是封装调用方法得到的返回值、以及调用的信息(比如异常情况)等。
package com.jerry.aperpc.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:47
* @注释 rpc 响应封装
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class RpcResponse implements Serializable {
/***
*响应数据
*/
private Object data;
/***
*响应数据类型(预留)
*/
private Class<?> dataType;
/***
*响应信息
*/
private String message;
/***
* 异常信息
*/
private Exception exception;
}
5.4.2 业务请求处理器
作为业务请求处理器,他有如下几个职责:
1.反序列化请求为对象,并从请求对象中获取参数
2.根据服务名称从本地注册器中获取到对应的服务实现类
3.通过反射机制调用服务实现类中的方法,得到返回结果
4.对返回结果进行封装和序列化,并写入到响应中
完整代码如下
package com.jerry.aperpc.server;
import com.jerry.aperpc.localregcenter.LocalRegCenter;
import com.jerry.aperpc.model.RpcRequest;
import com.jerry.aperpc.model.RpcResponse;
import com.jerry.aperpc.serializer.JDKSerializer;
import com.jerry.aperpc.serializer.KryoSerializer;
import com.jerry.aperpc.serializer.Serializer;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import java.io.IOException;
import java.lang.reflect.Method;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:49
* @注释 请求处理器
* 1.反序列化请求为对象,并从请求对象中获取参数。
* 2.根据服务名称从本地注册器中获取到对应的服务实现类,
* 3.通过反射机制调用方法,得到返回结果。
* 4.对返回结果进行封装和序列化,并写入到响应中,
*/
public class HttpServerHandler implements Handler<HttpServerRequest> {
@Override
public void handle(HttpServerRequest request) {
//1-指定序列化器
final Serializer serializer = new KryoSerializer();
//打印请求信息日志
System.out.println("Received request :" +"request.uri: ["+ request.uri() + "] request.method : ["+request.method()+"]");
//2-异步进行请求
request.bodyHandler(body -> {
byte[] bytes = body.getBytes();
RpcRequest rpcRequest = null;
try {
rpcRequest = serializer.deserialize(bytes, RpcRequest.class);//反序列化请求为对象
} catch (Exception e) {
e.printStackTrace();
}
// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
//如果请求为 nu11,直接返回
if (rpcRequest == null) {
rpcResponse.setMessage("rpcRequest is null");
doResponse(request, rpcResponse, serializer);
return;
}
try {
// 获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegCenter.get(rpcRequest.getServiceName());//获取实现类
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());//反射获取实现类的方法
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());//反射执行对应方法
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 响应
doResponse(request, rpcResponse, serializer);
});
}
/**
* 响应
*
* @param request
* @param rpcResponse
* @param serializer
*/
void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) {
HttpServerResponse httpServerResponse = request.response()
.putHeader("content-type", "application/json");
try {
// 序列化
byte[] serialized = serializer.serialize(rpcResponse);
httpServerResponse.end(Buffer.buffer(serialized));
} catch (IOException e) {
e.printStackTrace();
httpServerResponse.end(Buffer.buffer());
}
}
}
简单解释一下上述部分代码:
- 获取实现类:
Class<?> implClass = LocalRegCenter.get(rpcRequest.getServiceName());
这行代码从
LocalRegCenter
(本地服务发现器)中,根据rpcRequest.getServiceName()
提供的服务名,获取了对应的实现类的Class
对象。这个Class
对象代表了实现类的元数据,包括它的方法、字段等信息。
- 获取实现类的方法:
Method method = implClass.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
这行代码通过
implClass.getMethod(...)
方法,根据方法名和参数类型来获取实现类中的某个方法。其中:
rpcRequest.getMethodName()
提供了方法名。rpcRequest.getParameterTypes()
提供了方法的参数类型。
getMethod
方法返回的是一个Method
对象,代表了这个方法。
- 创建实现类的实例:
implClass.newInstance()
通过
newInstance
方法,创建了实现类的一个新实例。这是Java反射中创建对象的一种方式(注意:从Java 9开始,newInstance
方法已被标记为过时,建议使用getDeclaredConstructor().newInstance()
来替代)。
- 调用实现类的方法:
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
使用
method.invoke(...)
方法,调用上面获取到的Method
对象所代表的方法。这里做了两件事:
- 首先,通过
implClass.newInstance()
创建了一个实现类的新实例。- 然后,使用
rpcRequest.getArgs()
提供的参数来调用这个方法。
invoke
方法返回的是方法的返回值,这里被存储在result
变量中。
需要注意,不同的 web 服务器对应的请求处理器实现方式也不同,比如 Ver.x 中是通过实现Handler<HitpServerRequest>
接口来自定义请求处理器的。并且可以通过 request.bodyHandler
异步处理请求.
5.4.3 给 HttpServer 绑定请求处理器。
修改 VertxHttpServer 的代码,通过server.requestHandler
绑定请求处理器
修改后的代码如下
package com.jerry.aperpc.server;
import io.vertx.core.Vertx;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 16:06
* @注释
*/
public class VertxHttpServer implements HttpServer {
@Override
public void exec(int port) {
//1-创建Vertx 实例
Vertx vertx = Vertx.vertx();
//2-创建http服务器
io.vertx.core.http.HttpServer httpServer = vertx.createHttpServer();
//3-监听端口并处理请求
// httpServer.requestHandler(httpServerRequest -> {
// //处理里HTTP 请求
// System.out.println("Received request :"
// + httpServerRequest.method() + " " +
// httpServerRequest.uri());
//
// //发送http响应
// httpServerRequest.response()
// .putHeader("content-type", "text/plain")
// .end("Hello from Vert.x HTTP server!");
// });
httpServer.requestHandler(new HttpServerHandler());
//4-启动 HTTP 服务器并监听指定端口
httpServer.listen(port, result -> {
if (result.succeeded()) {
System.out.println("Server is now listening on port:" + port);
} else
System.err.println("Failed to start server:" + result.cause());
});
}
}
至此,引入了 RPC 框架的服务提供者模块,已经能够接受请求并完成服务调用了
5.5 代理 (消费者发起调用)
在项目准备阶段,我们已经预留了一段调用服务的代码,只要能够获取到 CatService对象(实现类),就能跑通整个流程。但 CatService的实现类从哪来呢?
总不能把服务提供者的CatServicelmpl复制粘贴到消费者模块吧?要能那样做还需要 RPC 框架干什么?分布式系统中,我们调用其他项目或团队提供的接口时,一般只关注请求参数和响应结果,而不关注具体实现。
在之前的架构中讲过,我们可以通过生成代理对象来简化消费方的调用,代理的实现方式大致分为两类类:静态代理和动态代理,下面依次实现。
5.5.1 静态代理
静态代理是指为每一个特定类型的接口或对象,编写一个代理类。
🌰:在 example-consumer 模块中,创建一个静态代理 CatServiceProxy,实现 CatService接口和 getCatById 方法。
只不过实现 getCatById 方法时,不是复制粘贴服务提供者 CatServicelmpl 中的代码,而是要构造 HTTP 请求去调用服务提供者。
package com.jerry.consumer.proxy;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.jerry.aperpc.model.RpcRequest;
import com.jerry.aperpc.model.RpcResponse;
import com.jerry.aperpc.serializer.JDKSerializer;
import com.jerry.aperpc.serializer.KryoSerializer;
import com.jerry.aperpc.serializer.Serializer;
import com.jerry.common.model.Cat;
import com.jerry.common.service.CatService;
import java.io.IOException;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/9 17:14
* @注释 🐱服务的静态代理
*/
public class CatServiceProxy implements CatService {
@Override
public Cat getCat(Cat cat) {
// 指定序列化器
final Serializer serializer = new KryoSerializer();
// 构造请求
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(CatService.class.getName())
.methodName("getCat")
.parameterTypes(new Class[]{Cat.class})
.args(new Object[]{cat})
.build();
try {
// 序列化(Java 对象 => 字节数组)
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 发送请求
try (HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()) {
byte[] result = httpResponse.bodyBytes();
// 反序列化(字节数组 => Java 对象)
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return (Cat) rpcResponse.getData();
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
public Cat getCatById(int id) {
// 指定序列化器
final Serializer serializer = new KryoSerializer();
// 构造请求
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(CatService.class.getName())
.methodName("getCatById")
.parameterTypes(new Class[]{int.class})
.args(new Object[]{id})
.build();
try {
// 序列化(Java 对象 => 字节数组)
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 发送请求
try (HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()) {
byte[] result = httpResponse.bodyBytes();
// 反序列化(字节数组 => Java 对象)
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return (Cat) rpcResponse.getData();
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
然后修改example-consumer 模块 EasyConsumer
,new 一个代理对象并赋值给 CatService,就能完成调用:
package com.jerry.consumer;
import com.jerry.aperpc.proxy.ServiceProxyFactory;
import com.jerry.common.model.Cat;
import com.jerry.common.service.CatService;
import com.jerry.consumer.proxy.CatServiceProxy;
/**
* @version 1.0
* @Author jerryLau
* @Date ${DATE} ${TIME}
* @注释
*/
public class EasyConsumer {
public static void main(String[] args) {
//todo: 需要获取 CatService 的实现类对象
CatService catService = new CatServiceProxy();
//调用
Cat newCat = catService.getCatById(1);
if (newCat != null) {
System.out.println("消费者获取到的 cat :" + newCat.toString());
} else {
System.out.println("no cat");
}
}
}
接下来我们尝试运行一下,优先启动生产者,然后启动消费者,正常的情况下,控制台会出现如下信息
静态代理虽然很好理解(就是写个实现类嘛),但缺点也很明显,我们如果要给每个服务接口都写一个实现类,是非常麻烦的,这种代理方式的灵活性也是比较差的。
因此我们尝试使用动态代理
5.5.2 动态代理
动态代理的作用是
根据要生成的对象的类型,自动生成一个代理对象。
常用的动态代理实现方式有JDK动态代理
和基于字节码生成的动态代理
(比如 CGLIB)。前者简单易用、无需引入额外的库,但缺点是只能对接口进行代理;后者更灵活、可以对任何类进行代理,但性能略低于JDK动态代理。
此处我们使用 JDK 动态代理,
- 在 RPC 模块中编写动态代理类 ServiceProxy,需要实现 InvocationHandler 接口的 invoke 方法
几乎就是将静态代理的方式搬运过来
package com.jerry.aperpc.proxy;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.jerry.aperpc.model.RpcRequest;
import com.jerry.aperpc.model.RpcResponse;
import com.jerry.aperpc.serializer.JDKSerializer;
import com.jerry.aperpc.serializer.KryoSerializer;
import com.jerry.aperpc.serializer.Serializer;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/10 8:44
* @注释 rpc 模块中的动态代理
*/
public class ServiceProxy implements InvocationHandler {
/***
* 调用代理
* @param proxy the proxy instance that the method was invoked on
*
* @param method the {@code Method} instance corresponding to
* the interface method invoked on the proxy instance. The declaring
* class of the {@code Method} object will be the interface that
* the method was declared in, which may be a superinterface of the
* proxy interface that the proxy class inherits the method through.
*
* @param args an array of objects containing the values of the
* arguments passed in the method invocation on the proxy instance,
* or {@code null} if interface method takes no arguments.
* Arguments of primitive types are wrapped in instances of the
* appropriate primitive wrapper class, such as
* {@code java.lang.Integer} or {@code java.lang.Boolean}.
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("invoke current method :" + method + "args: " + args);
System.out.println("invoke current method :" + method.getName() + " args: " + args +" parameterTypes:"+method.getParameterTypes());
// 指定序列化器
final Serializer serializer = new KryoSerializer();
// 构造请求
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 序列化(Java 对象 => 字节数组)
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 发送请求
//todo:暂时设置成硬编码,后面改成服务发现
try (HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()) {
byte[] result = httpResponse.bodyBytes();
// 反序列化(字节数组 => Java 对象)
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return rpcResponse.getData();
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
- 创建动态代理工厂 ServiceProxyFactory,作用是根据指定类创建动态代理对象
这里是使用了 工厂设计模式,来简化对象的创建过程,代码如下:
package com.jerry.aperpc.proxy;
import java.lang.reflect.Proxy;
/**
* @version 1.0
* @Author jerryLau
* @Date 2024/4/10 8:50
* @注释 动态代理工厂,通过指定类创建代理对象
*/
public class ServiceProxyFactory {
public static <T> T getProxy(Class<T> serviceClass) {
return (T) Proxy.newProxyInstance(
serviceClass.getClassLoader(),
new Class[]{serviceClass}
, new ServiceProxy()
);
}
}
上述代码中,主要是通过 Proxy.newProxyInstance
方法为指定类型创建代理对象
- 最后将example-consumer模块中获取通过静态代理获取CatService,调整成调用动态代理工厂得到动态代理对象
package com.jerry.consumer;
import com.jerry.aperpc.proxy.ServiceProxyFactory;
import com.jerry.common.model.Cat;
import com.jerry.common.service.CatService;
import com.jerry.consumer.proxy.CatServiceProxy;
/**
* @version 1.0
* @Author jerryLau
* @Date ${DATE} ${TIME}
* @注释
*/
public class EasyConsumer {
public static void main(String[] args) {
//todo: 需要获取 CatService 的实现类对象
// CatService catService = new CatServiceProxy();
CatService catService = ServiceProxyFactory.getProxy(CatService.class);
//调用
Cat newCat = catService.getCatById(1);
if (newCat != null) {
System.out.println("消费者获取到的 cat :" + newCat.toString());
} else {
System.out.println("no cat");
}
}
}
运行结果与静态差不太多
简单解释下上述动态代理部分代码执行流程,稍后再测试环节客以debug看到他是如何运行的
当服务消费者中通过调用
catService.getCatById
时,实际上并没有直接调用CatService
类的getCatById
方法,而是调用了 通过动态代理工厂生成的代理对象的getCatById
方法。这个代理对象内部持有InvocationHandler
的引用(在这个例子中是ServiceProxy
的实例),并将方法调用转发给ServiceProxy
的invoke
方法。
invoke
方法是如何拿到对应的方法进行执行的呢?这是通过反射机制实现的。当代理对象的方法被调用时,Proxy
类生成的代理对象会捕获这个调用,并获取被调用的方法(Method
对象)、调用该方法的代理对象本身(proxy
参数)以及传递给该方法的参数(args
参数)。然后,它将这些信息传递给InvocationHandler
的invoke
方法。在
invoke
方法内部,你可以通过method
参数来获取被调用的方法对象,这个例子中做了系统的打印输出的同时,获取到了要调用的方法信息、传入的参数列表等,这不就是我们服务提供者需要的参数么?用这些参数来构造请求对象就可以完成调用了。因此,当你通过代理对象调用方法时,实际上执行的是
InvocationHandler
的invoke
方法,而在这个方法中,你可以通过反射机制调用目标对象上的相应方法,并在调用前后添加你想要的额外逻辑。这种方式允许你在不修改目标对象代码的情况下,为目标对象添加一些额外的功能或行为。需要注意的是:上述代码中,请求的服务提供者地址被硬编码了,后期需要使用注册中心和服务发现机制来解决。
至此,简易版的 RPC 框架已经开发完成,下面我们进行简单的测试。
6、简单测试
- 以debug模式运行EasyProducer
- 以debug模式运行EasyConsumer,在动态代理工厂ServiceProxy设置断点,可以看到调用 catService时,实际是调用了代理对象的 invoke 方法,并且获取到了 serviceName、methodName、参数类型和列表等信息。
- 在服务提供者出请求处理出设置断点,可以看到接受并反序列化后的请求,跟发送时的内容一致
最后运行输出的结果就在不给大家演示了,参考上面5.5.2 动态代理的运行结果截图
至此,我们实现了简易版的PRC框架
码字不易,希望大家能够一键三连🌝⭐🌟
代码仓库 ape-rpc: 轮子项目,手动实现rpc github🌐 || ape-rpc: 轮子项目,手动实现rpc gitee🌐