简易版 RPC 框架实现 2.0 -netty实现

这一篇理解如果有难度,可能对netty不是很理解, 可以关注我netty专栏,还有另外一篇: 用 Netty 自己实现简单的RPC, 这一篇是学习netty的时候写的,更倾向于分析netty相关的知识, 今天我是学习dubbo,从一个rpc框架进行思考写的一篇文章

RPC 是“远程过程调用(Remote Procedure Call)”的缩写形式,比较通俗的解释是:像本地方法调用一样调用远程的服务。虽然 RPC 的定义非常简单,但是相对完整的、通用的 RPC 框架涉及很多方面的内容,例如注册发现、服务治理、负载均衡、集群容错、RPC 协议等,如下图所示:
在这里插入图片描述

简易 RPC 框架的架构图

本课时我们主要实现RPC 框架的基石部分——远程调用,简易版 RPC 框架一次远程调用的核心流程是这样的:

  1. Client 首先会调用本地的代理,也就是图中的 Proxy。
  2. Client 端 Proxy 会按照协议(Protocol),将调用中传入的数据序列化成字节流。
  3. 之后 Client 会通过网络,将字节数据发送到 Server 端。
  4. Server 端接收到字节数据之后,会按照协议进行反序列化,得到相应的请求信息。
  5. Server 端 Proxy 会根据序列化后的请求信息,调用相应的业务逻辑。
  6. Server 端业务逻辑的返回值,也会按照上述逻辑返回给 Client 端。

这个远程调用的过程,就是我们简易版本 RPC 框架的核心实现,只有理解了这个流程,才能进行后续的开发。

项目结构

了解了简易版 RPC 框架的工作流程和实现目标之后,我们再来看下项目的结构,为了方便起见,这里我们将整个项目放到了一个 Module 中了,如下图所示,你可以按照自己的需求进行模块划分。

在这里插入图片描述
那这各个包的功能是怎样的呢?我们就来一一说明。

  • protocol:简易版 RPC 框架的自定义协议。
  • serialization:提供了自定义协议对应的序列化、反序列化的相关工具类。
  • codec:提供了自定义协议对应的编码器和解码器。
  • transport:基于 Netty 提供了底层网络通信的功能,其中会使用到 codec 包中定义编码器和解码器,以及 serialization 包中的序列化器和反序列化器。
  • registry:基于 ZooKeeper 和 Curator 实现了简易版本的注册中心功能。
  • proxy:使用 JDK 动态代理实现了一层代理。

自定义协议

当前已经有很多成熟的协议了,例如 HTTP、HTTPS 等,那为什么我们还要自定义 RPC 协议呢?

从功能角度考虑,HTTP 协议在 1.X 时代,只支持半双工传输模式,虽然支持长连接,但是不支持服务端主动推送数据。从效率角度来看,在一次简单的远程调用中,只需要传递方法名和加个简单的参数,此时,HTTP 请求中大部分数据都被 HTTP Header 占据,真正的有效负载非常少,效率就比较低。

当然,HTTP 协议也有自己的优势,例如,天然穿透防火墙,大量的框架和开源软件支持 HTTP 接口,而且配合 REST 规范使用也是很便捷的,所以有很多 RPC 框架直接使用 HTTP 协议,尤其是在 HTTP 2.0 之后,如 gRPC、Spring Cloud 等。

这里我们自定义一个简易版的 Demo RPC 协议,如下图所示:

在这里插入图片描述
在 Demo RPC 的消息头中,包含了整个 RPC 消息的一些控制信息,例如,版本号、魔数、消息类型、附加信息、消息 ID 以及消息体的长度,在附加信息(extraInfo)中,按位进行划分,分别定义消息的类型、序列化方式、压缩方式以及请求类型。当然,你也可以自己扩充 Demo RPC 协议,实现更加复杂的功能。

Demo RPC 消息头对应的实体类是 Header,其定义如下:

public class Header {

    private short magic; // 魔数
    private byte version; // 版本号
    private byte extraInfo; // 附加信息
    private Long messageId; // 消息ID
    private Integer size; // 消息体长度

确定了 Demo RPC 协议消息头的结构之后,我们再来看 Demo RPC 协议消息体由哪些字段构成,这里我们通过 Request 和 Response 两个实体类来表示请求消息和响应消息的消息体:

public class Request implements Serializable {

    private String serviceName; // 请求的Service类名

    private String methodName; // 请求的方法名称

    private Class[] argTypes; // 请求方法的参数类型

    private Object[] args; // 请求方法的参数
}
public class Response implements Serializable {

    private int code = 0; // 响应的错误码,正常响应为0,非0表示异常响应

    private String errMsg; // 异常信息

