diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml
index bd19a453a590..cea46290c3d1 100644
--- a/plugin/trino-hive/pom.xml
+++ b/plugin/trino-hive/pom.xml
@@ -308,6 +308,12 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-context</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-hadoop-toolkit</artifactId>
@@ -348,6 +354,24 @@
         </dependency>
 
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk-trace</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
@@ -368,6 +392,13 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-main</artifactId>
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java
index 90d1043551d9..7d231385477b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java
@@ -87,7 +87,7 @@ private InternalHiveConnectorFactory() {}
public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Module module)
{
- return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty(), Optional.empty());
+ return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
}
public static Connector createConnector(
@@ -97,6 +97,7 @@ public static Connector createConnector(
Module module,
Optional<HiveMetastore> metastore,
Optional<TrinoFileSystemFactory> fileSystemFactory,
+ Optional<OpenTelemetry> openTelemetry,
Optional<DirectoryLister> directoryLister)
{
requireNonNull(config, "config is null");
@@ -127,7 +128,7 @@ public static Connector createConnector(
new HiveProcedureModule(),
new MBeanServerModule(),
binder -> {
- binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
+ binder.bind(OpenTelemetry.class).toInstance(openTelemetry.orElse(context.getOpenTelemetry()));
binder.bind(Tracer.class).toInstance(context.getTracer());
binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
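Note on the binding change above: an explicitly supplied OpenTelemetry (the new createConnector parameter) now wins over the engine's ConnectorContext instance, with Optional.orElse supplying the fallback. A minimal, purely illustrative sketch of that precedence (the `supplied`, `bound`, and `defaulted` names are assumptions, not code from this change):

    // If a caller passes an SDK instance, it is bound; otherwise the
    // engine's context-provided OpenTelemetry is used.
    Optional<OpenTelemetry> supplied = Optional.of(OpenTelemetrySdk.builder().build());
    OpenTelemetry bound = supplied.orElse(context.getOpenTelemetry());                            // the supplied SDK
    OpenTelemetry defaulted = Optional.<OpenTelemetry>empty().orElse(context.getOpenTelemetry()); // engine default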
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java
index a493d304a220..19fa8bb0cd6e 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java
@@ -19,6 +19,7 @@
import com.google.inject.Module;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
+import io.opentelemetry.api.OpenTelemetry;
import io.trino.Session;
import io.trino.metadata.QualifiedObjectName;
import io.trino.plugin.hive.fs.DirectoryLister;
@@ -105,6 +106,7 @@ public static class Builder<SELF extends Builder<?>>
File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile();
return createTestingFileHiveMetastore(baseDir);
};
+ private Optional<OpenTelemetry> openTelemetry = Optional.empty();
private Module module = EMPTY_MODULE;
private Optional<DirectoryLister> directoryLister = Optional.empty();
private boolean tpcdsCatalogEnabled;
@@ -173,6 +175,13 @@ public SELF setMetastore(Function<DistributedQueryRunner, HiveMetastore> metasto
return self();
}
+ @CanIgnoreReturnValue
+ public SELF setOpenTelemetry(OpenTelemetry openTelemetry)
+ {
+ this.openTelemetry = Optional.of(openTelemetry);
+ return self();
+ }
+
@CanIgnoreReturnValue
public SELF setModule(Module module)
{
@@ -244,7 +253,7 @@ public DistributedQueryRunner build()
}
HiveMetastore metastore = this.metastore.apply(queryRunner);
- queryRunner.installPlugin(new TestingHivePlugin(Optional.of(metastore), module, directoryLister));
+ queryRunner.installPlugin(new TestingHivePlugin(Optional.of(metastore), openTelemetry, module, directoryLister));
Map<String, String> hiveProperties = new HashMap<>();
if (!skipTimezoneSetup) {
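The setOpenTelemetry hook added above lets a test hand the query runner its own SDK before TestingHivePlugin is installed. A minimal sketch of the intended wiring, mirroring the new test at the end of this diff (InMemorySpanExporter, SdkTracerProvider, and SimpleSpanProcessor come from the opentelemetry-sdk-testing and opentelemetry-sdk-trace dependencies added in the pom):

    InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
    OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
            .setTracerProvider(SdkTracerProvider.builder()
                    .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
                    .build())
            .build();
    QueryRunner queryRunner = HiveQueryRunner.builder()
            .setOpenTelemetry(openTelemetry)
            .build();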
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java
index f353248f6df1..086472941d78 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java
@@ -14,6 +14,7 @@
package io.trino.plugin.hive;
import com.google.inject.Module;
+import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.connector.Connector;
@@ -31,17 +32,23 @@ public class TestingHiveConnectorFactory
implements ConnectorFactory
{
private final Optional<HiveMetastore> metastore;
+ private final Optional<OpenTelemetry> openTelemetry;
private final Module module;
private final Optional<DirectoryLister> directoryLister;
public TestingHiveConnectorFactory(HiveMetastore metastore)
{
- this(Optional.of(metastore), EMPTY_MODULE, Optional.empty());
+ this(Optional.of(metastore), Optional.empty(), EMPTY_MODULE, Optional.empty());
}
- public TestingHiveConnectorFactory(Optional<HiveMetastore> metastore, Module module, Optional<DirectoryLister> directoryLister)
+ public TestingHiveConnectorFactory(
+ Optional<HiveMetastore> metastore,
+ Optional<OpenTelemetry> openTelemetry,
+ Module module,
+ Optional<DirectoryLister> directoryLister)
{
this.metastore = requireNonNull(metastore, "metastore is null");
+ this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.module = requireNonNull(module, "module is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
}
@@ -55,6 +62,6 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
- return createConnector(catalogName, config, context, module, metastore, Optional.empty(), directoryLister);
+ return createConnector(catalogName, config, context, module, metastore, Optional.empty(), openTelemetry, directoryLister);
}
}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java
index adc6ad833f5e..13975b1995b3 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java
@@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Module;
+import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.Plugin;
@@ -29,22 +30,24 @@ public class TestingHivePlugin
implements Plugin
{
private final Optional<HiveMetastore> metastore;
+ private final Optional<OpenTelemetry> openTelemetry;
private final Module module;
private final Optional<DirectoryLister> directoryLister;
public TestingHivePlugin()
{
- this(Optional.empty(), EMPTY_MODULE, Optional.empty());
+ this(Optional.empty(), Optional.empty(), EMPTY_MODULE, Optional.empty());
}
public TestingHivePlugin(HiveMetastore metastore)
{
- this(Optional.of(metastore), EMPTY_MODULE, Optional.empty());
+ this(Optional.of(metastore), Optional.empty(), EMPTY_MODULE, Optional.empty());
}
- public TestingHivePlugin(Optional<HiveMetastore> metastore, Module module, Optional<DirectoryLister> directoryLister)
+ public TestingHivePlugin(Optional<HiveMetastore> metastore, Optional<OpenTelemetry> openTelemetry, Module module, Optional<DirectoryLister> directoryLister)
{
this.metastore = requireNonNull(metastore, "metastore is null");
+ this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.module = requireNonNull(module, "module is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
}
@@ -52,6 +55,6 @@ public TestingHivePlugin(Optional<HiveMetastore> metastore, Module module, Optio
@Override
public Iterable<ConnectorFactory> getConnectorFactories()
{
- return ImmutableList.of(new TestingHiveConnectorFactory(metastore, module, directoryLister));
+ return ImmutableList.of(new TestingHiveConnectorFactory(metastore, openTelemetry, module, directoryLister));
}
}
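With the two test classes above, a custom OpenTelemetry now flows through the whole test-only chain: TestingHivePlugin -> TestingHiveConnectorFactory -> InternalHiveConnectorFactory.createConnector. A sketch of installing the plugin directly with an explicit instance, following the constructor shape introduced above (`metastore` and `openTelemetry` assumed to be in scope):

    queryRunner.installPlugin(new TestingHivePlugin(
            Optional.of(metastore),
            Optional.of(openTelemetry),
            EMPTY_MODULE,
            Optional.empty()));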
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestTrinoS3FileSystemAccessOperations.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestTrinoS3FileSystemAccessOperations.java
new file mode 100644
index 000000000000..a065406c8d39
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestTrinoS3FileSystemAccessOperations.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.s3;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Multiset;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import io.trino.hdfs.HdfsConfig;
+import io.trino.hdfs.HdfsEnvironment;
+import io.trino.hdfs.authentication.NoHdfsAuthentication;
+import io.trino.plugin.hive.HiveQueryRunner;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.containers.Minio;
+import org.intellij.lang.annotations.Language;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_CONFIGURATION;
+import static io.trino.plugin.hive.util.MultisetAssertions.assertMultisetsEqual;
+import static io.trino.testing.DataProviders.toDataProvider;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static java.util.stream.Collectors.toCollection;
+
+@Test(singleThreaded = true) // S3 request counters share mutable state so the tests can't be run from many threads simultaneously
+public class TestTrinoS3FileSystemAccessOperations
+ extends AbstractTestQueryFramework
+{
+ private static final String BUCKET = "test-bucket";
+
+ private Minio minio;
+ private InMemorySpanExporter spanExporter;
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ minio = closeAfterClass(Minio.builder().build());
+ minio.start();
+ minio.createBucket(BUCKET);
+
+ spanExporter = closeAfterClass(InMemorySpanExporter.create());
+
+ SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
+ .build();
+
+ OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+ .setTracerProvider(tracerProvider)
+ .build();
+
+ return HiveQueryRunner.builder()
+ .setMetastore(distributedQueryRunner -> {
+ File baseDir = distributedQueryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile();
+ return new FileHiveMetastore(
+ new NodeVersion("testversion"),
+ new HdfsEnvironment(HDFS_CONFIGURATION, new HdfsConfig(), new NoHdfsAuthentication()),
+ new HiveMetastoreConfig().isHideDeltaLakeTables(),
+ new FileHiveMetastoreConfig()
+ .setCatalogDirectory(baseDir.toURI().toString())
+ .setDisableLocationChecks(true) // matches Glue behavior
+ .setMetastoreUser("test"));
+ })
+ .setHiveProperties(ImmutableMap.<String, String>builder()
+ .put("hive.s3.aws-access-key", MINIO_ACCESS_KEY)
+ .put("hive.s3.aws-secret-key", MINIO_SECRET_KEY)
+ .put("hive.s3.endpoint", minio.getMinioAddress())
+ .put("hive.s3.path-style-access", "true")
+ .put("hive.non-managed-table-writes-enabled", "true")
+ .buildOrThrow())
+ .setOpenTelemetry(openTelemetry)
+ .setInitialSchemasLocationBase("s3://" + BUCKET)
+ .build();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown()
+ {
+ // closed by closeAfterClass
+ spanExporter = null;
+ minio = null;
+ }
+
+ @Test(dataProvider = "storageFormats")
+ public void testSelectWithFilter(StorageFormat format)
+ {
+ assertUpdate("DROP TABLE IF EXISTS test_select_from_where");
+ String tableLocation = randomTableLocation("test_select_from_where");
+
+ assertUpdate("CREATE TABLE test_select_from_where WITH (format = '" + format + "', external_location = '" + tableLocation + "') AS SELECT 2 AS age", 1);
+
+ assertFileSystemAccesses("SELECT * FROM test_select_from_where WHERE age = 2",
+ ImmutableMultiset.<String>builder()
+ // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format
+ .addCopies("S3.GetObject", occurrences(format, 1, 2))
+ .add("S3.ListObjectsV2")
+ .addCopies("S3.GetObjectMetadata", occurrences(format, 1, 0))
+ .build());
+
+ assertUpdate("DROP TABLE test_select_from_where");
+ }
+
+ @Test(dataProvider = "storageFormats")
+ public void testSelectPartitionTable(StorageFormat format)
+ {
+ assertUpdate("DROP TABLE IF EXISTS test_select_from_partition");
+ String tableLocation = randomTableLocation("test_select_from_partition");
+
+ assertUpdate("CREATE TABLE test_select_from_partition (data int, key varchar)" +
+ "WITH (partitioned_by = ARRAY['key'], format = '" + format + "', external_location = '" + tableLocation + "')");
+ assertUpdate("INSERT INTO test_select_from_partition VALUES (1, 'part1'), (2, 'part2')", 2);
+
+ assertFileSystemAccesses("SELECT * FROM test_select_from_partition",
+ ImmutableMultiset.<String>builder()
+ // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format
+ .addCopies("S3.GetObject", occurrences(format, 2, 4))
+ .addCopies("S3.ListObjectsV2", 2)
+ .addCopies("S3.GetObjectMetadata", occurrences(format, 2, 0))
+ .build());
+
+ assertFileSystemAccesses("SELECT * FROM test_select_from_partition WHERE key = 'part1'",
+ ImmutableMultiset.<String>builder()
+ // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format
+ .addCopies("S3.GetObject", occurrences(format, 1, 2))
+ .add("S3.ListObjectsV2")
+ .addCopies("S3.GetObjectMetadata", occurrences(format, 1, 0))
+ .build());
+
+ assertUpdate("INSERT INTO test_select_from_partition VALUES (11, 'part1')", 1);
+ assertFileSystemAccesses("SELECT * FROM test_select_from_partition WHERE key = 'part1'",
+ ImmutableMultiset.<String>builder()
+ // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format
+ .addCopies("S3.GetObject", occurrences(format, 2, 4))
+ .addCopies("S3.ListObjectsV2", 1)
+ .addCopies("S3.GetObjectMetadata", occurrences(format, 2, 0))
+ .build());
+
+ assertUpdate("DROP TABLE test_select_from_partition");
+ }
+
+ private static String randomTableLocation(String tableName)
+ {
+ return "s3://%s/%s/%s-%s".formatted(BUCKET, TPCH_SCHEMA, tableName, randomNameSuffix());
+ }
+
+ private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<String> expectedAccesses)
+ {
+ DistributedQueryRunner queryRunner = getDistributedQueryRunner();
+ spanExporter.reset();
+ queryRunner.executeWithQueryId(queryRunner.getDefaultSession(), query);
+ assertMultisetsEqual(getOperations(), expectedAccesses);
+ }
+
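+ // Each S3 request issued through the instrumented client finishes as a span
+ // whose name (e.g. "S3.GetObject") identifies the operation counted here.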
+ private Multiset<String> getOperations()
+ {
+ return spanExporter.getFinishedSpanItems().stream()
+ .map(SpanData::getName)
+ .collect(toCollection(HashMultiset::create));
+ }
+
+ @DataProvider
+ public static Object[][] storageFormats()
+ {
+ return Arrays.stream(StorageFormat.values())
+ .collect(toDataProvider());
+ }
+
+ private static int occurrences(StorageFormat tableType, int orcValue, int parquetValue)
+ {
+ checkArgument(orcValue != parquetValue, "No need to use occurrences() when the ORC and Parquet values are the same");
+ return switch (tableType) {
+ case ORC -> orcValue;
+ case PARQUET -> parquetValue;
+ };
+ }
+
+ enum StorageFormat
+ {
+ ORC,
+ PARQUET,
+ }
+}
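For reference, the span-capture mechanism the test leans on can be exercised without Trino at all; a minimal, self-contained sketch using only the OpenTelemetry SDK (the span name merely echoes the ones asserted above):

    InMemorySpanExporter exporter = InMemorySpanExporter.create();
    SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(SimpleSpanProcessor.create(exporter))
            .build();
    Tracer tracer = tracerProvider.get("test");
    tracer.spanBuilder("S3.GetObject").startSpan().end();
    List<SpanData> finished = exporter.getFinishedSpanItems(); // one span named "S3.GetObject"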