From 5e34e23c743526d5a5551ddee05b6f0085cfee4a Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Tue, 31 Jan 2023 22:22:34 -0800 Subject: [PATCH 01/11] spark 3.3 read by snapshot ref bug fix --- .../apache/iceberg/spark/SparkCatalog.java | 28 ++++++++++++-- .../iceberg/spark/source/IcebergSource.java | 6 +++ .../iceberg/spark/source/SparkTable.java | 2 +- .../spark/source/TestSnapshotSelection.java | 38 +++++++++++++++++++ 4 files changed, 70 insertions(+), 4 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 9714ec3f4a88..e078a2b94477 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -24,11 +24,13 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CachingCatalog; import org.apache.iceberg.CatalogProperties; @@ -37,6 +39,7 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; @@ -105,6 +108,7 @@ public class SparkCatalog extends BaseCatalog { private static final Splitter COMMA = Splitter.on(","); private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)"); private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)"); + private static final Pattern BRANCH = Pattern.compile("branch_(.*)"); private String catalogName = null; private Catalog icebergCatalog = null; @@ -654,6 +658,13 @@ private Table load(Identifier ident) { return new SparkTable(table, snapshotId, !cacheEnabled); } + Matcher branch = BRANCH.matcher(ident.name()); + if (branch.matches()) { + Snapshot snapshot = table.snapshot(branch.group(1)); + if (snapshot != null) { + return new SparkTable(table, snapshot.snapshotId(), !cacheEnabled); + } + } // the name wasn't a valid snapshot selector and did not point to the changelog // throw the original exception throw e; @@ -678,6 +689,7 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { String metadataTableName = null; Long asOfTimestamp = null; Long snapshotId = null; + String branch = null; boolean isChangelog = false; for (String meta : parsed.second()) { @@ -701,12 +713,19 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { if (id.matches()) { snapshotId = Long.parseLong(id.group(1)); } + + Matcher ref = BRANCH.matcher(meta); + if (ref.matches()) { + branch = ref.group(1); + } } Preconditions.checkArgument( - asOfTimestamp == null || snapshotId == null, - "Cannot specify both snapshot-id and as-of-timestamp: %s", - ident.location()); + Stream.of(snapshotId, asOfTimestamp, branch).filter(Objects::nonNull).count() <= 1, + "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)", + snapshotId, + asOfTimestamp, + branch); Preconditions.checkArgument( !isChangelog || (snapshotId == null && asOfTimestamp == null), @@ -722,6 +741,9 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { long 
snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled); + } else if (branch != null && table.snapshot(branch) != null) { + return new SparkTable(table, table.snapshot(branch).snapshotId(), !cacheEnabled); + } else { return new SparkTable(table, snapshotId, !cacheEnabled); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index 4a03d3a5b5ab..4c27f6cd4e40 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -67,6 +67,7 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions "spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME; private static final String AT_TIMESTAMP = "at_timestamp_"; private static final String SNAPSHOT_ID = "snapshot_id_"; + private static final String BRANCH = "branch_"; private static final String[] EMPTY_NAMESPACE = new String[0]; private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); @@ -124,6 +125,7 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID); Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP); + String branch = options.get(SparkReadOptions.BRANCH); Preconditions.checkArgument( asOfTimestamp == null || snapshotId == null, "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", @@ -140,6 +142,10 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri selector = AT_TIMESTAMP + asOfTimestamp; } + if (branch != null) { + selector = BRANCH + branch; + } + CatalogManager catalogManager = spark.sessionState().catalogManager(); if (TABLE_CACHE.contains(path)) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 0208121f0027..1495782365d3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -368,7 +368,7 @@ private static CaseInsensitiveStringMap addSnapshotId( scanOptions.putAll(options.asCaseSensitiveMap()); scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value); scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP); - + scanOptions.remove(SparkReadOptions.BRANCH); return new CaseInsensitiveStringMap(scanOptions); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 0b7348fa078a..446fd073870f 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -370,4 +370,42 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Cannot override ref, already set snapshot id="); } + + @Test + public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + + HadoopTables tables = new HadoopTables(CONF); + 
PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = tables.create(SCHEMA, spec, tableLocation); + + // produce the first snapshot + List firstBatchRecords = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class); + firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); + + table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); + + Dataset currentSnapshotResult = + spark.read().format("iceberg").option("branch", "branch").load(tableLocation); + List currentSnapshotRecords = + currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(firstBatchRecords); + Assert.assertEquals( + "Current snapshot rows should match", expectedRecords, currentSnapshotRecords); + + table.updateSchema().deleteColumn("data").commit(); + + Dataset deleteSnapshotResult = + spark.read().format("iceberg").option("branch", "branch").load(tableLocation); + List deletedSnapshotRecords = + deleteSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); + List expectedRecordsAfterDeletion = Lists.newArrayList(); + expectedRecordsAfterDeletion.addAll(firstBatchRecords); + Assert.assertEquals( + "Current snapshot rows should match", expectedRecords, deletedSnapshotRecords); + } } From 7f8829899d429b1b5561bee9468b7a4fcb03e4f2 Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Fri, 10 Feb 2023 13:58:42 -0800 Subject: [PATCH 02/11] spotless --- .../apache/iceberg/spark/SparkCatalog.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index e078a2b94477..192baa023782 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -645,26 +645,30 @@ private Table load(Identifier ident) { return new SparkChangelogTable(table, !cacheEnabled); } + long snapshotId = -1; Matcher at = AT_TIMESTAMP.matcher(ident.name()); if (at.matches()) { long asOfTimestamp = Long.parseLong(at.group(1)); - long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); - return new SparkTable(table, snapshotId, !cacheEnabled); + snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); } Matcher id = SNAPSHOT_ID.matcher(ident.name()); if (id.matches()) { - long snapshotId = Long.parseLong(id.group(1)); - return new SparkTable(table, snapshotId, !cacheEnabled); + snapshotId = Long.parseLong(id.group(1)); } Matcher branch = BRANCH.matcher(ident.name()); if (branch.matches()) { Snapshot snapshot = table.snapshot(branch.group(1)); if (snapshot != null) { - return new SparkTable(table, snapshot.snapshotId(), !cacheEnabled); + snapshotId = snapshot.snapshotId(); } } + + if (snapshotId != -1) { + return new SparkTable(table, snapshotId, !cacheEnabled); + } + // the name wasn't a valid snapshot selector and did not point to the changelog // throw the original exception throw e; @@ -734,19 +738,19 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { org.apache.iceberg.Table table = tables.load(parsed.first() + (metadataTableName != null ? 
"#" + metadataTableName : "")); + snapshotId = -1L; + if (isChangelog) { return new SparkChangelogTable(table, !cacheEnabled); } else if (asOfTimestamp != null) { - long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); - return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled); - - } else if (branch != null && table.snapshot(branch) != null) { - return new SparkTable(table, table.snapshot(branch).snapshotId(), !cacheEnabled); - - } else { - return new SparkTable(table, snapshotId, !cacheEnabled); + snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + } else if (branch != null) { + Preconditions.checkArgument( + table.snapshot(branch) != null, "branch not associated with a snapshot"); + snapshotId = table.snapshot(branch).snapshotId(); } + return new SparkTable(table, snapshotId, !cacheEnabled); } private Identifier namespaceToIdentifier(String[] namespace) { From 1afd51bbc5f84104016b07ac6b3e291f6f31cfa3 Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Fri, 10 Feb 2023 14:44:56 -0800 Subject: [PATCH 03/11] adding tags --- .../spark/SparkCachedTableCatalog.java | 32 +++++++++++++--- .../apache/iceberg/spark/SparkCatalog.java | 37 ++++++++++++++----- .../iceberg/spark/source/IcebergSource.java | 16 ++++++-- .../iceberg/spark/source/SparkTable.java | 2 + .../spark/source/TestSnapshotSelection.java | 23 +++++++----- 5 files changed, 83 insertions(+), 27 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java index 923c54981199..42529cb2819a 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java @@ -20,9 +20,11 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Stream; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -46,7 +48,7 @@ public class SparkCachedTableCatalog implements TableCatalog { private static final Splitter COMMA = Splitter.on(","); private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)"); private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)"); - + private static final Pattern BRANCH = Pattern.compile("branch_(.*)"); private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); private String name = null; @@ -133,6 +135,7 @@ private Pair load(Identifier ident) throws NoSuchTableException { Long asOfTimestamp = null; Long snapshotId = null; + String branch = null; for (String meta : metadata) { Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta); if (timeBasedMatcher.matches()) { @@ -144,12 +147,19 @@ private Pair load(Identifier ident) throws NoSuchTableException { if (snapshotBasedMatcher.matches()) { snapshotId = Long.parseLong(snapshotBasedMatcher.group(1)); } + + Matcher snapshotRefBasedMatcher = BRANCH.matcher(meta); + if (snapshotRefBasedMatcher.matches()) { + branch = snapshotRefBasedMatcher.group(1); + } } Preconditions.checkArgument( - asOfTimestamp == null || snapshotId == null, - "Cannot specify both snapshot and timestamp for time travel: %s", - ident); + Stream.of(snapshotId, 
asOfTimestamp, branch).filter(Objects::nonNull).count() <= 1, + "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)", + snapshotId, + asOfTimestamp, + branch); Table table = TABLE_CACHE.get(key); @@ -157,10 +167,20 @@ private Pair load(Identifier ident) throws NoSuchTableException { throw new NoSuchTableException(ident); } + long snapshotIdFromTimeTravel = -1L; + if (snapshotId != null) { - return Pair.of(table, snapshotId); + snapshotIdFromTimeTravel = snapshotId; } else if (asOfTimestamp != null) { - return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); + snapshotIdFromTimeTravel = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + } else if (branch != null) { + Preconditions.checkArgument( + table.snapshot(branch) != null, "branch not associated with a snapshot"); + snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId(); + } + + if (snapshotIdFromTimeTravel != -1L) { + return Pair.of(table, snapshotIdFromTimeTravel); } else { return Pair.of(table, null); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 192baa023782..0afc368857ac 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -109,6 +109,7 @@ public class SparkCatalog extends BaseCatalog { private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)"); private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)"); private static final Pattern BRANCH = Pattern.compile("branch_(.*)"); + private static final Pattern TAG = Pattern.compile("tag_(.*)"); private String catalogName = null; private Catalog icebergCatalog = null; @@ -665,6 +666,14 @@ private Table load(Identifier ident) { } } + Matcher tag = TAG.matcher(ident.name()); + if (tag.matches()) { + Snapshot snapshot = table.snapshot(tag.group(1)); + if (snapshot != null) { + snapshotId = snapshot.snapshotId(); + } + } + if (snapshotId != -1) { return new SparkTable(table, snapshotId, !cacheEnabled); } @@ -694,6 +703,7 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { Long asOfTimestamp = null; Long snapshotId = null; String branch = null; + String tag = null; boolean isChangelog = false; for (String meta : parsed.second()) { @@ -718,18 +728,24 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { snapshotId = Long.parseLong(id.group(1)); } - Matcher ref = BRANCH.matcher(meta); - if (ref.matches()) { - branch = ref.group(1); + Matcher branchRef = BRANCH.matcher(meta); + if (branchRef.matches()) { + branch = branchRef.group(1); + } + + Matcher tagRef = TAG.matcher(meta); + if (tagRef.matches()) { + tag = tagRef.group(1); } } Preconditions.checkArgument( - Stream.of(snapshotId, asOfTimestamp, branch).filter(Objects::nonNull).count() <= 1, - "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)", + Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1, + "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)", snapshotId, asOfTimestamp, - branch); + branch, + tag); Preconditions.checkArgument( !isChangelog || (snapshotId == null && asOfTimestamp == null), @@ -738,17 +754,20 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { org.apache.iceberg.Table table = tables.load(parsed.first() + 
(metadataTableName != null ? "#" + metadataTableName : "")); - snapshotId = -1L; + long snapshotIdFromTimeTravel = -1L; if (isChangelog) { return new SparkChangelogTable(table, !cacheEnabled); } else if (asOfTimestamp != null) { - snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + snapshotIdFromTimeTravel = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); } else if (branch != null) { Preconditions.checkArgument( table.snapshot(branch) != null, "branch not associated with a snapshot"); - snapshotId = table.snapshot(branch).snapshotId(); + snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId(); + } + if (snapshotIdFromTimeTravel != -1L) { + return new SparkTable(table, snapshotIdFromTimeTravel, !cacheEnabled); } return new SparkTable(table, snapshotId, !cacheEnabled); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index 4c27f6cd4e40..902e0e47e3f1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -20,7 +20,9 @@ import java.util.Arrays; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Stream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.PathIdentifier; @@ -68,6 +70,7 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions private static final String AT_TIMESTAMP = "at_timestamp_"; private static final String SNAPSHOT_ID = "snapshot_id_"; private static final String BRANCH = "branch_"; + private static final String TAG = "tag_"; private static final String[] EMPTY_NAMESPACE = new String[0]; private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); @@ -126,11 +129,14 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID); Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP); String branch = options.get(SparkReadOptions.BRANCH); + String tag = options.get(SparkReadOptions.TAG); Preconditions.checkArgument( - asOfTimestamp == null || snapshotId == null, - "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", + Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1, + "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)", snapshotId, - asOfTimestamp); + asOfTimestamp, + branch, + tag); String selector = null; @@ -146,6 +152,10 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri selector = BRANCH + branch; } + if (tag != null) { + selector = TAG + tag; + } + CatalogManager catalogManager = spark.sessionState().catalogManager(); if (TABLE_CACHE.contains(path)) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 1495782365d3..9f07b14cc987 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -368,7 +368,9 @@ private static CaseInsensitiveStringMap addSnapshotId( 
scanOptions.putAll(options.asCaseSensitiveMap()); scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value); scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP); + scanOptions.remove(SparkReadOptions.BRANCH); + scanOptions.remove(SparkReadOptions.TAG); return new CaseInsensitiveStringMap(scanOptions); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 446fd073870f..7a395f2c53cc 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -223,8 +223,10 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException { .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) .load(tableLocation)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot specify both snapshot-id") - .hasMessageContaining("and as-of-timestamp"); + .hasMessageContaining("Can specify at most one of snapshot-id") + .hasMessageContaining("as-of-timestamp") + .hasMessageContaining("branch") + .hasMessageContaining("tag"); } @Test @@ -325,7 +327,7 @@ public void testSnapshotSelectionByBranchAndTagFails() throws IOException { .load(tableLocation) .show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("Cannot override ref, already set snapshot id="); + .hasMessageStartingWith("Can specify at most one of snapshot-id"); } @Test @@ -356,7 +358,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep .load(tableLocation) .show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("Cannot override ref, already set snapshot id="); + .hasMessageStartingWith("Can specify at most one of snapshot-id"); Assertions.assertThatThrownBy( () -> @@ -368,7 +370,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep .load(tableLocation) .show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("Cannot override ref, already set snapshot id="); + .hasMessageStartingWith("Can specify at most one of snapshot-id"); } @Test @@ -397,14 +399,17 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException { Assert.assertEquals( "Current snapshot rows should match", expectedRecords, currentSnapshotRecords); + // Deleting a column to indicate schema change table.updateSchema().deleteColumn("data").commit(); - Dataset deleteSnapshotResult = + Dataset deleteColumnSnapshotResult = spark.read().format("iceberg").option("branch", "branch").load(tableLocation); List deletedSnapshotRecords = - deleteSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); - List expectedRecordsAfterDeletion = Lists.newArrayList(); - expectedRecordsAfterDeletion.addAll(firstBatchRecords); + deleteColumnSnapshotResult + .orderBy("id") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + // The data should have the deleted column as it was captured in an earlier snapshot. 
Assert.assertEquals( "Current snapshot rows should match", expectedRecords, deletedSnapshotRecords); } From 0d9e059138cab264b5197fd3b887fabfa0399429 Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Fri, 10 Feb 2023 14:53:08 -0800 Subject: [PATCH 04/11] tests fix --- .../spark/source/TestSnapshotSelection.java | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 7a395f2c53cc..0d98b0724b7a 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -374,7 +374,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep } @Test - public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException { + public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOException { String tableLocation = temp.newFolder("iceberg-table").toString(); HadoopTables tables = new HadoopTables(CONF); @@ -389,28 +389,46 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException { firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); + table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); - Dataset currentSnapshotResult = + Dataset branchSnapshotResult = spark.read().format("iceberg").option("branch", "branch").load(tableLocation); - List currentSnapshotRecords = - currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); + List branchSnapshotRecords = + branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); List expectedRecords = Lists.newArrayList(); expectedRecords.addAll(firstBatchRecords); Assert.assertEquals( - "Current snapshot rows should match", expectedRecords, currentSnapshotRecords); + "Current snapshot rows should match", expectedRecords, branchSnapshotRecords); + + Dataset tagSnapshotResult = + spark.read().format("iceberg").option("tag", "tag").load(tableLocation); + List tagSnapshotRecords = + tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); + Assert.assertEquals( + "Current snapshot rows should match", expectedRecords, tagSnapshotRecords); // Deleting a column to indicate schema change table.updateSchema().deleteColumn("data").commit(); - Dataset deleteColumnSnapshotResult = + // The data should have the deleted column as it was captured in an earlier snapshot. + Dataset deletedColumnBranchSnapshotResult = spark.read().format("iceberg").option("branch", "branch").load(tableLocation); - List deletedSnapshotRecords = - deleteColumnSnapshotResult + List deletedColumnBranchSnapshotRecords = + deletedColumnBranchSnapshotResult .orderBy("id") .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); - // The data should have the deleted column as it was captured in an earlier snapshot. 
Assert.assertEquals( - "Current snapshot rows should match", expectedRecords, deletedSnapshotRecords); + "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords); + + Dataset deletedColumnTagSnapshotResult = + spark.read().format("iceberg").option("branch", "branch").load(tableLocation); + List deletedColumnTagSnapshotRecords = + deletedColumnTagSnapshotResult + .orderBy("id") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + Assert.assertEquals( + "Current snapshot rows should match", expectedRecords, deletedColumnTagSnapshotRecords); } } From 78ef8f2085c2b155bc0b3ab635fcde58ee46a8e1 Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Fri, 10 Feb 2023 14:57:15 -0800 Subject: [PATCH 05/11] spotless --- .../spark/source/TestSnapshotSelection.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 0d98b0724b7a..b7f9cca20051 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -394,18 +394,17 @@ public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOExcept Dataset branchSnapshotResult = spark.read().format("iceberg").option("branch", "branch").load(tableLocation); List branchSnapshotRecords = - branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); + branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); List expectedRecords = Lists.newArrayList(); expectedRecords.addAll(firstBatchRecords); Assert.assertEquals( "Current snapshot rows should match", expectedRecords, branchSnapshotRecords); Dataset tagSnapshotResult = - spark.read().format("iceberg").option("tag", "tag").load(tableLocation); + spark.read().format("iceberg").option("tag", "tag").load(tableLocation); List tagSnapshotRecords = - tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); - Assert.assertEquals( - "Current snapshot rows should match", expectedRecords, tagSnapshotRecords); + tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); + Assert.assertEquals("Current snapshot rows should match", expectedRecords, tagSnapshotRecords); // Deleting a column to indicate schema change table.updateSchema().deleteColumn("data").commit(); @@ -419,16 +418,16 @@ public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOExcept .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); Assert.assertEquals( - "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords); + "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords); Dataset deletedColumnTagSnapshotResult = - spark.read().format("iceberg").option("branch", "branch").load(tableLocation); + spark.read().format("iceberg").option("branch", "branch").load(tableLocation); List deletedColumnTagSnapshotRecords = - deletedColumnTagSnapshotResult - .orderBy("id") - .as(Encoders.bean(SimpleRecord.class)) - .collectAsList(); + deletedColumnTagSnapshotResult + .orderBy("id") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); Assert.assertEquals( - "Current snapshot rows should match", expectedRecords, 
deletedColumnTagSnapshotRecords); + "Current snapshot rows should match", expectedRecords, deletedColumnTagSnapshotRecords); } } From 825220030ee05b9308db13fa203d832b4d18e9d7 Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Fri, 10 Feb 2023 15:09:32 -0800 Subject: [PATCH 06/11] tag fixes --- .../spark/SparkCachedTableCatalog.java | 19 +++++++++++++++---- .../apache/iceberg/spark/SparkCatalog.java | 5 +++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java index 42529cb2819a..d9ab932d776b 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java @@ -49,6 +49,7 @@ public class SparkCachedTableCatalog implements TableCatalog { private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)"); private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)"); private static final Pattern BRANCH = Pattern.compile("branch_(.*)"); + private static final Pattern TAG = Pattern.compile("tag_(.*)"); private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); private String name = null; @@ -136,6 +137,7 @@ private Pair load(Identifier ident) throws NoSuchTableException { Long asOfTimestamp = null; Long snapshotId = null; String branch = null; + String tag = null; for (String meta : metadata) { Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta); if (timeBasedMatcher.matches()) { @@ -148,14 +150,19 @@ private Pair load(Identifier ident) throws NoSuchTableException { snapshotId = Long.parseLong(snapshotBasedMatcher.group(1)); } - Matcher snapshotRefBasedMatcher = BRANCH.matcher(meta); - if (snapshotRefBasedMatcher.matches()) { - branch = snapshotRefBasedMatcher.group(1); + Matcher branchBasedMatcher = BRANCH.matcher(meta); + if (branchBasedMatcher.matches()) { + branch = branchBasedMatcher.group(1); + } + + Matcher tagBasedMatcher = BRANCH.matcher(meta); + if (tagBasedMatcher.matches()) { + tag = tagBasedMatcher.group(1); } } Preconditions.checkArgument( - Stream.of(snapshotId, asOfTimestamp, branch).filter(Objects::nonNull).count() <= 1, + Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1, "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)", snapshotId, asOfTimestamp, @@ -177,6 +184,10 @@ private Pair load(Identifier ident) throws NoSuchTableException { Preconditions.checkArgument( table.snapshot(branch) != null, "branch not associated with a snapshot"); snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId(); + } else if (tag != null) { + Preconditions.checkArgument( + table.snapshot(tag) != null, "tag not associated with a snapshot"); + snapshotIdFromTimeTravel = table.snapshot(tag).snapshotId(); } if (snapshotIdFromTimeTravel != -1L) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 0afc368857ac..768575d0cc81 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -765,7 +765,12 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { Preconditions.checkArgument( table.snapshot(branch) != null, "branch 
not associated with a snapshot"); snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId(); + } else if (tag != null) { + Preconditions.checkArgument( + table.snapshot(tag) != null, "tag not associated with a snapshot"); + snapshotIdFromTimeTravel = table.snapshot(tag).snapshotId(); } + if (snapshotIdFromTimeTravel != -1L) { return new SparkTable(table, snapshotIdFromTimeTravel, !cacheEnabled); } From 77ef6c49f477637a8e5dac43817ad34a6eb5e816 Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Sat, 11 Feb 2023 23:19:56 -0800 Subject: [PATCH 07/11] spotless --- .../spark/SparkCachedTableCatalog.java | 26 ++++++------ .../apache/iceberg/spark/SparkCatalog.java | 42 ++++++++----------- 2 files changed, 30 insertions(+), 38 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java index d9ab932d776b..c627ef49bd80 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java @@ -25,6 +25,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -50,6 +51,7 @@ public class SparkCachedTableCatalog implements TableCatalog { private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)"); private static final Pattern BRANCH = Pattern.compile("branch_(.*)"); private static final Pattern TAG = Pattern.compile("tag_(.*)"); + private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); private String name = null; @@ -155,7 +157,7 @@ private Pair load(Identifier ident) throws NoSuchTableException { branch = branchBasedMatcher.group(1); } - Matcher tagBasedMatcher = BRANCH.matcher(meta); + Matcher tagBasedMatcher = TAG.matcher(meta); if (tagBasedMatcher.matches()) { tag = tagBasedMatcher.group(1); } @@ -174,27 +176,23 @@ private Pair load(Identifier ident) throws NoSuchTableException { throw new NoSuchTableException(ident); } - long snapshotIdFromTimeTravel = -1L; - if (snapshotId != null) { - snapshotIdFromTimeTravel = snapshotId; + return Pair.of(table, snapshotId); } else if (asOfTimestamp != null) { - snapshotIdFromTimeTravel = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); } else if (branch != null) { + Snapshot branchSnapshot = table.snapshot(branch); Preconditions.checkArgument( - table.snapshot(branch) != null, "branch not associated with a snapshot"); - snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId(); + branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch); + return Pair.of(table, branchSnapshot.snapshotId()); } else if (tag != null) { + Snapshot tagSnapshot = table.snapshot(tag); Preconditions.checkArgument( - table.snapshot(tag) != null, "tag not associated with a snapshot"); - snapshotIdFromTimeTravel = table.snapshot(tag).snapshotId(); + tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag); + return Pair.of(table, tagSnapshot.snapshotId()); } - if (snapshotIdFromTimeTravel != -1L) { - return Pair.of(table, snapshotIdFromTimeTravel); - } else { - return 
Pair.of(table, null); - } + return Pair.of(table, null); } private Pair> parseIdent(Identifier ident) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 768575d0cc81..a2a059d3c456 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -646,38 +646,34 @@ private Table load(Identifier ident) { return new SparkChangelogTable(table, !cacheEnabled); } - long snapshotId = -1; Matcher at = AT_TIMESTAMP.matcher(ident.name()); if (at.matches()) { long asOfTimestamp = Long.parseLong(at.group(1)); - snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + return new SparkTable(table, snapshotId, !cacheEnabled); } Matcher id = SNAPSHOT_ID.matcher(ident.name()); if (id.matches()) { - snapshotId = Long.parseLong(id.group(1)); + long snapshotId = Long.parseLong(id.group(1)); + return new SparkTable(table, snapshotId, !cacheEnabled); } Matcher branch = BRANCH.matcher(ident.name()); if (branch.matches()) { - Snapshot snapshot = table.snapshot(branch.group(1)); - if (snapshot != null) { - snapshotId = snapshot.snapshotId(); + Snapshot branchSnapshot = table.snapshot(branch.group(1)); + if (branchSnapshot != null) { + return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled); } } Matcher tag = TAG.matcher(ident.name()); if (tag.matches()) { - Snapshot snapshot = table.snapshot(tag.group(1)); - if (snapshot != null) { - snapshotId = snapshot.snapshotId(); + Snapshot tagSnapshot = table.snapshot(tag.group(1)); + if (tagSnapshot != null) { + return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled); } } - - if (snapshotId != -1) { - return new SparkTable(table, snapshotId, !cacheEnabled); - } - // the name wasn't a valid snapshot selector and did not point to the changelog // throw the original exception throw e; @@ -754,26 +750,24 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { org.apache.iceberg.Table table = tables.load(parsed.first() + (metadataTableName != null ? 
"#" + metadataTableName : "")); - long snapshotIdFromTimeTravel = -1L; - if (isChangelog) { return new SparkChangelogTable(table, !cacheEnabled); } else if (asOfTimestamp != null) { - snapshotIdFromTimeTravel = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled); } else if (branch != null) { + Snapshot branchSnapshot = table.snapshot(branch); Preconditions.checkArgument( - table.snapshot(branch) != null, "branch not associated with a snapshot"); - snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId(); + branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch); + return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled); } else if (tag != null) { + Snapshot tagSnapshot = table.snapshot(tag); Preconditions.checkArgument( - table.snapshot(tag) != null, "tag not associated with a snapshot"); - snapshotIdFromTimeTravel = table.snapshot(tag).snapshotId(); + tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag); + return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled); } - if (snapshotIdFromTimeTravel != -1L) { - return new SparkTable(table, snapshotIdFromTimeTravel, !cacheEnabled); - } return new SparkTable(table, snapshotId, !cacheEnabled); } From cb3dac96215945cea817d89c79b22091bb2b2ced Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Sat, 11 Feb 2023 23:27:41 -0800 Subject: [PATCH 08/11] spotless --- .../src/main/java/org/apache/iceberg/spark/SparkCatalog.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index a2a059d3c456..36cc2edd120c 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -756,6 +756,7 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { } else if (asOfTimestamp != null) { long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled); + } else if (branch != null) { Snapshot branchSnapshot = table.snapshot(branch); Preconditions.checkArgument( From 5d1d7a9c15da8b079571840b22ea8027bafaef26 Mon Sep 17 00:00:00 2001 From: Namratha Mysore Keshavaprakash Date: Sun, 12 Feb 2023 09:48:44 -0800 Subject: [PATCH 09/11] review comments changes --- .../spark/SparkCachedTableCatalog.java | 4 +- .../apache/iceberg/spark/SparkCatalog.java | 4 +- .../spark/source/TestSnapshotSelection.java | 42 +++++++++++++++---- .../apache/iceberg/spark/sql/TestSelect.java | 3 +- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java index c627ef49bd80..22bf066b4785 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java @@ -190,9 +190,9 @@ private Pair load(Identifier ident) throws NoSuchTableException { Preconditions.checkArgument( tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag); return Pair.of(table, tagSnapshot.snapshotId()); + } else { + return 
Pair.of(table, null); } - - return Pair.of(table, null); } private Pair> parseIdent(Identifier ident) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 36cc2edd120c..685f14ba81b5 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -767,9 +767,9 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { Preconditions.checkArgument( tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag); return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled); + } else { + return new SparkTable(table, snapshotId, !cacheEnabled); } - - return new SparkTable(table, snapshotId, !cacheEnabled); } private Identifier namespaceToIdentifier(String[] namespace) { diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index b7f9cca20051..77f580ab831c 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -374,7 +374,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep } @Test - public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOException { + public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException { String tableLocation = temp.newFolder("iceberg-table").toString(); HadoopTables tables = new HadoopTables(CONF); @@ -389,7 +389,6 @@ public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOExcept firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); - table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); Dataset branchSnapshotResult = spark.read().format("iceberg").option("branch", "branch").load(tableLocation); @@ -400,12 +399,6 @@ public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOExcept Assert.assertEquals( "Current snapshot rows should match", expectedRecords, branchSnapshotRecords); - Dataset tagSnapshotResult = - spark.read().format("iceberg").option("tag", "tag").load(tableLocation); - List tagSnapshotRecords = - tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); - Assert.assertEquals("Current snapshot rows should match", expectedRecords, tagSnapshotRecords); - // Deleting a column to indicate schema change table.updateSchema().deleteColumn("data").commit(); @@ -419,9 +412,40 @@ public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOExcept .collectAsList(); Assert.assertEquals( "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords); + } + + @Test + public void testSnapshotSelectionByTagWithSchemaChange() throws IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = tables.create(SCHEMA, spec, tableLocation); + // produce the first snapshot + List firstBatchRecords = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), 
new SimpleRecord(3, "c")); + Dataset firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class); + firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); + + table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(firstBatchRecords); + + Dataset tagSnapshotResult = + spark.read().format("iceberg").option("tag", "tag").load(tableLocation); + List tagSnapshotRecords = + tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); + Assert.assertEquals("Current snapshot rows should match", expectedRecords, tagSnapshotRecords); + + // Deleting a column to indicate schema change + table.updateSchema().deleteColumn("data").commit(); + + // The data should have the deleted column as it was captured in an earlier snapshot. Dataset deletedColumnTagSnapshotResult = - spark.read().format("iceberg").option("branch", "branch").load(tableLocation); + spark.read().format("iceberg").option("tag", "tag").load(tableLocation); List deletedColumnTagSnapshotRecords = deletedColumnTagSnapshotResult .orderBy("id") diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index 9f074c6f9b61..9c3d49d392cf 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -441,7 +441,8 @@ public void testSpecifySnapshotAndTimestamp() { "Should not be able to specify both snapshot id and timestamp", IllegalArgumentException.class, String.format( - "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, timestamp), + "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s)", + snapshotId, timestamp), () -> { spark .read() From a5fb911c36cafc89a70a5d1da94ef88d1dc62a09 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 22 Feb 2023 11:16:25 -0800 Subject: [PATCH 10/11] address comments --- .../apache/iceberg/spark/SparkCachedTableCatalog.java | 5 +++-- .../java/org/apache/iceberg/spark/SparkCatalog.java | 5 ++++- .../org/apache/iceberg/spark/source/IcebergSource.java | 10 +++++----- .../iceberg/spark/source/TestSnapshotSelection.java | 8 ++++---- .../java/org/apache/iceberg/spark/sql/TestSelect.java | 2 +- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java index 22bf066b4785..e3e15b5cc845 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java @@ -165,10 +165,11 @@ private Pair load(Identifier ident) throws NoSuchTableException { Preconditions.checkArgument( Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1, - "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)", + "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)", snapshotId, asOfTimestamp, - branch); + branch, + tag); Table table = TABLE_CACHE.get(key); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 
685f14ba81b5..639fb5e84d70 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -674,6 +674,7 @@ private Table load(Identifier ident) { return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled); } } + // the name wasn't a valid snapshot selector and did not point to the changelog // throw the original exception throw e; @@ -737,7 +738,7 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { Preconditions.checkArgument( Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1, - "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)", + "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)", snapshotId, asOfTimestamp, branch, @@ -762,11 +763,13 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { Preconditions.checkArgument( branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch); return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled); + } else if (tag != null) { Snapshot tagSnapshot = table.snapshot(tag); Preconditions.checkArgument( tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag); return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled); + } else { return new SparkTable(table, snapshotId, !cacheEnabled); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index 902e0e47e3f1..8975c7f32db1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -69,8 +69,8 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions "spark.sql.catalog." 
+ DEFAULT_CACHE_CATALOG_NAME; private static final String AT_TIMESTAMP = "at_timestamp_"; private static final String SNAPSHOT_ID = "snapshot_id_"; - private static final String BRANCH = "branch_"; - private static final String TAG = "tag_"; + private static final String BRANCH_PREFIX = "branch_"; + private static final String TAG_PREFIX = "tag_"; private static final String[] EMPTY_NAMESPACE = new String[0]; private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); @@ -132,7 +132,7 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri String tag = options.get(SparkReadOptions.TAG); Preconditions.checkArgument( Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1, - "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)", + "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)", snapshotId, asOfTimestamp, branch, @@ -149,11 +149,11 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri } if (branch != null) { - selector = BRANCH + branch; + selector = BRANCH_PREFIX + branch; } if (tag != null) { - selector = TAG + tag; + selector = TAG_PREFIX + tag; } CatalogManager catalogManager = spark.sessionState().catalogManager(); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 77f580ab831c..276fbcd592ae 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -223,7 +223,7 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException { .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) .load(tableLocation)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Can specify at most one of snapshot-id") + .hasMessageContaining("Can specify only one of snapshot-id") .hasMessageContaining("as-of-timestamp") .hasMessageContaining("branch") .hasMessageContaining("tag"); @@ -327,7 +327,7 @@ public void testSnapshotSelectionByBranchAndTagFails() throws IOException { .load(tableLocation) .show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("Can specify at most one of snapshot-id"); + .hasMessageStartingWith("Can specify only one of snapshot-id"); } @Test @@ -358,7 +358,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep .load(tableLocation) .show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("Can specify at most one of snapshot-id"); + .hasMessageStartingWith("Can specify only one of snapshot-id"); Assertions.assertThatThrownBy( () -> @@ -370,7 +370,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep .load(tableLocation) .show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("Can specify at most one of snapshot-id"); + .hasMessageStartingWith("Can specify only one of snapshot-id"); } @Test diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index 9c3d49d392cf..e08bc4574dbf 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -441,7 +441,7 @@ public void testSpecifySnapshotAndTimestamp() { "Should not be able to specify both snapshot id and timestamp", IllegalArgumentException.class, String.format( - "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s)", + "Can specify only one of snapshot-id (%s), as-of-timestamp (%s)", snapshotId, timestamp), () -> { spark From 94528cec244d3b0bdcbd72c450e860042d8ac29c Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 22 Feb 2023 14:08:18 -0800 Subject: [PATCH 11/11] address comments --- .../java/org/apache/iceberg/spark/SparkCachedTableCatalog.java | 2 ++ .../src/main/java/org/apache/iceberg/spark/SparkCatalog.java | 2 ++ .../main/java/org/apache/iceberg/spark/source/SparkTable.java | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java index e3e15b5cc845..2533b3bd75b5 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java @@ -150,11 +150,13 @@ private Pair load(Identifier ident) throws NoSuchTableException { Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta); if (snapshotBasedMatcher.matches()) { snapshotId = Long.parseLong(snapshotBasedMatcher.group(1)); + continue; } Matcher branchBasedMatcher = BRANCH.matcher(meta); if (branchBasedMatcher.matches()) { branch = branchBasedMatcher.group(1); + continue; } Matcher tagBasedMatcher = TAG.matcher(meta); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 639fb5e84d70..258f8420f1e4 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -723,11 +723,13 @@ private Table loadFromPathIdentifier(PathIdentifier ident) { Matcher id = SNAPSHOT_ID.matcher(meta); if (id.matches()) { snapshotId = Long.parseLong(id.group(1)); + continue; } Matcher branchRef = BRANCH.matcher(meta); if (branchRef.matches()) { branch = branchRef.group(1); + continue; } Matcher tagRef = TAG.matcher(meta); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 9f07b14cc987..da9160ffba7e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -368,9 +368,9 @@ private static CaseInsensitiveStringMap addSnapshotId( scanOptions.putAll(options.asCaseSensitiveMap()); scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value); scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP); - scanOptions.remove(SparkReadOptions.BRANCH); scanOptions.remove(SparkReadOptions.TAG); + return new CaseInsensitiveStringMap(scanOptions); }
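
Taken together, the series lets Spark 3.3 time-travel reads target a branch or tag in addition to a snapshot-id or as-of-timestamp, with the four selectors mutually exclusive. A minimal usage sketch against the final state of the series, mirroring the new tests in TestSnapshotSelection — the Spark session setup, Hadoop configuration, table location, and ref names ("audit", "v1") are all hypothetical:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BranchTagReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local").getOrCreate();
    String tableLocation = "/tmp/iceberg-table"; // hypothetical path to an existing Iceberg table

    // Point a branch and a tag at the current snapshot
    // (the same ManageSnapshots API the new tests use).
    Table table = new HadoopTables(new Configuration()).load(tableLocation);
    long snapshotId = table.currentSnapshot().snapshotId();
    table.manageSnapshots().createBranch("audit", snapshotId).commit();
    table.manageSnapshots().createTag("v1", snapshotId).commit();

    // Read by branch or tag: IcebergSource turns these options into the
    // branch_/tag_ selectors that the catalogs resolve to a snapshot id.
    Dataset<Row> branchDf =
        spark.read().format("iceberg").option(SparkReadOptions.BRANCH, "audit").load(tableLocation);
    Dataset<Row> tagDf =
        spark.read().format("iceberg").option(SparkReadOptions.TAG, "v1").load(tableLocation);
    branchDf.show();
    tagDf.show();

    // Combining any two selectors now fails up front with
    // "Can specify only one of snapshot-id (...), as-of-timestamp (...), branch (...), tag (...)".
    try {
      spark.read()
          .format("iceberg")
          .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
          .option(SparkReadOptions.TAG, "v1")
          .load(tableLocation);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Per the schema-change tests, a branch or tag read keeps the schema of the snapshot it resolves to, so a column dropped from the table after the ref was created still appears in the result.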
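
The same resolution is also reachable without read options. SparkCatalog matches the new `branch_(.*)` and `tag_(.*)` patterns against the identifier name, so a ref can be addressed as a trailing identifier part in SQL, the same way the existing `snapshot_id_<id>` and `at_timestamp_<ms>` selectors work. A sketch, assuming a configured Iceberg catalog named `local` and a table `db.tbl` (both hypothetical):

```java
// The trailing "branch_audit" / "tag_v1" part fails the initial table load;
// the catch block in SparkCatalog.load(Identifier) then matches it against
// the BRANCH/TAG patterns and resolves it as a ref selector on db.tbl.
Dataset<Row> branchSql = spark.sql("SELECT * FROM local.db.tbl.branch_audit");
Dataset<Row> tagSql = spark.sql("SELECT * FROM local.db.tbl.tag_v1");
```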
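
For path-based tables, loadFromPathIdentifier parses the same selectors from the fragment after `#`, so the ref can ride along in the load path itself. Whether end users are expected to hand-write the fragment, rather than letting IcebergSource build it from the options above, is an assumption here; the parsing added in this series accepts it either way:

```java
// Hypothetical hand-written selector fragment; equivalent to passing
// SparkReadOptions.BRANCH = "audit" on a path-based read.
Dataset<Row> df = spark.read().format("iceberg").load(tableLocation + "#branch_audit");
```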