    private Object result; // 响应结果
}

注意,Request 和 Response 对象是要进行序列化的,需要实现 Serializable 接口。为了让这两个类的对象能够在 Client 和 Server 之间跨进程传输,需要进行序列化和反序列化操作,这里定义一个 Serialization 接口,统一完成序列化相关的操作:

public interface Serialization {
    <T> byte[] serialize(T obj) throws IOException;

    <T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}

在 Demo RPC 中默认使用 Hessian 序列化方式,下面的 HessianSerialization 就是基于 Hessian 序列化方式对 Serialization 接口的实现:

public class HessianSerialization implements Serialization {
    @Override
    public byte[] serialize(Object data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Hessian2Output out = new Hessian2Output(bos);
        out.writeObject(data);
        out.flush();
        return bos.toByteArray();
    }

    public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
        Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));
        return (T) input.readObject(clz);
    }
}

在有的场景中,请求或响应传输的数据比较大,直接传输比较消耗带宽,所以一般会采用压缩后再发送的方式。在前面介绍的 Demo RPC 消息头中的 extraInfo 字段中,就包含了标识消息体压缩方式的 bit 位。这里我们定义一个 Compressor 接口抽象所有压缩算法:

public interface Compressor {
  byte[] compress(byte[] array) throws IOException;
  byte[] unCompress(byte[] array) throws IOException;
}

同时提供了一个基于 Snappy 压缩算法的实现,作为 Demo RPC 的默认压缩算法:

public class SnappyCompressor implements Compressor {
  public byte[] compress(byte[] array) throws IOException {
      if (array == null) { return null; }
      return Snappy.compress(array);
  }
  public byte[] unCompress(byte[] array) throws IOException {
      if (array == null) { return null; }
      return Snappy.uncompress(array);
  }
}

编解码实现

了解了自定义协议的结构之后,我们再来解决协议的编解码问题。

前面课时介绍 Netty 核心概念的时候我们提到过,Netty 每个 Channel 绑定一个 ChannelPipeline,并依赖 ChannelPipeline 中添加的 ChannelHandler 处理接收到(或要发送)的数据,其中就包括字节到消息(以及消息到字节)的转换。Netty 中提供了 ByteToMessageDecoder、 MessageToByteEncoder、MessageToMessageEncoder、MessageToMessageDecoder 等抽象类来实现 Message 与 ByteBuf 之间的转换以及 Message 之间的转换,如下图所示:

在这里插入图片描述
Netty 提供的 Decoder 和 Encoder 实现

在 Netty 的源码中,我们可以看到对很多已有协议的序列化和反序列化都是基于上述抽象类实现的,例如,HttpServerCodec 中通过依赖 HttpServerRequestDecoder 和 HttpServerResponseEncoder 来实现 HTTP 请求的解码和 HTTP 响应的编码。如下图所示,HttpServerRequestDecoder 继承自 ByteToMessageDecoder,实现了 ByteBuf 到 HTTP 请求之间的转换;HttpServerResponseEncoder 继承自 MessageToMessageEncoder,实现 HTTP 响应到其他消息的转换(其中包括转换成 ByteBuf 的能力)。

在这里插入图片描述
Netty 中 HTTP 协议的 Decoder 和 Encoder 实现

在简易版 RPC 框架中,我们的自定义请求暂时没有 HTTP 协议那么复杂,只要简单继承 ByteToMessageDecoder 和 MessageToMessageEncoder 即可。

首先来看 DemoRpcDecoder,它实现了 ByteBuf 到 Demo RPC Message 的转换,具体实现如下:

