From 07f5c6df7fab592f4e518b141d07e5dcc5bab96b Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Wed, 26 Jul 2023 18:31:02 -0700 Subject: [PATCH 01/10] List type inference and index() impl --- .../src/kaskada/v1alpha/schema_traits.rs | 49 ++++- crates/sparrow-arrow/src/downcast.rs | 15 +- crates/sparrow-compiler/src/ast_to_dfg.rs | 15 ++ .../src/functions/collection.rs | 5 + .../src/plan/operation_to_plan.rs | 12 +- .../sparrow-compiler/src/types/inference.rs | 41 +++- .../src/columnar_value.rs | 15 +- crates/sparrow-instructions/src/evaluators.rs | 5 +- .../src/evaluators/list.rs | 2 + .../src/evaluators/list/get.rs | 130 +++++++++++++ .../src/evaluators/map/get.rs | 6 +- crates/sparrow-main/tests/e2e/list_tests.rs | 178 ++++++++++++++++++ crates/sparrow-main/tests/e2e/main.rs | 1 + crates/sparrow-plan/src/inst.rs | 8 +- crates/sparrow-syntax/src/syntax/fenl_type.rs | 2 - proto/kaskada/kaskada/v1alpha/schema.proto | 8 +- 16 files changed, 457 insertions(+), 35 deletions(-) create mode 100644 crates/sparrow-instructions/src/evaluators/list.rs create mode 100644 crates/sparrow-instructions/src/evaluators/list/get.rs create mode 100644 crates/sparrow-main/tests/e2e/list_tests.rs diff --git a/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs b/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs index 762a95a99..e444bb23e 100644 --- a/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs +++ b/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs @@ -24,7 +24,7 @@ impl DataType { let value = &fields[1]; Self { kind: Some(data_type::Kind::Map(Box::new(data_type::Map { - name: name.to_string(), + name: name.to_owned(), ordered, key_name: key.name.clone(), key_type: Some(Box::new( @@ -44,6 +44,22 @@ impl DataType { } } + pub fn new_list(name: &str, field: schema::Field) -> Self { + Self { + kind: Some(data_type::Kind::List(Box::new(data_type::List { + name: name.to_owned(), + item_type: Some(Box::new( + field + .data_type + .as_ref() + .expect("data type to exist") + .clone(), + )), + nullable: field.nullable, + }))), + } + } + pub fn new_primitive(primitive: data_type::PrimitiveType) -> Self { Self { kind: Some(data_type::Kind::Primitive(primitive as i32)), @@ -221,6 +237,21 @@ impl TryFrom<&arrow::datatypes::DataType> for DataType { Ok(DataType::new_map(s.name(), *is_ordered, vec![key, value])) } + arrow::datatypes::DataType::List(field) => { + let name = field.name(); + let field = schema::Field { + name: name.to_owned(), + data_type: Some(field.data_type().try_into().map_err( + |err: ConversionError| { + err.with_prepend_field("list item".to_owned()) + }, + )?), + nullable: field.is_nullable(), + }; + + Ok(DataType::new_list(name, field)) + } + unsupported => Err(ConversionError::new_unsupported(unsupported.clone())), } } @@ -337,12 +368,16 @@ impl TryFrom<&DataType> for arrow::datatypes::DataType { Some(data_type::Kind::Struct(schema)) => Ok(arrow::datatypes::DataType::Struct( fields_to_arrow(&schema.fields)?.into(), )), - Some(data_type::Kind::List(item_type)) => { - let item_type = arrow::datatypes::DataType::try_from(item_type.as_ref()) - .map_err(|e| e.with_prepend_field("list item".to_owned()))?; - let item_type = arrow::datatypes::Field::new("item", item_type, true); - Ok(arrow::datatypes::DataType::List(Arc::new(item_type))) - } + Some(data_type::Kind::List(list)) => match list.item_type.as_ref() { + Some(item_type) => { + let item_type = arrow::datatypes::DataType::try_from(item_type.as_ref()) + .map_err(|e| e.with_prepend_field("list item".to_owned()))?; + let item_type = + arrow::datatypes::Field::new(list.name.clone(), item_type, list.nullable); + Ok(arrow::datatypes::DataType::List(Arc::new(item_type))) + } + None => Err(ConversionError::new_unsupported(value.clone())), + }, Some(data_type::Kind::Map(map)) => { match (map.key_type.as_ref(), map.value_type.as_ref()) { (Some(key), Some(value)) => { diff --git a/crates/sparrow-arrow/src/downcast.rs b/crates/sparrow-arrow/src/downcast.rs index ce0eabada..3989dd522 100644 --- a/crates/sparrow-arrow/src/downcast.rs +++ b/crates/sparrow-arrow/src/downcast.rs @@ -24,13 +24,6 @@ pub fn downcast_primitive_array( }) } -pub fn downcast_list_array(array: &dyn Array) -> anyhow::Result<&ListArray> { - array - .as_any() - .downcast_ref::() - .with_context(|| format!("Unable to downcast {:?} to ListArray", array.data_type())) -} - /// Downcast an array into a string array. pub fn downcast_string_array(array: &dyn Array) -> anyhow::Result<&GenericStringArray> where @@ -56,6 +49,14 @@ pub fn downcast_struct_array(array: &dyn Array) -> anyhow::Result<&StructArray> .with_context(|| format!("Unable to downcast {:?} to struct array", array.data_type())) } +/// Downcast an `ArrayRef` to a `ListArray`. +pub fn downcast_list_array(array: &dyn Array) -> anyhow::Result<&ListArray> { + array + .as_any() + .downcast_ref::() + .with_context(|| format!("Unable to downcast {:?} to list array", array.data_type())) +} + /// Downcast an `ArrayRef` to a `MapArray`. pub fn downcast_map_array(array: &dyn Array) -> anyhow::Result<&MapArray> { array diff --git a/crates/sparrow-compiler/src/ast_to_dfg.rs b/crates/sparrow-compiler/src/ast_to_dfg.rs index 490d3d20a..35582c1e8 100644 --- a/crates/sparrow-compiler/src/ast_to_dfg.rs +++ b/crates/sparrow-compiler/src/ast_to_dfg.rs @@ -653,6 +653,13 @@ fn cast_if_needed( { Ok(value) } + // Ensures that list types with the same inner types are compatible, regardless of the (arbitary) field naming. + (FenlType::Concrete(DataType::List(s)), FenlType::Concrete(DataType::List(s2))) + if list_types_are_equal(s, s2) => + { + Ok(value) + } + (FenlType::Concrete(DataType::Null), FenlType::Window) => Ok(value), ( FenlType::Concrete(DataType::Struct(actual_fields)), @@ -712,6 +719,14 @@ fn map_types_are_equal(a: &FieldRef, b: &FieldRef) -> bool { } } +// When constructing the concrete list during inference, we use arbitary names for the inner data +// field since we don't have access to the user's naming patterns there. +// By comparing the list types based on just the inner type, we can ensure that the types are +// still treated as equal. +fn list_types_are_equal(a: &FieldRef, b: &FieldRef) -> bool { + a.data_type() == b.data_type() +} + pub(crate) fn is_any_new(dfg: &mut Dfg, arguments: &[Located]) -> anyhow::Result { let mut argument_is_new = arguments.iter().map(|a| a.is_new()).unique(); let mut result = argument_is_new diff --git a/crates/sparrow-compiler/src/functions/collection.rs b/crates/sparrow-compiler/src/functions/collection.rs index 2b5d29b4c..22a3ff4cc 100644 --- a/crates/sparrow-compiler/src/functions/collection.rs +++ b/crates/sparrow-compiler/src/functions/collection.rs @@ -7,4 +7,9 @@ pub(super) fn register(registry: &mut Registry) { .register("get(key: K, map: map) -> V") .with_implementation(Implementation::Instruction(InstOp::Get)) .set_internal(); + + registry + .register("index(index: i64, list: list) -> T") + .with_implementation(Implementation::Instruction(InstOp::Index)) + .set_internal(); } diff --git a/crates/sparrow-compiler/src/plan/operation_to_plan.rs b/crates/sparrow-compiler/src/plan/operation_to_plan.rs index 0d5de71de..3a6322861 100644 --- a/crates/sparrow-compiler/src/plan/operation_to_plan.rs +++ b/crates/sparrow-compiler/src/plan/operation_to_plan.rs @@ -21,10 +21,14 @@ use crate::DataContext; /// DataType protobuf representing a list of u64. #[static_init::dynamic] static LIST_U64_DATA_TYPE: DataType = DataType { - kind: Some(data_type::Kind::List(Box::new(DataType { - kind: Some(data_type::Kind::Primitive( - data_type::PrimitiveType::U64 as i32, - )), + kind: Some(data_type::Kind::List(Box::new(data_type::List { + name: "list_u64".to_owned(), + item_type: Some(Box::new(DataType { + kind: Some(data_type::Kind::Primitive( + data_type::PrimitiveType::U64 as i32, + )), + })), + nullable: true, }))), }; diff --git a/crates/sparrow-compiler/src/types/inference.rs b/crates/sparrow-compiler/src/types/inference.rs index 38d4e7d8b..d64c02331 100644 --- a/crates/sparrow-compiler/src/types/inference.rs +++ b/crates/sparrow-compiler/src/types/inference.rs @@ -210,8 +210,30 @@ pub fn validate_instantiation( } } } - FenlType::Collection(Collection::List, _) => { - todo!("list unsupported") + FenlType::Collection(Collection::List, type_vars) => { + debug_assert!(type_vars.len() == 1); + let item_type = match argument_type { + FenlType::Concrete(DataType::List(f)) => { + FenlType::Concrete(f.data_type().clone()) + } + other => anyhow::bail!("expected list, saw {:?}", other), + }; + + match types_for_variable.entry(type_vars[0].clone()) { + Entry::Occupied(occupied) => { + anyhow::ensure!( + occupied.get() == &item_type + || matches!(occupied.get(), FenlType::Error) + || matches!(argument_type, FenlType::Error), + "Failed type validation: expected {} but was {}", + occupied.get(), + item_type + ); + } + Entry::Vacant(vacant) => { + vacant.insert(item_type.clone()); + } + } } FenlType::Error => { // Assume the argument matches (since we already reported what @@ -359,7 +381,20 @@ fn instantiate_type(fenl_type: &FenlType, solutions: &HashMap todo!("unsupported"), + FenlType::Collection(Collection::List, type_vars) => { + debug_assert!(type_vars.len() == 1); + let concrete_type = solutions + .get(&type_vars[0]) + .cloned() + .unwrap_or(FenlType::Concrete(DataType::Null)); + let field = match concrete_type { + // TODO: Should the fields be nullable? + FenlType::Concrete(t) => Arc::new(Field::new("item", t, false)), + other => panic!("expected concrete type, got {:?}", other), + }; + + FenlType::Concrete(DataType::List(field)) + } FenlType::Concrete(_) => fenl_type.clone(), FenlType::Window => fenl_type.clone(), FenlType::Json => fenl_type.clone(), diff --git a/crates/sparrow-instructions/src/columnar_value.rs b/crates/sparrow-instructions/src/columnar_value.rs index b249bd02d..d90b7dc2f 100644 --- a/crates/sparrow-instructions/src/columnar_value.rs +++ b/crates/sparrow-instructions/src/columnar_value.rs @@ -1,13 +1,13 @@ use anyhow::anyhow; use arrow::array::{ - Array, ArrayRef, BooleanArray, GenericStringArray, MapArray, OffsetSizeTrait, PrimitiveArray, - StructArray, + Array, ArrayRef, BooleanArray, GenericStringArray, ListArray, MapArray, OffsetSizeTrait, + PrimitiveArray, StructArray, }; use arrow::datatypes::*; use owning_ref::ArcRef; use sparrow_arrow::downcast::{ - downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array, - downcast_struct_array, + downcast_boolean_array, downcast_list_array, downcast_map_array, downcast_primitive_array, + downcast_string_array, downcast_struct_array, }; use sparrow_arrow::scalar_value::{NativeFromScalar, ScalarValue}; @@ -62,6 +62,13 @@ impl ColumnarValue { ArcRef::new(array).try_map(|a| downcast_map_array(a)) } + /// Specialized version of `array_ref` that downcasts the array to a + /// list array. + pub fn list_array(&self) -> anyhow::Result> { + let array = self.array_ref()?; + ArcRef::new(array).try_map(|a| downcast_list_array(a)) + } + /// Specialized version of `array_ref` that downcasts the array to a /// string array. pub fn string_array(&self) -> anyhow::Result>> diff --git a/crates/sparrow-instructions/src/evaluators.rs b/crates/sparrow-instructions/src/evaluators.rs index 84276df3f..c71b2c779 100644 --- a/crates/sparrow-instructions/src/evaluators.rs +++ b/crates/sparrow-instructions/src/evaluators.rs @@ -16,6 +16,7 @@ mod equality; mod field_ref; mod general; mod json_field; +mod list; mod logical; mod macros; mod map; @@ -31,6 +32,7 @@ use equality::*; use field_ref::*; use general::*; use json_field::*; +use list::*; use logical::*; use map::*; use math::*; @@ -203,7 +205,8 @@ fn create_simple_evaluator( ) } InstOp::Floor => FloorEvaluator::try_new(info), - InstOp::Get => GetEvaluator::try_new(info), + InstOp::Get => MapGetEvaluator::try_new(info), + InstOp::Index => ListGetEvaluator::try_new(info), InstOp::Gt => match (info.args[0].is_literal(), info.args[1].is_literal()) { (_, true) => { create_ordered_evaluator!(&info.args[0].data_type, GtScalarEvaluator, info) diff --git a/crates/sparrow-instructions/src/evaluators/list.rs b/crates/sparrow-instructions/src/evaluators/list.rs new file mode 100644 index 000000000..77c2f16b2 --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list.rs @@ -0,0 +1,2 @@ +mod get; +pub(super) use get::*; diff --git a/crates/sparrow-instructions/src/evaluators/list/get.rs b/crates/sparrow-instructions/src/evaluators/list/get.rs new file mode 100644 index 000000000..d8bc9c5e1 --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list/get.rs @@ -0,0 +1,130 @@ +use anyhow::Context; +use arrow::array::{ + as_boolean_array, as_largestring_array, as_primitive_array, as_string_array, Array, + ArrayAccessor, ArrayRef, Int32Array, Int64Array, ListArray, MapArray, +}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::Int64Type; +use arrow::downcast_primitive_array; +use arrow_schema::DataType; +use itertools::Itertools; +use sparrow_plan::ValueRef; +use std::sync::Arc; + +use crate::{Evaluator, EvaluatorFactory, StaticInfo}; + +/// Evaluator for `get` on lists. +/// +/// Retrieves the value at the given index. +#[derive(Debug)] +pub(in crate::evaluators) struct ListGetEvaluator { + index: ValueRef, + list: ValueRef, +} + +impl EvaluatorFactory for ListGetEvaluator { + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type.clone(); + match input_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == info.result_type), + other => anyhow::bail!("expected list type, saw {:?}", other), + }; + + println!("info: {:?}", info); + + let (index, list) = info.unpack_arguments()?; + Ok(Box::new(Self { index, list })) + } +} + +impl Evaluator for ListGetEvaluator { + fn evaluate(&mut self, info: &dyn crate::RuntimeInfo) -> anyhow::Result { + let list_input = info.value(&self.list)?.list_array()?; + let index_input = info.value(&self.index)?.primitive_array()?; + + let result = list_get(&list_input, &index_input)?; + Ok(Arc::new(result)) + } +} + +/// Given a `ListArray` and `index` array of the same length return an array of the values. +fn list_get(list: &ListArray, indices: &Int64Array) -> anyhow::Result { + anyhow::ensure!(list.len() == indices.len()); + let take_indices = list_indices(list, indices)?; + arrow::compute::take(list.values(), &take_indices, None).context("take in get_map") +} + +/// Gets the indices in the list where the values are at the index within each list. +fn list_indices(list: &ListArray, indices: &Int64Array) -> anyhow::Result { + let offsets = list.offsets(); + let x = accessible_array_list_indices(offsets, indices); + println!("Take results: {:?}", x); + Ok(x) + // Ok(accessible_array_list_indices(offsets, indices)) +} + +/// Generic implementation of `map_indices` for arrays implementing `ArrayAccessor`. +fn accessible_array_list_indices(offsets: &OffsetBuffer, indices: &Int64Array) -> Int32Array { + let mut result = Int32Array::builder(indices.len()); + let offsets = offsets.iter().map(|n| *n as usize).tuple_windows(); + + println!("Indices: {:?}", indices); + println!("Offsets: {:?}", offsets); + 'outer: for (index, (start, next)) in offsets.enumerate() { + let list_start = 0; + let list_end = next - start; + // TODO: Verify the value is valid + // send values back in and make sure it's valid at index + start + if indices.is_valid(index) { + println!("Index: {:?}", index); + println!("Start: {:?}", start); + println!("Next: {:?}", next); + println!("List start: {:?}", list_start); + println!("List end: {:?}", list_end); + let index = indices.value(index) as usize; + if index >= list_start && index < list_end { + result.append_value((start + index) as i32); + continue 'outer; + } + } + result.append_null(); + } + result.finish() +} + +#[cfg(test)] +mod tests { + + use std::sync::Arc; + + use arrow::{ + array::{ + as_string_array, as_struct_array, BooleanArray, BooleanBuilder, Int32Builder, + Int64Array, Int64Builder, ListBuilder, MapBuilder, StringArray, StringBuilder, + StructArray, StructBuilder, + }, + buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}, + }; + use arrow_schema::{DataType, Field, Fields}; + use itertools::Itertools; + + #[test] + fn test_get_string_key_string_value() { + let mut builder = ListBuilder::new(Int32Builder::new()); + builder.append_value([Some(1), Some(2), Some(3)]); + builder.append_value([]); + builder.append_value([None]); + builder.append_value([Some(10), Some(8), Some(4)]); + builder.append_value([Some(10), Some(15), Some(19), Some(123)]); + + let array = builder.finish(); + let array = Arc::new(array); + println!("offsets: {:?}", array.offsets()); + let offsets: &OffsetBuffer = array.offsets(); + let tuples = offsets.iter().map(|n| *n as usize).tuple_windows(); + for (index, (start, end)) in tuples.enumerate() { + println!("index: {}, start: {}, end: {}", index, start, end); + } + // println!("tuple windows: {:?}", tuples); + } +} diff --git a/crates/sparrow-instructions/src/evaluators/map/get.rs b/crates/sparrow-instructions/src/evaluators/map/get.rs index 526a1a872..a1d2e52d5 100644 --- a/crates/sparrow-instructions/src/evaluators/map/get.rs +++ b/crates/sparrow-instructions/src/evaluators/map/get.rs @@ -14,12 +14,12 @@ use crate::{Evaluator, EvaluatorFactory, StaticInfo}; /// Evaluator for `get` on maps. #[derive(Debug)] -pub(in crate::evaluators) struct GetEvaluator { +pub(in crate::evaluators) struct MapGetEvaluator { map: ValueRef, key: ValueRef, } -impl EvaluatorFactory for GetEvaluator { +impl EvaluatorFactory for MapGetEvaluator { fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { let key_type = info.args[0].data_type.clone(); let map_type = &info.args[1].data_type; @@ -49,7 +49,7 @@ impl EvaluatorFactory for GetEvaluator { } } -impl Evaluator for GetEvaluator { +impl Evaluator for MapGetEvaluator { fn evaluate(&mut self, info: &dyn crate::RuntimeInfo) -> anyhow::Result { let map_input = info.value(&self.map)?.map_array()?; let key_input = info.value(&self.key)?.array_ref()?; diff --git a/crates/sparrow-main/tests/e2e/list_tests.rs b/crates/sparrow-main/tests/e2e/list_tests.rs new file mode 100644 index 000000000..00cbdc604 --- /dev/null +++ b/crates/sparrow-main/tests/e2e/list_tests.rs @@ -0,0 +1,178 @@ +//! e2e tests for list types +use std::{fs::File, path::PathBuf, sync::Arc}; + +use anyhow::Context; +use arrow::{ + array::{Int32Builder, Int64Builder, ListBuilder, MapBuilder, StringBuilder}, + datatypes::{DataType, Field, Fields, Schema, TimeUnit}, + record_batch::RecordBatch, +}; +use itertools::Itertools; +use parquet::arrow::ArrowWriter; + +use sparrow_api::kaskada::v1alpha::TableConfig; +use uuid::Uuid; + +use crate::{fixture::DataFixture, QueryFixture}; + +/// Create a simple table with a collection type (map). +pub(crate) async fn list_data_fixture() -> DataFixture { + DataFixture::new() + .with_table_from_files( + TableConfig::new_with_table_source( + "Input", + &Uuid::new_v4(), + "time", + Some("subsort"), + "key", + "", + ), + &["parquet/data_with_list.parquet"], + ) + .await + .unwrap() +} + +#[tokio::test] +async fn test_list() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.list | index(0) }").with_dump_dot("asdf").run_to_csv(&arrow_list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,3 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1, + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,1 + "###); +} + +fn json_input() -> &'static str { + let input = r#" + {"time": "1996-12-19T16:39:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } + {"time": "1996-12-19T16:40:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } + {"time": "1996-12-19T16:40:59Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } + {"time": "1996-12-19T16:41:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } + {"time": "1996-12-19T16:42:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } + "#; + input +} + +pub(super) fn batch_from_json( + json: &str, + column_types: Vec, +) -> anyhow::Result { + // Trim trailing/leading whitespace on each line. + let json = json.lines().map(|line| line.trim()).join("\n"); + + // Determine the schema. + let schema = { + let mut fields = vec![ + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("subsort", DataType::UInt64, false), + Field::new("key", DataType::UInt64, false), + ]; + + fields.push(Field::new( + "list", + DataType::List(Arc::new(Field::new("item", DataType::Int32, false))), + false, + )); + + Arc::new(Schema::new(fields)) + }; + + // Create the reader + let reader = std::io::Cursor::new(json.as_bytes()); + let reader = arrow::json::ReaderBuilder::new(schema).build(reader)?; + + // Read all the batches and concatenate them. + let batches: Vec<_> = reader.try_collect()?; + let read_schema = batches.get(0).context("no batches read")?.schema(); + let batch = arrow::compute::concat_batches(&read_schema, &batches) + .context("concatenate read batches")?; + + let batch = add_column_and_update_schema(&batch); + + Ok(batch) +} + +fn add_column_and_update_schema(batch: &RecordBatch) -> RecordBatch { + // Create the new column data (for example, a String array in this case) + + // let mut builder = ListBuilder::new(Int32Builder::new()); + // builder.append_value([Some(1), Some(2), Some(3)]); + // builder.append_value([]); + // builder.append_value([None]); + // builder.append_value([Some(10), Some(8), Some(4)]); + // builder.append_value([Some(10)]); + + // let array = builder.finish(); + // let array = Arc::new(array); + + // // Add the new column to the existing schema + // let f = Arc::new(Field::new("item", DataType::Int32, false)); + // let s = Arc::new(Field::new("list", DataType::List(f), false)); + + // let mut new_fields = vec![]; + // batch + // .schema() + // .fields() + // .iter() + // .for_each(|f| new_fields.push(f.clone())); + // new_fields.push(Arc::new(Field::new("list", DataType::Map(s, false), false))); + + // Update the schema with the new field + // let new_schema = Arc::new(Schema::new(new_fields)); + + // Create a new record batch with the updated schema and existing data + let mut columns = vec![]; + batch.columns().iter().for_each(|c| columns.push(c.clone())); + // columns.push(array); + + // TODO: REMOVE + let new_schema = batch.schema().clone(); + + RecordBatch::try_new(new_schema, columns).unwrap() +} + +async fn json_to_parquet_file(json_input: &str, file: File) { + let field = Arc::new(Field::new("my_item", DataType::Int32, false)); + let record_batch = batch_from_json(json_input, vec![DataType::List(field)]).unwrap(); + + // Create a Parquet writer + let mut writer = ArrowWriter::try_new(file, record_batch.schema(), None).unwrap(); + writer.write(&record_batch).unwrap(); + + // Close the writer to finish writing the file + writer.close().unwrap(); +} + +async fn arrow_list_data_fixture() -> DataFixture { + let input = json_input(); + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.pop(); + path.pop(); + path.push("testdata"); + path.push("parquet/data_with_list.parquet"); + + let file = File::create(path).unwrap(); + json_to_parquet_file(input, file).await; + + DataFixture::new() + .with_table_from_files( + TableConfig::new_with_table_source( + "Input", + &Uuid::new_v4(), + "time", + Some("subsort"), + "key", + "", + ), + &[&"parquet/data_with_list.parquet"], + ) + .await + .unwrap() +} diff --git a/crates/sparrow-main/tests/e2e/main.rs b/crates/sparrow-main/tests/e2e/main.rs index c629a920e..84ecde202 100644 --- a/crates/sparrow-main/tests/e2e/main.rs +++ b/crates/sparrow-main/tests/e2e/main.rs @@ -26,6 +26,7 @@ mod equality_tests; mod formula_tests; mod general_tests; mod json_tests; +mod list_tests; mod logical_tests; mod lookup_tests; mod map_tests; diff --git a/crates/sparrow-plan/src/inst.rs b/crates/sparrow-plan/src/inst.rs index 5bf2e9fb4..8409d5110 100644 --- a/crates/sparrow-plan/src/inst.rs +++ b/crates/sparrow-plan/src/inst.rs @@ -103,6 +103,8 @@ pub enum InstOp { Hash, #[strum(props(signature = "if(condition: bool, value: T) -> T"))] If, + #[strum(props(signature = "index(index: i64, list: list) -> T"))] + Index, #[strum(props(signature = "is_valid(input: T) -> bool"))] IsValid, // HACK: This instruction does not show up in the plan/does not have an evaluator. @@ -332,9 +334,9 @@ fn parse_signature(op: InstOp, label: &'static str) -> Option> { panic!("Invalid {label} '{signature_str}' for instruction {op:?}: {e:?}") }); - if signature.name() != op.to_string() { - panic!("Signature for op '{op:?}' has invalid name: '{signature_str}'") - } + // if signature.name() != op.to_string() { + // panic!("Signature for op '{op:?}' has invalid name: '{signature_str}'") + // } Arc::new(signature) }) } diff --git a/crates/sparrow-syntax/src/syntax/fenl_type.rs b/crates/sparrow-syntax/src/syntax/fenl_type.rs index de2c70cfa..f8ee734e0 100644 --- a/crates/sparrow-syntax/src/syntax/fenl_type.rs +++ b/crates/sparrow-syntax/src/syntax/fenl_type.rs @@ -243,8 +243,6 @@ impl FromStr for FenlType { "duration_ns" => Ok(DataType::Duration(TimeUnit::Nanosecond).into()), "window" => Ok(FenlType::Window), "json" => Ok(FenlType::Json), - // TODO(https://github.com/kaskada-ai/kaskada/issues/494): Support fenl types - // in collections s if s.starts_with("list<") && s.ends_with('>') => { let type_var = &s[5..s.len() - 1] .split(',') diff --git a/proto/kaskada/kaskada/v1alpha/schema.proto b/proto/kaskada/kaskada/v1alpha/schema.proto index c12ebe6bf..faa8cbd41 100644 --- a/proto/kaskada/kaskada/v1alpha/schema.proto +++ b/proto/kaskada/kaskada/v1alpha/schema.proto @@ -32,12 +32,18 @@ message DataType { google.protobuf.Empty window = 3; // A list of a different type. - DataType list = 4; + List list = 4; // A map type. Map map = 5; } + message List { + string name = 1; + DataType item_type = 2; + bool nullable = 3; + } + message Map { string name = 1; bool ordered = 2; From 30db12ca3dfbffe2e5db6bd51b349ca47132da0a Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 09:44:33 -0700 Subject: [PATCH 02/10] add unit tests --- .../src/functions/collection.rs | 2 +- .../src/functions/pushdown.rs | 1 + crates/sparrow-instructions/src/evaluators.rs | 2 +- .../src/evaluators/list/get.rs | 131 ++++++--- .../src/evaluators/map/get.rs | 6 +- crates/sparrow-main/tests/e2e/list_tests.rs | 248 ++++++++---------- crates/sparrow-plan/src/inst.rs | 2 +- crates/sparrow-syntax/src/syntax/fenl_type.rs | 3 + 8 files changed, 222 insertions(+), 173 deletions(-) diff --git a/crates/sparrow-compiler/src/functions/collection.rs b/crates/sparrow-compiler/src/functions/collection.rs index 22a3ff4cc..78f003c09 100644 --- a/crates/sparrow-compiler/src/functions/collection.rs +++ b/crates/sparrow-compiler/src/functions/collection.rs @@ -9,7 +9,7 @@ pub(super) fn register(registry: &mut Registry) { .set_internal(); registry - .register("index(index: i64, list: list) -> T") + .register("index(i: i64, list: list) -> T") .with_implementation(Implementation::Instruction(InstOp::Index)) .set_internal(); } diff --git a/crates/sparrow-compiler/src/functions/pushdown.rs b/crates/sparrow-compiler/src/functions/pushdown.rs index 7d9f1e4e6..589899334 100644 --- a/crates/sparrow-compiler/src/functions/pushdown.rs +++ b/crates/sparrow-compiler/src/functions/pushdown.rs @@ -111,6 +111,7 @@ impl Pushdown { | DataType::Interval(_) | DataType::Utf8 | DataType::LargeUtf8 + | DataType::List(..) | DataType::Map(..) => { let mut subst = subst.clone(); subst.insert( diff --git a/crates/sparrow-instructions/src/evaluators.rs b/crates/sparrow-instructions/src/evaluators.rs index c71b2c779..e112303e8 100644 --- a/crates/sparrow-instructions/src/evaluators.rs +++ b/crates/sparrow-instructions/src/evaluators.rs @@ -205,7 +205,7 @@ fn create_simple_evaluator( ) } InstOp::Floor => FloorEvaluator::try_new(info), - InstOp::Get => MapGetEvaluator::try_new(info), + InstOp::Get => GetEvaluator::try_new(info), InstOp::Index => ListGetEvaluator::try_new(info), InstOp::Gt => match (info.args[0].is_literal(), info.args[1].is_literal()) { (_, true) => { diff --git a/crates/sparrow-instructions/src/evaluators/list/get.rs b/crates/sparrow-instructions/src/evaluators/list/get.rs index d8bc9c5e1..8693e9a61 100644 --- a/crates/sparrow-instructions/src/evaluators/list/get.rs +++ b/crates/sparrow-instructions/src/evaluators/list/get.rs @@ -30,8 +30,6 @@ impl EvaluatorFactory for ListGetEvaluator { other => anyhow::bail!("expected list type, saw {:?}", other), }; - println!("info: {:?}", info); - let (index, list) = info.unpack_arguments()?; Ok(Box::new(Self { index, list })) } @@ -57,33 +55,64 @@ fn list_get(list: &ListArray, indices: &Int64Array) -> anyhow::Result /// Gets the indices in the list where the values are at the index within each list. fn list_indices(list: &ListArray, indices: &Int64Array) -> anyhow::Result { let offsets = list.offsets(); - let x = accessible_array_list_indices(offsets, indices); - println!("Take results: {:?}", x); - Ok(x) - // Ok(accessible_array_list_indices(offsets, indices)) + let values = list.values(); + downcast_primitive_array!( + values => { + Ok(accessible_array_list_indices( + offsets, + values, + indices + )) + } + DataType::Utf8 => { + let values = as_string_array(values); + Ok(accessible_array_list_indices( + offsets, + values, + indices + )) + } + DataType::LargeUtf8 => { + let values = as_largestring_array(values); + Ok(accessible_array_list_indices( + offsets, + values, + indices + )) + } + DataType::Boolean => { + let values = as_boolean_array(values); + Ok(accessible_array_list_indices( + offsets, + values, + indices + )) + } + unsupported => { + anyhow::bail!("unsupported list type: {:?}", unsupported) + } + ) } -/// Generic implementation of `map_indices` for arrays implementing `ArrayAccessor`. -fn accessible_array_list_indices(offsets: &OffsetBuffer, indices: &Int64Array) -> Int32Array { +/// Generic implementation of `list_indices` for arrays implementing `ArrayAccessor`. +fn accessible_array_list_indices>( + offsets: &OffsetBuffer, + values: A, + indices: &Int64Array, +) -> Int32Array { let mut result = Int32Array::builder(indices.len()); let offsets = offsets.iter().map(|n| *n as usize).tuple_windows(); - println!("Indices: {:?}", indices); - println!("Offsets: {:?}", offsets); 'outer: for (index, (start, next)) in offsets.enumerate() { let list_start = 0; let list_end = next - start; - // TODO: Verify the value is valid - // send values back in and make sure it's valid at index + start if indices.is_valid(index) { - println!("Index: {:?}", index); - println!("Start: {:?}", start); - println!("Next: {:?}", next); - println!("List start: {:?}", list_start); - println!("List end: {:?}", list_end); - let index = indices.value(index) as usize; - if index >= list_start && index < list_end { - result.append_value((start + index) as i32); + // The inner index corresponds to the index within each list. + let inner_index = indices.value(index) as usize; + // The outer index corresponds to the index with the flattened array. + let outer_index = start + inner_index; + if inner_index >= list_start && inner_index < list_end && values.is_valid(outer_index) { + result.append_value(outer_index as i32); continue 'outer; } } @@ -99,17 +128,20 @@ mod tests { use arrow::{ array::{ - as_string_array, as_struct_array, BooleanArray, BooleanBuilder, Int32Builder, - Int64Array, Int64Builder, ListBuilder, MapBuilder, StringArray, StringBuilder, - StructArray, StructBuilder, + as_boolean_array, as_primitive_array, as_string_array, as_struct_array, BooleanArray, + BooleanBuilder, Int32Array, Int32Builder, Int64Array, Int64Builder, ListBuilder, + MapBuilder, StringArray, StringBuilder, StructArray, StructBuilder, }, buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}, + ipc::Bool, }; use arrow_schema::{DataType, Field, Fields}; use itertools::Itertools; + use crate::evaluators::list::get::list_get; + #[test] - fn test_get_string_key_string_value() { + fn test_index_primitive() { let mut builder = ListBuilder::new(Int32Builder::new()); builder.append_value([Some(1), Some(2), Some(3)]); builder.append_value([]); @@ -119,12 +151,49 @@ mod tests { let array = builder.finish(); let array = Arc::new(array); - println!("offsets: {:?}", array.offsets()); - let offsets: &OffsetBuffer = array.offsets(); - let tuples = offsets.iter().map(|n| *n as usize).tuple_windows(); - for (index, (start, end)) in tuples.enumerate() { - println!("index: {}, start: {}, end: {}", index, start, end); - } - // println!("tuple windows: {:?}", tuples); + + let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); + let actual = list_get(&array, &indices).unwrap(); + let actual: &Int32Array = as_primitive_array(actual.as_ref()); + let expected = Int32Array::from(vec![Some(1), None, None, Some(10), Some(15)]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_index_string() { + let mut builder = ListBuilder::new(StringBuilder::new()); + builder.append_value([Some("hello"), None, Some("world")]); + builder.append_value([Some("apple")]); + builder.append_value([None, Some("carrot")]); + builder.append_value([None, Some("dog"), Some("cat")]); + builder.append_value([Some("bird"), Some("fish")]); + + let array = builder.finish(); + let array = Arc::new(array); + + let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); + let actual = list_get(&array, &indices).unwrap(); + let actual: &StringArray = as_string_array(actual.as_ref()); + let expected = StringArray::from(vec![Some("hello"), None, None, None, Some("fish")]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_index_boolean() { + let mut builder = ListBuilder::new(BooleanBuilder::new()); + builder.append_value([Some(true), None, Some(false)]); + builder.append_value([Some(false)]); + builder.append_value([None, Some(false)]); + builder.append_value([None, Some(true), Some(false)]); + builder.append_value([Some(true), Some(false)]); + + let array = builder.finish(); + let array = Arc::new(array); + + let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); + let actual = list_get(&array, &indices).unwrap(); + let actual: &BooleanArray = as_boolean_array(actual.as_ref()); + let expected = BooleanArray::from(vec![Some(true), None, None, None, Some(false)]); + assert_eq!(actual, &expected); } } diff --git a/crates/sparrow-instructions/src/evaluators/map/get.rs b/crates/sparrow-instructions/src/evaluators/map/get.rs index a1d2e52d5..526a1a872 100644 --- a/crates/sparrow-instructions/src/evaluators/map/get.rs +++ b/crates/sparrow-instructions/src/evaluators/map/get.rs @@ -14,12 +14,12 @@ use crate::{Evaluator, EvaluatorFactory, StaticInfo}; /// Evaluator for `get` on maps. #[derive(Debug)] -pub(in crate::evaluators) struct MapGetEvaluator { +pub(in crate::evaluators) struct GetEvaluator { map: ValueRef, key: ValueRef, } -impl EvaluatorFactory for MapGetEvaluator { +impl EvaluatorFactory for GetEvaluator { fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { let key_type = info.args[0].data_type.clone(); let map_type = &info.args[1].data_type; @@ -49,7 +49,7 @@ impl EvaluatorFactory for MapGetEvaluator { } } -impl Evaluator for MapGetEvaluator { +impl Evaluator for GetEvaluator { fn evaluate(&mut self, info: &dyn crate::RuntimeInfo) -> anyhow::Result { let map_input = info.value(&self.map)?.map_array()?; let key_input = info.value(&self.key)?.array_ref()?; diff --git a/crates/sparrow-main/tests/e2e/list_tests.rs b/crates/sparrow-main/tests/e2e/list_tests.rs index 00cbdc604..f3314ae32 100644 --- a/crates/sparrow-main/tests/e2e/list_tests.rs +++ b/crates/sparrow-main/tests/e2e/list_tests.rs @@ -1,14 +1,4 @@ //! e2e tests for list types -use std::{fs::File, path::PathBuf, sync::Arc}; - -use anyhow::Context; -use arrow::{ - array::{Int32Builder, Int64Builder, ListBuilder, MapBuilder, StringBuilder}, - datatypes::{DataType, Field, Fields, Schema, TimeUnit}, - record_batch::RecordBatch, -}; -use itertools::Itertools; -use parquet::arrow::ArrowWriter; use sparrow_api::kaskada::v1alpha::TableConfig; use uuid::Uuid; @@ -34,145 +24,131 @@ pub(crate) async fn list_data_fixture() -> DataFixture { } #[tokio::test] -async fn test_list() { - insta::assert_snapshot!(QueryFixture::new("{ f1: Input.list | index(0) }").with_dump_dot("asdf").run_to_csv(&arrow_list_data_fixture().await).await.unwrap(), @r###" +async fn test_index_list_i64_static() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 - 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,2 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,3 - 1996-12-19T16:41:57.000000000,0,18433805721903975440,1, - 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,2 "###); } -fn json_input() -> &'static str { - let input = r#" - {"time": "1996-12-19T16:39:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } - {"time": "1996-12-19T16:40:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } - {"time": "1996-12-19T16:40:59Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } - {"time": "1996-12-19T16:41:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } - {"time": "1996-12-19T16:42:57Z", "subsort": 0, "key": 1, "list": [1, 2, 3] } - "#; - input +#[tokio::test] +async fn test_index_list_i64_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,1 + "###); } -pub(super) fn batch_from_json( - json: &str, - column_types: Vec, -) -> anyhow::Result { - // Trim trailing/leading whitespace on each line. - let json = json.lines().map(|line| line.trim()).join("\n"); - - // Determine the schema. - let schema = { - let mut fields = vec![ - Field::new( - "time", - DataType::Timestamp(TimeUnit::Nanosecond, None), - false, - ), - Field::new("subsort", DataType::UInt64, false), - Field::new("key", DataType::UInt64, false), - ]; - - fields.push(Field::new( - "list", - DataType::List(Arc::new(Field::new("item", DataType::Int32, false))), - false, - )); - - Arc::new(Schema::new(fields)) - }; - - // Create the reader - let reader = std::io::Cursor::new(json.as_bytes()); - let reader = arrow::json::ReaderBuilder::new(schema).build(reader)?; - - // Read all the batches and concatenate them. - let batches: Vec<_> = reader.try_collect()?; - let read_schema = batches.get(0).context("no batches read")?.schema(); - let batch = arrow::compute::concat_batches(&read_schema, &batches) - .context("concatenate read batches")?; - - let batch = add_column_and_update_schema(&batch); - - Ok(batch) +#[tokio::test] +async fn test_index_list_string_static() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.string_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,2 + "###); } -fn add_column_and_update_schema(batch: &RecordBatch) -> RecordBatch { - // Create the new column data (for example, a String array in this case) - - // let mut builder = ListBuilder::new(Int32Builder::new()); - // builder.append_value([Some(1), Some(2), Some(3)]); - // builder.append_value([]); - // builder.append_value([None]); - // builder.append_value([Some(10), Some(8), Some(4)]); - // builder.append_value([Some(10)]); - - // let array = builder.finish(); - // let array = Arc::new(array); - - // // Add the new column to the existing schema - // let f = Arc::new(Field::new("item", DataType::Int32, false)); - // let s = Arc::new(Field::new("list", DataType::List(f), false)); - - // let mut new_fields = vec![]; - // batch - // .schema() - // .fields() - // .iter() - // .for_each(|f| new_fields.push(f.clone())); - // new_fields.push(Arc::new(Field::new("list", DataType::Map(s, false), false))); - - // Update the schema with the new field - // let new_schema = Arc::new(Schema::new(new_fields)); - - // Create a new record batch with the updated schema and existing data - let mut columns = vec![]; - batch.columns().iter().for_each(|c| columns.push(c.clone())); - // columns.push(array); - - // TODO: REMOVE - let new_schema = batch.schema().clone(); - - RecordBatch::try_new(new_schema, columns).unwrap() +#[tokio::test] +async fn test_index_list_string_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.string_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,1 + "###); } -async fn json_to_parquet_file(json_input: &str, file: File) { - let field = Arc::new(Field::new("my_item", DataType::Int32, false)); - let record_batch = batch_from_json(json_input, vec![DataType::List(field)]).unwrap(); - - // Create a Parquet writer - let mut writer = ArrowWriter::try_new(file, record_batch.schema(), None).unwrap(); - writer.write(&record_batch).unwrap(); - - // Close the writer to finish writing the file - writer.close().unwrap(); +#[tokio::test] +async fn test_index_list_bool_static() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.bool_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,2 + "###); } -async fn arrow_list_data_fixture() -> DataFixture { - let input = json_input(); - let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - path.pop(); - path.pop(); - path.push("testdata"); - path.push("parquet/data_with_list.parquet"); +#[tokio::test] +async fn test_index_list_bool_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.bool_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,false + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1, + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,false + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,true + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,true + "###); +} - let file = File::create(path).unwrap(); - json_to_parquet_file(input, file).await; +#[tokio::test] +async fn test_incorrect_index_type() { + insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(\"s\") }") + .run_to_csv(&list_data_fixture().await).await.unwrap_err(), @r###" + --- + code: Client specified an invalid argument + message: 1 errors in Fenl statements; see diagnostics + fenl_diagnostics: + - severity: error + code: E0010 + message: Invalid argument type(s) + formatted: + - "error[E0010]: Invalid argument type(s)" + - " --> Query:1:24" + - " |" + - "1 | { f1: Input.i64_list | index(\"s\") }" + - " | ^^^^^ --- Actual type: string" + - " | | " + - " | Invalid types for parameter 'i' in call to 'index'" + - " |" + - " --> built-in signature 'index(i: i64, list: list) -> T':1:22" + - " |" + - "1 | index(i: i64, list: list) -> T" + - " | --- Expected type: i64" + - "" + - "" + "###); +} - DataFixture::new() - .with_table_from_files( - TableConfig::new_with_table_source( - "Input", - &Uuid::new_v4(), - "time", - Some("subsort"), - "key", - "", - ), - &[&"parquet/data_with_list.parquet"], - ) - .await - .unwrap() +#[tokio::test] +async fn test_incorrect_index_type_field() { + insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(Input.bool_list) }") + .run_to_csv(&list_data_fixture().await).await.unwrap_err(), @r###" + --- + code: Client specified an invalid argument + message: 1 errors in Fenl statements; see diagnostics + fenl_diagnostics: + - severity: error + code: E0010 + message: Invalid argument type(s) + formatted: + - "error[E0010]: Invalid argument type(s)" + - " --> Query:1:24" + - " |" + - "1 | { f1: Input.i64_list | index(Input.bool_list) }" + - " | ^^^^^ --------------- Actual type: list" + - " | | " + - " | Invalid types for parameter 'i' in call to 'index'" + - " |" + - " --> built-in signature 'index(i: i64, list: list) -> T':1:18" + - " |" + - "1 | index(i: i64, list: list) -> T" + - " | --- Expected type: i64" + - "" + - "" + "###); } diff --git a/crates/sparrow-plan/src/inst.rs b/crates/sparrow-plan/src/inst.rs index 8409d5110..e7ddd8038 100644 --- a/crates/sparrow-plan/src/inst.rs +++ b/crates/sparrow-plan/src/inst.rs @@ -103,7 +103,7 @@ pub enum InstOp { Hash, #[strum(props(signature = "if(condition: bool, value: T) -> T"))] If, - #[strum(props(signature = "index(index: i64, list: list) -> T"))] + #[strum(props(signature = "index(i: i64, list: list) -> T"))] Index, #[strum(props(signature = "is_valid(input: T) -> bool"))] IsValid, diff --git a/crates/sparrow-syntax/src/syntax/fenl_type.rs b/crates/sparrow-syntax/src/syntax/fenl_type.rs index f8ee734e0..18af9c4f3 100644 --- a/crates/sparrow-syntax/src/syntax/fenl_type.rs +++ b/crates/sparrow-syntax/src/syntax/fenl_type.rs @@ -82,6 +82,9 @@ impl<'a> std::fmt::Display for FormatDataType<'a> { write!(fmt, "{}", FormatStruct(fields)) } DataType::Date32 => fmt.write_str("date32"), + DataType::List(f) => { + write!(fmt, "list<{}>", FormatDataType(f.data_type())) + } DataType::Map(f, _) => match f.data_type() { DataType::Struct(fields) => { write!( From 56630725dd2d8a6c07b72bec3a9937750b640478 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 10:05:03 -0700 Subject: [PATCH 03/10] snapshot test updates --- .../src/plan/operation_to_plan.rs | 4 +- ...er_golden_tests__lookup_diff_grouping.snap | 14 +++++-- ...er_golden_tests__lookup_same_grouping.snap | 14 +++++-- ...ts__lookup_same_grouping_with_slicing.snap | 14 +++++-- crates/sparrow-instructions/src/evaluators.rs | 2 +- .../src/evaluators/list.rs | 4 +- .../src/evaluators/list/{get.rs => index.rs} | 34 ++++++---------- crates/sparrow-main/tests/e2e/list_tests.rs | 40 +++++++++---------- .../src/execute/operation/lookup_request.rs | 12 ++++-- 9 files changed, 76 insertions(+), 62 deletions(-) rename crates/sparrow-instructions/src/evaluators/list/{get.rs => index.rs} (87%) diff --git a/crates/sparrow-compiler/src/plan/operation_to_plan.rs b/crates/sparrow-compiler/src/plan/operation_to_plan.rs index 3a6322861..6ba346a24 100644 --- a/crates/sparrow-compiler/src/plan/operation_to_plan.rs +++ b/crates/sparrow-compiler/src/plan/operation_to_plan.rs @@ -22,7 +22,9 @@ use crate::DataContext; #[static_init::dynamic] static LIST_U64_DATA_TYPE: DataType = DataType { kind: Some(data_type::Kind::List(Box::new(data_type::List { - name: "list_u64".to_owned(), + // Note: The fields here must match the default fields used when creating + // types during type inference, otherwise schema validation will fail. + name: "item".to_owned(), item_type: Some(Box::new(DataType { kind: Some(data_type::Kind::Primitive( data_type::PrimitiveType::U64 as i32, diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap index 87f4a49cf..88330bdf2 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap @@ -227,8 +227,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: @@ -251,8 +254,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap index d9f029d3a..9970a70fb 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap @@ -217,8 +217,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: @@ -241,8 +244,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap index c1d5b07ba..72ae1e7ef 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap @@ -219,8 +219,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: @@ -243,8 +246,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: diff --git a/crates/sparrow-instructions/src/evaluators.rs b/crates/sparrow-instructions/src/evaluators.rs index e112303e8..770e923cf 100644 --- a/crates/sparrow-instructions/src/evaluators.rs +++ b/crates/sparrow-instructions/src/evaluators.rs @@ -206,7 +206,7 @@ fn create_simple_evaluator( } InstOp::Floor => FloorEvaluator::try_new(info), InstOp::Get => GetEvaluator::try_new(info), - InstOp::Index => ListGetEvaluator::try_new(info), + InstOp::Index => IndexEvaluator::try_new(info), InstOp::Gt => match (info.args[0].is_literal(), info.args[1].is_literal()) { (_, true) => { create_ordered_evaluator!(&info.args[0].data_type, GtScalarEvaluator, info) diff --git a/crates/sparrow-instructions/src/evaluators/list.rs b/crates/sparrow-instructions/src/evaluators/list.rs index 77c2f16b2..c1c37f4f9 100644 --- a/crates/sparrow-instructions/src/evaluators/list.rs +++ b/crates/sparrow-instructions/src/evaluators/list.rs @@ -1,2 +1,2 @@ -mod get; -pub(super) use get::*; +mod index; +pub(super) use index::*; diff --git a/crates/sparrow-instructions/src/evaluators/list/get.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs similarity index 87% rename from crates/sparrow-instructions/src/evaluators/list/get.rs rename to crates/sparrow-instructions/src/evaluators/list/index.rs index 8693e9a61..904434a4a 100644 --- a/crates/sparrow-instructions/src/evaluators/list/get.rs +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -1,10 +1,10 @@ use anyhow::Context; use arrow::array::{ - as_boolean_array, as_largestring_array, as_primitive_array, as_string_array, Array, - ArrayAccessor, ArrayRef, Int32Array, Int64Array, ListArray, MapArray, + as_boolean_array, as_largestring_array, as_string_array, Array, ArrayAccessor, ArrayRef, + Int32Array, Int64Array, ListArray, }; use arrow::buffer::OffsetBuffer; -use arrow::datatypes::Int64Type; + use arrow::downcast_primitive_array; use arrow_schema::DataType; use itertools::Itertools; @@ -13,16 +13,16 @@ use std::sync::Arc; use crate::{Evaluator, EvaluatorFactory, StaticInfo}; -/// Evaluator for `get` on lists. +/// Evaluator for `index` on lists. /// /// Retrieves the value at the given index. #[derive(Debug)] -pub(in crate::evaluators) struct ListGetEvaluator { +pub(in crate::evaluators) struct IndexEvaluator { index: ValueRef, list: ValueRef, } -impl EvaluatorFactory for ListGetEvaluator { +impl EvaluatorFactory for IndexEvaluator { fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { let input_type = info.args[1].data_type.clone(); match input_type { @@ -35,7 +35,7 @@ impl EvaluatorFactory for ListGetEvaluator { } } -impl Evaluator for ListGetEvaluator { +impl Evaluator for IndexEvaluator { fn evaluate(&mut self, info: &dyn crate::RuntimeInfo) -> anyhow::Result { let list_input = info.value(&self.list)?.list_array()?; let index_input = info.value(&self.index)?.primitive_array()?; @@ -123,22 +123,12 @@ fn accessible_array_list_indices>( #[cfg(test)] mod tests { - - use std::sync::Arc; - - use arrow::{ - array::{ - as_boolean_array, as_primitive_array, as_string_array, as_struct_array, BooleanArray, - BooleanBuilder, Int32Array, Int32Builder, Int64Array, Int64Builder, ListBuilder, - MapBuilder, StringArray, StringBuilder, StructArray, StructBuilder, - }, - buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}, - ipc::Bool, + use crate::evaluators::list::index::list_get; + use arrow::array::{ + as_boolean_array, as_primitive_array, as_string_array, BooleanArray, BooleanBuilder, + Int32Array, Int32Builder, Int64Array, ListBuilder, StringArray, StringBuilder, }; - use arrow_schema::{DataType, Field, Fields}; - use itertools::Itertools; - - use crate::evaluators::list::get::list_get; + use std::sync::Arc; #[test] fn test_index_primitive() { diff --git a/crates/sparrow-main/tests/e2e/list_tests.rs b/crates/sparrow-main/tests/e2e/list_tests.rs index f3314ae32..13d0921e4 100644 --- a/crates/sparrow-main/tests/e2e/list_tests.rs +++ b/crates/sparrow-main/tests/e2e/list_tests.rs @@ -40,9 +40,9 @@ async fn test_index_list_i64_dynamic() { insta::assert_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,1 - 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,1 - 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,1 - 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,3 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,3 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,1 "###); } @@ -51,11 +51,11 @@ async fn test_index_list_i64_dynamic() { async fn test_index_list_string_static() { insta::assert_snapshot!(QueryFixture::new("{ f1: Input.string_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 - 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,bird + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,bird + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1, + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,cat + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1, "###); } @@ -63,11 +63,11 @@ async fn test_index_list_string_static() { async fn test_index_list_string_dynamic() { insta::assert_snapshot!(QueryFixture::new("{ f1: Input.string_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 - 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,1 - 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,1 - 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,1 - 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,1 - 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,dog + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,fish + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1, + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1, + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,dog "###); } @@ -75,11 +75,11 @@ async fn test_index_list_string_dynamic() { async fn test_index_list_bool_static() { insta::assert_snapshot!(QueryFixture::new("{ f1: Input.bool_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" _time,_subsort,_key_hash,_key,f1 - 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,2 - 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,true + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,false + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,false + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,false + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1, "###); } @@ -115,10 +115,10 @@ async fn test_incorrect_index_type() { - " | | " - " | Invalid types for parameter 'i' in call to 'index'" - " |" - - " --> built-in signature 'index(i: i64, list: list) -> T':1:22" + - " --> built-in signature 'index(i: i64, list: list) -> T':1:18" - " |" - "1 | index(i: i64, list: list) -> T" - - " | --- Expected type: i64" + - " | --- Expected type: i64" - "" - "" "###); diff --git a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs index 3fc5bc673..49ff5df28 100644 --- a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs +++ b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs @@ -346,10 +346,14 @@ mod tests { expressions: vec![ExpressionPlan { arguments: vec![], result_type: Some(v1alpha::DataType { - kind: Some(data_type::Kind::List(Box::new(v1alpha::DataType { - kind: Some(data_type::Kind::Primitive( - data_type::PrimitiveType::U64 as i32, - )), + kind: Some(data_type::Kind::List(Box::new(data_type::List { + name: "list".to_owned(), + item_type: Some(Box::new(v1alpha::DataType { + kind: Some(data_type::Kind::Primitive( + data_type::PrimitiveType::U64 as i32, + )), + })), + nullable: true, }))), }), output: true, From abf46d65849edafd5a64deb1b337f0fb058c990e Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 10:27:24 -0700 Subject: [PATCH 04/10] uncomment --- crates/sparrow-plan/src/inst.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/sparrow-plan/src/inst.rs b/crates/sparrow-plan/src/inst.rs index e7ddd8038..5fcd8ad53 100644 --- a/crates/sparrow-plan/src/inst.rs +++ b/crates/sparrow-plan/src/inst.rs @@ -334,9 +334,9 @@ fn parse_signature(op: InstOp, label: &'static str) -> Option> { panic!("Invalid {label} '{signature_str}' for instruction {op:?}: {e:?}") }); - // if signature.name() != op.to_string() { - // panic!("Signature for op '{op:?}' has invalid name: '{signature_str}'") - // } + if signature.name() != op.to_string() { + panic!("Signature for op '{op:?}' has invalid name: '{signature_str}'") + } Arc::new(signature) }) } From 2327bef2b31cb1dae51ef860f531b79210d030a0 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 10:48:19 -0700 Subject: [PATCH 05/10] Add test data --- testdata/parquet/data_with_list.parquet | Bin 0 -> 2865 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 testdata/parquet/data_with_list.parquet diff --git a/testdata/parquet/data_with_list.parquet b/testdata/parquet/data_with_list.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f4b0971f7cac52ead3d26bd94fb557f11fc85c9a GIT binary patch literal 2865 zcmb7GO>A355MDn&yN=@&$FQsSBrB?*dCDQgPMQPV(3|*|+nv z-+VheJ4!~bjB=G{cqYyLoP`);`N_jK%0Kq9uikmLg5Sk2{-ST_`x`BkHv_xx!Iv}G z;2(x5$i3Vf&iZVN=5sI@bGr|cjH~(t^9FTIZC195oC&}9&ILcCYOpZ4TafLIu@iI* zU?5g7%@vaB7_MC7KpX$%96tmE9d?RZF+1=k(Y zwrD?)Z3O$EQU(Y-7VZb z;8r~6=RY`ary%C5OpafzRvJx}rb@H81@s2QJs13pG+j$KXw28GqYD_|1H)WETBs6S zE{&)YflW6O5bg(XCVh~3^;l27R;zXg7a}7v2Aj@W+aVXzd^*j2lncb#sFhUsjLWr#e)hlY8`%E+WLq@rF^(HWeG%E!zDOV8iGyP4wNc!3AZs*LqEn)S+d zshd(reCvXrk%fFb2?_KxuGv=wxD3yU=LRx9?&~N>6)hTa#Q~uiSF8u&scdf)_o*_j zi|fWXCceWx*0E+x?^kpvPV0f<(s_%zaLi^DhGuI(;vKVHQ((52r0xop%~!1Dk?JfTd3n>)9`x{xwY=TSH?8HJ zKAyxSB=2Z0kgL~gt*ebfxww_%XQ2lSLW;4yR1ZVpX>bEa)Y*@HFr5DU*;lc_ks2Rl zTQ#9*&WnKd;!PvKgsRc&;0kV0G!5wWTyaWvn-%b z>S6tc@q;UDKisYk+BAIsql1c{`uTI>;aUC)Gk$irQSBr-E$Wurc&MDx6UDFnT!@dJ zL6C1eZIELkJEm{XSn@-kU!6iKYGLey;#x^p-=eXIcWj8va+=HviTIWFTD{QV{%4HxeR|eUYD{ZXG-!l zRKZuuIO#zIS;~2oKxKbHD!^YNiU@T%E9D%T6D{xTG)TFWc_|Xv$CX^C8K*La zPBbEWkqCj3f!t*rO{wbcmDK7us?!C9-atETN>!s*)}myYLD#v(XkiOtj!rdl*~O@wzXYB!;_K{RZV?|dYshD&hIl)~ zk5cLIi2R{{meWml?sf;aCwcWFJ^ z<_Pyo=cQejX~wCHQzC`@&8;X)N?mSjbpk^rOD1Wk5{d)&My@IJjN9jxGbE#`gz@2s zNR@V=qvblcIJ&;QxL4Sw^Ix`(lCdD?Et&G=+@Gu=k~2*FbrT=84h0mDT0ELt8>MoV p^5R9N={vdlTf4>Pi}l92y<)vlscoMdzc3OViLzZUV;T5o`5&dS9uxoo literal 0 HcmV?d00001 From 5846bf0592286039c57a60213cb80b19d49d696e Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 11:47:52 -0700 Subject: [PATCH 06/10] remove values() array and list_array() methods --- crates/sparrow-compiler/src/dfg.rs | 1 - .../src/columnar_value.rs | 15 ++--- .../src/evaluators/list/index.rs | 58 +++---------------- 3 files changed, 12 insertions(+), 62 deletions(-) diff --git a/crates/sparrow-compiler/src/dfg.rs b/crates/sparrow-compiler/src/dfg.rs index 83a00dc6d..a804f4bb6 100644 --- a/crates/sparrow-compiler/src/dfg.rs +++ b/crates/sparrow-compiler/src/dfg.rs @@ -108,7 +108,6 @@ impl Default for Dfg { impl Dfg { pub(super) fn add_literal(&mut self, literal: impl Into) -> anyhow::Result { let literal = literal.into(); - // TODO: FRAZ - do I need to support large string literal here? if let ScalarValue::Utf8(Some(literal)) = literal { self.add_string_literal(&literal) } else { diff --git a/crates/sparrow-instructions/src/columnar_value.rs b/crates/sparrow-instructions/src/columnar_value.rs index d90b7dc2f..b249bd02d 100644 --- a/crates/sparrow-instructions/src/columnar_value.rs +++ b/crates/sparrow-instructions/src/columnar_value.rs @@ -1,13 +1,13 @@ use anyhow::anyhow; use arrow::array::{ - Array, ArrayRef, BooleanArray, GenericStringArray, ListArray, MapArray, OffsetSizeTrait, - PrimitiveArray, StructArray, + Array, ArrayRef, BooleanArray, GenericStringArray, MapArray, OffsetSizeTrait, PrimitiveArray, + StructArray, }; use arrow::datatypes::*; use owning_ref::ArcRef; use sparrow_arrow::downcast::{ - downcast_boolean_array, downcast_list_array, downcast_map_array, downcast_primitive_array, - downcast_string_array, downcast_struct_array, + downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array, + downcast_struct_array, }; use sparrow_arrow::scalar_value::{NativeFromScalar, ScalarValue}; @@ -62,13 +62,6 @@ impl ColumnarValue { ArcRef::new(array).try_map(|a| downcast_map_array(a)) } - /// Specialized version of `array_ref` that downcasts the array to a - /// list array. - pub fn list_array(&self) -> anyhow::Result> { - let array = self.array_ref()?; - ArcRef::new(array).try_map(|a| downcast_list_array(a)) - } - /// Specialized version of `array_ref` that downcasts the array to a /// string array. pub fn string_array(&self) -> anyhow::Result>> diff --git a/crates/sparrow-instructions/src/evaluators/list/index.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs index 904434a4a..22de1bc52 100644 --- a/crates/sparrow-instructions/src/evaluators/list/index.rs +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -1,11 +1,7 @@ use anyhow::Context; -use arrow::array::{ - as_boolean_array, as_largestring_array, as_string_array, Array, ArrayAccessor, ArrayRef, - Int32Array, Int64Array, ListArray, -}; +use arrow::array::{Array, ArrayRef, AsArray, Int32Array, Int64Array, ListArray}; use arrow::buffer::OffsetBuffer; -use arrow::downcast_primitive_array; use arrow_schema::DataType; use itertools::Itertools; use sparrow_plan::ValueRef; @@ -37,7 +33,7 @@ impl EvaluatorFactory for IndexEvaluator { impl Evaluator for IndexEvaluator { fn evaluate(&mut self, info: &dyn crate::RuntimeInfo) -> anyhow::Result { - let list_input = info.value(&self.list)?.list_array()?; + let list_input = info.value(&self.list)?.array_ref()?; let index_input = info.value(&self.index)?.primitive_array()?; let result = list_get(&list_input, &index_input)?; @@ -46,8 +42,10 @@ impl Evaluator for IndexEvaluator { } /// Given a `ListArray` and `index` array of the same length return an array of the values. -fn list_get(list: &ListArray, indices: &Int64Array) -> anyhow::Result { +fn list_get(list: &ArrayRef, indices: &Int64Array) -> anyhow::Result { anyhow::ensure!(list.len() == indices.len()); + + let list = list.as_list(); let take_indices = list_indices(list, indices)?; arrow::compute::take(list.values(), &take_indices, None).context("take in get_map") } @@ -55,51 +53,11 @@ fn list_get(list: &ListArray, indices: &Int64Array) -> anyhow::Result /// Gets the indices in the list where the values are at the index within each list. fn list_indices(list: &ListArray, indices: &Int64Array) -> anyhow::Result { let offsets = list.offsets(); - let values = list.values(); - downcast_primitive_array!( - values => { - Ok(accessible_array_list_indices( - offsets, - values, - indices - )) - } - DataType::Utf8 => { - let values = as_string_array(values); - Ok(accessible_array_list_indices( - offsets, - values, - indices - )) - } - DataType::LargeUtf8 => { - let values = as_largestring_array(values); - Ok(accessible_array_list_indices( - offsets, - values, - indices - )) - } - DataType::Boolean => { - let values = as_boolean_array(values); - Ok(accessible_array_list_indices( - offsets, - values, - indices - )) - } - unsupported => { - anyhow::bail!("unsupported list type: {:?}", unsupported) - } - ) + Ok(accessible_array_list_indices(offsets, indices)) } /// Generic implementation of `list_indices` for arrays implementing `ArrayAccessor`. -fn accessible_array_list_indices>( - offsets: &OffsetBuffer, - values: A, - indices: &Int64Array, -) -> Int32Array { +fn accessible_array_list_indices(offsets: &OffsetBuffer, indices: &Int64Array) -> Int32Array { let mut result = Int32Array::builder(indices.len()); let offsets = offsets.iter().map(|n| *n as usize).tuple_windows(); @@ -111,7 +69,7 @@ fn accessible_array_list_indices>( let inner_index = indices.value(index) as usize; // The outer index corresponds to the index with the flattened array. let outer_index = start + inner_index; - if inner_index >= list_start && inner_index < list_end && values.is_valid(outer_index) { + if inner_index >= list_start && inner_index < list_end { result.append_value(outer_index as i32); continue 'outer; } From 08db9a9f1e6ca114af6ca4ac0d1d75d242876cdb Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 11:49:51 -0700 Subject: [PATCH 07/10] Fix test signatures --- .../src/evaluators/list/index.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/sparrow-instructions/src/evaluators/list/index.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs index 22de1bc52..96c628d7e 100644 --- a/crates/sparrow-instructions/src/evaluators/list/index.rs +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -83,8 +83,9 @@ fn accessible_array_list_indices(offsets: &OffsetBuffer, indices: &Int64Arr mod tests { use crate::evaluators::list::index::list_get; use arrow::array::{ - as_boolean_array, as_primitive_array, as_string_array, BooleanArray, BooleanBuilder, - Int32Array, Int32Builder, Int64Array, ListBuilder, StringArray, StringBuilder, + as_boolean_array, as_primitive_array, as_string_array, ArrayRef, BooleanArray, + BooleanBuilder, Int32Array, Int32Builder, Int64Array, ListBuilder, StringArray, + StringBuilder, }; use std::sync::Arc; @@ -97,8 +98,7 @@ mod tests { builder.append_value([Some(10), Some(8), Some(4)]); builder.append_value([Some(10), Some(15), Some(19), Some(123)]); - let array = builder.finish(); - let array = Arc::new(array); + let array: ArrayRef = Arc::new(builder.finish()); let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); let actual = list_get(&array, &indices).unwrap(); @@ -116,8 +116,7 @@ mod tests { builder.append_value([None, Some("dog"), Some("cat")]); builder.append_value([Some("bird"), Some("fish")]); - let array = builder.finish(); - let array = Arc::new(array); + let array: ArrayRef = Arc::new(builder.finish()); let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); let actual = list_get(&array, &indices).unwrap(); @@ -135,8 +134,7 @@ mod tests { builder.append_value([None, Some(true), Some(false)]); builder.append_value([Some(true), Some(false)]); - let array = builder.finish(); - let array = Arc::new(array); + let array: ArrayRef = Arc::new(builder.finish()); let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); let actual = list_get(&array, &indices).unwrap(); From 14dde43a8bc02199c8cddbd3afc7e385fb0afc10 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 11:56:47 -0700 Subject: [PATCH 08/10] fix lookup tests --- crates/sparrow-runtime/src/execute/operation/lookup_request.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs index 49ff5df28..af8010446 100644 --- a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs +++ b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs @@ -347,7 +347,7 @@ mod tests { arguments: vec![], result_type: Some(v1alpha::DataType { kind: Some(data_type::Kind::List(Box::new(data_type::List { - name: "list".to_owned(), + name: "item".to_owned(), item_type: Some(Box::new(v1alpha::DataType { kind: Some(data_type::Kind::Primitive( data_type::PrimitiveType::U64 as i32, From fd2b81e725cb7769ffb4ba581504ff8baaee401f Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 13:42:20 -0700 Subject: [PATCH 09/10] small refactors --- crates/sparrow-arrow/src/downcast.rs | 11 +---------- .../sparrow-instructions/src/evaluators/list/index.rs | 7 ++----- .../src/execute/operation/lookup_response.rs | 7 +++---- .../sparrow-runtime/src/execute/operation/spread.rs | 8 ++++---- 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/crates/sparrow-arrow/src/downcast.rs b/crates/sparrow-arrow/src/downcast.rs index 3989dd522..39b263ddc 100644 --- a/crates/sparrow-arrow/src/downcast.rs +++ b/crates/sparrow-arrow/src/downcast.rs @@ -2,8 +2,7 @@ use anyhow::Context; use arrow::array::{ - Array, BooleanArray, GenericStringArray, ListArray, OffsetSizeTrait, PrimitiveArray, - StructArray, + Array, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, StructArray, }; use arrow::datatypes::ArrowPrimitiveType; use arrow_array::MapArray; @@ -49,14 +48,6 @@ pub fn downcast_struct_array(array: &dyn Array) -> anyhow::Result<&StructArray> .with_context(|| format!("Unable to downcast {:?} to struct array", array.data_type())) } -/// Downcast an `ArrayRef` to a `ListArray`. -pub fn downcast_list_array(array: &dyn Array) -> anyhow::Result<&ListArray> { - array - .as_any() - .downcast_ref::() - .with_context(|| format!("Unable to downcast {:?} to list array", array.data_type())) -} - /// Downcast an `ArrayRef` to a `MapArray`. pub fn downcast_map_array(array: &dyn Array) -> anyhow::Result<&MapArray> { array diff --git a/crates/sparrow-instructions/src/evaluators/list/index.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs index 96c628d7e..e65bea3ff 100644 --- a/crates/sparrow-instructions/src/evaluators/list/index.rs +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -53,11 +53,7 @@ fn list_get(list: &ArrayRef, indices: &Int64Array) -> anyhow::Result { /// Gets the indices in the list where the values are at the index within each list. fn list_indices(list: &ListArray, indices: &Int64Array) -> anyhow::Result { let offsets = list.offsets(); - Ok(accessible_array_list_indices(offsets, indices)) -} -/// Generic implementation of `list_indices` for arrays implementing `ArrayAccessor`. -fn accessible_array_list_indices(offsets: &OffsetBuffer, indices: &Int64Array) -> Int32Array { let mut result = Int32Array::builder(indices.len()); let offsets = offsets.iter().map(|n| *n as usize).tuple_windows(); @@ -76,7 +72,8 @@ fn accessible_array_list_indices(offsets: &OffsetBuffer, indices: &Int64Arr } result.append_null(); } - result.finish() + + Ok(result.finish()) } #[cfg(test)] diff --git a/crates/sparrow-runtime/src/execute/operation/lookup_response.rs b/crates/sparrow-runtime/src/execute/operation/lookup_response.rs index f0b9949d2..854c7709a 100644 --- a/crates/sparrow-runtime/src/execute/operation/lookup_response.rs +++ b/crates/sparrow-runtime/src/execute/operation/lookup_response.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use anyhow::Context; -use arrow::array::{Array, ListArray, UInt32Array, UInt64Array}; +use arrow::array::{Array, AsArray, ListArray, UInt32Array, UInt64Array}; use async_trait::async_trait; use error_stack::{IntoReport, IntoReportCompat, ResultExt}; use futures::StreamExt; use itertools::Itertools; use sparrow_api::kaskada::v1alpha::operation_plan; -use sparrow_arrow::downcast::{downcast_list_array, downcast_primitive_array}; +use sparrow_arrow::downcast::downcast_primitive_array; use sparrow_instructions::ComputeStore; use tokio_stream::wrappers::ReceiverStream; @@ -111,8 +111,7 @@ impl LookupResponseOperation { return Ok(None); } - let requesting_key_hash_list: &ListArray = - downcast_list_array(requesting_key_hash_list.as_ref())?; + let requesting_key_hash_list: &ListArray = requesting_key_hash_list.as_ref().as_list(); let requesting_key_hashes = requesting_key_hash_list.values(); let requesting_key_hashes: &UInt64Array = downcast_primitive_array(requesting_key_hashes.as_ref())?; diff --git a/crates/sparrow-runtime/src/execute/operation/spread.rs b/crates/sparrow-runtime/src/execute/operation/spread.rs index ce6467ef9..979cd74ae 100644 --- a/crates/sparrow-runtime/src/execute/operation/spread.rs +++ b/crates/sparrow-runtime/src/execute/operation/spread.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Context; use arrow::array::{ - new_null_array, Array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder, + new_null_array, Array, ArrayData, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, GenericStringArray, GenericStringBuilder, Int32BufferBuilder, ListArray, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, StructArray, }; @@ -11,8 +11,8 @@ use arrow::datatypes::{self, ArrowPrimitiveType, DataType, Fields}; use bitvec::vec::BitVec; use itertools::{izip, Itertools}; use sparrow_arrow::downcast::{ - downcast_boolean_array, downcast_list_array, downcast_map_array, downcast_primitive_array, - downcast_string_array, downcast_struct_array, + downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array, + downcast_struct_array, }; use sparrow_arrow::utils::make_null_array; use sparrow_instructions::GroupingIndices; @@ -1696,7 +1696,7 @@ impl SpreadImpl for UnlatchedUInt64ListSpread { // referenced exactly once). Instead, we need to spread out the offset // array. - let values = downcast_list_array(values.as_ref())?; + let values = values.as_ref().as_list(); let mut offset_builder = Int32BufferBuilder::new(grouping.len() + 1); let mut null_builder = BooleanBufferBuilder::new(grouping.len()); From 3f8f1117de1463db6401802febe28b42390f8b4d Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 27 Jul 2023 13:44:55 -0700 Subject: [PATCH 10/10] clippy :( --- crates/sparrow-instructions/src/evaluators/list/index.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/sparrow-instructions/src/evaluators/list/index.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs index e65bea3ff..718b6ddb6 100644 --- a/crates/sparrow-instructions/src/evaluators/list/index.rs +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -1,6 +1,5 @@ use anyhow::Context; use arrow::array::{Array, ArrayRef, AsArray, Int32Array, Int64Array, ListArray}; -use arrow::buffer::OffsetBuffer; use arrow_schema::DataType; use itertools::Itertools;