diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst
index a6c2267f7610..7919c034ec60 100644
--- a/docs/src/main/sphinx/connector/delta-lake.rst
+++ b/docs/src/main/sphinx/connector/delta-lake.rst
@@ -153,6 +153,9 @@ values. Typical usage does not require you to configure them.
   * - ``delta.target-max-file-size``
     - Target maximum size of written files; the actual size may be larger.
     - ``1GB``
+  * - ``delta.unique-table-location``
+    - Use randomized, unique table locations.
+    - ``true``
 
 The following table describes performance tuning catalog properties for the
 connector.
diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml
index 577c50422743..c60a28738a01 100644
--- a/plugin/trino-delta-lake/pom.xml
+++ b/plugin/trino-delta-lake/pom.xml
@@ -387,6 +387,7 @@
                             <exclude>**/TestDeltaLakeGlueMetastore.java</exclude>
                             <exclude>**/TestDeltaLakeCleanUpGlueMetastore.java</exclude>
                             <exclude>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</exclude>
+                            <exclude>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</exclude>
@@ -432,6 +433,7 @@
                             <include>**/TestDeltaLakeGlueMetastore.java</include>
                             <include>**/TestDeltaLakeCleanUpGlueMetastore.java</include>
                             <include>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</include>
+                            <include>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</include>
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
index da61588f7b09..57054ad57e23 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
@@ -69,6 +69,7 @@ public class DeltaLakeConfig
     private boolean deleteSchemaLocationsFallback;
     private String parquetTimeZone = TimeZone.getDefault().getID();
     private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
+    private boolean uniqueTableLocation = true;
 
     public Duration getMetadataCacheTtl()
     {
@@ -398,4 +399,17 @@ public DeltaLakeConfig setTargetMaxFileSize(DataSize targetMaxFileSize)
         this.targetMaxFileSize = targetMaxFileSize;
         return this;
     }
+
+    public boolean isUniqueTableLocation()
+    {
+        return uniqueTableLocation;
+    }
+
+    @Config("delta.unique-table-location")
+    @ConfigDescription("Use randomized, unique table locations")
+    public DeltaLakeConfig setUniqueTableLocation(boolean uniqueTableLocation)
+    {
+        this.uniqueTableLocation = uniqueTableLocation;
+        return this;
+    }
 }
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeErrorCode.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeErrorCode.java
index 28695adc4958..5d46055448b5 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeErrorCode.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeErrorCode.java
@@ -26,6 +26,7 @@ public enum DeltaLakeErrorCode
     DELTA_LAKE_INVALID_TABLE(1, EXTERNAL),
     DELTA_LAKE_BAD_DATA(2, EXTERNAL),
     DELTA_LAKE_BAD_WRITE(3, EXTERNAL),
+    DELTA_LAKE_FILESYSTEM_ERROR(4, EXTERNAL),
     /**/;
 
     private final ErrorCode errorCode;
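The new flag defaults to true, so managed tables written through either create path below get a randomized directory unless the catalog opts out. A minimal sketch of the config round trip, using only the DeltaLakeConfig accessors added above (the property name comes from the @Config annotation):

    import io.trino.plugin.deltalake.DeltaLakeConfig;

    public class UniqueTableLocationConfigSketch
    {
        public static void main(String[] args)
        {
            DeltaLakeConfig config = new DeltaLakeConfig();
            System.out.println(config.isUniqueTableLocation()); // true, the default
            // Equivalent to delta.unique-table-location=false in a catalog file:
            config.setUniqueTableLocation(false);
            System.out.println(config.isUniqueTableLocation()); // false
        }
    }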
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index dcc7b7af1c24..0d54c0639cdf 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -276,6 +276,7 @@ public class DeltaLakeMetadata
     private final DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider;
     private final ExtendedStatisticsAccess statisticsAccess;
     private final boolean deleteSchemaLocationsFallback;
+    private final boolean useUniqueTableLocation;
 
     public DeltaLakeMetadata(
             DeltaLakeMetastore metastore,
@@ -293,7 +294,8 @@ public DeltaLakeMetadata(
             boolean ignoreCheckpointWriteFailures,
             boolean deleteSchemaLocationsFallback,
             DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
-            ExtendedStatisticsAccess statisticsAccess)
+            ExtendedStatisticsAccess statisticsAccess,
+            boolean useUniqueTableLocation)
     {
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -312,6 +314,7 @@ public DeltaLakeMetadata(
         this.deltaLakeRedirectionsProvider = requireNonNull(deltaLakeRedirectionsProvider, "deltaLakeRedirectionsProvider is null");
         this.statisticsAccess = requireNonNull(statisticsAccess, "statisticsAccess is null");
         this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
+        this.useUniqueTableLocation = useUniqueTableLocation;
     }
 
     @Override
@@ -645,7 +648,11 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
             if (schemaLocation.isEmpty()) {
                 throw new TrinoException(NOT_SUPPORTED, "The 'location' property must be specified either for the table or the schema");
             }
-            location = new Path(schemaLocation.get(), tableName).toString();
+            String tableNameForLocation = tableName;
+            if (useUniqueTableLocation) {
+                tableNameForLocation += "-" + randomUUID().toString().replace("-", "");
+            }
+            location = new Path(schemaLocation.get(), tableNameForLocation).toString();
             checkPathContainsNoFiles(session, new Path(location));
             external = false;
         }
@@ -771,7 +778,11 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
             if (schemaLocation.isEmpty()) {
                 throw new TrinoException(NOT_SUPPORTED, "The 'location' property must be specified either for the table or the schema");
             }
-            location = new Path(schemaLocation.get(), tableName).toString();
+            String tableNameForLocation = tableName;
+            if (useUniqueTableLocation) {
+                tableNameForLocation += "-" + randomUUID().toString().replace("-", "");
+            }
+            location = new Path(schemaLocation.get(), tableNameForLocation).toString();
             external = false;
         }
         Path targetPath = new Path(location);
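Both create paths derive the directory name identically: the table name, a dash, and a UUID with its own dashes stripped, leaving 32 hex characters. That is the suffix the new tests match with the pattern [0-9a-f]{32}. A standalone sketch of the derivation, with a hypothetical schema location:

    import java.util.UUID;

    public class UniqueLocationNamingSketch
    {
        public static void main(String[] args)
        {
            String schemaLocation = "s3://bucket/warehouse/sales"; // hypothetical
            String tableName = "orders";
            // Same expression as in createTable/beginCreateTable above
            String tableNameForLocation = tableName + "-" + UUID.randomUUID().toString().replace("-", "");
            System.out.println(schemaLocation + "/" + tableNameForLocation);
            // e.g. s3://bucket/warehouse/sales/orders-f0b6832407f3459a94caf82a93d22ca9
        }
    }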
@@ -1605,7 +1616,9 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle)
     private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableHandle)
     {
-        boolean requiresOptIn = transactionLogWriterFactory.newWriter(session, tableHandle.getLocation()).isUnsafe();
+        String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
+        Path tableMetadataDirectory = new Path(new Path(tableLocation).getParent().toString(), tableHandle.getTableName());
+        boolean requiresOptIn = transactionLogWriterFactory.newWriter(session, tableMetadataDirectory.toString()).isUnsafe();
         return !requiresOptIn || unsafeWritesEnabled;
     }
 
@@ -1789,7 +1802,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
             throw new TableNotFoundException(handle.getSchemaTableName());
         }
 
-        metastore.dropTable(session, handle.getSchemaName(), handle.getTableName());
+        metastore.dropTable(session, handle.getSchemaName(), handle.getTableName(), table.get().getTableType().equals(EXTERNAL_TABLE.toString()));
     }
 
     @Override
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java
index 061b68b1990c..b72580456374 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java
@@ -53,6 +53,7 @@ public class DeltaLakeMetadataFactory
     private final boolean ignoreCheckpointWriteFailures;
     private final long perTransactionMetastoreCacheMaximumSize;
     private final boolean deleteSchemaLocationsFallback;
+    private final boolean useUniqueTableLocation;
 
     @Inject
     public DeltaLakeMetadataFactory(
@@ -89,6 +90,7 @@ public DeltaLakeMetadataFactory(
         this.ignoreCheckpointWriteFailures = deltaLakeConfig.isIgnoreCheckpointWriteFailures();
         this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize();
         this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
+        this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
     }
 
     public DeltaLakeMetadata create(ConnectorIdentity identity)
@@ -101,7 +103,8 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
                 cachingHiveMetastore,
                 transactionLogAccess,
                 typeManager,
-                statisticsAccess);
+                statisticsAccess,
+                hdfsEnvironment);
         return new DeltaLakeMetadata(
                 deltaLakeMetastore,
                 hdfsEnvironment,
@@ -118,6 +121,7 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
                 ignoreCheckpointWriteFailures,
                 deleteSchemaLocationsFallback,
                 deltaLakeRedirectionsProvider,
-                statisticsAccess);
+                statisticsAccess,
+                useUniqueTableLocation);
     }
 }
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java
index c631becd161d..25ee7e22c5c3 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java
@@ -45,7 +45,7 @@ public interface DeltaLakeMetastore
     void createTable(ConnectorSession session, Table table, PrincipalPrivileges principalPrivileges);
 
-    void dropTable(ConnectorSession session, String databaseName, String tableName);
+    void dropTable(ConnectorSession session, String databaseName, String tableName, boolean externalTable);
 
     void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to);
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
index 544d657d88c8..3a5c0c8b843b 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
@@ -26,6 +26,7 @@
 import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
 import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
+import io.trino.plugin.hive.HdfsEnvironment;
 import io.trino.plugin.hive.metastore.Database;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.metastore.PrincipalPrivileges;
@@ -40,6 +41,7 @@
 import io.trino.spi.statistics.Estimate;
 import io.trino.spi.statistics.TableStatistics;
 import io.trino.spi.type.TypeManager;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
@@ -55,6 +57,7 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
+import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_TABLE;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.PATH_PROPERTY;
@@ -79,17 +82,20 @@ public class HiveMetastoreBackedDeltaLakeMetastore
     private final TransactionLogAccess transactionLogAccess;
     private final TypeManager typeManager;
     private final CachingExtendedStatisticsAccess statisticsAccess;
+    private final HdfsEnvironment hdfsEnvironment;
 
     public HiveMetastoreBackedDeltaLakeMetastore(
             HiveMetastore delegate,
             TransactionLogAccess transactionLogAccess,
             TypeManager typeManager,
-            CachingExtendedStatisticsAccess statisticsAccess)
+            CachingExtendedStatisticsAccess statisticsAccess,
+            HdfsEnvironment hdfsEnvironment)
     {
         this.delegate = requireNonNull(delegate, "delegate is null");
         this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogSupport is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.statisticsAccess = requireNonNull(statisticsAccess, "statisticsAccess is null");
+        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
     }
 
     @Override
@@ -161,12 +167,22 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileges principalPrivileges)
     }
 
     @Override
-    public void dropTable(ConnectorSession session, String databaseName, String tableName)
+    public void dropTable(ConnectorSession session, String databaseName, String tableName, boolean externalTable)
     {
         String tableLocation = getTableLocation(new SchemaTableName(databaseName, tableName), session);
         delegate.dropTable(databaseName, tableName, true);
         statisticsAccess.invalidateCache(tableLocation);
         transactionLogAccess.invalidateCaches(tableLocation);
+        if (!externalTable) {
+            try {
+                Path path = new Path(tableLocation);
+                FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), path);
+                fileSystem.delete(path, true);
+            }
+            catch (IOException e) {
+                throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Failed to delete directory %s of the table %s", tableLocation, tableName), e);
+            }
+        }
     }
 
     @Override
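The externalTable flag threaded through dropTable decides whether the table directory is removed after the metastore entry: managed tables get a recursive delete of their location, external tables keep their data. A usage sketch of the new contract, with hypothetical schema and table names:

    import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
    import io.trino.spi.connector.ConnectorSession;

    public class DropTableSketch
    {
        // Sketch only: how callers pass the new flag.
        static void drop(DeltaLakeMetastore metastore, ConnectorSession session)
        {
            // Managed table: metastore entry and table directory are both removed
            metastore.dropTable(session, "sales", "orders", false);
            // External table: only the metastore entry goes away; data stays put
            metastore.dropTable(session, "sales", "orders_ext", true);
        }
    }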
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
index f98b218ba982..c7de38f06859 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
@@ -31,6 +31,7 @@
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.MaterializedResult;
 import io.trino.testing.MaterializedResultWithQueryId;
+import io.trino.testing.MaterializedRow;
 import io.trino.testing.QueryRunner;
 import io.trino.testing.TestingConnectorBehavior;
 import io.trino.tpch.TpchTable;
@@ -574,12 +575,8 @@ public void testCreateTableAsWithSchemaLocation()
         assertUpdate(format("CREATE TABLE %s.%s AS SELECT name FROM nation", schemaName, tableName2), "SELECT count(*) FROM nation");
         assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName), "SELECT name FROM nation");
         assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName2), "SELECT name FROM nation");
-        assertQuery(
-                "SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName,
-                format("VALUES '%s/%s'", schemaLocation, tableName));
-        assertQuery(
-                "SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName2,
-                format("VALUES '%s/%s'", schemaLocation, tableName2));
+        validatePath(schemaLocation, schemaName, tableName);
+        validatePath(schemaLocation, schemaName, tableName2);
     }
 
     @Test
@@ -599,12 +596,17 @@ public void testCreateTableWithSchemaLocation()
         assertUpdate(format("INSERT INTO %s.%s SELECT name FROM nation", schemaName, tableName2), "SELECT count(*) FROM nation");
         assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName), "SELECT name FROM nation");
         assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName2), "SELECT name FROM nation");
-        assertQuery(
-                "SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName,
-                format("VALUES '%s/%s'", schemaLocation, tableName));
-        assertQuery(
-                "SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName2,
-                format("VALUES '%s/%s'", schemaLocation, tableName2));
+        validatePath(schemaLocation, schemaName, tableName);
+        validatePath(schemaLocation, schemaName, tableName2);
+    }
+
+    private void validatePath(String schemaLocation, String schemaName, String tableName)
+    {
+        List<MaterializedRow> materializedRows = getQueryRunner()
+                .execute("SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName)
+                .getMaterializedRows();
+        assertThat(materializedRows.size()).isEqualTo(1);
+        assertThat((String) materializedRows.get(0).getField(0)).matches(format("%s/%s.*", schemaLocation, tableName));
     }
 
     @Override
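validatePath keeps the regexp_replace pattern of the old assertions, which strips the last path segment (the data file name) from $path, but compares the remaining directory by prefix rather than exact value, since the directory now carries the random suffix. The pattern behaves the same under java.util.regex; a quick standalone check with a hypothetical path:

    public class PathPatternSketch
    {
        public static void main(String[] args)
        {
            // Same pattern the test passes to regexp_replace on "$path":
            // capture everything up to the last '/' and drop the file name.
            String pattern = "(.*[/][^/]*)[/][^/]*$";
            String path = "s3://bucket/sales/orders-f0b6832407f3459a94caf82a93d22ca9/part-0.parquet"; // hypothetical
            System.out.println(path.replaceAll(pattern, "$1"));
            // prints s3://bucket/sales/orders-f0b6832407f3459a94caf82a93d22ca9
        }
    }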
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeTableWithCustomLocation.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeTableWithCustomLocation.java
new file mode 100644
index 000000000000..71251488a7ff
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeTableWithCustomLocation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.MaterializedRow;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static io.trino.testing.sql.TestTable.randomTableSuffix;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public abstract class BaseDeltaLakeTableWithCustomLocation
+        extends AbstractTestQueryFramework
+{
+    protected static final String SCHEMA = "test_tables_with_custom_location" + randomTableSuffix();
+    protected static final String CATALOG_NAME = "delta_with_custom_location";
+    protected File metastoreDir;
+    protected HiveMetastore metastore;
+    protected HdfsEnvironment hdfsEnvironment;
+    protected HdfsEnvironment.HdfsContext hdfsContext;
+
+    @Test
+    public void testTableHasUuidSuffixInLocation()
+    {
+        String tableName = "table_with_uuid" + randomTableSuffix();
+        assertQuerySucceeds(format("CREATE TABLE %s AS SELECT 1 as val", tableName));
+        Optional<Table> table = metastore.getTable(SCHEMA, tableName);
+        assertTrue(table.isPresent(), "Table should exist");
+        String location = table.get().getStorage().getLocation();
+        assertThat(location).matches(format(".*%s-[0-9a-f]{32}", tableName));
+    }
+
+    @Test
+    public void testCreateAndDrop()
+            throws IOException
+    {
+        String tableName = "test_create_and_drop" + randomTableSuffix();
+        assertQuerySucceeds(format("CREATE TABLE %s AS SELECT 1 as val", tableName));
+        Table table = metastore.getTable(SCHEMA, tableName).orElseThrow();
+        assertThat(table.getTableType()).isEqualTo(TableType.MANAGED_TABLE.name());
+
+        Path tableLocation = new Path(table.getStorage().getLocation());
+        FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
+        assertTrue(fileSystem.exists(tableLocation), "The directory corresponding to the table storage location should exist");
+        List<MaterializedRow> materializedRows = computeActual("SELECT \"$path\" FROM " + tableName).getMaterializedRows();
+        assertEquals(materializedRows.size(), 1);
+        String filePath = (String) materializedRows.get(0).getField(0);
+        assertTrue(fileSystem.exists(new Path(filePath)), "The data file should exist");
+        assertQuerySucceeds(format("DROP TABLE %s", tableName));
+        assertFalse(metastore.getTable(SCHEMA, tableName).isPresent(), "Table should be dropped");
+        assertFalse(fileSystem.exists(new Path(filePath)), "The data file should have been removed");
+        assertFalse(fileSystem.exists(tableLocation), "The directory corresponding to the dropped Delta Lake table should be removed");
+    }
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java
index a7cbd1fda79d..37fe5ecbecf5 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java
@@ -159,7 +159,7 @@ protected List<String> listCheckpointFiles(String transactionLogDirectory)
     private List<String> listAllFilesRecursive(String directory)
     {
-        String azurePath = bucketName + "/" + directory + "/";
+        String azurePath = bucketName + "/" + directory;
         Duration timeout = Duration.ofMinutes(5);
         List<String> allPaths = azureContainerClient.listBlobs(new ListBlobsOptions().setPrefix(azurePath), timeout).stream()
                 .map(BlobItem::getName)
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
index 76caf7dc0a03..5e0a97e24d80 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
@@ -62,7 +62,8 @@ public void testDefaults()
                 .setDeleteSchemaLocationsFallback(false)
                 .setParquetTimeZone(TimeZone.getDefault().getID())
                 .setPerTransactionMetastoreCacheMaximumSize(1000)
-                .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)));
+                .setTargetMaxFileSize(DataSize.of(1, GIGABYTE))
+                .setUniqueTableLocation(true));
     }
 
     @Test
@@ -93,6 +94,7 @@ public void testExplicitPropertyMappings()
                 .put("delta.delete-schema-locations-fallback", "true")
                 .put("delta.parquet.time-zone", nonDefaultTimeZone().getID())
                 .put("delta.target-max-file-size", "2 GB")
+                .put("delta.unique-table-location", "false")
                 .buildOrThrow();
 
         DeltaLakeConfig expected = new DeltaLakeConfig()
@@ -119,7 +121,8 @@ public void testExplicitPropertyMappings()
                 .setDeleteSchemaLocationsFallback(true)
                 .setParquetTimeZone(nonDefaultTimeZone().getID())
                 .setPerTransactionMetastoreCacheMaximumSize(500)
-                .setTargetMaxFileSize(DataSize.of(2, GIGABYTE));
+                .setTargetMaxFileSize(DataSize.of(2, GIGABYTE))
+                .setUniqueTableLocation(false);
 
         assertFullMapping(properties, expected);
     }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
index 142280b1b7d2..73d78cadde20 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
@@ -175,7 +175,8 @@ public DeltaLakeMetastore getDeltaLakeMetastore(
                             hiveMetastoreFactory.createMetastore(Optional.empty()),
                             transactionLogAccess,
                             typeManager,
-                            statistics);
+                            statistics,
+                            HDFS_ENVIRONMENT);
                 }
             });
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
index cd91249c100a..6b7f4aee6b07 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
@@ -261,7 +261,7 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileges principalPrivileges)
         }
 
         @Override
-        public void dropTable(ConnectorSession session, String databaseName, String tableName)
+        public void dropTable(ConnectorSession session, String databaseName, String tableName, boolean externalTable)
         {
             throw new UnsupportedOperationException("Unimplemented");
         }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java
new file mode 100644
index 000000000000..59c99a13664e
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.log.Logger;
+import io.trino.Session;
+import io.trino.plugin.hive.HdfsConfig;
+import io.trino.plugin.hive.HdfsConfigurationInitializer;
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.HiveHdfsConfiguration;
+import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
+import io.trino.plugin.hive.metastore.glue.DefaultGlueColumnStatisticsProviderFactory;
+import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore;
+import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import org.testng.annotations.AfterClass;
+
+import java.io.File;
+import java.util.Optional;
+
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static io.trino.plugin.deltalake.DeltaLakeConnectorFactory.CONNECTOR_NAME;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+
+public class TestDeltaLakeTableWithCustomLocationUsingGlueMetastore
+        extends BaseDeltaLakeTableWithCustomLocation
+{
+    private static final Logger LOG = Logger.get(TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.class);
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        Session deltaLakeSession = testSessionBuilder()
+                .setCatalog(CATALOG_NAME)
+                .setSchema(SCHEMA)
+                .build();
+
+        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build();
+
+        this.metastoreDir = new File(queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString());
+        this.metastoreDir.deleteOnExit();
+
+        queryRunner.installPlugin(new DeltaLakePlugin());
+        queryRunner.createCatalog(
+                CATALOG_NAME,
+                CONNECTOR_NAME,
+                ImmutableMap.<String, String>builder()
+                        .put("hive.metastore", "glue")
+                        .put("hive.metastore.glue.region", "us-east-2")
+                        .put("hive.metastore.glue.default-warehouse-dir", metastoreDir.getPath())
+                        .buildOrThrow());
+
+        HdfsConfig hdfsConfig = new HdfsConfig();
+        hdfsEnvironment = new HdfsEnvironment(
+                new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()),
+                hdfsConfig,
+                new NoHdfsAuthentication());
+        GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig()
+                .setGlueRegion("us-east-2");
+        metastore = new GlueHiveMetastore(
+                hdfsEnvironment,
+                glueConfig,
+                DefaultAWSCredentialsProviderChain.getInstance(),
+                directExecutor(),
+                new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()),
+                Optional.empty(),
+                table -> true);
+        hdfsContext = new HdfsEnvironment.HdfsContext(queryRunner.getDefaultSession().toConnectorSession());
+
+        queryRunner.execute("CREATE SCHEMA " + SCHEMA + " WITH (location = '" + metastoreDir.getPath() + "')");
+        return queryRunner;
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown()
+    {
+        try {
+            if (metastore != null) {
+                // Data is on the local disk and will be deleted by the deleteOnExit hook
+                metastore.dropDatabase(SCHEMA, false);
+                deleteRecursively(metastoreDir.toPath(), ALLOW_INSECURE);
+            }
+        }
+        catch (Exception e) {
+            LOG.error(e, "Failed to clean up Glue database: %s", SCHEMA);
+        }
+    }
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableWithCustomLocationUsingHiveMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableWithCustomLocationUsingHiveMetastore.java
new file mode 100644
index 000000000000..5cc1d1c99826
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableWithCustomLocationUsingHiveMetastore.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import com.google.common.collect.ImmutableSet;
+import io.trino.Session;
+import io.trino.plugin.hive.HdfsConfig;
+import io.trino.plugin.hive.HdfsConfiguration;
+import io.trino.plugin.hive.HdfsConfigurationInitializer;
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
+import io.trino.plugin.hive.HiveHdfsConfiguration;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
+import io.trino.spi.security.ConnectorIdentity;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+import static io.trino.plugin.deltalake.DeltaLakeConnectorFactory.CONNECTOR_NAME;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+
+public class TestDeltaLakeTableWithCustomLocationUsingHiveMetastore
+        extends BaseDeltaLakeTableWithCustomLocation
+{
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        Session session = testSessionBuilder()
+                .setCatalog(CATALOG_NAME)
+                .setSchema(SCHEMA)
+                .build();
+
+        DistributedQueryRunner.Builder<?> builder = DistributedQueryRunner.builder(session);
+        DistributedQueryRunner queryRunner = builder.build();
+
+        Map<String, String> connectorProperties = new HashMap<>();
+        HdfsConfig hdfsConfig = new HdfsConfig();
+        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
+        hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
+        metastoreDir = Files.createTempDirectory("test_delta_lake").toFile();
+        FileHiveMetastoreConfig config = new FileHiveMetastoreConfig()
+                .setCatalogDirectory(metastoreDir.toURI().toString())
+                .setMetastoreUser("test");
+        hdfsContext = new HdfsContext(ConnectorIdentity.ofUser(config.getMetastoreUser()));
+        metastore = new FileHiveMetastore(
+                new NodeVersion("testversion"),
+                hdfsEnvironment,
+                false,
+                config);
+        connectorProperties.putIfAbsent("delta.unique-table-location", "true");
+        connectorProperties.putIfAbsent("hive.metastore", "file");
+        connectorProperties.putIfAbsent("hive.metastore.catalog.dir", metastoreDir.getPath());
+
+        queryRunner.installPlugin(new TestingDeltaLakePlugin());
+        queryRunner.createCatalog(CATALOG_NAME, CONNECTOR_NAME, connectorProperties);
+
+        queryRunner.execute("CREATE SCHEMA " + SCHEMA);
+
+        return queryRunner;
+    }
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
index 3f7d0764d59a..1b18805c1f7e 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
@@ -130,7 +130,8 @@ public void setupMetastore()
                 hiveMetastore,
                 transactionLogAccess,
                 typeManager,
-                statistics);
+                statistics,
+                hdfsEnvironment);
     }
 
     private DeltaLakeTableHandle registerTable(String tableName)
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
index ffc4aeef7137..d5207d30b4eb 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
@@ -308,7 +308,7 @@ public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges)
             checkArgument(table.getStorage().getLocation().isEmpty(), "Storage location for view must be empty");
         }
         else if (table.getTableType().equals(MANAGED_TABLE.name())) {
-            if (!tableMetadataDirectory.equals(new Path(table.getStorage().getLocation()))) {
+            if (!(new Path(table.getStorage().getLocation()).toString().contains(tableMetadataDirectory.toString()))) {
                 throw new TrinoException(HIVE_METASTORE_ERROR, "Table directory must be " + tableMetadataDirectory);
             }
         }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/TableMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/TableMetadata.java
index aba4b31393e8..25607f1061bc 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/TableMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/TableMetadata.java
@@ -33,6 +33,7 @@
 import java.util.OptionalLong;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.plugin.hive.HiveSchemaProperties.LOCATION_PROPERTY;
 import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
 import static java.util.Objects.requireNonNull;
 
@@ -279,7 +280,7 @@ public Table toTable(String databaseName, String tableName, String location)
                 owner,
                 tableType,
                 Storage.builder()
-                        .setLocation(externalLocation.orElse(location))
+                        .setLocation(externalLocation.or(() -> Optional.ofNullable(parameters.get(LOCATION_PROPERTY))).orElse(location))
                         .setStorageFormat(storageFormat.map(StorageFormat::fromHiveStorageFormat).orElse(VIEW_STORAGE_FORMAT))
                         .setBucketProperty(bucketProperty)
                         .setSerdeParameters(serdeParameters)
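The storage location in TableMetadata now resolves through a three-level fallback: the external location if present, otherwise an explicit location table parameter, otherwise the computed metadata directory passed in as location. A small sketch of the Optional.or precedence, with hypothetical values and the literal key "location" standing in for LOCATION_PROPERTY:

    import java.util.Map;
    import java.util.Optional;

    public class LocationFallbackSketch
    {
        public static void main(String[] args)
        {
            Optional<String> externalLocation = Optional.empty(); // first choice, absent here
            Map<String, String> parameters = Map.of("location", "s3://bucket/custom"); // second choice
            String defaultLocation = "file:///warehouse/db/tbl"; // last resort

            String resolved = externalLocation
                    .or(() -> Optional.ofNullable(parameters.get("location")))
                    .orElse(defaultLocation);
            System.out.println(resolved); // s3://bucket/custom
        }
    }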