public class DemoRpcDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        if (byteBuf.readableBytes() < Constants.HEADER_SIZE) {
            return; // 不到16字节的话无法解析消息头,暂不读取
        }
        // 记录当前readIndex指针的位置,方便重置
        byteBuf.markReaderIndex();
        // 尝试读取消息头的魔数部分
        short magic = byteBuf.readShort();
        if (magic != Constants.MAGIC) { // 魔数不匹配会抛出异常
            byteBuf.resetReaderIndex(); // 重置readIndex指针
            throw new RuntimeException("magic number error:" + magic);
        }
        // 依次读取消息版本、附加信息、消息ID以及消息体长度四部分
        byte version = byteBuf.readByte();
        byte extraInfo = byteBuf.readByte();
        long messageId = byteBuf.readLong();
        int size = byteBuf.readInt();
        Object body = null;
        // 心跳消息是没有消息体的,无需读取
        if (!Constants.isHeartBeat(extraInfo)) {
            // 对于非心跳消息,没有积累到足够的数据是无法进行反序列化的
            if (byteBuf.readableBytes() < size) {
                byteBuf.resetReaderIndex();
                return;
            }
            // 读取消息体并进行反序列化
            byte[] payload = new byte[size];
            byteBuf.readBytes(payload);
            // 这里根据消息头中的extraInfo部分选择相应的序列化和压缩方式
            Serialization serialization = SerializationFactory.get(extraInfo);
            Compressor compressor = CompressorFactory.get(extraInfo);
            if (Constants.isRequest(extraInfo)) {
                // 得到消息体
                body = serialization.deserialize(compressor.unCompress(payload),
                        Request.class);
            } else {
                // 得到消息体
                body = serialization.deserialize(compressor.unCompress(payload),
                        Response.class);
            }
        }
        // 将上面读取到的消息头和消息体拼装成完整的Message并向后传递
        Header header = new Header(magic, version, extraInfo, messageId, size);
        Message message = new Message(header, body);
        out.add(message);
    }
}
public class DemoRpcEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx,
                          Message message, ByteBuf byteBuf) throws Exception {
        Header header = message.getHeader();
        // 依次序列化消息头中的魔数、版本、附加信息以及消息ID
        byteBuf.writeShort(header.getMagic());
        byteBuf.writeByte(header.getVersion());
        byteBuf.writeByte(header.getExtraInfo());
        byteBuf.writeLong(header.getMessageId());
        Object content = message.getContent();
        if (Constants.isHeartBeat(header.getExtraInfo())) {
            byteBuf.writeInt(0); // 心跳消息,没有消息体,这里写入0
            return;
        }
        // 按照extraInfo部分指定的序列化方式和压缩方式进行处理
        Serialization serialization = SerializationFactory.get(header.getExtraInfo());
        Compressor compressor = CompressorFactory.get(header.getExtraInfo());
        byte[] payload = compressor.compress(serialization.serialize(content));
        byteBuf.writeInt(payload.length); // 写入消息体长度
        byteBuf.writeBytes(payload); // 写入消息体
    }
}

transport 相关实现

正如前文介绍 Netty 线程模型的时候提到,我们不能在 Netty 的 I/O 线程中执行耗时的业务逻辑。在 Demo RPC 框架的 Server 端接收到请求时,首先会通过上面介绍的 DemoRpcDecoder 反序列化得到请求消息,之后我们会通过一个自定义的 ChannelHandler(DemoRpcServerHandler)将请求提交给业务线程池进行处理。

在 Demo RPC 框架的 Client 端接收到响应消息的时候,也是先通过 DemoRpcDecoder 反序列化得到响应消息,之后通过一个自定义的 ChannelHandler(DemoRpcClientHandler)将响应返回给上层业务。

DemoRpcServerHandler 和 DemoRpcClientHandler 都继承自 SimpleChannelInboundHandler,如下图所示:
在这里插入图片描述
下面我们就来看一下这两个自定义的 ChannelHandler 实现:

public class DemoRpcServerHandler extends SimpleChannelInboundHandler<Message<Request>> {

    // 业务线程池
    private static Executor executor = Executors.newCachedThreadPool();

    @Override
    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, Message<Request> message) throws Exception {
        byte extraInfo = message.getHeader().getExtraInfo();
        if (Constants.isHeartBeat(extraInfo)) { // 心跳消息,直接返回即可
            channelHandlerContext.writeAndFlush(message);
            return;
        }
        // 非心跳消息,直接封装成Runnable提交到业务线程池
        executor.execute(new InvokeRunnable(message, channelHandlerContext));
    }
}
public class DemoRpcClientHandler extends SimpleChannelInboundHandler<Message<Response>> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message<Response> message) throws Exception {
        NettyResponseFuture responseFuture =
                Connection.IN_FLIGHT_REQUEST_MAP.remove(message.getHeader().getMessageId());
        Response response = message.getContent();
        // 心跳消息特殊处理
        if (response == null && Constants.isHeartBeat(message.getHeader().getExtraInfo())) {
            response = new Response();
            response.setCode(Constants.HEARTBEAT_CODE);
        }
        responseFuture.getPromise().setSuccess(response.getResult());
    }
}

注意,这里有两个点需要特别说明一下。一个点是 Server 端的 InvokeRunnable,在这个 Runnable 任务中会根据请求的 serviceName、methodName 以及参数信息,调用相应的方法:

class InvokeRunnable implements Runnable {

    private ChannelHandlerContext ctx;
    private Message<Request> message;

    public InvokeRunnable(Message<Request> message, ChannelHandlerContext ctx) {
        this.message = message;
        this.ctx = ctx;
    }

    @Override
    public void run() {
        Response response = new Response();
        Object result = null;
        try {
            Request request = message.getContent();
            String serviceName = request.getServiceName();
            // 这里提供BeanManager对所有业务Bean进行管理,其底层在内存中维护了
            // 一个业务Bean实例的集合。感兴趣的同学可以尝试接入Spring等容器管
            // 理业务Bean
            Object bean = BeanManager.getBean(serviceName);
            // 下面通过反射调用Bean中的相应方法
            Method method = bean.getClass().getMethod(request.getMethodName(), request.getArgTypes());
            result = method.invoke(bean, request.getArgs());
        } catch (Exception e) {
            // 省略异常处理
        } finally {
        }
        Header header = message.getHeader();
        header.setExtraInfo((byte) 1);
        response.setResult(result); // 设置响应结果
        // 将响应消息返回给客户端
        ctx.writeAndFlush(new Message(header, response));
    }

}

