Add concurrent writes reconciliation for DELETE pushdown in Delta Lake
Allow committing pushdown DELETE operations in a concurrent context by
placing these operations right after any other concurrently completed
write operations.

Disallow committing the operation in any of the following cases
(see the sketch after this list):

- a table schema change has been committed in the meantime
- a table protocol change has been committed in the meantime
- add files committed in the meantime would have to be read by
the current operation
- remove files committed in the meantime conflict with the
add files read by the current operation
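
The following is a minimal, self-contained sketch of that reconciliation decision.
All names in it (`WinningCommit`, `reconcile`, the exception messages) are hypothetical
and only illustrate the rules above; the actual logic lives in
`checkForConcurrentTransactionConflicts` and the helper methods shown in the diff below.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified view of one concurrently committed ("winning") transaction.
record WinningCommit(
        boolean schemaChanged,
        boolean protocolChanged,
        Set<Map<String, Optional<String>>> addedFilesPartitionValues,
        Set<Map<String, Optional<String>>> removedFilesPartitionValues) {}

final class DeleteReconciliationSketch
{
    private DeleteReconciliationSketch() {}

    // Throws when the pushdown DELETE cannot be placed after the winning commit;
    // readBySourceScan models the enforced partition predicate of the DELETE and
    // returns true for partition values the current operation has read.
    static void reconcile(WinningCommit winner, Predicate<Map<String, Optional<String>>> readBySourceScan)
    {
        if (winner.schemaChanged()) {
            throw new IllegalStateException("Table schema changed by a concurrent write operation");
        }
        if (winner.protocolChanged()) {
            throw new IllegalStateException("Table protocol changed by a concurrent write operation");
        }
        // Files added concurrently in partitions the current DELETE reads invalidate its scan.
        if (winner.addedFilesPartitionValues().stream().anyMatch(readBySourceScan)) {
            throw new IllegalStateException("Concurrently added files overlap the partitions read by this DELETE");
        }
        // Files removed concurrently from partitions the current DELETE reads conflict with it.
        if (winner.removedFilesPartitionValues().stream().anyMatch(readBySourceScan)) {
            throw new IllegalStateException("Concurrently removed files overlap the partitions read by this DELETE");
        }
        // Otherwise the DELETE can be committed right after the winning commit.
    }
}
```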

These changes also take the `delta.isolationLevel` table property of the
Delta Lake table into account for DELETE operations.
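
As a minimal sketch of how that property could be resolved: the enum values mirror
the `Serializable`/`WriteSerializable` levels used by the connector, while the property
lookup and the `WriteSerializable` fallback are assumptions made here for illustration;
the connector's actual resolution happens via `getIsolationLevel(MetadataEntry)` in the
diff below.

```java
import java.util.Map;

final class IsolationLevelSketch
{
    enum Level { SERIALIZABLE, WRITESERIALIZABLE }

    private IsolationLevelSketch() {}

    // Resolves the isolation level from the Delta table configuration, falling back
    // to WriteSerializable when the property is not set (assumed default here).
    static Level fromConfiguration(Map<String, String> tableConfiguration)
    {
        String value = tableConfiguration.getOrDefault("delta.isolationLevel", "WriteSerializable");
        return switch (value) {
            case "Serializable" -> Level.SERIALIZABLE;
            case "WriteSerializable" -> Level.WRITESERIALIZABLE;
            default -> throw new IllegalArgumentException("Unsupported delta.isolationLevel value: " + value);
        };
    }
}
```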

A relevant example from the Databricks documentation regarding the
distinction between the `WriteSerializable` and `Serializable` isolation levels:

> For example, consider `txn1`, a long-running delete, and `txn2`,
> which blindly inserts data into the table.
> `txn2` and `txn1` complete and are recorded in the order
> `txn2, txn1`
> in the history of the table.
> According to the history, the data inserted in `txn2` should not exist
> in the table. For `Serializable` level, a reader would never see data
> inserted by `txn2`. However, for the `WriteSerializable` level, a reader
> could at some point see the data inserted by `txn2`.

A few words about the WriteSerializable isolation level, taken from the delta.io javadocs:

> This isolation level will ensure snapshot isolation consistency guarantee
> between write operations only.
> In other words, if only the write operations are considered, then
> there exists a serializable sequence between them that would produce the same
> result as seen in the table.
findinpath committed Apr 29, 2024
1 parent 4cf2856 commit f7829ae
Showing 4 changed files with 532 additions and 19 deletions.
@@ -25,14 +25,16 @@
import java.util.Optional;
import java.util.Set;

import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues;
import static java.util.Objects.requireNonNull;

public class DeltaLakeCommitSummary
{
private final long version;
private final List<MetadataEntry> metadataUpdates;
private final Optional<ProtocolEntry> protocol;
private final boolean containingRemovedFiles;
private final boolean containsRemoveFileWithoutPartitionValues;
private final Set<Map<String, Optional<String>>> removedFilesCanonicalPartitionValues;
private final Set<Map<String, Optional<String>>> addedFilesCanonicalPartitionValues;
private final Optional<Boolean> isBlindAppend;

@@ -43,8 +45,9 @@ public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> t
Optional<ProtocolEntry> optionalProtocol = Optional.empty();
Optional<CommitInfoEntry> optionalCommitInfo = Optional.empty();
ImmutableSet.Builder<Map<String, Optional<String>>> addedFilesCanonicalPartitionValuesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<Map<String, Optional<String>>> removedFilesCanonicalPartitionValuesBuilder = ImmutableSet.builder();
boolean containsRemoveFileWithoutPartitionValues = false;

boolean removedFilesFound = false;
for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) {
if (transactionLogEntry.getMetaData() != null) {
metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
@@ -59,15 +62,22 @@ else if (transactionLogEntry.getAdd() != null) {
addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
}
else if (transactionLogEntry.getRemove() != null) {
removedFilesFound = true;
Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
if (partitionValues == null) {
containsRemoveFileWithoutPartitionValues = true;
}
else {
removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
}
}
}

this.version = version;
metadataUpdates = metadataUpdatesBuilder.build();
protocol = optionalProtocol;
addedFilesCanonicalPartitionValues = addedFilesCanonicalPartitionValuesBuilder.build();
containingRemovedFiles = removedFilesFound;
removedFilesCanonicalPartitionValues = removedFilesCanonicalPartitionValuesBuilder.build();
this.containsRemoveFileWithoutPartitionValues = containsRemoveFileWithoutPartitionValues;
isBlindAppend = optionalCommitInfo.flatMap(CommitInfoEntry::isBlindAppend);
}

@@ -86,9 +96,14 @@ public Optional<ProtocolEntry> getProtocol()
return protocol;
}

public boolean isContainingRemovedFiles()
public boolean isContainsRemoveFileWithoutPartitionValues()
{
return containingRemovedFiles;
return containsRemoveFileWithoutPartitionValues;
}

public Set<Map<String, Optional<String>>> getRemovedFilesCanonicalPartitionValues()
{
return removedFilesCanonicalPartitionValues;
}

public Set<Map<String, Optional<String>>> getAddedFilesCanonicalPartitionValues()
@@ -2077,9 +2077,10 @@ private void checkForConcurrentTransactionConflicts(
List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
.map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
.collect(toImmutableList());
TupleDomain<DeltaLakeColumnHandle> enforcedSourcePartitionConstraintsUnion = TupleDomain.columnWiseUnion(enforcedSourcePartitionConstraints);

checkIfCommittedAddedFilesConflictWithCurrentOperation(TupleDomain.columnWiseUnion(enforcedSourcePartitionConstraints), commitSummary);
checkIfCommittedRemovedFilesConflictWithCurrentOperation(commitSummary);
checkIfCommittedAddedFilesConflictWithCurrentOperation(enforcedSourcePartitionConstraintsUnion, commitSummary);
checkIfCommittedRemovedFilesConflictWithCurrentOperation(enforcedSourcePartitionConstraintsUnion, commitSummary);
}
}
case SERIALIZABLE -> throw new TransactionFailedException("Conflicting concurrent writes with the current operation on Serializable isolation level");
@@ -2132,21 +2133,35 @@ private static void checkIfCommittedAddedFilesConflictWithCurrentOperation(Tuple
}
}

private static void checkIfCommittedRemovedFilesConflictWithCurrentOperation(DeltaLakeCommitSummary commitSummary)
private static void checkIfCommittedRemovedFilesConflictWithCurrentOperation(TupleDomain<DeltaLakeColumnHandle> enforcedSourcePartitionConstraints, DeltaLakeCommitSummary commitSummary)
{
if (commitSummary.getIsBlindAppend().orElse(false)) {
// Do not conflict with blind appends. Blind appends can be placed before or after the current operation
// when backtracking which serializable sequence of operations led to the current state of the table.
checkState(!commitSummary.isContainingRemovedFiles(), "Blind append transaction %s cannot contain removed files", commitSummary.getVersion());
checkState(commitSummary.getRemovedFilesCanonicalPartitionValues().isEmpty(), "Blind append transaction %s cannot contain removed files", commitSummary.getVersion());
checkState(!commitSummary.isContainsRemoveFileWithoutPartitionValues(), "Blind append transaction %s cannot contain removed files", commitSummary.getVersion());
return;
}
if (commitSummary.isContainsRemoveFileWithoutPartitionValues()) {
// Can't perform reconciliation between disjoint partitions when it is not clear which partitions are affected by the winning commit.
throw new TransactionFailedException("Conflicting concurrent writes found. Data files removed in the modified table by another concurrent write operation.");
}

if (!commitSummary.isContainingRemovedFiles()) {
if (commitSummary.getRemovedFilesCanonicalPartitionValues().isEmpty()) {
return;
}

// TODO Pass active files of the target table read in DeltaLakeSplitSource to figure out whether the removed files do actually conflict with the read table partitions
throw new TransactionFailedException("Conflicting concurrent writes found. Data files were removed from the modified table by another concurrent write operation.");
boolean readWholeTable = enforcedSourcePartitionConstraints.isAll();
if (readWholeTable) {
throw new TransactionFailedException("Conflicting concurrent writes found. Data files removed in the modified table by another concurrent write operation.");
}

Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedSourcePartitionConstraints.getDomains().orElseThrow();
boolean conflictingRemoveFilesFound = commitSummary.getRemovedFilesCanonicalPartitionValues().stream()
.anyMatch(canonicalPartitionValues -> partitionMatchesPredicate(canonicalPartitionValues, enforcedDomains));
if (conflictingRemoveFilesFound) {
throw new TransactionFailedException("Conflicting concurrent writes found. Data files were removed from the modified table by another concurrent write operation.");
}
}

private void writeTransactionLogForInsertOperation(
@@ -3813,7 +3828,11 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl
checkWriteSupported(tableHandle);

try {
CommitDeleteOperationResult commitDeleteOperationResult = commitDeleteOperation(session, tableHandle, operation);
IsolationLevel isolationLevel = getIsolationLevel(tableHandle.getMetadataEntry());
AtomicReference<Long> readVersion = new AtomicReference<>(tableHandle.getReadVersion());
CommitDeleteOperationResult commitDeleteOperationResult = Failsafe.with(TRANSACTION_CONFLICT_RETRY_POLICY)
.get(context -> commitDeleteOperation(session, tableHandle, operation, isolationLevel, readVersion, context.getAttemptCount()));

writeCheckpointIfNeeded(
session,
tableHandle.getSchemaTableName(),
@@ -3831,18 +3850,20 @@ private CommitDeleteOperationResult commitDeleteOperation(
private CommitDeleteOperationResult commitDeleteOperation(
ConnectorSession session,
DeltaLakeTableHandle tableHandle,
String operation)
String operation,
IsolationLevel isolationLevel,
AtomicReference<Long> readVersion,
int attemptCount)
throws IOException
{
String tableLocation = tableHandle.location();

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);

long writeTimestamp = Instant.now().toEpochMilli();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, tableHandle.getReadVersion());
if (currentVersion != tableHandle.getReadVersion()) {
throw new TransactionConflictException(format("Conflicting concurrent writes found. Expected transaction log version: %s, actual version: %s", tableHandle.getReadVersion(), currentVersion));
}
long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(tableHandle), isolationLevel, currentVersion, readVersion, tableHandle.getLocation(), attemptCount);
long commitVersion = currentVersion + 1;
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, writeTimestamp, operation, tableHandle.getReadVersion(), false));

@@ -2608,6 +2608,55 @@ public void testConcurrentInsertsReconciliationForMixedInserts()
}
}

@RepeatedTest(3)
public void testConcurrentDeletePushdownReconciliation()
throws Exception
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
String tableName = "test_concurrent_delete_pushdown_table_" + randomNameSuffix();

assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4);

try {
// delete data concurrently by using non-overlapping partition predicate
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 20");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30");
return null;
})
.build())
.forEach(MoreFutures::getDone);

assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (31, 40)");
assertQuery("SELECT version, operation, isolation_level FROM \"" + tableName + "$history\"",
"""
VALUES
(0, 'CREATE TABLE AS SELECT', 'WriteSerializable'),
(1, 'DELETE', 'WriteSerializable'),
(2, 'DELETE', 'WriteSerializable'),
(3, 'DELETE', 'WriteSerializable')
""");
}
finally {
assertUpdate("DROP TABLE " + tableName);
executor.shutdownNow();
assertTrue(executor.awaitTermination(10, SECONDS));
}
}

protected List<String> listCheckpointFiles(String transactionLogDirectory)
{
return listFiles(transactionLogDirectory).stream()