feat: partitioned merge pipeline (#828)
Adds the `merge` pipeline (some functionality, at least). 

Blocker:
* Merge pipeline tests

Deferred:
* Grouping / Interpolation - Spread only functions in the unlatched capacity.
jordanrfrazier authored Oct 30, 2023
1 parent 9998e78 commit 944b648
Showing 14 changed files with 3,032 additions and 47 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

152 changes: 111 additions & 41 deletions crates/sparrow-batch/src/batch.rs
@@ -6,6 +6,7 @@ use arrow_array::{
     Array, ArrayRef, ArrowPrimitiveType, RecordBatch, StructArray, TimestampNanosecondArray,
     UInt64Array,
 };
+use arrow_schema::DataType;
 use arrow_schema::{Fields, Schema};
 use error_stack::{IntoReport, ResultExt};
 use itertools::Itertools;
@@ -77,6 +78,35 @@ impl Batch {
         })
     }
 
+    /// Updates the up_to_time.
+    ///
+    /// The new `up_to_time` must be >= the existing `self.up_to_time`.
+    pub fn with_up_to_time(self, up_to_time: RowTime) -> Self {
+        assert!(up_to_time >= self.up_to_time);
+        Batch {
+            data: self.data,
+            up_to_time,
+        }
+    }
+
+    /// Replaces the batch's data column with the given data column.
+    /// The other existing columns and properties are preserved and validated
+    /// against the new data column.
+    ///
+    /// Panics if the existing [BatchInfo] is empty (as there would be no corresponding
+    /// key columns for the data).
+    pub fn with_data(self, data: ArrayRef) -> Self {
+        if let Some(info) = self.data {
+            let data = Some(BatchInfo::new(data, info.time, info.subsort, info.key_hash));
+            Self {
+                data,
+                up_to_time: self.up_to_time,
+            }
+        } else {
+            panic!("Cannot replace data of empty batch");
+        }
+    }
+
     // NOTE: The current execution logic expects `RecordBatch` outputs (as well as some existing
     // testing functionality that is compatible with the old non-partitioned execution path). In the
     // future, we should standardize on `Batch` which makes it easier to carry a primitive value out.
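
These two builders compose when an operator rewrites a batch's data column and then advances the watermark. A minimal sketch, assuming a non-empty `input: Batch`, an already-computed `mapped: ArrayRef` of the same length, and a `new_watermark: RowTime` at or past the old one (all three names are illustrative, not part of this commit):

    // Swap in the transformed data column; time/subsort/key_hash are preserved.
    let output = input
        .with_data(mapped)
        // Then advance completeness; `with_up_to_time` asserts monotonicity.
        .with_up_to_time(new_watermark);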
@@ -231,47 +261,61 @@ impl Batch {
     /// Returns the rows less than the given time and
     /// leaves the remaining rows in `self`.
     pub fn split_up_to(&mut self, time_exclusive: RowTime) -> Option<Batch> {
-        let Some(data) = &mut self.data else {
-            return None;
-        };
-
-        if time_exclusive < data.min_present_time {
-            // time_exclusive < min_present <= max_present.
-            // None of the rows in the batch should be taken.
-            return None;
-        }
+        if let Some(data) = &mut self.data {
+            if time_exclusive < data.min_present_time {
+                // time_exclusive < min_present <= max_present.
+                // None of the rows in the batch should be taken.
+                return None;
+            }
 
-        if data.max_present_time < time_exclusive {
-            // min_present <= max_present < time_exclusive
-            // All rows should be taken.
-            return Some(Batch {
-                data: self.data.take(),
-                // Even though we took all the rows, the batch
-                // we return is only as complete as the original
-                // batch. There may be other batches after this
-                // that have equal rows less than time_exclusive.
-                up_to_time: self.up_to_time,
-            });
-        }
+            if data.max_present_time < time_exclusive {
+                // min_present <= max_present < time_exclusive
+                // All rows should be taken.
+                return Some(Batch {
+                    data: self.data.take(),
+                    // Even though we took all rows, we need to pick the
+                    // minimum up_to_time. If
+                    // 1) self.up_to_time < time_exclusive: the input may have more rows
+                    //    between self.up_to_time and time_exclusive.
+                    // 2) self.up_to_time > time_exclusive: we requested a split at
+                    //    time_exclusive, so we can't return a batch with a greater time.
+                    up_to_time: self.up_to_time.min(time_exclusive),
+                });
+            }
 
-        // If we reach this point, then we need to actually split the data.
-        debug_assert!(time_exclusive <= self.up_to_time);
-
-        let split = data.split_up_to(time_exclusive);
-        if let Some(data) = split {
-            // Use the new max_present_time as the up_to_time.
-            let up_to_time = data.time().value(data.time.len() - 1).into();
-            Some(Batch {
-                data: Some(data),
-                // We can be complete up to time_exclusive because it is less
-                // than the time this batch was complete to. We put
-                // all of the rows this batch had up to that time in the result,
-                // and left only the batches after that time.
-                up_to_time,
-            })
+            // If we reach this point, then we need to actually split the data.
+            debug_assert!(time_exclusive <= self.up_to_time);
+
+            let split = data.split_up_to(time_exclusive);
+            if let Some(data) = split {
+                // Use the new max_present_time as the up_to_time.
+                let up_to_time = data.time().value(data.time.len() - 1).into();
+                Some(Batch {
+                    data: Some(data),
+                    // We can be complete up to time_exclusive because it is less
+                    // than the time this batch was complete to. We put
+                    // all of the rows this batch had up to that time in the result,
+                    // and left only the batches after that time.
+                    up_to_time,
+                })
+            } else {
+                // If there's no data, the min_present_time must have been
+                // after the split time. We can safely return None without
+                // handling any up_to_time.
+                None
+            }
         } else {
-            // If there's no data after we split at `time_exclusive`, return None.
-            None
+            // No data, but we may have to produce an empty batch with
+            // a new up_to_time. Use the minimum, as
+            // 1) self.up_to_time < time_exclusive: the input may have more rows
+            //    between self.up_to_time and time_exclusive.
+            // 2) self.up_to_time > time_exclusive: we requested a split at
+            //    time_exclusive, so we can't return a batch with a greater time.
+            //
+            // TODO: Split up watermark from batch
+            // https://github.com/kaskada-ai/kaskada/issues/836
+            let new_up_to = time_exclusive.min(self.up_to_time);
+            Some(Batch::new_empty(new_up_to))
         }
     }

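The `min` in both branches above is what keeps the watermark honest. To make the empty-batch case concrete, a minimal sketch (illustrative values, using the crate's own `Batch::new_empty` and `RowTime::from_timestamp_ns`):

    // An empty batch complete up to time 3, split at time 5.
    let mut batch = Batch::new_empty(RowTime::from_timestamp_ns(3));
    let split = batch.split_up_to(RowTime::from_timestamp_ns(5));
    // The result can only be complete up to min(3, 5) = 3: rows with times
    // in [3, 5) may still arrive in later input batches.
    assert_eq!(split, Some(Batch::new_empty(RowTime::from_timestamp_ns(3))));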
@@ -514,7 +558,7 @@ pub(crate) fn validate_bounds(
 }
 
 fn validate_batch_schema(schema: &Schema) -> error_stack::Result<(), Error> {
-    use arrow::datatypes::{DataType, TimeUnit};
+    use arrow::datatypes::TimeUnit;
 
     // Validate the three key columns are present, non-null and have the right type.
     validate_key_column(
@@ -677,8 +721,8 @@ static MINIMAL_SCHEMA: arrow_schema::SchemaRef = {
     use arrow_schema::{Field, Schema};
 
     let schema = Schema::new(vec![
-        Field::new("time", TimestampNanosecondType::DATA_TYPE, false),
-        Field::new("key_hash", UInt64Type::DATA_TYPE, false),
+        Field::new("time", TimestampNanosecondType::DATA_TYPE, true),
+        Field::new("key_hash", UInt64Type::DATA_TYPE, true),
     ]);
     Arc::new(schema)
 };
@@ -688,6 +732,9 @@ mod tests {
 
     use crate::testing::arb_arrays::arb_batch;
     use crate::{Batch, RowTime};
+    use arrow_array::cast::AsArray;
+    use arrow_array::new_empty_array;
+    use arrow_array::types::{ArrowPrimitiveType, TimestampNanosecondType, UInt64Type};
     use itertools::Itertools;
     use proptest::prelude::*;
@@ -701,6 +748,29 @@
         assert_eq!(batch, Batch::minimal_from(vec![1], vec![0], 1));
     }
 
+    #[test]
+    fn test_split_batch_with_up_to_time() {
+        // Has rows at [0, 1], but an up_to of 10.
+        let mut batch = Batch::minimal_from(vec![0, 1], vec![0, 0], 10);
+
+        // Split at 5.
+        let split = batch.split_up_to(RowTime::from_timestamp_ns(5)).unwrap();
+
+        let expected_split = Batch::minimal_from(vec![0, 1], vec![0, 0], 5);
+        assert_eq!(split, expected_split);
+
+        let empty = new_empty_array(&TimestampNanosecondType::DATA_TYPE);
+        let empty_u64 = new_empty_array(&UInt64Type::DATA_TYPE);
+        assert_eq!(
+            batch,
+            Batch::minimal_from(
+                empty.as_primitive().clone(),
+                empty_u64.as_primitive().clone(),
+                10
+            )
+        );
+    }
+
     proptest::proptest! {
         #[test]
         fn test_split_in_batch(original in arb_batch(2..100)) {
4 changes: 4 additions & 0 deletions crates/sparrow-io/src/channel.rs
@@ -0,0 +1,4 @@
+mod source;
+
+pub use source::InMemoryBatches;
+pub use source::InMemorySource;
15 changes: 14 additions & 1 deletion crates/sparrow-merge/Cargo.toml
@@ -14,23 +14,36 @@ testing = ["arrow-csv", "proptest"]
 
 [dependencies]
 anyhow.workspace = true
+arrow.workspace = true
+arrow-arith.workspace = true
 arrow-array.workspace = true
 arrow-csv = { workspace = true, optional = true }
 arrow-schema.workspace = true
 arrow-select.workspace = true
+bit-set.workspace = true
+derive_more.workspace = true
+erased-serde.workspace = true
 error-stack.workspace = true
 itertools.workspace = true
+parking_lot.workspace = true
 proptest = { workspace = true, optional = true }
+serde.workspace = true
 smallvec.workspace = true
 sparrow-arrow = { path = "../sparrow-arrow" }
-sparrow-batch = { path = "../sparrow-batch", features = ["testing"] }
+sparrow-batch = { path = "../sparrow-batch" }
 sparrow-core = { path = "../sparrow-core" }
+sparrow-instructions = { path = "../sparrow-instructions" }
+sparrow-physical = { path = "../sparrow-physical" }
+sparrow-scheduler = { path = "../sparrow-scheduler" }
+tokio.workspace = true
+tracing.workspace = true
 
 [dev-dependencies]
 arrow-csv.workspace = true
 arrow-ord.workspace = true
 insta.workspace = true
+sparrow-batch = { path = "../sparrow-batch", features = ["testing"] }
 postcard.workspace = true
 proptest.workspace = true
 
 [lib]