另一个点是 Client 端的 Connection,它是用来暂存已发送出去但未得到响应的请求,这样,在响应返回时,就可以查找到相应的请求以及 Future,从而将响应结果返回给上层业务逻辑,具体实现如下:

public class Connection implements Closeable {
  private static AtomicLong ID_GENERATOR = new AtomicLong(0);
  public static Map<Long, NettyResponseFuture<Response>> 
      IN_FLIGHT_REQUEST_MAP = new ConcurrentHashMap<>();
  private ChannelFuture future;
  private AtomicBoolean isConnected = new AtomicBoolean();
  public Connection(ChannelFuture future, boolean isConnected) {
      this.future = future;
      this.isConnected.set(isConnected);
  }
  public NettyResponseFuture<Response> request(Message<Request> message, long timeOut) {
      // 生成并设置消息ID
      long messageId = ID_GENERATOR.incrementAndGet();
      message.getHeader().setMessageId(messageId);
      // 创建消息关联的Future
      NettyResponseFuture responseFuture = new NettyResponseFuture(System.currentTimeMillis(),
              timeOut, message, future.channel(), new DefaultPromise(new DefaultEventLoop()));
      // 将消息ID和关联的Future记录到IN_FLIGHT_REQUEST_MAP集合中
      IN_FLIGHT_REQUEST_MAP.put(messageId, responseFuture);
      try {
          future.channel().writeAndFlush(message); // 发送请求
      } catch (Exception e) {
          // 发送请求异常时,删除对应的Future
          IN_FLIGHT_REQUEST_MAP.remove(messageId);
          throw e;
      }
      return responseFuture;
  }
  // 省略getter/setter以及close()方法
}

我们可以看到,Connection 中没有定时清理 IN_FLIGHT_REQUEST_MAP 集合的操作,在无法正常获取响应的时候,就会导致 IN_FLIGHT_REQUEST_MAP 不断膨胀,最终 OOM。你也可以添加一个时间轮定时器,定时清理过期的请求消息,这里我们就不再展开讲述了。

完成自定义 ChannelHandler 的编写之后,我们需要再定义两个类—— DemoRpcClient 和 DemoRpcServer,分别作为 Client 和 Server 的启动入口。DemoRpcClient 的实现如下:

public class DemoRpcClient implements Closeable {

    protected Bootstrap clientBootstrap;
    protected EventLoopGroup group;
    private String host;
    private int port;

    public DemoRpcClient(String host, int port) {
        this.host = host;
        this.port = port;
        // 创建并配置客户端Bootstrap
        clientBootstrap = new Bootstrap();
        group = NettyEventLoopFactory.eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");
        clientBootstrap.group(group)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class) // 创建的Channel类型
                // 指定ChannelHandler的顺序
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());
                        ch.pipeline().addLast("demo-rpc-decoder", new DemoRpcDecoder());
                        ch.pipeline().addLast("client-handler", new DemoRpcClientHandler());
                    }
                });
    }


    public ChannelFuture connect() {
        // 连接指定的地址和端口
        ChannelFuture connect = clientBootstrap.connect(host, port);
        connect.awaitUninterruptibly();
        return connect;
    }

    @Override
    public void close() {
        group.shutdownGracefully();
    }
}

通过 DemoRpcClient 的代码我们可以看到其 ChannelHandler 的执行顺序如下:
在这里插入图片描述
客户端 ChannelHandler 结构图

另外,在创建EventLoopGroup时并没有直接使用NioEventLoopGroup,而是在 NettyEventLoopFactory 中根据当前操作系统进行选择,对于 Linux 系统,会使用 EpollEventLoopGroup,其他系统则使用 NioEventLoopGroup。

接下来我们再看DemoRpcServer 的具体实现:

public class DemoRpcServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap serverBootstrap;
    private Channel channel;
    protected int port;

    public DemoRpcServer(int port) throws InterruptedException {
        this.port = port;
        // 创建boss和worker两个EventLoopGroup,注意一些小细节,
        // workerGroup 是按照中的线程数是按照 CPU 核数计算得到的
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1,
                "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),
                "NettyServerWorker");
        serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                // 指定每个Channel上注册的ChannelHandler以及顺序
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast("demp-rpc-decoder", new DemoRpcDecoder());
                                ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());
                                ch.pipeline().addLast("server-handler", new DemoRpcServerHandler());
                            }
                        });
    }

    public ChannelFuture start() throws InterruptedException {
        // 监听指定的端口
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("监听端口 6668 成功");
                } else {
                    System.out.println("监听端口 6668 失败");
                }
            }
        });

        channel = channelFuture.channel();
        channel.closeFuture().sync();
        return channelFuture;
    }


    public void startAndWait() throws InterruptedException {
        try {
            channel.closeFuture().await();
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }


    public void shutdown() throws InterruptedException {
        channel.close().sync();
        if (bossGroup != null)
            bossGroup.shutdownGracefully().awaitUninterruptibly(15000);
        if (workerGroup != null)
            workerGroup.shutdownGracefully().awaitUninterruptibly(15000);
    }

}

