目录
- Tomcat参数总览
- 设置位置
- 参数分析
- Tomcat内部类maxConnections属性
- Tomcat内部类的acceptCount
- Tomcat有几个Acceptor线程
- Tomcat的工作线程池
Tomcat参数总览
package org.springframework.boot.autoconfigure.web;
/**
 * {@link ConfigurationProperties @ConfigurationProperties} for a web server (e.g. port
 * and path settings).
 *
 * <p>Abridged excerpt: only the fields discussed in this article are shown; accessors
 * are omitted.
 */
@ConfigurationProperties(prefix = "server", ignoreUnknownFields = true)
public class ServerProperties {

    /**
     * Server HTTP port.
     */
    private Integer port;

    /**
     * Maximum size of the HTTP message header.
     */
    private DataSize maxHttpHeaderSize = DataSize.ofKilobytes(8);

    /**
     * Holder for the Tomcat-specific settings declared in {@link Tomcat}.
     */
    private final Tomcat tomcat = new Tomcat();

    /**
     * Tomcat properties.
     */
    public static class Tomcat {

        /**
         * Thread related configuration.
         */
        private final Threads threads = new Threads();

        /**
         * Tomcat base directory. If not specified, a temporary directory is used.
         */
        private File basedir;

        /**
         * Maximum size of the form content in any HTTP post request.
         */
        private DataSize maxHttpFormPostSize = DataSize.ofMegabytes(2);

        /**
         * Maximum amount of request body to swallow.
         */
        private DataSize maxSwallowSize = DataSize.ofMegabytes(2);

        /**
         * Maximum number of connections that the server accepts and processes at any
         * given time. Once the limit has been reached, the operating system may still
         * accept connections based on the "acceptCount" property.
         */
        private int maxConnections = 8192;

        /**
         * Maximum queue length for incoming connection requests when all possible request
         * processing threads are in use.
         */
        private int acceptCount = 100;

        /**
         * Maximum number of idle processors that will be retained in the cache and reused
         * with a subsequent request. When set to -1 the cache will be unlimited with a
         * theoretical maximum size equal to the maximum number of connections.
         */
        private int processorCache = 200;

        /**
         * Amount of time the connector will wait, after accepting a connection, for the
         * request URI line to be presented.
         */
        private Duration connectionTimeout;

        /**
         * Tomcat thread properties.
         *
         * <p>Nested inside {@link Tomcat} so that the type name matches the
         * {@code ServerProperties.Tomcat.Threads} reference used by
         * {@code TomcatWebServerFactoryCustomizer}.
         */
        public static class Threads {

            /**
             * Maximum amount of worker threads.
             */
            private int max = 200;

            /**
             * Minimum amount of worker threads.
             */
            private int minSpare = 10;

        }

    }

}
设置位置
/**
 * Copies the values held in {@link ServerProperties} (and its nested Tomcat properties)
 * onto a {@link ConfigurableTomcatWebServerFactory}.
 *
 * <p>Abridged excerpt: the {@code serverProperties} field, the constructor and several
 * {@code customize*} helpers referenced below are declared in the full class and are
 * not shown here.
 */
