2、RocketMQ源码分析(二)

RocketMQ的底层通信模块remoting

remoting是RocketMQ的底层通信模块,RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析,来说明remoting如何实现高性能通信的。

二、Remoting 通信模块结构
remoting 的网络通信是基于 Netty 实现,模块中类的继承关系如下:

在这里插入图片描述
可见通信的类继承自类RemotingService,RemotingService的定义如下:

RemotingService是RPC 远程服务基础类。主要定义所有的远程服务类的基础方法:

public interface RemotingService {
     // 服务启动
    void start();
     // 服务停止
    void shutdown();
    //注册RPC钩子函数
    void registerRPCHook(RPCHook rpcHook);
}

抽象出服务端与客户端都需要的三个方法,其中注册hook是为了能够在远程通讯之后或调用之前执行用户自定义的逻辑。

RemotingServer/RemotingClient
远程服务器/客户端基础接口,两者中的方法基本类似,故这里重点介绍一下 RemotingServer,定位 RPC 远程操作的相关“业务方法”。

public interface RemotingServer extends RemotingService {
 
    /**
     *  注册处理器
     * @param requestCode   请求码
     * @param processor     处理器
     * @param executor      线程池
     *    这三者是绑定关系:
     *       根据请求的code  找到处理对应请求的处理器与线程池 并完成业务处理。
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
 
 
    /**
     *  注册缺省处理器
     * @param processor  缺省处理器
     * @param executor   线程池
     */
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
 
    // 获取服务端口
    int localListenPort();
 
 
    /**
     *  根据 请求码 获取 处理器和线程池
     * @param requestCode  请求码
     * @return
     */
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
 
    /**
     *  同步调用
     * @param channel   通信通道
     * @param request   业务请求对象
     * @param timeoutMillis   超时时间
     * @return  响应结果封装
     */
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
 
    /**
     *  异步调用
     * @param channel  通信通道
     * @param request  业务请求对象
     * @param timeoutMillis  超时时间
     * @param invokeCallback  响应结果回调对象
     */
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
 
 
    /**
     *  单向调用 (不关注返回结果)
     * @param channel   通信通道
     * @param request   业务请求对象
     * @param timeoutMillis  超时时间
     */
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;
}

从上面的代码可以看出,RemotingServer的主要功能是注册请求协议处理器、请求调用方法。

NettyRemotingServer::服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类


public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
 
  	// Netty服务端启动器
    private final ServerBootstrap serverBootstrap;
  	
    // worker组
    private final EventLoopGroup eventLoopGroupSelector;
  
    // boss组 
    private final EventLoopGroup eventLoopGroupBoss;
 
    // Netty服务端配置信息类
    private final NettyServerConfig nettyServerConfig;
 
 	// 公共线程池   (在注册协议处理器的时候,若未给处理器指定线程池,那么就是用该公共线程池)
    private final ExecutorService publicExecutor;
 
	
    //  Netty Channel 特殊状态监听器
    private final ChannelEventListener  channelEventListener;
 
  	// 定时器  (功能: 扫描 responseTable表,将过期的responseFuture移除)
    private final Timer timer = new Timer("ServerHouseKeepingService", true);
 
    // 用于在pipeline指定handler中 执行任务的线程池
    private DefaultEventExecutorGroup defaultEventExecutorGroup;
 
 	// 服务端绑定的端口
    private int port = 0;
 
    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
    private static final String TLS_HANDLER_NAME = "sslHandler";
    private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
 
  	
    // 用于处理 SSL 握手连接的处理器
    private HandshakeHandler handshakeHandler;
    
    // 协议编码 处理器
    private NettyEncoder encoder;
    
    // 连接管理 处理器
    private NettyConnectManageHandler connectionManageHandler;
    
    // 核心业务 处理器
    private NettyServerHandler serverHandler;
 
 
	// 参数1: nettyServerConfig  Netty服务端配置信息
    // 参数2: channelEventListener  channel特殊状态监听器
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
		
        // 调用父类  就是通过 Semaphore 设置请求并发限制
        // 1. 设置 单行请求的并发限制
        // 2. 设置 异步请求的并发限制
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
 
		
        
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;
 
 
       
        // 创建公共线程池 publicExecutor   线程数量为:4
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
 
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
 
    
        
        // 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
 
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
 
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
		
        
        // 加载SSL连接的相关方法 (不在本篇的分析范围内)
        loadSslContext();
    }
}