通过对 DemoRpcServer 实现的分析,我们可以知道每个 Channel 上的 ChannelHandler 顺序如下:

在这里插入图片描述
服务端 ChannelHandler 结构图

registry 相关实现

介绍完客户端和服务端的通信之后,我们再来看简易 RPC 框架的另一个基础能力——服务注册与服务发现能力,对应 demo-rpc 项目源码中的 registry 包。

registry 包主要是依赖 Apache Curator 实现了一个简易版本的 ZooKeeper 客户端,并基于 ZooKeeper 实现了注册中心最基本的两个功能:Provider 注册以及 Consumer 订阅。

这里我们先定义一个 Registry 接口,其中提供了注册以及查询服务实例的方法,如下图所示:

public interface Registry<T> {

    void registerService(ServiceInstance<T> service) throws Exception;

    void unregisterService(ServiceInstance<T> service) throws Exception;

    List<ServiceInstance<T>> queryForInstances(String name) throws Exception;
}

ZooKeeperRegistry 是基于 curator-x-discovery 对 Registry 接口的实现类型,其中封装了之前课时介绍的 ServiceDiscovery,并在其上添加了 ServiceCache 缓存提高查询效率。ZooKeeperRegistry 的具体实现如下:

public class ZookeeperRegistry<T> implements Registry<T> {

    private Map<String, List<ServiceInstanceListener<T>>> listeners = Maps.newConcurrentMap();

    private InstanceSerializer serializer = new JsonInstanceSerializer<>(ServerInfo.class);

    private ServiceDiscovery<T> serviceDiscovery;

    private ServiceCache<T> serviceCache;

    private String address = "localhost:2181";

    public void start() throws Exception {
        String root = "/demo/rpc";
        // 初始化CuratorFramework
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
        client.start();  // 启动Curator客户端
        // client.createContainers(root);

        // 初始化ServiceDiscovery
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerInfo.class)
                .client(client).basePath(root)
                .serializer(serializer)
                .build();
        serviceDiscovery.start(); // 启动ServiceDiscovery

        // 创建ServiceCache,监Zookeeper相应节点的变化,也方便后续的读取
        serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name("/demoService")
                .build();
//        client.start(); // 启动Curator客户端
        client.blockUntilConnected();  // 阻塞当前线程,等待连接成功
        serviceDiscovery.start(); // 启动ServiceDiscovery
        serviceCache.start(); // 启动ServiceCache
    }

    @Override
    public void registerService(ServiceInstance<T> service) throws Exception {
        serviceDiscovery.registerService(service);
    }

    @Override
    public void unregisterService(ServiceInstance service) throws Exception {
        serviceDiscovery.unregisterService(service);
    }

    @Override
    public List<ServiceInstance<T>> queryForInstances(String name) throws Exception {
        // 直接根据name进行过滤ServiceCache中的缓存数据
        return serviceCache.getInstances().stream()
                .filter(s -> s.getName().equals(name))
                .collect(Collectors.toList());
    }
}

通过对 ZooKeeperRegistry的分析可以得知,它是基于 Curator 中的 ServiceDiscovery 组件与 ZooKeeper 进行交互的,并且对 Registry 接口的实现也是通过直接调用 ServiceDiscovery 的相关方法实现的。在查询时,直接读取 ServiceCache 中的缓存数据,ServiceCache 底层在本地维护了一个 ConcurrentHashMap 缓存,通过 PathChildrenCache 监听 ZooKeeper 中各个子节点的变化,同步更新本地缓存。这里我们简单看一下 ServiceCache 的核心实现:

public class ServiceCacheImpl<T> implements ServiceCache<T>, 
PathChildrenCacheListener{//实现PathChildrenCacheListener接口
  // 关联的ServiceDiscovery实例
  private final ServiceDiscoveryImpl<T>  discovery;
  // 底层的PathChildrenCache,用于监听子节点的变化
  private final PathChildrenCache cache; 
  // 本地缓存
  private final ConcurrentMap<String, ServiceInstance<T>> instances 
    = Maps.newConcurrentMap();
  public List<ServiceInstance<T>> getInstances(){ // 返回本地缓存内容
      return Lists.newArrayList(instances.values());
  }
  public void childEvent(CuratorFramework client, 
        PathChildrenCacheEvent event) throws Exception{
      switch(event.getType()){
          case CHILD_ADDED:
          case CHILD_UPDATED:{
              addInstance(event.getData(), false); // 更新本地缓存
              notifyListeners = true;
              break;
          }
          case CHILD_REMOVED:{ // 更新本地缓存
              instances.remove(instanceIdFromData(event.getData()));
              notifyListeners = true;
              break;
          }
      }
      ... // 通知ServiceCache上注册的监听器
  }
}

