SpringBoot的任务执行器
Spring Boot通过auto-configuration机制自动创建了任务执行器Task Execution,因此在SpringBoot项目中,你不需要任何配置、也不需要自己创建Task Execution就可以直接使用它。
Spring Boot通过auto-configuration机制创建的任务执行器有以下作用:
- asynchronous task execution (@EnableAsync):通过@EnableAsync以及@Async注解使用任务执行器。
 - Spring for GraphQL’s asynchronous handling of Callable return values from controller methods(这个没用过)。
 - Spring MVC’s asynchronous request processing:Spring MVC的异步请求处理。
 - Spring WebFlux’s blocking execution support
 
除以上官网提到的,你还可以:手动使用任务执行器执行异步任务。
SpringBoot通过auto-configuration机制帮你创建了任务执行器TaskExecution,至于怎么通过TaskExecution、执行什么异步任务当然是你自己的事情了。
今天研究两部分内容:
- SpringBoot的auto-configuration机制创建任务执行器的过程。
 - 通过任务执行器执行任务,主要是上述提到的官网内容第1项@EnableAsync以及@Async注解的原理及使用。
 
Spring Boot创建任务执行器
SpringBoot官网说的很明确,TaskExecutor通过SpringBoot的auto-configuration技术创建。我们知道SpringBoot的auto-configuration技术(详情请参考:SpringBoot 自动配置@EnableAutoConfiguration)是通过META-INF/spring.factories文件指定自动配置内容的,我们打开spring.factories文件找一下taskExecutor的相关内容,在EnableAutoConfiguration项下果然发现了TaskExecutionAutoConfiguration:
 
 TaskExecutionAutoConfiguration在org.springframework.boot.autoconfigure.task包下,代码不算长,比较简单:
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {
	/**
	 * Bean name of the application {@link TaskExecutor}.
	 */
	public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
	@Bean
	@ConditionalOnMissingBean
	public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
			ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
			ObjectProvider<TaskDecorator> taskDecorator) {
		TaskExecutionProperties.Pool pool = properties.getPool();
		TaskExecutorBuilder builder = new TaskExecutorBuilder();
		builder = builder.queueCapacity(pool.getQueueCapacity());
		builder = builder.corePoolSize(pool.getCoreSize());
		builder = builder.maxPoolSize(pool.getMaxSize());
		builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
		builder = builder.keepAlive(pool.getKeepAlive());
		Shutdown shutdown = properties.getShutdown();
		builder = builder.awaitTermination(shutdown.isAwaitTermination());
		builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
		builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
		builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
		builder = builder.taskDecorator(taskDecorator.getIfUnique());
		return builder;
	}
	@Lazy
	@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
			AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
	@ConditionalOnMissingBean(Executor.class)
	public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
		return builder.build();
	}
}
 
