diff --git a/cli/src/main/java/io/kestra/cli/commands/servers/WorkerCommand.java b/cli/src/main/java/io/kestra/cli/commands/servers/WorkerCommand.java index a9ac4fe9a8..dddbdfa873 100644 --- a/cli/src/main/java/io/kestra/cli/commands/servers/WorkerCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/servers/WorkerCommand.java @@ -21,7 +21,7 @@ public class WorkerCommand extends AbstractCommand { @Inject private ApplicationContext applicationContext; - @CommandLine.Option(names = {"-t", "--thread"}, description = "the number of concurrent threads to launch") + @CommandLine.Option(names = {"-t", "--thread"}, description = "the max number of concurrent threads to launch") private int thread = Runtime.getRuntime().availableProcessors(); public WorkerCommand() { diff --git a/core/src/main/java/io/kestra/core/runners/Worker.java b/core/src/main/java/io/kestra/core/runners/Worker.java index 7925c5f22b..7d4f6e2bc4 100644 --- a/core/src/main/java/io/kestra/core/runners/Worker.java +++ b/core/src/main/java/io/kestra/core/runners/Worker.java @@ -73,7 +73,11 @@ public Worker(ApplicationContext applicationContext, int thread) { this.metricRegistry = applicationContext.getBean(MetricRegistry.class); ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class); - this.executors = executorsUtils.fixedThreadPool(thread, "worker"); + this.executors = executorsUtils.maxCachedThreadPool( + Math.min(Runtime.getRuntime().availableProcessors(), thread), + thread, + "worker" + ); } @Override diff --git a/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java b/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java index db8579f1a7..60bc762e41 100644 --- a/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java +++ b/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java @@ -3,8 +3,8 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; + import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -25,6 +25,20 @@ public ExecutorService cachedThreadPool(String name) { ); } + public ExecutorService maxCachedThreadPool(int minThread, int maxThread, String name) { + return this.wrap( + name, + new ThreadPoolExecutor( + minThread, + maxThread, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + threadFactoryBuilder.build(name + "_%d") + ) + ); + } + public ExecutorService fixedThreadPool(int thread, String name) { return this.wrap( name,