diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index c2ce359fc14a..7004d3fd019e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -56,12 +56,14 @@ import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.ConnectorTableVersion; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.DiscretePredicates; import io.trino.spi.connector.MaterializedViewFreshness; import io.trino.spi.connector.MaterializedViewNotFoundException; +import io.trino.spi.connector.PointerType; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; @@ -77,6 +79,9 @@ import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.type.LongTimestampWithTimeZone; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.TypeManager; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -179,6 +184,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; +import static io.trino.plugin.iceberg.IcebergUtil.getSnapshotIdAsOfTime; import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; import static io.trino.plugin.iceberg.IcebergUtil.newCreateTableTransaction; import static io.trino.plugin.iceberg.IcebergUtil.toIcebergSchema; @@ -191,8 +197,12 @@ import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES; +import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; @@ -265,6 +275,20 @@ public Optional getSchemaOwner(ConnectorSession session, Catalog @Override public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { + return getTableHandle(session, tableName, Optional.empty(), Optional.empty()); + } + + @Override + public IcebergTableHandle getTableHandle( + ConnectorSession session, + SchemaTableName tableName, + Optional startVersion, + Optional endVersion) + { + if (startVersion.isPresent()) { + throw new TrinoException(NOT_SUPPORTED, "Read table with start version is not supported"); + } + IcebergTableName name = IcebergTableName.from(tableName.getTableName()); if (name.getTableType() != DATA) { // Pretend the table does not exist to produce better error message in case of table redirects to Hive @@ -278,7 +302,13 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa catch (TableNotFoundException e) { return null; } - Optional snapshotId = getSnapshotId(table, name.getSnapshotId()); + + if (name.getSnapshotId().isPresent() && endVersion.isPresent()) { + throw new TrinoException(GENERIC_USER_ERROR, "Cannot specify end version both in table name and FOR clause"); + } + + Optional snapshotId = endVersion.map(version -> getSnapshotIdFromVersion(table, version)) + .or(() -> getSnapshotId(table, name.getSnapshotId())); Map tableProperties = table.properties(); String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING); @@ -300,6 +330,47 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa ImmutableList.of()); } + @Override + public boolean isSupportedVersionType(ConnectorSession session, SchemaTableName tableName, PointerType pointerType, io.trino.spi.type.Type versioning) + { + switch (pointerType) { + case TEMPORAL: + return versioning instanceof TimestampWithTimeZoneType || versioning instanceof TimestampType; + case TARGET_ID: + return versioning == BIGINT; + } + return false; + } + + private long getSnapshotIdFromVersion(Table table, ConnectorTableVersion version) + { + io.trino.spi.type.Type versionType = version.getVersionType(); + switch (version.getPointerType()) { + case TEMPORAL: + long epochMillis; + if (versionType instanceof TimestampWithTimeZoneType) { + epochMillis = ((TimestampWithTimeZoneType) versionType).isShort() + ? unpackMillisUtc((long) version.getVersion()) + : ((LongTimestampWithTimeZone) version.getVersion()).getEpochMillis(); + } + else { + throw new TrinoException(NOT_SUPPORTED, "Unsupported type for temporal table version: " + versionType.getDisplayName()); + } + return getSnapshotIdAsOfTime(table, epochMillis); + + case TARGET_ID: + if (versionType != BIGINT) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported type for table version: " + versionType.getDisplayName()); + } + long snapshotId = (long) version.getVersion(); + if (table.snapshot(snapshotId) == null) { + throw new TrinoException(INVALID_ARGUMENTS, "Iceberg snapshot ID does not exists: " + snapshotId); + } + return snapshotId; + } + throw new TrinoException(NOT_SUPPORTED, "Version pointer type is not supported: " + version.getPointerType()); + } + @Override public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index d69f4f57b1ab..09df8d86a746 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -55,6 +55,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.time.Instant; import java.util.ArrayList; import java.util.Base64; import java.util.List; @@ -87,6 +88,7 @@ import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.BigintType.BIGINT; @@ -107,6 +109,7 @@ import static java.lang.Float.parseFloat; import static java.lang.Long.parseLong; import static java.lang.String.format; +import static java.util.Comparator.comparing; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.LocationProviders.locationsFor; @@ -426,6 +429,16 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, targetPath, propertiesBuilder.buildOrThrow()); } + public static long getSnapshotIdAsOfTime(Table table, long epochMillis) + { + // The order of table.history() elements is not guaranteed, refer: https://github.com/apache/iceberg/issues/3891#issuecomment-1012303069 + return table.history().stream() + .filter(logEntry -> logEntry.timestampMillis() <= epochMillis) + .max(comparing(HistoryEntry::timestampMillis)) + .orElseThrow(() -> new TrinoException(INVALID_ARGUMENTS, format("No version history table %s at or before %s", table.name(), Instant.ofEpochMilli(epochMillis)))) + .snapshotId(); + } + public static void validateTableCanBeDropped(Table table) { // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTable.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTable.java new file mode 100644 index 000000000000..54dce99b0a42 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTable.java @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + +import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static java.lang.String.format; +import static java.time.ZoneOffset.UTC; + +public class TestIcebergReadVersionedTable + extends AbstractTestQueryFramework +{ + private long v1SnapshotId; + private long v1EpochMillis; + private long v2SnapshotId; + private long v2EpochMillis; + private long incorrectSnapshotId; + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + return createIcebergQueryRunner(); + } + + @BeforeClass + public void setUp() + throws InterruptedException + { + assertQuerySucceeds("CREATE TABLE test_iceberg_read_versioned_table(a_string varchar, an_integer integer)"); + assertQuerySucceeds("INSERT INTO test_iceberg_read_versioned_table VALUES ('a', 1)"); + v1SnapshotId = getLatestSnapshotId("test_iceberg_read_versioned_table"); + v1EpochMillis = getCommittedAtInEpochMilliSeconds("test_iceberg_read_versioned_table", v1SnapshotId); + Thread.sleep(1); + assertQuerySucceeds("INSERT INTO test_iceberg_read_versioned_table VALUES ('b', 2)"); + v2SnapshotId = getLatestSnapshotId("test_iceberg_read_versioned_table"); + v2EpochMillis = getCommittedAtInEpochMilliSeconds("test_iceberg_read_versioned_table", v2SnapshotId); + incorrectSnapshotId = v2SnapshotId + 1; + } + + @Test + public void testSelectTableWithEndSnapshotId() + { + assertQuery("SELECT * FROM test_iceberg_read_versioned_table FOR VERSION AS OF " + v1SnapshotId, "VALUES ('a', 1)"); + assertQuery("SELECT * FROM test_iceberg_read_versioned_table FOR VERSION AS OF " + v2SnapshotId, "VALUES ('a', 1), ('b', 2)"); + assertQueryFails("SELECT * FROM test_iceberg_read_versioned_table FOR VERSION AS OF " + incorrectSnapshotId, "Iceberg snapshot ID does not exists: " + incorrectSnapshotId); + } + + @Test + public void testSelectTableWithEndShortTimestampWithTimezone() + { + assertQueryFails("SELECT * FROM test_iceberg_read_versioned_table FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", + "\\QNo version history table tpch.\"test_iceberg_read_versioned_table\" at or before 1970-01-01T00:00:00.001Z"); + assertQuery("SELECT * FROM test_iceberg_read_versioned_table FOR TIMESTAMP AS OF " + timestampLiteral(v1EpochMillis, 9), "VALUES ('a', 1)"); + assertQuery("SELECT * FROM test_iceberg_read_versioned_table FOR TIMESTAMP AS OF " + timestampLiteral(v2EpochMillis, 9), "VALUES ('a', 1), ('b', 2)"); + } + + @Test + public void testSelectTableWithEndLongTimestampWithTimezone() + { + assertQueryFails("SELECT * FROM test_iceberg_read_versioned_table FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", + "\\QNo version history table tpch.\"test_iceberg_read_versioned_table\" at or before 1970-01-01T00:00:00.001Z"); + assertQuery("SELECT * FROM test_iceberg_read_versioned_table FOR TIMESTAMP AS OF " + timestampLiteral(v1EpochMillis, 9), "VALUES ('a', 1)"); + assertQuery("SELECT * FROM test_iceberg_read_versioned_table FOR TIMESTAMP AS OF " + timestampLiteral(v2EpochMillis, 9), "VALUES ('a', 1), ('b', 2)"); + } + + @Test + public void testEndVersionInTableNameAndForClauseShouldFail() + { + assertQueryFails("SELECT * FROM \"test_iceberg_read_versioned_table@" + v1SnapshotId + "\" FOR VERSION AS OF " + v1SnapshotId, + "Cannot specify end version both in table name and FOR clause"); + + assertQueryFails("SELECT * FROM \"test_iceberg_read_versioned_table@" + v1SnapshotId + "\" FOR TIMESTAMP AS OF " + timestampLiteral(v1EpochMillis, 9), + "Cannot specify end version both in table name and FOR clause"); + } + + private long getLatestSnapshotId(String tableName) + { + return (long) computeActual(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName)) + .getOnlyValue(); + } + + private long getCommittedAtInEpochMilliSeconds(String tableName, long snapshotId) + { + return ((ZonedDateTime) computeActual(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id=%s LIMIT 1", tableName, snapshotId)).getOnlyValue()) + .toInstant().toEpochMilli(); + } + + private static String timestampLiteral(long epochMilliSeconds, int precision) + { + return DateTimeFormatter.ofPattern("'TIMESTAMP '''uuuu-MM-dd HH:mm:ss." + "S".repeat(precision) + " VV''") + .format(Instant.ofEpochMilli(epochMilliSeconds).atZone(UTC)); + } +}