Skip to content

Commit

Permalink
Add a few more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 19, 2023
1 parent e2b35ef commit 8ab851f
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 93 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sparrow-sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ tracing.workspace = true

[dev-dependencies]
tokio.workspace = true
arrow-select.workspace = true
static_init.workspace = true

[lib]
bench = false
Expand Down
201 changes: 108 additions & 93 deletions crates/sparrow-sources/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::sync::{Arc, RwLock};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, SchemaRef};
use error_stack::{IntoReportCompat, ResultExt};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};

use sparrow_batch::Batch;
use sparrow_interfaces::source::{Source, SourceError};
Expand Down Expand Up @@ -177,7 +176,9 @@ impl InMemoryBatches {
///
/// The first batch will be the in-memory merged batch, and batches will be
/// added as they arrive.
pub fn subscribe(&self) -> BoxStream<'static, error_stack::Result<RecordBatch, SourceError>> {
pub fn subscribe(
&self,
) -> impl Stream<Item = error_stack::Result<RecordBatch, SourceError>> + 'static {
let (mut version, merged) = {
let read = self.current.read().unwrap();
(read.version, read.batch.clone())
Expand Down Expand Up @@ -235,35 +236,44 @@ mod tests {

use super::*;

#[tokio::test]
async fn test_subscribe_to_batches() {
// Shared schema for all tests in this module: the `_time` / `_subsort` /
// `_key_hash` ordering columns (presumably the engine's required key
// columns — confirm against the non-test code), plus two data columns:
// `a` (non-nullable Int32) and `b` (nullable UInt64).
// Initialized lazily once via `static_init::dynamic`; tests clone it
// into an `Arc` as needed.
#[static_init::dynamic]
static SCHEMA: Schema = {
Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::UInt64, true),
])
};

fn batch1() -> RecordBatch {
let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![None, Some(1), Some(8)]));
let schema = Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", a.data_type().clone(), false),
Field::new("b", b.data_type().clone(), true),
]);
let schema = Arc::new(schema);
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![time, subsort.clone(), key.clone(), a, b],
)
.unwrap();

let time2: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
let a2: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
let b2: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
// Can reuse the subsort/key arrays
let batch2 =
RecordBatch::try_new(schema.clone(), vec![time2, subsort, key, a2, b2]).unwrap();
let schema = Arc::new(SCHEMA.clone());
RecordBatch::try_new(schema.clone(), vec![time, subsort, key, a, b]).unwrap()
}

let in_mem = InMemoryBatches::new(true, schema.clone());
/// Builds the second test batch: times `[3, 4, 5]` with the same
/// `_subsort`/`_key_hash` values as `batch1()`, `a = [3, 4, 5]`, and
/// nullable `b = [Some(10), None, None]`.
fn batch2() -> RecordBatch {
    let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
    let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
    let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
    let a: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
    let b: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
    // The Arc is used exactly once, so no extra `clone()` is needed
    // (the original `schema.clone()` was a redundant refcount bump).
    let schema = Arc::new(SCHEMA.clone());
    RecordBatch::try_new(schema, vec![time, subsort, key, a, b]).unwrap()
}

#[tokio::test]
async fn test_subscribe_to_batches() {
let batch1 = batch1();
let batch2 = batch2();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(true, schema);

let mut s1 = in_mem.subscribe();
let mut s2 = in_mem.subscribe();
Expand All @@ -287,32 +297,9 @@ mod tests {
#[tokio::test]
async fn test_subscribe_to_multiple_batches() {
// Sends multiple batches before reading

let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![None, Some(1), Some(8)]));
let schema = Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", a.data_type().clone(), false),
Field::new("b", b.data_type().clone(), true),
]);
let schema = Arc::new(schema);
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![time, subsort.clone(), key.clone(), a, b],
)
.unwrap();

