从零开始手写RPC框架(4)

这一节主要讲述网络传输模块的代码,并且几乎每一行代码都加上了我个人理解的注释,同时也讲述了其中一些以前没见过的函数,和大致的底层运行逻辑。

目录

  • 网络传输实体类
  • 网络传输实现
    • 基于Socket实现网络传输
    • 基于Netty实现网络传输
      • 客户端
      • 服务端

再重新梳理一下RPC的逻辑,当你需要调用远程方法时,你必须通过网络请求将目标类、方法信息以及方法参数等数据发送到服务器端。这就涉及到了网络传输的问题。
对于网络传输的具体实现,你可以选择使用 Socket —— 这是 Java 中最基础且最原始的网络通信方式。然而,Socket 是阻塞IO,其性能较低且功能单一。你也可以选择使用同步非阻塞的 I/O 模型 NIO,但是使用 NIO 进行网络编程可能会比较复杂。因此,你可以考虑使用基于 NIO 的网络编程框架 Netty,它是最佳选择。

网络传输模块整体结构如下:

请添加图片描述

一共被分为了 4 个包:

1. constants : 存放一些网络传输模块共用的常量
2. dto : 用于网络传输的类。
3. handler : 里面只有一个用于处理 rpc 请求的类RpcRequestHandler(根据 rpc 请求调用目标类的目标方法)。
4. transport : 用户网络传输相关类(真正传输网络请求的地方。提供了 Socket 和 Netty 两种网络传输方式)。

下面分别进行介绍。

网络传输实体类

网络传输实体类在 dto 包下,主要有两个类。RpcRequest.java和RpcResponse.java

首先是RpcRequest.java——rpc 请求实体类。当你要调用远程方法的时候,你需要先传输一个 RpcRequest 给对方, RpcRequest里面包含了要调用的目标方法和类的名称、参数等数据。

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder
@ToString
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 1905122041950251207L;// 序列化版本号
    private String requestId;// 请求ID
    private String interfaceName;// 接口名称
    private String methodName;// 方法名称
    private Object[] parameters;// 参数列表
    private Class<?>[] paramTypes;// 参数类型列表
    private String version;// 版本号 主要是为后续不兼容升级提供可能
    private String group;// 分组 主要用于处理一个接口有多个类实现的情况

    /**
     * 获取RPC服务名称
     *
     * @return 返回接口名称、分组和版本号的组合
     */
    public String getRpcServiceName() {
        return this.getInterfaceName() + this.getGroup() + this.getVersion();
    }
}

然后是RpcResponse.java——rpc 响应实体类,当服务端通过 RpcRequest 中的相关数据调用到目标服务的目标方法之后,调用结果就通过RpcResponse 返回给客户端。

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class RpcResponse<T> implements Serializable {

    private static final long serialVersionUID = 715745410605631233L;// 序列化版本号
    private String requestId;// 请求ID
    private Integer code;// 响应码
    private String message;// 响应消息
    private T data;// 响应体

    /**
     * 成功响应
     * @param data 响应数据
     * @param requestId 请求ID
     * @return 返回一个包含成功响应码、消息和请求ID的响应对象
     */
    public static <T> RpcResponse<T> success(T data, String requestId) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setCode(RpcResponseCodeEnum.SUCCESS.getCode());
        response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage());
        response.setRequestId(requestId);
        if (null != data) {
            response.setData(data);
        }
        return response;
    }


    /**
     * 失败响应
     * @param rpcResponseCodeEnum 响应码枚举
     * @return 返回一个包含失败响应码和消息的响应对象
     */
    public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setCode(rpcResponseCodeEnum.getCode());
        response.setMessage(rpcResponseCodeEnum.getMessage());
        return response;
    }
}



网络传输实现

这部分基于 Socket,和基于 Netty 的网络传输方式实现。因此,先定义一个发送 RPC 请求的顶层接口,然后分别使用 Socket 和 Netty 两种方式对这个
接口进行实现即可,RpcRequestTransport.java 传输请求的接口:

@SPI
public interface RpcRequestTransport {
    /**
     * send rpc request to server and get result
     *
     * @param rpcRequest message body
     * @return data from server
     */
    Object sendRpcRequest(RpcRequest rpcRequest);
}

其中@SPI 是一个自定义注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SPI {
}

在 Java 中,SPI 是一种服务发现机制。它允许第三方为应用程序提供插件或模块。在运行时,应用程序可以查询哪些插件或模块可用,并选择其中之一进行调用。
具体到我们的代码中,@SPI 被用于标记 RpcRequestTransport 接口。这意味着 RpcRequestTransport 的实现可以由第三方提供,并在运行时动态加载。这样,RPC 框架就可以支持多种传输协议,比如 HTTP、TCP、UDP 等,只要有相应的 RpcRequestTransport 实现即可。

