
DataFusion integration incorrectly handles partition columns defined "first" in schema #1168

Closed
alexwilcoxson-rel opened this issue Feb 20, 2023 · 2 comments · Fixed by #1303
Labels
bug Something isn't working

Comments

@alexwilcoxson-rel
Contributor

Environment

Delta-rs version: 0.7.0

Binding: Rust

Environment:

  • Cloud provider: n/a
  • OS: macOS
  • Other: M1

Bug

What happened:
With DataFusion, filtering on a non-partition column in a partitioned table leads to unexpected results or errors when the partition columns are defined "first" in the schema.

What you expected to happen:
Expected the filter on the non-partition column to work correctly.

How to reproduce it:
I wrote the test below. It fails because value (not a partition column) is treated as the partition column, i.e. as Dict(UInt16, Int32) instead of Int32.

#[tokio::test]
async fn temp_test() -> Result<()> {
    let table_loc = "./tests/data/temp-test";
    let table = CreateBuilder::new()
        .with_location(table_loc)
        .with_columns(vec![
            SchemaField::new("part".to_owned(), deltalake::SchemaDataType::primitive("integer".to_owned()), false, HashMap::new()),
            SchemaField::new("value".to_owned(), deltalake::SchemaDataType::primitive("integer".to_owned()), false, HashMap::new()),
        ])
        .with_partition_columns(vec!["part"])
        .await?;

    let arrow_schema = table.schema().unwrap().try_into()?;
    let value_col = Int32Array::from(vec![10, 20, 30]);
    let part_col = Int32Array::from(vec![1, 2, 3]);
    let record_batch = RecordBatch::try_new(
        Arc::new(arrow_schema),
        vec![Arc::new(value_col), Arc::new(part_col)]
    )?;

    let table = WriteBuilder::new()
        .with_location(table_loc)
        .with_input_batches(vec![record_batch])
        .await?;

    let ctx = SessionContext::new();

    println!("{:?}", table.schema());
    println!("{:?}", table.get_partition_values().collect::<Vec<_>>());

    ctx.register_table("demo", Arc::new(table))?;

    let actual = ctx
        .sql("SELECT part, value FROM demo WHERE value = 1")
        .await?
        .collect()
        .await?;

    let expected = vec![
        "+------+-------+",
        "| part | value |",
        "+------+-------+",
        "| 10   | 1     |",
        "+------+-------+",
    ];

    assert_batches_sorted_eq!(&expected, &actual);

    Ok(())
}

More details:
This seems to be happening because in the DataFusion scan, the FileScanConfig's file_schema is built from the non-partition columns, and the partition columns are provided separately as table_partition_cols. That is expected, but the projection passed to FileScanConfig is not updated to account for this.

See the doc for projection on FileScanConfig:

    /// Columns on which to project the data. Indexes that are higher than the
    /// number of columns of `file_schema` refer to `table_partition_cols`.

As a quick fix I added this to the scan:

        let field_lookup = file_schema
            .fields()
            .iter()
            .map(|f| f.name())
            .chain(table_partition_cols.iter())
            .collect::<Vec<_>>();
        let updated_projection = if let Some(proj) = projection {
            Some(
                proj.iter()
                    .map(|p| {
                        field_lookup
                            .iter()
                            .position(|f| *f == schema.field(*p).name())
                            .unwrap()
                    })
                    .collect::<Vec<usize>>(),
            )
        } else {
            None
        };

Basically, this updates the projection to use indexes into file_schema + table_partition_cols.
This assumes the projection indexes are into schema.field(), which appears to be the case up the stack in DataFusion.
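To illustrate the remapping in isolation, here is a minimal, hypothetical sketch (the function and column names are made up for illustration and are not delta-rs API), assuming a table whose schema lists "part" first while the file schema carries only "value":

```rust
// Hypothetical sketch: remap projection indexes that point into the full
// table schema so they point into file_schema followed by partition cols,
// which is the indexing FileScanConfig expects.
fn remap_projection(
    table_schema: &[&str],   // full schema order (partition cols may come first)
    file_schema: &[&str],    // non-partition columns only
    partition_cols: &[&str], // appended after file_schema in the lookup
    projection: &[usize],    // indexes into table_schema
) -> Vec<usize> {
    let lookup: Vec<&str> = file_schema
        .iter()
        .chain(partition_cols.iter())
        .copied()
        .collect();
    projection
        .iter()
        .map(|p| {
            lookup
                .iter()
                .position(|f| *f == table_schema[*p])
                .unwrap()
        })
        .collect()
}

fn main() {
    // "part" is defined first in the table schema but lives in partition cols,
    // so index 0 must be remapped past the single file column.
    let remapped = remap_projection(&["part", "value"], &["value"], &["part"], &[0, 1]);
    assert_eq!(remapped, vec![1, 0]);
}
```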

Making this change fixes the example test but breaks test_datafusion_partitioned_types, because the column output ordering is different.

I can dig in further and submit a PR (if something like this is the correct fix) once I go through my company's open source contribution policy.

@alexwilcoxson-rel alexwilcoxson-rel added the bug Something isn't working label Feb 20, 2023
@roeap
Collaborator

roeap commented Feb 22, 2023

Thanks for the report @alexwilcoxson-rel!

Generally speaking, we are very happy about every contribution! At first glance it seems you have already narrowed down the source of the error. More generally, I "fear" we will have to do some internal schema shenanigans anyhow, to account for potential differences in data types, i.e. the Delta spec does not cover the entire range of types in Parquet / Arrow.

So while I generally hope we can leave as much of the schema handling as possible to DataFusion, we may have to manually interfere...

Let me know if you want to / can take this on, otherwise I'll have a closer look soon.

@alexwilcoxson-rel
Contributor Author

An alternative, and perhaps safer than changing the projection, is to change the TableProvider::schema impl for DeltaTable to provide the columns in the order expected by DataFusion, i.e. with the table partition columns at the end.

This also doesn't break test_datafusion_partitioned_types, which already asserts that the schema of the collected record batches is ordered with the partition columns last.

    fn schema(&self) -> Arc<ArrowSchema> {
        let orig_schema = <ArrowSchema as TryFrom<&schema::Schema>>::try_from(
            DeltaTable::schema(self).unwrap(),
        ).unwrap();
        // DataFusion projection expects partition cols at end of schema
        let table_partition_cols = self.get_metadata().unwrap().partition_columns.clone();
        let file_schema = orig_schema.fields().iter().filter(|f| !table_partition_cols.contains(f.name()));
        let partition_schema = orig_schema.fields().iter().filter(|f| table_partition_cols.contains(f.name()));
        let new_schema = ArrowSchema::new(file_schema.chain(partition_schema).cloned().collect::<Vec<_>>());
        Arc::new(new_schema)
    }
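The reordering itself can be sketched with plain field names (a hypothetical helper for illustration, not delta-rs code):

```rust
// Hypothetical sketch: stable-partition the field names so that partition
// columns end up last, mirroring the schema() impl above.
fn partition_cols_last(fields: &[&str], partition_cols: &[&str]) -> Vec<String> {
    let file = fields
        .iter()
        .copied()
        .filter(|f| !partition_cols.contains(f))
        .map(str::to_string);
    let part = fields
        .iter()
        .copied()
        .filter(|f| partition_cols.contains(f))
        .map(str::to_string);
    // Non-partition columns first, partition columns appended at the end.
    file.chain(part).collect()
}

fn main() {
    // "part" is declared first in the Delta schema but moves to the end
    // for DataFusion.
    let reordered = partition_cols_last(&["part", "value"], &["part"]);
    assert_eq!(reordered, vec!["value".to_string(), "part".to_string()]);
}
```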
