Add ability to have unique table location for each delta lake table #13020

Merged
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -153,6 +153,9 @@ values. Typical usage does not require you to configure them.
* - ``delta.target-max-file-size``
- Target maximum size of written files; the actual size may be larger.
- ``1GB``
* - ``delta.unique-table-location``
- Use randomized, unique table locations.
- ``true``

The following table describes performance tuning catalog properties for the
connector.
2 changes: 2 additions & 0 deletions plugin/trino-delta-lake/pom.xml
@@ -387,6 +387,7 @@
<exclude>**/TestDeltaLakeGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeCleanUpGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</exclude>
<exclude>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</exclude>
</excludes>
</configuration>
</plugin>
@@ -432,6 +433,7 @@
<include>**/TestDeltaLakeGlueMetastore.java</include>
<include>**/TestDeltaLakeCleanUpGlueMetastore.java</include>
<include>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</include>
<include>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</include>
</includes>
</configuration>
</plugin>
@@ -69,6 +69,7 @@ public class DeltaLakeConfig
private boolean deleteSchemaLocationsFallback;
private String parquetTimeZone = TimeZone.getDefault().getID();
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
private boolean uniqueTableLocation = true;

public Duration getMetadataCacheTtl()
{
@@ -398,4 +399,17 @@ public DeltaLakeConfig setTargetMaxFileSize(DataSize targetMaxFileSize)
this.targetMaxFileSize = targetMaxFileSize;
return this;
}

public boolean isUniqueTableLocation()
{
return uniqueTableLocation;
}

@Config("delta.unique-table-location")
@ConfigDescription("Use randomized, unique table locations")
public DeltaLakeConfig setUniqueTableLocation(boolean uniqueTableLocation)
{
this.uniqueTableLocation = uniqueTableLocation;
return this;
}
}
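
The new flag above binds to delta.unique-table-location and defaults to true. A minimal sketch of the config-test coverage this implies, assuming the Airlift ConfigAssertions pattern used by Trino config tests (hypothetical, not part of this diff; a real assertFullMapping test must set every DeltaLakeConfig property, not just the new one):

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;

    // Abridged: exercises only the new property's binding.
    Map<String, String> properties = ImmutableMap.of("delta.unique-table-location", "false");
    DeltaLakeConfig expected = new DeltaLakeConfig()
            .setUniqueTableLocation(false); // all other setters omitted for brevity
    assertFullMapping(properties, expected);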
@@ -26,6 +26,7 @@ public enum DeltaLakeErrorCode
DELTA_LAKE_INVALID_TABLE(1, EXTERNAL),
DELTA_LAKE_BAD_DATA(2, EXTERNAL),
DELTA_LAKE_BAD_WRITE(3, EXTERNAL),
DELTA_LAKE_FILESYSTEM_ERROR(4, EXTERNAL),
/**/;

private final ErrorCode errorCode;
@@ -276,6 +276,7 @@ public class DeltaLakeMetadata
private final DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider;
private final ExtendedStatisticsAccess statisticsAccess;
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;

public DeltaLakeMetadata(
DeltaLakeMetastore metastore,
@@ -293,7 +294,8 @@ public DeltaLakeMetadata(
boolean ignoreCheckpointWriteFailures,
boolean deleteSchemaLocationsFallback,
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
ExtendedStatisticsAccess statisticsAccess)
ExtendedStatisticsAccess statisticsAccess,
boolean useUniqueTableLocation)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -312,6 +314,7 @@
this.deltaLakeRedirectionsProvider = requireNonNull(deltaLakeRedirectionsProvider, "deltaLakeRedirectionsProvider is null");
this.statisticsAccess = requireNonNull(statisticsAccess, "statisticsAccess is null");
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
this.useUniqueTableLocation = useUniqueTableLocation;
}

@Override
@@ -645,7 +648,11 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
if (schemaLocation.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "The 'location' property must be specified either for the table or the schema");
}
location = new Path(schemaLocation.get(), tableName).toString();
String tableNameForLocation = tableName;
if (useUniqueTableLocation) {
tableNameForLocation += "-" + randomUUID().toString().replace("-", "");
}
location = new Path(schemaLocation.get(), tableNameForLocation).toString();
checkPathContainsNoFiles(session, new Path(location));
external = false;
}
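
With delta.unique-table-location enabled, a managed table named orders lands in a directory such as orders-a81bc81b1f2d4f3e9b1fd468d7654321 (a UUID with its dashes stripped, 32 hex characters), so re-creating a table after a DROP is effectively guaranteed not to collide with leftover files at the old path. The same logic is repeated in beginCreateTable below; a standalone sketch of it (hypothetical helper, not in the PR):

    import org.apache.hadoop.fs.Path;
    import static java.util.UUID.randomUUID;

    // Mirrors the inline logic in createTable/beginCreateTable.
    static String tableLocationFor(String schemaLocation, String tableName, boolean uniqueTableLocation)
    {
        String name = tableName;
        if (uniqueTableLocation) {
            // e.g. "orders" -> "orders-a81bc81b1f2d4f3e9b1fd468d7654321"
            name += "-" + randomUUID().toString().replace("-", "");
        }
        return new Path(schemaLocation, name).toString();
    }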
@@ -771,7 +778,11 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
if (schemaLocation.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "The 'location' property must be specified either for the table or the schema");
}
location = new Path(schemaLocation.get(), tableName).toString();
String tableNameForLocation = tableName;
if (useUniqueTableLocation) {
tableNameForLocation += "-" + randomUUID().toString().replace("-", "");
}
location = new Path(schemaLocation.get(), tableNameForLocation).toString();
external = false;
}
Path targetPath = new Path(location);
@@ -1605,7 +1616,9 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl

private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableHandle)
{
boolean requiresOptIn = transactionLogWriterFactory.newWriter(session, tableHandle.getLocation()).isUnsafe();
String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
Path tableMetadataDirectory = new Path(new Path(tableLocation).getParent().toString(), tableHandle.getTableName());
boolean requiresOptIn = transactionLogWriterFactory.newWriter(session, tableMetadataDirectory.toString()).isUnsafe();

Review comment (Member): Why was this change needed?

return !requiresOptIn || unsafeWritesEnabled;
}

@@ -1789,7 +1802,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
throw new TableNotFoundException(handle.getSchemaTableName());
}

metastore.dropTable(session, handle.getSchemaName(), handle.getTableName());
metastore.dropTable(session, handle.getSchemaName(), handle.getTableName(), table.get().getTableType().equals(EXTERNAL_TABLE.toString()));
}

@Override
@@ -53,6 +53,7 @@ public class DeltaLakeMetadataFactory
private final boolean ignoreCheckpointWriteFailures;
private final long perTransactionMetastoreCacheMaximumSize;
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;

@Inject
public DeltaLakeMetadataFactory(
@@ -89,6 +90,7 @@ public DeltaLakeMetadataFactory(
this.ignoreCheckpointWriteFailures = deltaLakeConfig.isIgnoreCheckpointWriteFailures();
this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize();
this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
}

public DeltaLakeMetadata create(ConnectorIdentity identity)
@@ -101,7 +103,8 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
cachingHiveMetastore,
transactionLogAccess,
typeManager,
statisticsAccess);
statisticsAccess,
hdfsEnvironment);
return new DeltaLakeMetadata(
deltaLakeMetastore,
hdfsEnvironment,
Expand All @@ -118,6 +121,7 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
ignoreCheckpointWriteFailures,
deleteSchemaLocationsFallback,
deltaLakeRedirectionsProvider,
statisticsAccess);
statisticsAccess,
useUniqueTableLocation);
}
}
@@ -45,7 +45,7 @@ public interface DeltaLakeMetastore

void createTable(ConnectorSession session, Table table, PrincipalPrivileges principalPrivileges);

void dropTable(ConnectorSession session, String databaseName, String tableName);
void dropTable(ConnectorSession session, String databaseName, String tableName, boolean externalTable);

void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to);

@@ -26,6 +26,7 @@
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
@@ -40,6 +41,7 @@
import io.trino.spi.statistics.Estimate;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
@@ -55,6 +57,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_TABLE;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.PATH_PROPERTY;
@@ -79,17 +82,20 @@ public class HiveMetastoreBackedDeltaLakeMetastore
private final TransactionLogAccess transactionLogAccess;
private final TypeManager typeManager;
private final CachingExtendedStatisticsAccess statisticsAccess;
private final HdfsEnvironment hdfsEnvironment;

public HiveMetastoreBackedDeltaLakeMetastore(
HiveMetastore delegate,
TransactionLogAccess transactionLogAccess,
TypeManager typeManager,
CachingExtendedStatisticsAccess statisticsAccess)
CachingExtendedStatisticsAccess statisticsAccess,
HdfsEnvironment hdfsEnvironment)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogSupport is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.statisticsAccess = requireNonNull(statisticsAccess, "statisticsAccess is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
}

@Override
@@ -161,12 +167,22 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileg
}

@Override
public void dropTable(ConnectorSession session, String databaseName, String tableName)
public void dropTable(ConnectorSession session, String databaseName, String tableName, boolean externalTable)
{
String tableLocation = getTableLocation(new SchemaTableName(databaseName, tableName), session);
delegate.dropTable(databaseName, tableName, true);
statisticsAccess.invalidateCache(tableLocation);
transactionLogAccess.invalidateCaches(tableLocation);
if (!externalTable) {
try {
Path path = new Path(tableLocation);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), path);
fileSystem.delete(path, true);
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Failed to delete directory %s of the table %s", tableLocation, tableName), e);
}
}
}

@Override
@@ -31,6 +31,7 @@
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedResultWithQueryId;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.tpch.TpchTable;
@@ -574,12 +575,8 @@ public void testCreateTableAsWithSchemaLocation()
assertUpdate(format("CREATE TABLE %s.%s AS SELECT name FROM nation", schemaName, tableName2), "SELECT count(*) FROM nation");
assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName), "SELECT name FROM nation");
assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName2), "SELECT name FROM nation");
assertQuery(
"SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName,
format("VALUES '%s/%s'", schemaLocation, tableName));
assertQuery(
"SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName2,
format("VALUES '%s/%s'", schemaLocation, tableName2));
validatePath(schemaLocation, schemaName, tableName);
validatePath(schemaLocation, schemaName, tableName2);
}

@Test
@@ -599,12 +596,17 @@ public void testCreateTableWithSchemaLocation()
assertUpdate(format("INSERT INTO %s.%s SELECT name FROM nation", schemaName, tableName2), "SELECT count(*) FROM nation");
assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName), "SELECT name FROM nation");
assertQuery(format("SELECT * FROM %s.%s", schemaName, tableName2), "SELECT name FROM nation");
assertQuery(
"SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName,
format("VALUES '%s/%s'", schemaLocation, tableName));
assertQuery(
"SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName2,
format("VALUES '%s/%s'", schemaLocation, tableName2));
validatePath(schemaLocation, schemaName, tableName);
validatePath(schemaLocation, schemaName, tableName2);
}

private void validatePath(String schemaLocation, String schemaName, String tableName)
{
List<MaterializedRow> materializedRows = getQueryRunner()
.execute("SELECT DISTINCT regexp_replace(\"$path\", '(.*[/][^/]*)[/][^/]*$', '$1') FROM " + schemaName + "." + tableName)
.getMaterializedRows();
assertThat(materializedRows.size()).isEqualTo(1);
assertThat((String) materializedRows.get(0).getField(0)).matches(format("%s/%s.*", schemaLocation, tableName));
}
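
The regexp_replace pattern above strips the final path component from each row's $path, leaving the table directory; because that directory now carries a random suffix, the assertion changed from exact equality to a prefix match against schemaLocation/tableName. The same extraction in plain Java, on an assumed sample path:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    Pattern parentDirectory = Pattern.compile("(.*[/][^/]*)[/][^/]*$");
    Matcher matcher = parentDirectory.matcher("file:/tmp/schema/orders-a81bc81b1f2d4f3e9b1fd468d7654321/part-00000.parquet");
    if (matcher.matches()) {
        // group(1) drops the file name, leaving the table directory, which
        // satisfies the "%s/%s.*" pattern used by validatePath above.
        String tableDirectory = matcher.group(1); // file:/tmp/schema/orders-a81bc81b1f2d4f3e9b1fd468d7654321
    }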

@Override
@@ -0,0 +1,80 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.Table;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedRow;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public abstract class BaseDeltaLakeTableWithCustomLocation
extends AbstractTestQueryFramework
{
protected static final String SCHEMA = "test_tables_with_custom_location" + randomTableSuffix();
protected static final String CATALOG_NAME = "delta_with_custom_location";
protected File metastoreDir;
protected HiveMetastore metastore;
protected HdfsEnvironment hdfsEnvironment;
protected HdfsEnvironment.HdfsContext hdfsContext;

@Test
public void testTableHasUuidSuffixInLocation()
{
String tableName = "table_with_uuid" + randomTableSuffix();
assertQuerySucceeds(format("CREATE TABLE %s AS SELECT 1 as val", tableName));
Optional<Table> table = metastore.getTable(SCHEMA, tableName);
assertTrue(table.isPresent(), "Table should exist");
String location = table.get().getStorage().getLocation();
assertThat(location).matches(format(".*%s-[0-9a-f]{32}", tableName));
}

@Test
public void testCreateAndDrop()
throws IOException
{
String tableName = "test_create_and_drop" + randomTableSuffix();
assertQuerySucceeds(format("CREATE TABLE %s AS SELECT 1 as val", tableName));
Table table = metastore.getTable(SCHEMA, tableName).orElseThrow();
assertThat(table.getTableType()).isEqualTo(TableType.MANAGED_TABLE.name());

Path tableLocation = new Path(table.getStorage().getLocation());
FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
assertTrue(fileSystem.exists(tableLocation), "The directory corresponding to the table storage location should exist");
List<MaterializedRow> materializedRows = computeActual("SELECT \"$path\" FROM " + tableName).getMaterializedRows();
assertEquals(materializedRows.size(), 1);
String filePath = (String) materializedRows.get(0).getField(0);
assertTrue(fileSystem.exists(new Path(filePath)), "The data file should exist");
assertQuerySucceeds(format("DROP TABLE %s", tableName));
assertFalse(metastore.getTable(SCHEMA, tableName).isPresent(), "Table should be dropped");
assertFalse(fileSystem.exists(new Path(filePath)), "The data file should have been removed");
assertFalse(fileSystem.exists(tableLocation), "The directory corresponding to the dropped Delta Lake table should be removed");
}
}
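
testCreateAndDrop covers the managed-table branch, where DROP TABLE now also deletes the table directory. A hypothetical companion test for the external-table branch of dropTable (not part of this PR; assumes CREATE TABLE ... WITH (location = ...) produces an external table, as in other Delta Lake connector tests) might look like:

    @Test
    public void testDropExternalTableKeepsData()
            throws IOException
    {
        String tableName = "test_external_drop" + randomTableSuffix();
        String location = new File(metastoreDir, tableName).toURI().toString();
        assertQuerySucceeds(format("CREATE TABLE %s WITH (location = '%s') AS SELECT 1 AS val", tableName, location));
        Path tableLocation = new Path(location);
        FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
        assertQuerySucceeds(format("DROP TABLE %s", tableName));
        // dropTable(..., externalTable=true) skips the filesystem delete, so the data survives.
        assertTrue(fileSystem.exists(tableLocation), "External table data should be retained after DROP");
    }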
@@ -159,7 +159,7 @@ protected List<String> listCheckpointFiles(String transactionLogDirectory)

private List<String> listAllFilesRecursive(String directory)
{
String azurePath = bucketName + "/" + directory + "/";
String azurePath = bucketName + "/" + directory;
Duration timeout = Duration.ofMinutes(5);
List<String> allPaths = azureContainerClient.listBlobs(new ListBlobsOptions().setPrefix(azurePath), timeout).stream()
.map(BlobItem::getName)