ref: Simplify expressions in physical plan (#782)
Change expressions to always produce the result of the final expression. This
simplifies evaluation without affecting expressiveness:

1. We can now produce a primitive result (such as a boolean or integer)
if that is the type of the final expression.
2. We can still produce records by adding a `record` expression to the
end (which is the standard way of creating intermediate records).
3. We can still reference parts of a record input using `FieldRef` (sketched
below).
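
For instance, evaluation in this model can be pictured as the following
sketch. The `Expr` enum and `evaluate` function are illustrative stand-ins,
not the actual `sparrow-physical` definitions: every expression produces a
single `ArrayRef`, `input` yields the input data, `FieldRef` projects a field
out of an earlier struct-typed result, and a trailing `record` expression
assembles a struct when a record result is wanted.

```rust
use std::sync::Arc;

use arrow_array::{cast::AsArray, ArrayRef, StructArray};
use arrow_schema::Fields;

enum Expr {
    /// The input data to the expression evaluation (formerly `column`).
    Input,
    /// Project one field out of an earlier struct-typed result.
    FieldRef { base: usize, field: usize },
    /// Assemble earlier results into a struct ("record") result.
    Record { fields: Fields, values: Vec<usize> },
}

/// Evaluate expressions in order; each may reference earlier results by
/// index, and the last result (primitive or record) is the output.
fn evaluate(exprs: &[Expr], input: &ArrayRef) -> ArrayRef {
    let mut results: Vec<ArrayRef> = Vec::with_capacity(exprs.len());
    for expr in exprs {
        let result: ArrayRef = match expr {
            Expr::Input => input.clone(),
            Expr::FieldRef { base, field } => {
                results[*base].as_struct().column(*field).clone()
            }
            Expr::Record { fields, values } => {
                let arrays: Vec<ArrayRef> =
                    values.iter().map(|i| results[*i].clone()).collect();
                Arc::new(StructArray::new(fields.clone(), arrays, None))
            }
        };
        results.push(result);
    }
    results.pop().expect("at least one expression")
}
```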

This eliminates the weird "some expressions are marked as output"
pattern. It also means that we can operate on `ArrayRef` generally and
use `DataType` as the type, instead of sometimes using `RecordBatch` and
`SchemaRef`.
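
Concretely, nothing is lost by dropping `RecordBatch` here: arrow-array can
view a record batch as a single struct-typed array, whose `DataType::Struct`
carries the field information the `SchemaRef` used to. A minimal sketch of
that conversion:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StructArray};

/// A record batch converts losslessly into one struct-typed array, so a
/// single `ArrayRef`-based code path can carry record and primitive results.
fn batch_to_array(batch: RecordBatch) -> ArrayRef {
    Arc::new(StructArray::from(batch))
}
```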

Other smaller changes in this PR:
- Move the `Expressions` out of the `StepKind`. This allows the
`StepKind` to serve as an identifier of the step, regardless of the
expressions it computes, simplifying some of the de-duplication work in
the logical-to-physical compilation.
- Rename the `column` expression to `input`, since it accesses the input
data to the expression evaluation.
- Add a method to "intern" expression names, replacing a `&str` with the
`&'static str` for expression operators available in the binary (sketched
below).
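
A hedged sketch of those last two changes, using illustrative stand-in types
(the real `StepKind`, `Expressions`, and interning method differ in detail):

```rust
/// With the expressions moved out, the kind alone identifies a step, so
/// de-duplication in logical-to-physical compilation can compare kinds
/// without inspecting the expressions each step computes.
enum StepKind {
    Scan,
    Project,
    Filter,
}

struct Step {
    kind: StepKind,
    exprs: Vec<String>, // stand-in for the real `Expressions` type
}

/// "Interning" maps a borrowed operator name to the `&'static str` compiled
/// into the binary, so known operators need no per-instance allocation.
fn intern_name(name: &str) -> Option<&'static str> {
    const OPERATORS: &[&str] = &["input", "record", "field_ref", "add"];
    OPERATORS.iter().copied().find(|op| *op == name)
}
```
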
bjchambers authored Oct 2, 2023
1 parent dd520a2 commit 0e4bc08
Showing 25 changed files with 299 additions and 325 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -139,7 +139,7 @@ tracing-subscriber = { version = "0.3.17", features = [
"json",
] }
url = { version = "2.3.1", features = ["serde"] }
-uuid = { version = "1.3.0", features = ["v4"] }
+uuid = { version = "1.3.0", features = ["v4", "serde"] }

[workspace.dependencies.rocksdb]
# This disables compression algorithms that cause issues during linking due to
133 changes: 51 additions & 82 deletions crates/sparrow-arrow/src/batch.rs
@@ -1,10 +1,7 @@
-use arrow::error::ArrowError;
use arrow_array::types::{TimestampNanosecondType, UInt64Type};
use arrow_array::{
-Array, ArrayRef, ArrowPrimitiveType, BooleanArray, RecordBatch, TimestampNanosecondArray,
-UInt64Array,
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, TimestampNanosecondArray, UInt64Array,
};
-use arrow_schema::SchemaRef;
use error_stack::{IntoReport, ResultExt};
use itertools::Itertools;

@@ -36,15 +33,11 @@ impl Batch {

pub fn num_rows(&self) -> usize {
match &self.data {
-Some(data) => data.batch.num_rows(),
+Some(data) => data.data.len(),
None => 0,
}
}

-pub fn into_record_batch(self) -> Option<RecordBatch> {
-self.data.map(|data| data.batch)
-}
-
pub fn time(&self) -> Option<&TimestampNanosecondArray> {
self.data.as_ref().map(|data| data.time())
}
@@ -66,11 +59,8 @@ impl Batch {
self.data.as_ref().map(|info| info.max_present_time)
}

-pub fn record_batch(&self) -> Option<&RecordBatch> {
-match &self.data {
-Some(info) => Some(&info.batch),
-None => None,
-}
+pub fn data(&self) -> Option<&ArrayRef> {
+self.data.as_ref().map(|info| &info.data)
}

/// Create a new `Batch` containing the given batch data.
@@ -83,33 +73,33 @@ impl Batch {
/// (b) all rows in this batch are less than or equal to `up_to_time`
/// (c) all future rows are greater than `up_to_time`.
pub fn new_with_data(
-batch: RecordBatch,
+data: ArrayRef,
time: ArrayRef,
subsort: ArrayRef,
key_hash: ArrayRef,
up_to_time: RowTime,
) -> Self {
-debug_assert_eq!(batch.num_rows(), time.len());
-debug_assert_eq!(batch.num_rows(), subsort.len());
-debug_assert_eq!(batch.num_rows(), key_hash.len());
+debug_assert_eq!(data.len(), time.len());
+debug_assert_eq!(data.len(), subsort.len());
+debug_assert_eq!(data.len(), key_hash.len());
debug_assert_eq!(time.data_type(), &TimestampNanosecondType::DATA_TYPE);
debug_assert_eq!(subsort.data_type(), &UInt64Type::DATA_TYPE);
debug_assert_eq!(key_hash.data_type(), &UInt64Type::DATA_TYPE);
-let data = if batch.num_rows() == 0 {
+let data = if data.len() == 0 {
None
} else {
-Some(BatchInfo::new(batch, time, subsort, key_hash))
+Some(BatchInfo::new(data, time, subsort, key_hash))
};

Self { data, up_to_time }
}

/// Return a new `Batch` with the same time properties, but new data.
-pub fn with_projection(&self, new_batch: RecordBatch) -> Self {
-assert_eq!(new_batch.num_rows(), self.num_rows());
+pub fn with_projection(&self, new_data: ArrayRef) -> Self {
+assert_eq!(new_data.len(), self.num_rows());
Self {
data: self.data.as_ref().map(|data| BatchInfo {
-batch: new_batch,
+data: new_data,
time: data.time.clone(),
subsort: data.subsort.clone(),
key_hash: data.key_hash.clone(),
@@ -160,11 +150,7 @@ impl Batch {
})
}

-pub fn concat(
-schema: &SchemaRef,
-batches: Vec<Batch>,
-up_to_time: RowTime,
-) -> error_stack::Result<Batch, Error> {
+pub fn concat(batches: Vec<Batch>, up_to_time: RowTime) -> error_stack::Result<Batch, Error> {
// TODO: Add debug assertions for batch ordering?
if batches.iter().all(|batch| batch.is_empty()) {
return Ok(Batch::new_empty(up_to_time));
@@ -209,16 +195,17 @@ impl Batch {
.change_context(Error::Internal)?;

let batches: Vec<_> = batches
-.into_iter()
-.flat_map(|batch| batch.into_record_batch())
+.iter()
+.flat_map(|batch| batch.data())
+.map(|data| data.as_ref())
.collect();
-let batch = arrow_select::concat::concat_batches(schema, &batches)
+let data = arrow_select::concat::concat(&batches)
.into_report()
.change_context(Error::Internal)?;

Ok(Self {
data: Some(BatchInfo {
-batch,
+data,
time,
subsort,
key_hash,
@@ -232,8 +219,9 @@ impl Batch {
pub fn take(&self, indices: &UInt64Array) -> error_stack::Result<Self, Error> {
match &self.data {
Some(info) => {
-let batch =
-take_record_batch(&info.batch, indices).change_context(Error::Internal)?;
+let data = arrow_select::take::take(info.data.as_ref(), indices, None)
+.into_report()
+.change_context(Error::Internal)?;
let time = arrow_select::take::take(info.time.as_ref(), indices, None)
.into_report()
.change_context(Error::Internal)?;
@@ -244,7 +232,7 @@ impl Batch {
.into_report()
.change_context(Error::Internal)?;
let info = BatchInfo {
-batch,
+data,
time,
subsort,
key_hash,
@@ -272,16 +260,8 @@ impl Batch {
let filter = arrow_select::filter::FilterBuilder::new(predicate)
.optimize()
.build();
-let columns: Vec<_> = info
-.batch
-.columns()
-.iter()
-.map(|column| filter.filter(column))
-.try_collect()
-.into_report()
-.change_context(Error::Internal)?;
-
-let batch = RecordBatch::try_new(info.batch.schema(), columns)
+let data = filter
+.filter(&info.data)
.into_report()
.change_context(Error::Internal)?;

@@ -300,7 +280,7 @@ impl Batch {
.into_report()
.change_context(Error::Internal)?;
let info = BatchInfo {
-batch,
+data,
time,
subsort,
key_hash,
@@ -325,7 +305,7 @@ impl Batch {
match &self.data {
Some(info) => {
let info = BatchInfo {
-batch: info.batch.slice(offset, length),
+data: info.data.slice(offset, length),
time: info.time.slice(offset, length),
subsort: info.subsort.slice(offset, length),
key_hash: info.key_hash.slice(offset, length),
@@ -354,6 +334,8 @@ impl Batch {
) -> Self {
use std::sync::Arc;

+use arrow_array::StructArray;
+
let time: TimestampNanosecondArray = time.into();
let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into();
let key_hash: UInt64Array = key_hash.into();
@@ -362,12 +344,14 @@ impl Batch {
let subsort: ArrayRef = Arc::new(subsort);
let key_hash: ArrayRef = Arc::new(key_hash);

-let batch =
-RecordBatch::try_new(MINIMAL_SCHEMA.clone(), vec![time.clone(), key_hash.clone()])
-.unwrap();
+let data = Arc::new(StructArray::new(
+MINIMAL_SCHEMA.fields().clone(),
+vec![time.clone(), key_hash.clone()],
+None,
+));

Batch::new_with_data(
-batch,
+data,
time,
subsort,
key_hash,
@@ -378,7 +362,7 @@ impl Batch {

#[derive(Clone, Debug)]
pub(crate) struct BatchInfo {
-pub(crate) batch: RecordBatch,
+pub(crate) data: ArrayRef,
pub(crate) time: ArrayRef,
pub(crate) subsort: ArrayRef,
pub(crate) key_hash: ArrayRef,
@@ -388,19 +372,19 @@ pub(crate) struct BatchInfo {

impl PartialEq for BatchInfo {
fn eq(&self, other: &Self) -> bool {
-self.batch == other.batch
+self.data.as_ref() == other.data.as_ref()
&& self.time.as_ref() == other.time.as_ref()
&& self.min_present_time == other.min_present_time
&& self.max_present_time == other.max_present_time
}
}

impl BatchInfo {
-fn new(batch: RecordBatch, time: ArrayRef, subsort: ArrayRef, key_hash: ArrayRef) -> Self {
-debug_assert_eq!(batch.num_rows(), time.len());
-debug_assert_eq!(batch.num_rows(), subsort.len());
-debug_assert_eq!(batch.num_rows(), key_hash.len());
-debug_assert!(batch.num_rows() > 0);
+fn new(data: ArrayRef, time: ArrayRef, subsort: ArrayRef, key_hash: ArrayRef) -> Self {
+debug_assert_eq!(data.len(), time.len());
+debug_assert_eq!(data.len(), subsort.len());
+debug_assert_eq!(data.len(), key_hash.len());
+debug_assert!(data.len() > 0);
debug_assert_eq!(time.null_count(), 0);

let time_column: &TimestampNanosecondArray =
@@ -417,7 +401,7 @@ impl BatchInfo {
RowTime::from_timestamp_ns(time_column.values()[time_column.len() - 1]);

Self {
-batch,
+data,
time,
subsort,
key_hash,
@@ -439,21 +423,21 @@ impl BatchInfo {
let slice_start = time_column
.values()
.partition_point(|time| RowTime::from_timestamp_ns(*time) <= time_inclusive);
-let slice_len = self.batch.num_rows() - slice_start;
+let slice_len = self.data.len() - slice_start;

let max_result_time = RowTime::from_timestamp_ns(time_column.values()[slice_start - 1]);
let min_self_time = RowTime::from_timestamp_ns(time_column.values()[slice_start]);

let result = Self {
-batch: self.batch.slice(0, slice_start),
+data: self.data.slice(0, slice_start),
time: self.time.slice(0, slice_start),
subsort: self.subsort.slice(0, slice_start),
key_hash: self.key_hash.slice(0, slice_start),
min_present_time: self.min_present_time,
max_present_time: max_result_time,
};

-self.batch = self.batch.slice(slice_start, slice_len);
+self.data = self.data.slice(slice_start, slice_len);
self.time = self.time.slice(slice_start, slice_len);
self.key_hash = self.key_hash.slice(slice_start, slice_len);
self.min_present_time = min_self_time;
@@ -482,20 +466,6 @@ pub enum Error {

impl error_stack::Context for Error {}

-pub fn take_record_batch(
-batch: &RecordBatch,
-indices: &UInt64Array,
-) -> error_stack::Result<RecordBatch, ArrowError> {
-// Produce batches based on indices
-let columns = batch
-.columns()
-.iter()
-.map(|c| arrow_select::take::take(c.as_ref(), indices, None).into_report())
-.try_collect()?;
-
-RecordBatch::try_new(batch.schema(), columns).into_report()
-}
-
#[cfg(any(test, feature = "testing"))]
#[static_init::dynamic]
static MINIMAL_SCHEMA: arrow_schema::SchemaRef = {
@@ -517,7 +487,6 @@ mod tests {

use crate::testing::arb_arrays::arb_batch;
use crate::{Batch, RowTime};
-use arrow_select::concat::concat_batches;
use itertools::Itertools;
use proptest::prelude::*;

@@ -546,18 +515,18 @@ mod tests {
prop_assert_eq!(remainder.up_to_time, original.up_to_time);

// create the concatenated result
-let concatenated = match (result.record_batch(), remainder.record_batch()) {
+let concatenated = match (result.data(), remainder.data()) {
(None, None) => unreachable!(),
(Some(a), None) => a.clone(),
(None, Some(b)) => b.clone(),
-(Some(a), Some(b)) => concat_batches(&a.schema(), &[a.clone(), b.clone()]).unwrap(),
+(Some(a), Some(b)) => arrow_select::concat::concat(&[a.as_ref(), b.as_ref()]).unwrap(),
};
-prop_assert_eq!(&concatenated, original.record_batch().unwrap());
+prop_assert_eq!(&concatenated, original.data().unwrap());

prop_assert!(result.data.is_some());
let result = result.data.unwrap();

-prop_assert_eq!(result.batch.num_rows(), result.time.len());
+prop_assert_eq!(result.data.len(), result.time.len());
prop_assert_eq!(result.time().values()[0], i64::from(result.min_present_time));
prop_assert_eq!(result.time().values()[result.time.len() - 1], i64::from(result.max_present_time));

@@ -568,7 +537,7 @@ mod tests {
prop_assert!(remainder.data.is_some());
let remainder = remainder.data.unwrap();

-prop_assert_eq!(remainder.batch.num_rows(), remainder.time.len());
+prop_assert_eq!(remainder.data.len(), remainder.time.len());
prop_assert_eq!(remainder.time().values()[0], i64::from(remainder.min_present_time));
prop_assert_eq!(remainder.time().values()[remainder.time.len() - 1], i64::from(remainder.max_present_time));
}
1 change: 1 addition & 0 deletions crates/sparrow-backend/Cargo.toml
@@ -14,6 +14,7 @@ arrow-schema.workspace = true
index_vec.workspace = true
sparrow-core = { path = "../sparrow-core" }
sparrow-physical = { path = "../sparrow-physical" }
+uuid.workspace = true

[dev-dependencies]
