From db21e3963db3a54dae77342ca51246871eca98c4 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Fri, 28 Jul 2023 10:17:12 -0700 Subject: [PATCH 01/13] add skeleton evaluator --- crates/sparrow-catalog/catalog/len copy.toml | 16 +++++ crates/sparrow-compiler/src/ast_to_dfg.rs | 69 +++++++++++++------ .../src/functions/collection.rs | 5 ++ .../src/functions/function.rs | 4 ++ crates/sparrow-instructions/src/evaluators.rs | 4 ++ .../src/evaluators/collect.rs | 60 ++++++++++++++++ crates/sparrow-main/tests/e2e/list_tests.rs | 9 ++- crates/sparrow-plan/src/inst.rs | 5 ++ 8 files changed, 150 insertions(+), 22 deletions(-) create mode 100644 crates/sparrow-catalog/catalog/len copy.toml create mode 100644 crates/sparrow-instructions/src/evaluators/collect.rs diff --git a/crates/sparrow-catalog/catalog/len copy.toml b/crates/sparrow-catalog/catalog/len copy.toml new file mode 100644 index 000000000..991b18a0d --- /dev/null +++ b/crates/sparrow-catalog/catalog/len copy.toml @@ -0,0 +1,16 @@ +name = "get" +signature = "get(key: K, map: map) -> V" +short_doc = "..." +long_doc = """ + +### Parameters + ... + +### Results +... +""" + +[[examples]] +name = "..." +expression = "..." +input_csv = "..." \ No newline at end of file diff --git a/crates/sparrow-compiler/src/ast_to_dfg.rs b/crates/sparrow-compiler/src/ast_to_dfg.rs index a615cbdbd..ded2b4f15 100644 --- a/crates/sparrow-compiler/src/ast_to_dfg.rs +++ b/crates/sparrow-compiler/src/ast_to_dfg.rs @@ -522,31 +522,28 @@ pub fn add_to_dfg( dfg.bind("$condition_input", args[0].inner().clone()); let window = &expr.unwrap().args()[1]; - let (condition, duration) = match window.op() { - ExprOp::Call(window_name) => { - flatten_window_args(window_name, window, dfg, data_context, diagnostics)? - } - ExprOp::Literal(v) if v.inner() == &LiteralValue::Null => { - // Unwindowed aggregations just use nulls - let null_arg = dfg.add_literal(LiteralValue::Null.to_scalar()?)?; - let null_arg = Located::new( - add_literal( - dfg, - null_arg, - FenlType::Concrete(DataType::Null), - window.location().clone(), - )?, - window.location().clone(), - ); - - (null_arg.clone(), null_arg) - } - unexpected => anyhow::bail!("expected window, found {:?}", unexpected), - }; + let (condition, duration) = + flatten_window_args_if_needed(window, dfg, data_context, diagnostics)?; dfg.exit_env(); // [agg_input, condition, duration] vec![args[0].clone(), condition, duration] + } else if function.name() == "collect" { + // The collect function contains a window, but does not follow the same signature + // pattern as aggregations, so it requires a + // + // TODO: Flattening the window arguments is hacky and confusing. We should instead + // incorporate the tick directly into the function containing the window. + dfg.enter_env(); + dfg.bind("$condition_input", args[1].inner().clone()); + + let window = &expr.unwrap().args()[2]; + let (condition, duration) = + flatten_window_args_if_needed(window, dfg, data_context, diagnostics)?; + + dfg.exit_env(); + // [max, input, condition, duration] + vec![args[0].clone(), args[1].clone(), condition, duration] } else if function.name() == "when" || function.name() == "if" { dfg.enter_env(); dfg.bind("$condition_input", args[1].inner().clone()); @@ -605,6 +602,36 @@ pub fn add_to_dfg( } } +fn flatten_window_args_if_needed( + window: &Located>, + dfg: &mut Dfg, + data_context: &mut DataContext, + diagnostics: &mut DiagnosticCollector<'_>, +) -> anyhow::Result<(Located>, Located>)> { + let (condition, duration) = match window.op() { + ExprOp::Call(window_name) => { + flatten_window_args(window_name, window, dfg, data_context, diagnostics)? + } + ExprOp::Literal(v) if v.inner() == &LiteralValue::Null => { + // Unwindowed aggregations just use nulls + let null_arg = dfg.add_literal(LiteralValue::Null.to_scalar()?)?; + let null_arg = Located::new( + add_literal( + dfg, + null_arg, + FenlType::Concrete(DataType::Null), + window.location().clone(), + )?, + window.location().clone(), + ); + + (null_arg.clone(), null_arg) + } + unexpected => anyhow::bail!("expected window, found {:?}", unexpected), + }; + Ok((condition, duration)) +} + // Verify that the arguments are compatibly partitioned. fn verify_same_partitioning( data_context: &DataContext, diff --git a/crates/sparrow-compiler/src/functions/collection.rs b/crates/sparrow-compiler/src/functions/collection.rs index 78f003c09..fdf1bec7a 100644 --- a/crates/sparrow-compiler/src/functions/collection.rs +++ b/crates/sparrow-compiler/src/functions/collection.rs @@ -12,4 +12,9 @@ pub(super) fn register(registry: &mut Registry) { .register("index(i: i64, list: list) -> T") .with_implementation(Implementation::Instruction(InstOp::Index)) .set_internal(); + + registry + .register("collect(max: i64, input: T, window: window = null) -> list") + .with_implementation(Implementation::Instruction(InstOp::Collect)) + .set_internal(); } diff --git a/crates/sparrow-compiler/src/functions/function.rs b/crates/sparrow-compiler/src/functions/function.rs index ada567430..a652a3c90 100644 --- a/crates/sparrow-compiler/src/functions/function.rs +++ b/crates/sparrow-compiler/src/functions/function.rs @@ -98,6 +98,10 @@ impl Function { ) } + pub fn contains_window(&self) -> bool { + self.name() == "collect" || self.is_aggregation() + } + pub fn is_tick(&self) -> bool { matches!(self.implementation, Implementation::Tick(_)) } diff --git a/crates/sparrow-instructions/src/evaluators.rs b/crates/sparrow-instructions/src/evaluators.rs index 770e923cf..9f92a7745 100644 --- a/crates/sparrow-instructions/src/evaluators.rs +++ b/crates/sparrow-instructions/src/evaluators.rs @@ -11,6 +11,7 @@ use crate::{ColumnarValue, ComputeStore, GroupingIndices}; pub mod aggregation; mod cast; +mod collect; mod comparison; mod equality; mod field_ref; @@ -41,6 +42,8 @@ use sparrow_plan::ValueRef; use string::*; use time::*; +use self::collect::CollectEvaluator; + /// Represents static information for an evaluator. #[derive(Debug)] pub struct StaticInfo<'a> { @@ -179,6 +182,7 @@ fn create_simple_evaluator( create_number_evaluator!(&info.args[0].data_type, ClampEvaluator, info) } InstOp::Coalesce => CoalesceEvaluator::try_new(info), + InstOp::Collect => CollectEvaluator::try_new(info), InstOp::CountIf => CountIfEvaluator::try_new(info), InstOp::DayOfMonth => DayOfMonthEvaluator::try_new(info), InstOp::DayOfMonth0 => DayOfMonth0Evaluator::try_new(info), diff --git a/crates/sparrow-instructions/src/evaluators/collect.rs b/crates/sparrow-instructions/src/evaluators/collect.rs new file mode 100644 index 000000000..e8d8ace9c --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/collect.rs @@ -0,0 +1,60 @@ +//! The cast instruction isn't a "normal" instruction since it doesn't have a +//! a single, fixed signature. Specifically, the input and output types depend +//! on the input to the instruction and the requested output type. + +use std::sync::Arc; + +use anyhow::anyhow; +use arrow::array::{ArrayRef, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray}; +use arrow::datatypes::DataType; +use sparrow_arrow::downcast::downcast_primitive_array; +use sparrow_kernels::time::i64_to_two_i32; +use sparrow_plan::ValueRef; +use sparrow_syntax::FenlType; + +use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; + +/// Evaluator for the `collect` instruction. +/// +/// Collect collects a stream of values into a List. A list is produced +/// for each input value received, growing up to a maximum size. +#[derive(Debug)] +pub struct CollectEvaluator { + max: ValueRef, + input: ValueRef, + tick: ValueRef, + duration: ValueRef, +} + +impl EvaluatorFactory for CollectEvaluator { + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type(); + let result_type = info.result_type; + match result_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), + other => anyhow::bail!("expected list result type, saw {:?}", other), + }; + + let (max, input, tick, duration) = info.unpack_arguments()?; + Ok(Box::new(Self { + max, + input, + tick, + duration, + })) + } +} + +impl CollectEvaluator { + fn execute(input: &ArrayRef) -> anyhow::Result { + todo!() + } +} + +impl Evaluator for CollectEvaluator { + fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let result = CollectEvaluator::execute(&input)?; + Ok(result) + } +} diff --git a/crates/sparrow-main/tests/e2e/list_tests.rs b/crates/sparrow-main/tests/e2e/list_tests.rs index 13d0921e4..e6c8c5d90 100644 --- a/crates/sparrow-main/tests/e2e/list_tests.rs +++ b/crates/sparrow-main/tests/e2e/list_tests.rs @@ -3,7 +3,7 @@ use sparrow_api::kaskada::v1alpha::TableConfig; use uuid::Uuid; -use crate::{fixture::DataFixture, QueryFixture}; +use crate::{fixture::DataFixture, fixtures::i64_data_fixture, QueryFixture}; /// Create a simple table with a collection type (map). pub(crate) async fn list_data_fixture() -> DataFixture { @@ -95,6 +95,13 @@ async fn test_index_list_bool_dynamic() { "###); } +#[tokio::test] +async fn test_collect_to_list_i64() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Numbers.m | collect(10) | index(0) }").with_dump_dot("asdf").run_to_csv(&i64_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + "###); +} + #[tokio::test] async fn test_incorrect_index_type() { insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(\"s\") }") diff --git a/crates/sparrow-plan/src/inst.rs b/crates/sparrow-plan/src/inst.rs index 5fcd8ad53..6d370a78c 100644 --- a/crates/sparrow-plan/src/inst.rs +++ b/crates/sparrow-plan/src/inst.rs @@ -60,6 +60,11 @@ pub enum InstOp { Clamp, #[strum(props(signature = "coalesce(values+: T) -> T"))] Coalesce, + #[strum(props( + dfg_signature = "collect(max: i64, input: T, window: window = null) -> list", + plan_signature = "collect(max: i64, input: T, ticks: bool = null, slide_duration: i64 = null) -> list" + ))] + Collect, #[strum(props( dfg_signature = "count_if(input: T, window: window = null) -> u32", plan_signature = "count_if(input: T, ticks: bool = null, slide_duration: i64 = null) -> \ From d950c4f50d416de597ed910cf736bbf637fd9f0f Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Sat, 29 Jul 2023 14:05:23 -0700 Subject: [PATCH 02/13] add primitive collect evaluator --- .../src/functions/collection.rs | 1 + crates/sparrow-instructions/src/evaluators.rs | 14 +- .../src/evaluators/collect.rs | 60 -------- .../src/evaluators/list.rs | 9 ++ .../src/evaluators/list/collect_boolean.rs | 99 ++++++++++++++ .../src/evaluators/list/collect_map.rs | 81 +++++++++++ .../src/evaluators/list/collect_primitive.rs | 128 ++++++++++++++++++ .../src/evaluators/list/collect_string.rs | 99 ++++++++++++++ .../evaluators/list/collect_string_generic.rs | 113 ++++++++++++++++ .../src/evaluators/list/index.rs | 2 + .../sparrow-main/tests/e2e/collect_tests.rs | 71 ++++++++++ crates/sparrow-main/tests/e2e/list_tests.rs | 38 ++---- crates/sparrow-main/tests/e2e/main.rs | 1 + 13 files changed, 622 insertions(+), 94 deletions(-) delete mode 100644 crates/sparrow-instructions/src/evaluators/collect.rs create mode 100644 crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs create mode 100644 crates/sparrow-instructions/src/evaluators/list/collect_map.rs create mode 100644 crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs create mode 100644 crates/sparrow-instructions/src/evaluators/list/collect_string.rs create mode 100644 crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs create mode 100644 crates/sparrow-main/tests/e2e/collect_tests.rs diff --git a/crates/sparrow-compiler/src/functions/collection.rs b/crates/sparrow-compiler/src/functions/collection.rs index fdf1bec7a..8f4cd3a5a 100644 --- a/crates/sparrow-compiler/src/functions/collection.rs +++ b/crates/sparrow-compiler/src/functions/collection.rs @@ -13,6 +13,7 @@ pub(super) fn register(registry: &mut Registry) { .with_implementation(Implementation::Instruction(InstOp::Index)) .set_internal(); + // TODO: Make MAX default to something? registry .register("collect(max: i64, input: T, window: window = null) -> list") .with_implementation(Implementation::Instruction(InstOp::Collect)) diff --git a/crates/sparrow-instructions/src/evaluators.rs b/crates/sparrow-instructions/src/evaluators.rs index 9f92a7745..5341ef6ee 100644 --- a/crates/sparrow-instructions/src/evaluators.rs +++ b/crates/sparrow-instructions/src/evaluators.rs @@ -11,7 +11,6 @@ use crate::{ColumnarValue, ComputeStore, GroupingIndices}; pub mod aggregation; mod cast; -mod collect; mod comparison; mod equality; mod field_ref; @@ -42,8 +41,6 @@ use sparrow_plan::ValueRef; use string::*; use time::*; -use self::collect::CollectEvaluator; - /// Represents static information for an evaluator. #[derive(Debug)] pub struct StaticInfo<'a> { @@ -182,7 +179,16 @@ fn create_simple_evaluator( create_number_evaluator!(&info.args[0].data_type, ClampEvaluator, info) } InstOp::Coalesce => CoalesceEvaluator::try_new(info), - InstOp::Collect => CollectEvaluator::try_new(info), + InstOp::Collect => { + create_typed_evaluator!( + &info.args[1].data_type, + CollectPrimitiveEvaluator, + CollectMapEvaluator, + CollectBooleanEvaluator, + CollectStringEvaluator, + info + ) + } InstOp::CountIf => CountIfEvaluator::try_new(info), InstOp::DayOfMonth => DayOfMonthEvaluator::try_new(info), InstOp::DayOfMonth0 => DayOfMonth0Evaluator::try_new(info), diff --git a/crates/sparrow-instructions/src/evaluators/collect.rs b/crates/sparrow-instructions/src/evaluators/collect.rs deleted file mode 100644 index e8d8ace9c..000000000 --- a/crates/sparrow-instructions/src/evaluators/collect.rs +++ /dev/null @@ -1,60 +0,0 @@ -//! The cast instruction isn't a "normal" instruction since it doesn't have a -//! a single, fixed signature. Specifically, the input and output types depend -//! on the input to the instruction and the requested output type. - -use std::sync::Arc; - -use anyhow::anyhow; -use arrow::array::{ArrayRef, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray}; -use arrow::datatypes::DataType; -use sparrow_arrow::downcast::downcast_primitive_array; -use sparrow_kernels::time::i64_to_two_i32; -use sparrow_plan::ValueRef; -use sparrow_syntax::FenlType; - -use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; - -/// Evaluator for the `collect` instruction. -/// -/// Collect collects a stream of values into a List. A list is produced -/// for each input value received, growing up to a maximum size. -#[derive(Debug)] -pub struct CollectEvaluator { - max: ValueRef, - input: ValueRef, - tick: ValueRef, - duration: ValueRef, -} - -impl EvaluatorFactory for CollectEvaluator { - fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { - let input_type = info.args[1].data_type(); - let result_type = info.result_type; - match result_type { - DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), - other => anyhow::bail!("expected list result type, saw {:?}", other), - }; - - let (max, input, tick, duration) = info.unpack_arguments()?; - Ok(Box::new(Self { - max, - input, - tick, - duration, - })) - } -} - -impl CollectEvaluator { - fn execute(input: &ArrayRef) -> anyhow::Result { - todo!() - } -} - -impl Evaluator for CollectEvaluator { - fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { - let input = info.value(&self.input)?.array_ref()?; - let result = CollectEvaluator::execute(&input)?; - Ok(result) - } -} diff --git a/crates/sparrow-instructions/src/evaluators/list.rs b/crates/sparrow-instructions/src/evaluators/list.rs index c1c37f4f9..971f1790b 100644 --- a/crates/sparrow-instructions/src/evaluators/list.rs +++ b/crates/sparrow-instructions/src/evaluators/list.rs @@ -1,2 +1,11 @@ +mod collect_boolean; +mod collect_map; +mod collect_primitive; +mod collect_string; mod index; + +pub(super) use collect_boolean::*; +pub(super) use collect_map::*; +pub(super) use collect_primitive::*; +pub(super) use collect_string::*; pub(super) use index::*; diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs new file mode 100644 index 000000000..8289b3491 --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -0,0 +1,99 @@ +//! The cast instruction isn't a "normal" instruction since it doesn't have a +//! a single, fixed signature. Specifically, the input and output types depend +//! on the input to the instruction and the requested output type. + +use std::collections::VecDeque; +use std::sync::Arc; + +use anyhow::anyhow; +use arrow::array::{ + ArrayRef, AsArray, BooleanBuilder, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, + ListBuilder, PrimitiveBuilder, +}; +use arrow::datatypes::{ArrowPrimitiveType, DataType}; +use arrow::downcast_primitive_array; +use sparrow_arrow::downcast::{downcast_boolean_array, downcast_primitive_array}; +use sparrow_arrow::scalar_value::ScalarValue; +use sparrow_kernels::time::i64_to_two_i32; +use sparrow_plan::ValueRef; +use sparrow_syntax::FenlType; + +use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; + +/// Evaluator for the `collect` instruction. +/// +/// Collect collects a stream of values into a List. A list is produced +/// for each input value received, growing up to a maximum size. +#[derive(Debug)] +pub struct CollectBooleanEvaluator { + /// The max size of the buffer. + /// + /// Once the max size is reached, the front will be popped and the new + /// value pushed to the back. + max: i64, + input: ValueRef, + tick: ValueRef, + duration: ValueRef, + buffer: VecDeque>, +} + +impl EvaluatorFactory for CollectBooleanEvaluator { + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type(); + let result_type = info.result_type; + match result_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), + other => anyhow::bail!("expected list result type, saw {:?}", other), + }; + + let max = match info.args[0].value_ref.literal_value() { + Some(ScalarValue::Int64(Some(v))) => *v, + Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), + None => anyhow::bail!("expected literal value for max parameter"), + }; + + let (_, input, tick, duration) = info.unpack_arguments()?; + Ok(Box::new(Self { + max, + input, + tick, + duration, + buffer: vec![].into(), + })) + } +} + +impl Evaluator for CollectBooleanEvaluator { + fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + + match (self.tick.is_literal_null(), self.duration.is_literal_null()) { + (true, true) => self.evaluate_non_windowed(info), + (true, false) => unimplemented!("since window aggregation unsupported"), + (false, false) => panic!("sliding window aggregation should use other evaluator"), + (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), + } + } +} + +impl CollectBooleanEvaluator { + fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let input = input.as_boolean(); + let mut builder = BooleanBuilder::new(); + let mut list_builder = ListBuilder::new(builder); + + input.into_iter().for_each(|i| { + self.buffer.push_back(i); + if self.buffer.len() > self.max as usize { + self.buffer.pop_front(); + } + + // TODO: Empty is null or empty? + list_builder.append_value(self.buffer.clone()); + list_builder.append(true); + }); + + Ok(Arc::new(list_builder.finish())) + } +} diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_map.rs b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs new file mode 100644 index 000000000..1ef5c377a --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs @@ -0,0 +1,81 @@ +//! The cast instruction isn't a "normal" instruction since it doesn't have a +//! a single, fixed signature. Specifically, the input and output types depend +//! on the input to the instruction and the requested output type. + +use std::collections::VecDeque; +use std::sync::Arc; + +use anyhow::anyhow; +use arrow::array::{ + ArrayRef, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, ListBuilder, + PrimitiveBuilder, +}; +use arrow::datatypes::{ArrowPrimitiveType, DataType}; +use arrow::downcast_primitive_array; +use sparrow_arrow::downcast::downcast_primitive_array; +use sparrow_arrow::scalar_value::ScalarValue; +use sparrow_kernels::time::i64_to_two_i32; +use sparrow_plan::ValueRef; +use sparrow_syntax::FenlType; + +use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; + +/// Evaluator for the `collect` instruction. +/// +/// Collect collects a stream of values into a List. A list is produced +/// for each input value received, growing up to a maximum size. +#[derive(Debug)] +pub struct CollectMapEvaluator { + /// The max size of the buffer. + /// + /// Once the max size is reached, the front will be popped and the new + /// value pushed to the back. + max: i64, + input: ValueRef, + tick: ValueRef, + duration: ValueRef, +} + +impl EvaluatorFactory for CollectMapEvaluator { + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type(); + let result_type = info.result_type; + match result_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), + other => anyhow::bail!("expected list result type, saw {:?}", other), + }; + + let max = match info.args[0].value_ref.literal_value() { + Some(ScalarValue::Int64(Some(v))) => *v, + Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), + None => anyhow::bail!("expected literal value for max parameter"), + }; + + let (_, input, tick, duration) = info.unpack_arguments()?; + Ok(Box::new(Self { + max, + input, + tick, + duration, + })) + } +} + +impl Evaluator for CollectMapEvaluator { + fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + + match (self.tick.is_literal_null(), self.duration.is_literal_null()) { + (true, true) => self.evaluate_non_windowed(info), + (true, false) => unimplemented!("since window aggregation unsupported"), + (false, false) => panic!("sliding window aggregation should use other evaluator"), + (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), + } + } +} + +impl CollectMapEvaluator { + fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + todo!() + } +} diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs new file mode 100644 index 000000000..9c105aeb4 --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs @@ -0,0 +1,128 @@ +//! The cast instruction isn't a "normal" instruction since it doesn't have a +//! a single, fixed signature. Specifically, the input and output types depend +//! on the input to the instruction and the requested output type. + +use std::collections::VecDeque; +use std::sync::Arc; + +use anyhow::anyhow; +use arrow::array::{ + ArrayRef, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, ListBuilder, + PrimitiveBuilder, +}; +use arrow::datatypes::{ArrowPrimitiveType, DataType}; +use arrow::downcast_primitive_array; +use itertools::izip; +use sparrow_arrow::downcast::downcast_primitive_array; +use sparrow_arrow::scalar_value::ScalarValue; +use sparrow_kernels::time::i64_to_two_i32; +use sparrow_plan::ValueRef; +use sparrow_syntax::FenlType; + +use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; + +/// Evaluator for the `collect` instruction. +/// +/// Collect collects a stream of values into a List. A list is produced +/// for each input value received, growing up to a maximum size. +#[derive(Debug)] +pub struct CollectPrimitiveEvaluator +where + T: ArrowPrimitiveType, +{ + /// The max size of the buffer. + /// + /// Once the max size is reached, the front will be popped and the new + /// value pushed to the back. + max: i64, + input: ValueRef, + tick: ValueRef, + duration: ValueRef, + /// Contains the buffer of values for each entity + buffers: Vec>>, +} + +impl EvaluatorFactory for CollectPrimitiveEvaluator +where + T: ArrowPrimitiveType + Send + Sync, +{ + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type(); + let result_type = info.result_type; + match result_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), + other => anyhow::bail!("expected list result type, saw {:?}", other), + }; + + let max = match info.args[0].value_ref.literal_value() { + Some(ScalarValue::Int64(Some(v))) => *v, + Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), + None => anyhow::bail!("expected literal value for max parameter"), + }; + + let (_, input, tick, duration) = info.unpack_arguments()?; + Ok(Box::new(Self { + max, + input, + tick, + duration, + buffers: vec![].into(), + })) + } +} + +impl Evaluator for CollectPrimitiveEvaluator +where + T: ArrowPrimitiveType + Send + Sync, +{ + fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + match (self.tick.is_literal_null(), self.duration.is_literal_null()) { + (true, true) => self.evaluate_non_windowed(info), + (true, false) => unimplemented!("since window aggregation unsupported"), + (false, false) => panic!("sliding window aggregation should use other evaluator"), + (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), + } + } +} + +impl CollectPrimitiveEvaluator +where + T: ArrowPrimitiveType + Send + Sync, +{ + fn ensure_entity_capacity(&mut self, len: usize) { + if len >= self.buffers.len() { + self.buffers.resize(len + 1, VecDeque::new()); + } + } + + fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let key_capacity = info.grouping().num_groups(); + let entity_indices = info.grouping().group_indices(); + assert_eq!(entity_indices.len(), input.len()); + + self.ensure_entity_capacity(key_capacity); + + let input = downcast_primitive_array::(input.as_ref())?; + let builder = PrimitiveBuilder::::new(); + let mut list_builder = ListBuilder::new(builder); + + izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { + let entity_index = *entity_index as usize; + + self.buffers[entity_index].push_back(input); + if self.buffers[entity_index].len() > self.max as usize { + self.buffers[entity_index].pop_front(); + } + + // TODO: Empty is null or empty? + println!("Current Buffer: {:?}", self.buffers[entity_index]); + list_builder.append_value(self.buffers[entity_index].clone()); + }); + + let result = list_builder.finish(); + println!("ListBuilder: {:?}", result); + + Ok(Arc::new(result)) + } +} diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs new file mode 100644 index 000000000..129350913 --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -0,0 +1,99 @@ +//! The cast instruction isn't a "normal" instruction since it doesn't have a +//! a single, fixed signature. Specifically, the input and output types depend +//! on the input to the instruction and the requested output type. + +use std::collections::VecDeque; +use std::sync::Arc; + +use anyhow::anyhow; +use arrow::array::{ + ArrayRef, AsArray, GenericStringBuilder, Int32Array, IntervalDayTimeArray, + IntervalYearMonthArray, ListBuilder, OffsetSizeTrait, PrimitiveBuilder, StringArray, + StringBuilder, +}; +use arrow::datatypes::{ArrowPrimitiveType, DataType}; +use arrow::downcast_primitive_array; +use sparrow_arrow::downcast::{downcast_primitive_array, downcast_string_array}; +use sparrow_arrow::scalar_value::ScalarValue; +use sparrow_plan::ValueRef; +use sparrow_syntax::FenlType; + +use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; + +/// Evaluator for the `collect` instruction. +/// +/// Collect collects a stream of values into a List. A list is produced +/// for each input value received, growing up to a maximum size. +#[derive(Debug)] +pub struct CollectStringEvaluator { + /// The max size of the buffer. + /// + /// Once the max size is reached, the front will be popped and the new + /// value pushed to the back. + max: i64, + input: ValueRef, + tick: ValueRef, + duration: ValueRef, + buffer: VecDeque>, +} + +impl EvaluatorFactory for CollectStringEvaluator { + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type(); + let result_type = info.result_type; + match result_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), + other => anyhow::bail!("expected list result type, saw {:?}", other), + }; + + let max = match &info.args[0].value_ref.literal_value() { + Some(ScalarValue::Int64(Some(v))) => *v, + Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), + None => anyhow::bail!("expected literal value for max parameter"), + }; + + let (_, input, tick, duration) = info.unpack_arguments()?; + Ok(Box::new(Self { + max, + input, + tick, + duration, + buffer: vec![].into(), + })) + } +} + +impl Evaluator for CollectStringEvaluator { + fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + + match (self.tick.is_literal_null(), self.duration.is_literal_null()) { + (true, true) => self.evaluate_non_windowed(info), + (true, false) => unimplemented!("since window aggregation unsupported"), + (false, false) => panic!("sliding window aggregation should use other evaluator"), + (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), + } + } +} + +impl CollectStringEvaluator { + fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let input = input.as_string::(); + let mut builder = StringBuilder::new(); + let mut list_builder = ListBuilder::new(builder); + + input.into_iter().for_each(|i| { + self.buffer.push_back(i.map(|s| s.to_owned())); + if self.buffer.len() > self.max as usize { + self.buffer.pop_front(); + } + + // TODO: Empty is null or empty? + list_builder.append_value(self.buffer.clone()); + list_builder.append(true); + }); + + Ok(Arc::new(list_builder.finish())) + } +} diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs new file mode 100644 index 000000000..d8dc76b1a --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs @@ -0,0 +1,113 @@ +//! The cast instruction isn't a "normal" instruction since it doesn't have a +//! a single, fixed signature. Specifically, the input and output types depend +//! on the input to the instruction and the requested output type. + +use std::collections::VecDeque; +use std::sync::Arc; + +use anyhow::anyhow; +use arrow::array::{ + ArrayRef, GenericStringBuilder, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, + ListBuilder, OffsetSizeTrait, PrimitiveBuilder, +}; +use arrow::datatypes::{ArrowPrimitiveType, DataType}; +use arrow::downcast_primitive_array; +use sparrow_arrow::downcast::{downcast_primitive_array, downcast_string_array}; +use sparrow_arrow::scalar_value::ScalarValue; +use sparrow_plan::ValueRef; +use sparrow_syntax::FenlType; + +use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; + +/// Evaluator for the `collect` instruction. +/// +/// Collect collects a stream of values into a List. A list is produced +/// for each input value received, growing up to a maximum size. +#[derive(Debug)] +pub struct CollectStringEvaluator<'a, O> +where + O: OffsetSizeTrait, +{ + /// The max size of the buffer. + /// + /// Once the max size is reached, the front will be popped and the new + /// value pushed to the back. + max: i64, + input: ValueRef, + tick: ValueRef, + duration: ValueRef, + buffer: VecDeque>, + + _phantom: std::marker::PhantomData, +} + +impl<'a, O> EvaluatorFactory for CollectStringEvaluator<'a, O> +where + O: OffsetSizeTrait, +{ + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type(); + let result_type = info.result_type; + match result_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), + other => anyhow::bail!("expected list result type, saw {:?}", other), + }; + + let max = match info.args[0].value_ref.literal_value() { + Some(ScalarValue::Int64(Some(v))) => v, + Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), + None => anyhow::bail!("expected literal value for max parameter"), + }; + + let (_, input, tick, duration) = info.unpack_arguments()?; + Ok(Box::new(Self { + max: *max, + input, + tick, + duration, + buffer: vec![].into(), + _phantom: std::marker::PhantomData, + })) + } +} + +impl<'a, O> Evaluator for CollectStringEvaluator<'a, O> +where + O: OffsetSizeTrait, +{ + fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + + match (self.tick.is_literal_null(), self.duration.is_literal_null()) { + (true, true) => self.evaluate_non_windowed(info), + (true, false) => unimplemented!("since window aggregation unsupported"), + (false, false) => panic!("sliding window aggregation should use other evaluator"), + (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), + } + } +} + +impl<'a, O> CollectStringEvaluator<'a, O> +where + O: OffsetSizeTrait, +{ + fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let input = downcast_string_array::(input.as_ref())?; + let mut builder = GenericStringBuilder::::new(); + let mut list_builder = ListBuilder::new(builder); + + input.into_iter().for_each(|i| { + self.buffer.push_back(i); + if self.buffer.len() > self.max as usize { + self.buffer.pop_front(); + } + + // TODO: Empty is null or empty? + list_builder.append_value(self.buffer.clone()); + list_builder.append(true); + }); + + Ok(Arc::new(list_builder.finish())) + } +} diff --git a/crates/sparrow-instructions/src/evaluators/list/index.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs index 718b6ddb6..1e237132e 100644 --- a/crates/sparrow-instructions/src/evaluators/list/index.rs +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -42,6 +42,8 @@ impl Evaluator for IndexEvaluator { /// Given a `ListArray` and `index` array of the same length return an array of the values. fn list_get(list: &ArrayRef, indices: &Int64Array) -> anyhow::Result { + println!("List: {:?}", list); + println!("Indices: {:?}", indices); anyhow::ensure!(list.len() == indices.len()); let list = list.as_list(); diff --git a/crates/sparrow-main/tests/e2e/collect_tests.rs b/crates/sparrow-main/tests/e2e/collect_tests.rs new file mode 100644 index 000000000..4ebe66ba5 --- /dev/null +++ b/crates/sparrow-main/tests/e2e/collect_tests.rs @@ -0,0 +1,71 @@ +//! e2e tests for collect to list function + +use indoc::indoc; +use sparrow_api::kaskada::v1alpha::TableConfig; +use uuid::Uuid; + +use crate::{fixture::DataFixture, fixtures::i64_data_fixture, QueryFixture}; + +/// Create a simple table with a collection type (map). +pub(crate) async fn list_data_fixture() -> DataFixture { + DataFixture::new() + .with_table_from_files( + TableConfig::new_with_table_source( + "Input", + &Uuid::new_v4(), + "time", + Some("subsort"), + "key", + "", + ), + &["parquet/data_with_list.parquet"], + ) + .await + .unwrap() +} + +pub(crate) async fn collect_data_fixture() -> DataFixture { + DataFixture::new() + .with_table_from_csv( + TableConfig::new_with_table_source( + "Collect", + &Uuid::new_v4(), + "time", + Some("subsort"), + "key", + "", + ), + indoc! {" + time,subsort,key,s,n,b + 1996-12-19T16:39:57-08:00,0,A,hEllo,0,true + 1996-12-19T16:40:57-08:00,0,A,hi,2,false + 1996-12-19T16:41:57-08:00,0,A,hey,9,, + 1996-12-19T16:42:57-08:00,0,A,ay,-1,true + 1996-12-19T16:43:57-08:00,0,A,hIlo,10,true + 1996-12-20T16:40:57-08:00,0,B,h,5,false + 1996-12-20T16:41:57-08:00,0,B,he,-2,, + 1996-12-20T16:42:57-08:00,0,B,,-2,true + 1996-12-20T16:43:57-08:00,0,B,hel,2,false + 1996-12-20T16:44:57-08:00,0,B,,true + 1996-12-21T16:44:57-08:00,0,C,g,true + 1996-12-21T16:45:57-08:00,0,C,go,true + 1996-12-21T16:46:57-08:00,0,C,goo,true + 1996-12-21T16:47:57-08:00,0,C,good,true + "}, + ) + .await + .unwrap() +} + +#[tokio::test] +async fn test_collect_to_list_i64() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10) | index(0) }").with_dump_dot("asdf").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,5 + 1996-12-20T00:39:58.000000000,9223372036854775808,2867199309159137213,B,24 + 1996-12-20T00:39:59.000000000,9223372036854775808,12960666915911099378,A,5 + 1996-12-20T00:40:00.000000000,9223372036854775808,12960666915911099378,A,5 + 1996-12-20T00:40:01.000000000,9223372036854775808,12960666915911099378,A,5 + 1996-12-20T00:40:02.000000000,9223372036854775808,12960666915911099378,A,5 + "###); +} diff --git a/crates/sparrow-main/tests/e2e/list_tests.rs b/crates/sparrow-main/tests/e2e/list_tests.rs index e6c8c5d90..e3b40fe75 100644 --- a/crates/sparrow-main/tests/e2e/list_tests.rs +++ b/crates/sparrow-main/tests/e2e/list_tests.rs @@ -3,7 +3,7 @@ use sparrow_api::kaskada::v1alpha::TableConfig; use uuid::Uuid; -use crate::{fixture::DataFixture, fixtures::i64_data_fixture, QueryFixture}; +use crate::{fixture::DataFixture, QueryFixture}; /// Create a simple table with a collection type (map). pub(crate) async fn list_data_fixture() -> DataFixture { @@ -95,39 +95,17 @@ async fn test_index_list_bool_dynamic() { "###); } -#[tokio::test] -async fn test_collect_to_list_i64() { - insta::assert_snapshot!(QueryFixture::new("{ f1: Numbers.m | collect(10) | index(0) }").with_dump_dot("asdf").run_to_csv(&i64_data_fixture().await).await.unwrap(), @r###" - _time,_subsort,_key_hash,_key,f1 - "###); -} - #[tokio::test] async fn test_incorrect_index_type() { insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(\"s\") }") .run_to_csv(&list_data_fixture().await).await.unwrap_err(), @r###" - --- - code: Client specified an invalid argument - message: 1 errors in Fenl statements; see diagnostics - fenl_diagnostics: - - severity: error - code: E0010 - message: Invalid argument type(s) - formatted: - - "error[E0010]: Invalid argument type(s)" - - " --> Query:1:24" - - " |" - - "1 | { f1: Input.i64_list | index(\"s\") }" - - " | ^^^^^ --- Actual type: string" - - " | | " - - " | Invalid types for parameter 'i' in call to 'index'" - - " |" - - " --> built-in signature 'index(i: i64, list: list) -> T':1:18" - - " |" - - "1 | index(i: i64, list: list) -> T" - - " | --- Expected type: i64" - - "" - - "" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,5 + 1996-12-20T00:39:58.000000000,9223372036854775808,2867199309159137213,B,24 + 1996-12-20T00:39:59.000000000,9223372036854775808,12960666915911099378,A,22 + 1996-12-20T00:40:00.000000000,9223372036854775808,12960666915911099378,A,22 + 1996-12-20T00:40:01.000000000,9223372036854775808,12960666915911099378,A,34 + 1996-12-20T00:40:02.000000000,9223372036854775808,12960666915911099378,A,34 "###); } diff --git a/crates/sparrow-main/tests/e2e/main.rs b/crates/sparrow-main/tests/e2e/main.rs index 84ecde202..97f25c56a 100644 --- a/crates/sparrow-main/tests/e2e/main.rs +++ b/crates/sparrow-main/tests/e2e/main.rs @@ -19,6 +19,7 @@ mod aggregation_tests; mod basic_error_tests; mod cast_tests; mod coalesce_tests; +mod collect_tests; mod comparison_tests; mod decoration_tests; mod entity_key_output_tests; From 68229b0d78495632ba7e0f59a12f25fb00410dcd Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Sat, 29 Jul 2023 14:40:18 -0700 Subject: [PATCH 03/13] Add unit tests --- .../src/functions/collection.rs | 2 +- .../src/evaluators/list/collect_boolean.rs | 36 ++- .../src/evaluators/list/collect_string.rs | 34 ++- .../sparrow-main/tests/e2e/collect_tests.rs | 287 +++++++++++++++--- 4 files changed, 295 insertions(+), 64 deletions(-) diff --git a/crates/sparrow-compiler/src/functions/collection.rs b/crates/sparrow-compiler/src/functions/collection.rs index 8f4cd3a5a..925d40e42 100644 --- a/crates/sparrow-compiler/src/functions/collection.rs +++ b/crates/sparrow-compiler/src/functions/collection.rs @@ -15,7 +15,7 @@ pub(super) fn register(registry: &mut Registry) { // TODO: Make MAX default to something? registry - .register("collect(max: i64, input: T, window: window = null) -> list") + .register("collect(const max: i64, input: T, window: window = null) -> list") .with_implementation(Implementation::Instruction(InstOp::Collect)) .set_internal(); } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs index 8289b3491..6b5fe4d4d 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -12,6 +12,7 @@ use arrow::array::{ }; use arrow::datatypes::{ArrowPrimitiveType, DataType}; use arrow::downcast_primitive_array; +use itertools::izip; use sparrow_arrow::downcast::{downcast_boolean_array, downcast_primitive_array}; use sparrow_arrow::scalar_value::ScalarValue; use sparrow_kernels::time::i64_to_two_i32; @@ -34,7 +35,8 @@ pub struct CollectBooleanEvaluator { input: ValueRef, tick: ValueRef, duration: ValueRef, - buffer: VecDeque>, + /// Contains the buffer of values for each entity + buffers: Vec>>, } impl EvaluatorFactory for CollectBooleanEvaluator { @@ -58,15 +60,13 @@ impl EvaluatorFactory for CollectBooleanEvaluator { input, tick, duration, - buffer: vec![].into(), + buffers: vec![].into(), })) } } impl Evaluator for CollectBooleanEvaluator { fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { - let input = info.value(&self.input)?.array_ref()?; - match (self.tick.is_literal_null(), self.duration.is_literal_null()) { (true, true) => self.evaluate_non_windowed(info), (true, false) => unimplemented!("since window aggregation unsupported"), @@ -77,21 +77,35 @@ impl Evaluator for CollectBooleanEvaluator { } impl CollectBooleanEvaluator { + fn ensure_entity_capacity(&mut self, len: usize) { + if len >= self.buffers.len() { + self.buffers.resize(len + 1, VecDeque::new()); + } + } + fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { let input = info.value(&self.input)?.array_ref()?; + let key_capacity = info.grouping().num_groups(); + let entity_indices = info.grouping().group_indices(); + assert_eq!(entity_indices.len(), input.len()); + + self.ensure_entity_capacity(key_capacity); + let input = input.as_boolean(); - let mut builder = BooleanBuilder::new(); + let builder = BooleanBuilder::new(); let mut list_builder = ListBuilder::new(builder); - input.into_iter().for_each(|i| { - self.buffer.push_back(i); - if self.buffer.len() > self.max as usize { - self.buffer.pop_front(); + izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { + let entity_index = *entity_index as usize; + + self.buffers[entity_index].push_back(input); + if self.buffers[entity_index].len() > self.max as usize { + self.buffers[entity_index].pop_front(); } // TODO: Empty is null or empty? - list_builder.append_value(self.buffer.clone()); - list_builder.append(true); + println!("Current Buffer: {:?}", self.buffers[entity_index]); + list_builder.append_value(self.buffers[entity_index].clone()); }); Ok(Arc::new(list_builder.finish())) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index 129350913..fc650146b 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -13,6 +13,7 @@ use arrow::array::{ }; use arrow::datatypes::{ArrowPrimitiveType, DataType}; use arrow::downcast_primitive_array; +use itertools::izip; use sparrow_arrow::downcast::{downcast_primitive_array, downcast_string_array}; use sparrow_arrow::scalar_value::ScalarValue; use sparrow_plan::ValueRef; @@ -34,7 +35,8 @@ pub struct CollectStringEvaluator { input: ValueRef, tick: ValueRef, duration: ValueRef, - buffer: VecDeque>, + /// Contains the buffer of values for each entity + buffers: Vec>>, } impl EvaluatorFactory for CollectStringEvaluator { @@ -58,15 +60,13 @@ impl EvaluatorFactory for CollectStringEvaluator { input, tick, duration, - buffer: vec![].into(), + buffers: vec![].into(), })) } } impl Evaluator for CollectStringEvaluator { fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { - let input = info.value(&self.input)?.array_ref()?; - match (self.tick.is_literal_null(), self.duration.is_literal_null()) { (true, true) => self.evaluate_non_windowed(info), (true, false) => unimplemented!("since window aggregation unsupported"), @@ -77,21 +77,35 @@ impl Evaluator for CollectStringEvaluator { } impl CollectStringEvaluator { + fn ensure_entity_capacity(&mut self, len: usize) { + if len >= self.buffers.len() { + self.buffers.resize(len + 1, VecDeque::new()); + } + } + fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { let input = info.value(&self.input)?.array_ref()?; + let key_capacity = info.grouping().num_groups(); + let entity_indices = info.grouping().group_indices(); + assert_eq!(entity_indices.len(), input.len()); + + self.ensure_entity_capacity(key_capacity); + let input = input.as_string::(); let mut builder = StringBuilder::new(); let mut list_builder = ListBuilder::new(builder); - input.into_iter().for_each(|i| { - self.buffer.push_back(i.map(|s| s.to_owned())); - if self.buffer.len() > self.max as usize { - self.buffer.pop_front(); + izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { + let entity_index = *entity_index as usize; + + self.buffers[entity_index].push_back(input.map(|i| i.to_owned())); + if self.buffers[entity_index].len() > self.max as usize { + self.buffers[entity_index].pop_front(); } // TODO: Empty is null or empty? - list_builder.append_value(self.buffer.clone()); - list_builder.append(true); + println!("Current Buffer: {:?}", self.buffers[entity_index]); + list_builder.append_value(self.buffers[entity_index].clone()); }); Ok(Arc::new(list_builder.finish())) diff --git a/crates/sparrow-main/tests/e2e/collect_tests.rs b/crates/sparrow-main/tests/e2e/collect_tests.rs index 4ebe66ba5..f2e58dffe 100644 --- a/crates/sparrow-main/tests/e2e/collect_tests.rs +++ b/crates/sparrow-main/tests/e2e/collect_tests.rs @@ -1,28 +1,10 @@ -//! e2e tests for collect to list function +//! e2e tests for collect function use indoc::indoc; use sparrow_api::kaskada::v1alpha::TableConfig; use uuid::Uuid; -use crate::{fixture::DataFixture, fixtures::i64_data_fixture, QueryFixture}; - -/// Create a simple table with a collection type (map). -pub(crate) async fn list_data_fixture() -> DataFixture { - DataFixture::new() - .with_table_from_files( - TableConfig::new_with_table_source( - "Input", - &Uuid::new_v4(), - "time", - Some("subsort"), - "key", - "", - ), - &["parquet/data_with_list.parquet"], - ) - .await - .unwrap() -} +use crate::{fixture::DataFixture, QueryFixture}; pub(crate) async fn collect_data_fixture() -> DataFixture { DataFixture::new() @@ -36,21 +18,21 @@ pub(crate) async fn collect_data_fixture() -> DataFixture { "", ), indoc! {" - time,subsort,key,s,n,b - 1996-12-19T16:39:57-08:00,0,A,hEllo,0,true - 1996-12-19T16:40:57-08:00,0,A,hi,2,false - 1996-12-19T16:41:57-08:00,0,A,hey,9,, - 1996-12-19T16:42:57-08:00,0,A,ay,-1,true - 1996-12-19T16:43:57-08:00,0,A,hIlo,10,true - 1996-12-20T16:40:57-08:00,0,B,h,5,false - 1996-12-20T16:41:57-08:00,0,B,he,-2,, - 1996-12-20T16:42:57-08:00,0,B,,-2,true - 1996-12-20T16:43:57-08:00,0,B,hel,2,false - 1996-12-20T16:44:57-08:00,0,B,,true - 1996-12-21T16:44:57-08:00,0,C,g,true - 1996-12-21T16:45:57-08:00,0,C,go,true - 1996-12-21T16:46:57-08:00,0,C,goo,true - 1996-12-21T16:47:57-08:00,0,C,good,true + time,subsort,key,s,n,b,index + 1996-12-19T16:39:57-08:00,0,A,hEllo,0,true,0 + 1996-12-19T16:40:57-08:00,0,A,hi,2,false,1 + 1996-12-19T16:41:57-08:00,0,A,hey,9,,2 + 1996-12-19T16:42:57-08:00,0,A,ay,-1,true,1 + 1996-12-19T16:43:57-08:00,0,A,hIlo,10,true, + 1996-12-20T16:40:57-08:00,0,B,h,5,false,0 + 1996-12-20T16:41:57-08:00,0,B,he,-2,,1 + 1996-12-20T16:42:57-08:00,0,B,,,true,2 + 1996-12-20T16:43:57-08:00,0,B,hel,2,false,1 + 1996-12-20T16:44:57-08:00,0,B,,,true,1 + 1996-12-21T16:44:57-08:00,0,C,g,1,true,2 + 1996-12-21T16:45:57-08:00,0,C,go,2,true,0 + 1996-12-21T16:46:57-08:00,0,C,goo,3,true, + 1996-12-21T16:47:57-08:00,0,C,good,4,true,1 "}, ) .await @@ -59,13 +41,234 @@ pub(crate) async fn collect_data_fixture() -> DataFixture { #[tokio::test] async fn test_collect_to_list_i64() { - insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10) | index(0) }").with_dump_dot("asdf").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 - 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,5 - 1996-12-20T00:39:58.000000000,9223372036854775808,2867199309159137213,B,24 - 1996-12-20T00:39:59.000000000,9223372036854775808,12960666915911099378,A,5 - 1996-12-20T00:40:00.000000000,9223372036854775808,12960666915911099378,A,5 - 1996-12-20T00:40:01.000000000,9223372036854775808,12960666915911099378,A,5 - 1996-12-20T00:40:02.000000000,9223372036854775808,12960666915911099378,A,5 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,1 + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,1 + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,1 + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,1 + "###); +} + +#[tokio::test] +async fn test_collect_to_list_i64_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10) | index(Collect.index) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,2 + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,9 + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,2 + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,-2 + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,-2 + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,-2 + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,1 + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,2 + "###); +} + +#[tokio::test] +async fn test_collect_to_small_list_i64() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(2) | index(Collect.index) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,2 + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,-1 + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,-2 + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,2 + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,1 + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,4 + "###); +} + +#[tokio::test] +async fn test_collect_to_list_string() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.s | collect(10) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,g + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,g + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,g + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,g + "###); +} + +#[tokio::test] +async fn test_collect_to_list_string_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.s | collect(10) | index(Collect.index) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hi + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,hey + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,hi + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,g + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,go + "###); +} + +#[tokio::test] +async fn test_collect_to_small_list_string() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.s | collect(2) | index(Collect.index) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hi + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,ay + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,hel + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,g + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,good + "###); +} + +#[tokio::test] +async fn test_collect_to_list_boolean() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.b | collect(10) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,false + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,true + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,true + "###); +} + +#[tokio::test] +async fn test_collect_to_list_boolean_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.b | collect(10) | index(Collect.index) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hi + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,hey + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,hi + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,g + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,go + "###); +} + +#[tokio::test] +async fn test_collect_to_small_list_boolean() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.b | collect(2) | index(Collect.index) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,true + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,true + "###); +} + +#[tokio::test] +async fn test_require_literal_max() { + // TODO: We should figure out how to not report the second error -- type variables with + // error propagation needs some fixing. + insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Collect.s | collect(Collect.index) | index(1) }") + .run_to_csv(&collect_data_fixture().await).await.unwrap_err(), @r###" + --- + code: Client specified an invalid argument + message: 2 errors in Fenl statements; see diagnostics + fenl_diagnostics: + - severity: error + code: E0014 + message: Invalid non-constant argument + formatted: + - "error[E0014]: Invalid non-constant argument" + - " --> Query:1:27" + - " |" + - "1 | { f1: Collect.s | collect(Collect.index) | index(1) }" + - " | ^^^^^^^^^^^^^ Argument 'max' to 'collect' must be constant, but was not" + - "" + - "" + - severity: error + code: E0010 + message: Invalid argument type(s) + formatted: + - "error[E0010]: Invalid argument type(s)" + - " --> Query:1:44" + - " |" + - "1 | { f1: Collect.s | collect(Collect.index) | index(1) }" + - " | ^^^^^ Invalid types for parameter 'list' in call to 'index'" + - " |" + - " --> internal:1:1" + - " |" + - 1 | $input + - " | ------ Actual type: error" + - " |" + - " --> built-in signature 'index(i: i64, list: list) -> T':1:29" + - " |" + - "1 | index(i: i64, list: list) -> T" + - " | ------- Expected type: list" + - "" + - "" "###); } From 54d4afeb182842df39ef48c9719a8e617ed10dd6 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Sat, 29 Jul 2023 14:46:24 -0700 Subject: [PATCH 04/13] update e2e tests --- crates/sparrow-catalog/catalog/len copy.toml | 16 ------- .../sparrow-main/tests/e2e/collect_tests.rs | 42 +++++++++---------- crates/sparrow-main/tests/e2e/list_tests.rs | 29 +++++++++---- 3 files changed, 43 insertions(+), 44 deletions(-) delete mode 100644 crates/sparrow-catalog/catalog/len copy.toml diff --git a/crates/sparrow-catalog/catalog/len copy.toml b/crates/sparrow-catalog/catalog/len copy.toml deleted file mode 100644 index 991b18a0d..000000000 --- a/crates/sparrow-catalog/catalog/len copy.toml +++ /dev/null @@ -1,16 +0,0 @@ -name = "get" -signature = "get(key: K, map: map) -> V" -short_doc = "..." -long_doc = """ - -### Parameters - ... - -### Results -... -""" - -[[examples]] -name = "..." -expression = "..." -input_csv = "..." \ No newline at end of file diff --git a/crates/sparrow-main/tests/e2e/collect_tests.rs b/crates/sparrow-main/tests/e2e/collect_tests.rs index f2e58dffe..5c61cc70c 100644 --- a/crates/sparrow-main/tests/e2e/collect_tests.rs +++ b/crates/sparrow-main/tests/e2e/collect_tests.rs @@ -170,18 +170,18 @@ async fn test_collect_to_list_boolean() { insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.b | collect(10) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true - 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false - 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, - 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,false - 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false - 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B, - 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,true - 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B, - 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B, - 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,true 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true - 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,true 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,true "###); } @@ -190,20 +190,20 @@ async fn test_collect_to_list_boolean() { async fn test_collect_to_list_boolean_dynamic() { insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.b | collect(10) | index(Collect.index) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 - 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo - 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hi - 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,hey - 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,hi + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,false 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, - 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h - 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,he - 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, - 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,he - 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,he + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,true + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B, 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C, - 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,g + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C, - 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,go + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,true "###); } diff --git a/crates/sparrow-main/tests/e2e/list_tests.rs b/crates/sparrow-main/tests/e2e/list_tests.rs index e3b40fe75..13d0921e4 100644 --- a/crates/sparrow-main/tests/e2e/list_tests.rs +++ b/crates/sparrow-main/tests/e2e/list_tests.rs @@ -99,13 +99,28 @@ async fn test_index_list_bool_dynamic() { async fn test_incorrect_index_type() { insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(\"s\") }") .run_to_csv(&list_data_fixture().await).await.unwrap_err(), @r###" - _time,_subsort,_key_hash,_key,f1 - 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,5 - 1996-12-20T00:39:58.000000000,9223372036854775808,2867199309159137213,B,24 - 1996-12-20T00:39:59.000000000,9223372036854775808,12960666915911099378,A,22 - 1996-12-20T00:40:00.000000000,9223372036854775808,12960666915911099378,A,22 - 1996-12-20T00:40:01.000000000,9223372036854775808,12960666915911099378,A,34 - 1996-12-20T00:40:02.000000000,9223372036854775808,12960666915911099378,A,34 + --- + code: Client specified an invalid argument + message: 1 errors in Fenl statements; see diagnostics + fenl_diagnostics: + - severity: error + code: E0010 + message: Invalid argument type(s) + formatted: + - "error[E0010]: Invalid argument type(s)" + - " --> Query:1:24" + - " |" + - "1 | { f1: Input.i64_list | index(\"s\") }" + - " | ^^^^^ --- Actual type: string" + - " | | " + - " | Invalid types for parameter 'i' in call to 'index'" + - " |" + - " --> built-in signature 'index(i: i64, list: list) -> T':1:18" + - " |" + - "1 | index(i: i64, list: list) -> T" + - " | --- Expected type: i64" + - "" + - "" "###); } From dd3140e07c32657dae19b8dd01ec9deb9c879ac0 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Sat, 29 Jul 2023 14:58:14 -0700 Subject: [PATCH 05/13] cleanup evaluators --- crates/sparrow-compiler/src/ast_to_dfg.rs | 2 +- .../src/functions/function.rs | 4 - .../src/evaluators/list/collect_boolean.rs | 4 +- .../src/evaluators/list/collect_map.rs | 49 ++------ .../src/evaluators/list/collect_primitive.rs | 9 +- .../src/evaluators/list/collect_string.rs | 6 +- .../evaluators/list/collect_string_generic.rs | 113 ------------------ 7 files changed, 17 insertions(+), 170 deletions(-) delete mode 100644 crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs diff --git a/crates/sparrow-compiler/src/ast_to_dfg.rs b/crates/sparrow-compiler/src/ast_to_dfg.rs index ded2b4f15..b50aa04fb 100644 --- a/crates/sparrow-compiler/src/ast_to_dfg.rs +++ b/crates/sparrow-compiler/src/ast_to_dfg.rs @@ -530,7 +530,7 @@ pub fn add_to_dfg( vec![args[0].clone(), condition, duration] } else if function.name() == "collect" { // The collect function contains a window, but does not follow the same signature - // pattern as aggregations, so it requires a + // pattern as aggregations, so it requires a different flattening strategy. // // TODO: Flattening the window arguments is hacky and confusing. We should instead // incorporate the tick directly into the function containing the window. diff --git a/crates/sparrow-compiler/src/functions/function.rs b/crates/sparrow-compiler/src/functions/function.rs index a652a3c90..ada567430 100644 --- a/crates/sparrow-compiler/src/functions/function.rs +++ b/crates/sparrow-compiler/src/functions/function.rs @@ -98,10 +98,6 @@ impl Function { ) } - pub fn contains_window(&self) -> bool { - self.name() == "collect" || self.is_aggregation() - } - pub fn is_tick(&self) -> bool { matches!(self.implementation, Implementation::Tick(_)) } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs index 6b5fe4d4d..a0e8f86ee 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -25,6 +25,8 @@ use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; /// /// Collect collects a stream of values into a List. A list is produced /// for each input value received, growing up to a maximum size. +/// +/// If the list is empty, an empty list is returned (rather than `null`). #[derive(Debug)] pub struct CollectBooleanEvaluator { /// The max size of the buffer. @@ -103,8 +105,6 @@ impl CollectBooleanEvaluator { self.buffers[entity_index].pop_front(); } - // TODO: Empty is null or empty? - println!("Current Buffer: {:?}", self.buffers[entity_index]); list_builder.append_value(self.buffers[entity_index].clone()); }); diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_map.rs b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs index 1ef5c377a..d1a4ebddd 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_map.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs @@ -16,7 +16,6 @@ use sparrow_arrow::downcast::downcast_primitive_array; use sparrow_arrow::scalar_value::ScalarValue; use sparrow_kernels::time::i64_to_two_i32; use sparrow_plan::ValueRef; -use sparrow_syntax::FenlType; use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; @@ -30,52 +29,20 @@ pub struct CollectMapEvaluator { /// /// Once the max size is reached, the front will be popped and the new /// value pushed to the back. - max: i64, - input: ValueRef, - tick: ValueRef, - duration: ValueRef, + _max: i64, + _input: ValueRef, + _tick: ValueRef, + _duration: ValueRef, } impl EvaluatorFactory for CollectMapEvaluator { - fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { - let input_type = info.args[1].data_type(); - let result_type = info.result_type; - match result_type { - DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), - other => anyhow::bail!("expected list result type, saw {:?}", other), - }; - - let max = match info.args[0].value_ref.literal_value() { - Some(ScalarValue::Int64(Some(v))) => *v, - Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), - None => anyhow::bail!("expected literal value for max parameter"), - }; - - let (_, input, tick, duration) = info.unpack_arguments()?; - Ok(Box::new(Self { - max, - input, - tick, - duration, - })) + fn try_new(_info: StaticInfo<'_>) -> anyhow::Result> { + unimplemented!("map collect evaluator is unsupported") } } impl Evaluator for CollectMapEvaluator { - fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { - let input = info.value(&self.input)?.array_ref()?; - - match (self.tick.is_literal_null(), self.duration.is_literal_null()) { - (true, true) => self.evaluate_non_windowed(info), - (true, false) => unimplemented!("since window aggregation unsupported"), - (false, false) => panic!("sliding window aggregation should use other evaluator"), - (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), - } - } -} - -impl CollectMapEvaluator { - fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { - todo!() + fn evaluate(&mut self, _info: &dyn RuntimeInfo) -> anyhow::Result { + unimplemented!("map collect evaluator is unsupported") } } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs index 9c105aeb4..093be60a3 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs @@ -25,6 +25,8 @@ use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; /// /// Collect collects a stream of values into a List. A list is produced /// for each input value received, growing up to a maximum size. +/// +/// If the list is empty, an empty list is returned (rather than `null`). #[derive(Debug)] pub struct CollectPrimitiveEvaluator where @@ -115,14 +117,9 @@ where self.buffers[entity_index].pop_front(); } - // TODO: Empty is null or empty? - println!("Current Buffer: {:?}", self.buffers[entity_index]); list_builder.append_value(self.buffers[entity_index].clone()); }); - let result = list_builder.finish(); - println!("ListBuilder: {:?}", result); - - Ok(Arc::new(result)) + Ok(Arc::new(list_builder.finish())) } } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index fc650146b..6f59efccb 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -25,6 +25,8 @@ use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; /// /// Collect collects a stream of values into a List. A list is produced /// for each input value received, growing up to a maximum size. +/// +/// If the list is empty, an empty list is returned (rather than `null`). #[derive(Debug)] pub struct CollectStringEvaluator { /// The max size of the buffer. @@ -92,7 +94,7 @@ impl CollectStringEvaluator { self.ensure_entity_capacity(key_capacity); let input = input.as_string::(); - let mut builder = StringBuilder::new(); + let builder = StringBuilder::new(); let mut list_builder = ListBuilder::new(builder); izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { @@ -103,8 +105,6 @@ impl CollectStringEvaluator { self.buffers[entity_index].pop_front(); } - // TODO: Empty is null or empty? - println!("Current Buffer: {:?}", self.buffers[entity_index]); list_builder.append_value(self.buffers[entity_index].clone()); }); diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs deleted file mode 100644 index d8dc76b1a..000000000 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string_generic.rs +++ /dev/null @@ -1,113 +0,0 @@ -//! The cast instruction isn't a "normal" instruction since it doesn't have a -//! a single, fixed signature. Specifically, the input and output types depend -//! on the input to the instruction and the requested output type. - -use std::collections::VecDeque; -use std::sync::Arc; - -use anyhow::anyhow; -use arrow::array::{ - ArrayRef, GenericStringBuilder, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, - ListBuilder, OffsetSizeTrait, PrimitiveBuilder, -}; -use arrow::datatypes::{ArrowPrimitiveType, DataType}; -use arrow::downcast_primitive_array; -use sparrow_arrow::downcast::{downcast_primitive_array, downcast_string_array}; -use sparrow_arrow::scalar_value::ScalarValue; -use sparrow_plan::ValueRef; -use sparrow_syntax::FenlType; - -use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; - -/// Evaluator for the `collect` instruction. -/// -/// Collect collects a stream of values into a List. A list is produced -/// for each input value received, growing up to a maximum size. -#[derive(Debug)] -pub struct CollectStringEvaluator<'a, O> -where - O: OffsetSizeTrait, -{ - /// The max size of the buffer. - /// - /// Once the max size is reached, the front will be popped and the new - /// value pushed to the back. - max: i64, - input: ValueRef, - tick: ValueRef, - duration: ValueRef, - buffer: VecDeque>, - - _phantom: std::marker::PhantomData, -} - -impl<'a, O> EvaluatorFactory for CollectStringEvaluator<'a, O> -where - O: OffsetSizeTrait, -{ - fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { - let input_type = info.args[1].data_type(); - let result_type = info.result_type; - match result_type { - DataType::List(t) => anyhow::ensure!(t.data_type() == input_type), - other => anyhow::bail!("expected list result type, saw {:?}", other), - }; - - let max = match info.args[0].value_ref.literal_value() { - Some(ScalarValue::Int64(Some(v))) => v, - Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), - None => anyhow::bail!("expected literal value for max parameter"), - }; - - let (_, input, tick, duration) = info.unpack_arguments()?; - Ok(Box::new(Self { - max: *max, - input, - tick, - duration, - buffer: vec![].into(), - _phantom: std::marker::PhantomData, - })) - } -} - -impl<'a, O> Evaluator for CollectStringEvaluator<'a, O> -where - O: OffsetSizeTrait, -{ - fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { - let input = info.value(&self.input)?.array_ref()?; - - match (self.tick.is_literal_null(), self.duration.is_literal_null()) { - (true, true) => self.evaluate_non_windowed(info), - (true, false) => unimplemented!("since window aggregation unsupported"), - (false, false) => panic!("sliding window aggregation should use other evaluator"), - (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), - } - } -} - -impl<'a, O> CollectStringEvaluator<'a, O> -where - O: OffsetSizeTrait, -{ - fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { - let input = info.value(&self.input)?.array_ref()?; - let input = downcast_string_array::(input.as_ref())?; - let mut builder = GenericStringBuilder::::new(); - let mut list_builder = ListBuilder::new(builder); - - input.into_iter().for_each(|i| { - self.buffer.push_back(i); - if self.buffer.len() > self.max as usize { - self.buffer.pop_front(); - } - - // TODO: Empty is null or empty? - list_builder.append_value(self.buffer.clone()); - list_builder.append(true); - }); - - Ok(Arc::new(list_builder.finish())) - } -} From 9af194e9e3520fc956f59fbc1a53492759085f64 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Sat, 29 Jul 2023 16:21:22 -0700 Subject: [PATCH 06/13] clippy --- .../src/evaluators/list/collect_boolean.rs | 18 +++++++++--------- .../src/evaluators/list/collect_map.rs | 19 +++++++++---------- .../src/evaluators/list/collect_primitive.rs | 13 ++++--------- .../src/evaluators/list/collect_string.rs | 16 +++++----------- .../src/evaluators/list/index.rs | 2 -- 5 files changed, 27 insertions(+), 41 deletions(-) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs index a0e8f86ee..c9eff8f9f 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -5,19 +5,19 @@ use std::collections::VecDeque; use std::sync::Arc; -use anyhow::anyhow; + use arrow::array::{ - ArrayRef, AsArray, BooleanBuilder, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, - ListBuilder, PrimitiveBuilder, + ArrayRef, AsArray, BooleanBuilder, + ListBuilder, }; -use arrow::datatypes::{ArrowPrimitiveType, DataType}; -use arrow::downcast_primitive_array; +use arrow::datatypes::{DataType}; + use itertools::izip; -use sparrow_arrow::downcast::{downcast_boolean_array, downcast_primitive_array}; + use sparrow_arrow::scalar_value::ScalarValue; -use sparrow_kernels::time::i64_to_two_i32; + use sparrow_plan::ValueRef; -use sparrow_syntax::FenlType; + use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; @@ -62,7 +62,7 @@ impl EvaluatorFactory for CollectBooleanEvaluator { input, tick, duration, - buffers: vec![].into(), + buffers: vec![], })) } } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_map.rs b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs index d1a4ebddd..b4897c58e 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_map.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs @@ -2,19 +2,18 @@ //! a single, fixed signature. Specifically, the input and output types depend //! on the input to the instruction and the requested output type. -use std::collections::VecDeque; -use std::sync::Arc; -use anyhow::anyhow; + + + use arrow::array::{ - ArrayRef, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, ListBuilder, - PrimitiveBuilder, + ArrayRef, }; -use arrow::datatypes::{ArrowPrimitiveType, DataType}; -use arrow::downcast_primitive_array; -use sparrow_arrow::downcast::downcast_primitive_array; -use sparrow_arrow::scalar_value::ScalarValue; -use sparrow_kernels::time::i64_to_two_i32; + + + + + use sparrow_plan::ValueRef; use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs index 093be60a3..8db994a0b 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs @@ -5,19 +5,14 @@ use std::collections::VecDeque; use std::sync::Arc; -use anyhow::anyhow; -use arrow::array::{ - ArrayRef, Int32Array, IntervalDayTimeArray, IntervalYearMonthArray, ListBuilder, - PrimitiveBuilder, -}; +use arrow::array::{ArrayRef, ListBuilder, PrimitiveBuilder}; use arrow::datatypes::{ArrowPrimitiveType, DataType}; -use arrow::downcast_primitive_array; + use itertools::izip; use sparrow_arrow::downcast::downcast_primitive_array; use sparrow_arrow::scalar_value::ScalarValue; -use sparrow_kernels::time::i64_to_two_i32; + use sparrow_plan::ValueRef; -use sparrow_syntax::FenlType; use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; @@ -68,7 +63,7 @@ where input, tick, duration, - buffers: vec![].into(), + buffers: vec![], })) } } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index 6f59efccb..1078bbe3d 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -5,19 +5,13 @@ use std::collections::VecDeque; use std::sync::Arc; -use anyhow::anyhow; -use arrow::array::{ - ArrayRef, AsArray, GenericStringBuilder, Int32Array, IntervalDayTimeArray, - IntervalYearMonthArray, ListBuilder, OffsetSizeTrait, PrimitiveBuilder, StringArray, - StringBuilder, -}; -use arrow::datatypes::{ArrowPrimitiveType, DataType}; -use arrow::downcast_primitive_array; +use arrow::array::{ArrayRef, AsArray, ListBuilder, StringBuilder}; +use arrow::datatypes::DataType; + use itertools::izip; -use sparrow_arrow::downcast::{downcast_primitive_array, downcast_string_array}; + use sparrow_arrow::scalar_value::ScalarValue; use sparrow_plan::ValueRef; -use sparrow_syntax::FenlType; use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; @@ -62,7 +56,7 @@ impl EvaluatorFactory for CollectStringEvaluator { input, tick, duration, - buffers: vec![].into(), + buffers: vec![], })) } } diff --git a/crates/sparrow-instructions/src/evaluators/list/index.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs index 1e237132e..718b6ddb6 100644 --- a/crates/sparrow-instructions/src/evaluators/list/index.rs +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -42,8 +42,6 @@ impl Evaluator for IndexEvaluator { /// Given a `ListArray` and `index` array of the same length return an array of the values. fn list_get(list: &ArrayRef, indices: &Int64Array) -> anyhow::Result { - println!("List: {:?}", list); - println!("Indices: {:?}", indices); anyhow::ensure!(list.len() == indices.len()); let list = list.as_list(); From c4d8e1df5b4ff97d0ede0bc2b7d9a0b4632636fa Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Sun, 30 Jul 2023 21:13:49 -0700 Subject: [PATCH 07/13] Add state token --- crates/sparrow-compiler/src/ast_to_dfg.rs | 1 + .../src/evaluators/aggregation/token.rs | 2 + .../aggregation/token/collect_token.rs | 61 +++++++++++++++++++ .../src/evaluators/list/collect_boolean.rs | 47 ++++++-------- .../src/evaluators/list/collect_map.rs | 20 +----- .../src/evaluators/list/collect_primitive.rs | 37 ++++++----- .../src/evaluators/list/collect_string.rs | 38 ++++++------ 7 files changed, 122 insertions(+), 84 deletions(-) create mode 100644 crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs diff --git a/crates/sparrow-compiler/src/ast_to_dfg.rs b/crates/sparrow-compiler/src/ast_to_dfg.rs index b50aa04fb..56994e4e9 100644 --- a/crates/sparrow-compiler/src/ast_to_dfg.rs +++ b/crates/sparrow-compiler/src/ast_to_dfg.rs @@ -602,6 +602,7 @@ pub fn add_to_dfg( } } +#[allow(clippy::type_complexity)] fn flatten_window_args_if_needed( window: &Located>, dfg: &mut Dfg, diff --git a/crates/sparrow-instructions/src/evaluators/aggregation/token.rs b/crates/sparrow-instructions/src/evaluators/aggregation/token.rs index 766028640..72ff04303 100644 --- a/crates/sparrow-instructions/src/evaluators/aggregation/token.rs +++ b/crates/sparrow-instructions/src/evaluators/aggregation/token.rs @@ -1,6 +1,7 @@ //! Tokens representing keys for compute storage. mod boolean_accum_token; +mod collect_token; mod count_accum_token; pub mod lag_token; mod map_accum_token; @@ -12,6 +13,7 @@ mod two_stacks_primitive_accum_token; mod two_stacks_string_accum_token; pub use boolean_accum_token::*; +pub use collect_token::*; pub use count_accum_token::*; pub use map_accum_token::*; pub use primitive_accum_token::*; diff --git a/crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs b/crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs new file mode 100644 index 000000000..ad0dcb73d --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs @@ -0,0 +1,61 @@ +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::collections::VecDeque; + +use crate::{ComputeStore, StateToken, StoreKey}; + +/// State token used for the lag operator. +#[derive(Default, Debug)] +pub struct CollectToken +where + T: Clone, + T: Serialize + DeserializeOwned, + Vec>>: Serialize + DeserializeOwned, +{ + state: Vec>>, +} + +impl CollectToken +where + T: Clone, + T: Serialize + DeserializeOwned, + Vec>>: Serialize + DeserializeOwned, +{ + pub fn resize(&mut self, len: usize) { + if len >= self.state.len() { + self.state.resize(len + 1, VecDeque::new()); + } + } + + pub fn add_value(&mut self, max: usize, index: usize, input: Option) { + self.state[index].push_back(input); + if self.state[index].len() > max { + self.state[index].pop_front(); + } + } + + pub fn state(&self, index: usize) -> &VecDeque> { + &self.state[index] + } +} + +impl StateToken for CollectToken +where + T: Clone, + T: Serialize + DeserializeOwned, + Vec>>: Serialize + DeserializeOwned, +{ + fn restore(&mut self, key: &StoreKey, store: &ComputeStore) -> anyhow::Result<()> { + if let Some(state) = store.get(key)? { + self.state = state; + } else { + self.state.clear(); + } + + Ok(()) + } + + fn store(&self, key: &StoreKey, store: &ComputeStore) -> anyhow::Result<()> { + store.put(key, &self.state) + } +} diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs index c9eff8f9f..c6c325bef 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -1,25 +1,10 @@ -//! The cast instruction isn't a "normal" instruction since it doesn't have a -//! a single, fixed signature. Specifically, the input and output types depend -//! on the input to the instruction and the requested output type. - -use std::collections::VecDeque; -use std::sync::Arc; - - -use arrow::array::{ - ArrayRef, AsArray, BooleanBuilder, - ListBuilder, -}; -use arrow::datatypes::{DataType}; - +use crate::{CollectToken, Evaluator, EvaluatorFactory, RuntimeInfo, StateToken, StaticInfo}; +use arrow::array::{ArrayRef, AsArray, BooleanBuilder, ListBuilder}; +use arrow::datatypes::DataType; use itertools::izip; - use sparrow_arrow::scalar_value::ScalarValue; - use sparrow_plan::ValueRef; - - -use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; +use std::sync::Arc; /// Evaluator for the `collect` instruction. /// @@ -38,7 +23,7 @@ pub struct CollectBooleanEvaluator { tick: ValueRef, duration: ValueRef, /// Contains the buffer of values for each entity - buffers: Vec>>, + token: CollectToken, } impl EvaluatorFactory for CollectBooleanEvaluator { @@ -62,7 +47,7 @@ impl EvaluatorFactory for CollectBooleanEvaluator { input, tick, duration, - buffers: vec![], + token: CollectToken::default(), })) } } @@ -76,13 +61,19 @@ impl Evaluator for CollectBooleanEvaluator { (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), } } + + fn state_token(&self) -> Option<&dyn StateToken> { + Some(&self.token) + } + + fn state_token_mut(&mut self) -> Option<&mut dyn StateToken> { + Some(&mut self.token) + } } impl CollectBooleanEvaluator { fn ensure_entity_capacity(&mut self, len: usize) { - if len >= self.buffers.len() { - self.buffers.resize(len + 1, VecDeque::new()); - } + self.token.resize(len) } fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { @@ -100,12 +91,10 @@ impl CollectBooleanEvaluator { izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { let entity_index = *entity_index as usize; - self.buffers[entity_index].push_back(input); - if self.buffers[entity_index].len() > self.max as usize { - self.buffers[entity_index].pop_front(); - } + self.token.add_value(self.max as usize, entity_index, input); + let cur_list = self.token.state(entity_index); - list_builder.append_value(self.buffers[entity_index].clone()); + list_builder.append_value(cur_list.clone()); }); Ok(Arc::new(list_builder.finish())) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_map.rs b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs index b4897c58e..a0ecea95b 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_map.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_map.rs @@ -1,22 +1,6 @@ -//! The cast instruction isn't a "normal" instruction since it doesn't have a -//! a single, fixed signature. Specifically, the input and output types depend -//! on the input to the instruction and the requested output type. - - - - - -use arrow::array::{ - ArrayRef, -}; - - - - - -use sparrow_plan::ValueRef; - use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; +use arrow::array::ArrayRef; +use sparrow_plan::ValueRef; /// Evaluator for the `collect` instruction. /// diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs index 8db994a0b..d82103fb3 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs @@ -1,20 +1,17 @@ -//! The cast instruction isn't a "normal" instruction since it doesn't have a -//! a single, fixed signature. Specifically, the input and output types depend -//! on the input to the instruction and the requested output type. - -use std::collections::VecDeque; use std::sync::Arc; use arrow::array::{ArrayRef, ListBuilder, PrimitiveBuilder}; use arrow::datatypes::{ArrowPrimitiveType, DataType}; use itertools::izip; +use serde::de::DeserializeOwned; +use serde::Serialize; use sparrow_arrow::downcast::downcast_primitive_array; use sparrow_arrow::scalar_value::ScalarValue; use sparrow_plan::ValueRef; -use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; +use crate::{CollectToken, Evaluator, EvaluatorFactory, RuntimeInfo, StateToken, StaticInfo}; /// Evaluator for the `collect` instruction. /// @@ -26,6 +23,7 @@ use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; pub struct CollectPrimitiveEvaluator where T: ArrowPrimitiveType, + T::Native: Serialize + DeserializeOwned + Copy, { /// The max size of the buffer. /// @@ -36,12 +34,13 @@ where tick: ValueRef, duration: ValueRef, /// Contains the buffer of values for each entity - buffers: Vec>>, + token: CollectToken, } impl EvaluatorFactory for CollectPrimitiveEvaluator where T: ArrowPrimitiveType + Send + Sync, + T::Native: Serialize + DeserializeOwned + Copy, { fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { let input_type = info.args[1].data_type(); @@ -63,7 +62,7 @@ where input, tick, duration, - buffers: vec![], + token: CollectToken::default(), })) } } @@ -71,6 +70,7 @@ where impl Evaluator for CollectPrimitiveEvaluator where T: ArrowPrimitiveType + Send + Sync, + T::Native: Serialize + DeserializeOwned + Copy, { fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { match (self.tick.is_literal_null(), self.duration.is_literal_null()) { @@ -80,16 +80,23 @@ where (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), } } + + fn state_token(&self) -> Option<&dyn StateToken> { + Some(&self.token) + } + + fn state_token_mut(&mut self) -> Option<&mut dyn StateToken> { + Some(&mut self.token) + } } impl CollectPrimitiveEvaluator where T: ArrowPrimitiveType + Send + Sync, + T::Native: Serialize + DeserializeOwned + Copy, { fn ensure_entity_capacity(&mut self, len: usize) { - if len >= self.buffers.len() { - self.buffers.resize(len + 1, VecDeque::new()); - } + self.token.resize(len) } fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { @@ -107,12 +114,10 @@ where izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { let entity_index = *entity_index as usize; - self.buffers[entity_index].push_back(input); - if self.buffers[entity_index].len() > self.max as usize { - self.buffers[entity_index].pop_front(); - } + self.token.add_value(self.max as usize, entity_index, input); + let cur_list = self.token.state(entity_index); - list_builder.append_value(self.buffers[entity_index].clone()); + list_builder.append_value(cur_list.clone()); }); Ok(Arc::new(list_builder.finish())) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index 1078bbe3d..02129f429 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -1,19 +1,10 @@ -//! The cast instruction isn't a "normal" instruction since it doesn't have a -//! a single, fixed signature. Specifically, the input and output types depend -//! on the input to the instruction and the requested output type. - -use std::collections::VecDeque; -use std::sync::Arc; - +use crate::{CollectToken, Evaluator, EvaluatorFactory, RuntimeInfo, StateToken, StaticInfo}; use arrow::array::{ArrayRef, AsArray, ListBuilder, StringBuilder}; use arrow::datatypes::DataType; - use itertools::izip; - use sparrow_arrow::scalar_value::ScalarValue; use sparrow_plan::ValueRef; - -use crate::{Evaluator, EvaluatorFactory, RuntimeInfo, StaticInfo}; +use std::sync::Arc; /// Evaluator for the `collect` instruction. /// @@ -32,7 +23,7 @@ pub struct CollectStringEvaluator { tick: ValueRef, duration: ValueRef, /// Contains the buffer of values for each entity - buffers: Vec>>, + token: CollectToken, } impl EvaluatorFactory for CollectStringEvaluator { @@ -56,7 +47,7 @@ impl EvaluatorFactory for CollectStringEvaluator { input, tick, duration, - buffers: vec![], + token: CollectToken::default(), })) } } @@ -70,13 +61,19 @@ impl Evaluator for CollectStringEvaluator { (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), } } + + fn state_token(&self) -> Option<&dyn StateToken> { + Some(&self.token) + } + + fn state_token_mut(&mut self) -> Option<&mut dyn StateToken> { + Some(&mut self.token) + } } impl CollectStringEvaluator { fn ensure_entity_capacity(&mut self, len: usize) { - if len >= self.buffers.len() { - self.buffers.resize(len + 1, VecDeque::new()); - } + self.token.resize(len) } fn evaluate_non_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { @@ -94,12 +91,11 @@ impl CollectStringEvaluator { izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { let entity_index = *entity_index as usize; - self.buffers[entity_index].push_back(input.map(|i| i.to_owned())); - if self.buffers[entity_index].len() > self.max as usize { - self.buffers[entity_index].pop_front(); - } + self.token + .add_value(self.max as usize, entity_index, input.map(|s| s.to_owned())); + let cur_list = self.token.state(entity_index); - list_builder.append_value(self.buffers[entity_index].clone()); + list_builder.append_value(cur_list.clone()); }); Ok(Arc::new(list_builder.finish())) From 5bdcbfae2e2b87abae128a960b129c81214f70f5 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Sun, 30 Jul 2023 21:59:46 -0700 Subject: [PATCH 08/13] Allow to indicate unlimited size --- .../src/functions/collection.rs | 1 - .../src/evaluators/list/collect_boolean.rs | 12 ++++++++--- .../src/evaluators/list/collect_primitive.rs | 12 ++++++++--- .../src/evaluators/list/collect_string.rs | 14 +++++++++---- .../sparrow-main/tests/e2e/collect_tests.rs | 21 +++++++++++++++++++ 5 files changed, 49 insertions(+), 11 deletions(-) diff --git a/crates/sparrow-compiler/src/functions/collection.rs b/crates/sparrow-compiler/src/functions/collection.rs index 925d40e42..f28129e9a 100644 --- a/crates/sparrow-compiler/src/functions/collection.rs +++ b/crates/sparrow-compiler/src/functions/collection.rs @@ -13,7 +13,6 @@ pub(super) fn register(registry: &mut Registry) { .with_implementation(Implementation::Instruction(InstOp::Index)) .set_internal(); - // TODO: Make MAX default to something? registry .register("collect(const max: i64, input: T, window: window = null) -> list") .with_implementation(Implementation::Instruction(InstOp::Collect)) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs index c6c325bef..7bf026d50 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -18,7 +18,7 @@ pub struct CollectBooleanEvaluator { /// /// Once the max size is reached, the front will be popped and the new /// value pushed to the back. - max: i64, + max: usize, input: ValueRef, tick: ValueRef, duration: ValueRef, @@ -36,7 +36,13 @@ impl EvaluatorFactory for CollectBooleanEvaluator { }; let max = match info.args[0].value_ref.literal_value() { - Some(ScalarValue::Int64(Some(v))) => *v, + Some(ScalarValue::Int64(Some(v))) if *v <= 0 => { + anyhow::bail!("unexpected value of `max` -- must be > 0") + } + Some(ScalarValue::Int64(Some(v))) => *v as usize, + // If a user specifies `max = null`, we use usize::MAX value as a way + // to have an "unlimited" buffer. + Some(ScalarValue::Int64(None)) => usize::MAX, Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), None => anyhow::bail!("expected literal value for max parameter"), }; @@ -91,7 +97,7 @@ impl CollectBooleanEvaluator { izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { let entity_index = *entity_index as usize; - self.token.add_value(self.max as usize, entity_index, input); + self.token.add_value(self.max, entity_index, input); let cur_list = self.token.state(entity_index); list_builder.append_value(cur_list.clone()); diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs index d82103fb3..1b9244095 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs @@ -29,7 +29,7 @@ where /// /// Once the max size is reached, the front will be popped and the new /// value pushed to the back. - max: i64, + max: usize, input: ValueRef, tick: ValueRef, duration: ValueRef, @@ -51,7 +51,13 @@ where }; let max = match info.args[0].value_ref.literal_value() { - Some(ScalarValue::Int64(Some(v))) => *v, + Some(ScalarValue::Int64(Some(v))) if *v <= 0 => { + anyhow::bail!("unexpected value of `max` -- must be > 0") + } + Some(ScalarValue::Int64(Some(v))) => *v as usize, + // If a user specifies `max = null`, we use usize::MAX value as a way + // to have an "unlimited" buffer. + Some(ScalarValue::Int64(None)) => usize::MAX, Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), None => anyhow::bail!("expected literal value for max parameter"), }; @@ -114,7 +120,7 @@ where izip!(entity_indices.values(), input).for_each(|(entity_index, input)| { let entity_index = *entity_index as usize; - self.token.add_value(self.max as usize, entity_index, input); + self.token.add_value(self.max, entity_index, input); let cur_list = self.token.state(entity_index); list_builder.append_value(cur_list.clone()); diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index 02129f429..da2af5a25 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -18,7 +18,7 @@ pub struct CollectStringEvaluator { /// /// Once the max size is reached, the front will be popped and the new /// value pushed to the back. - max: i64, + max: usize, input: ValueRef, tick: ValueRef, duration: ValueRef, @@ -35,8 +35,14 @@ impl EvaluatorFactory for CollectStringEvaluator { other => anyhow::bail!("expected list result type, saw {:?}", other), }; - let max = match &info.args[0].value_ref.literal_value() { - Some(ScalarValue::Int64(Some(v))) => *v, + let max = match info.args[0].value_ref.literal_value() { + Some(ScalarValue::Int64(Some(v))) if *v <= 0 => { + anyhow::bail!("unexpected value of `max` -- must be > 0") + } + Some(ScalarValue::Int64(Some(v))) => *v as usize, + // If a user specifies `max = null`, we use usize::MAX value as a way + // to have an "unlimited" buffer. + Some(ScalarValue::Int64(None)) => usize::MAX, Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other), None => anyhow::bail!("expected literal value for max parameter"), }; @@ -92,7 +98,7 @@ impl CollectStringEvaluator { let entity_index = *entity_index as usize; self.token - .add_value(self.max as usize, entity_index, input.map(|s| s.to_owned())); + .add_value(self.max, entity_index, input.map(|s| s.to_owned())); let cur_list = self.token.state(entity_index); list_builder.append_value(cur_list.clone()); diff --git a/crates/sparrow-main/tests/e2e/collect_tests.rs b/crates/sparrow-main/tests/e2e/collect_tests.rs index 5c61cc70c..f1ffafa08 100644 --- a/crates/sparrow-main/tests/e2e/collect_tests.rs +++ b/crates/sparrow-main/tests/e2e/collect_tests.rs @@ -39,6 +39,27 @@ pub(crate) async fn collect_data_fixture() -> DataFixture { .unwrap() } +#[tokio::test] +async fn test_collect_with_null_max() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(max = null) | index(0), f2: Collect.b | collect(max = null) | index(0), f3: Collect.s | collect(max = null) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1,f2,f3 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g + "###); +} + #[tokio::test] async fn test_collect_to_list_i64() { insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" From 2400de5ad1665c9c99457ac3d5ca67e352ec38b3 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Mon, 31 Jul 2023 08:32:56 -0700 Subject: [PATCH 09/13] golang proto incorrect type --- tests/integration/api/api_suite_test.go | 22 +++++++++++++++++++ .../api/query_v1_incremental_test.go | 4 +++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/tests/integration/api/api_suite_test.go b/tests/integration/api/api_suite_test.go index c25b987b7..7c85ff529 100644 --- a/tests/integration/api/api_suite_test.go +++ b/tests/integration/api/api_suite_test.go @@ -204,6 +204,28 @@ func primitiveSchemaField(name string, primitiveType v1alpha.DataType_PrimitiveT } } +func listSchemaField(name string, primitiveType v1alpha.DataType_Primitive) *v1alpha.Schema_Field { + return &v1alpha.Schema_Field{ + Name: name, + DataType: &v1alpha.DataType{ + Kind: &v1alpha.DataType_KIND_List{ + List: &v1alpha.DataType_List{ + Name: name, + ItemType: &v1alpha.DataType{ + Kind: primitiveType, + // &v1alpha.DataType_Primitive{ + // Primitive: primitiveType, + // }, + }, + Nullable: true, + }, + }, + }, + Nullable: true, + } + +} + func getRemotePulsarHostname() string { if os.Getenv("ENV") == "local-local" { return "localhost" diff --git a/tests/integration/api/query_v1_incremental_test.go b/tests/integration/api/query_v1_incremental_test.go index 1c298b6b2..1991d57ab 100644 --- a/tests/integration/api/query_v1_incremental_test.go +++ b/tests/integration/api/query_v1_incremental_test.go @@ -19,7 +19,7 @@ import ( "github.com/jt-nti/gproto" ) -var _ = Describe("Query V1 with incremental", Ordered, func() { +var _ = FDescribe("Query V1 with incremental", Ordered, func() { var ( ctx context.Context cancel context.CancelFunc @@ -79,6 +79,7 @@ var _ = Describe("Query V1 with incremental", Ordered, func() { entity: table.customer_id, max_amount: table.amount | max(), min_amount: table.amount | min(), + collect: table.amount | collect(2), }` expressionIncremental = strings.ReplaceAll(expressionTemplate, "table", tableIncremental.TableName) @@ -156,6 +157,7 @@ var _ = Describe("Query V1 with incremental", Ordered, func() { gproto.Equal(primitiveSchemaField("entity", v1alpha.DataType_PRIMITIVE_TYPE_STRING)), gproto.Equal(primitiveSchemaField("max_amount", v1alpha.DataType_PRIMITIVE_TYPE_I64)), gproto.Equal(primitiveSchemaField("min_amount", v1alpha.DataType_PRIMITIVE_TYPE_I64)), + gproto.Equal(primitiveSchemaField("collect", v1alpha.DataType_PRIMITIVE_TYPE_)), )) _, err = uuid.Parse(secondResponse.QueryId) Expect(err).Should(BeNil()) From 68c3a2a9ee0f777172d4a62a30a3e951e9635832 Mon Sep 17 00:00:00 2001 From: Kevin J Nguyen Date: Mon, 31 Jul 2023 10:37:59 -0500 Subject: [PATCH 10/13] updated tests --- tests/integration/api/api_suite_test.go | 11 +++++------ tests/integration/api/query_v1_incremental_test.go | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/integration/api/api_suite_test.go b/tests/integration/api/api_suite_test.go index 7c85ff529..067469941 100644 --- a/tests/integration/api/api_suite_test.go +++ b/tests/integration/api/api_suite_test.go @@ -204,18 +204,17 @@ func primitiveSchemaField(name string, primitiveType v1alpha.DataType_PrimitiveT } } -func listSchemaField(name string, primitiveType v1alpha.DataType_Primitive) *v1alpha.Schema_Field { +func listSchemaField(name string, primitiveType v1alpha.DataType_PrimitiveType) *v1alpha.Schema_Field { return &v1alpha.Schema_Field{ Name: name, DataType: &v1alpha.DataType{ - Kind: &v1alpha.DataType_KIND_List{ + Kind: &v1alpha.DataType_List_{ List: &v1alpha.DataType_List{ Name: name, ItemType: &v1alpha.DataType{ - Kind: primitiveType, - // &v1alpha.DataType_Primitive{ - // Primitive: primitiveType, - // }, + Kind: &v1alpha.DataType_Primitive{ + Primitive: primitiveType, + }, }, Nullable: true, }, diff --git a/tests/integration/api/query_v1_incremental_test.go b/tests/integration/api/query_v1_incremental_test.go index 1991d57ab..61f744ee6 100644 --- a/tests/integration/api/query_v1_incremental_test.go +++ b/tests/integration/api/query_v1_incremental_test.go @@ -157,7 +157,7 @@ var _ = FDescribe("Query V1 with incremental", Ordered, func() { gproto.Equal(primitiveSchemaField("entity", v1alpha.DataType_PRIMITIVE_TYPE_STRING)), gproto.Equal(primitiveSchemaField("max_amount", v1alpha.DataType_PRIMITIVE_TYPE_I64)), gproto.Equal(primitiveSchemaField("min_amount", v1alpha.DataType_PRIMITIVE_TYPE_I64)), - gproto.Equal(primitiveSchemaField("collect", v1alpha.DataType_PRIMITIVE_TYPE_)), + gproto.Equal(primitiveSchemaField("collect", v1alpha.DataType_PRIMITIVE_TYPE_BOOL)), )) _, err = uuid.Parse(secondResponse.QueryId) Expect(err).Should(BeNil()) From 4e340b92a984258f7829a171d1cefb2c105b0e62 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Mon, 31 Jul 2023 10:40:07 -0700 Subject: [PATCH 11/13] use copied instead of clone for values --- .../src/evaluators/list/collect_boolean.rs | 4 ++-- .../src/evaluators/list/collect_primitive.rs | 2 +- .../src/evaluators/list/collect_string.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs index 7bf026d50..7e0945dd5 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -8,7 +8,7 @@ use std::sync::Arc; /// Evaluator for the `collect` instruction. /// -/// Collect collects a stream of values into a List. A list is produced +/// Collects a stream of values into a List. A list is produced /// for each input value received, growing up to a maximum size. /// /// If the list is empty, an empty list is returned (rather than `null`). @@ -100,7 +100,7 @@ impl CollectBooleanEvaluator { self.token.add_value(self.max, entity_index, input); let cur_list = self.token.state(entity_index); - list_builder.append_value(cur_list.clone()); + list_builder.append_value(cur_list.iter().copied()); }); Ok(Arc::new(list_builder.finish())) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs index 1b9244095..3dcf31b57 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs @@ -123,7 +123,7 @@ where self.token.add_value(self.max, entity_index, input); let cur_list = self.token.state(entity_index); - list_builder.append_value(cur_list.clone()); + list_builder.append_value(cur_list.iter().copied()); }); Ok(Arc::new(list_builder.finish())) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index da2af5a25..580ce0979 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -8,7 +8,7 @@ use std::sync::Arc; /// Evaluator for the `collect` instruction. /// -/// Collect collects a stream of values into a List. A list is produced +/// Collects a stream of values into a List. A list is produced /// for each input value received, growing up to a maximum size. /// /// If the list is empty, an empty list is returned (rather than `null`). @@ -101,7 +101,7 @@ impl CollectStringEvaluator { .add_value(self.max, entity_index, input.map(|s| s.to_owned())); let cur_list = self.token.state(entity_index); - list_builder.append_value(cur_list.clone()); + list_builder.append_value(cur_list.iter().copied()); }); Ok(Arc::new(list_builder.finish())) From 5622d0b3568223b01fbc1ccd02bac0bb4ea502d5 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Mon, 31 Jul 2023 10:55:07 -0700 Subject: [PATCH 12/13] remove integ tests changes --- tests/integration/api/api_suite_test.go | 21 ------------------- .../api/query_v1_incremental_test.go | 4 +--- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/tests/integration/api/api_suite_test.go b/tests/integration/api/api_suite_test.go index 067469941..c25b987b7 100644 --- a/tests/integration/api/api_suite_test.go +++ b/tests/integration/api/api_suite_test.go @@ -204,27 +204,6 @@ func primitiveSchemaField(name string, primitiveType v1alpha.DataType_PrimitiveT } } -func listSchemaField(name string, primitiveType v1alpha.DataType_PrimitiveType) *v1alpha.Schema_Field { - return &v1alpha.Schema_Field{ - Name: name, - DataType: &v1alpha.DataType{ - Kind: &v1alpha.DataType_List_{ - List: &v1alpha.DataType_List{ - Name: name, - ItemType: &v1alpha.DataType{ - Kind: &v1alpha.DataType_Primitive{ - Primitive: primitiveType, - }, - }, - Nullable: true, - }, - }, - }, - Nullable: true, - } - -} - func getRemotePulsarHostname() string { if os.Getenv("ENV") == "local-local" { return "localhost" diff --git a/tests/integration/api/query_v1_incremental_test.go b/tests/integration/api/query_v1_incremental_test.go index 61f744ee6..1c298b6b2 100644 --- a/tests/integration/api/query_v1_incremental_test.go +++ b/tests/integration/api/query_v1_incremental_test.go @@ -19,7 +19,7 @@ import ( "github.com/jt-nti/gproto" ) -var _ = FDescribe("Query V1 with incremental", Ordered, func() { +var _ = Describe("Query V1 with incremental", Ordered, func() { var ( ctx context.Context cancel context.CancelFunc @@ -79,7 +79,6 @@ var _ = FDescribe("Query V1 with incremental", Ordered, func() { entity: table.customer_id, max_amount: table.amount | max(), min_amount: table.amount | min(), - collect: table.amount | collect(2), }` expressionIncremental = strings.ReplaceAll(expressionTemplate, "table", tableIncremental.TableName) @@ -157,7 +156,6 @@ var _ = FDescribe("Query V1 with incremental", Ordered, func() { gproto.Equal(primitiveSchemaField("entity", v1alpha.DataType_PRIMITIVE_TYPE_STRING)), gproto.Equal(primitiveSchemaField("max_amount", v1alpha.DataType_PRIMITIVE_TYPE_I64)), gproto.Equal(primitiveSchemaField("min_amount", v1alpha.DataType_PRIMITIVE_TYPE_I64)), - gproto.Equal(primitiveSchemaField("collect", v1alpha.DataType_PRIMITIVE_TYPE_BOOL)), )) _, err = uuid.Parse(secondResponse.QueryId) Expect(err).Should(BeNil()) From 24ba56ea02dfc9f89a7dadbb5556d3ceb479d14f Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Mon, 31 Jul 2023 12:03:59 -0700 Subject: [PATCH 13/13] use cloned for strings --- .../sparrow-instructions/src/evaluators/list/collect_string.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index 580ce0979..b60e0aa34 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -101,7 +101,7 @@ impl CollectStringEvaluator { .add_value(self.max, entity_index, input.map(|s| s.to_owned())); let cur_list = self.token.state(entity_index); - list_builder.append_value(cur_list.iter().copied()); + list_builder.append_value(cur_list.clone()); }); Ok(Arc::new(list_builder.finish()))