proxy 相关实现

在简易版 Demo RPC 框架中,Proxy 主要是为 Client 端创建一个代理,帮助客户端程序屏蔽底层的网络操作以及与注册中心之间的交互。

简易版 Demo RPC 使用 JDK 动态代理的方式生成代理,这里需要编写一个 InvocationHandler 接口的实现,即下面的 DemoRpcProxy。其中有两个核心方法:一个是 newInstance() 方法,用于生成代理对象;另一个是 invoke() 方法,当调用目标对象的时候,会执行 invoke() 方法中的代理逻辑。

下面是 DemoRpcProxy 的具体实现:

public class DemoRpcProxy implements InvocationHandler {

    private String serviceName; // 需要代理的服务(接口)名称

    public Map<Method, Header> headerCache = new ConcurrentHashMap<>();

    // 用于与Zookeeper交互,其中自带缓存
    private Registry<ServerInfo> registry;

    public DemoRpcProxy(String serviceName,
                        Registry<ServerInfo> registry) throws Exception {
        this.serviceName = serviceName;
        this.registry = registry;
    }

    public static <T> T newInstance(Class<T> clazz, Registry<ServerInfo> registry) throws Exception {
        // 创建代理对象
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{clazz},
                new DemoRpcProxy("demoService", registry));
    }


    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 从Zookeeper缓存中获取可用的Server地址,并随机从中选择一个
        List<ServiceInstance<ServerInfo>> serviceInstances =
                registry.queryForInstances(serviceName);
        ServiceInstance<ServerInfo> serviceInstance =
                serviceInstances.get(ThreadLocalRandom.current().nextInt(serviceInstances.size()));
        // 创建请求消息,然后调用remoteCall()方法请求上面选定的Server端
        String methodName = method.getName();
        Header header = headerCache.computeIfAbsent(method, h -> new Header(MAGIC, VERSION_1));
        Message<Request> message = new Message(header, new Request(serviceName, methodName, args));
        return remoteCall(serviceInstance.getPayload(), message);
    }

    protected Object remoteCall(ServerInfo serverInfo, Message message) throws Exception {
        if (serverInfo == null) {
            throw new RuntimeException("get available server error");
        }
        Object result;
        try {
            // 创建DemoRpcClient连接指定的Server端
            DemoRpcClient demoRpcClient = new DemoRpcClient(serverInfo.getHost(), serverInfo.getPort());
            ChannelFuture channelFuture = demoRpcClient.connect().awaitUninterruptibly();
            // 创建对应的Connection对象,并发送请求
            Connection connection = new Connection(channelFuture, true);
            NettyResponseFuture responseFuture = connection.request(message, Constants.DEFAULT_TIMEOUT);
            // 等待请求对应的响应
            result = responseFuture.getPromise().get(Constants.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw e;
        }
        return result;
    }
}

从 DemoRpcProxy 的实现中我们可以看到,它依赖了 ServiceInstanceCache 获取ZooKeeper 中注册的 Server 端地址,同时依赖了 DemoRpcClient 与Server 端进行通信,上层调用方拿到这个代理对象后,就可以像调用本地方法一样进行调用,而不再关心底层网络通信和服务发现的细节。当然,这个简易版 DemoRpcProxy 的实现还有很多可以优化的地方,例如:

  • 缓存 DemoRpcClient 客户端对象以及相应的 Connection 对象,不必每次进行创建。
  • 可以添加失败重试机制,在请求出现超时的时候,进行重试。
  • 可以添加更加复杂和灵活的负载均衡机制,例如,根据 Hash 值散列进行负载均衡、根据节点 load 情况进行负载均衡等。

你若感兴趣的话可以尝试进行扩展,以实现一个更加完善的代理层。

使用方接入

介绍完 Demo RPC 的核心实现之后,下面我们讲解下Demo RPC 框架的使用方式。这里涉及Consumer、DemoServiceImp、Provider三个类以及 DemoService 业务接口。

相应的业务接口和实现比较简单,我们再来看Provider的实现,它的角色类似于 Dubbo 中的 Provider,其会创建 DemoServiceImpl 这个业务 Bean 并将自身的地址信息暴露出去,如下所示:

