diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 19759a8970686..87c8100ce0af7 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -913,6 +913,7 @@ message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; uint32 default_filter_selectivity = 3; + repeated uint32 projection = 9; } message FileGroup { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index cff58d3ddc4a4..34bf67ca3e3ba 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5567,6 +5567,9 @@ impl serde::Serialize for FilterExecNode { if self.default_filter_selectivity != 0 { len += 1; } + if !self.projection.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FilterExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -5577,6 +5580,9 @@ impl serde::Serialize for FilterExecNode { if self.default_filter_selectivity != 0 { struct_ser.serialize_field("defaultFilterSelectivity", &self.default_filter_selectivity)?; } + if !self.projection.is_empty() { + struct_ser.serialize_field("projection", &self.projection)?; + } struct_ser.end() } } @@ -5591,6 +5597,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { "expr", "default_filter_selectivity", "defaultFilterSelectivity", + "projection", ]; #[allow(clippy::enum_variant_names)] @@ -5598,6 +5605,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { Input, Expr, DefaultFilterSelectivity, + Projection, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5622,6 +5630,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { "input" => Ok(GeneratedField::Input), "expr" => Ok(GeneratedField::Expr), "defaultFilterSelectivity" | "default_filter_selectivity" => Ok(GeneratedField::DefaultFilterSelectivity), + "projection" => Ok(GeneratedField::Projection), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5644,6 +5653,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { let mut input__ = None; let mut expr__ = None; let mut default_filter_selectivity__ = None; + let mut projection__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Input => { @@ -5666,12 +5676,22 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::Projection => { + if projection__.is_some() { + return Err(serde::de::Error::duplicate_field("projection")); + } + projection__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } } } Ok(FilterExecNode { input: input__, expr: expr__, default_filter_selectivity: default_filter_selectivity__.unwrap_or_default(), + projection: projection__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 2ce8004e32486..885e032f06004 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1466,6 +1466,8 @@ pub struct FilterExecNode { pub expr: ::core::option::Option, #[prost(uint32, tag = "3")] pub default_filter_selectivity: u32, + #[prost(uint32, repeated, tag = "9")] + pub projection: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index e622af745062f..2bab802bf026d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -178,7 +178,18 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ) })?; let filter_selectivity = filter.default_filter_selectivity.try_into(); - let filter = FilterExec::try_new(predicate, input)?; + let projection = if !filter.projection.is_empty() { + Some( + filter + .projection + .iter() + .map(|i| *i as usize) + .collect::>(), + ) + } else { + None + }; + let filter = FilterExec::try_new(predicate, input)?.with_projection(projection)?; match filter_selectivity { Ok(filter_selectivity) => Ok(Arc::new( filter.with_default_selectivity(filter_selectivity)?, @@ -1167,6 +1178,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { extension_codec, )?), default_filter_selectivity: exec.default_selectivity() as u32, + projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { + v.iter().map(|x| *x as u32).collect::>() + }), }, ))), });