From 0e4bc08dff2ba5dbebd357fdaa16feecd456973e Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Mon, 2 Oct 2023 11:26:40 -0700 Subject: [PATCH] ref: Simplify expressions in physical plan (#782) Change expressions to always produce the result of the final expression. This simplifies evaluation without affecting expressiveness: 1. We can now produce a primitive result (such as a boolean or integer) if that is the type of the final expression. 2. We can still produce records by adding a `record` expression to the end (which is the standard way of creating intermediate records). 3. We can still reference parts of a record input using `FieldRef`. This eliminates the weird "some expressions are marked as output" pattern. It also means that we can operate on `ArrayRef` generally and use `DataType` as the type, instead of sometimes using `RecordBatch` and `SchemaRef`. (An illustrative sketch of the new expression layout appears after the diff.) Other smaller changes in this PR: - Move the `Expressions` out of the `StepKind`. This allows the `StepKind` to serve as an identifier of the step, regardless of the expressions it computes, simplifying some of the de-duplication work in the logical-to-physical compilation. - Rename the `column` expression to `input`, since it accesses the input data to the expression evaluation. - Add a method to "intern" expression names, replacing a `&str` with a `&'static str` for expression operators available in the binary. --- Cargo.lock | 6 + Cargo.toml | 2 +- crates/sparrow-arrow/src/batch.rs | 133 +++++++----------- crates/sparrow-backend/Cargo.toml | 1 + .../sparrow-backend/src/pipeline_schedule.rs | 43 +++--- crates/sparrow-execution/Cargo.toml | 1 + crates/sparrow-execution/src/lib.rs | 119 ++++++++-------- crates/sparrow-expressions/src/evaluators.rs | 24 +++- .../src/evaluators/column.rs | 38 ----- .../src/evaluators/input.rs | 23 +++ crates/sparrow-expressions/src/executor.rs | 28 ++-- crates/sparrow-expressions/src/lib.rs | 1 + crates/sparrow-expressions/src/work_area.rs | 4 +- crates/sparrow-logical/Cargo.toml | 2 + crates/sparrow-logical/src/expr.rs | 14 +- crates/sparrow-logical/src/grouping.rs | 6 +- ...parrow_logical__expr__tests__function.snap | 4 +- crates/sparrow-physical/Cargo.toml | 1 + crates/sparrow-physical/src/expr.rs | 45 +----- crates/sparrow-physical/src/plan.rs | 2 +- crates/sparrow-physical/src/step.rs | 63 +++++---- crates/sparrow-transforms/src/project.rs | 40 ++---- crates/sparrow-transforms/src/transform.rs | 9 ++ .../src/transform_pipeline.rs | 14 +- python/Cargo.lock | 1 + 25 files changed, 299 insertions(+), 325 deletions(-) delete mode 100644 crates/sparrow-expressions/src/evaluators/column.rs create mode 100644 crates/sparrow-expressions/src/evaluators/input.rs diff --git a/Cargo.lock b/Cargo.lock index aad752686..e65a6356f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4304,6 +4304,7 @@ dependencies = [ "index_vec", "sparrow-core", "sparrow-physical", + "uuid 1.4.1", ] [[package]] @@ -4432,6 +4433,7 @@ dependencies = [ "sparrow-testing", "sparrow-transforms", "tokio", + "uuid 1.4.1", ] [[package]] @@ -4529,7 +4531,9 @@ name = "sparrow-logical" version = "0.11.0" dependencies = [ "arrow-schema", + "decorum", "derive_more", + "enum-as-inner", "error-stack", "hashbrown 0.14.0", "insta", @@ -4655,6 +4659,7 @@ dependencies = [ "serde_yaml", "sparrow-arrow", "strum_macros 0.25.2", + "uuid 1.4.1", ] [[package]] @@ -5830,6 +5835,7 @@ checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom 0.2.10", "rand 0.8.5",
"serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 758966c74..584325eef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,7 +139,7 @@ tracing-subscriber = { version = "0.3.17", features = [ "json", ] } url = { version = "2.3.1", features = ["serde"] } -uuid = { version = "1.3.0", features = ["v4"] } +uuid = { version = "1.3.0", features = ["v4", "serde" ] } [workspace.dependencies.rocksdb] # This disables compression algorithms that cause issues during linking due to diff --git a/crates/sparrow-arrow/src/batch.rs b/crates/sparrow-arrow/src/batch.rs index 572060462..417b65560 100644 --- a/crates/sparrow-arrow/src/batch.rs +++ b/crates/sparrow-arrow/src/batch.rs @@ -1,10 +1,7 @@ -use arrow::error::ArrowError; use arrow_array::types::{TimestampNanosecondType, UInt64Type}; use arrow_array::{ - Array, ArrayRef, ArrowPrimitiveType, BooleanArray, RecordBatch, TimestampNanosecondArray, - UInt64Array, + Array, ArrayRef, ArrowPrimitiveType, BooleanArray, TimestampNanosecondArray, UInt64Array, }; -use arrow_schema::SchemaRef; use error_stack::{IntoReport, ResultExt}; use itertools::Itertools; @@ -36,15 +33,11 @@ impl Batch { pub fn num_rows(&self) -> usize { match &self.data { - Some(data) => data.batch.num_rows(), + Some(data) => data.data.len(), None => 0, } } - pub fn into_record_batch(self) -> Option { - self.data.map(|data| data.batch) - } - pub fn time(&self) -> Option<&TimestampNanosecondArray> { self.data.as_ref().map(|data| data.time()) } @@ -66,11 +59,8 @@ impl Batch { self.data.as_ref().map(|info| info.max_present_time) } - pub fn record_batch(&self) -> Option<&RecordBatch> { - match &self.data { - Some(info) => Some(&info.batch), - None => None, - } + pub fn data(&self) -> Option<&ArrayRef> { + self.data.as_ref().map(|info| &info.data) } /// Create a new `Batch` containing the given batch data. @@ -83,33 +73,33 @@ impl Batch { /// (b) all rows in this batch or less than or equal to `up_to_time` /// (c) all future rows are greater than `up_to_time`. pub fn new_with_data( - batch: RecordBatch, + data: ArrayRef, time: ArrayRef, subsort: ArrayRef, key_hash: ArrayRef, up_to_time: RowTime, ) -> Self { - debug_assert_eq!(batch.num_rows(), time.len()); - debug_assert_eq!(batch.num_rows(), subsort.len()); - debug_assert_eq!(batch.num_rows(), key_hash.len()); + debug_assert_eq!(data.len(), time.len()); + debug_assert_eq!(data.len(), subsort.len()); + debug_assert_eq!(data.len(), key_hash.len()); debug_assert_eq!(time.data_type(), &TimestampNanosecondType::DATA_TYPE); debug_assert_eq!(subsort.data_type(), &UInt64Type::DATA_TYPE); debug_assert_eq!(key_hash.data_type(), &UInt64Type::DATA_TYPE); - let data = if batch.num_rows() == 0 { + let data = if data.len() == 0 { None } else { - Some(BatchInfo::new(batch, time, subsort, key_hash)) + Some(BatchInfo::new(data, time, subsort, key_hash)) }; Self { data, up_to_time } } /// Return a new `Batch` with the same time properties, but new data. 
- pub fn with_projection(&self, new_batch: RecordBatch) -> Self { - assert_eq!(new_batch.num_rows(), self.num_rows()); + pub fn with_projection(&self, new_data: ArrayRef) -> Self { + assert_eq!(new_data.len(), self.num_rows()); Self { data: self.data.as_ref().map(|data| BatchInfo { - batch: new_batch, + data: new_data, time: data.time.clone(), subsort: data.subsort.clone(), key_hash: data.key_hash.clone(), @@ -160,11 +150,7 @@ impl Batch { }) } - pub fn concat( - schema: &SchemaRef, - batches: Vec<Batch>, - up_to_time: RowTime, - ) -> error_stack::Result<Batch, Error> { + pub fn concat(batches: Vec<Batch>, up_to_time: RowTime) -> error_stack::Result<Batch, Error> { // TODO: Add debug assertions for batch ordering? if batches.iter().all(|batch| batch.is_empty()) { return Ok(Batch::new_empty(up_to_time)); } @@ -209,16 +195,17 @@ impl Batch { .change_context(Error::Internal)?; let batches: Vec<_> = batches - .into_iter() - .flat_map(|batch| batch.into_record_batch()) + .iter() + .flat_map(|batch| batch.data()) + .map(|data| data.as_ref()) .collect(); - let batch = arrow_select::concat::concat_batches(schema, &batches) + let data = arrow_select::concat::concat(&batches) .into_report() .change_context(Error::Internal)?; Ok(Self { data: Some(BatchInfo { - batch, + data, time, subsort, key_hash, @@ -232,8 +219,9 @@ impl Batch { pub fn take(&self, indices: &UInt64Array) -> error_stack::Result<Self, Error> { match &self.data { Some(info) => { - let batch = - take_record_batch(&info.batch, indices).change_context(Error::Internal)?; + let data = arrow_select::take::take(info.data.as_ref(), indices, None) + .into_report() + .change_context(Error::Internal)?; let time = arrow_select::take::take(info.time.as_ref(), indices, None) .into_report() .change_context(Error::Internal)?; @@ -244,7 +232,7 @@ impl Batch { .into_report() .change_context(Error::Internal)?; let info = BatchInfo { - batch, + data, time, subsort, key_hash, @@ -272,16 +260,8 @@ impl Batch { let filter = arrow_select::filter::FilterBuilder::new(predicate) .optimize() .build(); - let columns: Vec<_> = info - .batch - .columns() - .iter() - .map(|column| filter.filter(column)) - .try_collect() - .into_report() - .change_context(Error::Internal)?; - - let batch = RecordBatch::try_new(info.batch.schema(), columns) + let data = filter + .filter(&info.data) .into_report() .change_context(Error::Internal)?; @@ -300,7 +280,7 @@ impl Batch { .into_report() .change_context(Error::Internal)?; let info = BatchInfo { - batch, + data, time, subsort, key_hash, @@ -325,7 +305,7 @@ impl Batch { match &self.data { Some(info) => { let info = BatchInfo { - batch: info.batch.slice(offset, length), + data: info.data.slice(offset, length), time: info.time.slice(offset, length), subsort: info.subsort.slice(offset, length), key_hash: info.key_hash.slice(offset, length), @@ -354,6 +334,8 @@ impl Batch { ) -> Self { use std::sync::Arc; + use arrow_array::StructArray; + let time: TimestampNanosecondArray = time.into(); let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into(); let key_hash: UInt64Array = key_hash.into(); @@ -362,12 +344,14 @@ impl Batch { let subsort: ArrayRef = Arc::new(subsort); let key_hash: ArrayRef = Arc::new(key_hash); - let batch = - RecordBatch::try_new(MINIMAL_SCHEMA.clone(), vec![time.clone(), key_hash.clone()]) - .unwrap(); + let data = Arc::new(StructArray::new( + MINIMAL_SCHEMA.fields().clone(), + vec![time.clone(), key_hash.clone()], + None, + )); Batch::new_with_data( - batch, + data, time, subsort, key_hash, @@ -378,7 +362,7 @@ impl Batch { #[derive(Clone, Debug)]
pub(crate) struct BatchInfo { - pub(crate) batch: RecordBatch, + pub(crate) data: ArrayRef, pub(crate) time: ArrayRef, pub(crate) subsort: ArrayRef, pub(crate) key_hash: ArrayRef, @@ -388,7 +372,7 @@ pub(crate) struct BatchInfo { impl PartialEq for BatchInfo { fn eq(&self, other: &Self) -> bool { - self.batch == other.batch + self.data.as_ref() == other.data.as_ref() && self.time.as_ref() == other.time.as_ref() && self.min_present_time == other.min_present_time && self.max_present_time == other.max_present_time @@ -396,11 +380,11 @@ impl PartialEq for BatchInfo { } impl BatchInfo { - fn new(batch: RecordBatch, time: ArrayRef, subsort: ArrayRef, key_hash: ArrayRef) -> Self { - debug_assert_eq!(batch.num_rows(), time.len()); - debug_assert_eq!(batch.num_rows(), subsort.len()); - debug_assert_eq!(batch.num_rows(), key_hash.len()); - debug_assert!(batch.num_rows() > 0); + fn new(data: ArrayRef, time: ArrayRef, subsort: ArrayRef, key_hash: ArrayRef) -> Self { + debug_assert_eq!(data.len(), time.len()); + debug_assert_eq!(data.len(), subsort.len()); + debug_assert_eq!(data.len(), key_hash.len()); + debug_assert!(data.len() > 0); debug_assert_eq!(time.null_count(), 0); let time_column: &TimestampNanosecondArray = @@ -417,7 +401,7 @@ impl BatchInfo { RowTime::from_timestamp_ns(time_column.values()[time_column.len() - 1]); Self { - batch, + data, time, subsort, key_hash, @@ -439,13 +423,13 @@ impl BatchInfo { let slice_start = time_column .values() .partition_point(|time| RowTime::from_timestamp_ns(*time) <= time_inclusive); - let slice_len = self.batch.num_rows() - slice_start; + let slice_len = self.data.len() - slice_start; let max_result_time = RowTime::from_timestamp_ns(time_column.values()[slice_start - 1]); let min_self_time = RowTime::from_timestamp_ns(time_column.values()[slice_start]); let result = Self { - batch: self.batch.slice(0, slice_start), + data: self.data.slice(0, slice_start), time: self.time.slice(0, slice_start), subsort: self.subsort.slice(0, slice_start), key_hash: self.key_hash.slice(0, slice_start), @@ -453,7 +437,7 @@ impl BatchInfo { max_present_time: max_result_time, }; - self.batch = self.batch.slice(slice_start, slice_len); + self.data = self.data.slice(slice_start, slice_len); self.time = self.time.slice(slice_start, slice_len); self.key_hash = self.key_hash.slice(slice_start, slice_len); self.min_present_time = min_self_time; @@ -482,20 +466,6 @@ pub enum Error { impl error_stack::Context for Error {} -pub fn take_record_batch( - batch: &RecordBatch, - indices: &UInt64Array, -) -> error_stack::Result<RecordBatch, Error> { - // Produce batches based on indices - let columns = batch - .columns() - .iter() - .map(|c| arrow_select::take::take(c.as_ref(), indices, None).into_report()) - .try_collect()?; - - RecordBatch::try_new(batch.schema(), columns).into_report() -} - #[cfg(any(test, feature = "testing"))] #[static_init::dynamic] static MINIMAL_SCHEMA: arrow_schema::SchemaRef = { @@ -517,7 +487,6 @@ mod tests { use crate::testing::arb_arrays::arb_batch; use crate::{Batch, RowTime}; - use arrow_select::concat::concat_batches; use itertools::Itertools; use proptest::prelude::*; @@ -546,18 +515,18 @@ mod tests { prop_assert_eq!(remainder.up_to_time, original.up_to_time); // create the concatenated result - let concatenated = match (result.record_batch(), remainder.record_batch()) { + let concatenated = match (result.data(), remainder.data()) { (None, None) => unreachable!(), (Some(a), None) => a.clone(), (None, Some(b)) => b.clone(), - (Some(a), Some(b)) => concat_batches(&a.schema(),
&[a.clone(), b.clone()]).unwrap(), + (Some(a), Some(b)) => arrow_select::concat::concat(&[a.as_ref(), b.as_ref()]).unwrap(), }; - prop_assert_eq!(&concatenated, original.record_batch().unwrap()); + prop_assert_eq!(&concatenated, original.data().unwrap()); prop_assert!(result.data.is_some()); let result = result.data.unwrap(); - prop_assert_eq!(result.batch.num_rows(), result.time.len()); + prop_assert_eq!(result.data.len(), result.time.len()); prop_assert_eq!(result.time().values()[0], i64::from(result.min_present_time)); prop_assert_eq!(result.time().values()[result.time.len() - 1], i64::from(result.max_present_time)); @@ -568,7 +537,7 @@ mod tests { prop_assert!(remainder.data.is_some()); let remainder = remainder.data.unwrap(); - prop_assert_eq!(remainder.batch.num_rows(), remainder.time.len()); + prop_assert_eq!(remainder.data.len(), remainder.time.len()); prop_assert_eq!(remainder.time().values()[0], i64::from(remainder.min_present_time)); prop_assert_eq!(remainder.time().values()[remainder.time.len() - 1], i64::from(remainder.max_present_time)); } diff --git a/crates/sparrow-backend/Cargo.toml index 78481054a..937967dab 100644 --- a/crates/sparrow-backend/Cargo.toml +++ b/crates/sparrow-backend/Cargo.toml @@ -14,6 +14,7 @@ arrow-schema.workspace = true index_vec.workspace = true sparrow-core = { path = "../sparrow-core" } sparrow-physical = { path = "../sparrow-physical" } +uuid.workspace = true [dev-dependencies] diff --git a/crates/sparrow-backend/src/pipeline_schedule.rs index ed6930c79..ef7dfdcf1 100644 --- a/crates/sparrow-backend/src/pipeline_schedule.rs +++ b/crates/sparrow-backend/src/pipeline_schedule.rs @@ -67,7 +67,7 @@ fn is_pipeline_breaker(index: StepId, step: &Step, references: &IndexVec<StepId, usize>) -> bool { - StepKind::Scan { .. } => { + StepKind::Read { .. } | StepKind::Merge | StepKind::Repartition { ..
} => { debug_println!( DEBUG_SCHEDULING, "Step {index} is new pipeline based on kind {:?}", @@ -81,66 +81,65 @@ fn is_pipeline_breaker(index: StepId, step: &Step, references: &IndexVec separate pipeline since 0 has 2 consumers Step { id: 3.into(), - kind: StepKind::Project { - exprs: Exprs::empty(), - }, + kind: StepKind::Project, inputs: vec![0.into()], - schema: schema.clone(), + result_type: result_type.clone(), + exprs: Exprs::new(), }, // 4: project 2 -> same pipeline since only consumer Step { id: 4.into(), - kind: StepKind::Project { - exprs: Exprs::empty(), - }, + kind: StepKind::Project, inputs: vec![2.into()], - schema: schema.clone(), + result_type: result_type.clone(), + exprs: Exprs::new(), }, // 5: merge 3 and 4 -> new pipeline since merge Step { id: 5.into(), kind: StepKind::Merge, inputs: vec![3.into(), 4.into()], - schema, + result_type, + exprs: Exprs::new(), }, ]; diff --git a/crates/sparrow-execution/Cargo.toml b/crates/sparrow-execution/Cargo.toml index 43c778775..09a673df3 100644 --- a/crates/sparrow-execution/Cargo.toml +++ b/crates/sparrow-execution/Cargo.toml @@ -18,6 +18,7 @@ sparrow-physical = { path = "../sparrow-physical" } sparrow-transforms = { path = "../sparrow-transforms" } sparrow-scheduler = { path = "../sparrow-scheduler" } tokio.workspace = true +uuid.workspace = true [dev-dependencies] arrow-array.workspace = true diff --git a/crates/sparrow-execution/src/lib.rs b/crates/sparrow-execution/src/lib.rs index a05fabf57..b03e0baae 100644 --- a/crates/sparrow-execution/src/lib.rs +++ b/crates/sparrow-execution/src/lib.rs @@ -16,8 +16,8 @@ mod tests { use std::sync::Arc; use arrow_array::cast::AsArray; - use arrow_array::{Int64Array, RecordBatch, TimestampNanosecondArray, UInt64Array}; - use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; + use arrow_schema::{DataType, Field, Fields}; use error_stack::{IntoReport, ResultExt}; use index_vec::index_vec; use parking_lot::Mutex; @@ -44,31 +44,31 @@ mod tests { let (input_tx, input_rx) = tokio::sync::mpsc::channel(10); let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(10); - let input_schema = Arc::new(Schema::new(vec![ + let input_fields = Fields::from(vec![ Field::new("a", DataType::Int64, true), Field::new("b", DataType::Int64, true), Field::new("c", DataType::Int64, true), - ])); + ]); - let output_schema = Arc::new(Schema::new(vec![ + let output_fields = Fields::from(vec![ Field::new("ab", DataType::Int64, true), Field::new("abc", DataType::Int64, true), - ])); + ]); - let input_batch = RecordBatch::try_new( - input_schema.clone(), + let input_data = Arc::new(StructArray::new( + input_fields.clone(), vec![ Arc::new(Int64Array::from(vec![0, 1, 2, 3])), Arc::new(Int64Array::from(vec![4, 7, 10, 11])), Arc::new(Int64Array::from(vec![Some(21), None, Some(387), Some(87)])), ], - ) - .unwrap(); + None, + )); let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3])); let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); let input_batch = Batch::new_with_data( - input_batch, + input_data, time, subsort, key_hash, @@ -79,16 +79,16 @@ mod tests { execute( "hello".to_owned(), - input_schema, + DataType::Struct(input_fields), input_rx, - output_schema, + DataType::Struct(output_fields), output_tx, ) .await .unwrap(); let output = output_rx.recv().await.unwrap(); - let output = output.into_record_batch().unwrap(); + let output = 
output.data().unwrap().as_struct(); let ab = output.column_by_name("ab").unwrap(); let abc = output.column_by_name("abc").unwrap(); assert_eq!(ab.as_primitive(), &Int64Array::from(vec![4, 8, 12, 14])); @@ -101,9 +101,9 @@ mod tests { /// Execute a physical plan. pub async fn execute( query_id: String, - input_schema: SchemaRef, + input_type: DataType, mut input: tokio::sync::mpsc::Receiver<Batch>, - output_schema: SchemaRef, + output_type: DataType, output: tokio::sync::mpsc::Sender<Batch>, ) -> error_stack::Result<(), Error> { let mut worker_pool = WorkerPool::start(query_id).change_context(Error::Creating)?; @@ -114,56 +114,61 @@ mod tests { // - We create a hard-coded "project" step (1) // - We output the results to the channel. + let table_id = uuid::uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"); + let scan = sparrow_physical::Step { id: 0.into(), - kind: sparrow_physical::StepKind::Scan { - table_name: "table".to_owned(), + kind: sparrow_physical::StepKind::Read { + source_id: table_id, }, inputs: vec![], - schema: input_schema, + result_type: input_type, + exprs: sparrow_physical::Exprs::new(), }; let project = sparrow_physical::Step { id: 1.into(), - kind: sparrow_physical::StepKind::Project { - exprs: sparrow_physical::Exprs { - exprs: index_vec![ - sparrow_physical::Expr { - name: "column".into(), - literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))], - args: vec![], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "column".into(), - literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))], - args: vec![], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "add".into(), - literal_args: vec![], - args: vec![0.into(), 1.into()], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "column".into(), - literal_args: vec![ScalarValue::Utf8(Some("c".to_owned()))], - args: vec![], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "add".into(), - literal_args: vec![], - args: vec![2.into(), 3.into()], - result_type: DataType::Int64 - }, - ], - outputs: vec![2.into(), 4.into()], - }, - }, + kind: sparrow_physical::StepKind::Project, inputs: vec![0.into()], - schema: output_schema, + result_type: output_type.clone(), + exprs: index_vec![ + sparrow_physical::Expr { + name: "column".into(), + literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))], + args: vec![], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "column".into(), + literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))], + args: vec![], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "add".into(), + literal_args: vec![], + args: vec![0.into(), 1.into()], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "column".into(), + literal_args: vec![ScalarValue::Utf8(Some("c".to_owned()))], + args: vec![], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "add".into(), + literal_args: vec![], + args: vec![2.into(), 3.into()], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "record".into(), + literal_args: vec![], + args: vec![2.into(), 4.into()], + result_type: output_type + } + ], }; let sink_pipeline = worker_pool.add_pipeline(1, WriteChannelPipeline::new(output)); diff --git a/crates/sparrow-expressions/src/evaluators.rs index b8c56920f..8972e499b 100644 --- a/crates/sparrow-expressions/src/evaluators.rs +++ b/crates/sparrow-expressions/src/evaluators.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use
arrow_array::types::ArrowPrimitiveType; -use arrow_schema::{DataType, Schema}; +use arrow_schema::DataType; use hashbrown::HashMap; use index_vec::IndexVec; use itertools::Itertools; @@ -14,10 +14,10 @@ use crate::Error; mod cast; mod coalesce; -mod column; mod comparison; mod field_ref; mod hash; +mod input; mod is_valid; mod json_field; mod literal; @@ -50,11 +50,20 @@ struct EvaluatorFactory { inventory::collect!(EvaluatorFactory); +/// Static information available when creating an evaluator. pub struct StaticInfo<'a> { - input_schema: &'a Schema, + /// Name of the instruction to be evaluated. name: &'a Cow<'static, str>, + /// Literal (static) arguments to *this* expression. literal_args: &'a [ScalarValue], + /// Arguments (dynamic) to *this* expression. args: Vec<&'a StaticArg<'a>>, + /// Result type this expression should produce. + /// + /// For many instructions, this should be inferred from the arguments. + /// It is part of the plan (a) for simplicity, so a plan may be executed + /// without performing type-checking and (b) because some instructions + /// need to know the result-type in order to execute (e.g., cast). result_type: &'a DataType, } @@ -150,7 +159,6 @@ impl<'a> StaticInfo<'a> { /// Create the evaluators for the given expressions. pub(super) fn create_evaluators( - input_schema: &Schema, exprs: &[Expr], ) -> error_stack::Result<Vec<Box<dyn Evaluator>>, Error> { // Static information (index in expressions, type, etc.) for each expression in `exprs`. @@ -162,7 +170,6 @@ pub(super) fn create_evaluators( for (index, expr) in exprs.iter().enumerate() { let args = expr.args.iter().map(|index| &expressions[*index]).collect(); let info = StaticInfo { - input_schema, name: &expr.name, literal_args: &expr.literal_args, args, @@ -205,6 +212,13 @@ fn create_evaluator(info: StaticInfo<'_>) -> error_stack::Result<Box<dyn Evaluator>, Error> +pub fn intern_name(name: &str) -> Option<&'static str> { + EVALUATORS.get_key_value(name).map(|(k, _)| *k) +} #[cfg(test)] mod tests { #[test] diff --git a/crates/sparrow-expressions/src/evaluators/column.rs b/crates/sparrow-expressions/src/evaluators/column.rs deleted file mode 100644 index e68ce27a8..000000000 --- a/crates/sparrow-expressions/src/evaluators/column.rs +++ /dev/null @@ -1,38 +0,0 @@ -use arrow_array::ArrayRef; - -use crate::evaluator::Evaluator; -use crate::work_area::WorkArea; -use crate::Error; - -inventory::submit!(crate::evaluators::EvaluatorFactory { - name: "column", - create: &create, -}); - -/// Evaluator for column reference (`.c`)..
-struct ColumnEvaluator { - column: usize, -} - -impl Evaluator for ColumnEvaluator { - fn evaluate(&self, info: &WorkArea<'_>) -> error_stack::Result<ArrayRef, Error> { - Ok(info.input_column(self.column).clone()) - } -} - -fn create(info: super::StaticInfo<'_>) -> error_stack::Result<Box<dyn Evaluator>, Error> { - let name = info.literal_string()?; - let (column, field) = info - .input_schema - .column_with_name(name) - .expect("missing column"); - error_stack::ensure!( - field.data_type() == info.result_type, - Error::InvalidResultType { - expected: field.data_type().clone(), - actual: info.result_type.clone() - } - ); - - Ok(Box::new(ColumnEvaluator { column })) -} diff --git a/crates/sparrow-expressions/src/evaluators/input.rs b/crates/sparrow-expressions/src/evaluators/input.rs new file mode 100644 index 000000000..f9a42606f --- /dev/null +++ b/crates/sparrow-expressions/src/evaluators/input.rs @@ -0,0 +1,23 @@ +use arrow_array::ArrayRef; + +use crate::evaluator::Evaluator; +use crate::work_area::WorkArea; +use crate::Error; + +inventory::submit!(crate::evaluators::EvaluatorFactory { + name: "input", + create: &create, +}); + +/// Evaluator for referencing an input column. +struct InputEvaluator; + +impl Evaluator for InputEvaluator { + fn evaluate(&self, info: &WorkArea<'_>) -> error_stack::Result<ArrayRef, Error> { + Ok(info.input_column().clone()) + } +} + +fn create(_info: super::StaticInfo<'_>) -> error_stack::Result<Box<dyn Evaluator>, Error> { + Ok(Box::new(InputEvaluator)) +} diff --git a/crates/sparrow-expressions/src/executor.rs b/crates/sparrow-expressions/src/executor.rs index b52b536a1..da323c312 100644 --- a/crates/sparrow-expressions/src/executor.rs +++ b/crates/sparrow-expressions/src/executor.rs @@ -1,5 +1,5 @@ use arrow_array::ArrayRef; -use arrow_schema::Schema; +use arrow_schema::DataType; use sparrow_arrow::Batch; use crate::evaluator::Evaluator; @@ -14,27 +14,37 @@ use crate::Error; /// against existing columns. pub struct ExpressionExecutor { evaluators: Vec<Box<dyn Evaluator>>, + output_type: DataType, } impl ExpressionExecutor { /// Create an `ExpressionExecutor` for the given expressions. - pub fn try_new( - input_schema: &Schema, - exprs: &[sparrow_physical::Expr], - ) -> error_stack::Result<Self, Error> { - let evaluators = evaluators::create_evaluators(input_schema, exprs)?; - Ok(Self { evaluators }) + pub fn try_new(exprs: &[sparrow_physical::Expr]) -> error_stack::Result<Self, Error> { + let evaluators = evaluators::create_evaluators(exprs)?; + let output_type = exprs + .last() + .expect("at least one expression") + .result_type + .clone(); + Ok(Self { + evaluators, + output_type, + }) } /// Execute the expressions on the given input batch. /// /// The result is the array produced by the final expression.
- pub fn execute(&self, input: &Batch) -> error_stack::Result<Vec<ArrayRef>, Error> { + pub fn execute(&self, input: &Batch) -> error_stack::Result<ArrayRef, Error> { let mut work_area = WorkArea::with_capacity(input, self.evaluators.len()); for evaluator in self.evaluators.iter() { let output = evaluator.evaluate(&work_area)?; work_area.expressions.push(output); } - Ok(work_area.expressions) + Ok(work_area.expressions.pop().unwrap()) + } + + pub fn output_type(&self) -> &DataType { + &self.output_type } } diff --git a/crates/sparrow-expressions/src/lib.rs index 83ad92e49..e54cf1de0 100644 --- a/crates/sparrow-expressions/src/lib.rs +++ b/crates/sparrow-expressions/src/lib.rs @@ -93,4 +93,5 @@ mod values; mod work_area; pub use error::*; +pub use evaluators::intern_name; pub use executor::*; diff --git a/crates/sparrow-expressions/src/work_area.rs index 444b614cb..366abc8c6 100644 --- a/crates/sparrow-expressions/src/work_area.rs +++ b/crates/sparrow-expressions/src/work_area.rs @@ -24,8 +24,8 @@ impl<'a> WorkArea<'a> { } /// Return the [ArrayRef] for the given input index. - pub fn input_column(&self, index: usize) -> &ArrayRef { - self.input.record_batch().expect("non empty").column(index) + pub fn input_column(&self) -> &ArrayRef { + self.input.data().expect("non empty") } /// Return the [Value] for the given expression index. diff --git a/crates/sparrow-logical/Cargo.toml index 60ee6f093..0d91df90d 100644 --- a/crates/sparrow-logical/Cargo.toml +++ b/crates/sparrow-logical/Cargo.toml @@ -11,7 +11,9 @@ Logical representation of Kaskada queries. [dependencies] arrow-schema.workspace = true +decorum.workspace = true derive_more.workspace = true +enum-as-inner.workspace = true error-stack.workspace = true hashbrown.workspace = true itertools.workspace = true diff --git a/crates/sparrow-logical/src/expr.rs index 4a688b072..0006d1651 100644 --- a/crates/sparrow-logical/src/expr.rs +++ b/crates/sparrow-logical/src/expr.rs @@ -20,14 +20,16 @@ pub struct Expr { pub grouping: Grouping, } -#[derive(Debug)] +#[derive(Debug, Hash, PartialEq, Eq, Ord, PartialOrd, Clone, enum_as_inner::EnumAsInner)] pub enum Literal { Null, Bool(bool), String(String), Int64(i64), UInt64(u64), - Float64(f64), + // Decorum is needed to provide a total ordering on `f64` so we can + // derive `Ord` and `PartialOrd`. + Float64(decorum::Total<f64>), Timedelta { seconds: i64, nanos: i64 }, Uuid(Uuid), } @@ -101,12 +103,16 @@ impl Expr { } } + pub fn new_literal_str(str: impl Into<String>) -> Self { + Self::new_literal(Literal::String(str.into())) + } + /// Create a new cast expression to the given type. pub fn cast(self: Arc<Self>, data_type: DataType) -> error_stack::Result<Arc<Self>, Error> { if self.result_type == data_type { Ok(self) } else { - let grouping = self.grouping.clone(); + let grouping = self.grouping; Ok(Arc::new(Expr { name: Cow::Borrowed("cast"), literal_args: vec![], @@ -214,7 +220,7 @@ mod tests { "add".into(), vec![ a_i32.clone(), - Arc::new(Expr::new_literal(Literal::Float64(1.0))), + Arc::new(Expr::new_literal(Literal::Float64(1.0.into()))), ], ) .unwrap(); diff --git a/crates/sparrow-logical/src/grouping.rs index d8a8f9ec5..fe0dc6a25 100644 --- a/crates/sparrow-logical/src/grouping.rs +++ b/crates/sparrow-logical/src/grouping.rs @@ -8,13 +8,17 @@ use crate::{Error, ExprRef}; pub struct GroupId(u32); /// The grouping associated with an expression.
-#[derive(Debug, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] pub enum Grouping { Literal, Group(GroupId), } impl Grouping { + pub fn new(group: u32) -> Grouping { + Grouping::Group(GroupId(group)) + } + pub fn from_args(args: &[ExprRef]) -> error_stack::Result<Grouping, Error> { let groupings = args .iter() diff --git a/crates/sparrow-logical/src/snapshots/sparrow_logical__expr__tests__function.snap index b32019de4..105f8a3b3 100644 --- a/crates/sparrow-logical/src/snapshots/sparrow_logical__expr__tests__function.snap +++ b/crates/sparrow-logical/src/snapshots/sparrow_logical__expr__tests__function.snap @@ -67,7 +67,9 @@ Expr { name: "literal", literal_args: [ Float64( - 1.0, + Total( + 1.0, + ), ), ], args: [], diff --git a/crates/sparrow-physical/Cargo.toml index 6a0ee8717..c31a5aed5 100644 --- a/crates/sparrow-physical/Cargo.toml +++ b/crates/sparrow-physical/Cargo.toml @@ -17,6 +17,7 @@ index_vec.workspace = true serde.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } strum_macros.workspace = true +uuid.workspace = true [dev-dependencies] serde_yaml.workspace = true diff --git a/crates/sparrow-physical/src/expr.rs index 2907fcc32..76ba15058 100644 --- a/crates/sparrow-physical/src/expr.rs +++ b/crates/sparrow-physical/src/expr.rs @@ -11,48 +11,7 @@ index_vec::define_index_type! { DISPLAY_FORMAT = "{}"; } -/// Represents 1 or more values computed by expressions. -/// -/// Expressions are evaluated by producing a sequence of columns -/// and then selecting specific columns from those computed. -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct Exprs { - /// The expressions computing the intermediate values. - pub exprs: IndexVec<ExprId, Expr>, - /// The indices of columns to output. - pub outputs: Vec<ExprId>, -} - -impl Exprs { - pub fn empty() -> Self { - Self { - exprs: IndexVec::default(), - outputs: vec![], - } - } - - /// Create expressions computing the value of the last expression. - pub fn singleton(exprs: Vec<Expr>) -> Self { - let output = exprs.len() - 1; - Self { - exprs: exprs.into(), - outputs: vec![output.into()], - } - } - - pub fn is_empty(&self) -> bool { - self.outputs.is_empty() - } - - pub fn is_singleton(&self) -> bool { - self.outputs.len() == 1 - } - - /// Return the number of outputs produced by these expressions. - pub fn output_len(&self) -> usize { - self.outputs.len() - } -} +pub type Exprs = IndexVec<ExprId, Expr>; /// A physical expression which describes how a value should be computed. /// @@ -66,7 +25,7 @@ pub struct Expr { /// /// Similar to an opcode or function. /// - /// Generally, interning owned strings to the specific owned static strings is preferred. + /// Generally, interning owned strings to the specific static strings is preferred. pub name: Cow<'static, str>, /// Zero or more literal-valued arguments. pub literal_args: Vec<ScalarValue>, diff --git a/crates/sparrow-physical/src/plan.rs index 2b7d1dc89..1197a3118 100644 --- a/crates/sparrow-physical/src/plan.rs +++ b/crates/sparrow-physical/src/plan.rs @@ -7,7 +7,7 @@ use crate::{Step, StepId}; /// The plan is represented as an array of steps, with each step referencing /// its children (inputs) by index. The array is topologically sorted so that /// every step appears after the inputs to that step.
-#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Default)] pub struct Plan { /// The steps in the plan. pub steps: IndexVec<StepId, Step>, diff --git a/crates/sparrow-physical/src/step.rs index 08b77921b..435b1819f 100644 --- a/crates/sparrow-physical/src/step.rs +++ b/crates/sparrow-physical/src/step.rs @@ -1,6 +1,7 @@ -use arrow_schema::SchemaRef; +use arrow_schema::DataType; +use index_vec::IndexVec; -use crate::Exprs; +use crate::{Expr, ExprId}; index_vec::define_index_type! { /// The identifier (index) of a step. @@ -32,18 +33,37 @@ pub struct Step { pub kind: StepKind, /// Inputs to this step. pub inputs: Vec<StepId>, - /// The schema for this step. - pub schema: SchemaRef, + /// The data type produced by this step. + pub result_type: DataType, + /// Expressions used in the step, if any. + /// + /// The final expression in the vector is considered the "result" of executing + /// the expressions. + /// + /// When expressions are executed, what the inputs to the expressions are and + /// how the output is used depend on the `StepKind`. See the specific `StepKind` + /// for whether expressions are allowed, how they are interpreted, and any + /// other restrictions. + pub exprs: IndexVec<ExprId, Expr>, } /// The kinds of steps that can occur in the physical plan. -#[derive(Debug, serde::Serialize, serde::Deserialize, strum_macros::IntoStaticStr)] +#[derive( + Clone, + Debug, + serde::Serialize, + serde::Deserialize, + strum_macros::IntoStaticStr, + PartialEq, + Eq, + Hash, +)] #[serde(rename_all = "snake_case")] pub enum StepKind { - /// Scan the given table. - Scan { - table_name: String, + /// Read the given source. + Read { + source_id: uuid::Uuid, }, /// Merge the given relations. Merge, @@ -51,26 +71,21 @@ pub enum StepKind { /// /// The output includes the same rows as the input, but with columns /// projected as configured. - Project { - /// Expressions to compute the projection. - /// - /// The length of the outputs should be the same as the fields in the schema. - exprs: Exprs, - }, + /// + /// Expressions in the step are used to compute the projected columns. The number + /// of expressions output should be the same as the fields in the step schema. Project, /// Filter the results based on a boolean predicate. - Filter { - /// Expressions to apply to compute the predicate. - /// - /// There should be a single output producing a boolean value. - exprs: Exprs, - }, + /// + /// Expressions in the step are used to compute the predicate. There should be a + /// single output producing a boolean value. Filter, /// A step that repartitions the output. + /// + /// Expressions in the step are used to compute the partition keys. Each output + /// corresponds to a part of the key. Repartition { num_partitions: usize, - /// Expressions to compute the keys. - /// - /// Each output corresponds to a part of the key. - keys: Exprs, }, Error, } diff --git a/crates/sparrow-transforms/src/project.rs index 011ce84e4..0fe68f461 100644 --- a/crates/sparrow-transforms/src/project.rs +++ b/crates/sparrow-transforms/src/project.rs @@ -1,6 +1,5 @@ -use arrow_array::RecordBatch; -use arrow_schema::SchemaRef; -use error_stack::{IntoReport, ResultExt}; +use arrow_schema::DataType; +use error_stack::ResultExt; use sparrow_arrow::Batch; use sparrow_expressions::ExpressionExecutor; @@ -11,23 +10,21 @@ use crate::transform::{Error, Transform}; /// Transform for projection.
pub struct Project { evaluators: ExpressionExecutor, - outputs: Vec<usize>, - schema: SchemaRef, } impl Project { - pub fn try_new( - input_schema: &SchemaRef, - exprs: &Exprs, - schema: SchemaRef, - ) -> error_stack::Result<Self, Error> { - let evaluators = ExpressionExecutor::try_new(input_schema.as_ref(), exprs.exprs.as_vec()) + pub fn try_new(exprs: &Exprs, output_type: &DataType) -> error_stack::Result<Self, Error> { + let evaluators = ExpressionExecutor::try_new(exprs.as_vec()) .change_context_lazy(|| Error::CreateTransform("project"))?; - Ok(Self { - evaluators, - outputs: exprs.outputs.iter().map(|n| (*n).into()).collect(), - schema, - }) + error_stack::ensure!( + output_type == evaluators.output_type(), + Error::MismatchedResultType { + transform: "project", + expected: output_type.clone(), + actual: evaluators.output_type().clone() + } + ); + Ok(Self { evaluators }) } } @@ -36,16 +33,7 @@ impl Transform for Project { assert!(!batch.is_empty()); let error = || Error::ExecuteTransform("project"); - let columns = self.evaluators.execute(&batch).change_context_lazy(error)?; - let columns = self - .outputs - .iter() - .map(|index| columns[*index].clone()) - .collect(); - - let result = RecordBatch::try_new(self.schema.clone(), columns) - .into_report() - .change_context_lazy(error)?; + let result = self.evaluators.execute(&batch).change_context_lazy(error)?; Ok(batch.with_projection(result)) } diff --git a/crates/sparrow-transforms/src/transform.rs index 7db2a6a68..cd73d15b4 100644 --- a/crates/sparrow-transforms/src/transform.rs +++ b/crates/sparrow-transforms/src/transform.rs @@ -1,3 +1,4 @@ +use arrow_schema::DataType; use sparrow_arrow::Batch; #[derive(derive_more::Display, Debug)] @@ -6,6 +7,14 @@ pub enum Error { CreateTransform(&'static str), #[display(fmt = "failed to execute {_0} transform")] ExecuteTransform(&'static str), + #[display( + fmt = "unexpected output type for {transform} transform: expected {expected:?} but got {actual:?}" + )] + MismatchedResultType { + transform: &'static str, + expected: DataType, + actual: DataType, + }, } impl error_stack::Context for Error {} diff --git a/crates/sparrow-transforms/src/transform_pipeline.rs index 7f4be085c..2f6aff3ef 100644 --- a/crates/sparrow-transforms/src/transform_pipeline.rs +++ b/crates/sparrow-transforms/src/transform_pipeline.rs @@ -106,15 +106,11 @@ impl TransformPipeline { ); let transform: Box<dyn Transform> = match &step.kind { - StepKind::Project { exprs } => Box::new( - crate::project::Project::try_new( - &input_step.schema, - exprs, - step.schema.clone(), - ) - .change_context_lazy(|| Error::CreatingTransform { - kind: (&step.kind).into(), - })?, + StepKind::Project => Box::new( - crate::project::Project::try_new(&step.exprs, &step.result_type) hold