diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java
index 91415187e3e9..2734caa92139 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java
@@ -14,29 +14,37 @@
 package io.trino.plugin.deltalake;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
 
 public class DeltaLakeCommitSummary
 {
+    private final long version;
     private final List<MetadataEntry> metadataUpdates;
     private final Optional<ProtocolEntry> protocol;
+    private final boolean containingRemovedFiles;
+    private final Set<Map<String, Optional<String>>> addedFilesCanonicalPartitionValues;
     private final Optional<Boolean> isBlindAppend;
 
-    public DeltaLakeCommitSummary(List<DeltaLakeTransactionLogEntry> transactionLogEntries)
+    public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> transactionLogEntries)
     {
         requireNonNull(transactionLogEntries, "transactionLogEntries is null");
         ImmutableList.Builder<MetadataEntry> metadataUpdatesBuilder = ImmutableList.builder();
         Optional<ProtocolEntry> optionalProtocol = Optional.empty();
         Optional<CommitInfoEntry> optionalCommitInfo = Optional.empty();
+        ImmutableSet.Builder<Map<String, Optional<String>>> addedFilesCanonicalPartitionValuesBuilder = ImmutableSet.builder();
+        boolean removedFilesFound = false;
         for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) {
             if (transactionLogEntry.getMetaData() != null) {
                 metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
@@ -47,13 +55,27 @@ else if (transactionLogEntry.getProtocol() != null) {
             else if (transactionLogEntry.getCommitInfo() != null) {
                 optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
             }
+            else if (transactionLogEntry.getAdd() != null) {
+                addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
+            }
+            else if (transactionLogEntry.getRemove() != null) {
+                removedFilesFound = true;
+            }
         }
 
+        this.version = version;
         metadataUpdates = metadataUpdatesBuilder.build();
         protocol = optionalProtocol;
+        addedFilesCanonicalPartitionValues = addedFilesCanonicalPartitionValuesBuilder.build();
+        containingRemovedFiles = removedFilesFound;
         isBlindAppend = optionalCommitInfo.flatMap(CommitInfoEntry::isBlindAppend);
     }
 
+    public long getVersion()
+    {
+        return version;
+    }
+
     public List<MetadataEntry> getMetadataUpdates()
     {
         return metadataUpdates;
@@ -64,6 +86,16 @@ public Optional<ProtocolEntry> getProtocol()
         return protocol;
     }
 
+    public boolean isContainingRemovedFiles()
+    {
+        return containingRemovedFiles;
+    }
+
+    public Set<Map<String, Optional<String>>> getAddedFilesCanonicalPartitionValues()
+    {
+        return addedFilesCanonicalPartitionValues;
+    }
+
     public Optional<Boolean> getIsBlindAppend()
     {
         return isBlindAppend;
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 0f4583a6a3db..079a88c33beb 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -234,7 +234,6 @@
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NAME;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
-import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel.SERIALIZABLE;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.MAX_COLUMN_ID_CONFIGURATION_KEY;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.TIMESTAMP_NTZ_FEATURE_NAME;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
@@ -1840,14 +1839,13 @@ private long commitInsertOperation(
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         long currentVersion = getMandatoryCurrentVersion(fileSystem, handle.getLocation(), handle.getReadVersion());
 
-        boolean isBlindAppend = sourceTableHandles.stream()
+        List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles = sourceTableHandles.stream()
                 .filter(sourceTableHandle -> sourceTableHandle instanceof DeltaLakeTableHandle)
                 .map(DeltaLakeTableHandle.class::cast)
                 .filter(tableHandle -> handle.getTableName().equals(tableHandle.getSchemaTableName())
                         // disregard time travel table handles
                         && tableHandle.getReadVersion() >= handle.getReadVersion())
-                .findAny()
-                .isEmpty();
+                .collect(toImmutableList());
         long readVersionValue = readVersion.get();
         if (currentVersion > readVersionValue) {
             String transactionLogDirectory = getTransactionLogDir(handle.getLocation());
@@ -1861,37 +1859,43 @@ private long commitInsertOperation(
             catch (IOException e) {
                 throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed to access table metadata", e);
             }
-                checkForInsertTransactionConflicts(session.getQueryId(), isBlindAppend, isolationLevel, new DeltaLakeCommitSummary(transactionLogEntries), version, attemptCount);
+                checkForInsertTransactionConflicts(session.getQueryId(), sameAsTargetSourceTableHandles, isolationLevel, new DeltaLakeCommitSummary(version, transactionLogEntries), attemptCount);
             }
 
             // Avoid re-reading already processed transaction log entries in case of retries
             readVersion.set(currentVersion);
         }
 
         long commitVersion = currentVersion + 1;
-        writeTransactionLogForInsertOperation(session, handle, isBlindAppend, isolationLevel, dataFileInfos, commitVersion, currentVersion);
+        writeTransactionLogForInsertOperation(session, handle, sameAsTargetSourceTableHandles.isEmpty(), isolationLevel, dataFileInfos, commitVersion, currentVersion);
         return commitVersion;
     }
 
     private void checkForInsertTransactionConflicts(
             String queryId,
-            boolean isBlindAppend,
+            List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles,
             IsolationLevel isolationLevel,
             DeltaLakeCommitSummary commitSummary,
-            long version,
             int attemptCount)
     {
         checkNoMetadataUpdates(commitSummary);
         checkNoProtocolUpdates(commitSummary);
 
-        if (!isBlindAppend) {
-            throw new TransactionFailedException("Conflicting concurrent writes with the current non blind append INSERT operation");
-        }
+        switch (isolationLevel) {
+            case WRITESERIALIZABLE -> {
+                if (!sameAsTargetSourceTableHandles.isEmpty()) {
+                    // INSERT operations that contain sub-queries reading the same table support the same concurrency as MERGE.
+                    List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
+                            .map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
+                            .collect(toImmutableList());
 
-        if (isolationLevel == SERIALIZABLE) {
-            throw new TransactionFailedException("Conflicting concurrent writes with the current blind append INSERT operation on Serializable isolation level");
+                    checkIfCommittedAddedFilesConflictWithCurrentOperation(TupleDomain.columnWiseUnion(enforcedSourcePartitionConstraints), commitSummary);
+                    checkIfCommittedRemovedFilesConflictWithCurrentOperation(commitSummary);
+                }
+            }
+            case SERIALIZABLE -> throw new TransactionFailedException("Conflicting concurrent writes with the current INSERT operation on Serializable isolation level");
         }
 
-        LOG.debug("Completed checking for conflicts in the query %s for target table version: %s Attempt: %s ", queryId, version, attemptCount);
+        LOG.debug("Completed checking for conflicts in the query %s for target table version: %s Attempt: %s ", queryId, commitSummary.getVersion(), attemptCount);
     }
 
     private static void checkNoProtocolUpdates(DeltaLakeCommitSummary commitSummary)
@@ -1908,6 +1912,48 @@ private static void checkNoMetadataUpdates(DeltaLakeCommitSummary commitSummary)
         }
     }
 
+    private static void checkIfCommittedAddedFilesConflictWithCurrentOperation(TupleDomain<DeltaLakeColumnHandle> enforcedSourcePartitionConstraints, DeltaLakeCommitSummary commitSummary)
+    {
+        Set<Map<String, Optional<String>>> addedFilesCanonicalPartitionValues = commitSummary.getIsBlindAppend().orElse(false)
+                // Do not conflict with blind appends. Blind appends can be placed before or after the current operation
+                // when backtracking which serializable sequence of operations led to the current state of the table.
+                ? Set.of()
+                : commitSummary.getAddedFilesCanonicalPartitionValues();
+
+        if (addedFilesCanonicalPartitionValues.isEmpty()) {
+            return;
+        }
+
+        boolean readWholeTable = enforcedSourcePartitionConstraints.isAll();
+        if (readWholeTable) {
+            throw new TransactionFailedException("Conflicting concurrent writes found. Data files added in the modified table by concurrent write operation.");
+        }
+
+        Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedSourcePartitionConstraints.getDomains().orElseThrow();
+        boolean conflictingAddFilesFound = addedFilesCanonicalPartitionValues.stream()
+                .anyMatch(canonicalPartitionValues -> partitionMatchesPredicate(canonicalPartitionValues, enforcedDomains));
+        if (conflictingAddFilesFound) {
+            throw new TransactionFailedException("Conflicting concurrent writes found. Data files were added in the modified table by another concurrent write operation.");
+        }
+    }
+
+    private static void checkIfCommittedRemovedFilesConflictWithCurrentOperation(DeltaLakeCommitSummary commitSummary)
+    {
+        if (commitSummary.getIsBlindAppend().orElse(false)) {
+            // Do not conflict with blind appends. Blind appends can be placed before or after the current operation
+            // when backtracking which serializable sequence of operations led to the current state of the table.
+            checkState(!commitSummary.isContainingRemovedFiles(), "Blind append transaction %s cannot contain removed files", commitSummary.getVersion());
+            return;
+        }
+
+        if (!commitSummary.isContainingRemovedFiles()) {
+            return;
+        }
+
+        // TODO Pass active files of the target table read in DeltaLakeSplitSource to figure out whether the removed files do actually conflict with the read table partitions
+        throw new TransactionFailedException("Conflicting concurrent writes found. Data files were removed from the modified table by another concurrent write operation.");
+    }
+
     private void writeTransactionLogForInsertOperation(
             ConnectorSession session,
             DeltaLakeInsertTableHandle insertTableHandle,
@@ -2037,7 +2083,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
         }
 
         long commitVersion = currentVersion + 1;
-        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, MERGE_OPERATION, handle.getReadVersion(), true));
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, MERGE_OPERATION, handle.getReadVersion(), false));
         // TODO: Delta writes another field "operationMetrics" (https://github.com/trinodb/trino/issues/12005)
         long writeTimestamp = Instant.now().toEpochMilli();
 
@@ -2256,7 +2302,7 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
         long createdTime = Instant.now().toEpochMilli();
 
         long commitVersion = readVersion + 1;
-        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, OPTIMIZE_OPERATION, readVersion, true));
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, OPTIMIZE_OPERATION, readVersion, false));
         // TODO: Delta writes another field "operationMetrics" that I haven't
         // seen before. It contains delete/update metrics. Investigate/include it.
 
@@ -3573,7 +3619,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl
             throw new TransactionConflictException(format("Conflicting concurrent writes found. Expected transaction log version: %s, actual version: %s", tableHandle.getReadVersion(), currentVersion));
         }
         long commitVersion = currentVersion + 1;
-        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, writeTimestamp, operation, tableHandle.getReadVersion(), true));
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, writeTimestamp, operation, tableHandle.getReadVersion(), false));
 
         long deletedRecords = 0L;
         boolean allDeletedFilesStatsPresent = true;
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
index 756eaa02d9f1..9044ebf527cf 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
@@ -51,15 +51,19 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.MoreCollectors.onlyElement;
 import static com.google.common.collect.Sets.union;
+import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
 import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
 import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
 import static io.trino.plugin.base.util.Closables.closeAllSuppress;
@@ -70,6 +74,7 @@
 import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.getTableActiveFiles;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
 import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
+import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
 import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DELETE_TABLE;
 import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_TABLE;
 import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.INSERT_TABLE;
@@ -86,6 +91,7 @@
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.joining;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -2337,6 +2343,137 @@ private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitio
         }
     }
 
+    @RepeatedTest(3)
+    public void testConcurrentInsertsSelectingFromTheSameTable()
+            throws Exception
+    {
+        testConcurrentInsertsSelectingFromTheSameTable(true);
+        testConcurrentInsertsSelectingFromTheSameTable(false);
+    }
+
+    private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned)
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_inserts_select_from_same_table_" + randomNameSuffix();
+
+        assertUpdate(
+                "CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS VALUES (0, 10)",
+                1);
+
+        try {
+            // Considering T1, T2, T3 being the order of completion of the concurrent INSERT operations,
+            // if all the operations would eventually succeed, the entries inserted per thread would look like this:
+            // T1: (1, 10)
+            // T2: (2, 10)
+            // T3: (3, 10)
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        try {
+                            getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*), 10 AS part FROM " + tableName);
+                            return true;
+                        }
+                        catch (Exception e) {
+                            RuntimeException trinoException = getTrinoExceptionCause(e);
+                            try {
+                                assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
+                            }
+                            catch (Throwable verifyFailure) {
+                                if (verifyFailure != e) {
+                                    verifyFailure.addSuppressed(e);
+                                }
+                                throw verifyFailure;
+                            }
+                            return false;
+                        }
+                    }))
+                    .collect(toImmutableList());
+
+            long successfulInsertsCount = futures.stream()
+                    .map(future -> tryGetFutureValue(future, 20, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
+                    .filter(success -> success)
+                    .count();
+
+            assertThat(successfulInsertsCount).isGreaterThanOrEqualTo(1);
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    "VALUES (0, 10)" +
+                            LongStream.rangeClosed(1, successfulInsertsCount)
+                                    .boxed()
+                                    .map("(%d, 10)"::formatted)
+                                    .collect(joining(", ", ", ", "")));
+            assertQuery(
+                    "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
+                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" +
+                            LongStream.rangeClosed(1, successfulInsertsCount)
+                                    .boxed()
+                                    .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1))
+                                    .collect(joining(", ", ", ", "")));
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
+    @RepeatedTest(3)
+    public void testConcurrentInsertsReconciliationForMixedInserts()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_mixed_inserts_table_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20)", 2);
+
+        try {
+            // insert data concurrently
+            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
+                            .add(() -> {
+                                // Read from the partition `10` of the same table to avoid reconciliation failures
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 10 AS part FROM " + tableName + " WHERE part = 10");
+                                return null;
+                            })
+                            .add(() -> {
+                                // Read from the partition `20` of the same table to avoid reconciliation failures
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 20 AS part FROM " + tableName + " WHERE part = 20");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (22, 30)");
+                                return null;
+                            })
+                            .build())
+                    .forEach(MoreFutures::getDone);
+
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)");
+            assertQuery(
+                    "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
+                        (1, 'WRITE', 'WriteSerializable', 0),
+                        (2, 'WRITE', 'WriteSerializable', 1),
+                        (3, 'WRITE', 'WriteSerializable', 2)
+                    """);
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
     protected List<String> listCheckpointFiles(String transactionLogDirectory)
     {
         return listFiles(transactionLogDirectory).stream()
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java
index f0c457601f28..5be0a4cbc0af 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.io.MoreFiles.deleteRecursively;
@@ -60,6 +61,7 @@
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.regex.Matcher.quoteReplacement;
+import static java.util.stream.Collectors.joining;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@@ -167,6 +169,326 @@ private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitio
         }
     }
 
+    // Copied from BaseDeltaLakeConnectorSmokeTest
+    @Test
+    public void testConcurrentInsertsSelectingFromTheSameTable()
+            throws Exception
+    {
+        testConcurrentInsertsSelectingFromTheSameTable(true);
+        testConcurrentInsertsSelectingFromTheSameTable(false);
+    }
+
+    private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned)
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_inserts_select_from_same_table_" + randomNameSuffix();
+
+        assertUpdate(
+                "CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS VALUES (0, 10)",
+                1);
+
+        try {
+            // Considering T1, T2, T3 being the order of completion of the concurrent INSERT operations,
+            // if all the operations would eventually succeed, the entries inserted per thread would look like this:
+            // T1: (1, 10)
+            // T2: (2, 10)
+            // T3: (3, 10)
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        try {
+                            getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*), 10 AS part FROM " + tableName);
+                            return true;
+                        }
+                        catch (Exception e) {
+                            RuntimeException trinoException = getTrinoExceptionCause(e);
+                            try {
+                                assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
+                            }
+                            catch (Throwable verifyFailure) {
+                                if (verifyFailure != e) {
+                                    verifyFailure.addSuppressed(e);
+                                }
+                                throw verifyFailure;
+                            }
+                            return false;
+                        }
+                    }))
+                    .collect(toImmutableList());
+
+            long successfulInsertsCount = futures.stream()
+                    .map(future -> tryGetFutureValue(future, 20, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
+                    .filter(success -> success)
+                    .count();
+
+            assertThat(successfulInsertsCount).isGreaterThanOrEqualTo(1);
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    "VALUES (0, 10)" +
+                            LongStream.rangeClosed(1, successfulInsertsCount)
+                                    .boxed()
+                                    .map("(%d, 10)"::formatted)
+                                    .collect(joining(", ", ", ", "")));
+            assertQuery(
+                    "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
+                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" +
+                            LongStream.rangeClosed(1, successfulInsertsCount)
+                                    .boxed()
+                                    .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1))
+                                    .collect(joining(", ", ", ", "")));
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
+    @Test
+    public void testConcurrentInsertsSelectingFromTheSamePartition()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_inserts_select_from_same_partition_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3);
+
+        try {
+            // Considering T1, T2, T3 being the order of completion of the concurrent INSERT operations,
+            // if all the operations would eventually succeed, the entries inserted per thread would look like this:
+            // T1: (1, 10)
+            // T2: (2, 10)
+            // T3: (3, 10)
+            // The state of the table after the successful INSERT operations would be:
+            // (0,10), (1, 10), (2, 10), (3, 10), (11, 20), (22, 30)
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        try {
+                            getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) as a, 10 as part FROM " + tableName + " WHERE part = 10");
+                            return true;
+                        }
+                        catch (Exception e) {
+                            RuntimeException trinoException = getTrinoExceptionCause(e);
+                            try {
+                                assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
+                            }
+                            catch (Throwable verifyFailure) {
+                                if (verifyFailure != e) {
+                                    verifyFailure.addSuppressed(e);
+                                }
+                                throw verifyFailure;
+                            }
+                            return false;
+                        }
+                    }))
+                    .collect(toImmutableList());
+
+            long successfulInsertsCount = futures.stream()
+                    .map(future -> tryGetFutureValue(future, 20, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
+                    .filter(success -> success)
+                    .count();
+
+            assertThat(successfulInsertsCount).isGreaterThanOrEqualTo(1);
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    "VALUES (0, 10), (11, 20), (22, 30)" +
+                            LongStream.rangeClosed(1, successfulInsertsCount)
+                                    .boxed()
+                                    .map("(%d, 10)"::formatted)
+                                    .collect(joining(", ", ", ", "")));
+            assertQuery(
+                    "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
+                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" +
+                            LongStream.rangeClosed(1, successfulInsertsCount)
+                                    .boxed()
+                                    .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1))
+                                    .collect(joining(", ", ", ", "")));
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
+    // Copied from BaseDeltaLakeConnectorSmokeTest
+    @Test
+    public void testConcurrentInsertsReconciliationForMixedInserts()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_mixed_inserts_table_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20)", 2);
+
+        try {
+            // insert data concurrently
+            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
+                            .add(() -> {
+                                // Read from the partition `10` of the same table to avoid reconciliation failures
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 10 AS part FROM " + tableName + " WHERE part = 10");
+                                return null;
+                            })
+                            .add(() -> {
+                                // Read from the partition `20` of the same table to avoid reconciliation failures
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 20 AS part FROM " + tableName + " WHERE part = 20");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (22, 30)");
+                                return null;
+                            })
+                            .build())
+                    .forEach(MoreFutures::getDone);
+
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)");
+            assertQuery(
+                    "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
+                        (1, 'WRITE', 'WriteSerializable', 0),
+                        (2, 'WRITE', 'WriteSerializable', 1),
+                        (3, 'WRITE', 'WriteSerializable', 2)
+                    """);
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
+    @Test
+    public void testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_mixed_inserts_table_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3);
+
+        try {
+            // perform non-overlapping non-blind insert operations concurrently which read from different source partitions
+            // and write within the same target partition of the table.
+            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, 40 as part FROM " + tableName + " WHERE part = 10");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 2, 40 as part FROM " + tableName + " WHERE part = 20");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 3, 40 as part FROM " + tableName + " WHERE part = 30");
+                                return null;
+                            })
+                            .build())
+                    .forEach(MoreFutures::getDone);
+
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    "VALUES (0, 10), (11, 20), (22, 30), (1, 40), (13, 40), (25, 40)");
+            assertQuery(
+                    "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
+                        (1, 'WRITE', 'WriteSerializable', 0),
+                        (2, 'WRITE', 'WriteSerializable', 1),
+                        (3, 'WRITE', 'WriteSerializable', 2)
+                    """);
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
+    @Test
+    public void testConcurrentInsertsSelectingFromMultipleNonoverlappingPartitionsOfSameTable()
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_mixed_inserts_table_" + randomNameSuffix();
+
+        // Create table with multiple partitions and multiple files per partition
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30), (33, 40), (44, 50), (55, 60)", 6);
+        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10), (13, 20), (24, 30), (35, 40), (46, 50), (57, 60)", 6);
+
+        try {
+            // perform non-overlapping non-blind insert operations concurrently which read from different source partitions
+            // and write within the same target partition of the table.
+            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (10, 20)");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (30, 40)");
+                                return null;
+                            })
+                            .add(() -> {
+                                barrier.await(10, SECONDS);
+                                getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (50, 60)");
+                                return null;
+                            })
+                            .build())
+                    .forEach(MoreFutures::getDone);
+
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    """
+                    VALUES
+                        (0, 10), (1, 10), (2, 10), (3, 10),
+                        (11, 20), (12, 20), (13, 20), (14, 20),
+                        (22, 30), (23, 30), (24, 30), (25, 30),
+                        (33, 40), (34, 40), (35, 40), (36, 40),
+                        (44, 50), (45, 50), (46, 50), (47, 50),
+                        (55, 60), (56, 60), (57, 60), (58, 60)
+                    """);
+            assertQuery(
+                    "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
+                    """
+                    VALUES
+                        (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
+                        (1, 'WRITE', 'WriteSerializable', 0),
+                        (2, 'WRITE', 'WriteSerializable', 1),
+                        (3, 'WRITE', 'WriteSerializable', 2),
+                        (4, 'WRITE', 'WriteSerializable', 3)
+                    """);
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
     @Test
     public void testConcurrentSerializableBlindInsertsReconciliationFailure()
             throws Exception
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java
index 986aa8fecb50..593d8a2ea1db 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java
@@ -253,6 +253,22 @@ public void testConcurrentInsertsReconciliationForBlindInserts()
         // testConcurrentInsertsReconciliation requires safe writes capability to avoid test flakiness
     }
 
+    @Override
+    @Test
+    @Disabled
+    public void testConcurrentInsertsSelectingFromTheSameTable()
+    {
+        // testConcurrentInsertsSelectingFromTheSameTable requires safe writes capability to avoid test flakiness
+    }
+
+    @Override
+    @Test
+    @Disabled
+    public void testConcurrentInsertsReconciliationForMixedInserts()
+    {
+        // testConcurrentInsertsReconciliationForMixedInserts requires safe writes capability to avoid test flakiness
+    }
+
     private String lockTable(String tableName, java.time.Duration lockDuration)
             throws Exception
     {
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
index 4906f981feae..f3049283c643 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
@@ -77,9 +77,9 @@ public void testHistoryTable()
                 // TODO (https://github.com/trinodb/trino/issues/15763) Use correct operation name for DML statements
                 .matches("""
                         VALUES
-                            (BIGINT '5', VARCHAR 'OPTIMIZE', BIGINT '4', VARCHAR 'WriteSerializable', true),
-                            (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', true),
-                            (BIGINT '3', VARCHAR 'MERGE', BIGINT '2', VARCHAR 'WriteSerializable', true),
+                            (BIGINT '5', VARCHAR 'OPTIMIZE', BIGINT '4', VARCHAR 'WriteSerializable', false),
+                            (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false),
+                            (BIGINT '3', VARCHAR 'MERGE', BIGINT '2', VARCHAR 'WriteSerializable', false),
                             (BIGINT '2', VARCHAR 'WRITE', BIGINT '1', VARCHAR 'WriteSerializable', true),
                             (BIGINT '1', VARCHAR 'WRITE', BIGINT '0', VARCHAR 'WriteSerializable', true),
                             (BIGINT '0', VARCHAR 'CREATE TABLE', BIGINT '0', VARCHAR 'WriteSerializable', true)
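
Note on the reconciliation check added in DeltaLakeMetadata above: checkIfCommittedAddedFilesConflictWithCurrentOperation fails the current non-blind INSERT only when a concurrently committed transaction (a) was not a blind append and (b) added at least one file whose partition falls inside the partition domains the INSERT read; a whole-table read conflicts with any added file. The following is a simplified, self-contained sketch of that decision, not code from this patch: plain maps and a hypothetical EnforcedConstraint record stand in for Trino's TupleDomain/Domain machinery and partitionMatchesPredicate.

    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;

    public class PartitionConflictSketch
    {
        // Hypothetical stand-in for TupleDomain<DeltaLakeColumnHandle>:
        // partition column -> allowed values; Optional.empty() models TupleDomain.all()
        // (the current INSERT read the whole table).
        record EnforcedConstraint(Optional<Map<String, Set<String>>> domains)
        {
            static EnforcedConstraint all()
            {
                return new EnforcedConstraint(Optional.empty());
            }
        }

        static boolean committedAddedFilesConflict(
                boolean committedIsBlindAppend,
                Set<Map<String, Optional<String>>> addedFilesCanonicalPartitionValues,
                EnforcedConstraint enforced)
        {
            if (committedIsBlindAppend || addedFilesCanonicalPartitionValues.isEmpty()) {
                // Blind appends can be ordered before or after the current operation
                // in any serializable history, so they never conflict
                return false;
            }
            if (enforced.domains().isEmpty()) {
                // The current INSERT read the whole table, so every added file overlaps its read state
                return true;
            }
            Map<String, Set<String>> domains = enforced.domains().get();
            // Conflict when some added file lies in a partition the current operation read:
            // every constrained partition column of the file must carry an allowed value
            // (NULL partition values are treated as non-matching in this simplification)
            return addedFilesCanonicalPartitionValues.stream()
                    .anyMatch(partitionValues -> domains.entrySet().stream()
                            .allMatch(domain -> partitionValues
                                    .getOrDefault(domain.getKey(), Optional.empty())
                                    .map(domain.getValue()::contains)
                                    .orElse(false)));
        }

        public static void main(String[] args)
        {
            Set<Map<String, Optional<String>>> added = Set.of(Map.of("part", Optional.of("10")));
            EnforcedConstraint readPart10 = new EnforcedConstraint(Optional.of(Map.of("part", Set.of("10"))));
            EnforcedConstraint readPart20 = new EnforcedConstraint(Optional.of(Map.of("part", Set.of("20"))));

            System.out.println(committedAddedFilesConflict(false, added, readPart20)); // false: disjoint partitions
            System.out.println(committedAddedFilesConflict(false, added, readPart10)); // true: overlapping partition
            System.out.println(committedAddedFilesConflict(true, added, readPart10));  // false: blind append
            System.out.println(committedAddedFilesConflict(false, added, EnforcedConstraint.all())); // true: whole-table read
        }
    }

Under this rule, testConcurrentInsertsSelectingFromTheSamePartition tolerates "Failed to write Delta Lake transaction log entry" failures (each INSERT reads and writes the same partition, so committed adds can conflict), while testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable expects all three inserts to commit, since each reads a partition no concurrent writer touches.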