Spark: Use snapshot schema when reading snapshot #3722
Conversation
Force-pushed from bec6bd5 to 31399e6.
@rdblue @jackye1995 please take a look. This incorporates #3269 (updated). It would be nice if this could make it into 0.13, as using the snapshot schema is already implemented in the Spark 2.4 support.
Yes, agreed. I think we need to include this in 0.13 for a consistent experience between 3.x and 2.4. Please let me know if anyone is against it.
Skimmed through this; it mostly looks good to me, as most of the content was already reviewed in the original PR. I will take another, deeper look in the afternoon. FYI, I am also adding time travel support in Trino (trinodb/trino#10258) and will open another PR there to match this behavior.
@aokolnychyi can you take a look too, in case it conflicts with any changes you're making?
```java
if (requestedSchema != null) {
  // convert the requested schema to throw an exception if any requested fields are unknown
  SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
}
```
I pointed this out in #1508 and I'll point it out again here:

I removed `requestedSchema` from `SparkTable` because with #1783, the Spark 3 `IcebergSource` changed to be a `SupportsCatalogOptions`, not just a `TableProvider`. Since `DataFrameReader` does not support specifying a schema when reading from an `IcebergSource`:

```scala
DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
  ...
  val (table, catalog, ident) = provider match {
    case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
      throw new IllegalArgumentException(
        s"$source does not support user specified schema. Please don't specify the schema.")
```

(see https://github.com/apache/spark/blob/v3.2.0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L220-L223)

there is no reason to have a `requestedSchema` field, as we cannot make use of it.
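For illustration, a minimal sketch of the restriction described above; the table identifier and schema are placeholders, and the quoted error message comes from the Spark code linked above:

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class UserSchemaRejectedExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    StructType userSchema = new StructType()
        .add("id", DataTypes.LongType)
        .add("data", DataTypes.StringType);

    // Because IcebergSource is a SupportsCatalogOptions provider, DataFrameReader rejects a
    // user-specified schema with an IllegalArgumentException along the lines of
    // "... does not support user specified schema. Please don't specify the schema."
    spark.read()
        .schema(userSchema)   // the user-specified schema that Spark disallows for this source
        .format("iceberg")
        .load("db.table");    // placeholder table identifier
  }
}
```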
This has been implemented for Spark 2. For Spark 3, Ryan Blue proposed a syntax for adding the snapshot id or timestamp to the table identifier in apache#3269. Here we implement the Spark 3 support for using the snapshot schema by using the proposed table identifier syntax, until a new version of Spark 3 is released with support for `AS OF` in Spark SQL. Note: the table identifier syntax is for internal use only (as in this implementation) and is not meant to be exposed as a publicly supported syntax in SQL. However, for testing, we do test its use from SQL.
Add a `Schema` parameter to the `SparkScanBuilder` constructor, so that we can pass the snapshot schema in when constructing it. In `SparkTable#newScanBuilder`, construct `SparkScanBuilder` with the snapshot schema.
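For illustration, a rough sketch of the shape of this change; the field names and the other constructor parameters are assumptions for the sketch, not the actual Iceberg signatures:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Sketch only: the real SparkScanBuilder implements Spark's ScanBuilder mixins and holds more state.
class SparkScanBuilderSketch {
  private final SparkSession spark;
  private final Table table;
  private final Schema schema;  // schema to read with: the snapshot schema for time travel reads
  private final CaseInsensitiveStringMap options;

  SparkScanBuilderSketch(SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) {
    this.spark = spark;
    this.table = table;
    this.schema = schema;       // previously the builder always used the current table schema
    this.options = options;
  }
}
```

In `SparkTable#newScanBuilder`, the snapshot schema (when a snapshot is selected) is then passed to this constructor in place of the current table schema.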
Force-pushed from ba8c820 to 7ba1eab.
Overall looks good to me; I don't have any further comments around the time travel logic. I think we are missing a few failure test cases; could you add those?
```java
String value = options.get(property);
if (value != null) {
  return Long.parseLong(value);
}
```
nit: newline after `if`
Will add a blank line.
Added a blank line.

What is the rationale for always adding a blank line after an `if`? I fail to see how this makes the code more readable. I can understand breaking a large block of code up with blank lines in general, but this is a very short method.
Yes, agreed. I think it's mostly just general code style rules the community follows; maybe we should just put these into Checkstyle instead of being human linters.
```
@@ -120,4 +120,42 @@ public void testMetadataTables() {
        ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
        sql("SELECT * FROM %s.snapshots", tableName));
  }

  @Test
```
I think we are missing a few failure test cases:
- Cannot specify both snapshot-id and as-of-timestamp
- Cannot write from table at a specific snapshot
- Cannot delete from table at a specific snapshot
Ack. I agree that it'd be good to have such test cases. I'd point out though that none of the above should be supported even before this change, so if the test cases don't exist, they are existing holes.
Added test cases for reading with both snapshot-id and as-of-timestamp, writing to a table at a specific snapshot, and deleting from a table at a specific snapshot.
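For illustration, a rough sketch of what the first of these failure tests could look like when written directly against the DataFrame API; the option values, table name, and exception type are assumptions, and Iceberg's actual tests are written against its own test base classes:

```java
import org.apache.spark.sql.SparkSession;
import org.junit.Assert;
import org.junit.Test;

public class TestTimeTravelFailureCasesSketch {
  private final SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

  @Test
  public void testSnapshotIdAndTimestampAreMutuallyExclusive() {
    try {
      spark.read()
          .format("iceberg")
          .option("snapshot-id", 1234567890L)         // placeholder snapshot id
          .option("as-of-timestamp", 1639000000000L)  // placeholder timestamp in millis
          .load("db.table")                           // placeholder table identifier
          .collectAsList();
      Assert.fail("Expected a failure when both snapshot-id and as-of-timestamp are set");
    } catch (IllegalArgumentException e) {
      // expected: the two time travel options are mutually exclusive
    }
  }
}
```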
These look good to me. Thanks for adding them!
```java
// or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
// SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
// We therefore add a "snapshot-id" option here in this latter case.
CaseInsensitiveStringMap scanOptions =
```
I'm not sure this is worth the complexity. Why not just always add the snapshot ID if `snapshotId` is set? We know that if it is set, the option `snapshot-id` or `as-of-timestamp` should correspond to it. We should just make sure that the given snapshot ID is set in the options and remove `as-of-timestamp` if it is set. That makes this whole block simpler:

```java
CaseInsensitiveStringMap scanOptions = snapshotId != null ? addSnapshotId(options, snapshotId) : options;
```
With the update to `addSnapshotId` below, this worked fine with tests:

```java
CaseInsensitiveStringMap scanOptions = addSnapshotId(options, snapshotId);
```

It didn't need the null check because that's done inside `addSnapshotId`.
Let me try it out.

I had run into a problem with the original `addSnapshotId` function and always calling it. After I analysed what was happening, I wrote that comment to remind myself. I therefore called `addSnapshotId` only when strictly necessary.
OK, I see that you changed

```java
Preconditions.checkArgument(snapshotIdFromOptions == null,
    "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
```

to

```java
Preconditions.checkArgument(snapshotIdFromOptions == null || snapshotId.toString().equals(snapshotIdFromOptions),
    "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
```

in `addSnapshotId`. With the old version, you should only call `addSnapshotId` if the `options` did not already have `snapshot-id` or `as-of-timestamp`.
```java
Map<String, String> scanOptions = Maps.newHashMap();
scanOptions.putAll(options.asCaseSensitiveMap());
scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));
```
This should also remove `as-of-timestamp` since `snapshot-id` is being set. I think I missed that in my PR.
I updated this to the following and tests work fine:
```java
private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
  if (snapshotId != null) {
    String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
    Preconditions.checkArgument(snapshotIdFromOptions == null || snapshotId.toString().equals(snapshotIdFromOptions),
        "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
    Map<String, String> scanOptions = Maps.newHashMap();
    scanOptions.putAll(options.asCaseSensitiveMap());
    scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));
    scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);
    return new CaseInsensitiveStringMap(scanOptions);
  }
  return options;
}
```
Thanks, that makes sense.
```java
Assert.assertEquals("Records should match", originalRecords,
    resultDf.orderBy("id").collectAsList());

Snapshot snapshot1 = table.currentSnapshot();
```
A better name would be `beforeAddColumn`. In general, I think adding numbers to a generic name is not a good practice for readable tests.
"spark_catalog".equals(catalogName)); | ||
|
||
// get a timestamp just after the last write and get the current row set as expected | ||
long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2; |
Can you use `waitUntilAfter` defined in `SparkTestBase` to avoid flaky tests?
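For illustration, a sketch of the suggested pattern, assuming `waitUntilAfter(long)` blocks until the wall clock has passed the given millisecond timestamp (the `validationCatalog` and `tableIdent` helpers are the ones used in the snippet above):

```java
// Wait until the clock has actually moved past the last write instead of adding a fixed offset.
long lastSnapshotTime = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();
waitUntilAfter(lastSnapshotTime);
long timestamp = System.currentTimeMillis();  // now guaranteed to be after the last write
```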
I added a `waitUntilAfter`.
Overall, this is ready to go in. My only real concern is over timestamps in testing without using `waitUntilAfter`. I think we can also simplify handling in `newScanBuilder`, but that's minor and I think that the current logic is correct.
Thanks, @wypoon! This looks great. I think we can get it in with a couple of minor changes.
Minor tweaks to tests.
@rdblue thanks for all the reviews. I adopted your suggestion around …
@rdblue @jackye1995 if this can be merged, I'll prepare PRs for Spark 3.1 and 3.0 for porting it. I'll be on vacation for the next two weeks.
Thanks, @wypoon! Nice work.
This has been implemented for Spark 2 in #1508. For Spark 3, Ryan Blue proposed a syntax for adding the snapshot id or timestamp to the table identifier in #3269. Here we implement the Spark 3 support for using the snapshot schema by using the proposed table identifier syntax. This is until a new version of Spark 3 is released with support for `AS OF` in Spark SQL.

Note: The table identifier syntax is for internal use only (as in this implementation) and not meant to be exposed as a publicly supported syntax in SQL. However, for testing, we do test its use from SQL.
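As a usage illustration, here is a sketch of a time travel read through the read options exercised in this PR; the snapshot id and table identifier are placeholders. With this change, the returned DataFrame uses the table schema as of that snapshot rather than the current schema:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TimeTravelReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    // Read the table as of a specific snapshot; "as-of-timestamp" (millis) works analogously.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("snapshot-id", 1234567890L)  // placeholder snapshot id
        .load("db.table");                   // placeholder table identifier

    // The schema reflects the table schema at that snapshot, not the current schema.
    df.printSchema();
  }
}
```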