Spark 3.3 write to branch snapshot #6651

Closed · wants to merge 33 commits

Changes from 32 commits

Commits (33):
9e8bf34  Spark 3.3 write to branch (namrathamyske, Jan 23, 2023)
ee4cadb  Spark 3.3 write to branch refactoring by review comments (namrathamyske, Jan 23, 2023)
3225506  Spark 3.3 write to branch refactoring by review comments (namrathamyske, Jan 23, 2023)
e1dfa45  Spark 3.3 write to branch data write test (namrathamyske, Jan 23, 2023)
58b4bf2  spotless (namrathamyske, Jan 24, 2023)
8677134  checking if snapshot set is branch (namrathamyske, Jan 24, 2023)
af17f25  Merge branch 'master' of https://github.com/apache/iceberg into spark… (namrathamyske, Jan 25, 2023)
7642b9e  Spark: address comments for spark branch writes (amogh-jahagirdar, Feb 1, 2023)
da9dcc0  Merge commit 'refs/pull/25/head' of https://github.com/namrathamyske/… (namrathamyske, Feb 4, 2023)
ca8e1ff  Merge branch 'master' of https://github.com/apache/iceberg into spark… (namrathamyske, Feb 7, 2023)
2e4eefe  review comments (namrathamyske, Feb 11, 2023)
de20c76  review comments (namrathamyske, Feb 11, 2023)
85d7475  spotless (namrathamyske, Feb 11, 2023)
bbf57e3  review comments changes (namrathamyske, Feb 12, 2023)
0e081e1  review comments changes (namrathamyske, Feb 12, 2023)
51b1052  new line change reversal (namrathamyske, Feb 12, 2023)
aa42e2e  Spark: Add tests for overwrite case (amogh-jahagirdar, Feb 12, 2023)
03c962d  Merge pull request #26 from amogh-jahagirdar/spark-branch-writes-more… (namrathamyske, Feb 17, 2023)
bed5ec3  nit review comments (namrathamyske, Feb 17, 2023)
332064e  Merge branch 'master' of https://github.com/apache/iceberg into spark… (namrathamyske, Feb 17, 2023)
6ef5f4e  Merge branch 'spark_writes' of https://github.com/namrathamyske/icebe… (namrathamyske, Feb 17, 2023)
8ecfdcd  adding write conf back (namrathamyske, Feb 17, 2023)
6b8f954  Remove SQL Write Conf, fail if write conf is specified for row level … (amogh-jahagirdar, Feb 22, 2023)
f8b34bd  Merge branch 'master' into spark_writes (amogh-jahagirdar, Feb 22, 2023)
a8a5d89  Merge branch 'master' into spark_writes (amogh-jahagirdar, Feb 22, 2023)
7ee1689  Address cleanup (amogh-jahagirdar, Feb 23, 2023)
64db07e  Allow non-existing branches in catalog#loadTable (amogh-jahagirdar, Feb 23, 2023)
1b2cd5a  Merge branch 'master' into spark_writes (amogh-jahagirdar, Feb 23, 2023)
4c94693  Remove Spark branch write option, use identifier in branch, merge/del… (amogh-jahagirdar, Feb 26, 2023)
2f3d6e1  Add merge tests (amogh-jahagirdar, Feb 27, 2023)
9bbed3a  Style (amogh-jahagirdar, Feb 27, 2023)
51a29b3  Remove setting branch in scan (amogh-jahagirdar, Feb 27, 2023)
b2692fe  Fix for metadata tables (amogh-jahagirdar, Feb 27, 2023)
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java (16 additions, 0 deletions)
@@ -27,6 +27,7 @@
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
@@ -394,4 +395,19 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli

return table.schema();
}

/**
* Fetch the snapshot at the head of the given branch in the given table.
*
* @param table a {@link Table}
* @param branch a branch
* @return the latest snapshot for the given branch
*/
public static Snapshot latestSnapshot(Table table, String branch) {
if (SnapshotRef.MAIN_BRANCH.equals(branch)) {
return table.currentSnapshot();
} else {
return table.snapshot(branch);
}
}
}
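For context, a minimal usage sketch of the new helper (the table is assumed to be already loaded, and "test_branch" is a hypothetical branch name):

// Resolve the head snapshot of a branch; for non-main branches the helper
// delegates to table.snapshot(name), which resolves the ref by name.
Snapshot branchHead = SnapshotUtil.latestSnapshot(table, "test_branch");
// For the main branch the helper short-circuits to the current snapshot:
Snapshot mainHead = SnapshotUtil.latestSnapshot(table, SnapshotRef.MAIN_BRANCH);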
SparkRowLevelOperationsTestBase.java
@@ -43,6 +43,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -66,6 +67,7 @@
public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTestBase {

private static final Random RANDOM = ThreadLocalRandom.current();
protected static final String TEST_BRANCH = "test_branch";

protected final String fileFormat;
protected final boolean vectorized;
@@ -173,14 +175,23 @@ protected void createAndInitTable(String schema, String jsonData) {
createAndInitTable(schema, "", jsonData);
}

protected void createAndInitTableBranch(String schema, String jsonData, String branch) {
createAndInitTableBranch(schema, "", jsonData, branch);
}

protected void createAndInitTable(String schema, String partitioning, String jsonData) {
createAndInitTableBranch(schema, partitioning, jsonData, SnapshotRef.MAIN_BRANCH);
}

protected void createAndInitTableBranch(
String schema, String partitioning, String jsonData, String branch) {
sql("CREATE TABLE %s (%s) USING iceberg %s", tableName, schema, partitioning);
initTable();

if (jsonData != null) {
try {
Dataset<Row> ds = toDS(schema, jsonData);
ds.coalesce(1).writeTo(tableName).append();
ds.coalesce(1).writeTo(tableNameWithBranch(branch)).append();
} catch (NoSuchTableException e) {
throw new RuntimeException("Failed to write data", e);
}
@@ -315,4 +326,12 @@ protected DataFile writeDataFile(Table table, List<GenericRecord> records) {
throw new UncheckedIOException(e);
}
}

protected String tableNameWithBranch(String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return tableName;
}

return tableName + "." + branch;
}
}
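As tableNameWithBranch shows, these tests address a branch by appending its name to the table identifier, so a non-main write targets table.<branch>. A minimal sketch of such a write (the catalog, table, and branch names are hypothetical, and the branch is assumed to already exist):

// Assumes a SparkSession `spark` configured with an Iceberg catalog and a
// branch named "test_branch" already created on the table db.sample.
Dataset<Row> df = spark.sql("SELECT 1 AS id, 'a' AS data");
df.writeTo("db.sample.test_branch").append(); // appends to the branch, not main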
TestCopyOnWriteBranchDelete.java
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.Test;
import org.junit.runners.Parameterized;

public class TestCopyOnWriteBranchDelete extends TestCopyOnWriteDelete {

public TestCopyOnWriteBranchDelete(
String catalogName,
String implementation,
Map<String, String> config,
String fileFormat,
Boolean vectorized,
String distributionMode) {
super(catalogName, implementation, config, fileFormat, vectorized, distributionMode);
}

@Parameterized.Parameters(
name =
"catalogName = {0}, implementation = {1}, config = {2},"
+ " format = {3}, vectorized = {4}, distributionMode = {5}")
public static Object[][] parameters() {
return new Object[][] {
{
"testhive",
SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hive",
"default-namespace", "default"),
"parquet",
true,
WRITE_DISTRIBUTION_MODE_NONE
},
};
}

@Test
public void testDeleteWithoutScanningTable() throws Exception {
testDeleteWithoutScanningTable(TEST_BRANCH);
}

@Test
public void testDeleteFileThenMetadataDelete() throws Exception {
testDeleteFileThenMetadataDelete(TEST_BRANCH);
}

@Test
public void testDeleteWithFalseCondition() {
testDeleteWithFalseCondition(TEST_BRANCH);
}

@Test
public void testDeleteFromEmptyTable() {
testDeleteFromEmptyTable(TEST_BRANCH);
}

@Test
public void testExplain() {
testExplain(TEST_BRANCH);
}

@Test
public void testDeleteWithAlias() {
testDeleteWithAlias(TEST_BRANCH);
}

@Test
public void testDeleteWithDynamicFileFiltering() throws NoSuchTableException {
testDeleteWithDynamicFileFiltering(TEST_BRANCH);
}

@Test
public void testDeleteNonExistingRecords() {
testDeleteNonExistingRecords(TEST_BRANCH);
}

@Test
public void testDeleteWithoutCondition() {
testDeleteWithoutCondition(TEST_BRANCH);
}

@Test
public void testDeleteUsingMetadataWithComplexCondition() {
testDeleteUsingMetadataWithComplexCondition(TEST_BRANCH);
}

@Test
public void testDeleteWithArbitraryPartitionPredicates() {
testDeleteWithArbitraryPartitionPredicates(TEST_BRANCH);
}

@Test
public void testDeleteWithNonDeterministicCondition() {
testDeleteWithNonDeterministicCondition(TEST_BRANCH);
}

@Test
public void testDeleteWithFoldableConditions() {
testDeleteWithFoldableConditions(TEST_BRANCH);
}

@Test
public void testDeleteWithNullConditions() {
testDeleteWithNullConditions(TEST_BRANCH);
}

@Test
public void testDeleteWithInAndNotInConditions() {
testDeleteWithInAndNotInConditions(TEST_BRANCH);
}

@Test
public void testDeleteWithMultipleRowGroupsParquet() throws NoSuchTableException {
testDeleteWithMultipleRowGroupsParquet(TEST_BRANCH);
}

@Test
public void testDeleteWithConditionOnNestedColumn() {
testDeleteWithConditionOnNestedColumn(TEST_BRANCH);
}

@Test
public void testDeleteWithInSubquery() throws NoSuchTableException {
testDeleteWithInSubquery(TEST_BRANCH);
}

@Test
public void testDeleteWithMultiColumnInSubquery() throws NoSuchTableException {
testDeleteWithMultiColumnInSubquery(TEST_BRANCH);
}

@Test
public void testDeleteWithNotInSubquery() throws NoSuchTableException {
testDeleteWithNotInSubquery(TEST_BRANCH);
}

@Test
public void testDeleteOnNonIcebergTableNotSupported() {
testDeleteOnNonIcebergTableNotSupported(TEST_BRANCH);
}

@Test
public void testDeleteWithExistSubquery() throws NoSuchTableException {
testDeleteWithExistSubquery(TEST_BRANCH);
}

@Test
public void testDeleteWithNotExistsSubquery() throws NoSuchTableException {
testDeleteWithNotExistsSubquery(TEST_BRANCH);
}

@Test
public void testDeleteWithScalarSubquery() throws NoSuchTableException {
testDeleteWithScalarSubquery(TEST_BRANCH);
}

@Test
public synchronized void testDeleteWithSerializableIsolation() throws InterruptedException {
testDeleteWithSerializableIsolation(TEST_BRANCH);
}

@Test
public synchronized void testDeleteWithSnapshotIsolation()
throws InterruptedException, ExecutionException {
testDeleteWithSnapshotIsolation(TEST_BRANCH);
}

@Test
public void testDeleteRefreshesRelationCache() throws NoSuchTableException {
testDeleteRefreshesRelationCache(TEST_BRANCH);
}

@Test
public void testDeleteWithMultipleSpecs() {
testDeleteWithMultipleSpecs(TEST_BRANCH);
}
}