From 152ab2670e146dd611bb949cad769a6a8e0fc899 Mon Sep 17 00:00:00 2001
From: Wing Yew Poon
Date: Sun, 12 Dec 2021 15:09:12 -0800
Subject: [PATCH 1/4] Spark: Use snapshot schema when reading snapshot

This has been implemented for Spark 2. For Spark 3, Ryan Blue proposed a
syntax in #3269 for adding the snapshot id or timestamp to the table
identifier. Here we implement Spark 3 support for reading the snapshot
schema using the proposed table identifier syntax, as an interim solution
until a Spark 3 release supports AS OF in Spark SQL.

Note: the table identifier syntax is for internal use only (as in this
implementation) and is not meant to be exposed as publicly supported SQL
syntax. However, for testing, we do exercise it from SQL.
---
 .../org/apache/iceberg/util/SnapshotUtil.java |  26 ++
 .../apache/iceberg/spark/SparkCatalog.java    | 113 ++++++++-
 .../iceberg/spark/source/IcebergSource.java   |  50 +++-
 .../spark/source/SparkScanBuilder.java        |  15 +-
 .../iceberg/spark/source/SparkTable.java      |  72 ++++--
 .../source/TestIcebergSourceTablesBase.java   | 233 +++++++++++++++++-
 .../apache/iceberg/spark/sql/TestSelect.java  |  38 +++
 7 files changed, 512 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index a21c7ea92e87..083aee90a383 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -304,4 +304,30 @@ public static Schema schemaFor(Table table, long snapshotId) {
     // TODO: recover the schema by reading previous metadata files
     return table.schema();
   }
+
+  /**
+   * Convenience method for returning the schema of a table at a snapshot, selected
+   * by either a snapshot id or a timestamp. At most one of them may be specified
+   * (non-null); if both are non-null, an IllegalArgumentException is thrown.
+   *
+   * @param table a {@link Table}
+   * @param snapshotId the ID of the snapshot
+   * @param timestampMillis the timestamp in millis since the Unix epoch
+   * @return the schema
+   * @throws IllegalArgumentException if both snapshotId and timestampMillis are non-null
+   */
+  public static Schema schemaFor(Table table, Long snapshotId, Long timestampMillis) {
+    Preconditions.checkArgument(snapshotId == null || timestampMillis == null,
+        "Cannot use both snapshot id and timestamp to find a schema");
+
+    if (snapshotId != null) {
+      return schemaFor(table, snapshotId);
+    }
+
+    if (timestampMillis != null) {
+      return schemaFor(table, snapshotIdAsOfTime(table, timestampMillis));
+    }
+
+    return table.schema();
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 2c42310f1a54..6d1a924d87cb 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -19,14 +19,18 @@
 package org.apache.iceberg.spark;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
@@ -39,6 +43,7 @@
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -46,7 +51,9 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -84,6 +91,9 @@
  */
 public class SparkCatalog extends BaseCatalog {
   private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
+  private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
   private String catalogName = null;
   private Catalog icebergCatalog = null;
@@ -122,8 +132,8 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
   @Override
   public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
     try {
-      Table icebergTable = load(ident);
-      return new SparkTable(icebergTable, !cacheEnabled);
+      Pair<Table, Long> icebergTable = load(ident);
+      return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
     } catch
(org.apache.iceberg.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -224,7 +234,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws No } try { - Table table = load(ident); + Table table = load(ident).first(); commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges); return new SparkTable(table, true /* refreshEagerly */); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { @@ -259,7 +269,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept @Override public void invalidateTable(Identifier ident) { try { - load(ident).refresh(); + load(ident).first().refresh(); } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) { // ignore if the table doesn't exist, it is not cached } @@ -471,10 +481,97 @@ private static void checkNotPathIdentifier(Identifier identifier, String method) } } - private Table load(Identifier ident) { - return isPathIdentifier(ident) ? - tables.load(((PathIdentifier) ident).location()) : - icebergCatalog.loadTable(buildIdentifier(ident)); + private Pair load(Identifier ident) { + if (isPathIdentifier(ident)) { + return loadFromPathIdentifier((PathIdentifier) ident); + } + + try { + return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null); + + } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { + // if the original load didn't work, the identifier may be extended and include a snapshot selector + TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace())); + Table table; + try { + table = icebergCatalog.loadTable(namespaceAsIdent); + } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) { + // the namespace does not identify a table, so it cannot be a table with a snapshot selector + // throw the original exception + throw e; + } + + // loading the namespace as a table worked, check the name to see if it is a valid selector + Matcher at = AT_TIMESTAMP.matcher(ident.name()); + if (at.matches()) { + long asOfTimestamp = Long.parseLong(at.group(1)); + return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); + } + + Matcher id = SNAPSHOT_ID.matcher(ident.name()); + if (id.matches()) { + long snapshotId = Long.parseLong(id.group(1)); + return Pair.of(table, snapshotId); + } + + // the name wasn't a valid snapshot selector. 
throw the original exception + throw e; + } + } + + private Pair> parseLocationString(String location) { + int hashIndex = location.lastIndexOf('#'); + if (hashIndex != -1 && !location.endsWith("#")) { + String baseLocation = location.substring(0, hashIndex); + List metadata = COMMA.splitToList(location.substring(hashIndex + 1)); + return Pair.of(baseLocation, metadata); + } else { + return Pair.of(location, ImmutableList.of()); + } + } + + private Pair loadFromPathIdentifier(PathIdentifier ident) { + Pair> parsed = parseLocationString(ident.location()); + + String metadataTableName = null; + Long asOfTimestamp = null; + Long snapshotId = null; + for (String meta : parsed.second()) { + if (MetadataTableType.from(meta) != null) { + metadataTableName = meta; + continue; + } + + Matcher at = AT_TIMESTAMP.matcher(meta); + if (at.matches()) { + asOfTimestamp = Long.parseLong(at.group(1)); + continue; + } + + Matcher id = SNAPSHOT_ID.matcher(meta); + if (id.matches()) { + snapshotId = Long.parseLong(id.group(1)); + } + } + + Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null, + "Cannot specify both snapshot-id and as-of-timestamp: %s", ident.location()); + + Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : "")); + + if (snapshotId != null) { + return Pair.of(table, snapshotId); + } else if (asOfTimestamp != null) { + return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); + } else { + return Pair.of(table, null); + } + } + + private Identifier namespaceToIdentifier(String[] namespace) { + String[] ns = Arrays.copyOf(namespace, namespace.length - 1); + String name = namespace[ns.length]; + return Identifier.of(ns, name); } private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index 98fcc0e85cf4..f9c7f1573384 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -19,11 +19,13 @@ package org.apache.iceberg.spark.source; +import java.util.Arrays; import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.PathIdentifier; import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSessionCatalog; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -56,6 +58,8 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions { private static final String DEFAULT_CATALOG_NAME = "default_iceberg"; private static final String DEFAULT_CATALOG = "spark.sql.catalog." 
+ DEFAULT_CATALOG_NAME; + private static final String AT_TIMESTAMP = "at_timestamp_"; + private static final String SNAPSHOT_ID = "snapshot_id_"; @Override public String shortName() { @@ -101,12 +105,26 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri SparkSession spark = SparkSession.active(); setupDefaultSparkCatalog(spark); String path = options.get("path"); + + Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID); + Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP); + Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null, + "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp); + String selector = ""; + if (snapshotId != null) { + selector = SNAPSHOT_ID + snapshotId; + } + if (asOfTimestamp != null) { + selector = AT_TIMESTAMP + asOfTimestamp; + } + CatalogManager catalogManager = spark.sessionState().catalogManager(); if (path.contains("/")) { // contains a path. Return iceberg default catalog and a PathIdentifier + String newPath = selector.equals("") ? path : path + "#" + selector; return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), - new PathIdentifier(path)); + new PathIdentifier(newPath)); } final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier( @@ -115,10 +133,28 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri if (catalogAndIdentifier.catalog().name().equals("spark_catalog") && !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) { // catalog is a session catalog but does not support Iceberg. Use Iceberg instead. + Identifier ident = catalogAndIdentifier.identifier(); return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), - catalogAndIdentifier.identifier()); - } else { + newIdentifier(ident, selector)); + } else if (snapshotId == null && asOfTimestamp == null) { return catalogAndIdentifier; + } else { + CatalogPlugin catalog = catalogAndIdentifier.catalog(); + Identifier ident = catalogAndIdentifier.identifier(); + return new Spark3Util.CatalogAndIdentifier(catalog, + newIdentifier(ident, selector)); + } + } + + private Identifier newIdentifier(Identifier ident, String newName) { + if (newName.equals("")) { + return ident; + } else { + String[] namespace = ident.namespace(); + String name = ident.name(); + String[] ns = Arrays.copyOf(namespace, namespace.length + 1); + ns[namespace.length] = name; + return Identifier.of(ns, newName); } } @@ -132,6 +168,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) { return catalogAndIdentifier(options).catalog().name(); } + private static Long getPropertyAsLong(CaseInsensitiveStringMap options, String property) { + String value = options.get(property); + if (value != null) { + return Long.parseLong(value); + } + return null; + } + private static void setupDefaultSparkCatalog(SparkSession spark) { if (spark.conf().contains(DEFAULT_CATALOG)) { return; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 04a61db13a8d..9c03d79655d5 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -36,6 +36,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import 
org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.ScanBuilder; @@ -53,6 +54,8 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S private final Table table; private final SparkReadConf readConf; private final List metaColumns = Lists.newArrayList(); + private final Long snapshotId; + private final Long asOfTimestamp; private Schema schema = null; private StructType requestedProjection; @@ -65,16 +68,22 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S this.spark = spark; this.table = table; this.readConf = new SparkReadConf(spark, table, options); + this.snapshotId = readConf.snapshotId(); + this.asOfTimestamp = readConf.asOfTimestamp(); this.caseSensitive = readConf.caseSensitive(); } + private Schema snapshotSchema() { + return SnapshotUtil.schemaFor(table, snapshotId, asOfTimestamp); + } + private Schema lazySchema() { if (schema == null) { if (requestedProjection != null) { // the projection should include all columns that will be returned, including those only used in filters - this.schema = SparkSchemaUtil.prune(table.schema(), requestedProjection, filterExpression(), caseSensitive); + this.schema = SparkSchemaUtil.prune(snapshotSchema(), requestedProjection, filterExpression(), caseSensitive); } else { - this.schema = table.schema(); + this.schema = snapshotSchema(); } } return schema; @@ -106,7 +115,7 @@ public Filter[] pushFilters(Filter[] filters) { Expression expr = SparkFilters.convert(filter); if (expr != null) { try { - Binder.bind(table.schema().asStruct(), expr, caseSensitive); + Binder.bind(snapshotSchema().asStruct(), expr, caseSensitive); expressions.add(expr); pushed.add(filter); } catch (ValidationException e) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 70b1e5d24f59..a79d10278370 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -27,6 +27,7 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; @@ -37,14 +38,17 @@ import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.expressions.StrictMetricsEvaluator; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkFilters; +import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.catalog.MetadataColumn; import 
org.apache.spark.sql.connector.catalog.SupportsDelete; @@ -80,7 +84,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, TableCapability.OVERWRITE_DYNAMIC); private final Table icebergTable; - private final StructType requestedSchema; + private final Long snapshotId; private final boolean refreshEagerly; private StructType lazyTableSchema = null; private SparkSession lazySpark = null; @@ -89,15 +93,10 @@ public SparkTable(Table icebergTable, boolean refreshEagerly) { this(icebergTable, null, refreshEagerly); } - public SparkTable(Table icebergTable, StructType requestedSchema, boolean refreshEagerly) { + public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) { this.icebergTable = icebergTable; - this.requestedSchema = requestedSchema; + this.snapshotId = snapshotId; this.refreshEagerly = refreshEagerly; - - if (requestedSchema != null) { - // convert the requested schema to throw an exception if any requested fields are unknown - SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema); - } } private SparkSession sparkSession() { @@ -117,14 +116,14 @@ public String name() { return icebergTable.toString(); } + private Schema snapshotSchema() { + return SnapshotUtil.schemaFor(icebergTable, snapshotId, null); + } + @Override public StructType schema() { if (lazyTableSchema == null) { - if (requestedSchema != null) { - this.lazyTableSchema = SparkSchemaUtil.convert(SparkSchemaUtil.prune(icebergTable.schema(), requestedSchema)); - } else { - this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema()); - } + this.lazyTableSchema = SparkSchemaUtil.convert(snapshotSchema()); } return lazyTableSchema; @@ -186,22 +185,24 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { icebergTable.refresh(); } - SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options); - - if (requestedSchema != null) { - scanBuilder.pruneColumns(requestedSchema); - } - - return scanBuilder; + return new SparkScanBuilder(sparkSession(), icebergTable, addSnapshotId(options)); } @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { + Preconditions.checkArgument( + snapshotId == null, + "Cannot write to table at a specific snapshot: %s", snapshotId); + return new SparkWriteBuilder(sparkSession(), icebergTable, info); } @Override public boolean canDeleteWhere(Filter[] filters) { + Preconditions.checkArgument( + snapshotId == null, + "Cannot delete from table at a specific snapshot: %s", snapshotId); + Expression deleteExpr = Expressions.alwaysTrue(); for (Filter filter : filters) { @@ -286,4 +287,35 @@ public int hashCode() { // use only name in order to correctly invalidate Spark cache return icebergTable.name().hashCode(); } + + private CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options) { + if (snapshotId != null) { + // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", ) + // or option("as-of-timestamp", ) is applied to the DataFrameReader, SparkTable will be + // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called + // with the options, which will include "snapshot-id" or "as-of-timestamp". + // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot + // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but + // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option. 
+ // We therefore add a "snapshot-id" option here in this latter case. + // As a consistency check, if "snapshot-id" is in the options, the id must match what we already + // have. + SparkReadConf readConf = new SparkReadConf(sparkSession(), icebergTable, options); + Long snapshotIdFromOptions = readConf.snapshotId(); + Long asOfTimestamp = readConf.asOfTimestamp(); + Preconditions.checkArgument( + snapshotIdFromOptions == null || snapshotIdFromOptions.longValue() == snapshotId.longValue(), + "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions); + + Map scanOptions = Maps.newHashMap(); + scanOptions.putAll(options.asCaseSensitiveMap()); + if (snapshotIdFromOptions == null && asOfTimestamp == null) { + scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId)); + } + + return new CaseInsensitiveStringMap(scanOptions); + } + + return options; + } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 6e652febcd3b..20e0709b8a9a 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -44,6 +44,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkTestBase; @@ -54,7 +55,9 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.types.StructType; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -70,6 +73,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase { optional(2, "data", Types.StringType.get()) ); + private static final Schema SCHEMA2 = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()), + optional(3, "category", Types.StringType.get()) + ); + + private static final Schema SCHEMA3 = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(3, "category", Types.StringType.get()) + ); + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build(); @Rule @@ -1113,7 +1127,7 @@ public void testPartitionsTable() { // check time travel List actualAfterFirstCommit = spark.read() .format("iceberg") - .option("snapshot-id", String.valueOf(firstCommitId)) + .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId)) .load(loadLocation(tableIdentifier, "partitions")) .orderBy("partition.id") .collectAsList(); @@ -1141,6 +1155,223 @@ public void testPartitionsTable() { } } + @Test + public synchronized void testSnapshotReadAfterAddColumn() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List originalRecords = Lists.newArrayList( + RowFactory.create(1, "x"), + RowFactory.create(2, "y"), + RowFactory.create(3, "z")); + + StructType originalSparkSchema = 
SparkSchemaUtil.convert(SCHEMA); + Dataset inputDf = spark.createDataFrame(originalRecords, originalSparkSchema); + inputDf.select("id", "data").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + Dataset resultDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf.orderBy("id").collectAsList()); + + Snapshot snapshot1 = table.currentSnapshot(); + + table.updateSchema().addColumn("category", Types.StringType.get()).commit(); + + List newRecords = Lists.newArrayList( + RowFactory.create(4, "xy", "B"), + RowFactory.create(5, "xyz", "C")); + + StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2); + Dataset inputDf2 = spark.createDataFrame(newRecords, newSparkSchema); + inputDf2.select("id", "data", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List updatedRecords = Lists.newArrayList( + RowFactory.create(1, "x", null), + RowFactory.create(2, "y", null), + RowFactory.create(3, "z", null), + RowFactory.create(4, "xy", "B"), + RowFactory.create(5, "xyz", "C")); + + Dataset resultDf2 = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", updatedRecords, + resultDf2.orderBy("id").collectAsList()); + + Dataset resultDf3 = spark.read() + .format("iceberg") + .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId()) + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf3.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema()); + } + + @Test + public synchronized void testSnapshotReadAfterDropColumn() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); + Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned()); + + List originalRecords = Lists.newArrayList( + RowFactory.create(1, "x", "A"), + RowFactory.create(2, "y", "A"), + RowFactory.create(3, "z", "B")); + + StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA2); + Dataset inputDf = spark.createDataFrame(originalRecords, originalSparkSchema); + inputDf.select("id", "data", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + Dataset resultDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf.orderBy("id").collectAsList()); + + long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis()); + table.updateSchema().deleteColumn("data").commit(); + long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis()); + + List newRecords = Lists.newArrayList( + RowFactory.create(4, "B"), + RowFactory.create(5, "C")); + + StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA3); + Dataset inputDf2 = spark.createDataFrame(newRecords, newSparkSchema); + inputDf2.select("id", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List updatedRecords = Lists.newArrayList( + RowFactory.create(1, "A"), + RowFactory.create(2, "A"), + RowFactory.create(3, "B"), + RowFactory.create(4, "B"), + RowFactory.create(5, "C")); + + Dataset resultDf2 = spark.read() + .format("iceberg") + 
.load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", updatedRecords, + resultDf2.orderBy("id").collectAsList()); + + Dataset resultDf3 = spark.read() + .format("iceberg") + .option(SparkReadOptions.AS_OF_TIMESTAMP, tsBeforeDropColumn) + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf3.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema()); + + // At tsAfterDropColumn, there has been a schema change, but no new snapshot, + // so the snapshot as of tsAfterDropColumn is the same as that as of tsBeforeDropColumn. + Dataset resultDf4 = spark.read() + .format("iceberg") + .option(SparkReadOptions.AS_OF_TIMESTAMP, tsAfterDropColumn) + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf4.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema()); + } + + @Test + public synchronized void testSnapshotReadAfterAddAndDropColumn() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List originalRecords = Lists.newArrayList( + RowFactory.create(1, "x"), + RowFactory.create(2, "y"), + RowFactory.create(3, "z")); + + StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA); + Dataset inputDf = spark.createDataFrame(originalRecords, originalSparkSchema); + inputDf.select("id", "data").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + Dataset resultDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf.orderBy("id").collectAsList()); + + Snapshot snapshot1 = table.currentSnapshot(); + + table.updateSchema().addColumn("category", Types.StringType.get()).commit(); + + List newRecords = Lists.newArrayList( + RowFactory.create(4, "xy", "B"), + RowFactory.create(5, "xyz", "C")); + + StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2); + Dataset inputDf2 = spark.createDataFrame(newRecords, sparkSchemaAfterAddColumn); + inputDf2.select("id", "data", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List updatedRecords = Lists.newArrayList( + RowFactory.create(1, "x", null), + RowFactory.create(2, "y", null), + RowFactory.create(3, "z", null), + RowFactory.create(4, "xy", "B"), + RowFactory.create(5, "xyz", "C")); + + Dataset resultDf2 = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", updatedRecords, + resultDf2.orderBy("id").collectAsList()); + + table.updateSchema().deleteColumn("data").commit(); + + List recordsAfterDropColumn = Lists.newArrayList( + RowFactory.create(1, null), + RowFactory.create(2, null), + RowFactory.create(3, null), + RowFactory.create(4, "B"), + RowFactory.create(5, "C")); + + Dataset resultDf3 = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", recordsAfterDropColumn, + resultDf3.orderBy("id").collectAsList()); + + Dataset resultDf4 = spark.read() + .format("iceberg") + .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId()) + .load(loadLocation(tableIdentifier)); + 
Assert.assertEquals("Records should match", originalRecords, + resultDf4.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema()); + } + @Test public void testRemoveOrphanFilesActionSupport() throws InterruptedException { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index 846e234cba07..476504e6fdcc 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -120,4 +120,42 @@ public void testMetadataTables() { ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)), sql("SELECT * FROM %s.snapshots", tableName)); } + + @Test + public void testSnapshotInTableName() { + Assume.assumeFalse( + "Spark session catalog does not support extended table names", + "spark_catalog".equals(catalogName)); + + // get the snapshot ID of the last write and get the current row set as expected + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + List expected = sql("SELECT * FROM %s", tableName); + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + String prefix = "snapshot_id_"; + // read the table at the snapshot + List actual = sql("SELECT * FROM %s.%s", tableName, prefix + snapshotId); + assertEquals("Snapshot at specific ID, prefix " + prefix, expected, actual); + } + + @Test + public void testTimestampInTableName() { + Assume.assumeFalse( + "Spark session catalog does not support extended table names", + "spark_catalog".equals(catalogName)); + + // get a timestamp just after the last write and get the current row set as expected + long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2; + List expected = sql("SELECT * FROM %s", tableName); + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + String prefix = "at_timestamp_"; + // read the table at the snapshot + List actual = sql("SELECT * FROM %s.%s", tableName, prefix + timestamp); + assertEquals("Snapshot at time, prefix " + prefix, expected, actual); + } } From 7ba1eab17a991186e0f906a2325afcc297c2473c Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Mon, 13 Dec 2021 19:15:56 -0800 Subject: [PATCH 2/4] Address feedback from Ryan Blue. Add a Schema parameter to the SparkScanBuilder constructor, so that we can pass the snapshot schema in when constructing it. In SparkTable#newScanBuilder, construct SparkScanBuilder with the snapshot schema. 
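
For illustration only (not part of this diff), the intended flow is roughly the
following; `spark`, `table`, `snapshotId`, `asOfTimestamp`, and `options` are
placeholders for values SparkTable already holds:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.util.SnapshotUtil;

    // Resolve the schema of the selected snapshot. At most one of snapshotId and
    // asOfTimestamp may be non-null; when both are null the current schema is returned.
    Schema snapshotSchema = SnapshotUtil.schemaFor(table, snapshotId, asOfTimestamp);

    // Pass the resolved schema to the new constructor so that column pruning and
    // filter binding use the snapshot schema instead of table.schema().
    SparkScanBuilder builder = new SparkScanBuilder(spark, table, snapshotSchema, options);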
--- .../iceberg/spark/source/IcebergSource.java | 34 ++++++++-------- .../spark/source/SparkScanBuilder.java | 31 +++++---------- .../iceberg/spark/source/SparkTable.java | 39 +++++++++---------- 3 files changed, 43 insertions(+), 61 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index f9c7f1573384..b5e97c21b668 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -106,14 +106,17 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri setupDefaultSparkCatalog(spark); String path = options.get("path"); - Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID); - Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP); + Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID); + Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP); Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null, "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp); - String selector = ""; + + String selector = null; + if (snapshotId != null) { selector = SNAPSHOT_ID + snapshotId; } + if (asOfTimestamp != null) { selector = AT_TIMESTAMP + asOfTimestamp; } @@ -122,7 +125,7 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri if (path.contains("/")) { // contains a path. Return iceberg default catalog and a PathIdentifier - String newPath = selector.equals("") ? path : path + "#" + selector; + String newPath = (selector == null) ? path : path + "#" + selector; return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), new PathIdentifier(newPath)); } @@ -130,31 +133,26 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier( "path or identifier", spark, path); + Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector); if (catalogAndIdentifier.catalog().name().equals("spark_catalog") && !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) { // catalog is a session catalog but does not support Iceberg. Use Iceberg instead. 
- Identifier ident = catalogAndIdentifier.identifier(); return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), - newIdentifier(ident, selector)); - } else if (snapshotId == null && asOfTimestamp == null) { - return catalogAndIdentifier; + ident); } else { - CatalogPlugin catalog = catalogAndIdentifier.catalog(); - Identifier ident = catalogAndIdentifier.identifier(); - return new Spark3Util.CatalogAndIdentifier(catalog, - newIdentifier(ident, selector)); + return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), + ident); } } - private Identifier newIdentifier(Identifier ident, String newName) { - if (newName.equals("")) { + private Identifier identifierWithSelector(Identifier ident, String selector) { + if (selector == null) { return ident; } else { String[] namespace = ident.namespace(); - String name = ident.name(); String[] ns = Arrays.copyOf(namespace, namespace.length + 1); - ns[namespace.length] = name; - return Identifier.of(ns, newName); + ns[namespace.length] = ident.name(); + return Identifier.of(ns, selector); } } @@ -168,7 +166,7 @@ public String extractCatalog(CaseInsensitiveStringMap options) { return catalogAndIdentifier(options).catalog().name(); } - private static Long getPropertyAsLong(CaseInsensitiveStringMap options, String property) { + private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) { String value = options.get(property); if (value != null) { return Long.parseLong(value); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 9c03d79655d5..f00d41f575d3 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -36,7 +36,6 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.ScanBuilder; @@ -54,8 +53,6 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S private final Table table; private final SparkReadConf readConf; private final List metaColumns = Lists.newArrayList(); - private final Long snapshotId; - private final Long asOfTimestamp; private Schema schema = null; private StructType requestedProjection; @@ -64,29 +61,16 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S private Filter[] pushedFilters = NO_FILTERS; private boolean ignoreResiduals = false; - SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) { + SparkScanBuilder(SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) { this.spark = spark; this.table = table; + this.schema = schema; this.readConf = new SparkReadConf(spark, table, options); - this.snapshotId = readConf.snapshotId(); - this.asOfTimestamp = readConf.asOfTimestamp(); this.caseSensitive = readConf.caseSensitive(); } - private Schema snapshotSchema() { - return SnapshotUtil.schemaFor(table, snapshotId, asOfTimestamp); - } - - private Schema lazySchema() { - if (schema == null) { - if (requestedProjection != null) { - // the projection should include all columns that will be returned, including those only used 
in filters - this.schema = SparkSchemaUtil.prune(snapshotSchema(), requestedProjection, filterExpression(), caseSensitive); - } else { - this.schema = snapshotSchema(); - } - } - return schema; + SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) { + this(spark, table, table.schema(), options); } private Expression filterExpression() { @@ -115,7 +99,7 @@ public Filter[] pushFilters(Filter[] filters) { Expression expr = SparkFilters.convert(filter); if (expr != null) { try { - Binder.bind(snapshotSchema().asStruct(), expr, caseSensitive); + Binder.bind(schema.asStruct(), expr, caseSensitive); expressions.add(expr); pushed.add(filter); } catch (ValidationException e) { @@ -143,6 +127,9 @@ public void pruneColumns(StructType requestedSchema) { .filter(field -> MetadataColumns.nonMetadataColumn(field.name())) .toArray(StructField[]::new)); + // the projection should include all columns that will be returned, including those only used in filters + this.schema = SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive); + Stream.of(requestedSchema.fields()) .map(StructField::name) .filter(MetadataColumns::isMetadataColumn) @@ -164,7 +151,7 @@ private Schema schemaWithMetadataColumns() { Schema meta = new Schema(fields); // schema or rows returned by readers - return TypeUtil.join(lazySchema(), meta); + return TypeUtil.join(schema, meta); } @Override diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index a79d10278370..818f8af207f4 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -45,7 +45,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkFilters; -import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.util.SnapshotUtil; @@ -185,7 +184,20 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { icebergTable.refresh(); } - return new SparkScanBuilder(sparkSession(), icebergTable, addSnapshotId(options)); + // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", ) + // or option("as-of-timestamp", ) is applied to the DataFrameReader, SparkTable will be + // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called + // with the options, which will include "snapshot-id" or "as-of-timestamp". + // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot + // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but + // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option. + // We therefore add a "snapshot-id" option here in this latter case. + CaseInsensitiveStringMap scanOptions = + (snapshotId != null && + options.get(SparkReadOptions.SNAPSHOT_ID) == null && + options.get(SparkReadOptions.AS_OF_TIMESTAMP) == null) ? 
addSnapshotId(options, snapshotId) : options; + + return new SparkScanBuilder(sparkSession(), icebergTable, snapshotSchema(), scanOptions); } @Override @@ -288,30 +300,15 @@ public int hashCode() { return icebergTable.name().hashCode(); } - private CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options) { + private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) { if (snapshotId != null) { - // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", ) - // or option("as-of-timestamp", ) is applied to the DataFrameReader, SparkTable will be - // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called - // with the options, which will include "snapshot-id" or "as-of-timestamp". - // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot - // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but - // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option. - // We therefore add a "snapshot-id" option here in this latter case. - // As a consistency check, if "snapshot-id" is in the options, the id must match what we already - // have. - SparkReadConf readConf = new SparkReadConf(sparkSession(), icebergTable, options); - Long snapshotIdFromOptions = readConf.snapshotId(); - Long asOfTimestamp = readConf.asOfTimestamp(); - Preconditions.checkArgument( - snapshotIdFromOptions == null || snapshotIdFromOptions.longValue() == snapshotId.longValue(), + String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID); + Preconditions.checkArgument(snapshotIdFromOptions == null, "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions); Map scanOptions = Maps.newHashMap(); scanOptions.putAll(options.asCaseSensitiveMap()); - if (snapshotIdFromOptions == null && asOfTimestamp == null) { - scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId)); - } + scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId)); return new CaseInsensitiveStringMap(scanOptions); } From b855dd515dbdd79d094e0ffa69cd23d0bff4fb11 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Wed, 15 Dec 2021 16:15:51 -0800 Subject: [PATCH 3/4] Add negative test cases. --- .../iceberg/spark/source/IcebergSource.java | 7 ++- .../iceberg/spark/sql/TestDeleteFrom.java | 19 ++++++++ .../apache/iceberg/spark/sql/TestSelect.java | 46 ++++++++++++++++++- .../spark/sql/TestUnpartitionedWrites.java | 19 ++++++++ 4 files changed, 86 insertions(+), 5 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index b5e97c21b668..7aa66b2223fc 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -137,11 +137,9 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri if (catalogAndIdentifier.catalog().name().equals("spark_catalog") && !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) { // catalog is a session catalog but does not support Iceberg. Use Iceberg instead. 
- return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), - ident); + return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), ident); } else { - return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), - ident); + return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), ident); } } @@ -171,6 +169,7 @@ private static Long propertyAsLong(CaseInsensitiveStringMap options, String prop if (value != null) { return Long.parseLong(value); } + return null; } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java index 288c569ec2db..b9b1a7647dd9 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java @@ -73,6 +73,25 @@ public void testDeleteFromUnpartitionedTable() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id", tableName)); } + @Test + public void testDeleteFromTableAtSnapshot() throws NoSuchTableException { + sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName); + + List records = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b"), + new SimpleRecord(3, "c") + ); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.coalesce(1).writeTo(tableName).append(); + + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + String prefix = "snapshot_id_"; + AssertHelpers.assertThrows("Should not be able to delete from a table at a specific snapshot", + IllegalArgumentException.class, "Cannot delete from table at a specific snapshot", + () -> sql("DELETE FROM %s.%s WHERE id < 4", tableName, prefix + snapshotId)); + } + @Test public void testDeleteFromPartitionedTable() throws NoSuchTableException { sql("CREATE TABLE %s (id bigint, data string) " + diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index 476504e6fdcc..00a386130fcc 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -21,12 +21,16 @@ import java.util.List; import java.util.Map; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.events.Listeners; import org.apache.iceberg.events.ScanEvent; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -138,6 +142,14 @@ public void testSnapshotInTableName() { // read the table at the snapshot List actual = sql("SELECT * FROM %s.%s", tableName, prefix + snapshotId); assertEquals("Snapshot at specific ID, prefix " + prefix, expected, actual); + + // read the table using DataFrameReader option + Dataset df = spark.read() + .format("iceberg") + .option(SparkReadOptions.SNAPSHOT_ID, snapshotId) + .load(tableName); + List fromDF = rowsToJava(df.collectAsList()); + assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF); } @Test 
@@ -156,6 +168,38 @@ public void testTimestampInTableName() { String prefix = "at_timestamp_"; // read the table at the snapshot List actual = sql("SELECT * FROM %s.%s", tableName, prefix + timestamp); - assertEquals("Snapshot at time, prefix " + prefix, expected, actual); + assertEquals("Snapshot at timestamp, prefix " + prefix, expected, actual); + + // read the table using DataFrameReader option + Dataset df = spark.read() + .format("iceberg") + .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) + .load(tableName); + List fromDF = rowsToJava(df.collectAsList()); + assertEquals("Snapshot at timestamp " + timestamp, expected, fromDF); + } + + @Test + public void testSpecifySnapshotAndTimestamp() { + // get the snapshot ID of the last write + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + // get a timestamp just after the last write + long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2; + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + AssertHelpers.assertThrows("Should not be able to specify both snapshot id and timestamp", + IllegalArgumentException.class, + String.format("Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", + snapshotId, timestamp), + () -> { + spark.read() + .format("iceberg") + .option(SparkReadOptions.SNAPSHOT_ID, snapshotId) + .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) + .load(tableName) + .collectAsList(); + }); } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java index 988df10e0e18..bd6fb5abf2c6 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.spark.source.SimpleRecord; @@ -84,6 +85,24 @@ public void testInsertOverwrite() { assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); } + @Test + public void testInsertAppendAtSnapshot() { + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + String prefix = "snapshot_id_"; + AssertHelpers.assertThrows("Should not be able to insert into a table at a specific snapshot", + IllegalArgumentException.class, "Cannot write to table at a specific snapshot", + () -> sql("INSERT INTO %s.%s VALUES (4, 'd'), (5, 'e')", tableName, prefix + snapshotId)); + } + + @Test + public void testInsertOverwriteAtSnapshot() { + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + String prefix = "snapshot_id_"; + AssertHelpers.assertThrows("Should not be able to insert into a table at a specific snapshot", + IllegalArgumentException.class, "Cannot write to table at a specific snapshot", + () -> sql("INSERT OVERWRITE %s.%s VALUES (4, 'd'), (5, 'e')", tableName, prefix + snapshotId)); + } + @Test public void testDataFrameV2Append() throws NoSuchTableException { Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); From 03d80eb735f89c8318a7d83ec3baa1b3119642de Mon Sep 
17 00:00:00 2001 From: Wing Yew Poon Date: Wed, 15 Dec 2021 19:02:38 -0800 Subject: [PATCH 4/4] Simplify SparkTable#newScanBuilder. Minor tweaks to tests. --- .../iceberg/spark/source/SparkTable.java | 20 +++++-------------- .../source/TestIcebergSourceTablesBase.java | 8 ++++---- .../apache/iceberg/spark/sql/TestSelect.java | 3 ++- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 818f8af207f4..af629a49ed08 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -184,19 +184,7 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { icebergTable.refresh(); } - // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", ) - // or option("as-of-timestamp", ) is applied to the DataFrameReader, SparkTable will be - // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called - // with the options, which will include "snapshot-id" or "as-of-timestamp". - // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot - // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but - // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option. - // We therefore add a "snapshot-id" option here in this latter case. - CaseInsensitiveStringMap scanOptions = - (snapshotId != null && - options.get(SparkReadOptions.SNAPSHOT_ID) == null && - options.get(SparkReadOptions.AS_OF_TIMESTAMP) == null) ? 
addSnapshotId(options, snapshotId) : options; - + CaseInsensitiveStringMap scanOptions = addSnapshotId(options, snapshotId); return new SparkScanBuilder(sparkSession(), icebergTable, snapshotSchema(), scanOptions); } @@ -303,12 +291,14 @@ public int hashCode() { private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) { if (snapshotId != null) { String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID); - Preconditions.checkArgument(snapshotIdFromOptions == null, + String value = snapshotId.toString(); + Preconditions.checkArgument(snapshotIdFromOptions == null || snapshotIdFromOptions.equals(value), "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions); Map scanOptions = Maps.newHashMap(); scanOptions.putAll(options.asCaseSensitiveMap()); - scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId)); + scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value); + scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP); return new CaseInsensitiveStringMap(scanOptions); } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 20e0709b8a9a..1017a38d8aea 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1180,7 +1180,7 @@ public synchronized void testSnapshotReadAfterAddColumn() { Assert.assertEquals("Records should match", originalRecords, resultDf.orderBy("id").collectAsList()); - Snapshot snapshot1 = table.currentSnapshot(); + Snapshot snapshotBeforeAddColumn = table.currentSnapshot(); table.updateSchema().addColumn("category", Types.StringType.get()).commit(); @@ -1212,7 +1212,7 @@ public synchronized void testSnapshotReadAfterAddColumn() { Dataset resultDf3 = spark.read() .format("iceberg") - .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId()) + .option(SparkReadOptions.SNAPSHOT_ID, snapshotBeforeAddColumn.snapshotId()) .load(loadLocation(tableIdentifier)); Assert.assertEquals("Records should match", originalRecords, resultDf3.orderBy("id").collectAsList()); @@ -1318,7 +1318,7 @@ public synchronized void testSnapshotReadAfterAddAndDropColumn() { Assert.assertEquals("Records should match", originalRecords, resultDf.orderBy("id").collectAsList()); - Snapshot snapshot1 = table.currentSnapshot(); + Snapshot snapshotBeforeAddColumn = table.currentSnapshot(); table.updateSchema().addColumn("category", Types.StringType.get()).commit(); @@ -1365,7 +1365,7 @@ public synchronized void testSnapshotReadAfterAddAndDropColumn() { Dataset resultDf4 = spark.read() .format("iceberg") - .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId()) + .option(SparkReadOptions.SNAPSHOT_ID, snapshotBeforeAddColumn.snapshotId()) .load(loadLocation(tableIdentifier)); Assert.assertEquals("Records should match", originalRecords, resultDf4.orderBy("id").collectAsList()); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index 00a386130fcc..38dfe0e5afda 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -159,7 +159,8 @@ public void testTimestampInTableName() { 
"spark_catalog".equals(catalogName)); // get a timestamp just after the last write and get the current row set as expected - long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2; + long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis(); + long timestamp = waitUntilAfter(snapshotTs + 2); List expected = sql("SELECT * FROM %s", tableName); // create a second snapshot