
apply a schema to fix column names #331

Open · wants to merge 9 commits into main

Conversation

@nicklan (Collaborator) commented Sep 10, 2024

Enforce/apply the schema given when we evaluate an expression.

Given that we want to move to expression-based fix-up and let the final schema dictate the output, we will need to do this.

This code fixes things up at all levels of the output, which is messy in Arrow since schemas are embedded all over the place. The schema is only applied if the output of the expression doesn't exactly match the passed schema.

codecov bot commented Sep 10, 2024

Codecov Report

Attention: Patch coverage is 16.24365% with 165 lines in your changes missing coverage. Please review.

Project coverage is 75.54%. Comparing base (1e19980) to head (29b09f7).

Files with missing lines               Patch %   Lines
kernel/src/engine/arrow_expression.rs    3.59%   133 Missing and 1 partial ⚠️
kernel/src/scan/mod.rs                  44.44%   26 Missing and 4 partials ⚠️
kernel/src/engine/arrow_utils.rs        66.66%   0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #331      +/-   ##
==========================================
- Coverage   76.85%   75.54%   -1.32%     
==========================================
  Files          47       47              
  Lines        9436     9620     +184     
==========================================
+ Hits         7252     7267      +15     
- Misses       1789     1956     +167     
- Partials      395      397       +2     


@scovich (Collaborator) left a comment:

Initial pass. Interested to understand better how this fits into the broader column mapping situation etc.

(Two outdated review threads on kernel/src/engine/arrow_expression.rs were marked resolved.)
// build up a new set of (col, field) pairs
let result_iter = arrow_fields.into_iter().zip(schema.fields()).zip(cols).map(
|((arrow_field, kernel_field), col)| -> DeltaResult<(Arc<dyn Array>, ArrowField)> {
match (&kernel_field.data_type, arrow_field.data_type()) {
Collaborator:

Do we also need to check col.data_type()? It should exactly match arrow_field.data_type(), right?

Collaborator:

Update: I guess the as_struct_opt call, combined with recursion on the child fields, would validate that as a side effect?

Collaborator (author):

The new code pulls the data types from the columns as it recurses, so I think this is covered now.

},
);
let (new_cols, new_fields): (Vec<Arc<dyn Array>>, Vec<ArrowField>) =
result_iter.process_results(|iter| iter.unzip())?;
Collaborator:

Trying to grok this process_results thing...

  • It takes an Iterator<DeltaResult<(Arc<dyn Array>, Field)>> as input?
  • And then, it specially maps that iterator so that:
    • Any Err result from the input iterator immediately triggers an error result of the overall operation?
    • The mapping function receives an iterator of all the Ok values, which can be manipulated however we want
  • Similar to collect, type inference can produce an output of any type (not just Iterator)
  • The output is thus DeltaResult<(Vec<_>, Vec<_>)>?

So basically, this is a collect that allows transformation of the iterator before materializing it, but with the same error-stop semantics?

(phew, that's complicated)

Collaborator:

I think this is functionally equivalent to collect .. iter .. unzip .. collect?
But more efficient because it doesn't have to materialize the intermediate result?

Put another way: It's like Result.and_then, but for an iterator with fail-stop semantics?

Collaborator (author):

Yep. I basically think of it as: if I have a function that takes Iter<T> and I want to apply it over an iterator of Result<T, E>, then I can call process_results with the function.

process_results takes care of unwrapping the Ok values, calls the process function on the inner values, and wraps the final result in an Ok. Otherwise it just stops at the first Err.

I think this is functionally equivalent to collect .. iter .. unzip .. collect?

Almost, without the final collect, since unzip is basically two collects in a trench-coat.
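
To make those semantics concrete, here's a minimal standalone sketch using the Itertools::process_results method from the itertools crate (the item and error types are made up for illustration):

use itertools::Itertools;

fn main() {
    // An iterator of Results, standing in for fallible per-item work.
    let items = vec![Ok((1, "a")), Ok((2, "b")), Ok((3, "c"))];

    // The closure sees an iterator over just the Ok values, so we can unzip
    // straight into two Vecs without materializing a Vec of pairs first.
    let unzipped: Result<(Vec<i32>, Vec<&str>), String> =
        items.into_iter().process_results(|iter| iter.unzip());
    assert_eq!(unzipped, Ok((vec![1, 2, 3], vec!["a", "b", "c"])));

    // Any Err short-circuits the whole operation, fail-stop style.
    let bad = vec![Ok((1, "a")), Err("boom".to_string())];
    let result: Result<(Vec<i32>, Vec<&str>), String> =
        bad.into_iter().process_results(|iter| iter.unzip());
    assert_eq!(result, Err("boom".to_string()));
}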

@@ -171,7 +171,7 @@ impl Error {
         Self::FileNotFound(path.to_string())
     }
     pub fn missing_column(name: impl ToString) -> Self {
-        Self::MissingColumn(name.to_string())
+        Self::MissingColumn(name.to_string()).with_backtrace()
Collaborator:

temporary debugging aid? or intentional?

Collaborator (author):

Intentional. Since this happens in multiple places, it's useful to have a backtrace.

@nicklan nicklan marked this pull request as ready for review October 8, 2024 23:33
@nicklan nicklan requested a review from scovich October 8, 2024 23:35
@@ -53,8 +53,8 @@ pub(crate) use prim_array_cmp;
 /// returns a tuples of (mask_indicies: Vec<parquet_schema_index>, reorder_indicies:
 /// Vec<requested_index>). `mask_indicies` is used for generating the mask for reading from the

-fn make_arrow_error(s: String) -> Error {
+pub(crate) fn make_arrow_error(s: String) -> Error {
     Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s))
Collaborator:

Suggested change
-pub(crate) fn make_arrow_error(s: String) -> Error {
+pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {

^^ This simplifies call sites that pass &str, without duplicating an arg that was already a String (because Into::into consumes its argument).

One could also use s: impl ToString -- which accepts more types as input -- but String::to_string must always make a copy because it takes &self instead of self.
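
A tiny sketch of that trade-off (the function names here are illustrative, not kernel APIs):

// impl Into<String>: an owned String is moved in (no copy); a &str is copied once.
fn make_error_into(s: impl Into<String>) -> String {
    s.into()
}

// impl ToString: accepts anything with Display, but String::to_string takes
// &self, so even an owned String gets copied.
fn make_error_to_string(s: impl ToString) -> String {
    s.to_string()
}

fn main() {
    let owned = String::from("already owned");
    let _ = make_error_into(owned);       // moved, no new allocation
    let _ = make_error_into("borrowed");  // copied once into a String
    let _ = make_error_to_string(42);     // Display types work too
}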

Comment on lines +381 to +382
// make column `col` with type `arrow_type` look like `kernel_type`. For now this only handles name
// transforms. if the actual data types don't match, this will return an error
Collaborator:

What about nullability? IIRC we have to read parquet with everything nullable, because parquet can't express the concept of a non-nullable field nesting inside a nullable field.

Or did we handle that already by just making everything nullable in our action schema?

) -> DeltaResult<Option<Arc<dyn Array>>> {
match (kernel_type, arrow_type) {
(DataType::Struct(kernel_fields), ArrowDataType::Struct(arrow_fields)) => {
if kernel_fields.fields.len() != arrow_fields.len() {
Collaborator:

Many query engines (including both spark and arrow) allow schema widening, where fields present only in the target schema are allowed, and inferred to be null. Fields present only in source, or fields present in both but with incompatible types, are still forbidden.

So the question: Do we anticipate such capability being useful or even necessary, given expected kernel use cases? If so, we may want to consider supporting it (at least, enough to ensure that our implementation can be extended to support it later). If not needed, we should avoid adding yet more complexity.

Collaborator:

A similar question: Do we anticipate it being useful to support dropping fields that are present only in the source?

Asking because nested column pruning is actually quite difficult to express, without some capability like this.

But on the other hand, it seems like we should prefer to do all column pruning -- nested or otherwise -- up front (before reading) so we don't have to materialize unwanted data.
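
For reference, a rough sketch of what the widening semantics described above could look like in arrow-rs. The helper name and error handling are hypothetical, not this PR's code: target-only nullable fields are filled via new_null_array, type mismatches are rejected, and source-only fields are silently dropped here (a stricter version would reject them).

use std::sync::Arc;

use arrow_array::{new_null_array, ArrayRef, RecordBatch};
use arrow_schema::Schema;

// Hypothetical sketch: widen `batch` to `target`.
fn widen(batch: &RecordBatch, target: &Schema) -> Result<RecordBatch, String> {
    let source = batch.schema();
    let mut cols: Vec<ArrayRef> = Vec::with_capacity(target.fields().len());
    for field in target.fields() {
        match source.column_with_name(field.name()) {
            Some((idx, src_field)) => {
                if src_field.data_type() != field.data_type() {
                    return Err(format!("incompatible type for {}", field.name()));
                }
                cols.push(batch.column(idx).clone());
            }
            // Field present only in the target: inferred to be null, so it
            // must be nullable.
            None if field.is_nullable() => {
                cols.push(new_null_array(field.data_type(), batch.num_rows()));
            }
            None => return Err(format!("missing non-nullable field {}", field.name())),
        }
    }
    RecordBatch::try_new(Arc::new(target.clone()), cols).map_err(|e| e.to_string())
}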

Comment on lines +397 to +399
let sa = col.as_struct_opt().ok_or(make_arrow_error(
"Arrow claimed to be a struct but isn't a StructArray".to_string(),
))?;
Collaborator:

Suggested change
-let sa = col.as_struct_opt().ok_or(make_arrow_error(
-    "Arrow claimed to be a struct but isn't a StructArray".to_string(),
-))?;
+let sa = col.as_struct_opt().ok_or_else(|| make_arrow_error(
+    "Arrow claimed to be a struct but isn't a StructArray",
+))?;

(we don't want to create the error unless it's needed)

(many others to fix as well)

(we should fix make_arrow_error to take impl Into<String> instead of String)
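
The laziness point, as a standalone sketch with a visible side effect:

fn expensive_error() -> String {
    println!("building error message"); // side effect, just for the demo
    "something went wrong".to_string()
}

fn main() {
    let present: Option<i32> = Some(5);

    // ok_or evaluates its argument eagerly: the message is built and then
    // thrown away, even though the Option is Some.
    let _ = present.ok_or(expensive_error()); // prints

    // ok_or_else takes a closure, so nothing is built on the happy path.
    let _ = present.ok_or_else(expensive_error); // does not print
}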

Collaborator:

Alternatively, I recently learned a neat rust trick:

Suggested change
-let sa = col.as_struct_opt().ok_or(make_arrow_error(
-    "Arrow claimed to be a struct but isn't a StructArray".to_string(),
-))?;
+let Some(sa) = col.as_struct_opt() else {
+    return Err(make_arrow_error("Arrow claimed to be a struct but isn't a StructArray"));
+};

It works just like if let, but the else clause must diverge (panic, return, break, etc).

It may not be a net win in this specific case tho.
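
A minimal standalone example of the let-else form:

fn first_word(s: &str) -> Result<&str, String> {
    // Bind the pattern, or diverge in the else branch (a return, here).
    let Some(word) = s.split_whitespace().next() else {
        return Err("empty input".to_string());
    };
    Ok(word)
}

fn main() {
    assert_eq!(first_word("hello world"), Ok("hello"));
    assert!(first_word("   ").is_err());
}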

let sa = col.as_struct_opt().ok_or(make_arrow_error(
"Arrow claimed to be a struct but isn't a StructArray".to_string(),
))?;
let (fields, sa_cols, sa_nulls) = sa.clone().into_parts();
Collaborator:

It seems unfortunate that we can't consume sa... but I guess it's all Arc all the way down, so hopefully the cloning is at least cheap?
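
A small sketch of why that clone should indeed be cheap: a StructArray's children and buffers are behind Arcs, so clone() bumps refcounts rather than copying row data. (Standalone arrow-rs example, not this PR's code.)

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, StructArray};
use arrow_schema::{DataType, Field};

fn main() {
    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let field = Arc::new(Field::new("x", DataType::Int32, true));
    let sa = StructArray::from(vec![(field, child)]);

    // clone() only bumps Arc refcounts; into_parts then hands back owned
    // fields, child columns, and the optional null buffer.
    let (fields, columns, nulls) = sa.clone().into_parts();
    assert_eq!(fields.len(), 1);
    assert_eq!(columns[0].len(), 3);
    assert!(nulls.is_none());
}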

Comment on lines +412 to +414
fn make_data_type_physical(
logical_dt: &DataType,
column_mapping_mode: ColumnMappingMode,
Collaborator:

FYI: Normally, the field's field id and physical name are stored in the field's metadata... but Iceberg requires field ids even for the internal columns used by Map and Array, and there's no way to associate metadata with those. So, when IcebergCompatV2 table feature is enabled, we have to remember the most recently-seen field, as well as the column path we descended through since then, so we can fetch the field ids out of that parent field's metadata.

Collaborator:

Actually, I don't think Delta cares about those iceberg field ids -- even in column mapping field mode -- so maybe we can ignore all of this on the read path.

// build up the mapped child fields
let children = struct_type
.fields()
.map(|field| make_field_physical(field, column_mapping_mode))
Collaborator:

Somewhere near here, we need to actually fetch the field's physical name from the field metadata, no?

Collaborator:

Ah make_field_physical != make_data_type_physical. And it does the right thing.

.fields()
.map(|field| make_field_physical(field, column_mapping_mode))
.try_collect()?;
Ok(DataType::Struct(Box::new(StructType::new(children))))
Collaborator:

Suggested change
-Ok(DataType::Struct(Box::new(StructType::new(children))))
+Ok(DataType::struct_type(children))

Comment on lines +399 to +404
Ok(StructField {
name: physical_name.to_string(),
data_type: mapped_data_type,
nullable: logical_field.nullable,
metadata: logical_field.metadata.clone(),
})
Collaborator:

Is this better or worse?

Suggested change
-Ok(StructField {
-    name: physical_name.to_string(),
-    data_type: mapped_data_type,
-    nullable: logical_field.nullable,
-    metadata: logical_field.metadata.clone(),
-})
+let mut field = StructField::new(physical_name, mapped_data_type, logical_field.nullable);
+field.metadata = logical_field.metadata.clone();
+Ok(field)

Comment on lines +389 to +396
fn make_field_physical(
logical_field: &StructField,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<StructField> {
match column_mapping_mode {
ColumnMappingMode::None => Ok(logical_field.clone()),
ColumnMappingMode::Name => {
let physical_name = logical_field.physical_name(column_mapping_mode)?;
Collaborator:

Column mapping is pretty fundamental... I wonder if we should define these as methods on the various types?

impl StructField {
    pub fn to_physical(self, mode: ColumnMappingMode) -> DeltaResult<Self> {
        Ok(Self {
            name: self.physical_name(mode)?.into(),
            data_type: self.data_type.to_physical(mode)?,
            ..self
        })
    }
}

impl DataType {
    pub fn to_physical(self, mode: ColumnMappingMode) -> DeltaResult<Self> {
        use DataType::*;
        let result = match self {
            Struct(s) => DataType::try_struct_type(
                s.fields.into_values().map(|field| field.to_physical(mode)),
            )?,
            Array(a) => DataType::array_type(
                a.element_type.to_physical(mode)?,
                a.contains_null,
            ),
            Map(m) => DataType::map_type(
                m.key_type.to_physical(mode)?,
                m.value_type.to_physical(mode)?,
                m.value_contains_null,
            ),
            Primitive(_) => self,
        };
        Ok(result)
    }
}

Note that the above consumes self so we would need to clone the input first. Seems like a welcome simplification that doesn't change big-O bound and so satisfies the "don't optimize, don't pessimize" mantra?

Meanwhile: the existing StructField::with_field takes &self instead of self (the usual convention for with_XXX methods). It only has one call site so far (ignoring this PR), so we should probably fix that ASAP.

Also, the existing StructType::new takes Vec<StructField> when it would be much nicer to take impl IntoIterator<Item = StructField>. I'll probably throw up a PR to fix that one soon.

Also, I noticed that the test_field_metadata unit test (schema.rs) for physical names is going directly to the metadata instead of exercising the physical_name method we actually use in practice??
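
For reference, the with_XXX convention alluded to above, as a tiny hypothetical example (take self by value so chained calls move rather than clone):

#[derive(Debug)]
struct Options {
    retries: u32,
    verbose: bool,
}

impl Options {
    fn new() -> Self {
        Self { retries: 0, verbose: false }
    }
    // Consuming self lets callers chain without borrowing or cloning.
    fn with_retries(self, retries: u32) -> Self {
        Self { retries, ..self }
    }
    fn with_verbose(self, verbose: bool) -> Self {
        Self { verbose, ..self }
    }
}

fn main() {
    let opts = Options::new().with_retries(3).with_verbose(true);
    println!("{opts:?}");
}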

Collaborator:

PR to simplify StructType is up: #385
