0. 这一块比较简单,还就内个无障碍阅读
不谈,放个调用栈的日志先 …
// 我们自己写的 @Async 注解的方法
simpleTest:31, TestAsyncTestService (cn.angel.project.angelmicroservicesample.test.service)
invoke:-1, TestAsyncTestService$$FastClassBySpringCGLIB$$880884d7 (cn.angel.project.angelmicroservicesample.test.service)
// 从这个方法拦截器的调用顺序(异步 -> 事务)也可以想到:异步中启动事务,事务的作用域并不会带到全局,只是作用在当前线程
// 准备钻入到 我们自己写的方法中(aop的连接点 反射调用)
invoke:218, MethodProxy (org.springframework.cglib.proxy)
invokeJoinpoint:771, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
proceed:163, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:749, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
proceedWithInvocation:-1, 1094377388 (org.springframework.transaction.interceptor.TransactionInterceptor$$Lambda$566)
invokeWithinTransaction:367, TransactionAspectSupport (org.springframework.transaction.interceptor)
invoke:118, TransactionInterceptor (org.springframework.transaction.interceptor)
proceed:175, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:749, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
invoke:95, ExposeInvocationInterceptor (org.springframework.aop.interceptor)
// 准备钻入到 事务的interceptor
proceed:186, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:749, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
lambda$invoke$0:115, AsyncExecutionInterceptor (org.springframework.aop.interceptor)
call:-1, 1109445703 (org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$564)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
- Async stack trace
<init>:132, FutureTask (java.util.concurrent)
newTaskFor:102, AbstractExecutorService (java.util.concurrent)
submit:133, AbstractExecutorService (java.util.concurrent)
submit:348, ThreadPoolTaskExecutor (org.springframework.scheduling.concurrent)
doSubmit:290, AsyncExecutionAspectSupport (org.springframework.aop.interceptor)
invoke:129, AsyncExecutionInterceptor (org.springframework.aop.interceptor)
// 从这里开始走读源码
// 准备钻入到 异步的Interceptor
proceed:186, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:749, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
intercept:691, CglibAopProxy$DynamicAdvisedInterceptor (org.springframework.aop.framework)
simpleTest:-1, TestAsyncTestService$$EnhancerBySpringCGLIB$$72420f26 (cn.angel.project.angelmicroservicesample.test.service)
// 我们自己写的 控制器方法
async:57, TestEndpoint (cn.angel.project.angelmicroservicesample.endpoint)
invoke0:-2, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
doInvoke:190, InvocableHandlerMethod (org.springframework.web.method.support)
invokeForRequest:138, InvocableHandlerMethod (org.springframework.web.method.support)
invokeAndHandle:105, ServletInvocableHandlerMethod (org.springframework.web.servlet.mvc.method.annotation)
invokeHandlerMethod:878, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handleInternal:792, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handle:87, AbstractHandlerMethodAdapter (org.springframework.web.servlet.mvc.method)
doDispatch:1040, DispatcherServlet (org.springframework.web.servlet)
doService:943, DispatcherServlet (org.springframework.web.servlet)
processRequest:1006, FrameworkServlet (org.springframework.web.servlet)
doPost:909, FrameworkServlet (org.springframework.web.servlet)
service:652, HttpServlet (javax.servlet.http)
service:883, FrameworkServlet (org.springframework.web.servlet)
service:733, HttpServlet (javax.servlet.http)
internalDoFilter:231, ApplicationFilterChain (org.apache.catalina.core)
doFilter:166, ApplicationFilterChain (org.apache.catalina.core)
doFilter:53, WsFilter (org.apache.tomcat.websocket.server)
internalDoFilter:193, ApplicationFilterChain (org.apache.catalina.core)
doFilter:166, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:100, RequestContextFilter (org.springframework.web.filter)
doFilter:119, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:193, ApplicationFilterChain (org.apache.catalina.core)
doFilter:166, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:93, FormContentFilter (org.springframework.web.filter)
doFilter:119, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:193, ApplicationFilterChain (org.apache.catalina.core)
doFilter:166, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:201, CharacterEncodingFilter (org.springframework.web.filter)
doFilter:119, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:193, ApplicationFilterChain (org.apache.catalina.core)
doFilter:166, ApplicationFilterChain (org.apache.catalina.core)
invoke:202, StandardWrapperValve (org.apache.catalina.core)
invoke:97, StandardContextValve (org.apache.catalina.core)
invoke:542, AuthenticatorBase (org.apache.catalina.authenticator)
invoke:143, StandardHostValve (org.apache.catalina.core)
invoke:92, ErrorReportValve (org.apache.catalina.valves)
invoke:78, StandardEngineValve (org.apache.catalina.core)
service:343, CoyoteAdapter (org.apache.catalina.connector)
service:374, Http11Processor (org.apache.coyote.http11)
process:65, AbstractProcessorLight (org.apache.coyote)
process:888, AbstractProtocol$ConnectionHandler (org.apache.coyote)
doRun:1597, NioEndpoint$SocketProcessor (org.apache.tomcat.util.net)
run:49, SocketProcessorBase (org.apache.tomcat.util.net)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:61, TaskThread$WrappingRunnable (org.apache.tomcat.util.threads)
run:748, Thread (java.lang)
1. spring.aop.methodInterceptor 老一套了
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
// Wraps the intercepted invocation in a Callable and hands it to the executor resolved for the method.
// Throws IllegalStateException when no executor can be resolved.
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
// The class that holds our own method (the proxied target class); null when the invocation has no target
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
// step into ...
// The method resolved here is what determineAsyncExecutor() below uses to look up the async executor
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// step into ...
// How the async executor is resolved clearly deserves a closer look
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// A task is created here.
// invocation.proceed() is what ends up calling our own code
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
// The target returned a Future: wait for it so the task yields the plain value
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
// Report the underlying cause rather than the ExecutionException wrapper
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// step into ...
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
-------------
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
// Resolves the executor for the given method: cached entry first, then the qualifier-named executor,
// then the default executor. Returns null when no executor is available.
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// See: org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier
// Reads the executor bean qualifier from @Async
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
// private SingletonSupplier<Executor> defaultExecutor;
// This Supplier is the Executor getter that was stored when the async configuration was initialised
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
// Use the executor directly if it is already an AsyncListenableTaskExecutor, otherwise wrap it in an adapter
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
// private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
// In other words, after the first call on this code path the executor is cached per method
this.executors.put(method, executor);
}
return executor;
}
--------------
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
// Submits the task to the executor and returns a handle matching the declared return type,
// or null when the return type is none of the Future variants below.
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// Branches on the return type of the @Async-annotated method.
// Only the types below hand a result back to the caller; any other return type
// lands in the final branch, where the task is submitted and null is returned.
// public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
// Compared with Future, juc.CompletableFuture adds the ability to compose stages
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
// A Callable can effectively be treated as a Supplier here
// j.u.c.CompletableFuture.supplyAsync semantics: the supplier is not run here, it is invoked later when the task executes
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
// The executor the supplier is submitted to is the one passed in as a parameter
}, executor);
}
// Spring's own Future type, with callback support built in
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
// step into ...
// void
// This branch looks the simplest, so it is the one traced next
executor.submit(task);
return null;
}
}
---------
// org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#submit(java.lang.Runnable)
// Delegates the task to the underlying ExecutorService and rethrows a rejection as TaskRejectedException.
// NOTE(review): doSubmit() in this note passes a Callable<Object>, so the overload on the traced path is
// presumably submit(Callable) rather than this Runnable one — confirm against the Spring source.
@Override
public Future<?> submit(Runnable task) {
// private ThreadPoolExecutor threadPoolExecutor;
// When Spring's async configuration is initialised it keeps the getter of the default Executor; the getter is called and the result stored on first use
// So ThreadPoolTaskExecutor merely delegates to a j.u.c thread pool; Spring does not implement a pool of its own
ExecutorService executor = getThreadPoolExecutor();
try {
// From here on we are inside j.u.c.AbstractExecutorService
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
2. 小加一餐:默认Executor获取方法的调用
// step into ...
setConfigurers:77, AbstractAsyncConfiguration (org.springframework.scheduling.annotation)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
inject:754, AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement (org.springframework.beans.factory.annotation)
inject:119, InjectionMetadata (org.springframework.beans.factory.annotation)
postProcessProperties:399, AutowiredAnnotationBeanPostProcessor (org.springframework.beans.factory.annotation)
populateBean:1420, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
doCreateBean:593, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
createBean:516, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
lambda$doGetBean$0:324, AbstractBeanFactory (org.springframework.beans.factory.support)
getObject:-1, 4073506 (org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$139)
getSingleton:234, DefaultSingletonBeanRegistry (org.springframework.beans.factory.support)
doGetBean:322, AbstractBeanFactory (org.springframework.beans.factory.support)
getBean:202, AbstractBeanFactory (org.springframework.beans.factory.support)
instantiateUsingFactoryMethod:409, ConstructorResolver (org.springframework.beans.factory.support)
instantiateUsingFactoryMethod:1336, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
createBeanInstance:1176, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
doCreateBean:556, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
createBean:516, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
lambda$doGetBean$0:324, AbstractBeanFactory (org.springframework.beans.factory.support)
getObject:-1, 4073506 (org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$139)
getSingleton:234, DefaultSingletonBeanRegistry (org.springframework.beans.factory.support)
doGetBean:322, AbstractBeanFactory (org.springframework.beans.factory.support)
getBean:207, AbstractBeanFactory (org.springframework.beans.factory.support)
registerBeanPostProcessors:229, PostProcessorRegistrationDelegate (org.springframework.context.support)
registerBeanPostProcessors:723, AbstractApplicationContext (org.springframework.context.support)
refresh:536, AbstractApplicationContext (org.springframework.context.support)
refresh:143, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
refresh:758, SpringApplication (org.springframework.boot)
refresh:750, SpringApplication (org.springframework.boot)
refreshContext:405, SpringApplication (org.springframework.boot)
run:315, SpringApplication (org.springframework.boot)
run:1237, SpringApplication (org.springframework.boot)
run:1226, SpringApplication (org.springframework.boot)
main:10, AngelMicroServiceSampleApplication (cn.angel.project.angelmicroservicesample)
东西并不多
// org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers
// Optional injection point: takes the single AsyncConfigurer (if any) and remembers how to obtain
// its executor and exception handler. Throws IllegalStateException when more than one configurer exists.
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
// No configurer supplied: leave the fields untouched
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
// The familiar part: the getters are stored as method references, they are not called here
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
3. 走个流程总结一下
除开async,其实spring很多框架都不怎么干实事,底层大多都委托给jdk、第三方(spring.validation=spring封装调用+jsr提出规范+hibernate实现细节)来实现,这也是java语言所倡导的"开放扩展"。
言归正传,补充以下类图
3.1 spring.async.ThreadPoolExecutor
3.2 spring.async.Future
3.3 spring.async.MethodInterceptor
4. 想起来了,再加一餐:SimpleAsyncTaskExecutor
-
当没有指定默认线程池,并且使用@EnableAsync + @Async的时候,会默认使用这个执行器(严格来说它不是"池子":下面的 doExecute 每次都会新建一个线程)
-
这个池子提供了 节流(Throttle) 方面的实现,是基于同步锁(synchronized + wait/notify)实现的,跟j.u.c.ThreadPoolExecutor的做法(BlockingQueue)还不一样,怪不得叫SimpleXxx
package org.springframework.core.task;
// Abridged excerpt: an executor that starts a new thread for every task and can optionally
// throttle submissions through a ConcurrencyThrottleSupport-based adapter.
// NOTE(review): taskDecorator, threadFactory and TIMEOUT_IMMEDIATE are used below but not declared
// in this excerpt — presumably members omitted from the quote.
@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable {
// Runs the (optionally decorated) task on a new thread; goes through the throttle first when it is
// active and startTimeout is greater than TIMEOUT_IMMEDIATE.
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
// ConcurrencyThrottleAdapter.beforeAccess() is called here, before the thread is started; its counterpart shows up further down
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
// Creates a thread for the task (via the thread factory when one is set) and starts it.
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
public void setConcurrencyLimit(int concurrencyLimit) { this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit); }
public final int getConcurrencyLimit() { return this.concurrencyThrottle.getConcurrencyLimit(); }
public final boolean isThrottleActive() { return this.concurrencyThrottle.isThrottleActive(); }
private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
// The implementation lives entirely in ConcurrencyThrottleSupport; these overrides only call super
@Override
protected void beforeAccess() { super.beforeAccess(); }
@Override
protected void afterAccess() { super.afterAccess(); }
}
// Wrapper that runs the target task and always reports completion to the throttle afterwards.
private class ConcurrencyThrottlingRunnable implements Runnable {
private final Runnable target;
public ConcurrencyThrottlingRunnable(Runnable target) { this.target = target; }
@Override
public void run() {
try { this.target.run(); }
// ConcurrencyThrottleAdapter.afterAccess(), as expected: called in finally so it runs even if the task throws
finally { concurrencyThrottle.afterAccess(); }
}
}
}
4.1 说白了,这才是 节流 的实现代码
package org.springframework.util;
// Abridged excerpt: the actual throttling logic. beforeAccess() makes callers wait on a monitor while
// the concurrency count is at the limit; afterAccess() decrements the count and notifies one waiter.
// NOTE(review): `logger` is used below but not declared in this excerpt — presumably a field omitted from the quote.
@SuppressWarnings("serial")
public abstract class ConcurrencyThrottleSupport implements Serializable {
public static final int UNBOUNDED_CONCURRENCY = -1;
public static final int NO_CONCURRENCY = 0; // not "serial": beforeAccess() throws IllegalStateException when the limit equals this value
// The monitor is the key piece: all waiting and counting happens under it
private transient Object monitor = new Object();
private int concurrencyLimit = UNBOUNDED_CONCURRENCY;
private int concurrencyCount = 0;
public boolean isThrottleActive() { return (this.concurrencyLimit >= 0); }
// Called before a task runs.
// Throws IllegalStateException when the limit is NO_CONCURRENCY, or when the waiting thread was
// interrupted and the limit still does not allow entry.
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
// The concurrency count has hit the limit, so throttling here simply means blocking the caller
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
"but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount +
" has reached limit " + this.concurrencyLimit + " - blocking");
}
try {
// This is where the caller blocks
this.monitor.wait();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
// Still inside the synchronized block, but no longer waiting.
// The count is incremented while holding the monitor,
// which marks this invocation as admitted
this.concurrencyCount++;
}
}
}
// Called after a task has run
protected void afterAccess() {
if (this.concurrencyLimit >= 0) {
synchronized (this.monitor) {
// The task is done, so the concurrency count drops by one
this.concurrencyCount--;
if (logger.isDebugEnabled()) {
logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
}
// Wake one thread that is waiting in beforeAccess()
this.monitor.notify();
}
}
}
}