NettyRemotingServer当中重要的参数:

父类的属性 semaphoreOneway , **semaphoreAsync ** 用来控制请求并发量的
serverBootstrap Netty服务器启动器
nettyServerConfig Netty服务器配置信息
channelEventListener Netty Channel状态监听器
eventLoopGroupSelector worker组
eventLoopGroupBoss boss组

NettyRemotingServer的启动

  // 启动Netty 服务器
    @Override
    public void start() {
        
        // Netty pipeline中的指定 handler 采用该线程池执行
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
 
                private AtomicInteger threadIndex = new AtomicInteger(0);
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
 
        // 初始化 处理器 handler
        // 1. handshakeHandler  SSL连接
        // 2. encoder  编码器
        // 3. connectionManageHandler 连接管理器处理器
        // 4. serverHandler 核心业务处理器
        prepareSharableHandlers();
		
        // 下面就是 Netty 创建服务端启动器的固定流程 
        
        ServerBootstrap childHandler =
                // 配置服务端 启动对象
                // 配置工作组 boss 和 worker 组
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    // 设置服务端ServerSocketChannel 类型
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    // 设置服务端ch选项
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                    // 设置客户端ch选项
                .childOption(ChannelOption.TCP_NODELAY, true)
                    // 设置 接收缓冲区 和 发送缓冲区的 大小
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
 
                    // 设置服务器端口
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
 
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
          // 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroup
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
 
		
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            // 客户端开启 内存池,使用的内存池 是 PooledByteBufAllocator.DEFAULT
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
 
        try {
            //  服务器 绑定端口
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
 
        // 条件成立: channel状态监听器不为空, 则创建 网络异常事件执行器
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
 
        // 提交定时任务,每一秒 执行一次
        // 扫描 responseTable 表, 将过期的 responseFuture 移除
        this.timer.scheduleAtFixedRate(new TimerTask() {
 
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

上述代码 基本上就是 模板Netty创建服务端的代码,主要做了如下几件事:

启动Netty服务器
开启 channel状态监听线程
开启 扫描 responseFuture 的定时任务
在这里插入图片描述

通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,使用多个线程池来处理数据

1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。
 RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。
2、拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),
3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。
4、而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

NettyRemotingAbstract:抽象类NettyRemotingAbstract是NettyRemotingServer的父类,主要定义了请求并发量、控制响应对象和各种请求处理函数。

public abstract class NettyRemotingAbstract {
 
 
	// 控制 单向请求的 并发量
    protected final Semaphore semaphoreOneway;
 
    // 控制 异步请求的 并发量
    protected final Semaphore semaphoreAsync;
 
	// 响应对象映射表  (key: opaque  value:responseFuture)
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);
 
	// 请求处理器映射表 (key: requestCode  value:(processor,executor)  )
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
	// Netty事件监听线程池
    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
 
	
    // 默认的请求处理器对  包含(processor,executor) 
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
	
    // SSL相关
    protected volatile SslContext sslContext;
 
	
    // 扩展钩子
    protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
}

结合RocketMQ源码分析

01初始化流程
在Broker创建的时候会去初始化NettyRemotingServer类,调用其构造方法。

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
  // 初始化用于接收客户端的TCP连接
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });
  // 根据配置设置NIO还是Epoll来作为Selector线程池
    // 默认在Linux环境为true,windows环境下都是NIO,相对来说Linux更快。
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }
    // 加载SSL
    loadSslContext();
}

首先会初始化ServerBootstrap,NettyServerConfig等相关参数,同时会判断是在哪个操作系统,如果是在Linux平台则会使用EpollEventLoopGroup作为线程池,如果不是则会使用NioEventLoopGroup作为线程池。

02启动流程
在Broker作为服务端启动与NameServer作为服务端启动的时候都会来初始化,启动一个Netty的服务端进行相关的通讯。