public class TomcatWebServerFactoryCustomizer
        implements WebServerFactoryCustomizer<ConfigurableTomcatWebServerFactory>, Ordered {

    /**
     * Maps each configured property to the matching factory/connector setting.
     * Each mapping is guarded (non-null, positive, has-text, ...) so that unset or
     * non-positive values do not reach the factory.
     *
     * @param factory the Tomcat web server factory to customize
     */
    @Override
    public void customize(ConfigurableTomcatWebServerFactory factory) {
        ServerProperties properties = this.serverProperties;
        ServerProperties.Tomcat tomcatProperties = properties.getTomcat();
        PropertyMapper propertyMapper = PropertyMapper.get();
        propertyMapper.from(tomcatProperties::getBasedir).whenNonNull().to(factory::setBaseDirectory);
        propertyMapper.from(tomcatProperties::getBackgroundProcessorDelay).whenNonNull().as(Duration::getSeconds)
                .as(Long::intValue).to(factory::setBackgroundProcessorDelay);
        customizeRemoteIpValve(factory);
        ServerProperties.Tomcat.Threads threadProperties = tomcatProperties.getThreads();
        // Use the mapped value itself (already checked by isPositive) instead of re-reading the property.
        propertyMapper.from(threadProperties::getMax).when(this::isPositive)
                .to((maxThreads) -> customizeMaxThreads(factory, maxThreads));
        propertyMapper.from(threadProperties::getMinSpare).when(this::isPositive)
                .to((minSpareThreads) -> customizeMinThreads(factory, minSpareThreads));
        propertyMapper.from(properties.getMaxHttpHeaderSize()).whenNonNull().asInt(DataSize::toBytes)
                .when(this::isPositive)
                .to((maxHttpHeaderSize) -> customizeMaxHttpHeaderSize(factory, maxHttpHeaderSize));
        propertyMapper.from(tomcatProperties::getMaxSwallowSize).whenNonNull().asInt(DataSize::toBytes)
                .to((maxSwallowSize) -> customizeMaxSwallowSize(factory, maxSwallowSize));
        propertyMapper.from(tomcatProperties::getMaxHttpFormPostSize).asInt(DataSize::toBytes)
                .when((maxHttpFormPostSize) -> maxHttpFormPostSize != 0)
                .to((maxHttpFormPostSize) -> customizeMaxHttpFormPostSize(factory, maxHttpFormPostSize));
        propertyMapper.from(tomcatProperties::getAccesslog).when(ServerProperties.Tomcat.Accesslog::isEnabled)
                .to((enabled) -> customizeAccessLog(factory));
        propertyMapper.from(tomcatProperties::getUriEncoding).whenNonNull().to(factory::setUriEncoding);
        propertyMapper.from(tomcatProperties::getConnectionTimeout).whenNonNull()
                .to((connectionTimeout) -> customizeConnectionTimeout(factory, connectionTimeout));
        propertyMapper.from(tomcatProperties::getMaxConnections).when(this::isPositive)
                .to((maxConnections) -> customizeMaxConnections(factory, maxConnections));
        propertyMapper.from(tomcatProperties::getAcceptCount).when(this::isPositive)
                .to((acceptCount) -> customizeAcceptCount(factory, acceptCount));
        // processorCache is mapped without a positivity check: -1 is a documented value (unlimited cache).
        propertyMapper.from(tomcatProperties::getProcessorCache)
                .to((processorCache) -> customizeProcessorCache(factory, processorCache));
        propertyMapper.from(tomcatProperties::getRelaxedPathChars).as(this::joinCharacters).whenHasText()
                .to((relaxedChars) -> customizeRelaxedPathChars(factory, relaxedChars));
        propertyMapper.from(tomcatProperties::getRelaxedQueryChars).as(this::joinCharacters).whenHasText()
                .to((relaxedChars) -> customizeRelaxedQueryChars(factory, relaxedChars));
        customizeStaticResources(factory);
        customizeErrorReportValve(properties.getError(), factory);
    }

    /**
     * Guard used by the mappings above.
     *
     * @param value the configured value
     * @return {@code true} when {@code value} is strictly greater than zero
     */
    private boolean isPositive(int value) {
        return value > 0;
    }

    /**
     * Registers a connector customizer that applies {@code acceptCount} to the
     * connector's protocol handler when it is an {@link AbstractProtocol}.
     */
    private void customizeAcceptCount(ConfigurableTomcatWebServerFactory factory, int acceptCount) {
        factory.addConnectorCustomizers((connector) -> {
            ProtocolHandler handler = connector.getProtocolHandler();
            if (handler instanceof AbstractProtocol) {
                AbstractProtocol<?> protocol = (AbstractProtocol<?>) handler;
                protocol.setAcceptCount(acceptCount);
            }
        });
    }

    /**
     * Registers a connector customizer that applies {@code processorCache} to the
     * connector's protocol handler when it is an {@link AbstractProtocol}.
     */
    private void customizeProcessorCache(ConfigurableTomcatWebServerFactory factory, int processorCache) {
        factory.addConnectorCustomizers((connector) -> {
            ProtocolHandler handler = connector.getProtocolHandler();
            if (handler instanceof AbstractProtocol) {
                ((AbstractProtocol<?>) handler).setProcessorCache(processorCache);
            }
        });
    }

    /**
     * Registers a connector customizer that applies {@code maxConnections} to the
     * connector's protocol handler when it is an {@link AbstractProtocol}.
     */
    private void customizeMaxConnections(ConfigurableTomcatWebServerFactory factory, int maxConnections) {
        factory.addConnectorCustomizers((connector) -> {
            ProtocolHandler handler = connector.getProtocolHandler();
            if (handler instanceof AbstractProtocol) {
                AbstractProtocol<?> protocol = (AbstractProtocol<?>) handler;
                protocol.setMaxConnections(maxConnections);
            }
        });
    }

    /**
     * Registers a connector customizer that applies the connection timeout, converted
     * to milliseconds, to the connector's protocol handler when it is an
     * {@link AbstractProtocol}.
     */
    private void customizeConnectionTimeout(ConfigurableTomcatWebServerFactory factory, Duration connectionTimeout) {
        factory.addConnectorCustomizers((connector) -> {
            ProtocolHandler handler = connector.getProtocolHandler();
            if (handler instanceof AbstractProtocol) {
                AbstractProtocol<?> protocol = (AbstractProtocol<?>) handler;
                // NOTE(review): narrowing cast; durations above Integer.MAX_VALUE ms would wrap.
                protocol.setConnectionTimeout((int) connectionTimeout.toMillis());
            }
        });
    }

    /**
     * Registers a connector customizer that applies {@code maxThreads} to the
     * connector's protocol handler when it is an {@link AbstractProtocol}.
     */
    private void customizeMaxThreads(ConfigurableTomcatWebServerFactory factory, int maxThreads) {
        factory.addConnectorCustomizers((connector) -> {
            ProtocolHandler handler = connector.getProtocolHandler();
            if (handler instanceof AbstractProtocol) {
                AbstractProtocol<?> protocol = (AbstractProtocol<?>) handler;
                protocol.setMaxThreads(maxThreads);
            }
        });
    }

    /**
     * Registers a connector customizer that applies {@code minSpareThreads} to the
     * connector's protocol handler when it is an {@link AbstractProtocol}.
     */
    private void customizeMinThreads(ConfigurableTomcatWebServerFactory factory, int minSpareThreads) {
        factory.addConnectorCustomizers((connector) -> {
            ProtocolHandler handler = connector.getProtocolHandler();
            if (handler instanceof AbstractProtocol) {
                AbstractProtocol<?> protocol = (AbstractProtocol<?>) handler;
                protocol.setMinSpareThreads(minSpareThreads);
            }
        });
    }

}
参数分析
Tomcat内部类maxConnections属性
提出关键代码:
org.springframework.boot.autoconfigure.web.embedded.TomcatWebServerFactoryCustomizer#customizeMaxConnections
/**
 * Maximum number of connections that the server accepts and processes at any
 * given time. Once the limit has been reached, the operating system may still
 * accept connections based on the "acceptCount" property.
 */