下面,我们先来看一下比较简单点的使用 Socket 进行网络传输的方式。

基于Socket实现网络传输

客户端

客户端主要用于发送网络请求到服务端(目标方法所在的服务器)。当我们知道了服务端的地址之后,我们就可以通过 SocketRpcClient 发送 rpc 请求(RpcRequest) 到服务端了(如果我们要找到服务端的地址,涉及到了注册中心相关的知识,下一节会介绍)

@AllArgsConstructor
@Slf4j
public class SocketRpcClient implements RpcRequestTransport {
    private final ServiceDiscovery serviceDiscovery;// 服务发现组件

    /**
     * 构造函数
     * 默认使用 ZooKeeper 作为服务发现组件
     */
    public SocketRpcClient() {
        //ExtensionLoader 是一个用于加载扩展实现的工具类,它实现了一种称为 SPI(Service Provider Interface)的设计模式。
        //getExtensionLoader(ServiceDiscovery.class):获取 ServiceDiscovery 接口的 ExtensionLoader。如果缓存中没有,就创建一个新的 ExtensionLoader 并放入缓存。
        //getExtension(ServiceDiscoveryEnum.ZK.getName()):从 ExtensionLoader 中获取名为 ServiceDiscoveryEnum.ZK.getName() 的扩展实现。如果缓存中没有,就创建一个新的实例并放入缓存。
        this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());
    }

    /**
     * 发送 RPC 请求
     *
     * @param rpcRequest RPC 请求
     * @return 服务端返回的数据
     */
    @Override
    public Object sendRpcRequest(RpcRequest rpcRequest) {
        InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);// 通过服务发现组件获取服务端地址
        try (Socket socket = new Socket()) {
            socket.connect(inetSocketAddress);// 连接到服务端
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            // 通过输出流向服务端发送数据
            objectOutputStream.writeObject(rpcRequest);
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            // 通过输入流读取服务端返回的数据
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RpcException("调用服务失败:", e);
        }
    }
}

“拓展类"通常指的是一种设计模式,叫做"服务提供者接口”(SPI,Service Provider Interface)。在这种模式中,有一个接口(或抽象类),以及这个接口的多个实现类,这些实现类就是所谓的"拓展类"。这种模式允许第三方为一个模块、库、框架或者应用提供插件或者拓展。


ExtensionLoader类的主要作用是加载扩展类。它首先从META-INF/extensions/目录下的文件中读取扩展类的信息,然后通过类加载器加载这些扩展类,并将它们存储在一个映射中。当需要获取一个扩展类的实例时,它会首先从缓存中获取,如果缓存中没有,就创建一个新的实例。


使用ExtensionLoader来加载拓展类,而不是直接通过包引用,有以下几个优点:

灵活性:使用ExtensionLoader可以在运行时动态地加载和卸载拓展类,而不需要在编译时确定。这使得你可以很容易地添加、替换或删除一个拓展。
解耦:ExtensionLoader使得你的代码和拓展类之间的耦合度降低。你的代码不需要直接引用拓展类,只需要通过ExtensionLoader来使用拓展。这使得你的代码更容易维护和测试。
隔离:ExtensionLoader可以为每个拓展类提供一个独立的类加载器,这样就可以防止拓展类之间的冲突和干扰。

服务端

Socket 服务端。用于等待客户端连接。当客户端成功连接之后,就可以发送 rpc 请求( RpcRequest ) 到服务端了。然后,服务端拿到 RpcRequest 就会去执行对应的方法。执行完对应的方法之后,就把执行得到的结果放在RpcResponse 中返回给客户端。

/**
 * Socket 服务端
 * 用于等待客户端连接并处理请求
 */
@Slf4j
public class SocketRpcServer {

    private final ExecutorService threadPool;// 线程池,用于处理客户端请求
    private final ServiceProvider serviceProvider;// 服务提供者,用于注册和查找服务


    public SocketRpcServer() {
        threadPool = ThreadPoolFactoryUtil.createCustomThreadPoolIfAbsent("socket-server-rpc-pool");
        serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
    }

    /**
     * 注册服务
     * @param rpcServiceConfig 服务配置
     */
    public void registerService(RpcServiceConfig rpcServiceConfig) {
        serviceProvider.publishService(rpcServiceConfig);
    }