@Override
public void start() {
    // 默认的处理线程池组,用于处理后面多个NettyHandle的操作
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    prepareSharableHandlers();
  // 正常的一种Netty的服务端的启动逻辑
    // 其中包括解码器,编码器,心跳管理器,连接管理器,消息类型处理分发
    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });
    if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
        log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
        childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
    }
    if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
        log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
        childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
    }
    if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
                nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
        childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
    }

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // 定时扫描responseTable,获取返回结果,并处理超时业务。
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                // 超时请求扫描
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

初始化线程池组处理多个NettyHandle操作。
初始化ServerBootStrap,设置tcp参数,编解码器,心跳处理器,连接管理器,请求分发。
定时扫描ResponseTable,获取返回结果并处理超时业务。

NettyEncoder类
将RemotingCommand对象序列化为ByteBuffer对象。根据serializerType的不同,Header会编码为JSON或者二进制。

 @Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
    throws Exception {
    try {
        remotingCommand.fastEncodeHeader(out);
        byte[] body = remotingCommand.getBody();
        if (body != null) {
            out.writeBytes(body);
        }
    } catch (Exception e) {
        log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
        if (remotingCommand != null) {
            log.error(remotingCommand.toString());
        }
        RemotingUtil.closeChannel(ctx.channel());
    }
}

将RemotingCommand对象编码成ByteBuf进行数据的传递,先将Header部分进行编码,然后将body追加到ByteBuf中,Header的编码相对复杂因为在传递过程中我们得让被传递方知道该ByteBuf如何进行拆解,同时我们也要防止各种不正常的情况方法,这是就需要我们规定一些内容,这些都会在Netty里面去详讲,这里就提及一下。(第一个字节表示编解码类型)

NettyDecoder类
NettyEncoder继承自LengthFieldBasedFrameDecoder,主要有用于解码入站数据流,并将数据流解码为RemotingCommand对象。

@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    ByteBuf frame = null;
    try {
        frame = (ByteBuf) super.decode(ctx, in);
        if (null == frame) {
            return null;
        }
        return RemotingCommand.decode(frame);
    } catch (Exception e) {
        log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
        RemotingUtil.closeChannel(ctx.channel());
    } finally {
        if (null != frame) {
            frame.release();
        }
    }

    return null;
}

首先会调用LengthFieldBasedFrameDecoder.decoder方法进行首次解码,然后调用NettyDecoder.decoder进行二次解码,完成Header与body的解码并转换成RemotingCommand对象。而之所以需要两次第一个就是为了解出对应的长度,即数据流中前4个字节的值表示有效数据域的长度,除开前4个字节外的内容都是有效数据域的内容,不存在偏移量。

IdleStateHandler类
进行心跳检查类,客户端与服务端之间需要保持长连接则需要通过一个心跳机制来保证有效性,当其中一者处于宕机或者网络延迟的情况下判断是否有效,如果无效及时进行释放资源。(该类三个参数,第一个为读空闲时间,第二个为写空闲时间,第三个为读或写空闲时间,如果在规定时间内没有触发对应事件就会触发定时任务的执行)

NettyConnectManageHandler类
用于监听pipeline中入站/出站的事件,主要进行日志记录等操作。

NettyServerHandler类
核心业务处理器,处理的时候会去调用channelRead0进行相关逻辑,最终会去执行processMessageReceived方法。


public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
                // 处理请求的过程
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                // 处理响应的过程
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

TYPE类型为REQUEST_COMMAND表示请求消息,则调用processRequestCommand方法进行处理。

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // 从processorTable中找到对应的Processor,如果不存在则使用defaultRequestProcessor处理器
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // 获取nameServer
                    String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    doBeforeRpcHooks(remoteAddr, cmd);
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        // 服务端处理完请求之后调用
                        public void callback(RemotingCommand response) {
                            doAfterRpcHooks(remoteAddr, cmd, response);
                            // 不是单向
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
                                    try {
                                        // 往客户端输出结果
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        }
                    };
                    // 如果是异步处理的Process
                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        // 不是异步知道拿到Processor进行相关处理
                        NettyRequestProcessor processor = pair.getObject1();
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        callback.callback(response);
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

首先从processorTable获取对应的Processor,而processorTable的内容都是通过启动Broker或者是NameServer进行存入的,存储的结构为map以Code作为key以Processor作为值,当我们使用的时候只需要根据code查询即可,如果没有找到就直接使用默认的defaultRequestProcessor处理器。
如果获取到的Processor为异步的则调用异步处理请求,如果不是则直接调用Processor对应的processRequest方法进行处理。
TYPE类型为RESPONSE_COMMAND表示响应消息,则调用processResponseCommand进行处理。


