diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 81653b5e20df..426864b853f2 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -2767,8 +2767,12 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
         RowDelta rowDelta = transaction.newRowDelta();
         table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
         TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
-        if (!dataColumnPredicate.isAll()) {
-            rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
+        // Narrow the conflict detection filter with the unenforced predicate as well (e.g. predicates on columns that
+        // are partitioned with a transform), keeping only the domains that can be converted to an Iceberg expression
+        TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertableToIcebergExpression(domain));
+        TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate);
+        if (!effectivePredicate.isAll()) {
+            rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate));
         }
         IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
         if (isolationLevel == IsolationLevel.SERIALIZABLE) {
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java
index 82dbc50f9be4..65253a37868d 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java
@@ -18,20 +18,24 @@
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.MaterializedResult;
 import io.trino.testing.QueryRunner;
+import io.trino.testing.sql.TestTable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
 import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
 import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -768,6 +772,263 @@ void testConcurrentDeleteAndDeletePushdownAndInsert()
         }
     }
 
+    // Every thread updates a row that lives in a different month(part) partition, so all concurrent updates must succeed
+    @Test
+    void testConcurrentUpdateWithPartitionTransformation()
+            throws Exception
+    {
+        int threads = 4;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        List<String> rows = ImmutableList.of("('A', DATE '2024-01-01')", "('B', DATE '2024-02-02')", "('C', DATE '2024-03-03')", "('D', DATE '2024-04-04')");
+        List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");
+
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_concurrent_update_partition_transform_table_",
+                "(data varchar, part date) with (partitioning = array['month(part)'])")) {
+            String tableName = table.getName();
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);
+
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part = %s", tableName, partitions.get(threadNumber)));
+                        return true;
+                    }))
+                    .collect(toImmutableList());
+
+            futures.forEach(future -> {
+                Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
+                checkState(value.isPresent(), "Task did not complete in time");
+                boolean updateSuccessful = value.get();
+                checkState(updateSuccessful, "Task did not complete successfully");
+            });
+
+            assertThat(query("SELECT data, part FROM " + tableName))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('AA', DATE '2024-01-01'), ('BB', DATE '2024-02-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
+        }
+        finally {
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
+    // Every thread updates a row in a different month("parent.part") partition of a nested field, so all concurrent updates must succeed
+    @Test
+    void testConcurrentUpdateWithNestedPartitionTransformation()
+            throws Exception
+    {
+        int threads = 4;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        List<String> rows = ImmutableList.of("('A', ROW(DATE '2024-01-01'))", "('B', ROW(DATE '2024-02-02'))", "('C', ROW(DATE '2024-03-03'))", "('D', ROW(DATE '2024-04-04'))");
+        List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");
+
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_concurrent_update_partition_transform_table_",
+                "(data varchar, parent ROW (part date)) with (partitioning = array['month(\"parent.part\")'])")) {
+            String tableName = table.getName();
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);
+
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE parent.part = %s", tableName, partitions.get(threadNumber)));
+                        return true;
+                    }))
+                    .collect(toImmutableList());
+
+            futures.forEach(future -> {
+                Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
+                checkState(value.isPresent(), "Task did not complete in time");
+                boolean updateSuccessful = value.get();
+                checkState(updateSuccessful, "Task did not complete successfully");
+            });
+
+            assertThat(query("SELECT data, parent.part FROM " + tableName))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('AA', DATE '2024-01-01'), ('BB', DATE '2024-02-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
+        }
+        finally {
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
+    // The rows share their bucket(part2, 10) and truncate(part3, 2) partition values and differ only in hour(part1);
+    // each thread updates a different row, so all concurrent updates must succeed
+    @Test
+    void testConcurrentUpdateWithMultiplePartitionTransformation()
+            throws Exception
+    {
+        int threads = 4;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        List<String> rows = ImmutableList.of("('A', TIMESTAMP '2024-01-01 01:01', 1, 'aaa')",
+                "('B', TIMESTAMP '2024-01-01 02:02', 1, 'aab')",
+                "('C', TIMESTAMP '2024-01-01 03:03', 1, 'aac')",
+                "('D', TIMESTAMP '2024-01-01 04:04', 1, 'aad')");
+        // Only the hour(part1) partition values are non-overlapping
+        List<String> partitions1 = ImmutableList.of("TIMESTAMP '2024-01-01 01:01'", "TIMESTAMP '2024-01-01 02:02'", "TIMESTAMP '2024-01-01 03:03'", "TIMESTAMP '2024-01-01 04:04'");
+        List<String> partitions2 = ImmutableList.of("1", "1", "1", "1");
+        List<String> partitions3 = ImmutableList.of("'aaa'", "'aab'", "'aac'", "'aad'");
+
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_concurrent_update_multiple_partition_transform_table_",
+                "(data varchar, part1 timestamp, part2 int, part3 varchar) with (partitioning = array['hour(part1)', 'bucket(part2, 10)', 'truncate(part3, 2)'])")) {
+            String tableName = table.getName();
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);
+
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        getQueryRunner().execute(format(
+                                "UPDATE %s SET data = data || data WHERE part1 = %s AND part2 = %s AND part3 = %s",
+                                tableName,
+                                partitions1.get(threadNumber),
+                                partitions2.get(threadNumber),
+                                partitions3.get(threadNumber)));
+                        return true;
+                    }))
+                    .collect(toImmutableList());
+
+            futures.forEach(future -> {
+                Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
+                checkState(value.isPresent(), "Task did not complete in time");
+                boolean updateSuccessful = value.get();
+                checkState(updateSuccessful, "Task did not complete successfully");
+            });
+
+            assertThat(query("SELECT data, part1, part2, part3 FROM " + tableName))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('AA', TIMESTAMP '2024-01-01 01:01', 1, 'aaa'), " +
+                            "('BB', TIMESTAMP '2024-01-01 02:02', 1, 'aab')," +
+                            " ('CC', TIMESTAMP '2024-01-01 03:03', 1, 'aac'), " +
+                            "('DD', TIMESTAMP '2024-01-01 04:04', 1, 'aad')");
+        }
+        finally {
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
+    // Rows 'A' and 'B' fall into the same month(part) partition, so the two threads updating them conflict:
+    // exactly three of the four concurrent updates are expected to succeed and one to fail on commit
+    @Test
+    void testConcurrentUpdateWithOverlappingPartitionTransformation()
+            throws Exception
+    {
+        int threads = 4;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        List<String> rows = ImmutableList.of("('A', DATE '2024-01-01')", "('B', DATE '2024-01-02')", "('C', DATE '2024-03-03')", "('D', DATE '2024-04-04')");
+        List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-01-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");
+
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_concurrent_update_overlapping_partition_transform_table_",
+                "(data varchar, part date) with (partitioning = array['month(part)'])")) {
+            String tableName = table.getName();
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);
+
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        try {
+                            getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part = %s", tableName, partitions.get(threadNumber)));
+                            return true;
+                        }
+                        catch (Exception e) {
+                            RuntimeException trinoException = getTrinoExceptionCause(e);
+                            try {
+                                assertThat(trinoException).hasMessageMatching("Failed to commit the transaction during write.*|" +
+                                        "Failed to commit during write.*");
+                            }
+                            catch (Throwable verifyFailure) {
+                                if (verifyFailure != e) {
+                                    verifyFailure.addSuppressed(e);
+                                }
+                                throw verifyFailure;
+                            }
+                            return false;
+                        }
+                    }))
+                    .collect(toImmutableList());
+
+            long successfulWrites = futures.stream()
+                    .map(future -> tryGetFutureValue(future, 10, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
+                    .filter(success -> success)
+                    .count();
+
+            assertThat(successfulWrites).isEqualTo(3);
+
+            // There can be two possible results depending on which thread fails
+            MaterializedResult expected1 = computeActual("VALUES (VARCHAR 'AA', DATE '2024-01-01'), ('B', DATE '2024-01-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
+            MaterializedResult expected2 = computeActual("VALUES (VARCHAR 'A', DATE '2024-01-01'), ('BB', DATE '2024-01-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
+            assertThat(computeActual("SELECT data, part FROM " + tableName + " ORDER BY data"))
+                    .isIn(expected1, expected2);
+        }
+        finally {
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
+    // Every thread filters on both an identity partition column (part1) and a transformed one (month(part2));
+    // the rows live in distinct partitions, so all concurrent updates must succeed
+    @Test
+    void testConcurrentUpdateWithEnforcedAndUnenforcedPartitions()
+            throws Exception
+    {
+        int threads = 4;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        List<String> rows = ImmutableList.of("('A', 'a', DATE '2024-01-01')", "('B', 'b', DATE '2024-02-02')", "('C', 'c', DATE '2024-03-03')", "('D', 'd', DATE '2024-04-04')");
+        List<String> partitions1 = ImmutableList.of("'a'", "'b'", "'c'", "'d'");
+        List<String> partitions2 = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");
+
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_concurrent_update_enforced_unenforced_partition_transform_table_",
+                // part1 is enforced and part2 is unenforced as it has transformation
+                "(data varchar, part1 varchar, part2 date) with (partitioning = array['part1', 'month(part2)'])")) {
+            String tableName = table.getName();
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);
+
+            List<Future<Boolean>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part1 = %s AND part2 = %s", tableName, partitions1.get(threadNumber), partitions2.get(threadNumber)));
+                        return true;
+                    }))
+                    .collect(toImmutableList());
+
+            futures.forEach(future -> {
+                Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
+                checkState(value.isPresent(), "Task did not complete in time");
+                boolean updateSuccessful = value.get();
+                checkState(updateSuccessful, "Task did not complete successfully");
+            });
+
+            assertThat(query("SELECT data, part1, part2 FROM " + tableName))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('AA', 'a', DATE '2024-01-01'), ('BB', 'b', DATE '2024-02-02'), ('CC', 'c', DATE '2024-03-03'), ('DD', 'd', DATE '2024-04-04')");
+        }
+        finally {
+            executor.shutdownNow();
+            assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
+        }
+    }
+
     private long getCurrentSnapshotId(String tableName)
     {
         return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");