    /**
     * 启动服务
     * 开始监听客户端连接并处理请求
     */
    public void start() {
        try (ServerSocket server = new ServerSocket()) { // 创建一个新的 ServerSocket
            String host = InetAddress.getLocalHost().getHostAddress();// 获取本地主机的 IP 地址
            server.bind(new InetSocketAddress(host, PORT));// 将 ServerSocket 绑定到指定的 IP 地址和端口号
            CustomShutdownHook.getCustomShutdownHook().clearAll();// 添加一个自定义的关闭钩子,当 JVM 关闭时,这个关闭钩子会执行 clearAll 方法
            //clearAll()方法为了告诉其他 RPC 服务或客户端,当前服务器已经不再提供服务,会清除注册中心中的当前服务器信息,并且关闭所有线程池,释放资源。
            Socket socket;
            while ((socket = server.accept()) != null) {// 循环接受客户端的连接
                log.info("client connected [{}]", socket.getInetAddress());// 记录客户端的 IP 地址
                threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));// 将新的客户端连接提交给线程池处理
            }
            threadPool.shutdown();// 关闭线程池
        } catch (IOException e) {
            log.error("occur IOException:", e);
        }
    }
}

其中SocketRpcRequestHandlerRunnable.java的代码如下

/**
 * Socket 请求处理器
 * 用于处理来自客户端的请求
 */
@Slf4j
public class SocketRpcRequestHandlerRunnable implements Runnable {
    private final Socket socket;// 客户端 Socket 连接
    private final RpcRequestHandler rpcRequestHandler;// RPC 请求处理器


    public SocketRpcRequestHandlerRunnable(Socket socket) {
        this.socket = socket;
        this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
    }

    /**
     * 处理客户端请求
     */
    @Override
    public void run() {
        log.info("server handle message from client by thread: [{}]", Thread.currentThread().getName());// 记录当前线程的名称
        try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();// 从输入流读取 RpcRequest 对象
            Object result = rpcRequestHandler.handle(rpcRequest);// 处理 RpcRequest,得到处理结果
            objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));// 将处理结果包装成 RpcResponse,写入到输出流
            objectOutputStream.flush();// 刷新输出流,确保 RpcResponse 对象被发送到客户端
        } catch (IOException | ClassNotFoundException e) {
            log.error("occur exception:", e);
        }
    }

}

基于Netty实现网络传输

Netty 这部分的原理也差不多,不过实现代码差别很大。

客户端

Netty 客户端NettyClient.java主要提供了:

  • doConnect() :用于连接服务端(目标方法所在的服务器)并返回对应的 Channel 。当我们知道了服务端的地址之后,我们就可以通过 NettyClient 成功连接服务端了。(有了Channel 之后就能发送数据到服务端了)
  • sendRpcRequest() : 用于传输 rpc 请求( RpcRequest ) 到服务端。
@Slf4j
public final class NettyRpcClient implements RpcRequestTransport {
    private final ServiceDiscovery serviceDiscovery;// 服务发现组件
    private final UnprocessedRequests unprocessedRequests;// 未处理的请求
    private final ChannelProvider channelProvider;// 通道提供者
    private final Bootstrap bootstrap;// Netty 启动类
    private final EventLoopGroup eventLoopGroup;// Netty 事件循环组

