前言:利用netty异步事件驱动的网络通信模型,来实现rpc通信
一、大致目录结构:
二、两个端:服务端(发布),客户端(订阅消费),上代码:
1.服务端(发布):
RPCServer:
代码:
/**
 * RPC provider side: scans a package for implementation classes, registers one
 * instance per class under the name of its first directly implemented interface,
 * and serves invocation requests over a Netty server channel.
 */
public class RpcServer {
    /** Port used by {@link #publish(String)}. */
    private static final int DEFAULT_PORT = 9999;

    /** Interface name -> provider instance; handed to every {@code RpcServerHandler}. */
    private final Map<String, Object> registryMap = new HashMap<>();

    /** Fully-qualified names of every {@code .class} file found under the scanned package. */
    private final List<String> classCache = new ArrayList<>();

    /**
     * Publishes the providers found under {@code providerPackage} on the default port (9999).
     *
     * @param providerPackage dot-separated package to scan for provider classes
     * @throws Exception if scanning, registration, binding or serving fails
     */
    public void publish(String providerPackage) throws Exception {
        publish(providerPackage, DEFAULT_PORT);
    }

    /**
     * Publishes the providers found under {@code providerPackage}:
     * <ol>
     *   <li>collect every class below the package into {@code classCache};</li>
     *   <li>register an instance of each instantiable class in {@code registryMap};</li>
     *   <li>bind a Netty server on {@code port} and block until its channel is closed.</li>
     * </ol>
     *
     * @param providerPackage dot-separated package to scan for provider classes
     * @param port            TCP port to listen on
     * @throws Exception if scanning, registration, binding or serving fails; failures are
     *                   propagated instead of being printed and swallowed
     */
    public void publish(String providerPackage, int port) throws Exception {
        getProviderClass(providerPackage);
        doRegister();
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            // NOTE(review): ObjectDecoder performs native Java deserialization of
                            // whatever the peer sends, with an unbounded (Integer.MAX_VALUE) frame
                            // size. Do not expose this port to untrusted clients as-is.
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RpcServerHandler(registryMap));
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("服务端监听" + port + "端口,启动成功。。。");
            // Block the calling thread until the server channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }

    /**
     * Instantiates every scanned class that can act as a provider and stores it in
     * {@code registryMap} keyed by the name of its first directly implemented interface.
     * Interfaces, abstract classes and classes implementing no interface are skipped,
     * because they cannot be instantiated or have no key to be registered under.
     *
     * @throws Exception if a class cannot be loaded or has no usable no-arg constructor
     */
    private void doRegister() throws Exception {
        for (String className : classCache) {
            Class<?> clazz = Class.forName(className);
            // Interfaces also carry the ABSTRACT modifier, so this covers both cases.
            if (java.lang.reflect.Modifier.isAbstract(clazz.getModifiers())) {
                continue;
            }
            Class<?>[] interfaces = clazz.getInterfaces();
            if (interfaces.length == 0) {
                continue;
            }
            registryMap.put(interfaces[0].getName(), clazz.getDeclaredConstructor().newInstance());
        }
    }

