Skip to content

Commit

Permalink
Change optional RequestHandler2 to set for Glue
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 31, 2023
1 parent c1f5f09 commit f09ccea
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.model.ConcurrentModificationException;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.plugin.deltalake.TestingDeltaLakePlugin;
import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule;
Expand Down Expand Up @@ -74,7 +75,7 @@ protected QueryRunner createQueryRunner()
GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig()
.setDefaultWarehouseDir(dataDirectory.toUri().toString());

AWSGlueAsync glueClient = createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector());
AWSGlueAsync glueClient = createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), ImmutableSet.of(), stats.newRequestMetricsCollector());
AWSGlueAsync proxiedGlueClient = newProxy(AWSGlueAsync.class, (proxy, method, args) -> {
Object result;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder;
import com.google.common.collect.ImmutableList;

import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.hdfs.s3.AwsCurrentRegionHolder.getCurrentRegionFromEC2Metadata;
Expand All @@ -34,7 +33,7 @@ private GlueClientUtil() {}
public static AWSGlueAsync createAsyncGlueClient(
GlueHiveMetastoreConfig config,
AWSCredentialsProvider credentialsProvider,
Optional<RequestHandler2> requestHandler,
Set<RequestHandler2> requestHandlers,
RequestMetricCollector metricsCollector)
{
ClientConfiguration clientConfig = new ClientConfiguration()
Expand All @@ -44,10 +43,7 @@ public static AWSGlueAsync createAsyncGlueClient(
.withMetricsCollector(metricsCollector)
.withClientConfiguration(clientConfig);

ImmutableList.Builder<RequestHandler2> requestHandlers = ImmutableList.builder();
requestHandler.ifPresent(requestHandlers::add);
config.getCatalogId().ifPresent(catalogId -> requestHandlers.add(new GlueCatalogIdRequestHandler(catalogId)));
asyncGlueClientBuilder.setRequestHandlers(requestHandlers.build().toArray(RequestHandler2[]::new));
asyncGlueClientBuilder.setRequestHandlers(requestHandlers.toArray(RequestHandler2[]::new));

if (config.getGlueEndpointUrl().isPresent()) {
checkArgument(config.getGlueRegion().isPresent(), "Glue region must be set when Glue endpoint URL is set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public static GlueHiveMetastore createTestingGlueHiveMetastore(java.nio.file.Pat
glueConfig,
directExecutor(),
new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), ImmutableSet.of(), stats.newRequestMetricsCollector()),
stats,
table -> true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.hive.AllowHiveTableRename;
Expand All @@ -35,6 +36,7 @@
import java.util.function.Predicate;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
Expand All @@ -49,8 +51,9 @@ public class GlueMetastoreModule
protected void setup(Binder binder)
{
GlueHiveMetastoreConfig glueConfig = buildConfigObject(GlueHiveMetastoreConfig.class);
glueConfig.getGlueProxyApiId().ifPresent(glueProxyApiId -> binder
.bind(Key.get(RequestHandler2.class, ForGlueHiveMetastore.class))
Multibinder<RequestHandler2> requestHandlers = newSetBinder(binder, RequestHandler2.class, ForGlueHiveMetastore.class);
glueConfig.getCatalogId().ifPresent(catalogId -> requestHandlers.addBinding().toInstance(new GlueCatalogIdRequestHandler(catalogId)));
glueConfig.getGlueProxyApiId().ifPresent(glueProxyApiId -> requestHandlers.addBinding()
.toInstance(new ProxyApiRequestHandler(glueProxyApiId)));
configBinder(binder).bindConfig(HiveConfig.class);
binder.bind(AWSCredentialsProvider.class).toProvider(GlueCredentialsProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;

import java.util.Optional;
import java.util.Set;

import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient;
import static java.util.Objects.requireNonNull;
Expand All @@ -30,24 +31,24 @@ public class HiveGlueClientProvider
private final GlueMetastoreStats stats;
private final AWSCredentialsProvider credentialsProvider;
private final GlueHiveMetastoreConfig glueConfig; // TODO do not keep mutable config instance on a field
private final Optional<RequestHandler2> requestHandler;
private final Set<RequestHandler2> requestHandlers;

@Inject
public HiveGlueClientProvider(
@ForGlueHiveMetastore GlueMetastoreStats stats,
AWSCredentialsProvider credentialsProvider,
@ForGlueHiveMetastore Optional<RequestHandler2> requestHandler,
@ForGlueHiveMetastore Set<RequestHandler2> requestHandlers,
GlueHiveMetastoreConfig glueConfig)
{
this.stats = requireNonNull(stats, "stats is null");
this.credentialsProvider = requireNonNull(credentialsProvider, "credentialsProvider is null");
this.requestHandler = requireNonNull(requestHandler, "requestHandler is null");
this.requestHandlers = ImmutableSet.copyOf(requireNonNull(requestHandlers, "requestHandlers is null"));
this.glueConfig = glueConfig;
}

@Override
public AWSGlueAsync get()
{
return createAsyncGlueClient(glueConfig, credentialsProvider, requestHandler, stats.newRequestMetricsCollector());
return createAsyncGlueClient(glueConfig, credentialsProvider, requestHandlers, stats.newRequestMetricsCollector());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazonaws.services.glue.model.UpdateTableRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -237,7 +238,7 @@ protected HiveMetastore createMetastore(File tempDir)
glueConfig,
executor,
new DefaultGlueColumnStatisticsProviderFactory(executor, executor),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), ImmutableSet.of(), stats.newRequestMetricsCollector()),
stats,
new DefaultGlueMetastoreTableFilterProvider(true).get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import com.amazonaws.services.glue.model.Table;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
Expand All @@ -39,6 +37,7 @@

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

Expand All @@ -57,6 +56,12 @@ protected void setup(Binder binder)
binder.bind(TrinoCatalogFactory.class).to(TrinoGlueCatalogFactory.class).in(Scopes.SINGLETON);
newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();

Multibinder<RequestHandler2> requestHandlers = newSetBinder(binder, RequestHandler2.class, ForGlueHiveMetastore.class);
install(conditionalModule(
IcebergGlueCatalogConfig.class,
IcebergGlueCatalogConfig::isSkipArchive,
config -> requestHandlers.addBinding().toInstance(new SkipArchiveRequestHandler())));

// Required to inject HiveMetastoreFactory for migrate procedure
binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
newOptionalBinder(binder, Key.get(new TypeLiteral<Predicate<Table>>() {}, ForGlueHiveMetastore.class))
Expand All @@ -65,12 +70,4 @@ protected void setup(Binder binder)
Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON);
}

@Provides
@Singleton
@ForGlueHiveMetastore
public static RequestHandler2 createRequestHandler(IcebergGlueCatalogConfig config)
{
return new SkipArchiveRequestHandler(config.isSkipArchive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,11 @@
public class SkipArchiveRequestHandler
extends RequestHandler2
{
private final boolean skipArchive;

public SkipArchiveRequestHandler(boolean skipArchive)
{
this.skipArchive = skipArchive;
}

@Override
public AmazonWebServiceRequest beforeExecution(AmazonWebServiceRequest request)
{
if (request instanceof UpdateTableRequest updateTableRequest) {
return updateTableRequest.withSkipArchive(skipArchive);
return updateTableRequest.withSkipArchive(true);
}
if (request instanceof CreateDatabaseRequest ||
request instanceof DeleteDatabaseRequest ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
Expand Down Expand Up @@ -51,7 +52,7 @@ public TestingGlueIcebergTableOperationsProvider(
requireNonNull(credentialsProvider, "credentialsProvider is null");
requireNonNull(awsGlueAsyncAdapterProvider, "awsGlueAsyncAdapterProvider is null");
this.glueClient = awsGlueAsyncAdapterProvider.createAWSGlueAsyncAdapter(
createAsyncGlueClient(glueConfig, credentialsProvider, Optional.empty(), stats.newRequestMetricsCollector()));
createAsyncGlueClient(glueConfig, credentialsProvider, ImmutableSet.of(), stats.newRequestMetricsCollector()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import com.amazonaws.services.glue.model.Table;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.hive.HideDeltaLakeTables;
import io.trino.plugin.hive.metastore.glue.ForGlueHiveMetastore;
Expand All @@ -34,7 +33,9 @@

import java.util.function.Predicate;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.util.Objects.requireNonNull;
import static org.weakref.jmx.guice.ExportBinder.newExporter;
Expand Down Expand Up @@ -62,18 +63,16 @@ protected void setup(Binder binder)
newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();
binder.bind(AWSGlueAsyncAdapterProvider.class).toInstance(awsGlueAsyncAdapterProvider);

Multibinder<RequestHandler2> requestHandlers = newSetBinder(binder, RequestHandler2.class);
install(conditionalModule(
IcebergGlueCatalogConfig.class,
IcebergGlueCatalogConfig::isSkipArchive,
config -> requestHandlers.addBinding().toInstance(new SkipArchiveRequestHandler())));

// Required to inject HiveMetastoreFactory for migrate procedure
binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
newOptionalBinder(binder, Key.get(new TypeLiteral<Predicate<Table>>() {}, ForGlueHiveMetastore.class))
.setBinding().toInstance(table -> true);
install(new GlueMetastoreModule());
}

@Provides
@Singleton
@ForGlueHiveMetastore
public static RequestHandler2 createRequestHandler(IcebergGlueCatalogConfig config)
{
return new SkipArchiveRequestHandler(config.isSkipArchive());
}
}

0 comments on commit f09ccea

Please sign in to comment.