private int maxConnections = 8192;
/**
 * Adds a connector customizer that pushes {@code maxConnections} down to the
 * connector's protocol handler. Handlers that are not an {@code AbstractProtocol}
 * are left untouched.
 *
 * @param factory the factory receiving the connector customizer
 * @param maxConnections the connection limit to apply
 */
private void customizeMaxConnections(ConfigurableTomcatWebServerFactory factory, int maxConnections) {
    factory.addConnectorCustomizers((connector) -> {
        ProtocolHandler protocolHandler = connector.getProtocolHandler();
        if (!(protocolHandler instanceof AbstractProtocol)) {
            return;
        }
        // Delegates to org.apache.coyote.AbstractProtocol#setMaxConnections
        ((AbstractProtocol<?>) protocolHandler).setMaxConnections(maxConnections);
    });
}
org.apache.coyote.AbstractProtocol#setMaxConnections:
最终是通过AbstractEndpoint#setMaxConnections设置的
抽象类AbstractEndpoint的具体方法:
/** Connection limit; the setter below treats {@code -1} as "no latch". */
private int maxConnections = 8*1024;

/**
 * Stores the new connection limit and brings the connection latch in line with it.
 * <ul>
 * <li>no latch yet: one is created only when {@code maxCon} is positive;</li>
 * <li>latch present and {@code maxCon == -1}: the latch is released and dropped;</li>
 * <li>latch present otherwise: its limit is updated in place.</li>
 * </ul>
 *
 * @param maxCon the new maximum number of connections
 */
public void setMaxConnections(int maxCon) {
    this.maxConnections = maxCon;
    // org.apache.tomcat.util.threads.LimitLatch
    LimitLatch currentLatch = this.connectionLimitLatch;
    if (currentLatch == null) {
        if (maxCon > 0) {
            initializeConnectionLatch();
        }
        return;
    }
    // Update the latch that enforces this
    if (maxCon == -1) {
        releaseConnectionLatch();
    } else {
        currentLatch.setLimit(maxCon);
    }
}
/**
 * Releases everything held by the current connection latch (if there is one) and
 * then clears the latch reference.
 */
private void releaseConnectionLatch() {
    LimitLatch current = connectionLimitLatch;
    if (current != null) {
        current.releaseAll();
    }
    connectionLimitLatch = null;
}
/**
 * Lazily creates the connection latch sized by {@code getMaxConnections()}.
 *
 * @return the existing or newly created latch, or {@code null} when
 * {@code maxConnections} is {@code -1}
 */
protected LimitLatch initializeConnectionLatch() {
    if (maxConnections != -1) {
        if (connectionLimitLatch == null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());
        }
        return connectionLimitLatch;
    }
    return null;
}
这里离不开org.apache.tomcat.util.threads.LimitLatch,先看下这个类的作用:
经过上面初始化LimitLatch的构造器方法可知,其参数是maxConnections,也就是LimitLatch的limit属性:
/** Synchronizer instance created together with the latch. */
private final Sync sync;
/** Acquisition counter; starts at zero. */
private final AtomicLong count;
/** Current limit; volatile. */
private volatile long limit;

