Skip to content

Commit

Permalink
Verify accuracy of doing a CTAS statement from a versioned table
Browse files Browse the repository at this point in the history
  • Loading branch information
findinpath authored and ebyhr committed Jun 22, 2022
1 parent b5bf1f5 commit ff57246
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -107,6 +110,7 @@
import static io.trino.transaction.TransactionBuilder.transaction;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.time.ZoneOffset.UTC;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -4170,6 +4174,37 @@ public void testModifyingOldSnapshotIsNotPossible()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testCreateTableAsSelectFromVersionedTable()
throws Exception
{
String sourceTableName = "test_ctas_versioned_source_" + randomTableSuffix();
String snapshotVersionedSinkTableName = "test_ctas_snapshot_versioned_sink_" + randomTableSuffix();
String timestampVersionedSinkTableName = "test_ctas_timestamp_versioned_sink_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + sourceTableName + "(an_integer integer)");
// Enforce having exactly one snapshot of the table at the timestamp corresponding to `afterInsert123EpochMillis`
Thread.sleep(1);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 1, 2, 3", 3);
long afterInsert123SnapshotId = getLatestSnapshotId(sourceTableName);
long afterInsert123EpochMillis = getCommittedAtInEpochMilliseconds(sourceTableName, afterInsert123SnapshotId);
Thread.sleep(1);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 4, 5, 6", 3);
long afterInsert456SnapshotId = getLatestSnapshotId(sourceTableName);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 7, 8, 9", 3);

assertUpdate("CREATE TABLE " + snapshotVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR VERSION AS OF " + afterInsert456SnapshotId, 6);
assertUpdate("CREATE TABLE " + timestampVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR TIMESTAMP AS OF " + timestampLiteral(afterInsert123EpochMillis, 9), 3);

assertQuery("SELECT * FROM " + sourceTableName, "VALUES 1, 2, 3, 4, 5, 6, 7, 8, 9");
assertQuery("SELECT * FROM " + snapshotVersionedSinkTableName, "VALUES 1, 2, 3, 4, 5, 6");
assertQuery("SELECT * FROM " + timestampVersionedSinkTableName, "VALUES 1, 2, 3");

assertUpdate("DROP TABLE " + sourceTableName);
assertUpdate("DROP TABLE " + snapshotVersionedSinkTableName);
assertUpdate("DROP TABLE " + timestampVersionedSinkTableName);
}

private Session prepareCleanUpSession()
{
return Session.builder(getSession())
Expand Down Expand Up @@ -4232,4 +4267,16 @@ private Path getIcebergTablePath(String tableName, String suffix)
String schema = getSession().getSchema().orElseThrow();
return getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve(suffix);
}

private long getCommittedAtInEpochMilliseconds(String tableName, long snapshotId)
{
return ((ZonedDateTime) computeActual(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id=%s LIMIT 1", tableName, snapshotId)).getOnlyValue())
.toInstant().toEpochMilli();
}

private static String timestampLiteral(long epochMilliSeconds, int precision)
{
return DateTimeFormatter.ofPattern("'TIMESTAMP '''uuuu-MM-dd HH:mm:ss." + "S".repeat(precision) + " VV''")
.format(Instant.ofEpochMilli(epochMilliSeconds).atZone(UTC));
}
}

0 comments on commit ff57246

Please sign in to comment.