    public NettyRpcClient() {//构造函数初始化资源,如 EventLoopGroup、Bootstrap 等
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)// 设置事件循环组
                .channel(NioSocketChannel.class)// 设置通道类型为 NioSocketChannel
                .handler(new LoggingHandler(LogLevel.INFO))// 添加日志处理器
                // 设置连接超时时间,如果超过这个时间还未连接成功,则连接失败
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {// 添加通道初始化器
                        ChannelPipeline p = ch.pipeline();
                        // 如果在 15 秒内没有向服务器发送数据,就发送一个心跳请求
                        p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
                        p.addLast(new RpcMessageEncoder());// 添加 RPC 消息编码器
                        p.addLast(new RpcMessageDecoder());// 添加 RPC 消息解码器
                        p.addLast(new NettyRpcClientHandler());// 添加 Netty RPC 客户端处理器
                    }
                });
        // 获取服务发现组件的实例
        this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());
        // 获取未处理的请求的实例
        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
        // 获取通道提供者的实例
        this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
    }

    /**
     * 连接服务端并返回对应的 Channel
     * @param inetSocketAddress 服务端地址
     * @return 服务端的 Channel
     */
    @SneakyThrows
    public Channel doConnect(InetSocketAddress inetSocketAddress) {
        // 创建一个 CompletableFuture 对象,用于存储 Channel
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        // 使用 Bootstrap 的 connect 方法连接到服务端
        bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {// 如果连接成功记录连接成功的日志
                log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
                completableFuture.complete(future.channel());// 将 Channel 存入 CompletableFuture
            } else {
                throw new IllegalStateException();
            }
        });
        return completableFuture.get();// 返回 CompletableFuture 中的 Channel
    }


    /**
     * 发送 RPC 请求
     * @param rpcRequest RPC 请求
     * @return 服务端返回的数据
     */
    @Override
    public Object sendRpcRequest(RpcRequest rpcRequest) {
        // 创建 CompletableFuture 对象,用于存储 RPC 响应的结果
        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
        // 通过服务发现组件获取服务端的地址
        InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
        // 获取与服务端地址相关的通道
        Channel channel = getChannel(inetSocketAddress);
        if (channel.isActive()) {// 检查通道是否活跃
            // 将未处理的请求放入 unprocessedRequests
            unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
            // 创建 RPC 消息对象
            RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
                    .codec(SerializationTypeEnum.HESSIAN.getCode())
                    .compress(CompressTypeEnum.GZIP.getCode())
                    .messageType(RpcConstants.REQUEST_TYPE).build();
            // 将 RPC 消息对象写入通道并刷新,同时添加一个监听器处理发送失败的情况
            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcMessage);
                } else {// 如果消息发送失败,关闭通道,并将异常存入 resultFuture
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                    log.error("Send failed:", future.cause());
                }
            });
        } else {
            throw new IllegalStateException();
        }

        return resultFuture; // 返回 CompletableFuture 对象
    }


    /**
     * 获取通道
     * @param inetSocketAddress 服务端地址
     * @return 与服务端地址相关的通道
     */
    public Channel getChannel(InetSocketAddress inetSocketAddress) {
        Channel channel = channelProvider.get(inetSocketAddress);
        if (channel == null) {
            channel = doConnect(inetSocketAddress);
            channelProvider.set(inetSocketAddress, channel);
        }
        return channel;
    }

    // 优雅地关闭事件循环组,即在关闭事件循环组之前,会等待所有任务都完成,包括正在执行的任务和提交的但还未执行的任务
    public void close() {
        eventLoopGroup.shutdownGracefully();
    }
}

CompletableFuture 是 Java 8 引入的一个类,它实现了 Future 和 CompletionStage 接口,提供了一种异步编程的方式。Future 是 Java 5 引入的一个接口,用于表示异步计算的结果。但是,Future 的功能比较有限,例如,它无法表示计算完成后的回调,也无法组合多个 Future 的结果。CompletableFuture 弥补了这些不足,提供了丰富的方法来处理异步计算的结果。


在我们的代码中,CompletableFuture 被用于存储 RPC 响应的结果。当 RPC 响应返回时,CompletableFuture 的 complete 方法被调用,将结果存入 CompletableFuture。然后,可以通过 CompletableFuture 的 get 方法来获取结果。如果结果还未返回,get 方法会阻塞,直到结果返回为止。


其中UnprocessedRequests.java用于存放未被服务端处理的请求(建议限制 map 容器大小,避免未处理请求过多 OOM)。

public class UnprocessedRequests {
    // 存储未处理的请求
    private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();

    /**
     * 将未处理的请求放入 UNPROCESSED_RESPONSE_FUTURES
     * @param requestId 请求 ID
     * @param future 未来的 RPC 响应
     */
    public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {
        UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);
    }


    /**
     * 完成 RPC 响应
     * @param rpcResponse RPC 响应
     */
    public void complete(RpcResponse<Object> rpcResponse) {
        // 从 UNPROCESSED_RESPONSE_FUTURES 中移除请求并获取对应的 CompletableFuture
        CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
        // 如果 CompletableFuture 存在,则完成它
        if (null != future) {
            future.complete(rpcResponse);
        } else {
            throw new IllegalStateException();
        }
    }
}

自定义客户端 ChannelHandler 用于处理服务器返回的数据。下面是NettyRpcClientHandler.java

