Skip to content

Commit

Permalink
refactor: return full table schema when creating physical schema
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Apr 14, 2023
1 parent b14d4ef commit 3c62e8d
Showing 1 changed file with 12 additions and 25 deletions.
37 changes: 12 additions & 25 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,25 +381,6 @@ impl TableProvider for DeltaTable {
.physical_arrow_schema(self.object_store())
.await?;

let file_schema = self
.state
.physical_arrow_schema(self.object_store())
.await?;

let table_schema = Arc::new(ArrowSchema::new(
schema
.fields
.clone()
.into_iter()
.map(|field| {
file_schema
.field_with_name(field.name())
.cloned()
.unwrap_or(field)
})
.collect(),
));

register_store(self, session.runtime_env().clone());

// TODO we group files together by their partition values. If the table is partitioned
Expand All @@ -409,15 +390,15 @@ impl TableProvider for DeltaTable {
if let Some(Some(predicate)) =
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
let pruning_predicate = PruningPredicate::try_new(predicate, table_schema.clone())?;
let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
let files_to_prune = pruning_predicate.prune(&self.state)?;
self.get_state()
.files()
.iter()
.zip(files_to_prune.into_iter())
.for_each(|(action, keep_file)| {
if keep_file {
let part = partitioned_file_from_action(action, &table_schema);
let part = partitioned_file_from_action(action, &schema);
file_groups
.entry(part.partition_values.clone())
.or_default()
Expand All @@ -426,7 +407,7 @@ impl TableProvider for DeltaTable {
});
} else {
self.get_state().files().iter().for_each(|action| {
let part = partitioned_file_from_action(action, &table_schema);
let part = partitioned_file_from_action(action, &schema);
file_groups
.entry(part.partition_values.clone())
.or_default()
Expand All @@ -435,6 +416,14 @@ impl TableProvider for DeltaTable {
};

let table_partition_cols = self.get_metadata()?.partition_columns.clone();
let file_schema = Arc::new(ArrowSchema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect(),
));

let parquet_scan = ParquetFormat::new()
.create_physical_plan(
Expand All @@ -451,9 +440,7 @@ impl TableProvider for DeltaTable {
.map(|c| {
Ok((
c.to_owned(),
partition_type_wrap(
table_schema.field_with_name(c)?.data_type().clone(),
),
partition_type_wrap(schema.field_with_name(c)?.data_type().clone()),
))
})
.collect::<Result<Vec<_>, ArrowError>>()?,
Expand Down

0 comments on commit 3c62e8d

Please sign in to comment.