它当然是一个Configuration类,在SpringBoot启动的过程@Bean注解指定的方法会被加载到Spring IoC容器中。其次,通过@EnableConfigurationProperties指定了配置类TaskExecutionProperties。
被加载到Ioc容器中的有两个对象:一个是通过taskExecutorBuilder方法加载的TaskExecutorBuilder,另外一个是通过applicationTaskExecutor方法加载的ThreadPoolTaskExecutor。
分别看一下。
TaskExecutorBuilder的创建
TaskExecutor构建器,在applicationTaskExecutor方法中负责构建TaskExecutor。
taskExecutorBuilder方法会接收一个参数TaskExecutionProperties ,用来指定TaskExecutor的各属性,比如queueCapacity、coreSize、keepAlive等等线程池相关参数,线程池相关内容请参考:线程池 - ThreadPoolExecutor源码分析。
简单看一眼TaskExecutionProperties类:
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
	private final Pool pool = new Pool();
	private final Shutdown shutdown = new Shutdown();
	/**
	 * Prefix to use for the names of newly created threads.
	 */
	private String threadNamePrefix = "task-";
	public Pool getPool() {
		return this.pool;
	}
	public Shutdown getShutdown() {
		return this.shutdown;
	}
	public String getThreadNamePrefix() {
		return this.threadNamePrefix;
	}
	public void setThreadNamePrefix(String threadNamePrefix) {
		this.threadNamePrefix = threadNamePrefix;
	}
	public static class Pool {
		/**
		 * Queue capacity. An unbounded capacity does not increase the pool and therefore
		 * ignores the "max-size" property.
		 */
		private int queueCapacity = Integer.MAX_VALUE;
		/**
		 * Core number of threads.
		 */
		private int coreSize = 8;
		/**
		 * Maximum allowed number of threads. If tasks are filling up the queue, the pool
		 * can expand up to that size to accommodate the load. Ignored if the queue is
		 * unbounded.
		 */
		private int maxSize = Integer.MAX_VALUE;
		/**
		 * Whether core threads are allowed to time out. This enables dynamic growing and
		 * shrinking of the pool.
		 */
		private boolean allowCoreThreadTimeout = true;
		/**
		 * Time limit for which threads may remain idle before being terminated.
		 */
		private Duration keepAlive = Duration.ofSeconds(60);
		public int getQueueCapacity() {
			return this.queueCapacity;
		}
		public void setQueueCapacity(int queueCapacity) {
			this.queueCapacity = queueCapacity;
		}
		public int getCoreSize() {
			return this.coreSize;
		}
		public void setCoreSize(int coreSize) {
			this.coreSize = coreSize;
		}
		public int getMaxSize() {
			return this.maxSize;
		}
		public void setMaxSize(int maxSize) {
			this.maxSize = maxSize;
		}
		public boolean isAllowCoreThreadTimeout() {
			return this.allowCoreThreadTimeout;
		}
		public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
			this.allowCoreThreadTimeout = allowCoreThreadTimeout;
		}
		public Duration getKeepAlive() {
			return this.keepAlive;
		}
		public void setKeepAlive(Duration keepAlive) {
			this.keepAlive = keepAlive;
		}
	}
 
正是@EnableConfigurationProperties以及@ConfigurationProperties注解决定了我们可以在配置文件(比如application.yml)中指定TaskExecutionProperties中的这些有关线程池的参数。
接收到这些配置参数之后,使用配置参数创建TaskExecutorBuilder,交给Spring Ioc容器。
ThreadPoolTaskExecutor 的创建
applicationTaskExecutor方法通过上面创建出来的TaskExecutorBuilder的build方法创建。
TaskExecutorBuilderd的build方法:
	public ThreadPoolTaskExecutor build() {
		return configure(new ThreadPoolTaskExecutor());
	}
 
new了一个ThreadPoolTaskExecutor对象,调用configure方法:
	public <T extends ThreadPoolTaskExecutor> T configure(T taskExecutor) {
		PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
		map.from(this.queueCapacity).to(taskExecutor::setQueueCapacity);
		map.from(this.corePoolSize).to(taskExecutor::setCorePoolSize);
		map.from(this.maxPoolSize).to(taskExecutor::setMaxPoolSize);
		map.from(this.keepAlive).asInt(Duration::getSeconds).to(taskExecutor::setKeepAliveSeconds);
		map.from(this.allowCoreThreadTimeOut).to(taskExecutor::setAllowCoreThreadTimeOut);
		map.from(this.awaitTermination).to(taskExecutor::setWaitForTasksToCompleteOnShutdown);
		map.from(this.awaitTerminationPeriod).as(Duration::toMillis).to(taskExecutor::setAwaitTerminationMillis);
		map.from(this.threadNamePrefix).whenHasText().to(taskExecutor::setThreadNamePrefix);
		map.from(this.taskDecorator).to(taskExecutor::setTaskDecorator);
		if (!CollectionUtils.isEmpty(this.customizers)) {
			this.customizers.forEach((customizer) -> customizer.customize(taskExecutor));
		}
		return taskExecutor;
	}
 
将配置文件传递过来的参数传递给创建出来的ThreadPoolTaskExecutor对象并返回。
不配置的情况下,线程池默认参数在TaskExecutionProperties中指定:
 
 ThreadPoolTaskExecutor创建完成!
TaskExecutor的使用
既然Spring Boot已经帮助我们完成了TaskExecutor的创建并注入了Spring Ioc容器中,接下来我们就看一下该怎么使用它。
首先要尝试的是“手动使用”,不使用Spring的注解、而是想办法在代码中直接从Spring容器中获取到TaskExecutor之后调用他的execute方法。
首先创建一个Spring Boot项目,不需要什么特殊功能,pom文件也很简单,引入spring-web即可:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.5</version>
<!--        <version>3.1.4</version>-->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springbootstart</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springbootstart</name>
    <description>springbootstart</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 
