Skip to content

Commit

Permalink
Add select transform
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 2, 2023
1 parent 0e4bc08 commit ec4530a
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 55 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 5 additions & 54 deletions crates/sparrow-arrow/src/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use arrow_array::cast::AsArray;
use arrow_array::types::{TimestampNanosecondType, UInt64Type};
use arrow_array::{
Array, ArrayRef, ArrowPrimitiveType, BooleanArray, TimestampNanosecondArray, UInt64Array,
};
use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, TimestampNanosecondArray, UInt64Array};
use error_stack::{IntoReport, ResultExt};
use itertools::Itertools;

Expand Down Expand Up @@ -253,54 +252,6 @@ impl Batch {
}
}

// TODO: Use "filter-bits" to avoid eagerly creating new columns.
pub fn filter(&self, predicate: &BooleanArray) -> error_stack::Result<Self, Error> {
match &self.data {
Some(info) => {
let filter = arrow_select::filter::FilterBuilder::new(predicate)
.optimize()
.build();
let data = filter
.filter(&info.data)
.into_report()
.change_context(Error::Internal)?;

// TODO: This is unnecessary if `time` and `key_hash` were already in the batch.
// We should figure out how to avoid the redundant work.
let time = filter
.filter(&info.time)
.into_report()
.change_context(Error::Internal)?;
let subsort = filter
.filter(&info.subsort)
.into_report()
.change_context(Error::Internal)?;
let key_hash = filter
.filter(&info.key_hash)
.into_report()
.change_context(Error::Internal)?;
let info = BatchInfo {
data,
time,
subsort,
key_hash,
// TODO: Should the `*_present_time` be updated to reflect actual contents of batch?
min_present_time: info.min_present_time,
max_present_time: info.max_present_time,
};

Ok(Self {
data: Some(info),
up_to_time: self.up_to_time,
})
}
None => {
assert_eq!(predicate.len(), 0);
Ok(self.clone())
}
}
}

pub fn slice(&self, offset: usize, length: usize) -> Self {
match &self.data {
Some(info) => {
Expand Down Expand Up @@ -446,15 +397,15 @@ impl BatchInfo {
}

pub(crate) fn time(&self) -> &TimestampNanosecondArray {
downcast_primitive_array(self.time.as_ref()).expect("type checked in constructor")
self.time.as_primitive()
}

pub(crate) fn subsort(&self) -> &UInt64Array {
downcast_primitive_array(self.subsort.as_ref()).expect("type checked in constructor")
self.subsort.as_primitive()
}

pub(crate) fn key_hash(&self) -> &UInt64Array {
downcast_primitive_array(self.key_hash.as_ref()).expect("type checked in constructor")
self.key_hash.as_primitive()
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sparrow-transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Implementation of transforms and pipeline for executing them.
[dependencies]
arrow-array.workspace = true
arrow-schema.workspace = true
arrow-select.workspace = true
derive_more.workspace = true
error-stack.workspace = true
itertools.workspace = true
Expand All @@ -23,6 +24,7 @@ sparrow-scheduler = { path = "../sparrow-scheduler" }
tracing.workspace = true

[dev-dependencies]
index_vec.workspace = true

[lib]
doctest = false
1 change: 1 addition & 0 deletions crates/sparrow-transforms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//! must be executed to move data to the appropriate partitions.

mod project;
mod select;
mod transform;
mod transform_pipeline;

Expand Down
112 changes: 112 additions & 0 deletions crates/sparrow-transforms/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,115 @@ impl Transform for Project {
std::any::type_name::<Self>()
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;
use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array};
use arrow_schema::{Field, Fields};
use index_vec::index_vec;
use sparrow_arrow::{scalar_value::ScalarValue, RowTime};

fn test_batch() -> Batch {
let input_fields = Fields::from(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
]);
let input_data = Arc::new(StructArray::new(
input_fields.clone(),
vec![
Arc::new(Int64Array::from(vec![0, 1, 2, 3])),
Arc::new(Int64Array::from(vec![4, 7, 10, 11])),
],
None,
));
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let input_batch = Batch::new_with_data(
input_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
);
input_batch
}

#[test]
fn test_project() {
let output_fields = Fields::from(vec![
Field::new("b", DataType::Int64, true),
Field::new("ab", DataType::Int64, true),
]);
let output_type = DataType::Struct(output_fields.clone());

let input = test_batch();
let input_type = input.data().unwrap().data_type();
let exprs: Exprs = index_vec![
sparrow_physical::Expr {
name: "input".into(),
literal_args: vec![],
args: vec![],
result_type: input_type.clone(),
},
sparrow_physical::Expr {
name: "field_ref".into(),
literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))],
args: vec![0.into()],
result_type: DataType::Int64,
},
sparrow_physical::Expr {
name: "field_ref".into(),
literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))],
args: vec![0.into()],
result_type: DataType::Int64,
},
sparrow_physical::Expr {
name: "add".into(),
literal_args: vec![],
args: vec![1.into(), 2.into()],
result_type: DataType::Int64,
},
sparrow_physical::Expr {
name: "literal".into(),
literal_args: vec![ScalarValue::Int64(Some(10))],
args: vec![],
result_type: DataType::Int64,
},
sparrow_physical::Expr {
name: "record".into(),
literal_args: vec![],
args: vec![2.into(), 3.into()],
result_type: output_type.clone(),
},
];
let project = Project::try_new(&exprs, &output_type).unwrap();
let input = test_batch();
let actual = project.apply(input).unwrap();

// Construct the expected output
let output_data = Arc::new(StructArray::new(
output_fields.clone(),
vec![
Arc::new(Int64Array::from(vec![4, 7, 10, 11])),
Arc::new(Int64Array::from(vec![4, 8, 12, 14])),
],
None,
));
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let expected = Batch::new_with_data(
output_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
);

assert_eq!(expected, actual);
}
}
Loading

0 comments on commit ec4530a

Please sign in to comment.