Remove usages of Hadoop from FileHiveMetastore
electrum committed Aug 12, 2023
1 parent 304b5fa commit 79c0236
Showing 17 changed files with 271 additions and 307 deletions.
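
The change that repeats across the files below is the second constructor argument of FileHiveMetastore: the tests now pass a TrinoFileSystemFactory (the HDFS_FILE_SYSTEM_FACTORY constant from HiveTestUtils) where they previously passed HDFS_ENVIRONMENT. A minimal sketch of that construction pattern follows; the wrapper class name and the catalog-directory parameter are invented here for illustration, everything else mirrors the hunks in this commit.

import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;

import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;

class FileMetastoreSetupSketch
{
    // Construction pattern repeated in the updated tests: the second argument
    // is now a TrinoFileSystemFactory instead of an HdfsEnvironment.
    static FileHiveMetastore createMetastore(String catalogDirectory)
    {
        return new FileHiveMetastore(
                new NodeVersion("testversion"),
                HDFS_FILE_SYSTEM_FACTORY,
                new HiveMetastoreConfig().isHideDeltaLakeTables(),
                new FileHiveMetastoreConfig()
                        .setCatalogDirectory(catalogDirectory));
    }
}
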
@@ -39,7 +39,7 @@
import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.plugin.deltalake.DeltaLakeConnectorFactory.CONNECTOR_NAME;
import static io.trino.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;

@@ -65,7 +65,7 @@ protected QueryRunner createQueryRunner()
this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString();
this.metastore = new FileHiveMetastore(
new NodeVersion("testversion"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(dataDirectory)
@@ -271,6 +271,7 @@ private Multiset<FileOperation> getOperations()
{
return trackingFileSystemFactory.getOperationCounts()
.entrySet().stream()
.filter(entry -> !entry.getKey().location().path().endsWith(".trinoSchema"))
.flatMap(entry -> nCopies(entry.getValue(), FileOperation.create(
entry.getKey().location().path(),
entry.getKey().operationType())).stream())

Large diffs are not rendered by default.

@@ -14,7 +14,7 @@
package io.trino.plugin.hive.metastore.file;

import com.google.inject.Inject;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.HideDeltaLakeTables;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.HiveMetastore;
@@ -29,10 +29,10 @@ public class FileHiveMetastoreFactory
private final FileHiveMetastore metastore;

@Inject
public FileHiveMetastoreFactory(NodeVersion nodeVersion, HdfsEnvironment hdfsEnvironment, @HideDeltaLakeTables boolean hideDeltaLakeTables, FileHiveMetastoreConfig config)
public FileHiveMetastoreFactory(NodeVersion nodeVersion, TrinoFileSystemFactory fileSystemFactory, @HideDeltaLakeTables boolean hideDeltaLakeTables, FileHiveMetastoreConfig config)
{
// file metastore does not support impersonation, so just create a single shared instance
metastore = new FileHiveMetastore(nodeVersion, hdfsEnvironment, hideDeltaLakeTables, config);
metastore = new FileHiveMetastore(nodeVersion, fileSystemFactory, hideDeltaLakeTables, config);
}

@Override
@@ -27,7 +27,7 @@
import java.io.IOException;

import static io.trino.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.testing.TestingNames.randomNameSuffix;

public class TestHiveCreateSchemaInternalRetry
@@ -52,7 +52,7 @@ private FileHiveMetastore createMetastore(String dataDirectory)
{
return new FileHiveMetastore(
new NodeVersion("testversion"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(dataDirectory)
@@ -21,7 +21,7 @@

import java.io.File;

import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;

// staging directory is shared mutable state
@Test(singleThreaded = true)
@@ -34,7 +34,7 @@ protected HiveMetastore createMetastore(File tempDir)
File baseDir = new File(tempDir, "metastore");
return new FileHiveMetastore(
new NodeVersion("test_version"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
true,
new FileHiveMetastoreConfig()
.setCatalogDirectory(baseDir.toURI().toString())
@@ -33,7 +33,7 @@

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.plugin.hive.HiveType.HIVE_INT;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.hive.util.HiveClassNames.HUDI_PARQUET_INPUT_FORMAT;
@@ -55,7 +55,7 @@ public void setUp()

metastore = new FileHiveMetastore(
new NodeVersion("testversion"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(tmpDir.toString())
@@ -13,30 +13,22 @@
*/
package io.trino.plugin.hive.metastore.file;

import com.google.common.collect.ImmutableSet;
import io.trino.hdfs.DynamicHdfsConfiguration;
import io.trino.hdfs.HdfsConfig;
import io.trino.hdfs.HdfsConfiguration;
import io.trino.hdfs.HdfsConfigurationInitializer;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;

import java.io.File;

import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;

public final class TestingFileHiveMetastore
{
private TestingFileHiveMetastore() {}

public static FileHiveMetastore createTestingFileHiveMetastore(File catalogDirectory)
{
HdfsConfig hdfsConfig = new HdfsConfig();
HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
return new FileHiveMetastore(
new NodeVersion("testversion"),
hdfsEnvironment,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(catalogDirectory.toURI().toString())
@@ -31,7 +31,7 @@
import java.util.regex.Pattern;

import static com.google.common.base.Verify.verify;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
@@ -55,7 +55,7 @@ protected QueryRunner createQueryRunner()
File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile();
return new FileHiveMetastore(
new NodeVersion("testversion"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(baseDir.toURI().toString())
@@ -33,7 +33,7 @@
import java.io.File;
import java.util.List;

import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
@@ -68,7 +68,7 @@ protected QueryRunner createQueryRunner()
File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile();
return new FileHiveMetastore(
new NodeVersion("testversion"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(baseDir.toURI().toString())
@@ -23,9 +23,6 @@
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.trino.hdfs.HdfsConfig;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.HiveQueryRunner;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
@@ -45,7 +42,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_CONFIGURATION;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.plugin.hive.util.MultisetAssertions.assertMultisetsEqual;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.TestingNames.randomNameSuffix;
@@ -85,7 +82,7 @@ protected QueryRunner createQueryRunner()
File baseDir = distributedQueryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile();
return new FileHiveMetastore(
new NodeVersion("testversion"),
new HdfsEnvironment(HDFS_CONFIGURATION, new HdfsConfig(), new NoHdfsAuthentication()),
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(baseDir.toURI().toString())
@@ -25,7 +25,6 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.hdfs.HdfsContext;
import io.trino.metadata.Metadata;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableHandle;
@@ -58,7 +57,6 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;
@@ -108,7 +106,6 @@
import static io.trino.SystemSessionProperties.TASK_PARTITIONED_WRITER_COUNT;
import static io.trino.SystemSessionProperties.TASK_WRITER_COUNT;
import static io.trino.SystemSessionProperties.USE_PREFERRED_WRITE_PARTITIONING;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.metastore.file.TestingFileHiveMetastore.createTestingFileHiveMetastore;
import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
@@ -4208,13 +4205,8 @@ public void testIncorrectIcebergFileSizes()
assertNotEquals(dataFile.get("file_size_in_bytes"), alteredValue);
dataFile.put("file_size_in_bytes", alteredValue);

// Replace the file through HDFS client. This is required for correct checksums.
HdfsContext context = new HdfsContext(getSession().toConnectorSession());
org.apache.hadoop.fs.Path manifestFilePath = new org.apache.hadoop.fs.Path(manifestFile);
FileSystem fs = HDFS_ENVIRONMENT.getFileSystem(context, manifestFilePath);

// Write altered metadata
try (OutputStream out = fs.create(manifestFilePath);
try (OutputStream out = fileSystem.newOutputFile(Location.of(manifestFile)).createOrOverwrite();
DataFileWriter<GenericData.Record> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
dataFileWriter.create(schema, out);
dataFileWriter.append(entry);
@@ -4227,7 +4219,9 @@ public void testIncorrectIcebergFileSizes()
assertQuery(session, "SELECT * FROM test_iceberg_file_size", "VALUES (123), (456), (758)");

// Using Iceberg provided file size fails the query
assertQueryFails("SELECT * FROM test_iceberg_file_size", ".*Error opening Iceberg split.*\\QIncorrect file size (%d) for file (end of stream not reached)\\E.*".formatted(alteredValue));
assertQueryFails(
"SELECT * FROM test_iceberg_file_size",
"(Malformed ORC file\\. Invalid file metadata.*)|(.*Error opening Iceberg split.* Incorrect file size \\(%s\\) for file .*)".formatted(alteredValue));

dropTable("test_iceberg_file_size");
}
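
For reference, a minimal sketch of the replacement write path used in the hunk above, assuming only what the diff shows: fileSystem is a TrinoFileSystem, and newOutputFile(...).createOrOverwrite() yields an OutputStream. The wrapper class, method name, and byte-array payload are illustrative, not part of the commit.

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.IOException;
import java.io.OutputStream;

class ManifestRewriteSketch
{
    // Overwrite a file through the TrinoFileSystem abstraction, replacing the
    // Hadoop FileSystem.create() call that the hunk above removes.
    static void overwrite(TrinoFileSystem fileSystem, String path, byte[] content)
            throws IOException
    {
        try (OutputStream out = fileSystem.newOutputFile(Location.of(path)).createOrOverwrite()) {
            out.write(content);
        }
    }
}
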
@@ -33,6 +33,7 @@
import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
import static io.trino.testing.containers.Minio.MINIO_REGION;
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,12 +64,14 @@ protected QueryRunner createQueryRunner()
.setIcebergProperties(
ImmutableMap.<String, String>builder()
.put("iceberg.file-format", format.name())
.put("hive.s3.aws-access-key", MINIO_ACCESS_KEY)
.put("hive.s3.aws-secret-key", MINIO_SECRET_KEY)
.put("hive.s3.endpoint", minio.getMinioAddress())
.put("hive.s3.path-style-access", "true")
.put("hive.s3.streaming.part-size", "5MB") // minimize memory usage
.put("hive.s3.max-connections", "8") // verify no leaks
.put("fs.native-s3.enabled", "true")
.put("s3.aws-access-key", MINIO_ACCESS_KEY)
.put("s3.aws-secret-key", MINIO_SECRET_KEY)
.put("s3.region", MINIO_REGION)
.put("s3.endpoint", minio.getMinioAddress())
.put("s3.path-style-access", "true")
.put("s3.streaming.part-size", "5MB") // minimize memory usage
.put("s3.max-connections", "2") // verify no leaks
.put("iceberg.register-table-procedure.enabled", "true")
// Allows testing the sorting writer flushing to the file system with smaller tables
.put("iceberg.writer-sort-buffer-size", "1MB")
@@ -36,7 +36,7 @@
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -61,7 +61,7 @@ protected DistributedQueryRunner createQueryRunner()
// Using FileHiveMetastore as approximation of HMS
this.metastore = new FileHiveMetastore(
new NodeVersion("testversion"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(dataDirectory.toString()))
@@ -40,7 +40,7 @@
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -66,7 +66,7 @@ protected LocalQueryRunner createQueryRunner()

HiveMetastore metastore = new FileHiveMetastore(
new NodeVersion("testversion"),
HDFS_ENVIRONMENT,
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreConfig().isHideDeltaLakeTables(),
new FileHiveMetastoreConfig()
.setCatalogDirectory(baseDir.toURI().toString())
12 changes: 12 additions & 0 deletions testing/trino-tests/pom.xml
@@ -195,6 +195,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hdfs</artifactId>