然后,创建一个userService:
@Service
@Slf4j
public class UserService {
    @Autowired
    private TaskExecutor taskExecutor;
    public void test2(){
        taskExecutor.execute(()->{
            log.info("this is userservice test2 start...");
            try{
                Thread.sleep(10000);
            }catch (Exception e){
            }
            log.info("This is userService' test2 end...");
        });
    }
}
 
userService非常简单,比较重要的是:
    @Autowired
    private TaskExecutor taskExecutor;
 
这行代码通过@Autowired自动装配一个TaskExecutor 对象,因为我们从前面对Spring Boot代码的分析,Spring Boot应该是在启动的过程中已经通过auto-configuration机制自动创建并注入了TaskExecutor,所以按道理我们是可以通过自动装配的方式在userService中应用它的。
然后写一个test2方法,log看一下装配进来的taskExecutor到底是个啥对象,再调用taskExecutor的execute的方法模拟异步执行任务,执行前后打印log。
然后,写controller:
@RestController
@RequestMapping("/hello")
@Slf4j
public class HelloWorldController {
    public HelloWorldController(){
    }
    @Autowired
    private UserService userService;
    @GetMapping ("/test2")
    public String test2(){
        userService.test2();
        log.info("after userservice test2 ...");
        return "hello";
    }
}
 
OK,代码准备好了,启动应用,测试。通过应用的端口号8002可以正常访问:
 
而且,结果可以立即返回,前台并不需要等待userService的test2方法中睡眠的10秒钟,说明睡眠的线程一定是通过taskExecutor调用起来的异步线程,taskExecutor一定是生效了。
后台log也说明确实如此:
 
 前面的log是前台调用接口、tomcat的线程nio-8002-exec-1打印的,之后taskExecutor启动了新线程task-1,后面的两行日志是线程task-1打印的。
@Async注解
自己写代码使用taskExecutor线程池启动新线程执行任务这种方式虽然行得通,但是太low太繁琐了,既然使用了Spring框架,我们当然不需要这么麻烦。Spring给我们提供了@Async注解。
@Async注解可以用在方法上,也可以用在类上,不管用在方法上、还是用在类上,都要求当前类必须是受Spring管理的bean,因为@Async注解是通过Spring的BeanPostProcessor机制生效的。
我们改造UserService类,再编写一个test方法:
    @Async
    public void test(){
        log.info("This is userService' test start...");
        try {
            Thread.sleep(10000);
        }catch (Exception e){
        }
        log.info("This is userService' test end...");
        return;
    }
 
代码逻辑也非常简单,和test2方法一样,睡眠10秒后才返回结果。
重新启动应用后测试,发现@Async没有生效!
不生效的原因是缺少@EnableAsync注解,在启动类增加@EnableAsync注解后重新测试,发现@Async生效了,测试结果和test2的一样,所以也就不贴图了。
接下来的任务是,研究@EnableAsync注解的作用,为什么没有@EnableAsync注解的情况下,@Async注解不能生效。
@EnableAsync注解的底层原理
关于Spring的@Enablexxx注解,我们前面的文章分析过,基本就是通过@Configuration+@Import注解的联合使用达到注入指定对象到Spring IoC容器中。
先看@EnableAsync源码:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
 
通过@Import注解引入AsyncConfigurationSelector类,继续跟踪AsyncConfigurationSelector代码:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
	/**
	 * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
	 * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
	 * respectively.
	 */
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}
 
扩展了AdviceModeImportSelector类,而AdviceModeImportSelector类实现了ImportSelector接口,而ImportSelector接口这种方式最终是通过他的方法selectImports来实现注入的(这部分可以参考 SpringBoot 自动配置@EnableAutoConfiguration)。
selectImports方法根据adviceMode(默认是PROXY)会引入ProxyAsyncConfiguration类:
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}
 
ProxyAsyncConfiguration 是一个配置类,会通过@Bean注解注入一个叫AsyncAnnotationBeanPostProcessor 的BeanPostProcessor。从类名称我们就可以猜测到@Async注解就是通过这个后置处理器进行处理的。
接下来的代码跟踪还是稍稍有点复杂的。
首先,AsyncAnnotationBeanPostProcessor 通过父类AbstractBeanFactoryAwareAdvisingPostProcessor实现了BeanFactoryAware接口,所以我们知道他的setBeanFactory方法在Spring的Bean创建过程中会被回调:
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);
		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
	}
 
