[WIP][VARIANT] Support reading basic variant #259

Open · wants to merge 2 commits into main
Conversation

@richardc-db commented Jun 15, 2024

Still very much WIP.

Implements basic variant support.

To read variants, engines can read physical variants from storage as structs and implement (or use the default) "variant_coalesce" function, which is intended to create a Variant column vector from the struct input.

Because kernel-defaults uses Arrow as its in-memory data representation (and there isn't currently an arrow variant type), variants in kernel-defaults are simply an arrow struct with "value" and "metadata" binary child fields. A piece of metadata "isVariant" is inserted onto the variant struct field to differentiate between a struct and variant.

In the post-shredding future, the "variant_coalesce" function (which is currently more or less a no-op) will be used to rebuild shredded variants into their fully encoded representation.

Tested using an external golden table with variants and nested variants.

@scovich (Collaborator) left a comment

Made an initial pass. Overall impression is that we "care too much" about variant in "too many places" -- which complicates the code and makes it error-prone. Not sure the best way to address this shortcoming (differentiating logical vs. physical types might help)?

Two high-level questions I wasn't able to answer during the code review:

  1. Is there a "clean" boundary between the variant logical type and the physical struct-of-binary type+data? Ideally, that boundary should be enforced very early in the query's lifecycle, so that almost all the kernel code doesn't have to care about it, and simply sees a struct that happens to carry a metadata field?
  2. What does VariantCoalesce really do? I think it just ensures that each variant column has the correct number and ordering of physical sub-fields. But it seems like that should happen automatically (and much more cleanly) if we send the correct read schema to the file readers?

.metadata()
.iter()
.map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
.collect::<Result<_, serde_json::Error>>()
.collect::<Result<HashMap<String, String>, serde_json::Error>>()
Collaborator:

I believe you can get rid of the turbofish by:

use itertools::Itertools;
  ...
let mut metadata: HashMap<_, _> = f
    .metadata
    .iter()
    .map(...)
    .try_collect()
    .map_err(...)?;

Comment on lines +125 to +126
ArrowField::new("value", ArrowDataType::Binary, false),
ArrowField::new("metadata", ArrowDataType::Binary, false),
Collaborator:

I believe those need to be nullable, since a null variant field will be physically represented as a null entry in these arrays?

@@ -89,6 +90,10 @@ impl Scalar {
Decimal128Array::new_null(num_rows)
.with_precision_and_scale(*precision, *scale as i8)?,
),
// TODO(r.chen): Fill this out correctly.
PrimitiveType::Variant => {
panic!("UNSUPPORTED - VARIANT DOESNT HAVE AN ARROW TYPE")
Collaborator:

nit: could also just do

PrimitiveType::Variant => unimplemented!(),

... but wouldn't the following suffice to implement the required functionality?

PrimitiveType::Variant => {
    // Arrow has no variant type, so we have to emit a null struct-of-arrays instead.
    let arrow_variant = ArrowDataType::try_from(DataType::VARIANT)?;
    let physical_variant = DataType::try_from(arrow_variant)?;
    Scalar::Null(physical_variant).to_array(num_rows)
}

Collaborator:

That said, it's a bit troubling that a round trip from kernel to arrow and back is not idempotent?
Do we need to introduce a concept of physical vs. logical data types in kernel, so that variant (logical type) is simply an array-of-struct (physical type) everywhere?

@@ -228,13 +233,131 @@ fn ensure_data_types(kernel_type: &DataType, arrow_type: &ArrowDataType) -> Delt
}
Ok(())
}
(DataType::Primitive(PrimitiveType::Variant), ArrowDataType::Struct(_)) => Ok(()),
Collaborator:

Don't we need to verify the struct's schema? Otherwise weird errors could arise downstream...

Seems like we just need a recursive call to ensure_data_types, where the first arg is the physical variant schema (see comment on the null to_array case above), and the second arg is whatever got passed in?

Comment on lines +250 to +256
let new_arrays = arrays
.into_iter()
.zip(kernel_struct_type.fields().into_iter())
.map(|(child_arr, struct_field)| {
variant_coalesce_impl(child_arr, struct_field.data_type())
})
.collect::<Result<Vec<_>, _>>()?;
Collaborator:

If you use itertools::Itertools, then this code simplifies to:

                let new_arrays: Vec<_> = arrays
                      ...
                    .try_collect()?;

))
}
}
_ => Ok(Arc::new(arr)),
Collaborator:

I think this code only succeeds by accident, because Arc implements the AsRef trait...

Suggested change
_ => Ok(Arc::new(arr)),
_ => Ok(arr),

variant_coalesce_results.push(coalesced_batch);
}
}
None => (),
Collaborator:

This doesn't look right... if variant is disabled then the entire query result is thrown away?

Collaborator:

Seems like it would be a lot cleaner to have a two pass approach:

  • At query compilation time, verify that variant mode is actually enabled if any variant column is found in the schema
  • At runtime, unconditionally apply VariantCoalesce, since it's anyway a no-op for non-variant columns

Then we don't need all this option and if/else control flow here?

@@ -418,7 +419,8 @@ impl Display for PrimitiveType {
PrimitiveType::TimestampNtz => write!(f, "timestamp_ntz"),
PrimitiveType::Decimal(precision, scale) => {
write!(f, "decimal({},{})", precision, scale)
}
},
Collaborator:

I don't think commas are required after case arms that use curly braces?

Ok::<usize, Error>(idx)
} else {
Err(Error::invalid_variant_representation(
"\"value\" field is not of binary type.",
Collaborator:

I think we usually surround column names with ' rather than "?
(better match with SQL, but also avoids needing escape characters here)

(several more below)

if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
let (fields, arrays, nulls) = struct_array.clone().into_parts();

let value_idx = if let Some((idx, field)) = fields.find("value") {
Collaborator:

I think it's ok to require the physical schema to be in a particular order, rather than checking at runtime like this. In theory, kernel is -- or could be -- in charge of the schema that goes to parquet reader, so it should always match?

Collaborator:

Also: If we do fix the field ordering, can the VariantCoalesce operation (and variant_coalesce_impl function) just disappear? Or are they accomplishing something else besides field reordering that I didn't notice?

@zachschuermann zachschuermann self-requested a review July 18, 2024 01:50
@richardc-db (Author) commented Aug 12, 2024

hey @scovich, thanks for taking a look! Sorry for the very late reply, this PR is very much a WIP and was more of a hack for me to familiarize myself with the code base (and rust). But to answer some of your questions:

> Is there a "clean" boundary between the variant logical type and the physical struct-of-binary type+data? Ideally, that boundary should be enforced very early in the query's lifecycle, so that almost all the kernel code doesn't have to care about it, and simply sees a struct that happens to carry a metadata field?

This makes sense. One thing I've discussed with Nick is a new API where the kernel can receive the parquet schema through the parquet footer. This way, the kernel will see just a struct (with a metadata field). The details of this are still being discussed, though.

> What does VariantCoalesce really do? I think it just ensures that each variant column has the correct number and ordering of physical sub-fields. But it seems like that should happen automatically (and much more cleanly) if we send the correct read schema to the file readers?

yep, currently it doesn't really have a purpose. I have it in this PR just so I can understand the expression framework, but in the future, we'd like to have an expression which will "reconstruct" shredded variants. We intend that this "variant_coalesce" expression will do this in the future - i.e. the engine's scan will return an arbitrary struct, and the "variant_coalesce" expression will process it to return the struct of binaries.
