Spark 3.3: read by snapshot ref schema #6717

Merged (11 commits) on Feb 22, 2023
Changes from 6 commits
@@ -20,9 +20,11 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
@@ -46,7 +48,8 @@ public class SparkCachedTableCatalog implements TableCatalog {
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");

private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
private static final Pattern TAG = Pattern.compile("tag_(.*)");
private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();

private String name = null;
@@ -133,6 +136,8 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {

Long asOfTimestamp = null;
Long snapshotId = null;
String branch = null;
String tag = null;
for (String meta : metadata) {
Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
if (timeBasedMatcher.matches()) {
@@ -144,23 +149,49 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
if (snapshotBasedMatcher.matches()) {
snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
}

Matcher branchBasedMatcher = BRANCH.matcher(meta);
if (branchBasedMatcher.matches()) {
branch = branchBasedMatcher.group(1);
}

Matcher tagBasedMatcher = TAG.matcher(meta);
if (tagBasedMatcher.matches()) {
tag = tagBasedMatcher.group(1);
}
}

Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot and timestamp for time travel: %s",
ident);
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)",
snapshotId,
asOfTimestamp,
branch);

Table table = TABLE_CACHE.get(key);

if (table == null) {
throw new NoSuchTableException(ident);
}

long snapshotIdFromTimeTravel = -1L;

if (snapshotId != null) {
return Pair.of(table, snapshotId);
snapshotIdFromTimeTravel = snapshotId;
} else if (asOfTimestamp != null) {
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
snapshotIdFromTimeTravel = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
} else if (branch != null) {
Preconditions.checkArgument(
table.snapshot(branch) != null, "branch not associated with a snapshot");
snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId();
} else if (tag != null) {
Preconditions.checkArgument(
table.snapshot(tag) != null, "tag not associated with a snapshot");
snapshotIdFromTimeTravel = table.snapshot(tag).snapshotId();
}

if (snapshotIdFromTimeTravel != -1L) {
return Pair.of(table, snapshotIdFromTimeTravel);
} else {
return Pair.of(table, null);
}
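For context, and not part of the PR diff: a minimal sketch of how the selector suffixes parsed above (snapshot_id_, branch_, tag_) are expected to behave. The class name and the sample selector values are made up for illustration; the patterns mirror the ones added to SparkCachedTableCatalog.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SelectorParseSketch {
  // Same patterns as the catalog above: a numeric snapshot id, or a free-form ref name.
  private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
  private static final Pattern TAG = Pattern.compile("tag_(.*)");

  public static void main(String[] args) {
    for (String meta : new String[] {"snapshot_id_123", "branch_main", "tag_v1"}) {
      Matcher snapshot = SNAPSHOT_ID.matcher(meta);
      Matcher branch = BRANCH.matcher(meta);
      Matcher tag = TAG.matcher(meta);
      if (snapshot.matches()) {
        System.out.println(meta + " -> snapshot id " + Long.parseLong(snapshot.group(1)));
      } else if (branch.matches()) {
        System.out.println(meta + " -> branch " + branch.group(1));
      } else if (tag.matches()) {
        System.out.println(meta + " -> tag " + tag.group(1));
      }
    }
  }
}
```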
@@ -24,11 +24,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
@@ -37,6 +39,7 @@
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
@@ -105,6 +108,8 @@ public class SparkCatalog extends BaseCatalog {
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
private static final Pattern TAG = Pattern.compile("tag_(.*)");

private String catalogName = null;
private Catalog icebergCatalog = null;
@@ -641,16 +646,35 @@ private Table load(Identifier ident) {
return new SparkChangelogTable(table, !cacheEnabled);
}

long snapshotId = -1;
Matcher at = AT_TIMESTAMP.matcher(ident.name());
if (at.matches()) {
long asOfTimestamp = Long.parseLong(at.group(1));
long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
return new SparkTable(table, snapshotId, !cacheEnabled);
snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
}

Matcher id = SNAPSHOT_ID.matcher(ident.name());
if (id.matches()) {
long snapshotId = Long.parseLong(id.group(1));
snapshotId = Long.parseLong(id.group(1));
}

Matcher branch = BRANCH.matcher(ident.name());
if (branch.matches()) {
Snapshot snapshot = table.snapshot(branch.group(1));
if (snapshot != null) {
snapshotId = snapshot.snapshotId();
}
}

Matcher tag = TAG.matcher(ident.name());
if (tag.matches()) {
Snapshot snapshot = table.snapshot(tag.group(1));
if (snapshot != null) {
snapshotId = snapshot.snapshotId();
}
}

if (snapshotId != -1) {
return new SparkTable(table, snapshotId, !cacheEnabled);
}

@@ -678,6 +702,8 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
String branch = null;
String tag = null;
boolean isChangelog = false;

for (String meta : parsed.second()) {
@@ -701,12 +727,25 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
}

Matcher branchRef = BRANCH.matcher(meta);
if (branchRef.matches()) {
branch = branchRef.group(1);
}

Matcher tagRef = TAG.matcher(meta);
if (tagRef.matches()) {
tag = tagRef.group(1);
}
}

Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id and as-of-timestamp: %s",
ident.location());
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)",
snapshotId,
asOfTimestamp,
branch,
tag);

Preconditions.checkArgument(
!isChangelog || (snapshotId == null && asOfTimestamp == null),
@@ -715,16 +754,27 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
org.apache.iceberg.Table table =
tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));

long snapshotIdFromTimeTravel = -1L;

if (isChangelog) {
return new SparkChangelogTable(table, !cacheEnabled);

} else if (asOfTimestamp != null) {
long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled);
snapshotIdFromTimeTravel = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
} else if (branch != null) {
Preconditions.checkArgument(
table.snapshot(branch) != null, "branch not associated with a snapshot");
snapshotIdFromTimeTravel = table.snapshot(branch).snapshotId();
} else if (tag != null) {
Preconditions.checkArgument(
table.snapshot(tag) != null, "tag not associated with a snapshot");
snapshotIdFromTimeTravel = table.snapshot(tag).snapshotId();
}

} else {
return new SparkTable(table, snapshotId, !cacheEnabled);
if (snapshotIdFromTimeTravel != -1L) {
return new SparkTable(table, snapshotIdFromTimeTravel, !cacheEnabled);
}
return new SparkTable(table, snapshotId, !cacheEnabled);
}

private Identifier namespaceToIdentifier(String[] namespace) {
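Both load paths above resolve a ref name to a snapshot id through Table#snapshot(String). A minimal sketch of that lookup, with a hypothetical helper name, for readers unfamiliar with the API:

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class RefResolutionSketch {
  // Resolves a branch or tag name to the snapshot id it points at, failing fast when the ref is unknown.
  static long snapshotIdForRef(Table table, String refName) {
    Snapshot snapshot = table.snapshot(refName);
    Preconditions.checkArgument(snapshot != null, "Cannot find snapshot for ref %s", refName);
    return snapshot.snapshotId();
  }
}
```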
@@ -20,7 +20,9 @@

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
@@ -67,6 +69,8 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions
"spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";
private static final String BRANCH = "branch_";
private static final String TAG = "tag_";
private static final String[] EMPTY_NAMESPACE = new String[0];

private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
@@ -124,11 +128,15 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri

Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
String branch = options.get(SparkReadOptions.BRANCH);
String tag = options.get(SparkReadOptions.TAG);
Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)",
Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
"Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), branch (%s) and tag (%s)",
snapshotId,
asOfTimestamp);
asOfTimestamp,
branch,
tag);

String selector = null;

@@ -140,6 +148,14 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
selector = AT_TIMESTAMP + asOfTimestamp;
}

if (branch != null) {
selector = BRANCH + branch;
}

if (tag != null) {
selector = TAG + tag;
}

CatalogManager catalogManager = spark.sessionState().catalogManager();

if (TABLE_CACHE.contains(path)) {
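Usage sketch, mirroring the tests added later in this diff: the new branch and tag read options select which ref to read. The SparkSession, table location, and the ref names main and v1 are placeholders.

```java
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class BranchTagReadSketch {
  // Reads the same table twice: once from a branch, once from a tag.
  static void readRefs(SparkSession spark, String tableLocation) {
    Dataset<Row> branchDf =
        spark.read().format("iceberg").option(SparkReadOptions.BRANCH, "main").load(tableLocation);
    Dataset<Row> tagDf =
        spark.read().format("iceberg").option(SparkReadOptions.TAG, "v1").load(tableLocation);
    branchDf.show();
    tagDf.show();
  }
}
```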
@@ -369,6 +369,8 @@ private static CaseInsensitiveStringMap addSnapshotId(
scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value);
scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);

scanOptions.remove(SparkReadOptions.BRANCH);
scanOptions.remove(SparkReadOptions.TAG);
return new CaseInsensitiveStringMap(scanOptions);
}

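A minimal sketch of the option-clearing behavior above: once a concrete snapshot id has been resolved, the other time-travel options (as-of-timestamp, branch, tag) are removed so they cannot conflict downstream. The helper class and method names are assumptions for illustration.

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class PinSnapshotSketch {
  // Pins a snapshot id in the scan options and drops the options it supersedes.
  static CaseInsensitiveStringMap pinSnapshot(Map<String, String> options, long snapshotId) {
    Map<String, String> scanOptions = Maps.newHashMap(options);
    scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));
    scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);
    scanOptions.remove(SparkReadOptions.BRANCH);
    scanOptions.remove(SparkReadOptions.TAG);
    return new CaseInsensitiveStringMap(scanOptions);
  }
}
```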
@@ -223,8 +223,10 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
.option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
.load(tableLocation))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
.hasMessageContaining("Can specify at most one of snapshot-id")
.hasMessageContaining("as-of-timestamp")
.hasMessageContaining("branch")
.hasMessageContaining("tag");
}

@Test
@@ -325,7 +327,7 @@ public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify at most one of snapshot-id");
}

@Test
@@ -356,7 +358,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify at most one of snapshot-id");

Assertions.assertThatThrownBy(
() ->
@@ -368,6 +370,64 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
.hasMessageStartingWith("Can specify at most one of snapshot-id");
}

@Test
public void testSnapshotSelectionByBranchOrTagWithSchemaChange() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

Dataset<Row> branchSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> branchSnapshotRecords =
branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, branchSnapshotRecords);

Dataset<Row> tagSnapshotResult =
spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
List<SimpleRecord> tagSnapshotRecords =
tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Current snapshot rows should match", expectedRecords, tagSnapshotRecords);

// Delete a column to simulate a schema change
table.updateSchema().deleteColumn("data").commit();

// Reads through the branch and tag should still return the deleted column, since they use the schema of the snapshot the ref points to.
Dataset<Row> deletedColumnBranchSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> deletedColumnBranchSnapshotRecords =
deletedColumnBranchSnapshotResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords);

Dataset<Row> deletedColumnTagSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> deletedColumnTagSnapshotRecords =
deletedColumnTagSnapshotResult
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, deletedColumnTagSnapshotRecords);
}
}