Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spark 3.3 read by snapshot ref schema #6717

Merged
merged 11 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
Expand All @@ -46,6 +49,8 @@ public class SparkCachedTableCatalog implements TableCatalog {
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
private static final Pattern TAG = Pattern.compile("tag_(.*)");

namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();

Expand Down Expand Up @@ -133,6 +138,8 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {

Long asOfTimestamp = null;
Long snapshotId = null;
String branch = null;
String tag = null;
for (String meta : metadata) {
Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
if (timeBasedMatcher.matches()) {
Expand All @@ -144,12 +151,24 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
if (snapshotBasedMatcher.matches()) {
snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
aokolnychyi marked this conversation as resolved.
Show resolved Hide resolved
}

Matcher branchBasedMatcher = BRANCH.matcher(meta);
if (branchBasedMatcher.matches()) {
branch = branchBasedMatcher.group(1);
aokolnychyi marked this conversation as resolved.
Show resolved Hide resolved
}

Matcher tagBasedMatcher = TAG.matcher(meta);
if (tagBasedMatcher.matches()) {
tag = tagBasedMatcher.group(1);
}
}

Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot and timestamp for time travel: %s",
ident);
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)",
snapshotId,
asOfTimestamp,
branch);
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved

Table table = TABLE_CACHE.get(key);

Expand All @@ -161,9 +180,19 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
return Pair.of(table, snapshotId);
} else if (asOfTimestamp != null) {
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
} else {
return Pair.of(table, null);
} else if (branch != null) {
Snapshot branchSnapshot = table.snapshot(branch);
Preconditions.checkArgument(
branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch);
return Pair.of(table, branchSnapshot.snapshotId());
} else if (tag != null) {
Snapshot tagSnapshot = table.snapshot(tag);
Preconditions.checkArgument(
tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag);
return Pair.of(table, tagSnapshot.snapshotId());
}

return Pair.of(table, null);
namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
}

private Pair<String, List<String>> parseIdent(Identifier ident) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
Expand All @@ -37,6 +39,7 @@
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
Expand Down Expand Up @@ -105,6 +108,8 @@ public class SparkCatalog extends BaseCatalog {
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
private static final Pattern TAG = Pattern.compile("tag_(.*)");

private String catalogName = null;
private Catalog icebergCatalog = null;
Expand Down Expand Up @@ -654,6 +659,21 @@ private Table load(Identifier ident) {
return new SparkTable(table, snapshotId, !cacheEnabled);
}

Matcher branch = BRANCH.matcher(ident.name());
if (branch.matches()) {
Snapshot branchSnapshot = table.snapshot(branch.group(1));
if (branchSnapshot != null) {
return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled);
}
}

Matcher tag = TAG.matcher(ident.name());
if (tag.matches()) {
Snapshot tagSnapshot = table.snapshot(tag.group(1));
if (tagSnapshot != null) {
return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled);
}
}
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved
// the name wasn't a valid snapshot selector and did not point to the changelog
// throw the original exception
throw e;
Expand All @@ -678,6 +698,8 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
String branch = null;
String tag = null;
boolean isChangelog = false;

for (String meta : parsed.second()) {
Expand All @@ -701,12 +723,25 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
}

Matcher branchRef = BRANCH.matcher(meta);
aokolnychyi marked this conversation as resolved.
Show resolved Hide resolved
if (branchRef.matches()) {
branch = branchRef.group(1);
}

Matcher tagRef = TAG.matcher(meta);
if (tagRef.matches()) {
tag = tagRef.group(1);
}
}

Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id and as-of-timestamp: %s",
ident.location());
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)",
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved
snapshotId,
asOfTimestamp,
branch,
tag);

Preconditions.checkArgument(
!isChangelog || (snapshotId == null && asOfTimestamp == null),
Expand All @@ -722,9 +757,19 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled);

} else {
return new SparkTable(table, snapshotId, !cacheEnabled);
} else if (branch != null) {
Snapshot branchSnapshot = table.snapshot(branch);
Preconditions.checkArgument(
branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch);
return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled);
} else if (tag != null) {
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved
Snapshot tagSnapshot = table.snapshot(tag);
Preconditions.checkArgument(
tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag);
return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled);
}

return new SparkTable(table, snapshotId, !cacheEnabled);
namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
}

private Identifier namespaceToIdentifier(String[] namespace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
Expand Down Expand Up @@ -67,6 +69,8 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions
"spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";
private static final String BRANCH = "branch_";
private static final String TAG = "tag_";
private static final String[] EMPTY_NAMESPACE = new String[0];

private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
Expand Down Expand Up @@ -124,11 +128,15 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri

Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
String branch = options.get(SparkReadOptions.BRANCH);
String tag = options.get(SparkReadOptions.TAG);
Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)",
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)",
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved
snapshotId,
asOfTimestamp);
asOfTimestamp,
branch,
tag);

String selector = null;

Expand All @@ -140,6 +148,14 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
selector = AT_TIMESTAMP + asOfTimestamp;
}

if (branch != null) {
selector = BRANCH + branch;
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved
}

if (tag != null) {
selector = TAG + tag;
}

CatalogManager catalogManager = spark.sessionState().catalogManager();

if (TABLE_CACHE.contains(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ private static CaseInsensitiveStringMap addSnapshotId(
scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value);
scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);

namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved
scanOptions.remove(SparkReadOptions.BRANCH);
scanOptions.remove(SparkReadOptions.TAG);
return new CaseInsensitiveStringMap(scanOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
.option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
.load(tableLocation))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
.hasMessageContaining("Can specify at most one of snapshot-id")
.hasMessageContaining("as-of-timestamp")
.hasMessageContaining("branch")
.hasMessageContaining("tag");
}

@Test
Expand Down Expand Up @@ -325,7 +327,7 @@ public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify at most one of snapshot-id");
}

@Test
Expand Down Expand Up @@ -356,7 +358,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify at most one of snapshot-id");

Assertions.assertThatThrownBy(
() ->
Expand All @@ -368,6 +370,64 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify at most one of snapshot-id");
}

@Test
public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOException {
namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

Dataset<Row> branchSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> branchSnapshotRecords =
branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, branchSnapshotRecords);

Dataset<Row> tagSnapshotResult =
spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
List<SimpleRecord> tagSnapshotRecords =
tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Current snapshot rows should match", expectedRecords, tagSnapshotRecords);

// Deleting a column to indicate schema change
table.updateSchema().deleteColumn("data").commit();

// The data should have the deleted column as it was captured in an earlier snapshot.
Dataset<Row> deletedColumnBranchSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> deletedColumnBranchSnapshotRecords =
deletedColumnBranchSnapshotResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords);

Dataset<Row> deletedColumnTagSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> deletedColumnTagSnapshotRecords =
deletedColumnTagSnapshotResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, deletedColumnTagSnapshotRecords);
}
}