- 1. 线程模型&线程池介绍
- 1.1 线程池
- 1.2 线程模型
- 2. 线程池源码分析
- 2.1 FixedThreadPool
- 2.2 CachedThreadPool
- 2.3 LimitedThreadPool
- 3. 线程模型源码
- 3.1 AllDispatcher
- 3.2 DirectDispatcher
- 3.3 MessageOnlyDispatcher
- 3.4 ExecutionDispatcher
- 3.5 ConnectionOrderedDispatcher
1. 线程模型&线程池介绍
1.1 线程池
dubbo内部采用netty做为通信工具,netty包括bossGroup和workerGroup,bossGroup负责接收accept连接,连接就绪后,将连接交给workerGroup进行处理;
默认情况下:
bossGroup:线程个数:1个,队列长度,Integer.MAX;创建1个NioEventLoop,这个NioEventLoop主要进行accept操作;
workerGroup:线程个数:Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),一般是CPU核数+1,会创建这么多的NioEventLoop;
以上这些都是属于netty的,而在dubbo中,也有一个业务线程池,为什么又要有一个业务线程池呢,是因为我们的业务一般处理慢,如果直接使用workerGroup的线程去处理业务的话,会降级dubbo的处理性能;
修改workerGroup线程个数
通过 iothreads 属性指定workerGroup个数;
dubbor线程池配置
dubbo提供了如下的线程池,可以在:dubbo:protocol threadpool=""
中指定;
- fixed:固定大小的线程池,默认大小是200,如果未配置:
queues
参数,则默认使用SynchronousQueue; - cached:缓存线程池,空闲一分钟自动删除,默认大小为:如果未配置
threads
参数,则Integer.MAX_VALUE
- limited:可伸缩线程池,但是线程池中的线程数只会增长不会收缩,这样做的目的是为了避免当进行收缩时流量突然增加造成性能问题。默认大小:如果未配置
threads
参数,则200
1.2 线程模型
上面说了dubbo的线程池,除了netty的bossGroup和workerGroup之外,另外提供的业务线程池,那什么场景下使用业务线程池,这种策略该如何选呢?
所以dubbo提供了:dubbo:protocol dispatcher=""
属性,可以指定规则:
- all:所有(除发送)消息都由业务线程池执行;包括:请求、响应、连接、断开连接、心跳;
- direct:所有消息都不派发到线程池,由workerGroup直接处理;
- message:只有请求响应发给线程池,其他连接直接在IO线程上执行;
- execution:只请求消息发到线程池,其他由IO线程池上执行;
- connection:在IO线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池;
2. 线程池源码分析
源码的入口 :
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
String componentKey;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
if (url.getParameter(SHARE_EXECUTOR_KEY, false)) {
ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
if (cExecutor == null) {
cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor);
cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
}
executor = cExecutor;
} else {
// 获取 URL中的 threadpool参数,参数指定的是使用的线程池类型
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
} else {
componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
}
dubbo提供了线程池ThreadPool,默认的SPI是"fixed",其实现类如下:
2.1 FixedThreadPool
public Executor getExecutor(URL url) {
// threadname 如果未配置,默认为:Dubbo
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// threads 表示corePoolSize,默认为:200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// queues 表示队列长度,默认为:0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 核心线程池大小、最大线程池大小,默认都是200,如果流量比较小的情况下,直接设置为200会导致资源占用情况严重;
// keepalivetime 0 ms
// 如果未配置queues的话,默认为0,会选用SynchronousQueue同步队列
// 否则使用LinkedBlockingQueue队列,长度为queues;
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
FixedThreadPool的话,初始化时,线程池大小即为200,不会随着流量的增长或缩小而改变线程池大小;而且队列如果指定,默认使用SynchronousQueue队列(没有长度);
2.2 CachedThreadPool
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// threadname 如果未配置,默认为:Dubbo
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// corethreads 表示corethreads,默认为:0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// threads 表示maxPoolSize,默认为:Integer.MAX_VALUE
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// queues 表示队列长度,默认为:0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// alive 默认为:60 * 1000
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// 指定coreSize、maxPoolSize、keepaliveTime参数;
// 队列如未指定也是默认:SynchronousQueue
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
CachedThreadPool 核心线程数为0,表示在空闲的情况下,它不需要保留任何活跃的线程,最大线程数为Integer.MAX_VALUE,队列如果未指定,则默认使用SynchronousQueue
当线程空闲1分钟时,会自动进行释放;
2.3 LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// threadname 如果未配置,默认为:Dubbo
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// corethreads 表示corethreads,默认为:0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// threads 表示maxPoolSize,默认为:200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 核心线程数0,最大200,keepalivetime为Integer.MAX_VALUE
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
LimitedThreadPool表示,最大可扩容至200个线程,初始时为0,线程约为不过期;
3. 线程模型源码
all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
他们都继承自Dispatcher
接口,程序的入口在:
//
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 这里会获取Dispatcher
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
dubbo 服务消费者 入口
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 这里会获取 Dispatcher
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
各类Dispatcher的处理类都继承自ChannelHandler
public interface ChannelHandler {
/**
连接事件
*/
void connected(Channel channel) throws RemotingException;
/**
断开连接
*/
void disconnected(Channel channel) throws RemotingException;
/**
发送
*/
void sent(Channel channel, Object message) throws RemotingException;
/**
接收
*/
void received(Channel channel, Object message) throws RemotingException;
/**
异常
*/
void caught(Channel channel, Throwable exception) throws RemotingException;
}
- 建立连接:connected,主要是的职责是在channel记录read、write的时间,以及处理建立连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(onconnect),即在该操作中执行。
- 断开连接:disconnected,主要是的职责是在channel移除read、write的时间,以及处理端开连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(ondisconnect),即在该操作中执行。
- 发送消息:sent,包括发送请求和发送响应。记录write的时间。
- 接收消息:received,包括接收请求和接收响应。记录read的时间。
- 异常捕获:caught,用于处理在channel上发生的各类异常。
3.1 AllDispatcher
所有消息都由业务线程池执行;包括:请求、响应、连接、断开连接、心跳;它的处理handler是:AllChannelHandler
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
// 创建对应的handler处理类
return new AllChannelHandler(handler, url);
}
}
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
// 它的主要任务就是初始化 executor
super(handler, url);
}
// 连接
@Override
public void connected(Channel channel) throws RemotingException {
// 获取线程池,实现方法由父类实现,初始化是由构造器调用父类构造器时进行的初始化;
ExecutorService cexecutor = getExecutorService();
try {
// 提交到线程池执行;
// ChannelEventRunnable有run方法,execute会执行它的run方法;
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
public class ChannelEventRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {
this(channel, handler, state, null);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
this(channel, handler, state, message, null);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {
this(channel, handler, state, null, t);
}
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
@Override
public void run() {
// 这里面定义了各类消息的处理,其实也是会直接调用传递进来的handler的对应方法;
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
/**
* ChannelState
*
*
*/
public enum ChannelState {
/**
* CONNECTED
*/
CONNECTED,
/**
* DISCONNECTED
*/
DISCONNECTED,
/**
* SENT
*/
SENT,
/**
* RECEIVED
*/
RECEIVED,
/**
* CAUGHT
*/
CAUGHT
}
}
AllDispatcher会创建一个AllChannelHandler,而它内部会将各类处理提交至业务线程池,业务线程池会执行对应handler的消息方法处理;
- 在IO线程中执行的操作有:
- sent操作在IO线程上执行。
- 序列化响应在IO线程上执行。
- 在Dubbo线程中执行的操作有:
- received、connected、disconnected、caught都是在Dubbo线程上执行的。
- 反序列化请求的行为在Dubbo中做的。
3.2 DirectDispatcher
前面介绍过,它是所有的消息直接由workerGroup进行处理;
public class DirectDispatcher implements Dispatcher {
public static final String NAME = "direct";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return handler;
}
}
所以它的源码非常简单,直接由原handler进行处理即可;
- 在IO线程中执行的操作有:
- received、connected、disconnected、caught、sent操作在IO线程上执行。
- 反序列化请求和序列化响应在IO线程上执行。
- 并没有在Dubbo线程操作的行为。
3.3 MessageOnlyDispatcher
在provider端,Message Only Dispatcher和Execution Dispatcher的线程模型是一致的 只有请求发给线程池,其他连接直接在IO线程上执行,对应的handler处理类:MessageOnlyChannelHandler
public class MessageOnlyChannelHandler extends WrappedChannelHandler {
public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
其他事件都由父类WrappedChannelHandler实现,而父类的实现方式是直接调用handler.xx方法;
3.4 ExecutionDispatcher
只请求消息发到线程池,其他由IO线程池上执行;
public class ExecutionDispatcher implements Dispatcher {
public static final String NAME = "execution";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new ExecutionChannelHandler(handler, url);
}
}
- 在IO线程中执行的操作有:
- sent、connected、disconnected、caught操作在IO线程上执行。
- 序列化响应在IO线程上执行。
- 在Dubbo线程中执行的操作有:
- received都是在Dubbo线程上执行的。
- 反序列化请求的行为在Dubbo中做的。
3.5 ConnectionOrderedDispatcher
在IO线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池;
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
protected final ThreadPoolExecutor connectionExecutor;
private final int queuewarninglimit;
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 初始化了一个connection的连接线程池,最大和核心线程数都是1;
// LinkedQueue最大长度为Integer.MAX_VALUE;
connectionExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME There's no place to release connectionExecutor!
// 虽然 connection的连接池队列是LinkedQueue,但是,这个limit会限制 默认 1000;
queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}
@Override
public void connected(Channel channel) throws RemotingException {
try {
// 检查队列是否超过limit;
checkQueueLength();
// 未超过,则交至connectionExecutor执行;
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
try {
// 检查队列是否超过limit;
checkQueueLength();
// 未超过,则交至connectionExecutor执行;
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
// 其余的操作还是交至 业务线程池去执行;
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
if (message instanceof Request && t instanceof RejectedExecutionException) {
Request request = (Request) message;
if (request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
// 其余的操作还是交至 业务线程池去执行;
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
private void checkQueueLength() {
if (connectionExecutor.getQueue().size() > queuewarninglimit) {
logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
}
}
}
- 在IO线程中执行的操作有:
- sent操作在IO线程上执行。
- 序列化响应在IO线程上执行。
- 在Dubbo线程中执行的操作有:
- received、connected、disconnected、caught都是在Dubbo线程上执行的。但是connected和disconnected两个行为是与其他两个行为通过线程池隔离开的。并且在Dubbo connected thread pool中提供了链接限制、告警等能力。
- 反序列化请求的行为在Dubbo中做的。