
Reading as of Snapshot ID fails on Metadata Tables after Iceberg Table Schema Update #6978

Closed
sungwy opened this issue Mar 1, 2023 · 9 comments · Fixed by #6980, #6993 or #6994

sungwy commented Mar 1, 2023

Apache Iceberg version

1.1.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

Time travel (reading as of a certain snapshot ID) fails on metadata tables if a schema evolution was ever introduced on the Iceberg table. This seems to be an unwanted side effect of the PR that allows us to use the snapshot schema when reading a snapshot: #3722

Since schema evolution is not supported on metadata tables, we could patch this bug with a condition that checks whether the Iceberg table is an instance of BaseMetadataTable before making the snapshotSchema call.

Example query:

spark.read.format("iceberg").option("snapshot-id", 10963874102873L).load("db.table.files")
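
The same time travel can also be expressed in Spark SQL (on Spark 3.3+, where VERSION AS OF is available), and it fails the same way:

SELECT * FROM db.table.files VERSION AS OF 10963874102873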

Example error after schema evolution:

Py4JJavaError: An error occurred while calling o373.load.
: java.lang.IllegalStateException: Cannot find schema with schema id 1
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:590)
	at org.apache.iceberg.util.SnapshotUtil.schemaFor(SnapshotUtil.java:363)
	at org.apache.iceberg.util.SnapshotUtil.schemaFor(SnapshotUtil.java:388)
	at org.apache.iceberg.spark.source.SparkTable.snapshotSchema(SparkTable.java:127)
	at org.apache.iceberg.spark.source.SparkTable.schema(SparkTable.java:133)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:176)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:303)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
	at jdk.internal.reflect.GeneratedMethodAccessor210.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)

szehon-ho commented Mar 1, 2023

Interesting, this sounds like a serious bug.

I tried to reproduce this on the latest master, but the following test passed for me (added to TestMetadataTables):

  @Test
  public void testFilesVersionAsOf() throws Exception {
    // Create table and insert data
    sql(
        "CREATE TABLE %s (id bigint, data string) "
            + "USING iceberg "
            + "PARTITIONED BY (data) "
            + "TBLPROPERTIES"
            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
        tableName);

    List<SimpleRecord> recordsA =
        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
    spark
        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
        .coalesce(1)
        .writeTo(tableName)
        .append();

    Table table = Spark3Util.loadIcebergTable(spark, tableName);
    Long currentSnapshotId = table.currentSnapshot().snapshotId();

    List<SimpleRecord> recordsB =
        Lists.newArrayList(new SimpleRecord(1, "b"), new SimpleRecord(2, "b"));
    spark
        .createDataset(recordsB, Encoders.bean(SimpleRecord.class))
        .coalesce(1)
        .writeTo(tableName)
        .append();


    List<Object[]> res1 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, currentSnapshotId);

    Dataset<Row> ds = spark.read().format("iceberg").option("snapshot-id", currentSnapshotId).load(tableName + ".files");
    List<Row> res2 = ds.collectAsList();

  }

Not sure what I'm missing here


sungwy commented Mar 1, 2023

@szehon-ho This issue only happens if the schema of the table is updated (not the metadata table, but its corresponding Iceberg table).
In the following example, I'm adding a new column to the existing table to invoke a schema update, i.e. there are now a total of two schemas (for the Iceberg table) tracked in the table's metadata.json file.
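
Roughly, the metadata.json then tracks both schemas like this (a trimmed, illustrative sketch; field lists elided):

{
  "current-schema-id": 1,
  "schemas": [
    { "schema-id": 0, "type": "struct", "fields": [ ... id, data ... ] },
    { "schema-id": 1, "type": "struct", "fields": [ ... id, data, data2 ... ] }
  ]
}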

(SimpleExtraColumnRecord is a SimpleRecord with just one extra string column)

  @Test
  public void testFilesVersionAsOf() throws Exception {
    // Create table and insert data
    sql(
        "CREATE TABLE %s (id bigint, data string) "
            + "USING iceberg "
            + "PARTITIONED BY (data) "
            + "TBLPROPERTIES"
            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
        tableName);

    List<SimpleRecord> recordsA =
        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
    spark
        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
        .coalesce(1)
        .writeTo(tableName)
        .append();

    Table table = Spark3Util.loadIcebergTable(spark, tableName);
    Long olderSnapshotId = table.currentSnapshot().snapshotId();

    // Evolve the table schema by adding a column, then write with the new schema
    sql("ALTER TABLE %s ADD COLUMNS (data2 string)", tableName);

    List<SimpleExtraColumnRecord> recordsB =
        Lists.newArrayList(
            new SimpleExtraColumnRecord(1, "b", "c"), new SimpleExtraColumnRecord(2, "b", "c"));
    spark
        .createDataset(recordsB, Encoders.bean(SimpleExtraColumnRecord.class))
        .coalesce(1)
        .writeTo(tableName)
        .append();

    // Reads as of the pre-evolution snapshot still succeed
    List<Object[]> res1 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, olderSnapshotId);

    Dataset<Row> ds =
        spark.read().format("iceberg").option("snapshot-id", olderSnapshotId).load(tableName + ".files");
    List<Row> res2 = ds.collectAsList();

    Long currentSnapshotId = table.currentSnapshot().snapshotId();

    // Fails with "Cannot find schema with schema id 1"
    List<Object[]> res3 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, currentSnapshotId);

    Dataset<Row> ds2 =
        spark.read().format("iceberg").option("snapshot-id", currentSnapshotId).load(tableName + ".files");
    List<Row> res4 = ds2.collectAsList();
  }

This test fails on this call:
List<Object[]> res3 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, currentSnapshotId);


sungwy commented Mar 1, 2023

Adding these conditions to these two locations in SparkTable.java fixes the issue:

https://github.com/apache/iceberg/blob/master/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java#L201

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
      // skip planning the job and fetch already staged file scan tasks
      return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
    }

    if (refreshEagerly) {
      icebergTable.refresh();
    }

    CaseInsensitiveStringMap scanOptions = addSnapshotId(options, snapshotId);
    if (icebergTable instanceof BaseMetadataTable) {
      // metadata tables have a single static schema, so skip the snapshot-schema
      // lookup; scanOptions still carries the snapshot id for time travel
      return new SparkScanBuilder(sparkSession(), icebergTable, scanOptions);
    } else {
      return new SparkScanBuilder(sparkSession(), icebergTable, snapshotSchema(), scanOptions);
    }
  }

https://github.com/apache/iceberg/blob/master/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java#L133

  @Override
  public StructType schema() {
    if (lazyTableSchema == null) {
      if (icebergTable instanceof BaseMetadataTable) {
        this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
      } else {
        this.lazyTableSchema = SparkSchemaUtil.convert(snapshotSchema());
      }
    }

    return lazyTableSchema;
  }
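
For context, snapshotSchema() is what triggers the failing lookup in the stack trace above. A simplified sketch of it (the exact signature may differ across versions):

  private Schema snapshotSchema() {
    // resolves the schema recorded for this table's snapshot id via SnapshotUtil;
    // for a metadata table this is what blows up, since the base table's snapshot
    // can point at a schema id the metadata table doesn't have
    return SnapshotUtil.schemaFor(icebergTable, snapshotId);
  }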

sungwy changed the title from "Reading as of Snapshot ID fails on Metadata Tables after Schema Evolution" to "Reading as of Snapshot ID fails on Metadata Tables after Schema Update" on Mar 1, 2023
sungwy changed the title from "Reading as of Snapshot ID fails on Metadata Tables after Schema Update" to "Reading as of Snapshot ID fails on Metadata Tables after Iceberg Table Schema Update" on Mar 1, 2023

szehon-ho commented Mar 1, 2023

Got it, I missed that part; I need some time to look at it. Feel free to open a PR and add the test case repro.


sungwy commented Mar 1, 2023

Thank you @szehon-ho!
I opened one up for Spark 3.3. The issue is in Spark 3.2 and 3.1 as well, and it has been around since the Iceberg 0.13.0 release...

Happy to open separate PRs for Spark 3.2 and Spark 3.1 (not sure whether putting them all in one PR or in separate ones is better for cherry-picking into a release)


nastra commented Mar 2, 2023

I had a look at this and I think I understand what's going on here.

In sql("SELECT * from %s.files VERSION AS OF %s", tableName, olderSnapshotId) we're trying to do time travel on the FilesTable but giving it the snapshot ID of the actual table, since olderSnapshotId is a snapshot of tableName.

The error therefore makes sense to me, since the schema of the FilesTable didn't evolve, but we're expecting it to return the schema for olderSnapshotId.
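
To make the mechanism concrete, here is a simplified sketch of the failing lookup, paraphrased from the stack trace above (not the exact library source):

  // SnapshotUtil.schemaFor, simplified: resolve the schema recorded for a snapshot
  static Schema schemaFor(Table table, long snapshotId) {
    Snapshot snapshot = table.snapshot(snapshotId);
    Integer schemaId = snapshot.schemaId(); // base table's schema id, e.g. 1 after evolution

    // A metadata table such as FilesTable exposes the base table's snapshots, but its
    // schemas() map only contains its own static schema (id 0), so this lookup fails
    // with "Cannot find schema with schema id 1"
    Schema schema = table.schemas().get(schemaId);
    Preconditions.checkState(schema != null, "Cannot find schema with schema id %s", schemaId);
    return schema;
  }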


sungwy commented Mar 2, 2023

Hi @nastra, really appreciate you taking the time to look at this. I was a bit confused at first, and it took some time to identify the issue and relate it to schema evolution, so here's an attempt at clearing up any confusion one might have when looking at this issue...

Firstly, my understanding is that the concept of snapshot_id is consistent between the actual Iceberg table and the corresponding state of the metadata tables that represent it at a certain point in time. Time travel on metadata tables based on the snapshot_id is actually an advertised feature of Iceberg in its docs.

Your observation that the schema of the files table didn't evolve, and that that's the cause of the issue, is absolutely correct. But I want to make a distinction between the schema_id of the actual Iceberg table, that of the metadata table, and their shared snapshot_ids. Here's a table describing the timeline of events in my test:

| event | create table | add row (id, data) | add column, add row (id, data, data2) |
| --- | --- | --- | --- |
| snapshot id (random) | 6234 | 9023 | 8234 |
| actual table schema id | 0 | 0 | 1 |
| files table schema id | 0 | 0 | 0 |

The schema ID of the actual table does not change until there is schema evolution. This is consistent with @szehon-ho's observation that there is no issue when you run time travel queries on the metadata table without schema evolution, even when you add an extra row and increment the snapshot ID.

However, when the schema evolves in the actual table, because #1508 makes a strong assumption that we only ever look at the schema ID of the actual table, we use that schema ID to read the files metadata table instead of using the schema ID of the files table at that snapshot. Interestingly, we can still query the files table for the first two snapshots in this series of events (6234 and 9023) even after the table has evolved, because within those snapshots the schema IDs are consistent between the actual table and its metadata tables.
I.e. this issue happens on snapshots where the schema ID of the actual table diverges from the schema ID of the files table.
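
This also explains why the files table's schema ID stays at 0 in the table above: a metadata table exposes exactly one schema. A sketch of the relevant accessor (assuming current BaseMetadataTable behavior; the constant name is illustrative):

  // BaseMetadataTable (sketch): metadata tables register a single static schema
  // under the initial schema id (0), regardless of base-table schema evolution
  @Override
  public Map<Integer, Schema> schemas() {
    return ImmutableMap.of(TableMetadata.INITIAL_SCHEMA_ID, schema());
  }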


sungwy commented Mar 2, 2023

I have a couple of thoughts on what the "correct" fix would be for this issue... as @szehon-ho mentioned on PR #6980, it would absolutely be nice to support reading metadata tables using their corresponding snapshot's schema as well.

For additional context, the ability to read data using the snapshot's schema was introduced in the 0.13.0 release, which allowed users to view the actual table using the schema it had at that point in time. This wasn't a feature that was supported for actual tables or metadata tables before. Also, a metadata table's schema can only change if a user operates on a specific Iceberg table across multiple versions of the Iceberg jar. This can happen, but only with a clear intention to upgrade the infrastructure stack of a data pipeline, rather than through a running pipeline consistently and passively invoking schema evolution on the table.

Since this is a feature regression, I think it is important to put in a fix that at least makes the metadata table readable after schema evolution of the actual table, by conditionally using icebergTable.schema() if the table is an instance of BaseMetadataTable. This will at least fix the bug and restore metadata table time travel to a usable state. Then we can continue to look into the proper way to introduce snapshot-schema-based time travel for metadata tables.


szehon-ho commented Mar 2, 2023

Yea, I agree with @syun64; time travel for metadata tables is quite useful. For example, today we can query the files table as of some timestamp, and it will return the files at that timestamp. Same for the other metadata tables.

The problem comes from #1508, which for all time travel attempts to apply the schema at that time. While it's a good idea, it's just not supported yet for metadata tables, as they don't have a concept of historical schemas, though I feel they could.