let time2: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
let a2: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
let b2: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
// Can reuse the subsort/key arrays
let batch2 =
RecordBatch::try_new(schema.clone(), vec![time2, subsort, key, a2, b2]).unwrap();
let batch1 = batch1();
let batch2 = batch2();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(true, schema.clone());

Expand All @@ -337,32 +324,9 @@ mod tests {
#[tokio::test]
async fn test_late_subscription_receives_merged_batch() {
// Verify later subscription gets the full merged batch

let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![None, Some(1), Some(8)]));
let schema = Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", a.data_type().clone(), false),
Field::new("b", b.data_type().clone(), true),
]);
let schema = Arc::new(schema);
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![time, subsort.clone(), key.clone(), a, b],
)
.unwrap();

let time2: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
let a2: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
let b2: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
// Can reuse the subsort/key arrays
let batch2 =
RecordBatch::try_new(schema.clone(), vec![time2, subsort, key, a2, b2]).unwrap();
let batch1 = batch1();
let batch2 = batch2();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(true, schema.clone());

Expand All @@ -382,22 +346,73 @@ mod tests {
let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);

let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3, 4, 5]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2, 0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0, 0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![
None,
Some(1),
Some(8),
Some(10),
None,
None,
]));
let merged_batch =
RecordBatch::try_new(schema.clone(), vec![time, subsort, key, a, b]).unwrap();
arrow_select::concat::concat_batches(&batch1.schema(), &[batch1, batch2]).unwrap();

let b2_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(merged_batch, b2_s2);
let b1_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(merged_batch, b1_s2);
}

#[tokio::test]
async fn test_reads_current_batch() {
    // A queryable source should expose everything added so far via
    // `current()`: first a single batch, then the concatenation of both.
    let first = batch1();
    let in_mem = InMemoryBatches::new(true, Arc::new(SCHEMA.clone()));

    in_mem.add_batch(first.clone()).await.unwrap();
    assert_eq!(in_mem.current().unwrap(), first);

    let second = batch2();
    in_mem.add_batch(second.clone()).await.unwrap();

    let merged =
        arrow_select::concat::concat_batches(&first.schema(), &[first, second]).unwrap();
    assert_eq!(in_mem.current().unwrap(), merged);
}

#[tokio::test]
async fn test_non_queryable_reads_empty_current_batch() {
    // When constructed with `queryable = false`, `current()` should not
    // expose any data, even after a batch has been added.
    let batch1 = batch1();
    let schema = Arc::new(SCHEMA.clone());

    let in_mem = InMemoryBatches::new(false, schema);
    // `batch1` is not used again below, so the original `.clone()` was a
    // redundant full-batch copy; move the value instead.
    in_mem.add_batch(batch1).await.unwrap();

    assert!(in_mem.current().is_none());
}

#[tokio::test]
async fn test_non_queryable_subscription() {
    // Even when not queryable, subscribers still receive batches added
    // after they subscribe — but no merged history from before.
    let batch1 = batch1();
    let batch2 = batch2();
    let schema = Arc::new(SCHEMA.clone());

    let in_mem = InMemoryBatches::new(false, schema.clone());

    // Subscribe the first stream before any data arrives.
    let mut s1 = in_mem.subscribe();

    // Send the first batch.
    in_mem.add_batch(batch1.clone()).await.unwrap();

    // Subscribe the second stream after the first batch was sent.
    let mut s2 = in_mem.subscribe();

    // Send the second batch.
    in_mem.add_batch(batch2.clone()).await.unwrap();

    // The first subscription sees both batches in order.
    let b1_s1 = s1.next().await.unwrap().unwrap();
    assert_eq!(batch1, b1_s1);

    let b2_s1 = s1.next().await.unwrap().unwrap();
    assert_eq!(batch2, b2_s1);

    // The source is non-queryable, so the late subscription gets no
    // merged history: it should only see the batch sent after it joined.
    let b1_s2 = s2.next().await.unwrap().unwrap();
    assert_eq!(batch2, b1_s2);
}
}

0 comments on commit 8ab851f

Please sign in to comment.