Skip to content

Commit

Permalink
Load plugins concurrently
Browse files Browse the repository at this point in the history
This adds an opt-in capability to start server concurrent. When enabled,
plugins are loaded concurrently.

In local testing, this improves `DevelopmentServer` startup time from
about 88s to about 38s (actual times very much depend on multiple
factors).

As a follow-up work, catalogs can be made to load concurrently.
  • Loading branch information
findepi committed Oct 12, 2021
1 parent ca20ca3 commit c5ac103
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 10 deletions.
29 changes: 29 additions & 0 deletions core/trino-main/src/main/java/io/trino/server/ForStartup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.server;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForStartup {}
15 changes: 15 additions & 0 deletions core/trino-main/src/main/java/io/trino/server/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.server;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;

import javax.validation.constraints.NotNull;
Expand All @@ -25,6 +26,7 @@
public class ServerConfig
{
private boolean coordinator = true;
private boolean concurrentStartup;
private boolean includeExceptionInResponse = true;
private Duration gracePeriod = new Duration(2, MINUTES);
private boolean queryResultsCompressionEnabled = true;
Expand All @@ -42,6 +44,19 @@ public ServerConfig setCoordinator(boolean coordinator)
return this;
}

public boolean isConcurrentStartup()
{
return concurrentStartup;
}

@Config("experimental.concurrent-startup")
@ConfigDescription("Parallelize work during server startup")
public ServerConfig setConcurrentStartup(boolean concurrentStartup)
{
this.concurrentStartup = concurrentStartup;
return this;
}

public boolean isIncludeExceptionInResponse()
{
return includeExceptionInResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@
import javax.inject.Singleton;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
Expand Down Expand Up @@ -461,6 +463,19 @@ public static TypeOperators createTypeOperators(TypeOperatorsCache typeOperators
return new TypeOperators(typeOperatorsCache);
}

@Provides
@Singleton
@ForStartup
public static Executor createStartupExecutor(ServerConfig config)
{
if (!config.isConcurrentStartup()) {
return directExecutor();
}
return new BoundedExecutor(
newCachedThreadPool(daemonThreadsNamed("startup-%s")),
Runtime.getRuntime().availableProcessors());
}

@Provides
@Singleton
@ForExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,40 @@
import java.net.URL;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Streams.stream;
import static io.trino.util.Executors.executeUntilFailure;
import static java.nio.file.Files.newDirectoryStream;
import static java.util.Objects.requireNonNull;

public class ServerPluginsProvider
implements PluginsProvider
{
private final File installedPluginsDir;
private final Executor executor;

@Inject
public ServerPluginsProvider(ServerPluginsProviderConfig config)
public ServerPluginsProvider(ServerPluginsProviderConfig config, @ForStartup Executor executor)
{
this.installedPluginsDir = config.getInstalledPluginsDir();
this.executor = requireNonNull(executor, "executor is null");
}

@Override
public void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader)
{
for (File file : listFiles(installedPluginsDir)) {
if (file.isDirectory()) {
loader.load(file.getAbsolutePath(), () -> createClassLoader.create(buildClassPath(file)));
}
}
executeUntilFailure(
executor,
listFiles(installedPluginsDir).stream()
.filter(File::isDirectory)
.map(file -> (Callable<?>) () -> {
loader.load(file.getAbsolutePath(), () -> createClassLoader.create(buildClassPath(file)));
return null;
})
.collect(toImmutableList()));
}

private static List<URL> buildClassPath(File path)
Expand Down
69 changes: 69 additions & 0 deletions core/trino-main/src/main/java/io/trino/util/Executors.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;

import static io.airlift.concurrent.MoreFutures.getDone;

public final class Executors
{
private Executors() {}

/**
* Run all tasks on executor returning as soon as all complete or any task fails.
* Upon task execution failure, other tasks are cancelled and interrupted, but not waited
* for.
*/
public static <T> void executeUntilFailure(Executor executor, Collection<Callable<T>> tasks)
{
CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
List<Future<T>> futures = new ArrayList<>(tasks.size());
for (Callable<T> task : tasks) {
futures.add(completionService.submit(task));
}
try {
for (int i = 0; i < futures.size(); i++) {
getDone(take(completionService));
}
}
catch (Exception failure) {
try {
futures.forEach(future -> future.cancel(true));
}
catch (RuntimeException e) {
failure.addSuppressed(e);
}
throw failure;
}
}

private static <T> Future<T> take(CompletionService<T> completionService)
{
try {
return completionService.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public void testDefaults()
{
assertRecordedDefaults(recordDefaults(ServerConfig.class)
.setCoordinator(true)
.setConcurrentStartup(false)
.setIncludeExceptionInResponse(true)
.setGracePeriod(new Duration(2, MINUTES))
.setQueryResultsCompressionEnabled(true)
Expand All @@ -42,6 +43,7 @@ public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("coordinator", "false")
.put("experimental.concurrent-startup", "true")
.put("http.include-exception-in-response", "false")
.put("shutdown.grace-period", "5m")
.put("query-results.compression-enabled", "false")
Expand All @@ -50,6 +52,7 @@ public void testExplicitPropertyMappings()

ServerConfig expected = new ServerConfig()
.setCoordinator(false)
.setConcurrentStartup(true)
.setIncludeExceptionInResponse(false)
.setGracePeriod(new Duration(5, MINUTES))
.setQueryResultsCompressionEnabled(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ node.id=will-be-overwritten
node.environment=test

coordinator=true
experimental.concurrent-startup=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
query.max-memory=1GB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ node.id=will-be-overwritten
node.environment=test

coordinator=false
experimental.concurrent-startup=true
http-server.http.port=8081
query.max-memory=1GB
query.max-memory-per-node=768MB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ node.id=will-be-overwritten
node.environment=test

coordinator=true
experimental.concurrent-startup=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=2GB
Expand Down
1 change: 1 addition & 0 deletions testing/trino-server-dev/etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.environment=test
node.internal-address=localhost
experimental.concurrent-startup=true
http-server.http.port=8080

discovery.uri=http://localhost:8080
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,41 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.server.PluginDiscovery.discoverPlugins;
import static io.trino.server.PluginDiscovery.writePluginServices;
import static io.trino.util.Executors.executeUntilFailure;
import static java.util.Objects.requireNonNull;

public class DevelopmentPluginsProvider
implements PluginsProvider
{
private final ArtifactResolver resolver;
private final List<String> plugins;
private final Executor executor;

@Inject
public DevelopmentPluginsProvider(DevelopmentLoaderConfig config)
public DevelopmentPluginsProvider(DevelopmentLoaderConfig config, @ForStartup Executor executor)
{
this.resolver = new ArtifactResolver(config.getMavenLocalRepository(), config.getMavenRemoteRepository());
this.plugins = ImmutableList.copyOf(config.getPlugins());
this.executor = requireNonNull(executor, "executor is null");
}

@Override
public void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader)
{
for (String plugin : plugins) {
loader.load(plugin, () -> buildClassLoader(plugin, createClassLoader));
}
executeUntilFailure(
executor,
plugins.stream()
.map(plugin -> (Callable<?>) () -> {
loader.load(plugin, () -> buildClassLoader(plugin, createClassLoader));
return null;
})
.collect(toImmutableList()));
}

private PluginClassLoader buildClassLoader(String plugin, ClassLoaderFactory classLoaderFactory)
Expand Down

0 comments on commit c5ac103

Please sign in to comment.