Skip to content

Commit

Permalink
Use table schema from the table handle in getting the column handles
Browse files Browse the repository at this point in the history
When dealing with an Iceberg table whose
structure evolves over time (columns are added / dropped),
a snapshot/time travel query produces output whose
schema matches the schema of the table
snapshot being queried.
  • Loading branch information
findinpath authored and findepi committed Sep 12, 2022
1 parent ae170a0 commit 3421fdf
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,8 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (IcebergColumnHandle columnHandle : getColumns(icebergTable.schema(), typeManager)) {
for (IcebergColumnHandle columnHandle : getColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) {
columnHandles.put(columnHandle.getName(), columnHandle);
}
columnHandles.put(FILE_PATH.getColumnName(), pathColumnHandle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import static io.trino.spi.predicate.Domain.multipleValues;
import static io.trino.spi.predicate.Domain.singleValue;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
Expand Down Expand Up @@ -5446,6 +5447,55 @@ public void testReadFromVersionedTableWithSchemaEvolution()
.matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')");
}

/**
 * Checks that {@code FOR VERSION AS OF} queries report the column set that the
 * queried snapshot had, even after columns were subsequently dropped from the table.
 * The table starts with three columns and loses one column per step; after every
 * step both the latest state and the earlier snapshots are re-queried.
 */
@Test
public void testReadFromVersionedTableWithSchemaEvolutionDropColumn()
{
    String tableName = "test_versioned_table_schema_evolution_drop_column_" + randomTableSuffix();
    String selectLatest = "SELECT * FROM " + tableName;
    String selectAsOf = selectLatest + " FOR VERSION AS OF ";

    // Expected row sets, named after the number of columns they expose.
    String threeColumnRows = "VALUES (VARCHAR 'a', 1, true)";
    String twoColumnRows = "VALUES (VARCHAR 'a', 1), (VARCHAR 'b', 2)";
    String oneColumnRows = "VALUES (VARCHAR 'a'), (VARCHAR 'b'), (VARCHAR 'c')";

    // v1: freshly created table, three columns, no rows.
    assertQuerySucceeds("CREATE TABLE " + tableName + "(col1 varchar, col2 integer, col3 boolean)");
    long v1SnapshotId = getCurrentSnapshotId(tableName);
    String selectV1 = selectAsOf + v1SnapshotId;
    assertThat(query(selectV1))
            .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
            .returnsEmptyResult();

    // v2: first row inserted while all three columns still exist.
    assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 1 , true)", 1);
    long v2SnapshotId = getCurrentSnapshotId(tableName);
    String selectV2 = selectAsOf + v2SnapshotId;
    assertThat(query(selectV2))
            .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
            .matches(threeColumnRows);

    // v3: col3 dropped, then a two-column row inserted.
    assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN col3");
    assertUpdate("INSERT INTO " + tableName + " VALUES ('b', 2)", 1);
    long v3SnapshotId = getCurrentSnapshotId(tableName);
    String selectV3 = selectAsOf + v3SnapshotId;
    assertThat(query(selectV3))
            .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER))
            .matches(twoColumnRows);
    assertThat(query(selectLatest))
            .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER))
            .matches(twoColumnRows);
    // The older snapshot must still expose the dropped boolean column.
    assertThat(query(selectV2))
            .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
            .matches(threeColumnRows);

    // v4: col2 dropped as well, then a single-column row inserted.
    assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN col2");
    assertUpdate("INSERT INTO " + tableName + " VALUES ('c')", 1);
    long v4SnapshotId = getCurrentSnapshotId(tableName);
    String selectV4 = selectAsOf + v4SnapshotId;
    assertThat(query(selectV4))
            .hasOutputTypes(ImmutableList.of(VARCHAR))
            .matches(oneColumnRows);
    assertThat(query(selectLatest))
            .hasOutputTypes(ImmutableList.of(VARCHAR))
            .matches(oneColumnRows);
    // Both earlier snapshots keep their own, wider schemas.
    assertThat(query(selectV3))
            .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER))
            .matches(twoColumnRows);
    assertThat(query(selectV2))
            .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
            .matches(threeColumnRows);

    assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testReadFromVersionedTableWithPartitionSpecEvolution()
throws Exception
Expand Down

0 comments on commit 3421fdf

Please sign in to comment.