Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spark 3.3 read by snapshot ref schema #6717

Merged
merged 11 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
Expand All @@ -46,6 +49,8 @@ public class SparkCachedTableCatalog implements TableCatalog {
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
private static final Pattern TAG = Pattern.compile("tag_(.*)");

namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();

Expand Down Expand Up @@ -133,6 +138,8 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {

Long asOfTimestamp = null;
Long snapshotId = null;
String branch = null;
String tag = null;
for (String meta : metadata) {
Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
if (timeBasedMatcher.matches()) {
Expand All @@ -143,13 +150,28 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
if (snapshotBasedMatcher.matches()) {
snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
aokolnychyi marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

Matcher branchBasedMatcher = BRANCH.matcher(meta);
if (branchBasedMatcher.matches()) {
branch = branchBasedMatcher.group(1);
aokolnychyi marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

Matcher tagBasedMatcher = TAG.matcher(meta);
if (tagBasedMatcher.matches()) {
tag = tagBasedMatcher.group(1);
}
}

Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot and timestamp for time travel: %s",
ident);
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)",
snapshotId,
asOfTimestamp,
branch,
tag);

Table table = TABLE_CACHE.get(key);

Expand All @@ -161,6 +183,16 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
return Pair.of(table, snapshotId);
} else if (asOfTimestamp != null) {
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
} else if (branch != null) {
Snapshot branchSnapshot = table.snapshot(branch);
Preconditions.checkArgument(
branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch);
return Pair.of(table, branchSnapshot.snapshotId());
} else if (tag != null) {
Snapshot tagSnapshot = table.snapshot(tag);
Preconditions.checkArgument(
tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag);
return Pair.of(table, tagSnapshot.snapshotId());
} else {
return Pair.of(table, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
Expand All @@ -37,6 +39,7 @@
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
Expand Down Expand Up @@ -105,6 +108,8 @@ public class SparkCatalog extends BaseCatalog {
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
private static final Pattern TAG = Pattern.compile("tag_(.*)");

private String catalogName = null;
private Catalog icebergCatalog = null;
Expand Down Expand Up @@ -654,6 +659,22 @@ private Table load(Identifier ident) {
return new SparkTable(table, snapshotId, !cacheEnabled);
}

Matcher branch = BRANCH.matcher(ident.name());
if (branch.matches()) {
Snapshot branchSnapshot = table.snapshot(branch.group(1));
if (branchSnapshot != null) {
return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled);
}
}

Matcher tag = TAG.matcher(ident.name());
if (tag.matches()) {
Snapshot tagSnapshot = table.snapshot(tag.group(1));
if (tagSnapshot != null) {
return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled);
}
}
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved

// the name wasn't a valid snapshot selector and did not point to the changelog
// throw the original exception
throw e;
Expand All @@ -678,6 +699,8 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
String branch = null;
String tag = null;
boolean isChangelog = false;

for (String meta : parsed.second()) {
Expand All @@ -700,13 +723,28 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
Matcher id = SNAPSHOT_ID.matcher(meta);
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
continue;
}

Matcher branchRef = BRANCH.matcher(meta);
aokolnychyi marked this conversation as resolved.
Show resolved Hide resolved
if (branchRef.matches()) {
branch = branchRef.group(1);
continue;
}

Matcher tagRef = TAG.matcher(meta);
if (tagRef.matches()) {
tag = tagRef.group(1);
}
}

Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id and as-of-timestamp: %s",
ident.location());
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)",
snapshotId,
asOfTimestamp,
branch,
tag);

Preconditions.checkArgument(
!isChangelog || (snapshotId == null && asOfTimestamp == null),
Expand All @@ -722,6 +760,18 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled);

} else if (branch != null) {
Snapshot branchSnapshot = table.snapshot(branch);
Preconditions.checkArgument(
branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch);
return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled);

} else if (tag != null) {
jackye1995 marked this conversation as resolved.
Show resolved Hide resolved
Snapshot tagSnapshot = table.snapshot(tag);
Preconditions.checkArgument(
tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag);
return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled);

} else {
return new SparkTable(table, snapshotId, !cacheEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
Expand Down Expand Up @@ -67,6 +69,8 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions
"spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";
private static final String BRANCH_PREFIX = "branch_";
private static final String TAG_PREFIX = "tag_";
private static final String[] EMPTY_NAMESPACE = new String[0];

private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
Expand Down Expand Up @@ -124,11 +128,15 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri

Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
String branch = options.get(SparkReadOptions.BRANCH);
String tag = options.get(SparkReadOptions.TAG);
Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)",
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)",
snapshotId,
asOfTimestamp);
asOfTimestamp,
branch,
tag);

String selector = null;

Expand All @@ -140,6 +148,14 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
selector = AT_TIMESTAMP + asOfTimestamp;
}

if (branch != null) {
selector = BRANCH_PREFIX + branch;
}

if (tag != null) {
selector = TAG_PREFIX + tag;
}

CatalogManager catalogManager = spark.sessionState().catalogManager();

if (TABLE_CACHE.contains(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ private static CaseInsensitiveStringMap addSnapshotId(
scanOptions.putAll(options.asCaseSensitiveMap());
scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value);
scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);
scanOptions.remove(SparkReadOptions.BRANCH);
scanOptions.remove(SparkReadOptions.TAG);

namrathamyske marked this conversation as resolved.
Show resolved Hide resolved
return new CaseInsensitiveStringMap(scanOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
.option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
.load(tableLocation))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
.hasMessageContaining("Can specify only one of snapshot-id")
.hasMessageContaining("as-of-timestamp")
.hasMessageContaining("branch")
.hasMessageContaining("tag");
}

@Test
Expand Down Expand Up @@ -325,7 +327,7 @@ public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify only one of snapshot-id");
}

@Test
Expand Down Expand Up @@ -356,7 +358,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify only one of snapshot-id");

Assertions.assertThatThrownBy(
() ->
Expand All @@ -368,6 +370,88 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify only one of snapshot-id");
}

@Test
public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();

Dataset<Row> branchSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> branchSnapshotRecords =
branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, branchSnapshotRecords);

// Deleting a column to indicate schema change
table.updateSchema().deleteColumn("data").commit();

// The data should have the deleted column as it was captured in an earlier snapshot.
Dataset<Row> deletedColumnBranchSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> deletedColumnBranchSnapshotRecords =
deletedColumnBranchSnapshotResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords);
}

@Test
public void testSnapshotSelectionByTagWithSchemaChange() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);

Dataset<Row> tagSnapshotResult =
spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
List<SimpleRecord> tagSnapshotRecords =
tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Current snapshot rows should match", expectedRecords, tagSnapshotRecords);

// Deleting a column to indicate schema change
table.updateSchema().deleteColumn("data").commit();

// The data should have the deleted column as it was captured in an earlier snapshot.
Dataset<Row> deletedColumnTagSnapshotResult =
spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
List<SimpleRecord> deletedColumnTagSnapshotRecords =
deletedColumnTagSnapshotResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, deletedColumnTagSnapshotRecords);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ public void testSpecifySnapshotAndTimestamp() {
"Should not be able to specify both snapshot id and timestamp",
IllegalArgumentException.class,
String.format(
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, timestamp),
"Can specify only one of snapshot-id (%s), as-of-timestamp (%s)",
snapshotId, timestamp),
() -> {
spark
.read()
Expand Down