Skip to content

Commit

Permalink
fix: add schema projection
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Oct 11, 2024
1 parent 9207773 commit bb2d423
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion src/query/storages/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_catalog::plan::PartInfo;
use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_args::TableArgs;
Expand Down Expand Up @@ -170,7 +171,34 @@ impl IcebergTable {
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let max_threads = std::cmp::min(parts_len, max_threads);

let output_schema = Arc::new(DataSchema::from(plan.schema()));
let mut output_projection =
PushDownInfo::projection_of_push_downs(&self.schema(), plan.push_downs.as_ref());
let inner_projection = matches!(output_projection, Projection::InnerColumns(_));

if let Some(prewhere) = plan.push_downs.as_ref().and_then(|p| p.prewhere.as_ref()) {
output_projection = prewhere.output_columns.clone();
debug_assert_eq!(
inner_projection,
matches!(output_projection, Projection::InnerColumns(_))
);
}

let output_table_schema = output_projection.project_schema(&self.schema());
let output_schema = Arc::new(DataSchema::from(&output_table_schema));

// Build projection mask and field paths for transforming `RecordBatch` to output block.
// The number of columns in `output_projection` may be less than the number of actual read columns.
//
// TODO: we need to build projection mask here to support nest schema.
//
// let (projection, output_leaves) = output_projection.to_arrow_projection();
// let output_field_paths = Arc::new(compute_output_field_paths(
// &self.schema_desc,
// &projection,
// &output_table_schema,
// inner_projection,
// )?);

pipeline.add_source(
|output| {
IcebergTableSource::create(ctx.clone(), output, output_schema.clone(), self.clone())
Expand Down

0 comments on commit bb2d423

Please sign in to comment.