Skip to content

Commit

Permalink
Add spooling exchange interface
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr authored and martint committed Jan 21, 2022
1 parent 5c4b512 commit b33f9bc
Show file tree
Hide file tree
Showing 22 changed files with 749 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.exchange;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

public class ExchangeManagerModule
implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.exchange;

import io.airlift.log.Logger;
import io.trino.metadata.HandleResolver;
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeManagerFactory;

import javax.inject.Inject;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.airlift.configuration.ConfigurationLoader.loadPropertiesFrom;
import static io.trino.spi.StandardErrorCode.EXCHANGE_MANAGER_NOT_CONFIGURED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class ExchangeManagerRegistry
{
private static final Logger log = Logger.get(ExchangeManagerRegistry.class);

private static final File CONFIG_FILE = new File("etc/exchange-manager.properties");
private static final String EXCHANGE_MANAGER_NAME_PROPERTY = "exchange-manager.name";

private final HandleResolver handleResolver;

private final Map<String, ExchangeManagerFactory> exchangeManagerFactories = new ConcurrentHashMap<>();

private volatile ExchangeManager exchangeManager;

@Inject
public ExchangeManagerRegistry(HandleResolver handleResolver)
{
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
}

public void addExchangeManagerFactory(ExchangeManagerFactory factory)
{
requireNonNull(factory, "factory is null");
if (exchangeManagerFactories.putIfAbsent(factory.getName(), factory) != null) {
throw new IllegalArgumentException(format("Exchange manager factory '%s' is already registered", factory.getName()));
}
}

public void loadExchangeManager()
{
if (!CONFIG_FILE.exists()) {
log.info("Exchange manager configuration file is not present: %s", CONFIG_FILE.getAbsoluteFile());
return;
}

Map<String, String> properties = loadProperties(CONFIG_FILE);
String name = properties.remove(EXCHANGE_MANAGER_NAME_PROPERTY);
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", CONFIG_FILE, EXCHANGE_MANAGER_NAME_PROPERTY);

loadExchangeManager(name, properties);
}

public synchronized void loadExchangeManager(String name, Map<String, String> properties)
{
log.info("-- Loading exchange manager %s --", name);

checkState(exchangeManager == null, "exchangeManager is already loaded");

ExchangeManagerFactory factory = exchangeManagerFactories.get(name);
checkArgument(factory != null, "Exchange manager factory '%s' is not registered. Available factories: %s", name, exchangeManagerFactories.keySet());

ExchangeManager exchangeManager;
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getClass().getClassLoader())) {
exchangeManager = factory.create(properties);
}
handleResolver.setExchangeManagerHandleResolver(factory.getHandleResolver());

log.info("-- Loaded exchange manager %s --", name);

this.exchangeManager = exchangeManager;
}

public ExchangeManager getExchangeManager()
{
ExchangeManager exchangeManager = this.exchangeManager;
if (exchangeManager == null) {
throw new TrinoException(EXCHANGE_MANAGER_NOT_CONFIGURED, "Exchange manager is not configured");
}
return exchangeManager;
}

