diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index ceb7ca1993fb..b3642dd3c733 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -59,6 +59,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Locale; import java.util.Map; @@ -107,6 +110,7 @@ import static io.trino.transaction.TransactionBuilder.transaction; import static java.lang.String.format; import static java.lang.String.join; +import static java.time.ZoneOffset.UTC; import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; @@ -4170,6 +4174,37 @@ public void testModifyingOldSnapshotIsNotPossible() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testCreateTableAsSelectFromVersionedTable() + throws Exception + { + String sourceTableName = "test_ctas_versioned_source_" + randomTableSuffix(); + String snapshotVersionedSinkTableName = "test_ctas_snapshot_versioned_sink_" + randomTableSuffix(); + String timestampVersionedSinkTableName = "test_ctas_timestamp_versioned_sink_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + sourceTableName + "(an_integer integer)"); + // Enforce having exactly one snapshot of the table at the timestamp corresponding to `afterInsert123EpochMillis` + Thread.sleep(1); + assertUpdate("INSERT INTO " + sourceTableName + " VALUES 1, 2, 3", 3); + long afterInsert123SnapshotId = getLatestSnapshotId(sourceTableName); + long afterInsert123EpochMillis = getCommittedAtInEpochMilliseconds(sourceTableName, afterInsert123SnapshotId); + Thread.sleep(1); + assertUpdate("INSERT INTO " + sourceTableName + " VALUES 4, 5, 6", 3); + long afterInsert456SnapshotId = getLatestSnapshotId(sourceTableName); + assertUpdate("INSERT INTO " + sourceTableName + " VALUES 7, 8, 9", 3); + + assertUpdate("CREATE TABLE " + snapshotVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR VERSION AS OF " + afterInsert456SnapshotId, 6); + assertUpdate("CREATE TABLE " + timestampVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR TIMESTAMP AS OF " + timestampLiteral(afterInsert123EpochMillis, 9), 3); + + assertQuery("SELECT * FROM " + sourceTableName, "VALUES 1, 2, 3, 4, 5, 6, 7, 8, 9"); + assertQuery("SELECT * FROM " + snapshotVersionedSinkTableName, "VALUES 1, 2, 3, 4, 5, 6"); + assertQuery("SELECT * FROM " + timestampVersionedSinkTableName, "VALUES 1, 2, 3"); + + assertUpdate("DROP TABLE " + sourceTableName); + assertUpdate("DROP TABLE " + snapshotVersionedSinkTableName); + assertUpdate("DROP TABLE " + timestampVersionedSinkTableName); + } + private Session prepareCleanUpSession() { return Session.builder(getSession()) @@ -4232,4 +4267,16 @@ private Path getIcebergTablePath(String tableName, String suffix) String schema = getSession().getSchema().orElseThrow(); return getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve(suffix); } + + private long getCommittedAtInEpochMilliseconds(String tableName, long snapshotId) + { + return ((ZonedDateTime) computeActual(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id=%s LIMIT 1", tableName, snapshotId)).getOnlyValue()) + .toInstant().toEpochMilli(); + } + + private static String timestampLiteral(long epochMilliSeconds, int precision) + { + return DateTimeFormatter.ofPattern("'TIMESTAMP '''uuuu-MM-dd HH:mm:ss." + "S".repeat(precision) + " VV''") + .format(Instant.ofEpochMilli(epochMilliSeconds).atZone(UTC)); + } }