Skip to content

Commit

Permalink
fix(core): use a cached thread pool for worker to reduce memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Mar 2, 2022
1 parent 18f2973 commit a01540a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class WorkerCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;

@CommandLine.Option(names = {"-t", "--thread"}, description = "the number of concurrent threads to launch")
@CommandLine.Option(names = {"-t", "--thread"}, description = "the max number of concurrent threads to launch")
private int thread = Runtime.getRuntime().availableProcessors();

public WorkerCommand() {
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ public Worker(ApplicationContext applicationContext, int thread) {
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);

ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class);
this.executors = executorsUtils.fixedThreadPool(thread, "worker");
this.executors = executorsUtils.maxCachedThreadPool(
Math.min(Runtime.getRuntime().availableProcessors(), thread),
thread,
"worker"
);
}

@Override
Expand Down
18 changes: 16 additions & 2 deletions core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

Expand All @@ -25,6 +25,20 @@ public ExecutorService cachedThreadPool(String name) {
);
}

public ExecutorService maxCachedThreadPool(int minThread, int maxThread, String name) {
return this.wrap(
name,
new ThreadPoolExecutor(
minThread,
maxThread,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactoryBuilder.build(name + "_%d")
)
);
}

public ExecutorService fixedThreadPool(int thread, String name) {
return this.wrap(
name,
Expand Down

0 comments on commit a01540a

Please sign in to comment.