Fix incorrect metadata for Iceberg table after rollback #13704
private static Schema getCurrentSnapshotSchema(Table icebergTable)
{
    Optional<Long> tableSnapshotId = Optional.ofNullable(icebergTable.currentSnapshot()).map(Snapshot::snapshotId);
    if (tableSnapshotId.isPresent() && !findLatestSnapshotId(icebergTable.snapshots()).equals(tableSnapshotId)) {
Why is the `!findLatestSnapshotId` check necessary? Please document it.
Is it test-covered?
For instance, after the table definition changes (e.g. `ADD COLUMN`), `tableSnapshotId` still points to the old definition from before the column was added. The condition ensures the latest schema is used in such cases.
I added a code comment, and the case is covered by the `ALTER TABLE` → `INSERT INTO` sequence in `testRollbackSnapshotWithSchemaEvolution`.
I get it now. However, consider the case
CREATE TABLE t (a varchar);
INSERT INTO t VALUES 'one column' ; -- snapshot "s"
-- returns `a` column
SELECT * FROM t FOR VERSION AS OF s;
ALTER TABLE t ADD COLUMN b varchar;
-- this should return a, b columns:
SELECT * FROM t;
-- and this should continue to return only the `a` column
SELECT * FROM t FOR VERSION AS OF s;
Can you verify this is the case?
I think #14076 covered this case. Let me know if my understanding is wrong. I confirmed that it works as follows:
CREATE TABLE t (a varchar);
INSERT INTO t VALUES 'one column' ;
TABLE "t$snapshots";
committed_at | snapshot_id | parent_id | operation | >
------------------------------------+---------------------+---------------------+-----------+------------------------------------------------>
2022-09-13 15:24:59.540 Asia/Tokyo | 5659256701260247696 | NULL | append | /var/folders/8s/dkvf18z55lj_9yxhy1n54sph0000gn/>
2022-09-13 15:24:59.744 Asia/Tokyo | 261758948915485069 | 5659256701260247696 | append | /var/folders/8s/dkvf18z55lj_9yxhy1n54sph0000gn/>
-- returns `a` column
SELECT * FROM t FOR VERSION AS OF 261758948915485069;
a
------------
one column
ALTER TABLE t ADD COLUMN b varchar;
ADD COLUMN
-- this should return a, b columns:
SELECT * FROM t;
a | b
------------+------
one column | NULL
-- and this should continue to return only the `a` column
SELECT * FROM t FOR VERSION AS OF 261758948915485069;
a
------------
one column
7f60491 to cb4e146
private static Optional<Long> findLatestSnapshotId(Iterable<Snapshot> snapshots)
{
    try {
        return Optional.of(Iterables.getLast(snapshots).snapshotId());
Is `icebergTable.snapshots()` guaranteed to be ordered, or do we need to sort it first?
(See #10258 (comment).)
        return Optional.of(Iterables.getLast(snapshots).snapshotId());
    }
    catch (NoSuchElementException e) {
        return Optional.empty();
You can use `Iterables.isEmpty()` to avoid exception-driven control flow
(probably obsolete if you also need to sort).
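The two review points above (ordering and exception-driven flow) can be addressed together. The following is a minimal sketch, not the code from this PR: it assumes the "latest" snapshot is the one with the greatest commit timestamp, and `SnapshotInfo` is a hypothetical stand-in that mirrors only the `snapshotId()` and `timestampMillis()` accessors of Iceberg's `Snapshot`, so that the example compiles without the Iceberg dependency.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;

final class LatestSnapshotSketch
{
    // Hypothetical stand-in for org.apache.iceberg.Snapshot (assumption: only these two accessors matter here)
    record SnapshotInfo(long snapshotId, long timestampMillis) {}

    private LatestSnapshotSketch() {}

    // Picks the snapshot with the greatest commit timestamp, so the result does not
    // depend on iteration order. An empty input yields Optional.empty() without
    // relying on NoSuchElementException.
    static Optional<Long> findLatestSnapshotId(Iterable<SnapshotInfo> snapshots)
    {
        return StreamSupport.stream(snapshots.spliterator(), false)
                .max(Comparator.comparingLong(SnapshotInfo::timestampMillis))
                .map(SnapshotInfo::snapshotId);
    }
}
```

Taking the maximum makes the sort unnecessary and removes the need for a separate emptiness check, since `Stream.max` already returns an empty `Optional` for an empty stream.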
@@ -604,7 +605,7 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
 }

 Table icebergTable = catalog.loadTable(session, tableName);
-List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema());
+List<ColumnMetadata> columns = getColumnMetadatas(getCurrentSnapshotSchema(icebergTable));
Why is this more correct than `icebergTable.schema()`?
With the current code, it looks like `information_schema.columns` doesn't "see" newly added columns until a new data snapshot is committed.
@@ -348,7 +349,7 @@ public IcebergTableHandle getTableHandle(
 }
 else {
     tableSnapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId);
-    tableSchema = table.schema();
+    tableSchema = getCurrentSnapshotSchema(table);
In https://github.com/trinodb/trino/pull/13704/files#r969206216 you show that this works, but I don't yet understand why it does.
In what situation is `table.schema()` different from `getCurrentSnapshotSchema(table)`?
In that situation, how do they differ?
The situation arises after a rollback. In the last SELECT statement in `testRollbackSnapshotWithSchemaEvolution`, `table.schema()` returns
table {
  1: col0: optional int
  2: col1: optional int
}
while `getCurrentSnapshotSchema(table)` returns
table {
  1: col0: optional int
}
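To make the difference concrete, here is a minimal model of the selection logic discussed in this thread. It is a sketch under stated assumptions, not the PR's implementation (whose full body is not shown here): `SchemaInfo`, `SnapshotInfo` and `TableInfo` are hypothetical stand-ins for the Iceberg types, and the rollback branch assumes each snapshot records the id of the schema it was written with.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SnapshotSchemaSketch
{
    // Hypothetical stand-ins; these are NOT the Iceberg classes
    record SchemaInfo(int schemaId, List<String> columns) {}

    record SnapshotInfo(long snapshotId, long timestampMillis, int schemaId) {}

    record TableInfo(
            SchemaInfo schema,                      // the table's current schema, like Table#schema()
            Map<Integer, SchemaInfo> schemas,       // all known schemas by id
            List<SnapshotInfo> snapshots,           // full snapshot history
            Optional<SnapshotInfo> currentSnapshot) {}

    private SnapshotSchemaSketch() {}

    /**
     * Returns the table's current schema unless the current snapshot is not the most
     * recently committed one (that is, the table was rolled back). In that case it
     * returns the schema the current snapshot was written with (assumption).
     */
    static SchemaInfo getCurrentSnapshotSchema(TableInfo table)
    {
        Optional<Long> latestId = table.snapshots().stream()
                .max(Comparator.comparingLong(SnapshotInfo::timestampMillis))
                .map(SnapshotInfo::snapshotId);
        Optional<SnapshotInfo> current = table.currentSnapshot();
        if (current.isPresent() && !latestId.equals(current.map(SnapshotInfo::snapshotId))) {
            // Rolled back: the current snapshot is older than the latest one
            return table.schemas().get(current.get().schemaId());
        }
        // No rollback (or no snapshot yet): a plain ADD COLUMN is visible immediately
        return table.schema();
    }
}
```

In this model, a table whose current snapshot was written with `col0` only, while a later snapshot exists with `col0, col1`, yields the one-column schema, matching the output quoted above. When only `ADD COLUMN` has run and no newer snapshot exists, the current snapshot is still the latest one, so the table's current schema is returned.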
cb4e146 to b14b636
catch (NoSuchElementException e) {
    return Optional.empty();
}
return StreamSupport.stream(snapshots.spliterator(), false)
Use Guava's `Streams.stream`:
-return StreamSupport.stream(snapshots.spliterator(), false)
+return stream(snapshots)
@@ -615,6 +617,24 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
 .iterator();
 }

+private static Schema getCurrentSnapshotSchema(Table icebergTable)
Let's add Javadoc explaining how `getCurrentSnapshotSchema(Table)` differs from `Table#schema()` and when it should be used.
.skippingTypesCheck()
.matches("VALUES 'col0'");
assertQuery("SELECT * FROM test_rollback_with_schema_evolution", "VALUES 1");
Add:
// A DML should not change the table schema (which could be possible if the rolled-back schema gets revived when committing the DML)
assertUpdate("INSERT INTO test_rollback_with_schema_evolution VALUES (13)");
assertThat(query("DESCRIBE test_rollback_with_schema_evolution"))
.projected(0)
.skippingTypesCheck()
.matches("VALUES 'col0'");
assertQuery("SELECT * FROM test_rollback_with_schema_evolution", "VALUES 1, 13");
This should pass, but the INSERT actually fails.
It seems to me now that we cannot fix #13699 without the Iceberg side being fixed (apache/iceberg#5591).
Let me close this PR because it requires changes on the Iceberg side (#13704 (comment)). I will create a new PR once it's supported.
Description
Fixes #13699
Documentation
(x) No documentation is needed.
Release notes
(x) Release notes entries required with the following suggested text: