Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for OpenTelemetry in Glue metastore #18458

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.model.ConcurrentModificationException;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.plugin.deltalake.TestingDeltaLakePlugin;
import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule;
Expand Down Expand Up @@ -74,7 +75,7 @@ protected QueryRunner createQueryRunner()
GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig()
.setDefaultWarehouseDir(dataDirectory.toUri().toString());

AWSGlueAsync glueClient = createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector());
AWSGlueAsync glueClient = createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), ImmutableSet.of(), stats.newRequestMetricsCollector());
AWSGlueAsync proxiedGlueClient = newProxy(AWSGlueAsync.class, (proxy, method, args) -> {
Object result;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.manager.FileSystemModule;
import io.trino.hdfs.HdfsEnvironment;
Expand Down Expand Up @@ -118,6 +119,7 @@ public void setUp()
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
binder.bind(NodeVersion.class).toInstance(new NodeVersion("test_version"));
binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
binder.bind(Tracer.class).toInstance(context.getTracer());
},
// connector modules
Expand Down
5 changes: 5 additions & 0 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-aws-sdk-1.11</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-cache</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder;
import com.google.common.collect.ImmutableList;

import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.hdfs.s3.AwsCurrentRegionHolder.getCurrentRegionFromEC2Metadata;
Expand All @@ -34,7 +33,7 @@ private GlueClientUtil() {}
public static AWSGlueAsync createAsyncGlueClient(
GlueHiveMetastoreConfig config,
AWSCredentialsProvider credentialsProvider,
Optional<RequestHandler2> requestHandler,
Set<RequestHandler2> requestHandlers,
RequestMetricCollector metricsCollector)
{
ClientConfiguration clientConfig = new ClientConfiguration()
Expand All @@ -44,10 +43,7 @@ public static AWSGlueAsync createAsyncGlueClient(
.withMetricsCollector(metricsCollector)
.withClientConfiguration(clientConfig);

ImmutableList.Builder<RequestHandler2> requestHandlers = ImmutableList.builder();
requestHandler.ifPresent(requestHandlers::add);
config.getCatalogId().ifPresent(catalogId -> requestHandlers.add(new GlueCatalogIdRequestHandler(catalogId)));
asyncGlueClientBuilder.setRequestHandlers(requestHandlers.build().toArray(RequestHandler2[]::new));
asyncGlueClientBuilder.setRequestHandlers(requestHandlers.toArray(RequestHandler2[]::new));

if (config.getGlueEndpointUrl().isPresent()) {
checkArgument(config.getGlueRegion().isPresent(), "Glue region must be set when Glue endpoint URL is set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public static GlueHiveMetastore createTestingGlueHiveMetastore(java.nio.file.Pat
glueConfig,
directExecutor(),
new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), ImmutableSet.of(), stats.newRequestMetricsCollector()),
stats,
table -> true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.multibindings.ProvidesIntoSet;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.awssdk.v1_11.AwsSdkTelemetry;
import io.trino.plugin.hive.AllowHiveTableRename;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
Expand All @@ -35,6 +39,7 @@
import java.util.function.Predicate;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
Expand All @@ -49,8 +54,9 @@ public class GlueMetastoreModule
protected void setup(Binder binder)
{
GlueHiveMetastoreConfig glueConfig = buildConfigObject(GlueHiveMetastoreConfig.class);
glueConfig.getGlueProxyApiId().ifPresent(glueProxyApiId -> binder
.bind(Key.get(RequestHandler2.class, ForGlueHiveMetastore.class))
Multibinder<RequestHandler2> requestHandlers = newSetBinder(binder, RequestHandler2.class, ForGlueHiveMetastore.class);
glueConfig.getCatalogId().ifPresent(catalogId -> requestHandlers.addBinding().toInstance(new GlueCatalogIdRequestHandler(catalogId)));
glueConfig.getGlueProxyApiId().ifPresent(glueProxyApiId -> requestHandlers.addBinding()
.toInstance(new ProxyApiRequestHandler(glueProxyApiId)));
configBinder(binder).bindConfig(HiveConfig.class);
binder.bind(AWSCredentialsProvider.class).toProvider(GlueCredentialsProvider.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -88,6 +94,17 @@ private Module getGlueStatisticsModule(Class<? extends GlueColumnStatisticsProvi
.in(Scopes.SINGLETON);
}

@ProvidesIntoSet
@Singleton
@ForGlueHiveMetastore
public RequestHandler2 createRequestHandler(OpenTelemetry openTelemetry)
{
return AwsSdkTelemetry.builder(openTelemetry)
.setCaptureExperimentalSpanAttributes(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

Can we add similar for Iceberg Glue catalog?

Copy link
Member Author

@ebyhr ebyhr Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR added support for Iceberg Glue catalog as well. The above screenshot is captured with Iceberg connector. GlueMetastoreModule is installed in IcebergGlueCatalogModule.

.build()
.newRequestHandler();
}

@Provides
@Singleton
@ForGlueHiveMetastore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;

import java.util.Optional;
import java.util.Set;

import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient;
import static java.util.Objects.requireNonNull;
Expand All @@ -30,24 +31,24 @@ public class HiveGlueClientProvider
private final GlueMetastoreStats stats;
private final AWSCredentialsProvider credentialsProvider;
private final GlueHiveMetastoreConfig glueConfig; // TODO do not keep mutable config instance on a field
private final Optional<RequestHandler2> requestHandler;
private final Set<RequestHandler2> requestHandlers;

@Inject
public HiveGlueClientProvider(
@ForGlueHiveMetastore GlueMetastoreStats stats,
AWSCredentialsProvider credentialsProvider,
@ForGlueHiveMetastore Optional<RequestHandler2> requestHandler,
@ForGlueHiveMetastore Set<RequestHandler2> requestHandlers,
GlueHiveMetastoreConfig glueConfig)
{
this.stats = requireNonNull(stats, "stats is null");
this.credentialsProvider = requireNonNull(credentialsProvider, "credentialsProvider is null");
this.requestHandler = requireNonNull(requestHandler, "requestHandler is null");
this.requestHandlers = ImmutableSet.copyOf(requireNonNull(requestHandlers, "requestHandlers is null"));
this.glueConfig = glueConfig;
}

@Override
public AWSGlueAsync get()
{
return createAsyncGlueClient(glueConfig, credentialsProvider, requestHandler, stats.newRequestMetricsCollector());
return createAsyncGlueClient(glueConfig, credentialsProvider, requestHandlers, stats.newRequestMetricsCollector());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazonaws.services.glue.model.UpdateTableRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -237,7 +238,7 @@ protected HiveMetastore createMetastore(File tempDir)
glueConfig,
executor,
new DefaultGlueColumnStatisticsProviderFactory(executor, executor),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()),
createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), ImmutableSet.of(), stats.newRequestMetricsCollector()),
stats,
new DefaultGlueMetastoreTableFilterProvider(true).get());
}
Expand Down
12 changes: 12 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-aws-sdk-1.11</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hadoop-toolkit</artifactId>
Expand Down Expand Up @@ -400,6 +406,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import com.amazonaws.services.glue.model.Table;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
Expand All @@ -39,6 +37,7 @@

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

Expand All @@ -57,6 +56,12 @@ protected void setup(Binder binder)
binder.bind(TrinoCatalogFactory.class).to(TrinoGlueCatalogFactory.class).in(Scopes.SINGLETON);
newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();

Multibinder<RequestHandler2> requestHandlers = newSetBinder(binder, RequestHandler2.class, ForGlueHiveMetastore.class);
install(conditionalModule(
IcebergGlueCatalogConfig.class,
IcebergGlueCatalogConfig::isSkipArchive,
config -> requestHandlers.addBinding().toInstance(new SkipArchiveRequestHandler())));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Required to inject HiveMetastoreFactory for migrate procedure
binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
newOptionalBinder(binder, Key.get(new TypeLiteral<Predicate<Table>>() {}, ForGlueHiveMetastore.class))
Expand All @@ -65,12 +70,4 @@ protected void setup(Binder binder)
Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON);
}

@Provides
@Singleton
@ForGlueHiveMetastore
public static RequestHandler2 createRequestHandler(IcebergGlueCatalogConfig config)
{
return new SkipArchiveRequestHandler(config.isSkipArchive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,11 @@
public class SkipArchiveRequestHandler
extends RequestHandler2
{
private final boolean skipArchive;

public SkipArchiveRequestHandler(boolean skipArchive)
{
this.skipArchive = skipArchive;
}

@Override
public AmazonWebServiceRequest beforeExecution(AmazonWebServiceRequest request)
{
if (request instanceof UpdateTableRequest updateTableRequest) {
return updateTableRequest.withSkipArchive(skipArchive);
return updateTableRequest.withSkipArchive(true);
}
if (request instanceof CreateDatabaseRequest ||
request instanceof DeleteDatabaseRequest ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
Expand Down Expand Up @@ -51,7 +52,7 @@ public TestingGlueIcebergTableOperationsProvider(
requireNonNull(credentialsProvider, "credentialsProvider is null");
requireNonNull(awsGlueAsyncAdapterProvider, "awsGlueAsyncAdapterProvider is null");
this.glueClient = awsGlueAsyncAdapterProvider.createAWSGlueAsyncAdapter(
createAsyncGlueClient(glueConfig, credentialsProvider, Optional.empty(), stats.newRequestMetricsCollector()));
createAsyncGlueClient(glueConfig, credentialsProvider, ImmutableSet.of(), stats.newRequestMetricsCollector()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import com.amazonaws.services.glue.model.Table;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.hive.HideDeltaLakeTables;
import io.trino.plugin.hive.metastore.glue.ForGlueHiveMetastore;
Expand All @@ -34,7 +33,9 @@

import java.util.function.Predicate;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.util.Objects.requireNonNull;
import static org.weakref.jmx.guice.ExportBinder.newExporter;
Expand Down Expand Up @@ -62,18 +63,16 @@ protected void setup(Binder binder)
newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();
binder.bind(AWSGlueAsyncAdapterProvider.class).toInstance(awsGlueAsyncAdapterProvider);

Multibinder<RequestHandler2> requestHandlers = newSetBinder(binder, RequestHandler2.class);
install(conditionalModule(
IcebergGlueCatalogConfig.class,
IcebergGlueCatalogConfig::isSkipArchive,
config -> requestHandlers.addBinding().toInstance(new SkipArchiveRequestHandler())));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • config is of type Binder. rename the parameter
  • i don't think one module should add to set binder created by another module; is it guaranteed to work?
    • repeat newSetBinder(binder, RequestHandler2.class) inside the conditional module

Copy link
Member Author

@ebyhr ebyhr Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sent follow-up #18465

Multibinder javadoc mentions:

Contributing multibindings from different modules is supported. For example, it is okay for both CandyModule and ChipsModule to create their own Multibinder, and to each contribute bindings to the set of snacks. When that set is injected, it will contain elements from both modules.
https://google.github.io/guice/api-docs/7.0.0/javadoc/com/google/inject/multibindings/Multibinder.html

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The quoted text says that 2+ modules can call newSetBinder(binder, X.class) and this will end-up being one Set<X>.

it does not seem to say, however, that newSetBinder(binder, X.class) result (the Multibinder object) can be shared between modules


// Required to inject HiveMetastoreFactory for migrate procedure
binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
newOptionalBinder(binder, Key.get(new TypeLiteral<Predicate<Table>>() {}, ForGlueHiveMetastore.class))
.setBinding().toInstance(table -> true);
install(new GlueMetastoreModule());
}

@Provides
@Singleton
@ForGlueHiveMetastore
public static RequestHandler2 createRequestHandler(IcebergGlueCatalogConfig config)
{
return new SkipArchiveRequestHandler(config.isSkipArchive());
}
}