10 dubbo源码学习_线程池

    • 1. 线程模型&线程池介绍
      • 1.1 线程池
      • 1.2 线程模型
    • 2. 线程池源码分析
      • 2.1 FixedThreadPool
      • 2.2 CachedThreadPool
      • 2.3 LimitedThreadPool
    • 3. 线程模型源码
      • 3.1 AllDispatcher
      • 3.2 DirectDispatcher
      • 3.3 MessageOnlyDispatcher
      • 3.4 ExecutionDispatcher
      • 3.5 ConnectionOrderedDispatcher

1. 线程模型&线程池介绍

1.1 线程池

dubbo内部采用netty做为通信工具,netty包括bossGroup和workerGroup,bossGroup负责接收accept连接,连接就绪后,将连接交给workerGroup进行处理;
默认情况下:
bossGroup:线程个数:1个,队列长度,Integer.MAX;创建1个NioEventLoop,这个NioEventLoop主要进行accept操作;
workerGroup:线程个数:Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),一般是CPU核数+1,会创建这么多的NioEventLoop;
以上这些都是属于netty的,而在dubbo中,也有一个业务线程池,为什么又要有一个业务线程池呢,是因为我们的业务一般处理慢,如果直接使用workerGroup的线程去处理业务的话,会降级dubbo的处理性能;

修改workerGroup线程个数
通过 iothreads 属性指定workerGroup个数;

dubbor线程池配置
dubbo提供了如下的线程池,可以在:dubbo:protocol threadpool=""中指定;

  • fixed:固定大小的线程池,默认大小是200,如果未配置:queues 参数,则默认使用SynchronousQueue;
  • cached:缓存线程池,空闲一分钟自动删除,默认大小为:如果未配置 threads 参数,则 Integer.MAX_VALUE
  • limited:可伸缩线程池,但是线程池中的线程数只会增长不会收缩,这样做的目的是为了避免当进行收缩时流量突然增加造成性能问题。默认大小:如果未配置 threads 参数,则 200

1.2 线程模型

上面说了dubbo的线程池,除了netty的bossGroup和workerGroup之外,另外提供的业务线程池,那什么场景下使用业务线程池,这种策略该如何选呢?
所以dubbo提供了:dubbo:protocol dispatcher=""属性,可以指定规则:

  • all:所有(除发送)消息都由业务线程池执行;包括:请求、响应、连接、断开连接、心跳;
  • direct:所有消息都不派发到线程池,由workerGroup直接处理;
  • message:只有请求响应发给线程池,其他连接直接在IO线程上执行;
  • execution:只请求消息发到线程池,其他由IO线程池上执行;
  • connection:在IO线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池;

2. 线程池源码分析

源码的入口 :
在这里插入图片描述

public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;

    String componentKey;
    if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
        componentKey = Constants.CONSUMER_SIDE;
        if (url.getParameter(SHARE_EXECUTOR_KEY, false)) {
            ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
            if (cExecutor == null) {
                cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
                dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor);
                cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
            }
            executor = cExecutor;
        } else {
            // 获取 URL中的 threadpool参数,参数指定的是使用的线程池类型
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
            dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
        }
    } else {
        componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}

dubbo提供了线程池ThreadPool,默认的SPI是"fixed",其实现类如下:

2.1 FixedThreadPool

public Executor getExecutor(URL url) {
    // threadname 如果未配置,默认为:Dubbo
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    // threads 表示corePoolSize,默认为:200
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    // queues 表示队列长度,默认为:0
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    // 核心线程池大小、最大线程池大小,默认都是200,如果流量比较小的情况下,直接设置为200会导致资源占用情况严重;
    // keepalivetime 0 ms
    // 如果未配置queues的话,默认为0,会选用SynchronousQueue同步队列
    // 否则使用LinkedBlockingQueue队列,长度为queues;
    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
            queues == 0 ? new SynchronousQueue<Runnable>() :
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                            : new LinkedBlockingQueue<Runnable>(queues)),
            new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

FixedThreadPool的话,初始化时,线程池大小即为200,不会随着流量的增长或缩小而改变线程池大小;而且队列如果指定,默认使用SynchronousQueue队列(没有长度);

2.2 CachedThreadPool

public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // threadname 如果未配置,默认为:Dubbo
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // corethreads 表示corethreads,默认为:0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // threads 表示maxPoolSize,默认为:Integer.MAX_VALUE
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // queues 表示队列长度,默认为:0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // alive 默认为:60 * 1000
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        // 指定coreSize、maxPoolSize、keepaliveTime参数;
        // 队列如未指定也是默认:SynchronousQueue
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