public class Provider {
    public static void main(String[] args) throws Exception {
        // 创建DemoServiceImpl,并注册到BeanManager中
        BeanManager.registerBean("demoService", new DemoServiceImpl());
        // 创建ZookeeperRegistry,并将Provider的地址信息封装成ServerInfo
        // 对象注册到Zookeeper
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start();
        ServerInfo serverInfo = new ServerInfo("127.0.0.1", 6666);
        discovery.registerService(
                ServiceInstance.<ServerInfo>builder().name("demoService").payload(serverInfo).build());
        // 启动DemoRpcServer,等待Client的请求
        DemoRpcServer rpcServer = new DemoRpcServer(6666);
        rpcServer.start();
        Thread.sleep(100000000L);
    }
}

最后是Consumer,它类似于 Dubbo 中的 Consumer,其会订阅 Provider 地址信息,然后根据这些信息选择一个 Provider 建立连接,发送请求并得到响应,这些过程在 Proxy 中都予以了封装,那Consumer 的实现就很简单了,可参考如下示例代码:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建ZookeeperRegistr对象
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start();

        // 创建代理对象,通过代理调用远端Server
        DemoService demoService = DemoRpcProxy.newInstance(DemoService.class, discovery);
        // 调用sayHello()方法,并输出结果
        String result = demoService.sayHello("hello");
        System.out.println(result);
        // Thread.sleep(10000000L);
    }
}

总结

本课时我们首先介绍了简易 RPC 框架中的transport 包,它在上一课时介绍的编解码器基础之上,实现了服务端和客户端的通信能力。之后讲解了registry 包如何实现与 ZooKeeper 的交互,完善了简易 RPC 框架的服务注册与服务发现的能力。接下来又分析了proxy 包的实现,其中通过 JDK 动态代理的方式,帮接入方屏蔽了底层网络通信的复杂性。最后,我们编写了一个简单的 DemoService 业务接口,以及相应的 Provider 和 Consumer 接入简易 RPC 框架。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/462266.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Delphi7应用教程学习1.3【练习题目】:文本及悬停文字的显示

这个例子主要用到了btn的Hint 属性&#xff0c;Hint是提示的意思。 还有Delphi7还是很好用的&#xff0c;改变了的属性是粗体&#xff0c;默认没有改变的属性为细体。

插入排序:一种简单而有效的排序算法

插入排序&#xff1a;一种简单而有效的排序算法 一、什么是插入排序&#xff1f;二、插入排序的步骤三、插入排序的C语言实现四、插入排序的性能分析五、插入排序的优化六、总结 在我们日常生活和工作中&#xff0c;排序是一种非常常见的操作。比如&#xff0c;我们可能需要对一…

B端界面又丑又乱,也不会总结规范,来,我给5个规范模板,照着学

发5个别人总结的规范&#xff0c;一定会对你的B端系统改进&#xff0c;有帮助的。

TSINGSEE青犀AI烟火识别等算法打造电瓶车消防安全解决方案

一、背景分析 根据国家消防救援局的统计&#xff0c;2023年全国共接报电动自行车火灾2.1万起&#xff0c;相比2022年上升17.4%&#xff0c;电动自行车火灾安全隐患问题不容忽视。 电瓶车火情主要问题和原因&#xff1a; 电瓶车/电池质量良莠不齐用户安全意识薄弱&#xff0c…

虚拟机NAT模式配置

注意这里IP要和网关在同一网段&#xff0c;且虚拟机默认网关末尾为.2&#xff08;如果默认网关配置为.1会与宿主机冲突&#xff0c;导致无法ping通外网&#xff09; 点击NAT模式下的NAT设置即可查看默认网关 这里的网关可以理解为主机与虚拟机交互的入口

蓝桥杯算法训练VIP-数组查找及替换

题目 1634: 蓝桥杯算法训练VIP-数组查找及替换 时间限制: 3s 内存限制: 192MB 提交: 1629 解决: 890 题目描述 给定某整数数组和某一整数b。要求删除数组中可以被b整除的所有元素&#xff0c;同时将该数组各元素按从小到大排序。如果数组元素数值在A到Z的ASCII之间&#xff0…

MySQL:SQL优化

1. 插入优化 使用insert语句单条单条数据插入效率偏低&#xff0c;建议使用insert批量插入数据&#xff0c;批量控制在500-1000条数据较为合适&#xff0c;当面对数以百万的数据时&#xff0c;可以使用load指令&#xff0c;提升插入数据效率 相关指令 #客户端连接服务端加上参…

Java后端面试经验分享,~纯分享

本文将从面试、工作、学习三个方面分享最近面试的一些心得以及以后发展的一些规划&#xff0c;仅供参考&#xff0c;哈哈&#xff0c;毕竟本人也很菜&#xff0c;因为菜才要多学习。一会儿也会分享两本Java面试题库&#xff08;题库是b站大学找的&#xff0c;一会儿我也会分享出…

