From c5ac103ede428edbbe02f6394dc6d38ce4422ee2 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 5 Oct 2021 16:55:23 +0200 Subject: [PATCH] Load plugins concurrently This adds an opt-in capability to start server concurrent. When enabled, plugins are loaded concurrently. In local testing, this improves `DevelopmentServer` startup time from about 88s to about 38s (actual times very much depend on multiple factors). As a follow-up work, catalogs can be made to load concurrently. --- .../main/java/io/trino/server/ForStartup.java | 29 ++++++++ .../java/io/trino/server/ServerConfig.java | 15 ++++ .../io/trino/server/ServerMainModule.java | 15 ++++ .../trino/server/ServerPluginsProvider.java | 22 ++++-- .../main/java/io/trino/util/Executors.java | 69 +++++++++++++++++++ .../io/trino/server/TestServerConfig.java | 3 + .../multinode-master-config.properties | 1 + .../multinode-worker-config.properties | 1 + .../common/standard/config.properties | 1 + .../trino-server-dev/etc/config.properties | 1 + .../server/DevelopmentPluginsProvider.java | 20 ++++-- 11 files changed, 167 insertions(+), 10 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/server/ForStartup.java create mode 100644 core/trino-main/src/main/java/io/trino/util/Executors.java diff --git a/core/trino-main/src/main/java/io/trino/server/ForStartup.java b/core/trino-main/src/main/java/io/trino/server/ForStartup.java new file mode 100644 index 000000000000..9ae0b34890d3 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/server/ForStartup.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.server; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForStartup {} diff --git a/core/trino-main/src/main/java/io/trino/server/ServerConfig.java b/core/trino-main/src/main/java/io/trino/server/ServerConfig.java index c26c04dad87f..7f16d1a9c95c 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerConfig.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerConfig.java @@ -14,6 +14,7 @@ package io.trino.server; import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; import io.airlift.units.Duration; import javax.validation.constraints.NotNull; @@ -25,6 +26,7 @@ public class ServerConfig { private boolean coordinator = true; + private boolean concurrentStartup; private boolean includeExceptionInResponse = true; private Duration gracePeriod = new Duration(2, MINUTES); private boolean queryResultsCompressionEnabled = true; @@ -42,6 +44,19 @@ public ServerConfig setCoordinator(boolean coordinator) return this; } + public boolean isConcurrentStartup() + { + return concurrentStartup; + } + + @Config("experimental.concurrent-startup") + @ConfigDescription("Parallelize work during server startup") + public ServerConfig setConcurrentStartup(boolean concurrentStartup) + { + this.concurrentStartup = concurrentStartup; + return this; + } + public boolean isIncludeExceptionInResponse() { return includeExceptionInResponse; diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java index d5e5bf87f528..fd7e52580e81 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java @@ -142,9 +142,11 @@ import javax.inject.Singleton; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.concurrent.Threads.daemonThreadsNamed; @@ -461,6 +463,19 @@ public static TypeOperators createTypeOperators(TypeOperatorsCache typeOperators return new TypeOperators(typeOperatorsCache); } + @Provides + @Singleton + @ForStartup + public static Executor createStartupExecutor(ServerConfig config) + { + if (!config.isConcurrentStartup()) { + return directExecutor(); + } + return new BoundedExecutor( + newCachedThreadPool(daemonThreadsNamed("startup-%s")), + Runtime.getRuntime().availableProcessors()); + } + @Provides @Singleton @ForExchange diff --git a/core/trino-main/src/main/java/io/trino/server/ServerPluginsProvider.java b/core/trino-main/src/main/java/io/trino/server/ServerPluginsProvider.java index aa20da57b815..45c1f5e43cac 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerPluginsProvider.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerPluginsProvider.java @@ -24,30 +24,40 @@ import java.net.URL; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Streams.stream; +import static io.trino.util.Executors.executeUntilFailure; import static java.nio.file.Files.newDirectoryStream; +import static java.util.Objects.requireNonNull; public class ServerPluginsProvider implements PluginsProvider { private final File installedPluginsDir; + private final Executor executor; @Inject - public ServerPluginsProvider(ServerPluginsProviderConfig config) + public ServerPluginsProvider(ServerPluginsProviderConfig config, @ForStartup Executor executor) { this.installedPluginsDir = config.getInstalledPluginsDir(); + this.executor = requireNonNull(executor, "executor is null"); } @Override public void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader) { - for (File file : listFiles(installedPluginsDir)) { - if (file.isDirectory()) { - loader.load(file.getAbsolutePath(), () -> createClassLoader.create(buildClassPath(file))); - } - } + executeUntilFailure( + executor, + listFiles(installedPluginsDir).stream() + .filter(File::isDirectory) + .map(file -> (Callable) () -> { + loader.load(file.getAbsolutePath(), () -> createClassLoader.create(buildClassPath(file))); + return null; + }) + .collect(toImmutableList())); } private static List buildClassPath(File path) diff --git a/core/trino-main/src/main/java/io/trino/util/Executors.java b/core/trino-main/src/main/java/io/trino/util/Executors.java new file mode 100644 index 000000000000..c98e087406a1 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/util/Executors.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; + +import static io.airlift.concurrent.MoreFutures.getDone; + +public final class Executors +{ + private Executors() {} + + /** + * Run all tasks on executor returning as soon as all complete or any task fails. + * Upon task execution failure, other tasks are cancelled and interrupted, but not waited + * for. + */ + public static void executeUntilFailure(Executor executor, Collection> tasks) + { + CompletionService completionService = new ExecutorCompletionService<>(executor); + List> futures = new ArrayList<>(tasks.size()); + for (Callable task : tasks) { + futures.add(completionService.submit(task)); + } + try { + for (int i = 0; i < futures.size(); i++) { + getDone(take(completionService)); + } + } + catch (Exception failure) { + try { + futures.forEach(future -> future.cancel(true)); + } + catch (RuntimeException e) { + failure.addSuppressed(e); + } + throw failure; + } + } + + private static Future take(CompletionService completionService) + { + try { + return completionService.take(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted", e); + } + } +} diff --git a/core/trino-main/src/test/java/io/trino/server/TestServerConfig.java b/core/trino-main/src/test/java/io/trino/server/TestServerConfig.java index ce04a503ab66..9fc561afad06 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestServerConfig.java +++ b/core/trino-main/src/test/java/io/trino/server/TestServerConfig.java @@ -31,6 +31,7 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(ServerConfig.class) .setCoordinator(true) + .setConcurrentStartup(false) .setIncludeExceptionInResponse(true) .setGracePeriod(new Duration(2, MINUTES)) .setQueryResultsCompressionEnabled(true) @@ -42,6 +43,7 @@ public void testExplicitPropertyMappings() { Map properties = new ImmutableMap.Builder() .put("coordinator", "false") + .put("experimental.concurrent-startup", "true") .put("http.include-exception-in-response", "false") .put("shutdown.grace-period", "5m") .put("query-results.compression-enabled", "false") @@ -50,6 +52,7 @@ public void testExplicitPropertyMappings() ServerConfig expected = new ServerConfig() .setCoordinator(false) + .setConcurrentStartup(true) .setIncludeExceptionInResponse(false) .setGracePeriod(new Duration(5, MINUTES)) .setQueryResultsCompressionEnabled(false) diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-master-config.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-master-config.properties index 48226102a7fc..7ce0bc4b8cce 100644 --- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-master-config.properties +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-master-config.properties @@ -2,6 +2,7 @@ node.id=will-be-overwritten node.environment=test coordinator=true +experimental.concurrent-startup=true node-scheduler.include-coordinator=false http-server.http.port=8080 query.max-memory=1GB diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-worker-config.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-worker-config.properties index fb41811d41cf..d237f5addeb0 100644 --- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-worker-config.properties +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard-multinode/multinode-worker-config.properties @@ -2,6 +2,7 @@ node.id=will-be-overwritten node.environment=test coordinator=false +experimental.concurrent-startup=true http-server.http.port=8081 query.max-memory=1GB query.max-memory-per-node=768MB diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard/config.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard/config.properties index cc78c206ccf9..57a852184573 100644 --- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard/config.properties +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/standard/config.properties @@ -2,6 +2,7 @@ node.id=will-be-overwritten node.environment=test coordinator=true +experimental.concurrent-startup=true node-scheduler.include-coordinator=true http-server.http.port=8080 query.max-memory=2GB diff --git a/testing/trino-server-dev/etc/config.properties b/testing/trino-server-dev/etc/config.properties index 01222214aa0c..e1e4530f7421 100644 --- a/testing/trino-server-dev/etc/config.properties +++ b/testing/trino-server-dev/etc/config.properties @@ -9,6 +9,7 @@ node.id=ffffffff-ffff-ffff-ffff-ffffffffffff node.environment=test node.internal-address=localhost +experimental.concurrent-startup=true http-server.http.port=8080 discovery.uri=http://localhost:8080 diff --git a/testing/trino-server-dev/src/main/java/io/trino/server/DevelopmentPluginsProvider.java b/testing/trino-server-dev/src/main/java/io/trino/server/DevelopmentPluginsProvider.java index c39f53af5b47..b6205ce1f668 100644 --- a/testing/trino-server-dev/src/main/java/io/trino/server/DevelopmentPluginsProvider.java +++ b/testing/trino-server-dev/src/main/java/io/trino/server/DevelopmentPluginsProvider.java @@ -29,29 +29,41 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.server.PluginDiscovery.discoverPlugins; import static io.trino.server.PluginDiscovery.writePluginServices; +import static io.trino.util.Executors.executeUntilFailure; +import static java.util.Objects.requireNonNull; public class DevelopmentPluginsProvider implements PluginsProvider { private final ArtifactResolver resolver; private final List plugins; + private final Executor executor; @Inject - public DevelopmentPluginsProvider(DevelopmentLoaderConfig config) + public DevelopmentPluginsProvider(DevelopmentLoaderConfig config, @ForStartup Executor executor) { this.resolver = new ArtifactResolver(config.getMavenLocalRepository(), config.getMavenRemoteRepository()); this.plugins = ImmutableList.copyOf(config.getPlugins()); + this.executor = requireNonNull(executor, "executor is null"); } @Override public void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader) { - for (String plugin : plugins) { - loader.load(plugin, () -> buildClassLoader(plugin, createClassLoader)); - } + executeUntilFailure( + executor, + plugins.stream() + .map(plugin -> (Callable) () -> { + loader.load(plugin, () -> buildClassLoader(plugin, createClassLoader)); + return null; + }) + .collect(toImmutableList())); } private PluginClassLoader buildClassLoader(String plugin, ClassLoaderFactory classLoaderFactory)