/**
 * Builds a latch with the given initial limit and a counter starting at zero.
 *
 * @param limit - maximum number of concurrent acquisitions of this latch
 */
public LimitLatch(long limit) {
    this.sync = new Sync();
    this.count = new AtomicLong(0);
    this.limit = limit;
}
重点关注下谁使用了LimitLatch的limit,可以看到是Acceptor用来判断连接是否达到了最大值。
LimitLatch内部依赖了Sync内部类,继承了AbstractQueuedSynchronizer并覆盖了其tryAcquireShared方法:
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    /*
     * Article note, in the LimitLatch context: LimitLatch's Sync#tryAcquireShared
     * returns -1 once count has exceeded maxConnections, so
     * doAcquireSharedInterruptibly runs and appends the current thread to the tail of
     * the AQS queue as a SHARED node.
     * If that node's predecessor is the head sentinel, the acquire is retried (count is
     * incremented again and compared with maxConnections):
     *  - not over the limit: the thread leaves the queue and carries on with the
     *    Acceptor flow (wrap the connection as an event for the Poller queue);
     *  - still over the limit: the thread stays queued and is parked.
     */
    int acquired = tryAcquireShared(arg);
    if (acquired < 0) {
        doAcquireSharedInterruptibly(arg);
    }
}
/**
 * Acquires in shared interruptible mode.
 *
 * <p>Enqueues the current thread as a shared-mode node, then loops: when the node's
 * predecessor is the head it retries {@code tryAcquireShared}; on success the node
 * becomes the new head and the method returns, otherwise the thread parks. An
 * interrupt detected after parking is reported as {@link InterruptedException}.
 *
 * @param arg the acquire argument
 * @throws InterruptedException if the thread is interrupted while parked
 */
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    // Stays true unless the acquire succeeds; drives the cleanup in finally.
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // Only the node directly behind the head gets to retry the acquire.
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // Reached with failed == true only when the loop exits by exception.
        if (failed)
            cancelAcquire(node);
    }
}
/**
 * Parks the current thread, then reports (and clears) its interrupt status.
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    boolean wasInterrupted = Thread.interrupted();
    return wasInterrupted;
}
/**
 * Abridged excerpt of Tomcat's {@code LimitLatch}: a counter with an upper limit,
 * built on an {@link AbstractQueuedSynchronizer} in shared mode.
 */
public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);

    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    private volatile boolean released = false;

    /**
     * Instantiates a LimitLatch object with an initial limit.
     * @param limit - maximum number of concurrent acquisitions of this latch
     */
    public LimitLatch(long limit) {
        this.limit = limit;
        this.count = new AtomicLong(0);
        this.sync = new Sync();
    }

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        /**
         * Optimistically bumps the counter; if that pushes it past the limit (and the
         * latch has not been released) the increment is undone and the acquire fails.
         * Per the surrounding notes, the caller is the Acceptor thread, once per
         * accepted connection, so count grows from 0 until it exceeds
         * limit (maxConnections).
         *
         * @return {@code -1} when the limit is exceeded, otherwise {@code 1}
         */
        @Override
        protected int tryAcquireShared(int ignored) {
            long candidate = count.incrementAndGet();
            boolean overLimit = !released && candidate > limit;
            if (!overLimit) {
                return 1;
            }
            // Limit exceeded
            count.decrementAndGet();
            return -1;
        }

        /**
         * Gives one slot back by decrementing the counter.
         *
         * @return always {@code true}
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }
}
总结:基于以上分析,可以确定一点,Acceptor线程一旦发现ServerSocketChannel返回了SocketChannel,就会调用org.apache.tomcat.util.net.NioEndpoint#setSocketOptions去注册SocketChannel到PollerEvent并订阅SelectionKey.OP_READ事件:
/**
 * Location: org.apache.tomcat.util.net.NioEndpoint.Poller#register
 *
 * Registers a newly created socket with the poller: marks the wrapper as interested
 * in reads, obtains a {@code PollerEvent} (recycled from {@code eventCache} when one
 * is available, otherwise newly allocated) and queues it via {@code addEvent}.
 *
 * @param socket The newly created socket
 * @param socketWrapper The socket wrapper
 */
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
    socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    PollerEvent event = (eventCache != null) ? eventCache.pop() : null;
    if (event != null) {
        event.reset(socket, OP_REGISTER);
    } else {
        event = new PollerEvent(socket, OP_REGISTER);
    }
    addEvent(event);
}
/**
 * Queue of pending poller events; {@code addEvent} offers events into it.
 *
 * Author's note (translated): this is a synchronized queue; an event put here is
 * taken out right away by the Poller thread's run method and handed to the worker
 * thread pool:
 *
 *     Executor executor = getExecutor();
 *     if (dispatch && executor != null) {
 *         executor.execute(sc);
 *     } else {
 *         sc.run();
 *     }
 *
 * NOTE(review): the Poller loop is not shown in this excerpt - confirm the hand-off
 * timing against the Tomcat source.
 */
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
/**
 * Queues the event for the poller and wakes the selector when the wakeup counter
 * reads zero after being incremented.
 *
 * @param event the event to enqueue
 */
