Spark: Use snapshot schema when reading snapshot #3722

Merged (4 commits, Dec 17, 2021). Changes shown from all commits.
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -304,4 +304,30 @@ public static Schema schemaFor(Table table, long snapshotId) {
// TODO: recover the schema by reading previous metadata files
return table.schema();
}

/**
* Convenience method for returning the schema of the table for a snapshot,
* when we have a snapshot id or a timestamp. Only one of them should be specified
* (non-null), or an IllegalArgumentException is thrown.
*
* @param table a {@link Table}
* @param snapshotId the ID of the snapshot
* @param timestampMillis the timestamp in millis since the Unix epoch
* @return the schema
* @throws IllegalArgumentException if both snapshotId and timestampMillis are non-null
*/
public static Schema schemaFor(Table table, Long snapshotId, Long timestampMillis) {
Preconditions.checkArgument(snapshotId == null || timestampMillis == null,
"Cannot use both snapshot id and timestamp to find a schema");

if (snapshotId != null) {
return schemaFor(table, snapshotId);
}

if (timestampMillis != null) {
return schemaFor(table, snapshotIdAsOfTime(table, timestampMillis));
}

return table.schema();
}
}
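For context, a minimal usage sketch of the new overload (editorial illustration, not part of the diff; it assumes an already-loaded Table and made-up snapshot id / timestamp values):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

class SnapshotSchemaExample {
  static void demo(Table table) {
    Schema byId = SnapshotUtil.schemaFor(table, 1234567890123L, null);    // schema at a specific snapshot id
    Schema byTime = SnapshotUtil.schemaFor(table, null, 1639700000000L);  // schema as of a timestamp (epoch millis)
    Schema current = SnapshotUtil.schemaFor(table, null, null);           // neither selector: current table schema
    // passing both a snapshot id and a timestamp throws IllegalArgumentException
  }
}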
.../src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -19,14 +19,18 @@

package org.apache.iceberg.spark;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
@@ -39,14 +39,17 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -84,6 +91,9 @@
*/
public class SparkCatalog extends BaseCatalog {
private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");

private String catalogName = null;
private Catalog icebergCatalog = null;
@@ -122,8 +132,8 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
try {
- Table icebergTable = load(ident);
- return new SparkTable(icebergTable, !cacheEnabled);
Pair<Table, Long> icebergTable = load(ident);
return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -224,7 +234,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws No
}

try {
- Table table = load(ident);
Table table = load(ident).first();
commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
return new SparkTable(table, true /* refreshEagerly */);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
@@ -259,7 +269,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
@Override
public void invalidateTable(Identifier ident) {
try {
- load(ident).refresh();
load(ident).first().refresh();
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// ignore if the table doesn't exist, it is not cached
}
@@ -471,10 +481,97 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
}
}

- private Table load(Identifier ident) {
- return isPathIdentifier(ident) ?
- tables.load(((PathIdentifier) ident).location()) :
- icebergCatalog.loadTable(buildIdentifier(ident));
private Pair<Table, Long> load(Identifier ident) {
if (isPathIdentifier(ident)) {
return loadFromPathIdentifier((PathIdentifier) ident);
}

try {
return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);

} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// if the original load didn't work, the identifier may be extended and include a snapshot selector
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
Table table;
try {
table = icebergCatalog.loadTable(namespaceAsIdent);
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// the namespace does not identify a table, so it cannot be a table with a snapshot selector
// throw the original exception
throw e;
}

// loading the namespace as a table worked, check the name to see if it is a valid selector
Matcher at = AT_TIMESTAMP.matcher(ident.name());
if (at.matches()) {
long asOfTimestamp = Long.parseLong(at.group(1));
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
}

Matcher id = SNAPSHOT_ID.matcher(ident.name());
if (id.matches()) {
long snapshotId = Long.parseLong(id.group(1));
return Pair.of(table, snapshotId);
}

// the name wasn't a valid snapshot selector. throw the original exception
throw e;
}
}
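  // Editorial illustration (not part of this diff): a worked example of the selector
  // fallback above, with made-up values.
  //
  //   ident = ["db", "tbl", "at_timestamp_1639700000000"]
  //     -> loading "db.tbl.at_timestamp_1639700000000" throws NoSuchTableException
  //     -> the namespace ["db", "tbl"] does load as a table
  //     -> AT_TIMESTAMP matches the trailing name, so the snapshot current at that
  //        timestamp (epoch millis) is selected via SnapshotUtil.snapshotIdAsOfTime
  //
  //   ident = ["db", "tbl", "snapshot_id_1234567890"]
  //     -> same fallback, SNAPSHOT_ID matches, snapshot 1234567890 is selected
  //
  //   Any other trailing name matches neither pattern, so the original
  //   NoSuchTableException is rethrown.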

private Pair<String, List<String>> parseLocationString(String location) {
int hashIndex = location.lastIndexOf('#');
if (hashIndex != -1 && !location.endsWith("#")) {
String baseLocation = location.substring(0, hashIndex);
List<String> metadata = COMMA.splitToList(location.substring(hashIndex + 1));
return Pair.of(baseLocation, metadata);
} else {
return Pair.of(location, ImmutableList.of());
}
}

private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
Pair<String, List<String>> parsed = parseLocationString(ident.location());

String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
for (String meta : parsed.second()) {
if (MetadataTableType.from(meta) != null) {
metadataTableName = meta;
continue;
}

Matcher at = AT_TIMESTAMP.matcher(meta);
if (at.matches()) {
asOfTimestamp = Long.parseLong(at.group(1));
continue;
}

Matcher id = SNAPSHOT_ID.matcher(meta);
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
}
}

Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id and as-of-timestamp: %s", ident.location());

Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));

if (snapshotId != null) {
return Pair.of(table, snapshotId);
} else if (asOfTimestamp != null) {
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
} else {
return Pair.of(table, null);
}
}
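  // Editorial illustration (not part of this diff): how a '#' fragment on a path-based
  // identifier is interpreted by parseLocationString / loadFromPathIdentifier above
  // (the locations are made up).
  //
  //   "s3://bucket/warehouse/db/tbl#at_timestamp_1639700000000"
  //     -> base location "s3://bucket/warehouse/db/tbl", snapshot chosen as of that timestamp
  //   "s3://bucket/warehouse/db/tbl#snapshot_id_1234567890"
  //     -> snapshot 1234567890 selected explicitly
  //   "s3://bucket/warehouse/db/tbl#files"
  //     -> "files" is a MetadataTableType, so the files metadata table is loaded
  //
  //   A metadata table name may be combined with one selector (comma-separated), but
  //   giving both at_timestamp_<millis> and snapshot_id_<id> fails the precondition above.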

private Identifier namespaceToIdentifier(String[] namespace) {
String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
String name = namespace[ns.length];
return Identifier.of(ns, name);
}

private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) {
.../src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -19,11 +19,13 @@

package org.apache.iceberg.spark.source;

import java.util.Arrays;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -56,6 +58,8 @@
public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";

@Override
public String shortName() {
@@ -101,24 +105,52 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
SparkSession spark = SparkSession.active();
setupDefaultSparkCatalog(spark);
String path = options.get("path");

Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);

String selector = null;

if (snapshotId != null) {
selector = SNAPSHOT_ID + snapshotId;
}

if (asOfTimestamp != null) {
selector = AT_TIMESTAMP + asOfTimestamp;
}

CatalogManager catalogManager = spark.sessionState().catalogManager();

if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
String newPath = (selector == null) ? path : path + "#" + selector;
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
- new PathIdentifier(path));
new PathIdentifier(newPath));
}

final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
"path or identifier", spark, path);

Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector);
if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
!(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
// catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
- return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
- catalogAndIdentifier.identifier());
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), ident);
} else {
- return catalogAndIdentifier;
return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), ident);
}
}

private Identifier identifierWithSelector(Identifier ident, String selector) {
if (selector == null) {
return ident;
} else {
String[] namespace = ident.namespace();
String[] ns = Arrays.copyOf(namespace, namespace.length + 1);
ns[namespace.length] = ident.name();
return Identifier.of(ns, selector);
}
}

@@ -132,6 +164,15 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
return catalogAndIdentifier(options).catalog().name();
}

private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) {
String value = options.get(property);
if (value != null) {
return Long.parseLong(value);
}
Inline review thread on this method:

Contributor: nit: newline after if

Contributor Author: Will add a blank line.

Contributor Author: Added a blank line. What is the rationale for always adding a blank line after an if? I fail to see how this makes the code more readable. I can understand breaking a large block of code up with blank lines in general, but this is a very short method.

Contributor: Yes, agree. I think it's mostly just general code-style rules the community follows; maybe we should put these into checkstyle instead of being human linters.

return null;
}

private static void setupDefaultSparkCatalog(SparkSession spark) {
if (spark.conf().contains(DEFAULT_CATALOG)) {
return;
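Taken together, the changes to IcebergSource and SparkCatalog above let a DataFrame read pin a snapshot through read options, so the scan uses that snapshot's schema instead of the table's current schema. A hedged end-to-end sketch (table names, locations, and values are made up; assumes an active SparkSession):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class TimeTravelReadExample {
  static void demo(SparkSession spark) {
    // Catalog table pinned to a snapshot id: the option becomes a "snapshot_id_<id>"
    // selector appended to the identifier and resolved by SparkCatalog.load.
    Dataset<Row> byId = spark.read()
        .format("iceberg")
        .option("snapshot-id", "1234567890")
        .load("db.tbl");

    // Path-based table as of a timestamp: the selector travels through the
    // PathIdentifier as "#at_timestamp_<millis>".
    Dataset<Row> byTime = spark.read()
        .format("iceberg")
        .option("as-of-timestamp", "1639700000000")
        .load("s3://bucket/warehouse/db/tbl");

    // Setting both options on one read is rejected with an IllegalArgumentException.
  }
}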
.../src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -61,23 +61,16 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
private Filter[] pushedFilters = NO_FILTERS;
private boolean ignoreResiduals = false;

- SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
SparkScanBuilder(SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
this.schema = schema;
this.readConf = new SparkReadConf(spark, table, options);
this.caseSensitive = readConf.caseSensitive();
}

- private Schema lazySchema() {
- if (schema == null) {
- if (requestedProjection != null) {
- // the projection should include all columns that will be returned, including those only used in filters
- this.schema = SparkSchemaUtil.prune(table.schema(), requestedProjection, filterExpression(), caseSensitive);
- } else {
- this.schema = table.schema();
- }
- }
- return schema;
SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
this(spark, table, table.schema(), options);
}

private Expression filterExpression() {
@@ -106,7 +99,7 @@ public Filter[] pushFilters(Filter[] filters) {
Expression expr = SparkFilters.convert(filter);
if (expr != null) {
try {
- Binder.bind(table.schema().asStruct(), expr, caseSensitive);
Binder.bind(schema.asStruct(), expr, caseSensitive);
expressions.add(expr);
pushed.add(filter);
} catch (ValidationException e) {
@@ -134,6 +127,9 @@ public void pruneColumns(StructType requestedSchema) {
.filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
.toArray(StructField[]::new));

// the projection should include all columns that will be returned, including those only used in filters
this.schema = SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive);

Stream.of(requestedSchema.fields())
.map(StructField::name)
.filter(MetadataColumns::isMetadataColumn)
@@ -155,7 +151,7 @@ private Schema schemaWithMetadataColumns() {
Schema meta = new Schema(fields);

// schema or rows returned by readers
- return TypeUtil.join(lazySchema(), meta);
return TypeUtil.join(schema, meta);
}

@Override