Spark: Use snapshot schema when reading snapshot
This has been implemented for Spark 2. For Spark 3, Ryan Blue proposed
a syntax in #3269 for adding the snapshot id or timestamp to the table
identifier. Here we implement Spark 3 support for using the snapshot
schema via that table identifier syntax, as a stopgap until a Spark 3
release supports AS OF in Spark SQL.
Note: the table identifier syntax is for internal use only (as in this
implementation) and is not meant to be exposed as publicly supported
SQL syntax. However, for testing, we do exercise it from SQL.
wypoon committed Dec 13, 2021
1 parent 4c48df2 commit 31399e6
Showing 7 changed files with 512 additions and 35 deletions.
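
For context, a minimal usage sketch of the read options that end up routed through the new internal snapshot-selector syntax (not part of this commit; the table name, snapshot id, and timestamp below are hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

static void timeTravelReads(SparkSession spark) {
  // Read the table as of a specific snapshot; the scan uses that snapshot's schema.
  Dataset<Row> bySnapshot = spark.read()
      .format("iceberg")
      .option("snapshot-id", 10963874102873L)
      .load("db.tbl");

  // Or read the snapshot that was current at a point in time (millis since the epoch).
  Dataset<Row> byTimestamp = spark.read()
      .format("iceberg")
      .option("as-of-timestamp", 1639420800000L)
      .load("db.tbl");
}

Specifying both options on one read is rejected with an IllegalArgumentException.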
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -304,4 +304,30 @@ public static Schema schemaFor(Table table, long snapshotId) {
// TODO: recover the schema by reading previous metadata files
return table.schema();
}

/**
* Convenience method for returning the schema of the table for a snapshot,
* when we have a snapshot id or a timestamp. Only one of them should be specified
* (non-null), or an IllegalArgumentException is thrown.
*
* @param table a {@link Table}
* @param snapshotId the ID of the snapshot
* @param timestampMillis the timestamp in millis since the Unix epoch
* @return the schema
* @throws IllegalArgumentException if both snapshotId and timestampMillis are non-null
*/
public static Schema schemaFor(Table table, Long snapshotId, Long timestampMillis) {
Preconditions.checkArgument(snapshotId == null || timestampMillis == null,
"Cannot use both snapshot id and timestamp to find a schema");

if (snapshotId != null) {
return schemaFor(table, snapshotId);
}

if (timestampMillis != null) {
return schemaFor(table, snapshotIdAsOfTime(table, timestampMillis));
}

return table.schema();
}
}
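
A hedged sketch of the new overload's behavior (the Table is any loaded Iceberg table; the snapshot id and timestamp are hypothetical, and the results follow directly from the method above):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

static void schemaForExamples(Table table) {
  Schema current = SnapshotUtil.schemaFor(table, null, null);               // no selector: current table schema
  Schema atSnapshot = SnapshotUtil.schemaFor(table, 10963874102873L, null);  // schema of that snapshot
  Schema atTime = SnapshotUtil.schemaFor(table, null, 1639420800000L);       // schema of the snapshot current at that time
  // SnapshotUtil.schemaFor(table, 10963874102873L, 1639420800000L) would throw IllegalArgumentException.
}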
org/apache/iceberg/spark/SparkCatalog.java
@@ -19,14 +19,18 @@

package org.apache.iceberg.spark;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
@@ -39,13 +43,16 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -80,6 +87,9 @@
*/
public class SparkCatalog extends BaseCatalog {
private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");

private String catalogName = null;
private Catalog icebergCatalog = null;
@@ -118,8 +128,8 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
try {
Table icebergTable = load(ident);
return new SparkTable(icebergTable, !cacheEnabled);
Pair<Table, Long> icebergTable = load(ident);
return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -220,7 +230,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws No
}

try {
Table table = load(ident);
Table table = load(ident).first();
commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
return new SparkTable(table, true /* refreshEagerly */);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
@@ -255,7 +265,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
@Override
public void invalidateTable(Identifier ident) {
try {
load(ident).refresh();
load(ident).first().refresh();
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// ignore if the table doesn't exist, it is not cached
}
@@ -455,10 +465,97 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
}
}

private Table load(Identifier ident) {
return isPathIdentifier(ident) ?
tables.load(((PathIdentifier) ident).location()) :
icebergCatalog.loadTable(buildIdentifier(ident));
private Pair<Table, Long> load(Identifier ident) {
if (isPathIdentifier(ident)) {
return loadFromPathIdentifier((PathIdentifier) ident);
}

try {
return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);

} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// if the original load didn't work, the identifier may be extended and include a snapshot selector
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
Table table;
try {
table = icebergCatalog.loadTable(namespaceAsIdent);
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// the namespace does not identify a table, so it cannot be a table with a snapshot selector
// throw the original exception
throw e;
}

// loading the namespace as a table worked, check the name to see if it is a valid selector
Matcher at = AT_TIMESTAMP.matcher(ident.name());
if (at.matches()) {
long asOfTimestamp = Long.parseLong(at.group(1));
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
}

Matcher id = SNAPSHOT_ID.matcher(ident.name());
if (id.matches()) {
long snapshotId = Long.parseLong(id.group(1));
return Pair.of(table, snapshotId);
}

// the name wasn't a valid snapshot selector. throw the original exception
throw e;
}
}
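
The selector is carried as an extra trailing identifier part. As the commit message notes, this syntax is for internal use only, but the SQL tests exercise it directly; a hedged sketch (the catalog name "testcat", namespace "db.tbl", and ids are hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

static Dataset<Row> readAtSnapshot(SparkSession spark) {
  // db.tbl.snapshot_id_... fails the plain load above, so the namespace ["db", "tbl"]
  // is retried as the table and the trailing name is matched against SNAPSHOT_ID.
  return spark.sql("SELECT * FROM testcat.db.tbl.snapshot_id_10963874102873");
}

static Dataset<Row> readAtTimestamp(SparkSession spark) {
  // Same flow, but the trailing name matches AT_TIMESTAMP and is resolved to a
  // snapshot id via SnapshotUtil.snapshotIdAsOfTime.
  return spark.sql("SELECT * FROM testcat.db.tbl.at_timestamp_1639420800000");
}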

private Pair<String, List<String>> parseLocationString(String location) {
int hashIndex = location.lastIndexOf('#');
if (hashIndex != -1 && !location.endsWith("#")) {
String baseLocation = location.substring(0, hashIndex);
List<String> metadata = COMMA.splitToList(location.substring(hashIndex + 1));
return Pair.of(baseLocation, metadata);
} else {
return Pair.of(location, ImmutableList.of());
}
}

private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
Pair<String, List<String>> parsed = parseLocationString(ident.location());

String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
for (String meta : parsed.second()) {
if (MetadataTableType.from(meta) != null) {
metadataTableName = meta;
continue;
}

Matcher at = AT_TIMESTAMP.matcher(meta);
if (at.matches()) {
asOfTimestamp = Long.parseLong(at.group(1));
continue;
}

Matcher id = SNAPSHOT_ID.matcher(meta);
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
}
}

Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id and as-of-timestamp: %s", ident.location());

Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));

if (snapshotId != null) {
return Pair.of(table, snapshotId);
} else if (asOfTimestamp != null) {
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
} else {
return Pair.of(table, null);
}
}
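
For path-based (Hadoop) tables the selector travels after a '#' in the location string, optionally alongside a metadata table name. A hedged sketch of the round trip (the path and snapshot id are hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

static Dataset<Row> readPathTableAtSnapshot(SparkSession spark) {
  // IcebergSource (below) rewrites this path to
  // "hdfs://nn/warehouse/db/tbl#snapshot_id_10963874102873", which parseLocationString
  // splits into the base location and the selector; a suffix like "#history" would
  // select a metadata table instead.
  return spark.read()
      .format("iceberg")
      .option("snapshot-id", 10963874102873L)
      .load("hdfs://nn/warehouse/db/tbl");
}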

private Identifier namespaceToIdentifier(String[] namespace) {
String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
String name = namespace[ns.length];
return Identifier.of(ns, name);
}

private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) {
org/apache/iceberg/spark/source/IcebergSource.java
@@ -19,11 +19,13 @@

package org.apache.iceberg.spark.source;

import java.util.Arrays;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -56,6 +58,8 @@
public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";

@Override
public String shortName() {
@@ -101,12 +105,26 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
SparkSession spark = SparkSession.active();
setupDefaultSparkCatalog(spark);
String path = options.get("path");

Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
String selector = "";
if (snapshotId != null) {
selector = SNAPSHOT_ID + snapshotId;
}
if (asOfTimestamp != null) {
selector = AT_TIMESTAMP + asOfTimestamp;
}

CatalogManager catalogManager = spark.sessionState().catalogManager();

if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
String newPath = selector.equals("") ? path : path + "#" + selector;
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
new PathIdentifier(path));
new PathIdentifier(newPath));
}

final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
@@ -115,10 +133,28 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
!(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
// catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
Identifier ident = catalogAndIdentifier.identifier();
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
catalogAndIdentifier.identifier());
} else {
newIdentifier(ident, selector));
} else if (snapshotId == null && asOfTimestamp == null) {
return catalogAndIdentifier;
} else {
CatalogPlugin catalog = catalogAndIdentifier.catalog();
Identifier ident = catalogAndIdentifier.identifier();
return new Spark3Util.CatalogAndIdentifier(catalog,
newIdentifier(ident, selector));
}
}

private Identifier newIdentifier(Identifier ident, String newName) {
if (newName.equals("")) {
return ident;
} else {
String[] namespace = ident.namespace();
String name = ident.name();
String[] ns = Arrays.copyOf(namespace, namespace.length + 1);
ns[namespace.length] = name;
return Identifier.of(ns, newName);
}
}
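
The private newIdentifier above pushes the resolved table name into the namespace and uses the selector as the new trailing name, which SparkCatalog.load then unpacks again. A standalone equivalent for illustration (the helper name withSelector and the example names are hypothetical):

import java.util.Arrays;
import org.apache.spark.sql.connector.catalog.Identifier;

static Identifier withSelector(Identifier ident, String selector) {
  // Mirrors newIdentifier: db.tbl + "snapshot_id_10963874102873"
  //   -> Identifier.of(new String[] {"db", "tbl"}, "snapshot_id_10963874102873")
  String[] ns = Arrays.copyOf(ident.namespace(), ident.namespace().length + 1);
  ns[ident.namespace().length] = ident.name();
  return Identifier.of(ns, selector);
}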

@@ -132,6 +168,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
return catalogAndIdentifier(options).catalog().name();
}

private static Long getPropertyAsLong(CaseInsensitiveStringMap options, String property) {
String value = options.get(property);
if (value != null) {
return Long.parseLong(value);
}
return null;
}

private static void setupDefaultSparkCatalog(SparkSession spark) {
if (spark.conf().contains(DEFAULT_CATALOG)) {
return;
org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -36,6 +36,7 @@
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -53,6 +54,8 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
private final Table table;
private final SparkReadConf readConf;
private final List<String> metaColumns = Lists.newArrayList();
private final Long snapshotId;
private final Long asOfTimestamp;

private Schema schema = null;
private StructType requestedProjection;
@@ -65,16 +68,22 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
this.spark = spark;
this.table = table;
this.readConf = new SparkReadConf(spark, table, options);
this.snapshotId = readConf.snapshotId();
this.asOfTimestamp = readConf.asOfTimestamp();
this.caseSensitive = readConf.caseSensitive();
}

private Schema snapshotSchema() {
return SnapshotUtil.schemaFor(table, snapshotId, asOfTimestamp);
}

private Schema lazySchema() {
if (schema == null) {
if (requestedProjection != null) {
// the projection should include all columns that will be returned, including those only used in filters
this.schema = SparkSchemaUtil.prune(table.schema(), requestedProjection, filterExpression(), caseSensitive);
this.schema = SparkSchemaUtil.prune(snapshotSchema(), requestedProjection, filterExpression(), caseSensitive);
} else {
this.schema = table.schema();
this.schema = snapshotSchema();
}
}
return schema;
@@ -106,7 +115,7 @@ public Filter[] pushFilters(Filter[] filters) {
Expression expr = SparkFilters.convert(filter);
if (expr != null) {
try {
Binder.bind(table.schema().asStruct(), expr, caseSensitive);
Binder.bind(snapshotSchema().asStruct(), expr, caseSensitive);
expressions.add(expr);
pushed.add(filter);
} catch (ValidationException e) {
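
The net effect of the SparkScanBuilder change is that projection pruning and filter binding use the snapshot's schema rather than the table's current schema. A hedged end-to-end sketch (the table, column name "category", and snapshot id are hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

static void readOldSnapshot(SparkSession spark) {
  // Suppose the column "category" was added to db.tbl after snapshot 10963874102873.
  Dataset<Row> asOf = spark.read()
      .format("iceberg")
      .option("snapshot-id", 10963874102873L)
      .load("db.tbl");
  // The schema reflects the snapshot, so "category" is absent (rather than present
  // and all null), and filters are bound against the columns of that snapshot.
  asOf.printSchema();
}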