public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    // 从map缓存获取正在进行的其中一个请求
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);
      // 移除本次请求
        responseTable.remove(opaque);
      // 执行回调
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

超时请求扫描
在每次请求时将ResponseFuture放入Map中,通过定时扫描来判断记录时间与超时时间来比较判断是否超时。

public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();

        // 当前时间大于请求开始时间+超时等待时间+1秒,则任务超时了
        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }
  // 执行被移除 ResponseFuture对象的executeInvokeCallback方法  
    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/226145.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含pytho、JS工程源码)+数据集+模型(四)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 模块实现1. 数据预处理2. 创建模型并编译3. 模型训练及保存1&#xff09;模型训练2&#xff09;模型保存 4. 上传结果1&#xff09;…

刷题学习记录(文件上传)

[GXYCTF 2019]BabyUpload 知识点&#xff1a;文件上传.htaccessMIME绕过 题目直接给题目标签提示文件上传的类型 思路&#xff1a;先上传.htaccess文件&#xff0c;在上传木马文件&#xff0c;最后蚁剑连接 上传.htaccess文件 再上传一个没有<?的shell 但是要把image/pn…

生态学、种间关系、进化

这里写自定义目录标题 参考资料种间关系Lynn Margulis共生体在进化过程中形成了一种互帮互助的机制 捕食&#xff1a;收割理论 进化思想史 参考资料 Symbiotic Communications: Where Marconi Meets Darwin 普通生态学(孙儒泳)高等教育出版社1998普通生态学(尚玉昌)北京大学出…

AUTOSAR 入门

前言 AUTOSAR是什么Vector DaVinci 工具功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个注脚注释也是必…

Weblogic CVE-2023-21839(metasploit版)

Step1&#xff1a;用docker搭建环境 Step2&#xff1a;docker查看映射端口 Step3&#xff1a;访问特定端口&#xff0c;然后靶标应用。 Step4&#xff1a;用metasploit进行攻击&#xff1a; 首先&#xff0c;打开metasploit&#xff0c;然后查询需要攻击的板块&#xff0…

【绘图工具】-- Excalidraw 介绍

1、Excalidraw 的来源 Excalidraw 的作者 vjeux 是 facebook 团队的成员&#xff0c;项目在开始阶段就是就是以开源的方式推进&#xff08;现在也开源&#xff0c;只是商业版本才收费&#xff09;&#xff0c;当时作者在 Twitter 上发了一些推文介绍了&#xff0c;然后引起了许…

学习记录---kubernetes中备份和恢复etcd

一、简介 ETCD是kubernetes的重要组成部分&#xff0c;它主要用于存储kubernetes的所有元数据&#xff0c;我们在kubernetes中的所有资源(node、pod、deployment、service等)&#xff0c;如果该组件出现问题&#xff0c;则可能会导致kubernetes无法使用、资源丢失等情况。因此…

题目:分糖果(蓝桥OJ 2928)

题目描述&#xff1a; 解题思路&#xff1a; 本题采用贪心思想 题解&#xff1a; #include<bits/stdc.h> using namespace std;const int N 1e6 9; char s[N];//写字符串数组的一种方法,像数组一样***int main() {int n, x;cin >> n >> x;for(int i 1; i…

C++ vector基本操作

目录 一、介绍 二、定义 三、迭代器 四、容量操作 1、size 2、capacity 3、empty 4、resize 5、reserve 总结&#xff08;扩容机制&#xff09; 五、增删查改 1、push_back & pop_back 2、find 3、insert 4、erase 5、swap 6、operator[] 一、介绍 vector…

OpenGL ES 帧缓冲对象介绍和使用示例

一、介绍 1. 帧缓冲对象 默认情况下&#xff0c;OpenGL渲染的目标是屏幕&#xff0c;但如果你不想直接渲染到屏幕上&#xff0c;还需要对渲染结果做某些后期处理、渲染到纹理、阴影映射等操作&#xff0c;便可以使用帧缓冲对象&#xff0c;实现离屏渲染。 帧缓冲对象&#x…