@Slf4j
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
    private final UnprocessedRequests unprocessedRequests;// 未处理的请求
    private final NettyRpcClient nettyRpcClient;// Netty RPC 客户端

    public NettyRpcClientHandler() {
        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
        this.nettyRpcClient = SingletonFactory.getInstance(NettyRpcClient.class);
    }

    /**
     * 读取服务器传输的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {// 当从服务器接收到一条消息时被调用
        try {
            log.info("client receive msg: [{}]", msg);
            if (msg instanceof RpcMessage) {// 如果消息是RpcMessage类型
                RpcMessage tmp = (RpcMessage) msg;
                byte messageType = tmp.getMessageType();
                if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {// 如果是心跳响应类型
                    log.info("heart [{}]", tmp.getData());
                } else if (messageType == RpcConstants.RESPONSE_TYPE) {// 如果是响应类型
                    RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();// 获取响应数据
                    unprocessedRequests.complete(rpcResponse);// 完成未处理的请求
                }
            }
        } finally {
            ReferenceCountUtil.release(msg);// 释放消息资源
        }
    }

    //userEventTriggered也是netty的一个回调方法 当发生空闲状态事件时被调用
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {// 如果事件是空闲状态事件
            IdleState state = ((IdleStateEvent) evt).state();// 获取空闲状态
            if (state == IdleState.WRITER_IDLE) {// 如果是写空闲状态
                log.info("write idle happen [{}]", ctx.channel().remoteAddress());// 记录写空闲事件
                Channel channel = nettyRpcClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());// 获取与远程地址相关的通道
                RpcMessage rpcMessage = new RpcMessage();// 创建一个新的RpcMessage
                rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());// 设置编解码类型
                rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());// 设置压缩类型
                rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);// 设置消息类型为心跳请求类型
                rpcMessage.setData(RpcConstants.PING);// 设置数据为PING
                channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);// 将消息写入并刷新到通道,如果失败则关闭通道
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * 当处理客户端消息时发生异常时调用
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("client catch exception:", cause);
        cause.printStackTrace();// 打印异常堆栈跟踪
        ctx.close();
    }

}

在Netty中,空闲状态事件是指在一段时间内没有进行读操作、写操作或者两者都没有进行的情况。这是通过IdleStateHandler来检测的,它会在指定的空闲时间后触发一个IdleStateEvent事件。
空闲状态有三种类型:

读空闲(READER_IDLE):在一段时间内没有读取到对方的数据,也就是说,如果在指定的时间内没有接收到对方的数据,那么就会触发这个事件。
写空闲(WRITER_IDLE):在一段时间内没有向对方写数据,也就是说,如果在指定的时间内没有向对方发送数据,那么就会触发这个事件。
读写空闲(ALL_IDLE):在一段时间内既没有读取到对方的数据,也没有向对方写数据,也就是说,如果在指定的时间内既没有接收到对方的数据,也没有向对方发送数据,那么就会触发这个事件。

在我们的代码中,当发生写空闲事件时,客户端会向服务器发送一个心跳请求。这是因为,如果客户端在一段时间内没有向服务器发送数据,可能会导致服务器认为客户端已经断开连接,从而关闭连接。为了保持连接的活跃,当发生写空闲事件时,客户端会向服务器发送一个心跳请求,告诉服务器它还在。这样,即使在没有数据交换的情况下,客户端和服务器之间的连接也能保持活跃。这就是心跳机制的作用。

从代码中,可以看出当 rpc 请求被成功处理(客户端收到服务端的执行结果)之后,我们调用了unprocessedRequests.complete(rpcResponse) 方法,这样的话,你只需要通过下面的方式就能成
功接收到服务端返回的结果。

CompletableFuture<RpcResponse> completableFuture =(CompletableFuture<RpcResponse>) clientTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();

然后是ChannelProvider.java用于存放Channel ( Channel 用于在服务端和客户端之间传输数据)。

@Slf4j
public class ChannelProvider {

    private final Map<String, Channel> channelMap;// 存储通道的映射

    public ChannelProvider() {
        channelMap = new ConcurrentHashMap<>();
    }

    /**
     * 获取通道
     * @param inetSocketAddress 服务端地址
     * @return 与服务端地址相关的通道
     */
    public Channel get(InetSocketAddress inetSocketAddress) {
        String key = inetSocketAddress.toString();// 将服务端地址转换为字符串作为键
        // 判断是否存在对应地址的连接
        if (channelMap.containsKey(key)) {
            Channel channel = channelMap.get(key);
            // 如果存在,则判断连接是否可用,如果可用,则直接获取
            if (channel != null && channel.isActive()) {
                return channel;
            } else {
                channelMap.remove(key);
            }
        }
        return null;
    }

    /**
     * 设置通道
     * @param inetSocketAddress 服务端地址
     * @param channel 通道
     */
    public void set(InetSocketAddress inetSocketAddress, Channel channel) {
        String key = inetSocketAddress.toString();
        channelMap.put(key, channel);
    }


    /**
     * 移除通道
     * @param inetSocketAddress 服务端地址
     */
    public void remove(InetSocketAddress inetSocketAddress) {
        String key = inetSocketAddress.toString();
        channelMap.remove(key);
        log.info("Channel map size :[{}]", channelMap.size());
    }
}




服务端

NettyRpcServer.java,Netty 服务端,监听客户端的连接。另外,还提供了两个用户手动注册服务的方法(还可以通过注解RpcService 注册服务,这个后面也会介绍到)。