    /**
     * Recursively collects the fully-qualified names of all {@code .class} files located
     * under the given package on the file system and appends them to {@code classCache}.
     * Packages that cannot be resolved to a local directory are silently ignored.
     *
     * @param providerPackage dot-separated package name to scan
     */
    private void getProviderClass(String providerPackage) {
        URL resource = this.getClass().getClassLoader().getResource(providerPackage.replace('.', '/'));
        if (resource == null || !"file".equals(resource.getProtocol())) {
            return;
        }
        File directory;
        try {
            // URL.getFile() stays percent-encoded; going through a URI decodes spaces etc.
            directory = new File(resource.toURI());
        } catch (java.net.URISyntaxException e) {
            // URL is not a strict URI (e.g. raw spaces): fall back to the raw path.
            directory = new File(resource.getFile());
        }
        File[] children = directory.listFiles();
        if (children == null) {
            // Not a directory, or an I/O error occurred while listing it.
            return;
        }
        final String classSuffix = ".class";
        for (File child : children) {
            String childName = child.getName();
            if (child.isDirectory()) {
                getProviderClass(providerPackage + "." + childName);
            } else if (childName.endsWith(classSuffix)) {
                String simpleName = childName.substring(0, childName.length() - classSuffix.length());
                classCache.add(providerPackage + "." + simpleName);
            }
        }
    }
}
服务端处理器:用于处理消费端发送过来的接口数据
代码:
public class RpcServerHandler extends ChannelInboundHandlerAdapter {
private Map<String, Object> registryMap;
public RpcServerHandler(Map<String, Object> registryMap) {
this.registryMap = registryMap;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("rpc-server收到客户端消息:" + msg);
if (msg instanceof InvokeMessage) {
InvokeMessage method = (InvokeMessage) msg;
Object result = "rpc-server端没有该方法";
// 判断是否存在该方法
if (registryMap.containsKey(method.getClassName())) {
Object provider = registryMap.get(method.getClassName());
result = provider.getClass()
.getMethod(method.getMethodName(), method.getParamTypes())
.invoke(provider, method.getParamValues());
}
// 把方法返回结果,发给订阅者
ctx.channel().writeAndFlush(result);
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
接口的实现类:这里的接口是客户端的接口Api
代码:
/**
 * Provider-side implementation of {@code SsenServiceApi}.
 */
public class SsenServiceApiImpl implements SsenServiceApi {
    /**
     * Builds the reply by appending a fixed marker to the caller-supplied name.
     *
     * @param name value sent by the consumer
     * @return {@code name} followed by the marker text
     */
    @Override
    public String hellRpc(String name) {
        final String marker = "实现类方法";
        StringBuilder reply = new StringBuilder();
        reply.append(name);
        reply.append(marker);
        return reply.toString();
    }
}
2.客户端:(订阅消费)
这里采用JDK原生的基于接口的动态代理来发起远程调用:
/**
 * Consumer-side factory for JDK dynamic proxies that forward interface calls to the
 * RPC provider over Netty.
 */
public class RpcProxy {
    /** Provider host used by {@link #create(Class)}. */
    private static final String DEFAULT_HOST = "localhost";

    /** Provider port used by {@link #create(Class)}. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Creates a proxy for {@code clazz} that talks to the provider at localhost:9999.
     *
     * @param clazz the service interface to proxy
     * @param <T>   type the caller assigns the proxy to
     * @return a proxy implementing {@code clazz}
     */
    public static <T> T create(Class<?> clazz) {
        return create(clazz, DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Creates a JDK interface-based dynamic proxy whose method calls are sent to the
     * provider at {@code host:port}.
     *
     * @param clazz the service interface to proxy
     * @param host  provider host name or address
     * @param port  provider TCP port
     * @param <T>   type the caller assigns the proxy to
     * @return a proxy implementing {@code clazz}
     */
    @SuppressWarnings("unchecked") // the proxy implements clazz; T is chosen by the caller
    public static <T> T create(final Class<?> clazz, final String host, final int port) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
                new Class[]{clazz},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        if (Object.class.equals(method.getDeclaringClass())) {
                            // toString/hashCode/equals are answered locally by this handler.
                            // Invoking them on the proxy would re-enter this method forever.
                            return method.invoke(this, args);
                        }
                        // Send the interface/method description to the provider via Netty.
                        return rpcInvoke(clazz, method, args, host, port);
                    }
                });
    }

    /**
     * Performs one remote call: connects, sends an {@code InvokeMessage} describing the
     * call, waits for the channel to close and returns what the client handler reports.
     * Failures are printed rather than thrown, in which case the handler's result is
     * returned as-is.
     *
     * @param clazz  service interface whose name identifies the provider
     * @param method method being invoked
     * @param args   call arguments, may be {@code null}
     * @param host   provider host
     * @param port   provider port
     * @return the value of {@code RpcClientHandler.getResult()} after the exchange
     */
    private static Object rpcInvoke(Class<?> clazz, Method method, Object[] args, String host, int port) {
        NioEventLoopGroup eventGroup = new NioEventLoopGroup();
        final RpcClientHandler rpcClientHandler = new RpcClientHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(rpcClientHandler);
                        }
                    });
            // Connect to the provider.
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // Describe the call and send it to the provider.
            InvokeMessage invokeMessage = new InvokeMessage();
            invokeMessage.setClassName(clazz.getName());
            invokeMessage.setMethodName(method.getName());
            invokeMessage.setParamTypes(method.getParameterTypes());
            invokeMessage.setParamValues(args);
            future.channel().writeAndFlush(invokeMessage).sync();
            // Wait until the channel is closed before reading the handler's result.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventGroup.shutdownGracefully();
        }
        return rpcClientHandler.getResult();
    }
}