Jmeter接口测试

前言&#xff1a; 本文主要针对http接口进行测试&#xff0c;使用Jmeter工具实现。 Jmter工具设计之初是用于做性能测试的&#xff0c;它在实现对各种接口的调用方面已经做的比较成熟&#xff0c;因此&#xff0c;本次直接使用Jmeter工具来完成对Http接口的测试。 1.介绍什么是…

《即时消息系统-IM核心技术》

IM 核心概念 用户&#xff1a;系统的使用者 消息&#xff1a;是指用户之间的沟通内容。通常在 IM 系统中&#xff0c;消息会有以下几类&#xff1a;文本消息、表情消息、图片消息、视频消息、文件消息等等 会话&#xff1a;通常指两个用户之间因聊天而建立起的关联 群&…

聊一聊Java中的枚举和泛型(两种强大的编程特性)

聊一聊Java中的枚举和泛型&#xff08;两种强大的编程特性&#xff09; 保持热爱&#xff0c;奔赴山海。。。。。。 Java中的枚举 在Java中&#xff0c;枚举&#xff08;Enum&#xff09;是一种特殊的数据类型&#xff0c;用于定义包含固定常量集合的数据类型。枚举类型在Jav…

猫咪瘦弱的原因是什么?适合给消瘦猫咪长肉吃的猫罐头分享

很多小猫咪吃得很多&#xff0c;但是还是很瘦&#xff0c;这让很多猫主人感到困惑&#xff0c;猫咪瘦弱的原因是什么呢&#xff1f;铲屎那么多年&#xff0c;还是有点子养猫知识在身上的。那么&#xff0c;小猫咪瘦弱的原因是什么呢&#xff1f;让我们看看是不是这些原因导致的…

LinuxBasicsForHackers笔记 -- 进程管理

进程是一个正在运行和使用资源的程序。 Linux 内核是操作系统的内核&#xff0c;几乎控制着一切&#xff0c;在创建进程时&#xff0c;它会按顺序为每个进程分配一个唯一的进程 ID (PID)。 查看进程 ps – 用于在命令行查看哪些进程处于活动状态。单独使用 ps 命令并不能真正…

使用Notepad++编辑器,安装compare比较差异插件

概述 是一款非常有特色的编辑器&#xff0c;Notepad是开源软件&#xff0c;Notepad中文版可以免费使用。 操作步骤&#xff1a; 1、在工具栏 ->“插件”选项。 2、勾选Compare选项&#xff0c;点击右上角“安装”即可。 3、 确认安装插件 4、下载插件 5、插件已安装 6、打…

微前端介绍

目录 微前端概念 微前端特性 场景演示 微前端方案 iframe 方案 qiankun 方案 micro-app 方案 EMP 方案 无界微前端 方案 无界方案 成本低 速度快 原生隔离 功能强大 总结 前言&#xff1a;微前端已经是一个非常成熟的领域了&#xff0c;但开发者不管采用哪个现…

JAVA网络编程——BIO、NIO、AIO深度解析

I/O 一直是很多Java同学难以理解的一个知识点&#xff0c;这篇帖子将会从底层原理上带你理解I/O&#xff0c;让你看清I/O相关问题的本质。 1、I/O的概念 I/O 的全称是Input/Output。虽常谈及I/O&#xff0c;但想必你也一时不能给出一个完整的定义。搜索了谷哥欠&#xff0c;发…

MySQl int(1)、int(20) 的区别到底在哪里

MySQl int(1)、int(20) 的区别到底在哪里 常思一二&#xff0c;便得自然… int(1)数据类型介绍 在MySQL中&#xff0c;INT(1) 是一种定义整数类型的数据字段&#xff0c;其中的数字表示显示宽度而不是存储范围。具体说&#xff0c;INT(1) 中的数字 1 表示显示宽度&#xff0…

数字人知识库:Awesome-Talking-Head-Synthesis

数字人知识库&#xff1a;Awesome-Talking-Head-Synthesis 文章目录 数字人知识库&#xff1a;Awesome-Talking-Head-SynthesisDatasetsSurveyAudio-drivenText-drivenNeRF & 3DMetricsTools & SoftwareSlides & Presentations Gihub&#xff1a;https://github.co…