@Slf4j
@Component
public class NettyRpcServer {

    public static final int PORT = 9998;// 服务端口

    // 服务提供者,用于注册和查找服务
    private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);

    /**
     * 注册服务
     * @param rpcServiceConfig 服务配置
     */
    public void registerService(RpcServiceConfig rpcServiceConfig) {
        serviceProvider.publishService(rpcServiceConfig);
    }

    /**
     * 启动服务
     */
    @SneakyThrows// Lombok提供的注解,用于处理所有受检异常在方法体中自动捕获并处理异常,将异常转换为非受检异常(Unchecked Exception)并抛出。
    public void start() {
        // 添加一个自定义的关闭钩子,当 JVM 关闭时,这个关闭钩子会执行 clearAll 方法
        //clearAll()方法为了告诉其他 RPC 服务或客户端,当前服务器已经不再提供服务,会清除注册中心中的当前服务器信息,并且关闭所有线程池,释放资源。
        CustomShutdownHook.getCustomShutdownHook().clearAll();// 获取自定义关闭钩子实例并调用其clearAll方法
        String host = InetAddress.getLocalHost().getHostAddress();// 获取本地主机地址
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);//创建一个bossGroup,它负责接收客户端的连接,指定线程数为1
        EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建一个workerGroup,它负责处理已接受的连接
        DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
                RuntimeUtil.cpus() * 2,// 创建一个线程数为CPU核数的两倍的DefaultEventExecutorGroup
                // 使用ThreadPoolFactoryUtil创建一个线程工厂
                ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false)
        );
        try {
            ServerBootstrap b = new ServerBootstrap();// 创建一个ServerBootstrap实例
            b.group(bossGroup, workerGroup)// 设置bossGroup和workerGroup
                    .channel(NioServerSocketChannel.class)// 设置通道为NioServerSocketChannel
                    // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
                    .childOption(ChannelOption.TCP_NODELAY, true)// 设置TCP_NODELAY为true,禁用Nagle算法
                    // 是否开启 TCP 底层心跳机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)// 设置SO_KEEPALIVE为true,开启TCP底层心跳机制
                    //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .handler(new LoggingHandler(LogLevel.INFO))// 添加一个日志处理器
                    // 当客户端第一次进行请求的时候才会进行初始化
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();// 获取通道的管道
                            // 添加一个空闲状态处理器,如果30秒内没有收到客户端的请求,就关闭连接
                            p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                            p.addLast(new RpcMessageEncoder());// 添加一个RpcMessage编码器
                            p.addLast(new RpcMessageDecoder());// 添加一个RpcMessage解码器
                            p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());// 添加一个NettyRpcServerHandler处理器
                        }
                    });

            ChannelFuture f = b.bind(host, PORT).sync(); // 绑定主机和端口,并同步等待绑定成功
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {// 记录启动服务器时发生的异常
            log.error("occur exception when start server:", e);
        } finally {
            log.error("shutdown bossGroup and workerGroup");
            // 优雅地关闭事件循环组,即在关闭事件循环组之前,会等待所有任务都完成,包括正在执行的任务和提交的但还未执行的任务
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            serviceHandlerGroup.shutdownGracefully();
        }
    }
}


然后是NettyServerHandler.java,自定义服务端ChannelHandler 用于处理客户端发送的数据。当客户端发的 rpc 请求( RpcRequest ) 来了之后,服务端就会处理 rpc 请求( RpcRequest ) ,处理完之后就把得到 rpc 相应( RpcResponse )传输给客户端。