CachedThreadPool 核心线程数为0,表示在空闲的情况下,它不需要保留任何活跃的线程,最大线程数为Integer.MAX_VALUE,队列如果未指定,则默认使用SynchronousQueue
当线程空闲1分钟时,会自动进行释放;

2.3 LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // threadname 如果未配置,默认为:Dubbo
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // corethreads 表示corethreads,默认为:0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // threads 表示maxPoolSize,默认为:200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 核心线程数0,最大200,keepalivetime为Integer.MAX_VALUE
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

LimitedThreadPool表示,最大可扩容至200个线程,初始时为0,线程约为不过期;

3. 线程模型源码

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

他们都继承自Dispatcher接口,程序的入口在:

// 
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 这里会获取Dispatcher
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

dubbo 服务消费者 入口

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
    return ChannelHandlers.wrap(handler, url);
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 这里会获取 Dispatcher
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

各类Dispatcher的处理类都继承自ChannelHandler

public interface ChannelHandler {

    /**
		连接事件
     */
    void connected(Channel channel) throws RemotingException;

    /**
		断开连接
     */
    void disconnected(Channel channel) throws RemotingException;

    /**
		发送
     */
    void sent(Channel channel, Object message) throws RemotingException;

    /**
		接收
     */
    void received(Channel channel, Object message) throws RemotingException;

    /**
		异常
     */
    void caught(Channel channel, Throwable exception) throws RemotingException;

}
  • 建立连接:connected,主要是的职责是在channel记录read、write的时间,以及处理建立连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(onconnect),即在该操作中执行。
  • 断开连接:disconnected,主要是的职责是在channel移除read、write的时间,以及处理端开连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(ondisconnect),即在该操作中执行。
  • 发送消息:sent,包括发送请求和发送响应。记录write的时间。
  • 接收消息:received,包括接收请求和接收响应。记录read的时间。
  • 异常捕获:caught,用于处理在channel上发生的各类异常。

3.1 AllDispatcher

所有消息都由业务线程池执行;包括:请求、响应、连接、断开连接、心跳;它的处理handler是:AllChannelHandler

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 创建对应的handler处理类
        return new AllChannelHandler(handler, url);
    }
}


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        // 它的主要任务就是初始化 executor
        super(handler, url);
    }
    // 连接
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 获取线程池,实现方法由父类实现,初始化是由构造器调用父类构造器时进行的初始化;
        ExecutorService cexecutor = getExecutorService();
        try {
            // 提交到线程池执行;
            // ChannelEventRunnable有run方法,execute会执行它的run方法;
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

public class ChannelEventRunnable implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);

    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {
        this(channel, handler, state, null);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
        this(channel, handler, state, message, null);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {
        this(channel, handler, state, null, t);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }

    @Override
    public void run() {
        // 这里面定义了各类消息的处理,其实也是会直接调用传递进来的handler的对应方法;
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }

    /**
     * ChannelState
     *
     *
     */
    public enum ChannelState {

        /**
         * CONNECTED
         */
        CONNECTED,

        /**
         * DISCONNECTED
         */
        DISCONNECTED,

        /**
         * SENT
         */
        SENT,

        /**
         * RECEIVED
         */
        RECEIVED,

        /**
         * CAUGHT
         */
        CAUGHT
    }

}

AllDispatcher会创建一个AllChannelHandler,而它内部会将各类处理提交至业务线程池,业务线程池会执行对应handler的消息方法处理;

在这里插入图片描述

  • 在IO线程中执行的操作有:
    • sent操作在IO线程上执行。
    • 序列化响应在IO线程上执行。
  • 在Dubbo线程中执行的操作有:
    • received、connected、disconnected、caught都是在Dubbo线程上执行的。
    • 反序列化请求的行为在Dubbo中做的。

3.2 DirectDispatcher

前面介绍过,它是所有的消息直接由workerGroup进行处理;

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }

}

所以它的源码非常简单,直接由原handler进行处理即可;
在这里插入图片描述

  • 在IO线程中执行的操作有:
    • received、connected、disconnected、caught、sent操作在IO线程上执行。
    • 反序列化请求和序列化响应在IO线程上执行。
  • 并没有在Dubbo线程操作的行为。

3.3 MessageOnlyDispatcher

在provider端,Message Only Dispatcher和Execution Dispatcher的线程模型是一致的 只有请求发给线程池,其他连接直接在IO线程上执行,对应的handler处理类:MessageOnlyChannelHandler

public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

其他事件都由父类WrappedChannelHandler实现,而父类的实现方式是直接调用handler.xx方法;

3.4 ExecutionDispatcher

只请求消息发到线程池,其他由IO线程池上执行;

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}