private static Map<String, String> loadProperties(File configFile)
{
try {
return new HashMap<>(loadPropertiesFrom(configFile.getPath()));
}
catch (IOException e) {
throw new UncheckedIOException("Failed to read configuration file: " + configFile, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;

public class HandleJsonModule
implements Module
Expand Down Expand Up @@ -89,4 +91,16 @@ public static com.fasterxml.jackson.databind.Module partitioningHandleModule(Han
{
return new AbstractTypedJacksonModule<>(ConnectorPartitioningHandle.class, resolver::getId, resolver::getPartitioningHandleClass) {};
}

@ProvidesIntoSet
public static com.fasterxml.jackson.databind.Module exchangeSinkInstanceHandleModule(HandleResolver resolver)
{
return new AbstractTypedJacksonModule<>(ExchangeSinkInstanceHandle.class, (clazz) -> clazz.getClass().getSimpleName(), (ignored) -> resolver.getExchangeSinkInstanceHandleClass()) {};
}

@ProvidesIntoSet
public static com.fasterxml.jackson.databind.Module exchangeSourceHandleModule(HandleResolver resolver)
{
return new AbstractTypedJacksonModule<>(ExchangeSourceHandle.class, (clazz) -> clazz.getClass().getSimpleName(), (ignored) -> resolver.getExchangeSourceHandleHandleClass()) {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.exchange.ExchangeManagerHandleResolver;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.split.EmptySplitHandleResolver;

import javax.inject.Inject;
Expand All @@ -34,6 +37,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -44,7 +48,8 @@

public final class HandleResolver
{
private final ConcurrentMap<String, MaterializedHandleResolver> handleResolvers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MaterializedHandleResolver> catalogHandleResolvers = new ConcurrentHashMap<>();
private final AtomicReference<ExchangeManagerHandleResolver> exchangeManagerHandleResolver = new AtomicReference<>();

@Inject
public HandleResolver()
Expand All @@ -59,13 +64,18 @@ public void addCatalogHandleResolver(String catalogName, ConnectorHandleResolver
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(resolver, "resolver is null");
MaterializedHandleResolver existingResolver = handleResolvers.putIfAbsent(catalogName, new MaterializedHandleResolver(resolver));
MaterializedHandleResolver existingResolver = catalogHandleResolvers.putIfAbsent(catalogName, new MaterializedHandleResolver(resolver));
checkState(existingResolver == null, "Catalog '%s' is already assigned to resolver: %s", catalogName, existingResolver);
}

public void setExchangeManagerHandleResolver(ExchangeManagerHandleResolver resolver)
{
checkState(exchangeManagerHandleResolver.compareAndSet(null, resolver), "Exchange manager handle resolver is already set");
}

public void removeCatalogHandleResolver(String catalogName)
{
handleResolvers.remove(catalogName);
catalogHandleResolvers.remove(catalogName);
}

public String getId(ConnectorTableHandle tableHandle)
Expand Down Expand Up @@ -158,16 +168,30 @@ public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass(Str
return resolverFor(id).getTransactionHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
}

public Class<? extends ExchangeSinkInstanceHandle> getExchangeSinkInstanceHandleClass()
{
ExchangeManagerHandleResolver resolver = exchangeManagerHandleResolver.get();
checkState(resolver != null, "Exchange manager handle resolver is not set");
return resolver.getExchangeSinkInstanceHandleClass();
}

public Class<? extends ExchangeSourceHandle> getExchangeSourceHandleHandleClass()
{
ExchangeManagerHandleResolver resolver = exchangeManagerHandleResolver.get();
checkState(resolver != null, "Exchange manager handle resolver is not set");
return resolver.getExchangeSourceHandleHandleClass();
}

private MaterializedHandleResolver resolverFor(String id)
{
MaterializedHandleResolver resolver = handleResolvers.get(id);
MaterializedHandleResolver resolver = catalogHandleResolvers.get(id);
checkArgument(resolver != null, "No handle resolver for connector: %s", id);
return resolver;
}

private <T> String getId(T handle, Function<MaterializedHandleResolver, Optional<Class<? extends T>>> getter)
{
for (Entry<String, MaterializedHandleResolver> entry : handleResolvers.entrySet()) {
for (Entry<String, MaterializedHandleResolver> entry : catalogHandleResolvers.entrySet()) {
try {
if (getter.apply(entry.getValue()).map(clazz -> clazz.isInstance(handle)).orElse(false)) {
return entry.getKey();
Expand Down
12 changes: 11 additions & 1 deletion core/trino-main/src/main/java/io/trino/server/PluginManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.log.Logger;
import io.trino.connector.ConnectorManager;
import io.trino.eventlistener.EventListenerManager;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.resourcegroups.ResourceGroupManager;
import io.trino.metadata.BlockEncodingManager;
import io.trino.metadata.MetadataManager;
Expand All @@ -31,6 +32,7 @@
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.eventlistener.EventListenerFactory;
import io.trino.spi.exchange.ExchangeManagerFactory;
import io.trino.spi.resourcegroups.ResourceGroupConfigurationManagerFactory;
import io.trino.spi.security.CertificateAuthenticatorFactory;
import io.trino.spi.security.GroupProviderFactory;
Expand Down Expand Up @@ -78,6 +80,7 @@ public class PluginManager
private final Optional<HeaderAuthenticatorManager> headerAuthenticatorManager;
private final EventListenerManager eventListenerManager;
private final GroupProviderManager groupProviderManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final SessionPropertyDefaults sessionPropertyDefaults;
private final TypeRegistry typeRegistry;
private final BlockEncodingManager blockEncodingManager;
Expand All @@ -98,7 +101,8 @@ public PluginManager(
GroupProviderManager groupProviderManager,
SessionPropertyDefaults sessionPropertyDefaults,
TypeRegistry typeRegistry,
BlockEncodingManager blockEncodingManager)
BlockEncodingManager blockEncodingManager,
ExchangeManagerRegistry exchangeManagerRegistry)
{
this.pluginsProvider = requireNonNull(pluginsProvider, "pluginsProvider is null");
this.connectorManager = requireNonNull(connectorManager, "connectorManager is null");
Expand All @@ -113,6 +117,7 @@ public PluginManager(
this.sessionPropertyDefaults = requireNonNull(sessionPropertyDefaults, "sessionPropertyDefaults is null");
this.typeRegistry = requireNonNull(typeRegistry, "typeRegistry is null");
this.blockEncodingManager = requireNonNull(blockEncodingManager, "blockEncodingManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -234,6 +239,11 @@ private void installPluginInternal(Plugin plugin, Supplier<ClassLoader> duplicat
log.info("Registering group provider %s", groupProviderFactory.getName());
groupProviderManager.addGroupProviderFactory(groupProviderFactory);
}

for (ExchangeManagerFactory exchangeManagerFactory : plugin.getExchangeManagerFactories()) {
log.info("Registering exchange manager %s", exchangeManagerFactory.getName());
exchangeManagerRegistry.addExchangeManagerFactory(exchangeManagerFactory);
}
}

public static PluginClassLoader createClassLoader(List<URL> urls)
Expand Down
4 changes: 4 additions & 0 deletions core/trino-main/src/main/java/io/trino/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.trino.client.NodeVersion;
import io.trino.eventlistener.EventListenerManager;
import io.trino.eventlistener.EventListenerModule;
import io.trino.exchange.ExchangeManagerModule;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.resourcegroups.ResourceGroupManager;
import io.trino.execution.warnings.WarningCollectorModule;
import io.trino.metadata.Catalog;
Expand Down Expand Up @@ -104,6 +106,7 @@ private void doStart(String trinoVersion)
new ServerSecurityModule(),
new AccessControlModule(),
new EventListenerModule(),
new ExchangeManagerModule(),
new CoordinatorDiscoveryModule(),
new ServerMainModule(trinoVersion),
new GracefulShutdownModule(),
Expand Down Expand Up @@ -134,6 +137,7 @@ private void doStart(String trinoVersion)
.ifPresent(PasswordAuthenticatorManager::loadPasswordAuthenticator);
injector.getInstance(EventListenerManager.class).loadEventListeners();
injector.getInstance(GroupProviderManager.class).loadConfiguredGroupProvider();
injector.getInstance(ExchangeManagerRegistry.class).loadExchangeManager();
injector.getInstance(CertificateAuthenticatorManager.class).loadCertificateAuthenticator();
injector.getInstance(optionalKey(HeaderAuthenticatorManager.class))
.ifPresent(HeaderAuthenticatorManager::loadHeaderAuthenticator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.dispatcher.DispatchManager;
import io.trino.eventlistener.EventListenerConfig;
import io.trino.eventlistener.EventListenerManager;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.FailureInjector;
import io.trino.execution.FailureInjector.InjectedFailureType;
import io.trino.execution.QueryInfo;
Expand Down Expand Up @@ -264,6 +265,7 @@ private TestingTrinoServer(
binder.bind(ShutdownAction.class).to(TestShutdownAction.class).in(Scopes.SINGLETON);
binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON);
binder.bind(ProcedureTester.class).in(Scopes.SINGLETON);
binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON);
});

if (discoveryUri.isPresent()) {
Expand Down
Loading

0 comments on commit b33f9bc

Please sign in to comment.