@Slf4j
public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {

    private final RpcRequestHandler rpcRequestHandler;//RpcRequestHandler实例,用于处理Rpc请求

    public NettyRpcServerHandler() {
        this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {// 当从客户端接收到一条消息时被调用
        try {
            if (msg instanceof RpcMessage) {// 如果消息是RpcMessage类型
                log.info("server receive msg: [{}] ", msg);
                byte messageType = ((RpcMessage) msg).getMessageType();// 获取消息类型
                RpcMessage rpcMessage = new RpcMessage();// 创建一个新的RpcMessage用作响应
                rpcMessage.setCodec(SerializationTypeEnum.HESSIAN.getCode());// 设置编解码类型
                rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());// 设置压缩类型
                if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {// 如果是心跳请求类型
                    rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);// 设置响应消息类型为心跳响应类型
                    rpcMessage.setData(RpcConstants.PONG);//设置数据为PONG
                } else {//不是心跳请求类型 说明是rpc请求
                    RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();// 获取请求数据
                    Object result = rpcRequestHandler.handle(rpcRequest);// 执行目标方法(客户端需要执行的方法)并返回方法结果
                    log.info(String.format("server get result: %s", result.toString()));// 记录获取到的结果
                    rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);// 设置消息类型为响应类型
                    if (ctx.channel().isActive() && ctx.channel().isWritable()) {// 如果通道是活跃的并且是可写的
                        RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
                        rpcMessage.setData(rpcResponse);// 创建一个成功的Rpc响应并设置响应数据
                    } else {// 如果通道是不活跃的或者不可写的
                        RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
                        rpcMessage.setData(rpcResponse);// 创建一个失败的Rpc响应并// 设置响应数据
                        log.error("not writable now, message dropped");
                    }
                }
                // 将消息写入并刷新到通道,如果失败则关闭通道
                ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } finally {
            //释放消息资源仿真内存泄露
            ReferenceCountUtil.release(msg);
        }
    }

    // 当发生用户事件时被调用即一段时间内没有读取到客户端的数据,那么就关闭连接。
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {// 如果事件是空闲状态事件
            IdleState state = ((IdleStateEvent) evt).state();// 获取空闲状态
            if (state == IdleState.READER_IDLE) {// 如果是读空闲状态
                log.info("idle check happen, so close the connection");
                ctx.close();// 关闭通道处理上下文
            }
        } else {
            super.userEventTriggered(ctx, evt);// 如果不是空闲状态事件,调用父类的userEventTriggered方法
            //父类的这个方法默认实现是不做任何事情但,如果有其他的ChannelInboundHandler在管道中,
            // 并且这个ChannelInboundHandler重写了userEventTriggered方法,那么这个方法就会被调用。
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 当处理客户端消息时发生异常时调用
        log.error("server catch exception");// 记录异常信息
        cause.printStackTrace();// 打印异常堆栈跟踪
        ctx.close();// 关闭通道处理上下文
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/429079.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为---MSTP(一)---MSTP生成树协议

目录 1. MSTP技术产生背景 2. STP/RSTP的缺陷 ​编辑 2.1 无法均衡流量负载 2.2 数据使用次优路径 3. MSTP生成树协议 3.1 MSTP相关概念 3.2 MSTP树生成的形成过程 4. MSTP报文 1. MSTP技术产生背景 RSTP在STP基础上进行了改进&#xff0c;实现了网络拓扑快速收敛。但…

【k8s管理--可视化界面】

1、可视化界面的软件 kubernetes的可视化软件有以下这些kubernetes dashboard&#xff1a;https://github.com/kubernetes/dashboardkubesphere官网&#xff1a; https://kubesphere.io/zh/rancher 官网&#xff1a; https://www.rancher.cn/kuboard 官网&#xff1a; https:/…

C++11常用知识分享(一)【列表初始化 || 简化声明 || 范围for || 左右值 || 可变参数模板】

目录 一. 列表初始化 1&#xff09;用法 2) initializer_list 小节&#xff1a; 二&#xff0c;简化声明 1) &#xff0c;auto 2) &#xff0c;decltype类 3)&#xff0c;nullptr 三&#xff0c;范围for 四&#xff0c;C11后&#xff0c;STL容器变化 五&#xff0c…

【数据结构】实现堆

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解堆&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 一. 堆的概念及结构二. 堆的实现堆的结构体初始化销毁插入数据删除数据&#xff08;默认删除堆顶即…

【JS】WebSocket实现简易聊天室

【JS】WebSocket实现简易聊天室 聊天室思路示例 聊天室思路 聊天室思路 1、连接服务器先建立连接&#xff0c;默认生成匿名用户(admin01) 2、客户端发送消息&#xff0c;其它客户端用户都会同步接收消息(服务端接受消息广播所有连接用户) 3、客户端修改昵称&#xff0c;其它客…

鸿蒙应用组件

基础组件 索引组件—AlphabetIndexer&#xff08;相当于安卓的seedbar&#xff09; 使用&#xff1a;AlphabetIndexer(value: {arrayValue: Array<string>, selected: number})空白填充组件—Blank&#xff08;占位使用&#xff0c;当父组件为Row/Column/Flex时生效&am…

[SpringCloud] OpenFeign核心架构原理 (一)

Feign的本质: 动态代理 七大核心组件 Feign底层是基于JDK动态代理来的, Feign.builder()最终构造的是一个代理对象, Feign在构建对象的时候会解析方法上的注解和参数, 获取Http请求需要用到基本参数以及和这些参数和方法参数的对应关系。然后发送Http请求, 获取响应, 再根据响…

2024-3-4 市场分歧视角

今天市场有一个单独分歧视角可以观察思考&#xff0c;竞价氢能这边严重不符合预期&#xff0c;隔夜单 四川金顶 和 东方精工 大幅减少&#xff0c;预期就是这两个货会高位分歧&#xff0c;最高板 东方精工 开盘就是瀑布杀&#xff0c;四川金顶先杀到1个多点&#xff0c;9&#…

分库分表如何管理不同实例中几万张分片表?

在进行分库分表设计时&#xff0c;确认好了数据节点数量和分片策略以后&#xff0c;接下来要做的就是管理大量的分片表。实际实施过程中可能存在上百个分片数据库实例&#xff0c;每个实例中都可能有成千上万个分片表&#xff0c;如果仅依靠人力来完成这些任务显然是不现实的。…

【AI视野·今日Robot 机器人论文速览 第八十期】Fri, 1 Mar 2024

AI视野今日CS.Robotics 机器人学论文速览 Fri, 1 Mar 2024 Totally 32 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Robotics Papers Humanoid Locomotion as Next Token Prediction Authors Ilija Radosavovic, Bike Zhang, Baifeng Shi, Jathushan Rajasegaran…

Linux--文件(2)-重定向和文件缓冲

命令行中的重定向符号 介绍和使用 在Linux的命令行中&#xff0c;重定向符号用于将命令的输入或输出重定向到文件或设备。 常见的重定向符号&#xff1a; 1.“>“符号&#xff1a;将命令的标准输出重定向到指定文件中&#xff0c;并覆盖原有的内容。 2.”>>“符号&a…

《高效使用Redis》- 由面试题“Redis是否为单线程”引发的思考

由面试题“Redis是否为单线程”引发的思考 很多人都遇到过这么一道面试题&#xff1a;Redis是单线程还是多线程&#xff1f;这个问题既简单又复杂。说他简单是因为大多数人都知道Redis是单线程&#xff0c;说复杂是因为这个答案其实并不准确。 难道Redis不是单线程&#xff1f…

springboot项目单纯使用nacos注册中心功能

Spring Boot 项目完全可以单独使用 Nacos 作为注册中心。Nacos 是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。它支持服务的注册与发现&#xff0c;能够与 Spring Boot 应用无缝集成&#xff0c;为微服务架构提供了强大的支持。 在使用 Nacos 作为注册中…

#QT(串口助手-界面)

1.IDE&#xff1a;QTCreator 2.实验&#xff1a;编写串口助手 3.记录 接收框:Plain Text Edit 属性选择&#xff1a;Combo Box 发送框:Line Edit 广告&#xff1a;Group Box &#xff08;1&#xff09;仿照现有串口助手设计UI界面 &#xff08;2&#xff09;此时串口助手大…

从0搭建Azure DevOps Server

Windows虚拟机搭建DevOps 服务器 背景资源准备安装软件需求流程版本兼容性安装SQL ServerSSMS安装visual StudioAzure DevOps Server测试本地访问端口更改及外界访问 背景 搭建一台Azure DevOps Server 供我们运维项目开发&#xff0c;现在DevOps运维已成为一个主流&#xff0…

【金三银四】每日一点面试题(Java--JVM篇)

1、说一下 JVM 的主要组成部分及其作用&#xff1f; JVM&#xff08;Java虚拟机&#xff09;是Java程序运行的核心组件&#xff0c;它负责将Java字节码翻译成底层操作系统能够执行的指令。JVM由以下几个主要组成部分构成&#xff1a; 类加载器&#xff08;Class Loader&#…

117.移除链表元素(力扣)

题目描述 代码解决 class Solution { public:ListNode* removeElements(ListNode* head, int val) {//删除头节点while(head!NULL&&head->valval){ListNode*tmphead;headhead->next;delete tmp;}//删除非头节点ListNode*curhead;while(cur!NULL&&cur-&g…

【python】python用户管理系统[简易版](源码+报告)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

End-to-End Weakly-Supervised SemanticSegmentation with Transformers

摘要 弱监督语义分割&#xff08;WSSS&#xff09;使用图像级标签是一项重要且具有挑战性的任务。由于高训练效率&#xff0c;端到端的WSSS解决方案受到社区越来越多的关注。然而&#xff0c;当前的方法主要基于卷积神经网络&#xff0c;并未正确地探索全局信息&#xff0c;因…

SwiftUI 在 App 中弹出全局消息横幅(下)

功能需求 在 SwiftUI 开发的 App 界面中,有时我们需要在全局层面向用户展示一些消息: 如上图所示:我们弹出的全局消息横幅位于所有视图之上,这意味这它不会被任何东西所遮挡;而且用户可以点击该横幅关闭它。这是怎么做到的呢? 在本篇博文中,您将学到以下内容 功能需求…