C# Onnx C2PNet 图像去雾 室内场景

目录 介绍 效果 模型信息 项目 代码 下载 C# Onnx C2PNet 图像去雾 室内场景 介绍 github地址&#xff1a;GitHub - YuZheng9/C2PNet: [CVPR 2023] Curricular Contrastive Regularization for Physics-aware Single Image Dehazing [CVPR 2023] Curricular Contrasti…

DataGrip 面试题及答案整理,最新面试题

DataGrip的数据库兼容性和多数据库支持如何实现&#xff1f; DataGrip实现数据库兼容性和多数据库支持的方式包括&#xff1a; 1、广泛的数据库支持&#xff1a; DataGrip支持多种数据库&#xff0c;包括但不限于MySQL, PostgreSQL, SQL Server, Oracle, SQLite, 和MongoDB&a…

C++:类之六脉神剑——默认成员函数

个人主页&#xff1a;日刷百题 系列专栏&#xff1a;〖C/C小游戏〗〖Linux〗〖数据结构〗 〖C语言〗 &#x1f30e;欢迎各位→点赞&#x1f44d;收藏⭐️留言&#x1f4dd; ​ ​ 一、默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为 空类 。 空类中真的什么都…

管理类联考–复试–政治--二十大--记忆宫殿

文章目录 整体记忆宫殿门床头柜床书桌阳台 口诀记忆法 整体 记忆宫殿 要有逻辑的放到房间了 何为逻辑&#xff0c;如下大佬总结的便是&#xff0c;或者可自行总结&#xff0c;有前后顺序&#xff0c;做事逻辑即可 第一步&#xff1a;将逻辑的点放到房间里的点&#xff0c;…

每日OJ题_简单多问题dp⑥_力扣714. 买卖股票的最佳时机含手续费

目录 力扣714. 买卖股票的最佳时机含手续费 状态机分析 解析代码 力扣714. 买卖股票的最佳时机含手续费 714. 买卖股票的最佳时机含手续费 难度 中等 给定一个整数数组 prices&#xff0c;其中 prices[i]表示第 i 天的股票价格 &#xff1b;整数 fee 代表了交易股票的手续…

cannot find -xml2: No such file or directory的解决方法

一&#xff0c;问题现象 在编译库的时候出现如下图所示的报错&#xff1a;C:/msys64/mingw32/bin/…/lib/gcc/i686-w64-mingw32/13.2.0/…/…/…/…/i686-w64-mingw32/bin/ld.exe: ca nnot find -lxml2: No such file or directory collect2.exe: error: ld returned 1 exit s…

spring boot集成redis实现共享存储session

spring boot集成redis实现共享存储session redis实现共享存储session 首先下载redis,我下载的版本是5.0.14,目前官网貌似找不到5.x版本&#xff0c;可以自行去网上寻找。我这里的springboot版本是2.6.4引入redis依赖 <!-- https://mvnrepository.com/artifact/org.spring…

火车订票管理系统|基于springboot框架+ Mysql+Java+B/S结构的火车订票管理系统设计与实现(可运行源码+数据库+设计文档)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 目录 前台功能效果图 管理员功能登录前台功能效果图 用户功能模块 系统功能设计 数据库E-R图设计 lunwen…

【已解决】Nginx启动[emerg] bind() to 0.0.0.0:80 failed(98:Address alreadyin use)

原因分析 在Ubuntu系统上启动nginx服务时&#xff0c;出现如下报错&#xff1a; 该错误表明端口 80 已经被其他进程占用&#xff0c;导致 Nginx 无法绑定到该端口上。原因就是系统里面显存一个nginx服务。需要先停下来&#xff0c;才能再次启动服务。 解决步骤 1.执行命名服务…

SpringBoot打造企业级进销存储系统 第五讲

package com.java1234.repository;import com.java1234.entity.Menu; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query;import java.util.List;/*** 菜单Repository接口*/ public interface MenuReposit…

find_package 总结

本文参考&#xff1a;“轻松搞定CMake”系列之find_package用法详解 原理 find_package 即在指定目录CMAKE_MODULE_PATH 或 CMAKE_PREFIX_PATH查找对应的cmake文件。 find 模式 Module模式(默认)&#xff1a;查询Findxxx.cmake配置文件, 在CMAKE_MODULE_PATH 目录Config模式…

安装PYQT5 遇到Microsoft Visual C++ 14.0 is required解决方法

# Time: 2024/03/16 #Author: Xiaohong # 运行环境: OS: Windows 7 旗舰版 # Python: 3.7.9 Pyqt5 # 目的: 解决安装PYQT5 遇到Microsoft Visual C 14.0 is required 1.安装PYQT5时&#xff0c;遇到Microsoft Visual C 14.0 is required&#xff0c;如图 2.查Microsoft…