private void addEvent(PollerEvent event) {
    events.offer(event);
    long wakeups = wakeupCounter.incrementAndGet();
    if (wakeups == 0) {
        selector.wakeup();
    }
}
/**
 * Location: org.apache.tomcat.util.net.NioEndpoint.PollerEvent
 *
 * PollerEvent, cacheable object for poller events to avoid GC.
 * An instance pairs a channel with the interest ops to apply and can be cleared and
 * re-initialised through the {@code reset} methods.
 */
public static class PollerEvent {

    private NioChannel socket;
    private int interestOps;

    /**
     * @param ch the channel this event refers to
     * @param intOps the interest ops carried by this event
     */
    public PollerEvent(NioChannel ch, int intOps) {
        reset(ch, intOps);
    }

    /**
     * Re-initialises this event for reuse.
     *
     * @param ch the channel this event refers to
     * @param intOps the interest ops carried by this event
     */
    public void reset(NioChannel ch, int intOps) {
        socket = ch;
        interestOps = intOps;
    }

    public NioChannel getSocket() {
        return socket;
    }

    public int getInterestOps() {
        return interestOps;
    }

    /**
     * Clears this event (null channel, zero interest ops).
     */
    public void reset() {
        reset(null, 0);
    }

    @Override
    public String toString() {
        // socket is null after reset(); guard the dereference so that printing a
        // cleared event does not throw a NullPointerException.
        Object wrapper = (socket != null) ? socket.getSocketWrapper() : null;
        return "Poller event: socket [" + socket + "], socketWrapper [" + wrapper +
                "], interestOps [" + interestOps + "]";
    }
}
Tomcat内部类的acceptCount
org.apache.tomcat.util.net.NioEndpoint在初始化(bind)时,会在initServerSocket中把acceptCount作为backlog参数传给ServerSocket的bind方法:
/**
 * Initialize the endpoint.
 *
 * <p>Runs, in order: server socket set-up ({@code initServerSocket}), installation
 * of a one-count stop latch, SSL initialisation, and opening of the selector pool
 * under this endpoint's name.
 *
 * @throws Exception if any of the initialisation steps fails
 */
@Override
public void bind() throws Exception {
    // Creates (or inherits) the listening ServerSocketChannel.
    initServerSocket();
    setStopLatch(new CountDownLatch(1));
    // Initialize SSL if needed
    initialiseSsl();
    selectorPool.open(getName());
}
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
/**
 * Obtains the listening channel: either the channel inherited from the OS, or a
 * newly opened one bound to the configured address/port with
 * {@code getAcceptCount()} as the backlog. The channel is then put in blocking mode.
 *
 * @throws IllegalArgumentException if an inherited channel was requested but no
 * server socket channel is available
 */
