Spark 3.3 write to branch snapshot #6651

Closed
wants to merge 33 commits
Changes from 4 commits
Commits (33 total)
9e8bf34
Spark 3.3 write to branch
namrathamyske Jan 23, 2023
ee4cadb
Spark 3.3 write to branch refactoring by review comments
namrathamyske Jan 23, 2023
3225506
Spark 3.3 write to branch refactoring by review comments
namrathamyske Jan 23, 2023
e1dfa45
Spark 3.3 write to branch data write test
namrathamyske Jan 23, 2023
58b4bf2
spotless
namrathamyske Jan 24, 2023
8677134
checking if snapshot set is branch
namrathamyske Jan 24, 2023
af17f25
Merge branch 'master' of https://github.com/apache/iceberg into spark…
namrathamyske Jan 25, 2023
7642b9e
Spark: address comments for spark branch writes
amogh-jahagirdar Feb 1, 2023
da9dcc0
Merge commit 'refs/pull/25/head' of https://github.com/namrathamyske/…
namrathamyske Feb 4, 2023
ca8e1ff
Merge branch 'master' of https://github.com/apache/iceberg into spark…
namrathamyske Feb 7, 2023
2e4eefe
review comments
namrathamyske Feb 11, 2023
de20c76
review comments
namrathamyske Feb 11, 2023
85d7475
spotless
namrathamyske Feb 11, 2023
bbf57e3
review comments changes
namrathamyske Feb 12, 2023
0e081e1
review comments changes
namrathamyske Feb 12, 2023
51b1052
new line change reversal
namrathamyske Feb 12, 2023
aa42e2e
Spark: Add tests for overwrite case
amogh-jahagirdar Feb 12, 2023
03c962d
Merge pull request #26 from amogh-jahagirdar/spark-branch-writes-more…
namrathamyske Feb 17, 2023
bed5ec3
nit review comments
namrathamyske Feb 17, 2023
332064e
Merge branch 'master' of https://github.com/apache/iceberg into spark…
namrathamyske Feb 17, 2023
6ef5f4e
Merge branch 'spark_writes' of https://github.com/namrathamyske/icebe…
namrathamyske Feb 17, 2023
8ecfdcd
adding write conf back
namrathamyske Feb 17, 2023
6b8f954
Remove SQL Write Conf, fail if write conf is specified for row level …
amogh-jahagirdar Feb 22, 2023
f8b34bd
Merge branch 'master' into spark_writes
amogh-jahagirdar Feb 22, 2023
a8a5d89
Merge branch 'master' into spark_writes
amogh-jahagirdar Feb 22, 2023
7ee1689
Address cleanup
amogh-jahagirdar Feb 23, 2023
64db07e
Allow non-existing branches in catalog#loadTable
amogh-jahagirdar Feb 23, 2023
1b2cd5a
Merge branch 'master' into spark_writes
amogh-jahagirdar Feb 23, 2023
4c94693
Remove Spark branch write option, use identifier in branch, merge/del…
amogh-jahagirdar Feb 26, 2023
2f3d6e1
Add merge tests
amogh-jahagirdar Feb 27, 2023
9bbed3a
Style
amogh-jahagirdar Feb 27, 2023
51a29b3
Remove setting branch in scan
amogh-jahagirdar Feb 27, 2023
b2692fe
Fix for metadata tables
amogh-jahagirdar Feb 27, 2023
SparkWriteConf.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -304,4 +305,9 @@ public boolean caseSensitive() {
.defaultValue(SQLConf.CASE_SENSITIVE().defaultValueString())
.parse();
}

public String branch() {
return confParser
.stringConf()
.option(SparkWriteOptions.BRANCH)
.defaultValue(SnapshotRef.MAIN_BRANCH)
.parse();
}

}
SparkWriteOptions.java
@@ -77,4 +77,6 @@ private SparkWriteOptions() {}

// Isolation Level for DataFrame calls. Currently supported by overwritePartitions
public static final String ISOLATION_LEVEL = "isolation-level";
// Branch to write to
public static final String BRANCH = "branch";
}
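For orientation, a minimal sketch of how the write option added above would be used from the DataFrame API; `df` and `location` are assumed to exist, and the branch name is illustrative rather than taken from this PR:

```java
// Sketch only: append df to a branch via the new write option
// (SparkWriteOptions.BRANCH resolves to the string "branch").
df.write()
    .format("iceberg")
    .option(SparkWriteOptions.WRITE_FORMAT, "parquet")
    .option(SparkWriteOptions.BRANCH, "testBranch")
    .mode(SaveMode.Append)
    .save(location.toString());
```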
SparkTable.java
@@ -247,9 +247,6 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
Preconditions.checkArgument(
snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
Contributor: Why is this no longer valid? I think that we do not want to write to a specific snapshot. Is the branch somehow passed as the snapshot ID?

Contributor: After looking into this a bit more, I think this is incorrect. The snapshotId is set when the table is loaded using time-travel syntax, and I don't think we want to allow that.

namrathamyske (PR author), Jan 24, 2023: @rdblue Can we add a check that if the snapshot ID is the tip of the branch, then writing to the branch is supported? If it is the tip of the branch, the Spark write should be allowed.

I believe that when we do spark...save(table), we are calling catalog.loadTable(ident) in DataFrameWriter, and when spark..option("branch", "..") is passed, snapshotId() gets set.

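A minimal sketch of the check proposed in this comment (hypothetical, not code from this diff), assuming it would sit where the removed precondition was in SparkTable and that a `branch` variable holds the requested write branch:

```java
// Hypothetical: allow the write only when no snapshot is pinned, or when the
// pinned snapshot is the current tip of the branch being written to.
SnapshotRef ref = branch != null ? icebergTable.refs().get(branch) : null;
boolean writesBranchTip =
    snapshotId != null && ref != null && ref.isBranch() && snapshotId == ref.snapshotId();
Preconditions.checkArgument(
    snapshotId == null || writesBranchTip,
    "Cannot write to table at a specific snapshot: %s",
    snapshotId);
```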
Contributor: Looks like this isn't an issue. I reverted this change and ran TestSparkDataWrite and everything passes. Let's revert this and run CI; if there are other issues outside of that test class, I'll take a look.

namrathamyske (PR author), Feb 1, 2023: @rdblue @amogh-jahagirdar if the bug fix for reading by snapshot ref (#6717) gets merged, then writing to a branch snapshot will fail, as shown by TestDeleteFrom.java. That's because of the above condition. I feel we have to tweak the condition if it is going to stay.

amogh-jahagirdar (Contributor), Feb 7, 2023: Actually, it seems the issue is that catalog.loadTable(table) interprets the branch option as the branch read option (both are called "branch", and since we have to load the table before doing the write, it can't tell whether the option is meant for the read or the write). Couldn't we just use a different config name when doing writes?

amogh-jahagirdar (Contributor), Feb 7, 2023: @namrathamyske Yeah, I just updated it to use the name write-branch and the tests are passing. The issue is that the name 'branch' is used for both the read and write options, and when loadTable is performed as part of the write, it is treated as time travel; we should disambiguate the two. I think we should call it something else for the write case. write-branch sounds a bit odd to me, to be honest; maybe we go with toBranch, which would be consistent with the API and with what's being done in the Flink PR. We don't necessarily need parity there, though; whatever fits the Spark naming convention and makes sense for users. @aokolnychyi @rdblue any suggestions?

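To make the naming collision concrete, a sketch of the two call sites that currently share the option name "branch"; `spark`, `df`, and `location` are assumed, and the branch value is illustrative:

```java
// Read path: time-travels the loaded table to the branch head.
Dataset<Row> rows =
    spark.read().format("iceberg").option("branch", "b1").load(location.toString());

// Write path (at this point in the review): intends to commit to the branch, but the
// same "branch" key is also seen by catalog.loadTable while setting up the write.
df.write().format("iceberg").option("branch", "b1").mode(SaveMode.Append).save(location.toString());
```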
namrathamyske (PR author), Feb 7, 2023: But I think we can't disregard calling loadTable with respect to the ref that is passed. Later, when we implement session configs for testing INSERT and DELETE operations, there is a lot of overlap between read and write. Spark logical plans call SparkScanBuilder, which should use the read time-travel config; SparkCopyOnWrite and SparkMergeOnRead have their respective scan builders, which inherit from SparkScanBuilder. I will include the changes in this PR. It's still WIP.

Contributor: Good point @namrathamyske, I was a bit short-sighted; we actually do want to leverage the statistics for the specific snapshot for writes, since those statistics would be used during the scan itself (for example, MERGE INTO a branch). So either we 1) find a good way to differentiate between a time-travel query, where the write should not be applied, and an intentional write to a branch, or 2) we just relax the check that a snapshot is set, as you did earlier.

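For reference, a rough sketch of what reading a branch's state looks like at the Iceberg table API level, so file pruning and statistics reflect the branch head rather than main; the class and method names below are illustrative and not code from this PR:

```java
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class BranchScanSketch {
  // Plan a scan against the head snapshot of a named branch instead of main.
  static CloseableIterable<CombinedScanTask> planBranchScan(Table table, String branch) {
    return table.newScan().useRef(branch).planTasks();
  }
}
```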
namrathamyske (PR author): @rdblue @amogh-jahagirdar @jackye1995 this is still an open item for getting this PR merged. I would prefer to go with the second option, but let me know otherwise!


return new SparkWriteBuilder(sparkSession(), icebergTable, info);
}

SparkWrite.java
@@ -114,6 +114,8 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final SortOrder[] requiredOrdering;

private boolean cleanupOnAbort = true;
private String branch;


SparkWrite(
SparkSession spark,
@@ -140,6 +142,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
this.requiredDistribution = requiredDistribution;
this.requiredOrdering = requiredOrdering;
this.branch = writeConf.branch();
}

@Override
@@ -290,7 +293,8 @@ public String toString() {
private class BatchAppend extends BaseBatchWrite {
@Override
public void commit(WriterCommitMessage[] messages) {
AppendFiles append = table.newAppend();
AppendFiles append = table.newAppend().toBranch(branch);


int numFiles = 0;
for (DataFile file : files(messages)) {
@@ -312,7 +316,8 @@ public void commit(WriterCommitMessage[] messages) {
return;
}

ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
ReplacePartitions dynamicOverwrite = table.newReplacePartitions().toBranch(branch);

IsolationLevel isolationLevel = writeConf.isolationLevel();
Long validateFromSnapshotId = writeConf.validateFromSnapshotId();

@@ -349,8 +354,7 @@ private OverwriteByFilter(Expression overwriteExpr) {

@Override
public void commit(WriterCommitMessage[] messages) {
OverwriteFiles overwriteFiles = table.newOverwrite();
overwriteFiles.overwriteByRowFilter(overwriteExpr);
OverwriteFiles overwriteFiles = table.newOverwrite().toBranch(branch).overwriteByRowFilter(overwriteExpr);

int numFiles = 0;
for (DataFile file : files(messages)) {
@@ -411,7 +415,7 @@ private Expression conflictDetectionFilter() {

@Override
public void commit(WriterCommitMessage[] messages) {
OverwriteFiles overwriteFiles = table.newOverwrite();
OverwriteFiles overwriteFiles = table.newOverwrite().toBranch(branch);

List<DataFile> overwrittenFiles = overwrittenFiles();
int numOverwrittenFiles = overwrittenFiles.size();
@@ -536,7 +540,7 @@ protected <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId, String
}

private Long findLastCommittedEpochId() {
Snapshot snapshot = table.currentSnapshot();
Snapshot snapshot = table.snapshot(branch);
Long lastCommittedEpochId = null;
while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
@@ -570,7 +574,7 @@ protected String mode() {

@Override
protected void doCommit(long epochId, WriterCommitMessage[] messages) {
AppendFiles append = table.newFastAppend();
AppendFiles append = table.newFastAppend().toBranch(branch);
int numFiles = 0;
for (DataFile file : files(messages)) {
append.appendFile(file);
@@ -588,7 +592,7 @@ protected String mode() {

@Override
public void doCommit(long epochId, WriterCommitMessage[] messages) {
OverwriteFiles overwriteFiles = table.newOverwrite();
OverwriteFiles overwriteFiles = table.newOverwrite().toBranch(branch);
overwriteFiles.overwriteByRowFilter(Expressions.alwaysTrue());
int numFiles = 0;
for (DataFile file : files(messages)) {
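For reference, a minimal sketch of what the toBranch(branch) calls above do at the Iceberg API level; the class and helper names here are illustrative and not part of this diff:

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class BranchAppendSketch {
  // Commit an append to a named branch: a new snapshot is created and only the
  // branch ref is advanced to it, while main is left untouched.
  static void appendToBranch(Table table, DataFile file, String branch) {
    AppendFiles append = table.newAppend().toBranch(branch);
    append.appendFile(file);
    append.commit();
  }
}
```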
TestSparkDataWrite.java
@@ -71,9 +71,18 @@ public class TestSparkDataWrite {

@Rule public TemporaryFolder temp = new TemporaryFolder();

@Parameterized.Parameters(name = "format = {0}")
private String branch;

@Parameterized.Parameters(name = "format = {0}, branch = {1}")
public static Object[] parameters() {
return new Object[] {"parquet", "avro", "orc"};
return new Object[][] {
new Object[] {"parquet", "main"},
new Object[] {"parquet", "testBranch"},
new Object[] {"avro", "main"},
new Object[] {"avro", "testBranch"},
new Object[] {"orc", "main"},
new Object[] {"orc", "testBranch"},
};
}

@BeforeClass
@@ -93,8 +102,9 @@ public static void stopSpark() {
currentSpark.stop();
}

public TestSparkDataWrite(String format) {
public TestSparkDataWrite(String format, String branch) {
this.format = FileFormat.fromString(format);
this.branch = branch;
}

@Test
@@ -117,17 +127,18 @@ public void testBasicWrite() throws IOException {
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Append)
.option("branch", branch)
.save(location.toString());

table.refresh();

Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
Dataset<Row> result = spark.read().format("iceberg").option("branch", branch).load(location.toString());

List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);
for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
for (ManifestFile manifest : table.snapshot(branch).allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
// TODO: avro not support split
if (!format.equals(FileFormat.AVRO)) {
@@ -175,6 +186,7 @@ public void testAppend() throws IOException {
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Append)
.option("branch", branch)
.save(location.toString());

df.withColumn("id", df.col("id").plus(3))
@@ -183,11 +195,12 @@
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Append)
.save(location.toString());
.option("branch", branch)
.save(location.toString());

table.refresh();

Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
Dataset<Row> result = spark.read().format("iceberg").option("branch", branch).load(location.toString());

List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -216,7 +229,8 @@ public void testEmptyOverwrite() throws IOException {
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Append)
.save(location.toString());
.option("branch", branch)
.save(location.toString());

Dataset<Row> empty = spark.createDataFrame(ImmutableList.of(), SimpleRecord.class);
empty
Expand All @@ -226,11 +240,12 @@ public void testEmptyOverwrite() throws IOException {
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Overwrite)
.option("overwrite-mode", "dynamic")
.save(location.toString());
.option("branch", branch)
.save(location.toString());

table.refresh();

Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
Dataset<Row> result = spark.read().format("iceberg").option("branch", branch).load(location.toString());

List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -266,7 +281,8 @@ public void testOverwrite() throws IOException {
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Append)
.save(location.toString());
.option("branch", branch)
.save(location.toString());

// overwrite with 2*id to replace record 2, append 4 and 6
df.withColumn("id", df.col("id").multiply(2))
@@ -276,11 +292,12 @@
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Overwrite)
.option("overwrite-mode", "dynamic")
.save(location.toString());
.option("branch", branch)
.save(location.toString());

table.refresh();

Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
Dataset<Row> result = spark.read().format("iceberg").option("branch", branch).load(location.toString());

List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -308,19 +325,21 @@ public void testUnpartitionedOverwrite() throws IOException {
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Append)
.save(location.toString());
.option("branch", branch)
.save(location.toString());

// overwrite with the same data; should not produce two copies
df.select("id", "data")
.write()
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Overwrite)
.save(location.toString());
.option("branch", branch)
.save(location.toString());

table.refresh();

Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
Dataset<Row> result = spark.read().format("iceberg").option("branch", branch).load(location.toString());

List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -354,19 +373,20 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
.mode(SaveMode.Append)
.save(location.toString());
.option("branch", branch)
.save(location.toString());

table.refresh();

Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
Dataset<Row> result = spark.read().format("iceberg").option("branch", branch).load(location.toString());

List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);

List<DataFile> files = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
for (ManifestFile manifest : table.snapshot(branch).allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
files.add(file);
}
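As a side note, a branch such as testBranch used by these parameterized tests could be created up front with Iceberg's snapshot management API. A minimal sketch, assuming the table already has a current snapshot; the class name is illustrative:

```java
import org.apache.iceberg.Table;

class CreateBranchSketch {
  // Create a branch pointing at the table's current snapshot.
  static void createBranch(Table table, String branch) {
    table.manageSnapshots()
        .createBranch(branch, table.currentSnapshot().snapshotId())
        .commit();
  }
}
```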