Spark 3.3 write to branch snapshot #6651
Conversation
Nice work! I commented some suggestions to simplify the code a bit. Could we add some tests for this? I think TestSparkDataWrite is the right place.
@@ -304,4 +304,9 @@ public boolean caseSensitive() {
        .defaultValue(SQLConf.CASE_SENSITIVE().defaultValueString())
        .parse();
  }

  public String branch() {
    return confParser.stringConf().option(SparkWriteOptions.BRANCH).parseOptional();
Can this return main if no branch is passed in? I think that will simplify some of the logic below a bit.
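A minimal sketch of that suggestion, assuming the builder methods already used elsewhere in this conf class (defaultValue/parse) and SnapshotRef.MAIN_BRANCH; this is not the PR's final code:

```java
// Sketch only: return the main branch when no write option is supplied,
// so callers never have to handle a null branch.
public String branch() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.BRANCH)
      .defaultValue(SnapshotRef.MAIN_BRANCH)
      .parse();
}
```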
Sure, this is still WIP. I wanted to know if I am missing any major Spark write operations.
Sure. I just took a look at SparkWrite; I think we're missing the streaming append/overwrite cases?
@amogh-jahagirdar Added tests
@@ -247,9 +247,6 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    Preconditions.checkArgument(
        snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
Why is this no longer valid? I think that we do not want to write to a specific snapshot. Is the branch somehow passed as the snapshot ID?
After looking into this a bit more, I think this is incorrect. The snapshotId is set when the table is loaded using time travel syntax. I don't think that we want to allow that.
@rdblue Can we add more checks so that if the snapshot ID is the tip of the branch, then writing to the branch is supported? If it's the tip of the branch, the Spark write should be supported.
I believe when we do spark...save(table); we are calling catalog.loadTable(ident) in DataFrameWriter. When passing spark..option("branch","..") the snapshotId() is getting set.
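For context, a hedged sketch of the write path being described (df and targetTable are hypothetical names; the option constant is the PR's SparkWriteOptions.BRANCH):

```java
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hypothetical usage: df is the Dataset<Row> being written, targetTable is the
// Iceberg table identifier. DataFrameWriter#save() calls Catalog#loadTable for
// the target before planning the write, and because the "branch" key is visible
// there, the loaded table ends up pinned to the branch snapshot (snapshotId != null).
df.write()
    .format("iceberg")
    .option(SparkWriteOptions.BRANCH, "test-branch")
    .mode(SaveMode.Append)
    .save(targetTable);
```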
Looks like this isn't an issue. I reverted this change and ran TestSparkDataWrite and everything passes. Let's revert this and run CI. If there are other issues outside of that test class, I'll take a look.
@rdblue @amogh-jahagirdar if the bug fix for read by snapshot ref (#6717) gets merged, then write to a branch snapshot will fail, as shown by the test in TestDeleteFrom.java. That's because of the above condition. I feel we have to tweak the condition if it is going to stay.
Actually it seems the issue is that catalog.loadTable(table) interprets the branch option as the branch read option (because both are called "branch", and we have to load the table before doing the write, it can't differentiate whether the option is for a write or not). Couldn't we just have a different config name when doing writes?
@namrathamyske Yeah, I just updated to use the name write-branch and tests are passing. The issue is that the name 'branch' is used for both read and write options, and when loadTable is performed for the write, it is treated as time travel. We should disambiguate the two, so I think we should call it something else for the write case. write-branch kinda sounds odd to me tbh; maybe we go with toBranch. toBranch would be consistent with what's in the API and what's being done in the Flink PR. But we don't necessarily need to have parity there; whatever is the Spark convention for naming and makes sense for users. @aokolnychyi @rdblue any suggestions there?
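To make the naming question concrete, a small sketch of what a disambiguated write option could look like (the constant name and value are assumptions, since the final name is exactly what is being discussed):

```java
// Sketch only (constant name hypothetical): a write-side key that does not
// collide with the "branch" read option, so Catalog#loadTable does not treat
// the write as time travel. "write-branch" and "toBranch" are the candidates above.
public static final String TO_BRANCH = "toBranch";

// ...and the corresponding accessor in SparkWriteConf would read that key:
public String branch() {
  return confParser.stringConf().option(TO_BRANCH).parseOptional();
}
```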
But I think we can't disregard calling loadTable with respect to the ref that was passed. Later, when we implement session configs for INSERT and DELETE operations, there is a lot of overlap between read and write. Spark logical plans call SparkScanBuilder:
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java, line 260 at 32a8ef5: if (branch != null) {
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java, line 424 at 32a8ef5: Snapshot snapshot = table.currentSnapshot();
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java, line 393 at 32a8ef5: Snapshot snapshot = table.currentSnapshot();
Good point @namrathamyske, I was a bit short-sighted; we actually do want to leverage the statistics for the specific snapshot for writes. These statistics would be used during the scan itself (for example, MERGE INTO a branch). So either we 1) find a good way to differentiate between a time travel query, where the write should not be applied, and an intentional write on a branch, or 2) we just relax the check that a snapshot is set, as you did earlier.
@rdblue @amogh-jahagirdar @jackye1995 this is still an open item for getting this PR merged. I would prefer to go with the second option, but let me know otherwise!
Looks really close. Thanks @namrathamyske!
      boolean branchOptionPresent = info.options().containsKey(SparkWriteOptions.BRANCH);
      if (!branchOptionPresent) {
        Preconditions.checkArgument(
            snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
      }
I'll need to double-check the code in case there are any other implications to this, but in case we do go this route, could we combine the boolean and the precondition check?

boolean branchWrite = info.options().containsKey(SparkWriteOptions.BRANCH);
Preconditions.checkArgument(
    branchWrite || snapshotId == null,
    "Cannot write to table at a specific snapshot: %s",
    snapshotId);
@namrathamyske I raised a PR to your branch to address the remaining comments; let me know your thoughts. I also did an inspection of the code and don't think we need to remove the check here: #6651 (comment). We can wait for CI to verify all cases and then see a path forward if anything else fails, like @rdblue mentioned.
I'd be interested in reviewing this one on Monday.
-      return confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
+      return confParser
+          .stringConf()
+          .sessionConf(SparkReadOptions.BRANCH)
I don't think this should be the Spark read option. Session configs are defined in SparkSQLProperties and use a form like spark.sql.iceberg._____. In this case, how about spark.sql.iceberg.read-branch?
I think we also need to be careful about the behavior of this, and possibly use a different SparkReadConf option for it. When the SQL read branch is set, we want to use that if it is present and otherwise fall back to the main branch. That may make this complicated enough that we should leave the SQL session read branch out of this commit and add it in a follow-up.
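A rough sketch of the fallback behavior described above, assuming the suggested spark.sql.iceberg.read-branch property name and the conf-parser builder methods used elsewhere in this class; not the PR's final code:

```java
// Sketch only: prefer the per-read option, then the SQL session conf,
// and otherwise fall back to the main branch.
public String branch() {
  return confParser
      .stringConf()
      .option(SparkReadOptions.BRANCH)
      .sessionConf("spark.sql.iceberg.read-branch")
      .defaultValue(SnapshotRef.MAIN_BRANCH)
      .parse();
}
```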
@rdblue @amogh-jahagirdar I am going to remove all session-related changes from this PR so that I can raise a follow-up one. We can support only write by Spark option in this PR.
@namrathamyske, what about adding back the write side? I think the problem is just with the read side and we can't test the write side of this PR without the SQL option.
@rdblue if we just have the write side and not the read side, how can we use the session conf? I think Spark SQL operations use scans to read, as I explained in #6651 (comment).
-        Preconditions.checkArgument(
-            branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch);
-        return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled);
+        // It's possible that the branch does not exist when performing writes to new branches.
+        // Load table should still succeed when Spark is performing the write.
+        // Reads performed on non-existing branches will fail at a later point.
+        Long branchSnapshotId = branchSnapshot == null ? null : branchSnapshot.snapshotId();
+        return new SparkTable(table, branchSnapshotId, !cacheEnabled);
@namrathamyske @rdblue @aokolnychyi @jackye1995 I'm removing this check because it prevents writing to new branches. Catalog#loadTable gets called in Spark when planning the write, and we fail the validation check that the branch snapshot exists. I added a test to validate that if a read on an invalid branch is performed we still fail (albeit later, when trying to build the scan).
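A hedged sketch of that kind of test (not the PR's actual test; the table location, branch name, and exception type are assumptions):

```java
import org.apache.iceberg.spark.SparkReadOptions;
import org.assertj.core.api.Assertions;
import org.junit.Test;

// Hypothetical: a read against a branch that does not exist should still fail,
// just later (when the scan is built) rather than when the table is loaded.
@Test
public void testReadFromNonExistingBranchFails() {
  Assertions.assertThatThrownBy(
          () ->
              spark
                  .read()
                  .format("iceberg")
                  .option(SparkReadOptions.BRANCH, "not-a-branch")
                  .load(tableLocation)
                  .collectAsList())
      // exception type assumed; the key point is where the failure surfaces
      .isInstanceOf(IllegalArgumentException.class);
}
```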
Where is this happening? It sounds like write options are being mixed with read options.
Catalog#loadTable is called when DataFrameWriter#save is called: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L294
Ah, I think I get what you're saying: it's only loading the table with the branch because of the identifier matching we introduced in #6717, and the read option name is the same as the write option name. We could introduce a different option name for branch writes so they won't collide, but I think a benefit of the current approach is that the operation will actually use the branch schema during analysis and during planning of the write.
@amogh-jahagirdar Agree with you. Also, as we discussed, we should not have read and write options with different names, because write options are also used for scans.
I think that we have a general problem using options for writing to a table that we're trying to work around, but in the end it's probably not a good idea to use read and write options for branching. There are a few issues we've already hit:
I was talking with @aokolnychyi about this yesterday and I think he's right: the best way to solve these problems is to load a branch like we would a table, so that all uses of the branch/table are consistent. The read path does this by using
@rdblue @aokolnychyi I'm on board with that. I do agree that placing the branch in the identifier itself gets us out of a lot of the awkward workarounds we have to do to support branch writes through DataFrame writes. I'll work on a change to remove the write branch and have everything in the identifier.
    return buildMergeOnReadScan(SnapshotRef.MAIN_BRANCH);
  }

  public Scan buildMergeOnReadScan(String writeBranch) {
Do we need to pass the write branch here? I see this.branch exists already.
@@ -82,6 +82,8 @@ public Long endSnapshotId() {
    return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
  }

  /** @deprecated will be removed in 1.3.0; specify read branch in the table identifier */
Can we at least support reading by Spark options? We already support time travel via Spark options, and branch/tag is part of the same.
There's probably a way to reconcile having both read options and querying via select * from table.branch_identifier. Right now it's handled by validating that, if a read option is set, the identifier branch is the same.
That being said, I don't think it's worth it in the long run for the user experience, because then for DataFrames there are multiple ways for a user to query a branch (in the identifier and via read options), so it gets a bit confusing for users. We also have the maintainability issues of handling cases where both read options and identifiers are set. Considering we now know that having the branch in the identifier is the way to go for the write case, it makes sense to me to have the same single approach for the read case.
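For reference, a hedged sketch of reading through the branch identifier rather than a read option (the database, table, and branch names are assumptions):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical: read a branch by putting it in the table identifier.
Dataset<Row> branchRows =
    spark.read().format("iceberg").load("db.table.branch_my_branch");

// Or the equivalent SQL form:
spark.sql("SELECT * FROM db.table.branch_my_branch").show();
```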
If there are no objections, I'll raise a separate PR, since the approach and user experience are different from how the PR started. It should also make it easier for reviewers to avoid getting distracted by discussions which may no longer be relevant. I'll be sure to mark @namrathamyske as co-author too!
What about using the exact same syntax we support for reads? I believe it would be
cc @rdblue
I am a bit confused in terms of what PRs to review. Do we plan to migrate to table identifiers in this PR or in a separate one?
@aokolnychyi I synced with Amogh offline; I will create a new PR for the new approach and will ping everyone here for review. It will use the same syntax as reads, through the table identifier.
I'm going to close this now that #6965 is open. @namrathamyske and @amogh-jahagirdar, you both did an amazing job on this and I want to say a huge thank you for your work here! Great to get to the point where we have a clear path forward, and I'm sure we're going to see a lot of the contributions here in the other PR.
Issue addressed: #3896
The purpose of this PR is to enable Spark writes on a branch snapshot.
@rdblue @aokolnychyi @jackye1995 @amogh-jahagirdar thoughts on this?