Skip to content

Commit

Permalink
Load catalogs concurrently
Browse files Browse the repository at this point in the history
When plugin concurrent loading was added
(c5ac103), the catalogs could not be
loaded concurrently easily, because `ConnectorManager.createCatalog` was
synchronized. The code has evolved since then, reducing contention
significantly and thus allowing for parallel startup.
  • Loading branch information
findepi committed Dec 5, 2022
1 parent a53d472 commit 92b128c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.metadata.Catalog;
import io.trino.metadata.CatalogManager;
import io.trino.server.ForStartup;
import io.trino.spi.TrinoException;

import javax.annotation.PreDestroy;
Expand All @@ -31,8 +32,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -42,6 +45,7 @@
import static io.trino.connector.CatalogHandle.createRootCatalogHandle;
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.CATALOG_NOT_AVAILABLE;
import static io.trino.util.Executors.executeUntilFailure;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand All @@ -55,6 +59,7 @@ private enum State { CREATED, INITIALIZED, STOPPED }

private final CatalogStore catalogStore;
private final CatalogFactory catalogFactory;
private final Executor executor;

private final Lock catalogsUpdateLock = new ReentrantLock();
private final ConcurrentMap<String, CatalogConnector> catalogs = new ConcurrentHashMap<>();
Expand All @@ -63,10 +68,11 @@ private enum State { CREATED, INITIALIZED, STOPPED }
private State state = State.CREATED;

@Inject
public CoordinatorDynamicCatalogManager(CatalogStore catalogStore, CatalogFactory catalogFactory)
public CoordinatorDynamicCatalogManager(CatalogStore catalogStore, CatalogFactory catalogFactory, @ForStartup Executor executor)
{
this.catalogStore = requireNonNull(catalogStore, "catalogStore is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.executor = requireNonNull(executor, "executor is null");
}

@PreDestroy
Expand Down Expand Up @@ -104,12 +110,17 @@ public void loadInitialCatalogs()
checkState(state != State.STOPPED, "ConnectorManager is stopped");
state = State.INITIALIZED;

for (CatalogProperties catalog : catalogStore.getCatalogs()) {
log.info("-- Loading catalog %s --", catalog.getCatalogHandle().getCatalogName());
CatalogConnector newCatalog = catalogFactory.createCatalog(catalog);
catalogs.put(catalog.getCatalogHandle().getCatalogName(), newCatalog);
log.info("-- Added catalog %s using connector %s --", catalog.getCatalogHandle().getCatalogName(), catalog.getConnectorName());
}
executeUntilFailure(
executor,
catalogStore.getCatalogs().stream()
.map(catalog -> (Callable<?>) () -> {
log.info("-- Loading catalog %s --", catalog.getCatalogHandle().getCatalogName());
CatalogConnector newCatalog = catalogFactory.createCatalog(catalog);
catalogs.put(catalog.getCatalogHandle().getCatalogName(), newCatalog);
log.info("-- Added catalog %s using connector %s --", catalog.getCatalogHandle().getCatalogName(), catalog.getConnectorName());
return null;
})
.collect(toImmutableList()));
}
finally {
catalogsUpdateLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.metadata.Catalog;
import io.trino.metadata.CatalogManager;
import io.trino.server.ForStartup;
import io.trino.spi.TrinoException;

import javax.annotation.PreDestroy;
Expand All @@ -37,8 +38,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.MoreObjects.firstNonNull;
Expand All @@ -49,6 +52,7 @@
import static io.trino.connector.CatalogHandle.createRootCatalogHandle;
import static io.trino.spi.StandardErrorCode.CATALOG_NOT_AVAILABLE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.util.Executors.executeUntilFailure;
import static java.util.Objects.requireNonNull;

@ThreadSafe
Expand All @@ -61,13 +65,14 @@ private enum State { CREATED, INITIALIZED, STOPPED }

private final CatalogFactory catalogFactory;
private final List<CatalogProperties> catalogProperties;
private final Executor executor;

private final ConcurrentMap<String, CatalogConnector> catalogs = new ConcurrentHashMap<>();

private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

@Inject
public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerConfig config)
public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerConfig config, @ForStartup Executor executor)
{
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
List<String> disabledCatalogs = firstNonNull(config.getDisabledCatalogs(), ImmutableList.of());
Expand Down Expand Up @@ -95,6 +100,7 @@ public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerC
catalogProperties.add(new CatalogProperties(createRootCatalogHandle(catalogName), connectorName, ImmutableMap.copyOf(properties)));
}
this.catalogProperties = catalogProperties.build();
this.executor = requireNonNull(executor, "executor is null");
}

private static List<File> listCatalogFiles(File catalogsDirectory)
Expand Down Expand Up @@ -133,13 +139,18 @@ public void loadInitialCatalogs()
return;
}

for (CatalogProperties catalog : catalogProperties) {
String catalogName = catalog.getCatalogHandle().getCatalogName();
log.info("-- Loading catalog %s --", catalogName);
CatalogConnector newCatalog = catalogFactory.createCatalog(catalog);
catalogs.put(catalogName, newCatalog);
log.info("-- Added catalog %s using connector %s --", catalogName, catalog.getConnectorName());
}
executeUntilFailure(
executor,
catalogProperties.stream()
.map(catalog -> (Callable<?>) () -> {
String catalogName = catalog.getCatalogHandle().getCatalogName();
log.info("-- Loading catalog %s --", catalogName);
CatalogConnector newCatalog = catalogFactory.createCatalog(catalog);
catalogs.put(catalogName, newCatalog);
log.info("-- Added catalog %s using connector %s --", catalogName, catalog.getConnectorName());
return null;
})
.collect(toImmutableList()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.connector.CatalogServiceProviderModule.createAccessControlProvider;
Expand Down Expand Up @@ -357,7 +358,7 @@ private LocalQueryRunner(
this.optimizerConfig = new OptimizerConfig();
LazyCatalogFactory catalogFactory = new LazyCatalogFactory();
this.catalogFactory = catalogFactory;
this.catalogManager = new CoordinatorDynamicCatalogManager(NO_STORED_CATALOGS, catalogFactory);
this.catalogManager = new CoordinatorDynamicCatalogManager(NO_STORED_CATALOGS, catalogFactory, directExecutor());
this.transactionManager = InMemoryTransactionManager.create(
new TransactionManagerConfig().setIdleTimeout(new Duration(1, TimeUnit.DAYS)),
yieldExecutor,
Expand Down

0 comments on commit 92b128c

Please sign in to comment.