Skip to content

Commit

Permalink
perf($Async): configure async thread pool for request
Browse files Browse the repository at this point in the history
QUEUE_CAPACITY = 10000
corePoolSize = Runtime.getRuntime().availableProcessors() * 2
maxPoolSize = corePoolSize * 3

BREAKING CHANGE: configure async thread pool for request
  • Loading branch information
johnnymillergh committed Aug 12, 2021
1 parent 357c131 commit 64d7776
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,20 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.TimeoutCallableProcessingInterceptor;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

/**
* Description: AsyncConfiguration, change description here.
Expand All @@ -17,25 +27,66 @@
@EnableAsync
@RequiredArgsConstructor
public class AsyncConfiguration {
private static final int QUEUE_CAPACITY = 10000;
private final MafProjectProperty mafProjectProperty;

@Bean
/**
* <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
* Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
* if you want a fully managed bean. In such circumstances it is no longer necessary to
* manually call the {@code executor.initialize()} method as this will be invoked
* automatically when the bean is initialized.
* </p>
*
* @return customized async task executor
* @see ThreadPoolTaskExecutor
*/
@Bean(name = "asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
val executor = new ThreadPoolTaskExecutor();
val corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
log.info("corePoolSize = {}, for AsyncTaskExecutor", corePoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(corePoolSize * 3);
executor.setBeanName("async-task-executor");
executor.setThreadNamePrefix(String.format("%s-", this.mafProjectProperty.getProjectArtifactId()));
executor.setQueueCapacity(QUEUE_CAPACITY);
executor.setBeanName("asyncTaskExecutor");
executor.setThreadNamePrefix(String.format("%s-async-", this.mafProjectProperty.getProjectArtifactId()));
// Specify the RejectedExecutionHandler to use for the ExecutorService.
// Default is the ExecutorService's default abort policy.
executor.setRejectedExecutionHandler((runnable, executor1) -> {
log.error("Captured rejected execution. {}", runnable);
// TODO: runnable persistence
});
executor.initialize();
executor.setRejectedExecutionHandler(
(runnable, executor1) ->
log.error("Captured rejected execution. Runnable: {}, executor task count: {}",
runnable.toString(), executor1.getTaskCount()));
// It is no longer necessary to manually call the 'executor.initialize()'
log.warn("Initial bean: '{}'", AsyncTaskExecutor.class.getSimpleName());
return executor;
}

@Bean
public AsyncConfigurer asyncConfigurer(@Qualifier("asyncTaskExecutor") AsyncTaskExecutor asyncTaskExecutor) {
log.warn("Initial bean: '{}'", AsyncConfigurer.class.getSimpleName());
return new AsyncConfigurer() {
@Override
public Executor getAsyncExecutor() {
return asyncTaskExecutor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
};
}

@Bean
public CallableProcessingInterceptor callableProcessingInterceptor() {
log.warn("Initial bean: {}", CallableProcessingInterceptor.class.getSimpleName());
return new TimeoutCallableProcessingInterceptor() {
@Override
public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
log.error("Handling task timeout!");
return super.handleTimeout(request, task);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import java.util.Objects;

import static org.springframework.web.cors.CorsConfiguration.ALL;

/**
Expand All @@ -23,6 +29,8 @@ public class WebMvcConfiguration implements WebMvcConfigurer {
* can be cached by clients. By default this is set to 1800 seconds (30 minutes).
*/
private static final long MAX_AGE_SECS = 3600;
private final AsyncConfigurer asyncConfigurer;
private final CallableProcessingInterceptor callableProcessingInterceptor;

/**
* Configure cross origin requests processing.
Expand All @@ -40,4 +48,11 @@ public void addCorsMappings(CorsRegistry registry) {
.allowedHeaders(ALL)
.maxAge(MAX_AGE_SECS);
}

@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setDefaultTimeout(360000)
.setTaskExecutor((AsyncTaskExecutor) Objects.requireNonNull(this.asyncConfigurer.getAsyncExecutor()))
.registerCallableInterceptors(this.callableProcessingInterceptor);
}
}

0 comments on commit 64d7776

Please sign in to comment.