Support Iceberg time travel #10258
{
    switch (pointerType) {
        case TEMPORAL:
            return versioning instanceof TimestampWithTimeZoneType || versioning instanceof TimestampType;
It's quite difficult to test the DATE type, and I'm still thinking about how to simulate that. Meanwhile, I am supporting only the 2 timestamp types.
It's also unclear to me whether we should take the beginning or the end of a day for time travel, so I think it's better not to support it until there is an explicit user request.
I agree, using dates would be problematic as Iceberg stores the snapshot timestamp in millis.
It's also unclear to me if we should take the beginning of a day or end of a day
TIMESTAMP 2021-01-02 03:04:05 means TIMESTAMP 2021-01-02 03:04:05.000000000
TIMESTAMP 2021-01-02 03:04 means TIMESTAMP 2021-01-02 03:04:00.000000000
TIMESTAMP 2021-01-02 means TIMESTAMP 2021-01-02 00:00:00.000000000
DATE 2021-01-02 means TIMESTAMP 2021-01-02
i.e. beginning of the day
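The "beginning of the day" reading above can be sketched with plain java.time. This is only an illustration: the class and method names are hypothetical, and the zone is passed in explicitly because the thread does not settle which zone a DATE should be interpreted in.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Trino or Iceberg
final class DateAsOfSketch
{
    private DateAsOfSketch() {}

    // Maps a DATE to the first instant of that day in the given zone,
    // expressed in epoch milliseconds (the unit Iceberg uses for snapshot timestamps)
    static long startOfDayEpochMillis(LocalDate date, ZoneId zone)
    {
        return date.atStartOfDay(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args)
    {
        // Assumption for the demo: interpret the date in UTC
        System.out.println(startOfDayEpochMillis(LocalDate.parse("2021-01-02"), ZoneOffset.UTC));
    }
}
```

With UTC as the zone, DATE 2021-01-02 maps to epoch millisecond 1609545600000, i.e. 2021-01-02 00:00:00.000 UTC.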
Skimmed
2124cdb to 891221f
891221f to d47082f
d47082f to 82af1cc
82af1cc to aaf1cd9
Overall, looks good to me.
One more general testing question: what happens if you have the FOR TIMESTAMP AS OF syntax in a DML statement like an insert? Does that parse?
        .build())
        .setCatalog("iceberg")
        .build();
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build();
Any reason not to reuse the code in IcebergQueryRunner#createIcebergQueryRunner?
Because I need the catalog object to perform operations like loadTable and to inspect information in the table.
Please use a helper method like the following:
    private Optional<Long> getLatestSnapshotId(String tableName)
    {
        MaterializedResult result = computeActual(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName));
        return result.getRowCount() > 0 ? Optional.of((Long) result.getOnlyValue()) : Optional.empty();
    }
and replace the createQueryRunner() method with:
    @Override
    protected DistributedQueryRunner createQueryRunner()
            throws Exception
    {
        return createIcebergQueryRunner();
    }
This way you avoid the low-level details of building the IcebergQueryRunner from scratch and of working with the Iceberg-specific API, and the test becomes easier to maintain.
}

if (snapshotId == null) {
    throw new TrinoException(GENERIC_USER_ERROR, "Cannot find a snapshot older than epoch time " + timestampMillis);
More of a semantic question for the AS OF syntax, but should this fail or should it return an empty set of data?
I think it should fail. Returning an empty set of data should mean that the table snapshot at that point in time actually contains no data.
And this will also match the Spark SQL behavior in Iceberg.
I agree with @jackye1995 that the query should fail instead of returning an empty set of data.
@alexjo2144 thanks for the review!
If you are talking about
public static long getSnapshotIdAsOfTime(Table table, long timestampMillis)
{
    Long snapshotId = null;
Please do check whether the timestamp is in the future.
I think we shouldn't allow future timestamps here.
Why not? I think there could be a valid use case where a process provides a fixed timestamp for its queries to get consistent results, and that timestamp could initially be in the future.
@findinpath I think this is a really important question.
The time travel syntax is supposed to provide a consistent, time-related view of a table.
Allowing timestamps in the future doesn't provide this, since future state may naturally still change.
I think this should be handled at the engine level, though.
}

if (snapshotId == null) {
    throw new TrinoException(GENERIC_USER_ERROR, "Cannot find a snapshot older than epoch time " + timestampMillis);
From a user perspective, if the oldest history timestamp is shown, I think it should be formatted to be easily readable.
For reference (in case you want to show the timestamp to the user):
- https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HistoryTable.java#L85
- https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/type/Timestamps.java#L147 (this corresponds to what is shown in the Trino CLI when running select current_timestamp;)
Just as an alternative, Snowflake uses the following message in such situations:
Time travel data is not available for table %s. The requested time is either beyond the allowed time travel period or before the object creation time.
Thanks, I have made the changes (similar to Snowflake's message).
Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. In order for us to review and merge your code, please submit the signed CLA to cla@trino.io. For more information, see https://github.com/trinodb/cla.
I have rebased the changes.
assertQuerySucceeds("INSERT INTO test_schema.test_iceberg_read_versioned_table VALUES ('a', 1)");
v1Unixtime = Instant.now().getEpochSecond() + 1; // go to the next second in case the operation finishes in the same second
v1SnapshotId = getLatestSnapshotId("test_schema", "test_iceberg_read_versioned_table").get();
Thread.sleep(2000);
Please refactor the test to sleep for at most a few milliseconds.
            "Cannot specify end version both in table name and FOR clause");
}

private String shortTzTimestamp(long unixtime)
"unixtime" can be misunderstood.
call it "epochSeconds"
(same below)
@BeforeClass
public void setUp() throws InterruptedException
{
    assertQuerySucceeds("CREATE SCHEMA test_schema");
You don't need a new schema. Use the default schema provided by the query runner.
(i.e. don't qualify table references with schema at all)
    }
    return getSnapshotIdAsOfTime(table, epochMillis);
case TARGET_ID:
    verify(versionType == BIGINT, "Iceberg target ID time travel only supports type Bigint, but got " + versionType);
Avoid string concatenation when condition holds
verify(versionType == BIGINT, "Iceberg target ID time travel only supports type Bigint, but got %s", versionType);
}
else {
    if (((TimestampType) versionType).isShort()) {
        epochMillis = ((long) version.getVersion()) / MICROSECONDS_PER_MILLISECOND;
I am not sure that the interpretation in UTC is the right one.
This is user-provided, so interpreting in session zone is probably appropriate.
cc @martint
bump.
for now you can remove timestamp support from here, to unblock the PR
Sure, I have removed the timestamp support.
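To make the UTC-versus-session-zone question in this thread concrete, here is a minimal sketch of how the same zone-less timestamp resolves to different epoch milliseconds depending on the zone used to interpret it. The class and method names are hypothetical, and America/Los_Angeles merely stands in for some non-UTC session zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Trino or Iceberg
final class TimestampZoneSketch
{
    private TimestampZoneSketch() {}

    // Resolves a timestamp without time zone to epoch milliseconds in the given zone
    static long toEpochMillis(LocalDateTime timestamp, ZoneId zone)
    {
        return timestamp.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args)
    {
        LocalDateTime timestamp = LocalDateTime.parse("2021-01-02T03:04:05");
        long asUtc = toEpochMillis(timestamp, ZoneOffset.UTC);
        // Assumed session zone, chosen only for illustration
        long asSessionZone = toEpochMillis(timestamp, ZoneId.of("America/Los_Angeles"));
        // The two readings differ by the zone offset in effect at that local time
        System.out.println((asSessionZone - asUtc) / 3_600_000 + " hours apart");
    }
}
```

Because snapshot lookup compares against epoch milliseconds, a shift of several hours like this can select a different snapshot, which is presumably why the choice needs to be made deliberately.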
public static long getSnapshotIdAsOfTime(Table table, long timestampMillis)
{
    Long snapshotId = null;
    for (HistoryEntry logEntry : table.history()) {
What is the order of table.history() elements? Is it guaranteed?
(The API doesn't seem to document this, and this is being relied on.)
Yes, it is guaranteed
I can read the implementation myself, and the usages.
Still, to me "guaranteed" means something more than that.
It's not "guaranteed" unless documentation says it is guaranteed.
So for now we should do filter + max.
Or, make a PR to Iceberg to update the table.history() documentation.
Was going through this and just wanted to mention that I don't think the ordering of table.history() is guaranteed, due to staged commits. See apache/iceberg#3891 (comment) for more details.
It's part of the reason why we don't leverage binary search for time travel lookups of snapshots.
Thanks @amogh-jahagirdar for the reference
@rajarshisarkar can you add that link as an explanatory code comment?
Ah, I see. Sure, let me use the filter + max logic here and add the link as an explanatory code comment.
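A minimal, self-contained sketch of why "filter + max" is safer than "last match wins" when the history order is not guaranteed. The Entry class is a hypothetical stand-in for Iceberg's HistoryEntry, and the sample data is invented purely for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not the Trino implementation
final class SnapshotLookupSketch
{
    private SnapshotLookupSketch() {}

    // Stand-in for Iceberg's HistoryEntry (assumption: only these two fields matter here)
    static final class Entry
    {
        final long timestampMillis;
        final long snapshotId;

        Entry(long timestampMillis, long snapshotId)
        {
            this.timestampMillis = timestampMillis;
            this.snapshotId = snapshotId;
        }
    }

    // Order-independent: keep entries at or before the requested time, then take the latest one
    static Optional<Long> snapshotIdAsOf(List<Entry> history, long timestampMillis)
    {
        return history.stream()
                .filter(entry -> entry.timestampMillis <= timestampMillis)
                .max(Comparator.comparingLong((Entry entry) -> entry.timestampMillis))
                .map(entry -> entry.snapshotId);
    }

    // Order-dependent: the loop shape from the original patch, correct only for sorted history
    static Optional<Long> lastMatch(List<Entry> history, long timestampMillis)
    {
        Long snapshotId = null;
        for (Entry entry : history) {
            if (entry.timestampMillis <= timestampMillis) {
                snapshotId = entry.snapshotId;
            }
        }
        return Optional.ofNullable(snapshotId);
    }

    public static void main(String[] args)
    {
        // Deliberately out-of-order history
        List<Entry> history = Arrays.asList(new Entry(100, 1), new Entry(200, 2), new Entry(150, 9));
        System.out.println(snapshotIdAsOf(history, 250)); // picks the entry with timestamp 200
        System.out.println(lastMatch(history, 250)); // picks whichever matching entry comes last
    }
}
```

On sorted input both methods agree; they diverge only when an older entry appears after a newer one, which is the situation the staged-commits reference describes.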
}

if (snapshotId == null) {
    throw new TrinoException(GENERIC_USER_ERROR, String.format("Time travel data is not available for table " +
nit: import format statically
    return "CAST(" + shortTzTimestamp(unixtime) + " AT TIME ZONE 'UTC' AS TIMESTAMP(" + precision + ") WITHOUT TIME ZONE)";
}

private Optional<Long> getLatestSnapshotId(String schemaName, String tableName)
Use the pattern from io.trino.plugin.iceberg.BaseIcebergConnectorTest#getLatestSnapshotId or extract a utility method.
b5e412b to 935996e
if (table.snapshot((long) version.getVersion()) == null) {
    throw new TrinoException(GENERIC_USER_ERROR, "Iceberg target ID does not exists: " + version.getVersion());
}

return (long) version.getVersion();
- if (table.snapshot((long) version.getVersion()) == null) {
-     throw new TrinoException(GENERIC_USER_ERROR, "Iceberg target ID does not exists: " + version.getVersion());
- }
- return (long) version.getVersion();
+ long snapshotId = (long) version.getVersion();
+ if (table.snapshot(snapshotId) == null) {
+     throw new TrinoException(GENERIC_USER_ERROR, "Iceberg snapshot ID does not exists: " + snapshotId);
+ }
+ return snapshotId;
Long snapshotId = null;
for (HistoryEntry logEntry : table.history()) {
    if (logEntry.timestampMillis() <= timestampMillis) {
        snapshotId = logEntry.snapshotId();
    }
}

if (snapshotId == null) {
    throw new TrinoException(GENERIC_USER_ERROR, format("Time travel data is not available for table " +
            "%s. The requested time is beyond the allowed time travel period.", table.name()));
}
return snapshotId;
- Long snapshotId = null;
- for (HistoryEntry logEntry : table.history()) {
-     if (logEntry.timestampMillis() <= timestampMillis) {
-         snapshotId = logEntry.snapshotId();
-     }
- }
- if (snapshotId == null) {
-     throw new TrinoException(GENERIC_USER_ERROR, format("Time travel data is not available for table " +
-             "%s. The requested time is beyond the allowed time travel period.", table.name()));
- }
- return snapshotId;
+ return table.history().stream()
+         .filter(logEntry -> logEntry.timestampMillis() <= timestampMillis)
+         .max(comparing(HistoryEntry::timestampMillis))
+         .orElseThrow(() -> new TrinoException(GENERIC_USER_ERROR, format(
+                 "Time travel data is not available for table %s. The requested time is beyond the allowed time travel period.",
+                 table.name())))
+         .snapshotId();
public class TestIcebergReadVersionedTable
        extends AbstractTestQueryFramework
{
    private long v1Unixtime;
unixtime -> epochMillis
assertQuerySucceeds("INSERT INTO test_schema.test_iceberg_read_versioned_table VALUES ('a', 1)");
v1Unixtime = Instant.now().getEpochSecond() + 1; // go to the next second in case the operation finishes in the same second
v1SnapshotId = getLatestSnapshotId("test_schema", "test_iceberg_read_versioned_table").get();
Thread.sleep(2000);
Please refactor the test to sleep for at most a few milliseconds.
bump
v1Unixtime = Instant.now().getEpochSecond() + 1; // go to the next second in case the operation finishes in the same second
v1SnapshotId = getLatestSnapshotId("test_iceberg_read_versioned_table");
v1SnapshotId is the ID of the v1 snapshot.
v1Unixtime should be the timestamp of the v1 snapshot, not "some timestamp after that".
Read the actual snapshot's timestamp (committed_at) instead of assuming (otherwise the code requires explanation).
Makes sense to me; I have made the change. Thanks for the suggestion.
@AfterClass(alwaysRun = true)
public void tearDown()
{
    assertQuerySucceeds("DROP TABLE IF EXISTS test_iceberg_read_versioned_table");
}
redundant, query runner cleanup will do that
@Test
public void testSelectTableWithEndLongTimestamp()
{
    assertQueryFails("SELECT * FROM test_iceberg_read_versioned_table FOR TIMESTAMP AS OF " + timestamp(1, 8),
inline timestamp(1, 8)
just use some fixed timestamp that's before today, as a SQL literal value
Sure, I have used TIMESTAMP '1970-01-01 00:00:00.001000000 Z' now.
private String shortTzTimestamp(long epochSeconds)
{
    return "from_unixtime(" + epochSeconds + ")";
from_unixtime semantics are never obvious to me (e.g. is it session zone dependent?).
Make the code more explicit:
    private String timestampLiteral(long epochSeconds, int precision)
    {
        return DateTimeFormatter.ofPattern("'TIMESTAMP '''uuuu-MM-dd HH:mm:ss." + "S".repeat(precision) + " VV''")
                .format(Instant.ofEpochSecond(epochSeconds).atZone(UTC));
    }
This also frees you from needing to wrap the values in CAST and to differentiate between "short" and "long".
This looks a lot cleaner; I have made the change. Thanks for the suggestion.
}

@BeforeClass
public void setUp() throws InterruptedException
"throws" should be on next line
@rajarshisarkar did you maybe have time to apply the comments?
@findepi I will update the PR this week.
935996e to 2008a31
@findepi I have addressed the review comments + rebased the changes.
@rajarshisarkar I've applied some changes myself.
@rajarshisarkar @jackye1995 thank you!
Support AS OF TIMESTAMP and AS OF VERSION queries for Iceberg.
@electrum @findepi @losipiuk @alexjo2144