在这里插入图片描述

  • 在IO线程中执行的操作有:
    • sent、connected、disconnected、caught操作在IO线程上执行。
    • 序列化响应在IO线程上执行。
  • 在Dubbo线程中执行的操作有:
    • received都是在Dubbo线程上执行的。
    • 反序列化请求的行为在Dubbo中做的。

3.5 ConnectionOrderedDispatcher

在IO线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池;

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 初始化了一个connection的连接线程池,最大和核心线程数都是1;
        // LinkedQueue最大长度为Integer.MAX_VALUE;
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        // 虽然 connection的连接池队列是LinkedQueue,但是,这个limit会限制 默认 1000;
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            // 检查队列是否超过limit;
            checkQueueLength();
            // 未超过,则交至connectionExecutor执行;
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            // 检查队列是否超过limit;
            checkQueueLength();
            // 未超过,则交至connectionExecutor执行;
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }


    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 其余的操作还是交至 业务线程池去执行;
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 其余的操作还是交至 业务线程池去执行;
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}

在这里插入图片描述

  • 在IO线程中执行的操作有:
    • sent操作在IO线程上执行。
    • 序列化响应在IO线程上执行。
  • 在Dubbo线程中执行的操作有:
    • received、connected、disconnected、caught都是在Dubbo线程上执行的。但是connected和disconnected两个行为是与其他两个行为通过线程池隔离开的。并且在Dubbo connected thread pool中提供了链接限制、告警等能力。
    • 反序列化请求的行为在Dubbo中做的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/15655.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

这些使用工具大推荐,现在知道不晚

1.Snip Snip是一款截图软件&#xff0c;它突出的优点就是可以制作滚动截图。 例如&#xff1a;对整个网页进行截图&#xff0c;使用Snip即可轻松获取&#xff0c;无需处理水印。 2.Sleep Cycle 快节奏、高压力的生活导致我们越来越晚睡觉&#xff0c;睡眠质量越来越差。 想提…

jsp家庭农场投入品信息管理系统Myeclipse开发mysql数据库web结构jsp编程计算机网页项目

一、源码特点 jsp家庭农场投入品信息管理系统是一套完善的java web信息管理系统 serlvet dao bean 开发&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发…

SQL学习日记

目录 一、数据定义&#xff08;create&#xff0c;alter&#xff0c;drop&#xff09; 1.1数据类型 补充注释 1.2定义基本表&#xff08;create&#xff0c;alter&#xff0c;drop&#xff09; 1.3约束 1.3.1主键约束 1.3.2外码约束 ​编辑 补充CASCADE 关键字 1.3.3…

深度学习 - 45.MMOE Gate 简单实现 By Keras

目录 一.引言 二.MMoE 模型分析 三.MMoE 逻辑实现 • Input • Expert Output • Gate Output • Weighted Sum • Sigmoid Output • 完整代码 四.总结 一.引言 上一篇文章介绍了 MMoE 借鉴 MoE 的思路&#xff0c;为每一类输出构建一个 Gate 并最终加权多个 Exper…

05 KVM虚拟化Linux Bridge环境部署

文章目录 05 KVM虚拟化Linux Bridge环境部署5.1 安装Linux Bridge5.1.1 安装bridge-utils软件包5.1.2 确认安装是否成功 5.2 配置Linux Bridge5.2.1 创建网桥br05.2.2 将物理网卡ens33绑定到Linux Bridge5.2.3 配置ens33的ip5.2.4 为Linux Bridge网桥br0分配ip5.2.4.1 DHCP设置…

sin(x) + cos(x) 的极大值和极小值

sinx cosx 的极大值和极小值 理论推导图像 今天遇到了一个问题&#xff0c;就是如何求解 sin ⁡ x cos ⁡ x \sin{x} \cos{x} sinxcosx 的极大值和极小值。这里特来记录一下。 理论推导 首先&#xff0c;我们假设&#xff1a; sin ⁡ x cos ⁡ x R sin ⁡ ( x α ) (…

Vue(Vue脚手架)

一、使用Vue脚手架&#xff08;Vue Cli&#xff09; Vue官方提供脚手架平台选择最新版本&#xff1a; 可以相加兼容的标准化开发工具&#xff08;开发平台&#xff09; 禁止&#xff1a;最新的开发技术版本和比较旧版本的开发平台 Vue CLI&#x1f6e0;️ Vue.js 开发的标准工…

所有知识付费都可以用 ChatGPT 再割一次?

伴随春天一起到来的&#xff0c;还有如雨后春笋般冒出的 ChatGPT / AI 相关的付费社群、课程训练营、知识星球等。 ChatGPT 吹来的这股 AI 热潮&#xff0c;这几个月想必大家多多少少都能感受到。 ▲ 图片来源&#xff1a;网络 这两张图是最近在圈子里看到的。 一张是国内各…

第五章——动态规划3

蒙德里安的梦想 我们在黑框内横着放红框&#xff0c;我们发现当横向小方格摆好之后&#xff0c;纵向小方格只能一次纵向摆好&#xff0c;即纵向小方格只有一种方案&#xff0c;即整个摆放小方格的方案数等于横着摆放小方格的方案数 f[i,j]表示的是现在要在第i列摆&#xff0c;j…

MyBats

一、MyBatis简介 1. MyBatis历史 MyBatis最初是Apache的一个开源项目iBatis, 2010年6月这个项目由Apache Software Foundation迁移到了Google Code。随着开发团队转投Google Code旗下&#xff0c; iBatis3.x正式更名为MyBatis。代码于2013年11月迁移到Github。 iBatis一词来…

Packet Tracer - 研究直连路由

Packet Tracer - 研究直连路由 目标 第 1 部分&#xff1a;研究 IPv4 直连路由 第 2 部分&#xff1a;研究 IPv6 直连路由 拓扑图 背景信息 本活动中的网络已配置。 您将登录路由器并使用 show 命令发现并回答以下有关直连路由的问题。 注&#xff1a;用户 EXEC 密码是 c…

通用智能的瓶颈及可能的解决途径

通用智能是指能够在各种不同的任务和环境中灵活地适应和执行任务的智能。通用智能与特定任务的智能相反&#xff0c;后者只能在特定领域或任务中表现出色。通用智能的理论基础是人工智能领域的通用人工智能&#xff08;AGI&#xff09;研究&#xff0c;旨在设计出能够像人类一样…

三分钟看懂Python分支循环规范:if elif for while

人生苦短&#xff0c;我用python 分支与循环 条件是分支与循环中最为核心的点&#xff0c; 解决的问题场景是不同的问题有不同的处理逻辑。 当满足单个或者多个条件或者不满足条件进入分支和循环&#xff0c; 这里也就说明这个对相同问题处理执行逻辑依据具体参数动态变化&…

从0搭建Vue3组件库(四): 如何开发一个组件

本篇文章将介绍如何在组件库中开发一个组件,其中包括 如何本地实时调试组件如何让组件库支持全局引入如何在 setup 语法糖下给组件命名如何开发一个组件 目录结构 在packages目录下新建components和utils两个包,其中components就是我们组件存放的位置,而utils包则是存放一些…

史上最全Maven教程(五)

文章目录 &#x1f525;Maven聚合案例_搭建dao模块&#x1f525;Maven聚合案例_搭建service模块&#x1f525;Maven聚合案例_搭建web模块&#x1f525;Maven聚合案例_运行项目&#x1f525;依赖传递失效及解决方案 &#x1f525;Maven聚合案例_搭建dao模块 dao子工程中一般写实…

055:cesium两种方法加载天地影像图

第055个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中用两种方法加载天地影像图。一种是利用WebMapTileServiceImageryProvider,另一种是利用UrlTemplateImageryProvider. 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方…

面试题30天打卡-day14

1、线程的生命周期是什么&#xff0c;线程有几种状态&#xff0c;什么是上下文切换&#xff1f; 线程通常有五种状态&#xff1a;创建&#xff0c;就绪&#xff0c;运行、阻塞和死亡状态。 新建状态&#xff08;New&#xff09;&#xff1a;新创建了一个线程对象。就绪状态&am…

controlnet1.1模型和预处理器功能详解(各预处理器出稿对比及对应模型说明)

ControlNet 1.1 与 ControlNet 1.0 具有完全相同的体系结构,ControlNet 1.1 包括所有以前的模型&#xff0c;具有改进的稳健性和结果质量&#xff0c;且增加并细化了多个模型。 命名规范 项目名版本号标识基础模型版本功能名文件后缀名 control 官方总是以control为项目名&…

Go | 一分钟掌握Go | 9 - 通道

作者&#xff1a;Mars酱 声明&#xff1a;本文章由Mars酱编写&#xff0c;部分内容来源于网络&#xff0c;如有疑问请联系本人。 转载&#xff1a;欢迎转载&#xff0c;转载前先请联系我&#xff01; 前言 在Java中&#xff0c;多线程之间的通信方式有哪些&#xff1f;记得吗&…

【云计算•云原生】3.一小时熟练掌握docker容器

文章目录 docker简介ubuntu下安装dockerkali下安装dockerdocker基本命令docker搭建mysql、nginx、redis容器/镜像打包搭建私有镜像仓库docker网络管理Dockerfile文件docker-compose.yml示例&#xff1a;搭建lamp docker简介 docker是一个开源的应用容器引擎&#xff0c;可以让…