protected void initServerSocket() throws Exception {
    if (getUseInheritedChannel()) {
        // Retrieve the channel provided by the OS
        Channel inherited = System.inheritedChannel();
        if (inherited instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) inherited;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    } else {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress bindAddress = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(bindAddress, getAcceptCount());
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}
acceptCount根据源码说明是一个应对请求积压的值,但最终这个值是传入了一个jni的native方法:
/**
 * Allows the server developer to specify the acceptCount (backlog) that
 * should be used for server sockets. By default, this value
 * is 100.
 * NOTE(review): presumably the value returned by getAcceptCount() and passed as the
 * backlog argument when the server socket is bound - confirm against the getter.
 */
private int acceptCount = 100;
所以,这里的积压指的是内核中已经完成TCP三次握手、等待应用层accept的连接队列(backlog),而不是TCP读缓冲区里的数据,看了一些文章的说法是:
acceptCount的队列中缓存的是建立了TCP连接等待被应用层ServerSocketChannel#accept方法返回的请求。accept队列也满了之后便会拒绝新的请求,并且没被及时处理的队列中的TCP连接会发出超时timeout响应。
Tomcat有几个Acceptor线程
把断点打在构造函数中,会发现在启动阶段只有一次调用:
/**
 * Creates an acceptor for the given endpoint.
 *
 * @param endpoint the endpoint stored in this acceptor's {@code endpoint} field
 */
public Acceptor(AbstractEndpoint<?,U> endpoint) {
    this.endpoint = endpoint;
}
通过debug可以看到明显的执行时序:
- org.springframework.boot.web.servlet.context.WebServerStartStopLifecycle#start
/**
 * Starts the web server, flags this lifecycle as running and then publishes a
 * {@code ServletWebServerInitializedEvent} on the application context.
 */
@Override
public void start() {
    this.webServer.start();
    this.running = true;
    ServletWebServerInitializedEvent initialized =
            new ServletWebServerInitializedEvent(this.webServer, this.applicationContext);
    this.applicationContext.publishEvent(initialized);
}
- org.springframework.boot.web.embedded.tomcat.TomcatWebServer#addPreviouslyRemovedConnectors
/**
 * Starts the embedded Tomcat web server; a no-op when it is already started.
 * The whole method runs under {@code this.monitor}.
 *
 * <p>Abridged excerpt: the steps after {@code addPreviouslyRemovedConnectors()} are
 * elided in the article (the {@code .....} line).
 *
 * @throws WebServerException if the server cannot be started
 */
public void start() throws WebServerException {
    synchronized (this.monitor) {
        if (this.started) {
            return;
        }
        try {
            // Re-attach (and thereby start) the connectors held in serviceConnectors.
            addPreviouslyRemovedConnectors();
            .....
        }
        catch (ConnectorStartFailedException ex) {
            stopSilently();
            throw ex;
        }
        catch (Exception ex) {
            // May rethrow as a PortInUseException when the cause is a port-binding failure.
            PortInUseException.throwIfPortBindingException(ex, () -> this.tomcat.getConnector().getPort());
            throw new WebServerException("Unable to start embedded Tomcat server", ex);
        }
        finally {
            // Always unbind the class loader from the context, on success or failure.
            Context context = findContext();
            ContextBindings.unbindClassLoader(context, context.getNamingToken(), getClass().getClassLoader());
        }
    }
}
/**
 * Re-adds to each service the connectors recorded for it in
 * {@code serviceConnectors}, then forgets the record. When {@code autoStart} is
 * off, the protocol handler of each re-added connector is stopped again.
 */
private void addPreviouslyRemovedConnectors() {
    for (Service service : this.tomcat.getServer().findServices()) {
        Connector[] pending = this.serviceConnectors.get(service);
        if (pending == null) {
            continue;
        }
        for (Connector connector : pending) {
            service.addConnector(connector); // add the Connector and start it
            if (!this.autoStart) {
                stopProtocolHandler(connector);
            }
        }
        this.serviceConnectors.remove(service);
    }
}
这里明显看到只有一个Service(StandardService),而这个Service也只包含了一个Connector(即Tomcat默认的Http Connector,默认8080),其实Tomcat的Connector类型还有几种:
- AJP Connector:端口默认8009
- APR Connector: 高性能扩展使用
- Https Connector
为什么只有一个,这里要看下添加的地方,这个地方的代码挺有意思的,有空一定要好好研究下:
- org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext#onRefresh
/**
 * Runs the parent refresh hook and then creates the web server.
 *
 * @throws ApplicationContextException wrapping any {@link Throwable} raised by
 * {@code createWebServer()}
 */
@Override
protected void onRefresh() {
    super.onRefresh();
    try {
        createWebServer();
    }
    catch (Throwable ex) {
        // Any failure here is reported as a context start-up failure.
        throw new ApplicationContextException("Unable to start web server", ex);
    }
}
- org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext#createWebServer
/**
 * Sets up the servlet environment in one of two ways:
 * <ul>
 * <li>a servlet context already exists: only the self initializer is run against it;</li>
 * <li>neither a servlet context nor a web server exists: a web server is obtained
 * from the factory and two lifecycle singletons ("webServerGracefulShutdown",
 * "webServerStartStop") are registered for it.</li>
 * </ul>
 * Property sources are initialised in every case.
 */
private void createWebServer() {
    WebServer existingServer = this.webServer;
    ServletContext servletContext = getServletContext();
    if (servletContext != null) {
        try {
            getSelfInitializer().onStartup(servletContext);
        }
        catch (ServletException ex) {
            throw new ApplicationContextException("Cannot initialize servlet context", ex);
        }
    }
    else if (existingServer == null) {
        ServletWebServerFactory factory = getWebServerFactory();
        this.webServer = factory.getWebServer(getSelfInitializer());
        getBeanFactory().registerSingleton("webServerGracefulShutdown",
                new WebServerGracefulShutdownLifecycle(this.webServer));
        getBeanFactory().registerSingleton("webServerStartStop",
                new WebServerStartStopLifecycle(this, this.webServer));
    }
    initPropertySources();
}
- org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory#getWebServer
/**
 * @return the first service registered on the server returned by {@code getServer()}
 */
public Service getService() {
    Service[] services = getServer().findServices();
    return services[0];
}
/**
 * Get the server object. You can add listeners and few more
 * customizations. JNDI is disabled by default.
 *
 * <p>The server is created on first use with a single {@code StandardService}
 * named "Tomcat"; later calls return the same instance.
 *
 * @return The Server
 */
public Server getServer() {
    if (server == null) {
        System.setProperty("catalina.useNaming", "false");
        server = new StandardServer();
        initBaseDir();
        // Set configuration source
        ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(new File(basedir), null));
        server.setPort( -1 );
        // Only one Service is created here as well
        Service defaultService = new StandardService();
        defaultService.setName("Tomcat");
        server.addService(defaultService);
    }
    return server;
}
/**
 * Builds an embedded Tomcat: picks the base directory (configured, or a temp dir),
 * creates the primary connector for {@code this.protocol}, attaches any additional
 * connectors, prepares the context and wraps the result as a {@code WebServer}.
 *
 * @param initializers servlet context initializers handed to {@code prepareContext}
 * @return the web server produced by {@code getTomcatWebServer}
 */
