From bb2d423f616c337cd8d1f1f8ee54f04023d36fb4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 11 Oct 2024 15:19:14 +0800 Subject: [PATCH] fix: add schema projection Signed-off-by: Xuanwo --- src/query/storages/iceberg/src/table.rs | 30 ++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index 4eac8568ef480..a98c67fb67c46 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -25,6 +25,7 @@ use databend_common_catalog::plan::PartInfo; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; +use databend_common_catalog::plan::Projection; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::table::Table; use databend_common_catalog::table_args::TableArgs; @@ -170,7 +171,34 @@ impl IcebergTable { let max_threads = ctx.get_settings().get_max_threads()? as usize; let max_threads = std::cmp::min(parts_len, max_threads); - let output_schema = Arc::new(DataSchema::from(plan.schema())); + let mut output_projection = + PushDownInfo::projection_of_push_downs(&self.schema(), plan.push_downs.as_ref()); + let inner_projection = matches!(output_projection, Projection::InnerColumns(_)); + + if let Some(prewhere) = plan.push_downs.as_ref().and_then(|p| p.prewhere.as_ref()) { + output_projection = prewhere.output_columns.clone(); + debug_assert_eq!( + inner_projection, + matches!(output_projection, Projection::InnerColumns(_)) + ); + } + + let output_table_schema = output_projection.project_schema(&self.schema()); + let output_schema = Arc::new(DataSchema::from(&output_table_schema)); + + // Build projection mask and field paths for transforming `RecordBatch` to output block. + // The number of columns in `output_projection` may be less than the number of actual read columns. 
+ // + // TODO: we need to build a projection mask here to support nested schemas. + // + // let (projection, output_leaves) = output_projection.to_arrow_projection(); + // let output_field_paths = Arc::new(compute_output_field_paths( + // &self.schema_desc, + // &projection, + // &output_table_schema, + // inner_projection, + // )?); + pipeline.add_source( |output| { IcebergTableSource::create(ctx.clone(), output, output_schema.clone(), self.clone())