Fix incorrect metadata for Iceberg table after rollback #13704
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -154,6 +154,7 @@ | |||||
import java.util.regex.Pattern; | ||||||
import java.util.stream.Collectors; | ||||||
import java.util.stream.Stream; | ||||||
import java.util.stream.StreamSupport; | ||||||
|
||||||
import static com.google.common.base.Preconditions.checkArgument; | ||||||
import static com.google.common.base.Preconditions.checkState; | ||||||
|
@@ -234,6 +235,7 @@ | |||||
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; | ||||||
import static io.trino.spi.type.UuidType.UUID; | ||||||
import static java.lang.String.format; | ||||||
import static java.util.Comparator.comparing; | ||||||
import static java.util.Objects.requireNonNull; | ||||||
import static java.util.function.Function.identity; | ||||||
import static java.util.stream.Collectors.groupingBy; | ||||||
|
@@ -347,7 +349,7 @@ public IcebergTableHandle getTableHandle( | |||||
} | ||||||
else { | ||||||
tableSnapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId); | ||||||
tableSchema = table.schema(); | ||||||
tableSchema = getCurrentSnapshotSchema(table); | ||||||
partitionSpec = Optional.of(table.spec()); | ||||||
} | ||||||
|
||||||
|
@@ -595,7 +597,7 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio | |||||
} | ||||||
|
||||||
Table icebergTable = catalog.loadTable(session, tableName); | ||||||
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema()); | ||||||
List<ColumnMetadata> columns = getColumnMetadatas(getCurrentSnapshotSchema(icebergTable)); | ||||||
Review comment: Why it's more correct than With correct code, it looks like the |
||||||
return Stream.of(TableColumnsMetadata.forTable(tableName, columns)); | ||||||
} | ||||||
catch (TableNotFoundException e) { | ||||||
|
@@ -615,6 +617,24 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio | |||||
.iterator(); | ||||||
} | ||||||
|
||||||
private static Schema getCurrentSnapshotSchema(Table icebergTable) | ||||||
Review comment: Let's add javadoc explaining how |
||||||
{ | ||||||
Optional<Long> tableSnapshotId = Optional.ofNullable(icebergTable.currentSnapshot()).map(Snapshot::snapshotId); | ||||||
// Check whether the current snapshot is the latest one, because changing the table definition (e.g. ADD COLUMN) doesn't generate a new snapshot. | ||||||
// If we used the schema for 'tableSnapshotId' without this comparison, we would get the old table definition, which is different from the latest one. | ||||||
if (tableSnapshotId.isPresent() && !findLatestSnapshotId(icebergTable.snapshots()).equals(tableSnapshotId)) { | ||||||
Review comment: why is it test-covered?

Review comment: For instance, when changed the table definition (e.g. Added a code comment and it's covered in

Review comment: I get it now. However, consider the case
CREATE TABLE t (a varchar);
INSERT INTO t VALUES 'one column' ; -- snapshot "s"
-- returns `a` column
SELECT * FROM t FOR VERSION AS OF s;
ALTER TABLE t ADD COLUMN (b varchar);
-- this should return a, b columns:
SELECT * FROM t;
-- and this should continue to return only the `a` column
SELECT * FROM t FOR VERSION AS OF s;
Can you verify this is the case?

Review comment: I think #14076 covered the case. Let me know if my understanding is wrong. Confirmed it works as
CREATE TABLE t (a varchar);
INSERT INTO t VALUES 'one column' ;
TABLE "t$snapshots";
committed_at | snapshot_id | parent_id | operation | >
------------------------------------+---------------------+---------------------+-----------+------------------------------------------------>
2022-09-13 15:24:59.540 Asia/Tokyo | 5659256701260247696 | NULL | append | /var/folders/8s/dkvf18z55lj_9yxhy1n54sph0000gn/>
2022-09-13 15:24:59.744 Asia/Tokyo | 261758948915485069 | 5659256701260247696 | append | /var/folders/8s/dkvf18z55lj_9yxhy1n54sph0000gn/>
-- returns `a` column
SELECT * FROM t FOR VERSION AS OF 261758948915485069;
a
------------
one column
ALTER TABLE t ADD COLUMN b varchar;
ADD COLUMN
-- this should return a, b columns:
SELECT * FROM t;
a | b
------------+------
one column | NULL
-- and this should continue to return only the `a` column
SELECT * FROM t FOR VERSION AS OF 261758948915485069;
a
------------
one column |
||||||
return schemaFor(icebergTable, tableSnapshotId.get()); | ||||||
} | ||||||
return icebergTable.schema(); | ||||||
} | ||||||
|
||||||
private static Optional<Long> findLatestSnapshotId(Iterable<Snapshot> snapshots) | ||||||
{ | ||||||
return StreamSupport.stream(snapshots.spliterator(), false) | ||||||
Review comment: use Guava
|
||||||
.max(comparing(Snapshot::timestampMillis)) | ||||||
.map(Snapshot::snapshotId); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties, TrinoPrincipal owner) | ||||||
{ | ||||||
|
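The `findLatestSnapshotId` helper added in this diff turns an `Iterable` into a stream and picks the snapshot with the greatest commit timestamp. The following is a minimal, dependency-free sketch of that one step. The `Snap` record is a hypothetical stand-in for Iceberg's `Snapshot`, and the short IDs and timestamps are made up for illustration. It sticks to the JDK and does not try to reproduce the Guava-based suggestion from the review.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;

import static java.util.Comparator.comparingLong;

final class LatestSnapshotSketch
{
    // Hypothetical stand-in for org.apache.iceberg.Snapshot; only the two accessors used here.
    record Snap(long snapshotId, long timestampMillis) {}

    // Same shape as the helper in the diff: Iterable -> sequential Stream -> max by commit time.
    static Optional<Long> findLatestSnapshotId(Iterable<Snap> snapshots)
    {
        return StreamSupport.stream(snapshots.spliterator(), false)
                .max(comparingLong(Snap::timestampMillis))
                .map(Snap::snapshotId);
    }

    public static void main(String[] args)
    {
        // The later snapshot has the smaller ID, so ordering by ID would pick the wrong one.
        List<Snap> snapshots = List.of(new Snap(5659L, 1_000L), new Snap(2617L, 2_000L));
        System.out.println(findLatestSnapshotId(snapshots)); // Optional[2617]
        System.out.println(findLatestSnapshotId(List.of())); // Optional.empty
    }
}
```

Ordering by timestamp rather than by ID matters because snapshot IDs are not monotonic: in the `t$snapshots` output quoted in the review thread, the later snapshot also has the smaller ID.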
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1106,6 +1106,31 @@ public void testRollbackSnapshot() | |
dropTable("test_rollback"); | ||
} | ||
|
||
@Test | ||
public void testRollbackSnapshotWithSchemaEvolution() | ||
{ | ||
assertUpdate("CREATE TABLE test_rollback_with_schema_evolution AS SELECT 1 col0", 1); | ||
long afterCreateTableId = getCurrentSnapshotId("test_rollback_with_schema_evolution"); | ||
|
||
assertUpdate("ALTER TABLE test_rollback_with_schema_evolution ADD COLUMN col1 int"); | ||
assertUpdate("INSERT INTO test_rollback_with_schema_evolution VALUES (2, 2)", 1); | ||
|
||
assertQuery("SELECT * FROM test_rollback_with_schema_evolution", "VALUES (1, NULL), (2, 2)"); | ||
|
||
assertQuery("SELECT * FROM test_rollback_with_schema_evolution FOR VERSION AS OF " + afterCreateTableId, "VALUES 1"); | ||
|
||
assertUpdate("CALL system.rollback_to_snapshot('tpch', 'test_rollback_with_schema_evolution', " + afterCreateTableId + ")"); | ||
|
||
// Verify the output doesn't have col1 column after rollback | ||
assertThat(query("DESCRIBE test_rollback_with_schema_evolution")) | ||
.projected(0) | ||
.skippingTypesCheck() | ||
.matches("VALUES 'col0'"); | ||
assertQuery("SELECT * FROM test_rollback_with_schema_evolution", "VALUES 1"); | ||
|
||
Review comment: add
// A DML should not change the table schema (which could be possible if the rolled back schema gets revived when committing the DML)
assertUpdate("INSERT INTO test_rollback_with_schema_evolution VALUES (13)");
assertThat(query("DESCRIBE test_rollback_with_schema_evolution"))
.projected(0)
.skippingTypesCheck()
.matches("VALUES 'col0'");
assertQuery("SELECT * FROM test_rollback_with_schema_evolution", "VALUES 1, 13");
This should pass, but actually the INSERT is failing. It seems to me now that we cannot fix #13699 without the Iceberg side being fixed (apache/iceberg#5591). |
||
dropTable("test_rollback_with_schema_evolution"); | ||
} | ||
|
||
@Override | ||
protected String errorMessageForInsertIntoNotNullColumn(String columnName) | ||
{ | ||
|
Review comment: In https://github.com/trinodb/trino/pull/13704/files#r969206216 you're showing that this works, but I don't understand yet why it does so.
What's the situation when table.schema() is different from getCurrentSnapshotSchema(table)? In that situation, how do they differ?

Review comment: The situation is after rollback. In the last SELECT statement in testRollbackSnapshotWithSchemaEvolution,
table.schema() returns
getCurrentSnapshotSchema(table) returns
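
To make the question about `table.schema()` versus `getCurrentSnapshotSchema(table)` concrete, here is a toy model of the rollback scenario from `testRollbackSnapshotWithSchemaEvolution`. It is not Trino or Iceberg code. `ToyTable`, `Snap`, and the two factory methods are hypothetical. It also assumes that a rollback moves the current snapshot pointer but leaves the table's current schema ID untouched, which is the behaviour this PR works around.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class RollbackSchemaSketch
{
    // Hypothetical snapshot: remembers which schema ID was current when it was committed.
    record Snap(long id, long timestampMillis, int schemaId) {}

    // Hypothetical table metadata: current schema ID and current snapshot ID are tracked separately.
    record ToyTable(Map<Integer, List<String>> schemas, int currentSchemaId, List<Snap> snapshots, Optional<Long> currentSnapshotId)
    {
        // Plays the role of table.schema(): follows the current schema ID only.
        List<String> schema()
        {
            return schemas.get(currentSchemaId);
        }

        // Plays the role of schemaFor(table, snapshotId): follows the snapshot's schema ID.
        List<String> schemaFor(long snapshotId)
        {
            return snapshots.stream()
                    .filter(snapshot -> snapshot.id() == snapshotId)
                    .findFirst()
                    .map(snapshot -> schemas.get(snapshot.schemaId()))
                    .orElseThrow();
        }
    }

    // Same decision as the helper in the diff, applied to the toy model.
    static List<String> getCurrentSnapshotSchema(ToyTable table)
    {
        Optional<Long> latest = table.snapshots().stream()
                .max(Comparator.comparingLong(Snap::timestampMillis))
                .map(Snap::id);
        Optional<Long> current = table.currentSnapshotId();
        if (current.isPresent() && !latest.equals(current)) {
            return table.schemaFor(current.get());
        }
        return table.schema();
    }

    private static final Map<Integer, List<String>> SCHEMAS = Map.of(0, List.of("col0"), 1, List.of("col0", "col1"));

    // CREATE TABLE AS, ADD COLUMN col1, INSERT, then rollback to the first snapshot.
    static ToyTable afterRollback()
    {
        return new ToyTable(SCHEMAS, 1, List.of(new Snap(1L, 100L, 0), new Snap(2L, 200L, 1)), Optional.of(1L));
    }

    // CREATE TABLE AS, then ADD COLUMN col1 only: no new snapshot is written.
    static ToyTable afterAddColumnOnly()
    {
        return new ToyTable(SCHEMAS, 1, List.of(new Snap(1L, 100L, 0)), Optional.of(1L));
    }

    public static void main(String[] args)
    {
        System.out.println(afterRollback().schema());                       // [col0, col1]
        System.out.println(getCurrentSnapshotSchema(afterRollback()));      // [col0]
        System.out.println(getCurrentSnapshotSchema(afterAddColumnOnly())); // [col0, col1]
    }
}
```

In this model the two calls disagree only when the current snapshot is not the most recent one, that is, after a rollback. When a column is added without writing a new snapshot, the current snapshot is still the latest, so the helper falls back to the table's current schema and the new column stays visible.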