Push down DELETE for enforceable filters on Delta Lake #18332

Merged: 3 commits, Aug 24, 2023
@@ -357,7 +357,7 @@ private int[] getWriterIndexes(Page page)
partitionName = Optional.of(partName);
}

- String fileName = session.getQueryId() + "-" + randomUUID();
+ String fileName = session.getQueryId() + "_" + randomUUID();
Contributor Author:
This change is in sync with

Location targetLocation = sourceLocation.parentDirectory().appendPath(session.getQueryId() + "_" + randomUUID());

I'd benefit from a suggestion on which utility class I could extract the file-name creation method into.

Member:
You can always add a new one ;)
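
A minimal sketch of what such a shared helper could look like, assuming a new utility class is introduced (the class name DeltaLakeWriteUtils below is hypothetical and not part of this change); both call sites could then delegate to it:

import static java.util.UUID.randomUUID;

public final class DeltaLakeWriteUtils
{
    private DeltaLakeWriteUtils() {}

    // Builds a write file name that is unique per query and per call,
    // following the <queryId>_<randomUUID> pattern used at both call sites.
    public static String newWriteFileName(String queryId)
    {
        return queryId + "_" + randomUUID();
    }
}

With such a helper, session.getQueryId() + "_" + randomUUID() at each call site would become newWriteFileName(session.getQueryId()).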

filePath = filePath.appendPath(fileName);

FileWriter fileWriter = createParquetFileWriter(filePath);
@@ -197,6 +197,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.COLUMN_MAPPING_MODE_PROPERTY;
@@ -315,7 +316,7 @@ public class DeltaLakeMetadata
public static final String INSERT_OPERATION = "WRITE";
public static final String MERGE_OPERATION = "MERGE";
public static final String UPDATE_OPERATION = "UPDATE"; // used by old Trino versions and Spark
- public static final String DELETE_OPERATION = "DELETE"; // used by old Trino versions and Spark
+ public static final String DELETE_OPERATION = "DELETE"; // used by Trino for whole table/partition deletes, as well as by Spark
public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
public static final String SET_TBLPROPERTIES_OPERATION = "SET TBLPROPERTIES";
public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN";
@@ -3258,6 +3259,74 @@ public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession sessi
return WriterScalingOptions.ENABLED;
}

@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle;
if (tableHandle.getMetadataEntry().isChangeDataFeedEnabled().orElse(false)) {
// For tables with CDF enabled, the DELETE operation can't be performed only on metadata files
return Optional.empty();
}

return Optional.of(tableHandle);
}

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle;
if (isAppendOnly(tableHandle.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true");
}

String tableLocation = tableHandle.location();
List<AddFileEntry> activeFiles = getAddFileEntriesMatchingEnforcedPartitionConstraint(session, tableHandle);

try {
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);

long writeTimestamp = Instant.now().toEpochMilli();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation);
if (currentVersion != tableHandle.getReadVersion()) {
throw new TransactionConflictException(format("Conflicting concurrent writes found. Expected transaction log version: %s, actual version: %s", tableHandle.getReadVersion(), currentVersion));
}
long commitVersion = currentVersion + 1;
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, writeTimestamp, DELETE_OPERATION, tableHandle.getReadVersion()));

long deletedRecords = 0L;
boolean allDeletedFilesStatsPresent = true;
for (AddFileEntry addFileEntry : activeFiles) {
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), writeTimestamp, true));

Optional<Long> fileRecords = addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords);
allDeletedFilesStatsPresent &= fileRecords.isPresent();
deletedRecords += fileRecords.orElse(0L);
}

transactionLogWriter.flush();
writeCheckpointIfNeeded(session, tableHandle.getSchemaTableName(), tableHandle.location(), tableHandle.getMetadataEntry().getCheckpointInterval(), commitVersion);
return allDeletedFilesStatsPresent ? OptionalLong.of(deletedRecords) : OptionalLong.empty();
}
catch (Exception e) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
}
}

private List<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle)
{
TableSnapshot tableSnapshot = getSnapshot(tableHandle.getSchemaTableName(), tableHandle.getLocation(), session);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, session);
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
if (enforcedPartitionConstraint.isAll()) {
return validDataFiles;
}
Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedPartitionConstraint.getDomains().orElseThrow();
return validDataFiles.stream()
.filter(addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), enforcedDomains))
.collect(toImmutableList());
}

private static Map<String, DeltaLakeColumnStatistics> toDeltaLakeColumnStatistics(Collection<ComputedStatistics> computedStatistics)
{
return computedStatistics.stream()
@@ -21,6 +21,9 @@
import io.trino.execution.QueryInfo;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.sql.planner.plan.TableDeleteNode;
import io.trino.sql.planner.plan.TableFinishNode;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.testing.BaseConnectorTest;
import io.trino.testing.DataProviders;
import io.trino.testing.DistributedQueryRunner;
@@ -60,6 +63,7 @@
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.copyTpchTables;
@@ -77,6 +81,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestDeltaLakeConnectorTest
extends BaseConnectorTest
@@ -1820,6 +1825,94 @@ public void testTableWithTrailingSlashLocation(boolean partitioned)
assertUpdate("DROP TABLE " + tableName);
}

@Test(dataProvider = "deleteFiltersForTable")
public void testDeleteWithFilter(String createTableSql, String deleteFilter, boolean pushDownDelete)
{
String table = "delete_with_filter_" + randomNameSuffix();
assertUpdate(format(createTableSql, table, bucketName, table));

assertUpdate(format("INSERT INTO %s (customer, purchases, address) VALUES ('Aaron', 5, 'Antioch'), ('Bill', 7, 'Antioch'), ('Mary', 10, 'Adelphi'), ('Aaron', 3, 'Dallas')", table), 4);

assertUpdate(
getSession(),
format("DELETE FROM %s WHERE %s", table, deleteFilter),
2,
plan -> {
if (pushDownDelete) {
boolean tableDelete = searchFrom(plan.getRoot()).where(node -> node instanceof TableDeleteNode).matches();
assertTrue(tableDelete, "A TableDeleteNode should be present");
}
else {
TableFinishNode finishNode = searchFrom(plan.getRoot())
.where(TableFinishNode.class::isInstance)
.findOnlyElement();
assertTrue(finishNode.getTarget() instanceof TableWriterNode.MergeTarget, "Delete operation should be performed through MERGE mechanism");
}
});
assertQuery("SELECT customer, purchases, address FROM " + table, "VALUES ('Mary', 10, 'Adelphi'), ('Aaron', 3, 'Dallas')");
assertUpdate("DROP TABLE " + table);
}

@DataProvider
public Object[][] deleteFiltersForTable()
{
return new Object[][]{
{
"CREATE TABLE %s (customer VARCHAR, purchases INT, address VARCHAR) WITH (location = 's3://%s/%s')",
"address = 'Antioch'",
false
},
{
// delete filter applied on function over non-partitioned field
"CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])",
"starts_with(address, 'Antioch')",
false
},
{
// delete filter applied on partitioned field
"CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])",
"address = 'Antioch'",
true
},
{
// delete filter applied on partitioned field and on synthesized field
"CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])",
"address = 'Antioch' AND \"$file_size\" > 0",
false
},
{
// delete filter applied on function over partitioned field
"CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])",
"starts_with(address, 'Antioch')",
false
},
{
// delete filter applied on non-partitioned field
"CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['customer'])",
"address = 'Antioch'",
false
},
{
// delete filter fully applied on composed partition
"CREATE TABLE %s (purchases INT, customer VARCHAR, address VARCHAR) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address', 'customer'])",
"address = 'Antioch' AND (customer = 'Aaron' OR customer = 'Bill')",
true
},
{
// delete filter applied only partly on first partitioned field
"CREATE TABLE %s (purchases INT, address VARCHAR, customer VARCHAR) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address', 'customer'])",
"address = 'Antioch'",
true
},
{
// delete filter applied only partly on second partitioned field
"CREATE TABLE %s (purchases INT, address VARCHAR, customer VARCHAR) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['customer', 'address'])",
"address = 'Antioch'",
true
},
};
}

@Override
protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e)
{
@@ -181,9 +181,14 @@ public void testDeleteAllDatabricks()
public void testDeleteAllOssDeltaLake()
{
String tableName = "test_delete_all_deltalake" + randomNameSuffix();
- Set<String> originalFiles = testDeleteAllAndReturnInitialDataLakeFilesSet(
-         tableName,
-         "io/trino/plugin/deltalake/testing/resources/ossdeltalake");
+ hiveMinioDataLake.copyResources("io/trino/plugin/deltalake/testing/resources/ossdeltalake/customer", tableName);
+ Set<String> originalFiles = ImmutableSet.copyOf(hiveMinioDataLake.listFiles(tableName));
+ getQueryRunner().execute(format("CALL system.register_table('%s', '%s', '%s')", SCHEMA, tableName, getLocationForTable(tableName)));
+ assertQuery("SELECT * FROM " + tableName, "SELECT * FROM customer");
+ // There are `add` files without stats in the transaction log, which is why the metadata-based DELETE
+ // on the whole table does not return the number of deleted records
+ assertUpdate("DELETE FROM " + tableName);
+ assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
Set<String> expected = ImmutableSet.<String>builder()
.addAll(originalFiles)
.add(tableName + "/_delta_log/00000000000000000001.json")