setBeanFactory方法中会创建一个advisor类AsyncAnnotationAdvisor,从名字中我们又可以猜测到,@Async注解最终应该会通过AOP技术实现。
继续跟踪AsyncAnnotationAdvisor源码,构造器:
	public AsyncAnnotationAdvisor(
   		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
   	Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
   	asyncAnnotationTypes.add(Async.class);
   	try {
   		asyncAnnotationTypes.add((Class<? extends Annotation>)
   				ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
   	}
   	catch (ClassNotFoundException ex) {
   		// If EJB 3.1 API not present, simply ignore.
   	}
   	this.advice = buildAdvice(executor, exceptionHandler);
   	this.pointcut = buildPointcut(asyncAnnotationTypes);
   }
 
调用buildAdvice和buildPointcut,创建切面和切点:
	protected Advice buildAdvice(
   		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
   	AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
   	interceptor.configure(executor, exceptionHandler);
   	return interceptor;
   }
 
构造切面的方法会创建一个AnnotationAsyncExecutionInterceptor 类,回忆一下AOP相关知识,我们知道Pointcut满足的情况下会调用切面类的invoke方法。
构造Pointcut的方法源码我们就不再跟踪了,可以猜测到他的匹配逻辑应该是检查当前方法(或者当前类)是否有@Async注解。
接下来我们就继续跟踪AnnotationAsyncExecutionInterceptor 类。
AnnotationAsyncExecutionInterceptor继承自父类AsyncExecutionInterceptor,invoke方法在他父类AsyncExecutionInterceptor中。
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}
		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future) {
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}
 
invoke方法就是实现异步调用的地方!
首先会通过determineAsyncExecutor方法获取taskExecutor,这也是我们关心的地方,不过我们先放放,先看一下拿到TaskExecutor之后的处理逻辑。
代码并不复杂,lamda方式创建一个callable任务,通过invocation.proceed()执行原方法。
通过doSubmit方法、使用TaskExecutor启动新的线程调用task任务、完成对原方法的执行!
主要代码跟踪完毕。
最后,再来看一下determineAsyncExecutor方法:
	protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
		AsyncTaskExecutor executor = this.executors.get(method);
		if (executor == null) {
			Executor targetExecutor;
			String qualifier = getExecutorQualifier(method);
			if (StringUtils.hasLength(qualifier)) {
				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
			}
			else {
				targetExecutor = this.defaultExecutor.get();
			}
			if (targetExecutor == null) {
				return null;
			}
			executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
					(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
			this.executors.put(method, executor);
		}
		return executor;
	}
 
总体的逻辑就是,通过beanFactory从Spring Ioc容器中获取TaskExecutor,首先判断是否有QualifiedExecutor,有的话通过findQualifiedExecutor方法从容器中获取QualifiedExecutor,没有的话通过this.defaultExecutor.get()获取。
this.defaultExecutor.get()的业务逻辑需要基于接口AsyncConfigurer来解释:
public interface AsyncConfigurer {
	/**
	 * The {@link Executor} instance to be used when processing async
	 * method invocations.
	 */
	@Nullable
	default Executor getAsyncExecutor() {
		return null;
	}
	/**
	 * The {@link AsyncUncaughtExceptionHandler} instance to be used
	 * when an exception is thrown during an asynchronous method execution
	 * with {@code void} return type.
	 */
	@Nullable
	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return null;
	}
}
 
AsyncConfigurer 接口有两个方法,一个用来获取Executor,一个用来获取AsyncUncaughtExceptionHandler。
this.defaultExecutor.get()的业务逻辑大概可以概括为:如果应用实现了AsyncConfigurer接口,则通过该接口获取Executor,否则,如果没有提供AsyncConfigurer的实现类,则向Spring Ioc容器获取默认的TaskExecutor:
 
 OK,Thanks a lot!


![街机模拟游戏逆向工程(HACKROM)教程:[1]数据的存储与读取](https://img-blog.csdnimg.cn/direct/3227b660010942019448656e45a8218c.png)















