diff --git a/Cargo.lock b/Cargo.lock index e65a6356f..26f2297ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4878,8 +4878,10 @@ version = "0.11.0" dependencies = [ "arrow-array", "arrow-schema", + "arrow-select", "derive_more", "error-stack", + "index_vec", "itertools 0.11.0", "parking_lot 0.12.1", "sparrow-arrow", diff --git a/crates/sparrow-arrow/src/batch.rs b/crates/sparrow-arrow/src/batch.rs index 417b65560..245da081c 100644 --- a/crates/sparrow-arrow/src/batch.rs +++ b/crates/sparrow-arrow/src/batch.rs @@ -1,7 +1,6 @@ +use arrow_array::cast::AsArray; use arrow_array::types::{TimestampNanosecondType, UInt64Type}; -use arrow_array::{ - Array, ArrayRef, ArrowPrimitiveType, BooleanArray, TimestampNanosecondArray, UInt64Array, -}; +use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, TimestampNanosecondArray, UInt64Array}; use error_stack::{IntoReport, ResultExt}; use itertools::Itertools; @@ -253,54 +252,6 @@ impl Batch { } } - // TODO: Use "filter-bits" to avoid eagerly creating new columns. - pub fn filter(&self, predicate: &BooleanArray) -> error_stack::Result { - match &self.data { - Some(info) => { - let filter = arrow_select::filter::FilterBuilder::new(predicate) - .optimize() - .build(); - let data = filter - .filter(&info.data) - .into_report() - .change_context(Error::Internal)?; - - // TODO: This is unnecessary if `time` and `key_hash` were already in the batch. - // We should figure out how to avoid the redundant work. - let time = filter - .filter(&info.time) - .into_report() - .change_context(Error::Internal)?; - let subsort = filter - .filter(&info.subsort) - .into_report() - .change_context(Error::Internal)?; - let key_hash = filter - .filter(&info.key_hash) - .into_report() - .change_context(Error::Internal)?; - let info = BatchInfo { - data, - time, - subsort, - key_hash, - // TODO: Should the `*_present_time` be updated to reflect actual contents of batch? - min_present_time: info.min_present_time, - max_present_time: info.max_present_time, - }; - - Ok(Self { - data: Some(info), - up_to_time: self.up_to_time, - }) - } - None => { - assert_eq!(predicate.len(), 0); - Ok(self.clone()) - } - } - } - pub fn slice(&self, offset: usize, length: usize) -> Self { match &self.data { Some(info) => { @@ -446,15 +397,15 @@ impl BatchInfo { } pub(crate) fn time(&self) -> &TimestampNanosecondArray { - downcast_primitive_array(self.time.as_ref()).expect("type checked in constructor") + self.time.as_primitive() } pub(crate) fn subsort(&self) -> &UInt64Array { - downcast_primitive_array(self.subsort.as_ref()).expect("type checked in constructor") + self.subsort.as_primitive() } pub(crate) fn key_hash(&self) -> &UInt64Array { - downcast_primitive_array(self.key_hash.as_ref()).expect("type checked in constructor") + self.key_hash.as_primitive() } } diff --git a/crates/sparrow-transforms/Cargo.toml b/crates/sparrow-transforms/Cargo.toml index 74cf6a7ca..db5d87d84 100644 --- a/crates/sparrow-transforms/Cargo.toml +++ b/crates/sparrow-transforms/Cargo.toml @@ -12,6 +12,7 @@ Implementation of transforms and pipeline for executing them. [dependencies] arrow-array.workspace = true arrow-schema.workspace = true +arrow-select.workspace = true derive_more.workspace = true error-stack.workspace = true itertools.workspace = true @@ -23,6 +24,7 @@ sparrow-scheduler = { path = "../sparrow-scheduler" } tracing.workspace = true [dev-dependencies] +index_vec.workspace = true [lib] doctest = false diff --git a/crates/sparrow-transforms/src/lib.rs b/crates/sparrow-transforms/src/lib.rs index 439fdbb7a..48641e74b 100644 --- a/crates/sparrow-transforms/src/lib.rs +++ b/crates/sparrow-transforms/src/lib.rs @@ -16,6 +16,7 @@ //! must be executed to move data to the appropriate partitions. mod project; +mod select; mod transform; mod transform_pipeline; diff --git a/crates/sparrow-transforms/src/project.rs b/crates/sparrow-transforms/src/project.rs index 0fe68f461..a382e0596 100644 --- a/crates/sparrow-transforms/src/project.rs +++ b/crates/sparrow-transforms/src/project.rs @@ -41,3 +41,115 @@ impl Transform for Project { std::any::type_name::() } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; + use arrow_schema::{Field, Fields}; + use index_vec::index_vec; + use sparrow_arrow::{scalar_value::ScalarValue, RowTime}; + + fn test_batch() -> Batch { + let input_fields = Fields::from(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + ]); + let input_data = Arc::new(StructArray::new( + input_fields.clone(), + vec![ + Arc::new(Int64Array::from(vec![0, 1, 2, 3])), + Arc::new(Int64Array::from(vec![4, 7, 10, 11])), + ], + None, + )); + let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3])); + let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); + let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); + let input_batch = Batch::new_with_data( + input_data, + time, + subsort, + key_hash, + RowTime::from_timestamp_ns(3), + ); + input_batch + } + + #[test] + fn test_project() { + let output_fields = Fields::from(vec![ + Field::new("b", DataType::Int64, true), + Field::new("ab", DataType::Int64, true), + ]); + let output_type = DataType::Struct(output_fields.clone()); + + let input = test_batch(); + let input_type = input.data().unwrap().data_type(); + let exprs: Exprs = index_vec![ + sparrow_physical::Expr { + name: "input".into(), + literal_args: vec![], + args: vec![], + result_type: input_type.clone(), + }, + sparrow_physical::Expr { + name: "field_ref".into(), + literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))], + args: vec![0.into()], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "field_ref".into(), + literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))], + args: vec![0.into()], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "add".into(), + literal_args: vec![], + args: vec![1.into(), 2.into()], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "literal".into(), + literal_args: vec![ScalarValue::Int64(Some(10))], + args: vec![], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "record".into(), + literal_args: vec![], + args: vec![2.into(), 3.into()], + result_type: output_type.clone(), + }, + ]; + let project = Project::try_new(&exprs, &output_type).unwrap(); + let input = test_batch(); + let actual = project.apply(input).unwrap(); + + // Construct the expected output + let output_data = Arc::new(StructArray::new( + output_fields.clone(), + vec![ + Arc::new(Int64Array::from(vec![4, 7, 10, 11])), + Arc::new(Int64Array::from(vec![4, 8, 12, 14])), + ], + None, + )); + let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3])); + let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); + let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); + let expected = Batch::new_with_data( + output_data, + time, + subsort, + key_hash, + RowTime::from_timestamp_ns(3), + ); + + assert_eq!(expected, actual); + } +} diff --git a/crates/sparrow-transforms/src/select.rs b/crates/sparrow-transforms/src/select.rs new file mode 100644 index 000000000..40f0c750d --- /dev/null +++ b/crates/sparrow-transforms/src/select.rs @@ -0,0 +1,203 @@ +use arrow_array::{cast::AsArray, BooleanArray}; +use arrow_schema::DataType; +use error_stack::{IntoReport, ResultExt}; +use sparrow_arrow::Batch; + +use sparrow_expressions::ExpressionExecutor; +use sparrow_physical::Exprs; + +use crate::transform::{Error, Transform}; + +/// Transform for select. +pub struct Select { + evaluators: ExpressionExecutor, +} + +impl Select { + pub fn try_new(exprs: &Exprs, output_type: &DataType) -> error_stack::Result { + let evaluators = ExpressionExecutor::try_new(exprs.as_vec()) + .change_context_lazy(|| Error::CreateTransform("select"))?; + error_stack::ensure!( + output_type == evaluators.output_type(), + Error::MismatchedResultType { + transform: "select", + expected: output_type.clone(), + actual: evaluators.output_type().clone() + } + ); + Ok(Self { evaluators }) + } +} + +impl Transform for Select { + fn apply(&self, batch: Batch) -> error_stack::Result { + assert!(!batch.is_empty()); + + let condition = self + .evaluators + .execute(&batch) + .change_context(Error::ExecuteTransform("select"))?; + + // Expressions must evaluate to a single boolean condition. + let condition = condition.as_boolean(); + + let result = filter(batch, condition)?; + Ok(result) + } + + fn name(&self) -> &'static str { + std::any::type_name::() + } +} + +// TODO: Use "filter-bits" to avoid eagerly creating new columns. +fn filter(batch: Batch, predicate: &BooleanArray) -> error_stack::Result { + let error = || Error::ExecuteTransform("select"); + let up_to_time = batch.up_to_time; + match batch.data() { + Some(data) => { + let filter = arrow_select::filter::FilterBuilder::new(predicate).build(); + let data = filter + .filter(data) + .into_report() + .change_context_lazy(error)?; + + // TODO: This is unnecessary if `time` and `key_hash` were already in the batch. + // We should figure out how to avoid the redundant work. + let time = batch.time().expect("time column"); + let subsort = batch.subsort().expect("subsort column"); + let key_hash = batch.key_hash().expect("key column"); + let time = filter + .filter(time) + .into_report() + .change_context_lazy(error)?; + let subsort = filter + .filter(subsort) + .into_report() + .change_context_lazy(error)?; + let key_hash = filter + .filter(key_hash) + .into_report() + .change_context_lazy(error)?; + + Ok(Batch::new_with_data( + data, time, subsort, key_hash, up_to_time, + )) + } + None => { + assert_eq!(predicate.len(), 0); + Ok(batch.clone()) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; + use arrow_schema::{Field, Fields}; + use index_vec::index_vec; + use sparrow_arrow::{scalar_value::ScalarValue, RowTime}; + + fn test_batch() -> Batch { + let input_fields = Fields::from(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + ]); + let input_data = Arc::new(StructArray::new( + input_fields.clone(), + vec![ + Arc::new(Int64Array::from(vec![0, 1, 2, 3])), + Arc::new(Int64Array::from(vec![4, 7, 10, 11])), + ], + None, + )); + let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3])); + let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); + let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); + let input_batch = Batch::new_with_data( + input_data, + time, + subsort, + key_hash, + RowTime::from_timestamp_ns(3), + ); + input_batch + } + + #[test] + fn test_select() { + let output_type = DataType::Boolean; + let input = test_batch(); + let input_type = input.data().unwrap().data_type(); + let exprs: Exprs = index_vec![ + sparrow_physical::Expr { + name: "input".into(), + literal_args: vec![], + args: vec![], + result_type: input_type.clone(), + }, + sparrow_physical::Expr { + name: "field_ref".into(), + literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))], + args: vec![0.into()], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "field_ref".into(), + literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))], + args: vec![0.into()], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "add".into(), + literal_args: vec![], + args: vec![1.into(), 2.into()], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "literal".into(), + literal_args: vec![ScalarValue::Int64(Some(10))], + args: vec![], + result_type: DataType::Int64, + }, + sparrow_physical::Expr { + name: "gt_primitive".into(), + literal_args: vec![], + args: vec![3.into(), 4.into()], + result_type: output_type.clone(), + }, + ]; + let select = Select::try_new(&exprs, &output_type).unwrap(); + let input = test_batch(); + let actual = select.apply(input).unwrap(); + + // Construct the expected output + let output_fields = Fields::from(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + ]); + let output_data = Arc::new(StructArray::new( + output_fields.clone(), + vec![ + Arc::new(Int64Array::from(vec![2, 3])), + Arc::new(Int64Array::from(vec![10, 11])), + ], + None, + )); + let time = Arc::new(TimestampNanosecondArray::from(vec![2, 3])); + let subsort = Arc::new(UInt64Array::from(vec![2, 3])); + let key_hash = Arc::new(UInt64Array::from(vec![2, 3])); + let expected = Batch::new_with_data( + output_data, + time, + subsort, + key_hash, + RowTime::from_timestamp_ns(3), + ); + + assert_eq!(expected, actual); + } +} diff --git a/crates/sparrow-transforms/src/transform.rs b/crates/sparrow-transforms/src/transform.rs index cd73d15b4..18e91cc81 100644 --- a/crates/sparrow-transforms/src/transform.rs +++ b/crates/sparrow-transforms/src/transform.rs @@ -28,6 +28,6 @@ pub(crate) trait Transform: Send + Sync { std::any::type_name::() } - /// Apply the transfrom to the given input batch. + /// Apply the transform to the given input batch. fn apply(&self, batch: Batch) -> error_stack::Result; } diff --git a/crates/sparrow-transforms/src/transform_pipeline.rs b/crates/sparrow-transforms/src/transform_pipeline.rs index 2f6aff3ef..1687ed64c 100644 --- a/crates/sparrow-transforms/src/transform_pipeline.rs +++ b/crates/sparrow-transforms/src/transform_pipeline.rs @@ -112,6 +112,12 @@ impl TransformPipeline { kind: (&step.kind).into(), })?, ), + StepKind::Filter => Box::new( + crate::select::Select::try_new(&step.exprs, &step.result_type) + .change_context_lazy(|| Error::CreatingTransform { + kind: (&step.kind).into(), + })?, + ), unsupported => { error_stack::bail!(Error::UnsupportedStepKind { kind: unsupported.into() @@ -221,6 +227,7 @@ impl Pipeline for TransformPipeline { ); // If the batch is non empty, process it. + // TODO: Propagate empty batches to further the watermark. if !batch.is_empty() { let mut batch = batch; for transform in self.transforms.iter() {