@Override
public WebServer getWebServer(ServletContextInitializer... initializers) {
    if (this.disableMBeanRegistry) {
        Registry.disableRegistry();
    }
    Tomcat tomcat = new Tomcat();
    File baseDir = this.baseDirectory;
    if (baseDir == null) {
        baseDir = createTempDir("tomcat");
    }
    tomcat.setBaseDir(baseDir.getAbsolutePath());
    // Exactly one primary Connector is created here
    Connector connector = new Connector(this.protocol);
    connector.setThrowOnFailure(true);
    tomcat.getService().addConnector(connector);
    customizeConnector(connector);
    tomcat.setConnector(connector);
    tomcat.getHost().setAutoDeploy(false);
    configureEngine(tomcat.getEngine());
    for (Connector extra : this.additionalTomcatConnectors) {
        tomcat.getService().addConnector(extra);
    }
    prepareContext(tomcat.getHost(), initializers);
    return getTomcatWebServer(tomcat);
}
- org.apache.catalina.core.StandardService#addConnector
/**
 * Add a new Connector to the set of defined Connectors, and associate it
 * with this Service's Container.
 *
 * <p>The connector array is replaced by a copy that is one element longer, under
 * {@code connectorsLock}. The connector is started only when this service is
 * currently available.
 *
 * @param connector The Connector to be added
 * @throws IllegalArgumentException if starting the connector fails
 */
@Override
public void addConnector(Connector connector) {
    synchronized (connectorsLock) {
        connector.setService(this);
        int oldLength = connectors.length;
        Connector[] grown = new Connector[oldLength + 1];
        System.arraycopy(connectors, 0, grown, 0, oldLength);
        grown[oldLength] = connector;
        connectors = grown;
    }
    try {
        if (getState().isAvailable()) {
            // Start the connector; Connector extends LifecycleMBeanBase
            connector.start();
        }
    } catch (LifecycleException e) {
        throw new IllegalArgumentException(
                sm.getString("standardService.connector.startFailed", connector), e);
    }
    // Report this property change to interested listeners
    support.firePropertyChange("connector", null, connector);
}
- org.apache.catalina.util.LifecycleBase#start
/**
 * {@inheritDoc}
 *
 * <p>Flow visible in this method: return early (with a log entry) when the component
 * is already starting or started; bring the component into a startable state
 * ({@code init()} for NEW, {@code stop()} for FAILED, invalid transition for anything
 * other than INITIALIZED/STOPPED); then move to STARTING_PREP, delegate to
 * {@code startInternal()} and settle on STARTED, or clean up on failure.
 */
