Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: index for DataType::List #562

Merged
merged 10 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl DataType {
let value = &fields[1];
Self {
kind: Some(data_type::Kind::Map(Box::new(data_type::Map {
name: name.to_string(),
name: name.to_owned(),
ordered,
key_name: key.name.clone(),
key_type: Some(Box::new(
Expand All @@ -44,6 +44,22 @@ impl DataType {
}
}

pub fn new_list(name: &str, field: schema::Field) -> Self {
Self {
kind: Some(data_type::Kind::List(Box::new(data_type::List {
name: name.to_owned(),
item_type: Some(Box::new(
field
.data_type
.as_ref()
.expect("data type to exist")
.clone(),
)),
nullable: field.nullable,
}))),
}
}

pub fn new_primitive(primitive: data_type::PrimitiveType) -> Self {
Self {
kind: Some(data_type::Kind::Primitive(primitive as i32)),
Expand Down Expand Up @@ -221,6 +237,21 @@ impl TryFrom<&arrow::datatypes::DataType> for DataType {

Ok(DataType::new_map(s.name(), *is_ordered, vec![key, value]))
}
arrow::datatypes::DataType::List(field) => {
let name = field.name();
let field = schema::Field {
name: name.to_owned(),
data_type: Some(field.data_type().try_into().map_err(
|err: ConversionError<arrow::datatypes::DataType>| {
err.with_prepend_field("list item".to_owned())
},
)?),
nullable: field.is_nullable(),
};

Ok(DataType::new_list(name, field))
}

unsupported => Err(ConversionError::new_unsupported(unsupported.clone())),
}
}
Expand Down Expand Up @@ -337,12 +368,16 @@ impl TryFrom<&DataType> for arrow::datatypes::DataType {
Some(data_type::Kind::Struct(schema)) => Ok(arrow::datatypes::DataType::Struct(
fields_to_arrow(&schema.fields)?.into(),
)),
Some(data_type::Kind::List(item_type)) => {
let item_type = arrow::datatypes::DataType::try_from(item_type.as_ref())
.map_err(|e| e.with_prepend_field("list item".to_owned()))?;
let item_type = arrow::datatypes::Field::new("item", item_type, true);
Ok(arrow::datatypes::DataType::List(Arc::new(item_type)))
}
Some(data_type::Kind::List(list)) => match list.item_type.as_ref() {
Some(item_type) => {
let item_type = arrow::datatypes::DataType::try_from(item_type.as_ref())
.map_err(|e| e.with_prepend_field("list item".to_owned()))?;
let item_type =
arrow::datatypes::Field::new(list.name.clone(), item_type, list.nullable);
Ok(arrow::datatypes::DataType::List(Arc::new(item_type)))
}
None => Err(ConversionError::new_unsupported(value.clone())),
},
Some(data_type::Kind::Map(map)) => {
match (map.key_type.as_ref(), map.value_type.as_ref()) {
(Some(key), Some(value)) => {
Expand Down
10 changes: 1 addition & 9 deletions crates/sparrow-arrow/src/downcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

use anyhow::Context;
use arrow::array::{
Array, BooleanArray, GenericStringArray, ListArray, OffsetSizeTrait, PrimitiveArray,
StructArray,
Array, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, StructArray,
};
use arrow::datatypes::ArrowPrimitiveType;
use arrow_array::MapArray;
Expand All @@ -24,13 +23,6 @@ pub fn downcast_primitive_array<T: ArrowPrimitiveType>(
})
}

pub fn downcast_list_array(array: &dyn Array) -> anyhow::Result<&ListArray> {
array
.as_any()
.downcast_ref::<ListArray>()
.with_context(|| format!("Unable to downcast {:?} to ListArray", array.data_type()))
}

/// Downcast an array into a string array.
pub fn downcast_string_array<T>(array: &dyn Array) -> anyhow::Result<&GenericStringArray<T>>
where
Expand Down
15 changes: 15 additions & 0 deletions crates/sparrow-compiler/src/ast_to_dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,13 @@ fn cast_if_needed(
{
Ok(value)
}
// Ensures that list types with the same inner types are compatible, regardless of the (arbitary) field naming.
(FenlType::Concrete(DataType::List(s)), FenlType::Concrete(DataType::List(s2)))
if list_types_are_equal(s, s2) =>
{
Ok(value)
}

(FenlType::Concrete(DataType::Null), FenlType::Window) => Ok(value),
(
FenlType::Concrete(DataType::Struct(actual_fields)),
Expand Down Expand Up @@ -712,6 +719,14 @@ fn map_types_are_equal(a: &FieldRef, b: &FieldRef) -> bool {
}
}

// When constructing the concrete list during inference, we use arbitary names for the inner data
// field since we don't have access to the user's naming patterns there.
// By comparing the list types based on just the inner type, we can ensure that the types are
// still treated as equal.
fn list_types_are_equal(a: &FieldRef, b: &FieldRef) -> bool {
a.data_type() == b.data_type()
}

pub(crate) fn is_any_new(dfg: &mut Dfg, arguments: &[Located<AstDfgRef>]) -> anyhow::Result<Id> {
let mut argument_is_new = arguments.iter().map(|a| a.is_new()).unique();
let mut result = argument_is_new
Expand Down
1 change: 0 additions & 1 deletion crates/sparrow-compiler/src/dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl Default for Dfg {
impl Dfg {
pub(super) fn add_literal(&mut self, literal: impl Into<ScalarValue>) -> anyhow::Result<Id> {
let literal = literal.into();
// TODO: FRAZ - do I need to support large string literal here?
if let ScalarValue::Utf8(Some(literal)) = literal {
self.add_string_literal(&literal)
} else {
Expand Down
5 changes: 5 additions & 0 deletions crates/sparrow-compiler/src/functions/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ pub(super) fn register(registry: &mut Registry) {
.register("get<K: key, V: any>(key: K, map: map<K, V>) -> V")
.with_implementation(Implementation::Instruction(InstOp::Get))
.set_internal();

registry
.register("index<T: any>(i: i64, list: list<T>) -> T")
.with_implementation(Implementation::Instruction(InstOp::Index))
.set_internal();
}
1 change: 1 addition & 0 deletions crates/sparrow-compiler/src/functions/pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl Pushdown {
| DataType::Interval(_)
| DataType::Utf8
| DataType::LargeUtf8
| DataType::List(..)
| DataType::Map(..) => {
let mut subst = subst.clone();
subst.insert(
Expand Down
14 changes: 10 additions & 4 deletions crates/sparrow-compiler/src/plan/operation_to_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ use crate::DataContext;
/// DataType protobuf representing a list of u64.
#[static_init::dynamic]
static LIST_U64_DATA_TYPE: DataType = DataType {
kind: Some(data_type::Kind::List(Box::new(DataType {
kind: Some(data_type::Kind::Primitive(
data_type::PrimitiveType::U64 as i32,
)),
kind: Some(data_type::Kind::List(Box::new(data_type::List {
// Note: The fields here must match the default fields used when creating
// types during type inference, otherwise schema validation will fail.
name: "item".to_owned(),
item_type: Some(Box::new(DataType {
kind: Some(data_type::Kind::Primitive(
data_type::PrimitiveType::U64 as i32,
)),
})),
nullable: true,
}))),
};

Expand Down
41 changes: 38 additions & 3 deletions crates/sparrow-compiler/src/types/inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,30 @@ pub fn validate_instantiation(
}
}
}
FenlType::Collection(Collection::List, _) => {
todo!("list unsupported")
FenlType::Collection(Collection::List, type_vars) => {
debug_assert!(type_vars.len() == 1);
let item_type = match argument_type {
FenlType::Concrete(DataType::List(f)) => {
FenlType::Concrete(f.data_type().clone())
}
other => anyhow::bail!("expected list, saw {:?}", other),
};

match types_for_variable.entry(type_vars[0].clone()) {
Entry::Occupied(occupied) => {
anyhow::ensure!(
occupied.get() == &item_type
|| matches!(occupied.get(), FenlType::Error)
|| matches!(argument_type, FenlType::Error),
"Failed type validation: expected {} but was {}",
occupied.get(),
item_type
);
}
Entry::Vacant(vacant) => {
vacant.insert(item_type.clone());
}
}
}
FenlType::Error => {
// Assume the argument matches (since we already reported what
Expand Down Expand Up @@ -359,7 +381,20 @@ fn instantiate_type(fenl_type: &FenlType, solutions: &HashMap<TypeVariable, Fenl
let s = Arc::new(Field::new("entries", DataType::Struct(fields), false));
FenlType::Concrete(DataType::Map(s, false))
}
FenlType::Collection(Collection::List, _) => todo!("unsupported"),
FenlType::Collection(Collection::List, type_vars) => {
debug_assert!(type_vars.len() == 1);
let concrete_type = solutions
.get(&type_vars[0])
.cloned()
.unwrap_or(FenlType::Concrete(DataType::Null));
let field = match concrete_type {
// TODO: Should the fields be nullable?
jordanrfrazier marked this conversation as resolved.
Show resolved Hide resolved
FenlType::Concrete(t) => Arc::new(Field::new("item", t, false)),
other => panic!("expected concrete type, got {:?}", other),
};

FenlType::Concrete(DataType::List(field))
}
FenlType::Concrete(_) => fenl_type.clone(),
FenlType::Window => fenl_type.clone(),
FenlType::Json => fenl_type.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,11 @@ operations:
result_type:
kind:
List:
kind:
Primitive: 10
name: item
item_type:
kind:
Primitive: 10
nullable: true
output: true
operator:
Input:
Expand All @@ -251,8 +254,11 @@ operations:
result_type:
kind:
List:
kind:
Primitive: 10
name: item
item_type:
kind:
Primitive: 10
nullable: true
output: true
operator:
Input:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,11 @@ operations:
result_type:
kind:
List:
kind:
Primitive: 10
name: item
jordanrfrazier marked this conversation as resolved.
Show resolved Hide resolved
item_type:
kind:
Primitive: 10
nullable: true
output: true
operator:
Input:
Expand All @@ -241,8 +244,11 @@ operations:
result_type:
kind:
List:
kind:
Primitive: 10
name: item
item_type:
kind:
Primitive: 10
nullable: true
output: true
operator:
Input:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,11 @@ operations:
result_type:
kind:
List:
kind:
Primitive: 10
name: item
item_type:
kind:
Primitive: 10
nullable: true
output: true
operator:
Input:
Expand All @@ -243,8 +246,11 @@ operations:
result_type:
kind:
List:
kind:
Primitive: 10
name: item
item_type:
kind:
Primitive: 10
nullable: true
output: true
operator:
Input:
Expand Down
3 changes: 3 additions & 0 deletions crates/sparrow-instructions/src/evaluators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod equality;
mod field_ref;
mod general;
mod json_field;
mod list;
mod logical;
mod macros;
mod map;
Expand All @@ -31,6 +32,7 @@ use equality::*;
use field_ref::*;
use general::*;
use json_field::*;
use list::*;
use logical::*;
use map::*;
use math::*;
Expand Down Expand Up @@ -204,6 +206,7 @@ fn create_simple_evaluator(
}
InstOp::Floor => FloorEvaluator::try_new(info),
InstOp::Get => GetEvaluator::try_new(info),
InstOp::Index => IndexEvaluator::try_new(info),
InstOp::Gt => match (info.args[0].is_literal(), info.args[1].is_literal()) {
(_, true) => {
create_ordered_evaluator!(&info.args[0].data_type, GtScalarEvaluator, info)
Expand Down
2 changes: 2 additions & 0 deletions crates/sparrow-instructions/src/evaluators/list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod index;
pub(super) use index::*;
Loading
Loading