@Override
public final synchronized void start() throws LifecycleException {
    // Already starting/started: log and do nothing.
    if (LifecycleState.STARTING_PREP.equals(state) || LifecycleState.STARTING.equals(state) ||
            LifecycleState.STARTED.equals(state)) {
        if (log.isDebugEnabled()) {
            // The exception is created only to attach a stack trace to the debug log.
            Exception e = new LifecycleException();
            log.debug(sm.getString("lifecycleBase.alreadyStarted", toString()), e);
        } else if (log.isInfoEnabled()) {
            log.info(sm.getString("lifecycleBase.alreadyStarted", toString()));
        }
        return;
    }
    if (state.equals(LifecycleState.NEW)) {
        init();
    } else if (state.equals(LifecycleState.FAILED)) {
        stop();
    } else if (!state.equals(LifecycleState.INITIALIZED) &&
            !state.equals(LifecycleState.STOPPED)) {
        invalidTransition(Lifecycle.BEFORE_START_EVENT);
    }
    try {
        setStateInternal(LifecycleState.STARTING_PREP, null, false);
        startInternal(); // subclass hook; for a Connector this runs its internal start-up
        if (state.equals(LifecycleState.FAILED)) {
            // This is a 'controlled' failure. The component put itself into the
            // FAILED state so call stop() to complete the clean-up.
            stop();
        } else if (!state.equals(LifecycleState.STARTING)) {
            // Shouldn't be necessary but acts as a check that sub-classes are
            // doing what they are supposed to.
            invalidTransition(Lifecycle.AFTER_START_EVENT);
        } else {
            setStateInternal(LifecycleState.STARTED, null, false);
        }
    } catch (Throwable t) {
        // This is an 'uncontrolled' failure so put the component into the
        // FAILED state and throw an exception.
        handleSubClassException(t, "lifecycleBase.startFail", toString());
    }
}
- org.apache.catalina.connector.Connector#startInternal
/**
 * Begin processing requests via this Connector.
 *
 * <p>Rejects a negative effective port, switches the state to STARTING and starts
 * the protocol handler.
 *
 * @exception LifecycleException if a fatal startup error occurs
 */
@Override
protected void startInternal() throws LifecycleException {
    // Validate settings before starting
    if (getPortWithOffset() < 0) {
        String invalidPort = sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset()));
        throw new LifecycleException(invalidPort);
    }
    setState(LifecycleState.STARTING);
    try {
        protocolHandler.start();
    } catch (Exception e) {
        String startFailed = sm.getString("coyoteConnector.protocolHandlerStartFailed");
        throw new LifecycleException(startFailed, e);
    }
}
- org.apache.coyote.AbstractProtocol#start
/**
 * Starts the endpoint and then schedules a recurring task (initial delay 0, then
 * every 60 seconds) that calls {@code startAsyncTimeout()} whenever the protocol is
 * not paused.
 *
 * @throws Exception if the endpoint fails to start
 */
@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
        logPortOffset();
    }
    endpoint.start();
    Runnable asyncTimeoutTask = () -> {
        if (!isPaused()) {
            startAsyncTimeout();
        }
    };
    monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
            asyncTimeoutTask, 0, 60, TimeUnit.SECONDS);
}
- org.apache.tomcat.util.net.AbstractEndpoint#start
/**
 * Starts the endpoint, binding it first when it is still unbound.
 *
 * @throws Exception if binding or the internal start-up fails
 */
public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        bindWithCleanup();
        // Records that the bind happened as part of start().
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}
- org.apache.tomcat.util.net.NioEndpoint#startInternal
/**
 * Start the NIO endpoint, creating acceptor, poller threads.
 *
 * <p>Does nothing when the endpoint is already running. Otherwise it sets up the
 * optional object caches, makes sure a worker executor exists, initialises the
 * connection latch, and starts one daemon poller thread followed by the acceptor
 * thread.
 */
@Override
public void startInternal() throws Exception {
    if (running) {
        return;
    }
    running = true;
    paused = false;

    // Each cache is only created when its configured size is non-zero.
    if (socketProperties.getProcessorCache() != 0) {
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
    }
    if (socketProperties.getEventCache() != 0) {
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
    }
    if (socketProperties.getBufferPool() != 0) {
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());
    }

    // Create worker collection
    if (getExecutor() == null) {
        createExecutor();
    }

    initializeConnectionLatch();

    // Start poller thread
    poller = new Poller();
    Thread pollerWorker = new Thread(poller, getName() + "-ClientPoller");
    pollerWorker.setPriority(threadPriority);
    pollerWorker.setDaemon(true);
    pollerWorker.start();

    startAcceptorThread();
}
- org.apache.tomcat.util.net.AbstractEndpoint#startAcceptorThread
/**
 * Creates the single {@code Acceptor} for this endpoint and starts it on its own
 * thread, named {@code getName() + "-Acceptor"}.
 */
protected void startAcceptorThread() {
    // The Acceptor is created here and its thread is started immediately
    acceptor = new Acceptor<>(this);
    String acceptorName = getName() + "-Acceptor";
    acceptor.setThreadName(acceptorName);

    Thread acceptorThread = new Thread(acceptor, acceptorName);
    acceptorThread.setPriority(getAcceptorThreadPriority());
    acceptorThread.setDaemon(getDaemon());
    acceptorThread.start();
}
Tomcat的工作线程池
这里其实没什么好说的,主要就是工作线程池使用的TaskQueue继承了LinkedBlockingQueue,是一个无界队列。