Skip to content

Commit

Permalink
feat: Async back-pressure on adding to a source
Browse files Browse the repository at this point in the history
  • Loading branch information
bjchambers committed Sep 8, 2023
1 parent d0b0f85 commit 770d17b
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 80 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ arrow-ord = { version = "43.0.0" }
arrow-schema = { version = "43.0.0", features = ["serde"] }
arrow-select = { version = "43.0.0" }
arrow-string = { version = "43.0.0" }
async-broadcast = "0.5.1"
async-once-cell = "0.5.3"
async-stream = "0.3.4"
async-trait = "0.1.68"
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-merge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ arrow-array.workspace = true
arrow-csv = { workspace = true, optional = true }
arrow-schema.workspace = true
arrow-select.workspace = true
async-broadcast.workspace = true
async-stream.workspace = true
bit-set.workspace = true
derive_more.workspace = true
Expand Down
31 changes: 18 additions & 13 deletions crates/sparrow-merge/src/in_memory_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ impl error_stack::Context for Error {}
pub struct InMemoryBatches {
retained: bool,
current: RwLock<Current>,
updates: tokio::sync::broadcast::Sender<(usize, RecordBatch)>,
sender: async_broadcast::Sender<(usize, RecordBatch)>,
/// An inactive receiver that is never read from -- it exists only to keep
/// the channel open while there are no other receivers.
_subscriber: tokio::sync::broadcast::Receiver<(usize, RecordBatch)>,
_receiver: async_broadcast::InactiveReceiver<(usize, RecordBatch)>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -62,20 +62,24 @@ impl Current {

impl InMemoryBatches {
pub fn new(retained: bool, schema: SchemaRef) -> Self {
let (updates, _subscriber) = tokio::sync::broadcast::channel(10);
let (mut sender, receiver) = async_broadcast::broadcast(10);

// Don't wait for an active receiver. If no one is receiving, `broadcast`
// fails immediately instead of waiting for a receiver to appear.
sender.set_await_active(false);

let current = RwLock::new(Current::new(schema.clone()));
Self {
retained,
current,
updates,
_subscriber,
sender,
_receiver: receiver.deactivate(),
}
}

/// Add a batch, merging it into the in-memory version.
///
/// Publishes the new batch to the subscribers.
pub fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
if batch.num_rows() == 0 {
return Ok(());
}
Expand All @@ -89,10 +93,11 @@ impl InMemoryBatches {
write.version
};

self.updates
.send((new_version, batch))
.into_report()
.change_context(Error::Add)?;
let send_result = self.sender.broadcast((new_version, batch)).await;
if send_result.is_err() {
assert!(!self.sender.is_closed());
tracing::info!("No-one subscribed for new batch");
}
Ok(())
}

Expand All @@ -107,7 +112,7 @@ impl InMemoryBatches {
let read = self.current.read().unwrap();
(read.version, read.batch.clone())
};
let mut recv = self.updates.subscribe();
let mut recv = self.sender.new_receiver();

async_stream::try_stream! {
tracing::info!("Starting subscriber with version {version}");
Expand All @@ -124,11 +129,11 @@ impl InMemoryBatches {
tracing::warn!("Ignoring old version {recv_version}");
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
Err(async_broadcast::RecvError::Closed) => {
tracing::info!("Sender closed.");
break;
},
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
Err(async_broadcast::RecvError::Overflowed(_)) => {
Err(Error::ReceiverLagged)?;
}
}
Expand Down
21 changes: 0 additions & 21 deletions crates/sparrow-runtime/src/key_hash_inverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,27 +321,6 @@ impl ThreadSafeKeyHashInverse {
}
}

pub fn blocking_add(
&self,
keys: &dyn Array,
key_hashes: &UInt64Array,
) -> error_stack::Result<(), Error> {
error_stack::ensure!(
keys.len() == key_hashes.len(),
Error::MismatchedLengths {
keys: keys.len(),
key_hashes: key_hashes.len()
}
);
let has_new_keys = self.key_map.blocking_read().has_new_keys(key_hashes);

if has_new_keys {
self.key_map.blocking_write().add(keys, key_hashes)
} else {
Ok(())
}
}

/// Stores the KeyHashInverse to the compute store.
///
/// This method is thread-safe and acquires the read-lock.
Expand Down
16 changes: 8 additions & 8 deletions crates/sparrow-runtime/src/prepare/preparer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
Expand Down Expand Up @@ -31,7 +32,7 @@ pub struct Preparer {
prepared_schema: SchemaRef,
time_column_name: String,
subsort_column_name: Option<String>,
next_subsort: u64,
next_subsort: AtomicU64,
key_column_name: String,
time_multiplier: Option<i64>,
}
Expand All @@ -51,7 +52,7 @@ impl Preparer {
prepared_schema,
time_column_name,
subsort_column_name,
next_subsort: prepare_hash,
next_subsort: prepare_hash.into(),
key_column_name,
time_multiplier,
})
Expand All @@ -66,10 +67,7 @@ impl Preparer {
/// - This computes and adds the key columns.
/// - This sorts the batch by time, subsort and key hash.
/// - This adds or casts columns as needed.
///
/// Self is mutated as necessary to ensure the `subsort` column is increasing, if
/// it is added.
pub fn prepare_batch(&mut self, batch: RecordBatch) -> error_stack::Result<RecordBatch, Error> {
pub fn prepare_batch(&self, batch: RecordBatch) -> error_stack::Result<RecordBatch, Error> {
let time = get_required_column(&batch, &self.time_column_name)?;
let time = cast_to_timestamp(time, self.time_multiplier)?;

Expand All @@ -80,8 +78,10 @@ impl Preparer {
.into_report()
.change_context_lazy(|| Error::ConvertSubsort(subsort.data_type().clone()))?
} else {
let subsort: UInt64Array = (self.next_subsort..).take(num_rows).collect();
self.next_subsort += num_rows as u64;
let subsort_start = self
.next_subsort
.fetch_add(num_rows as u64, Ordering::SeqCst);
let subsort: UInt64Array = (subsort_start..).take(num_rows).collect();
Arc::new(subsort)
};

Expand Down
6 changes: 4 additions & 2 deletions crates/sparrow-session/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Table {
self.preparer.schema()
}

pub fn add_data(&mut self, batch: RecordBatch) -> error_stack::Result<(), Error> {
pub async fn add_data(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
let prepared = self
.preparer
.prepare_batch(batch)
Expand All @@ -75,11 +75,13 @@ impl Table {
let key_hashes = prepared.column(2).as_primitive();
let keys = prepared.column(self.key_column);
self.key_hash_inverse
.blocking_add(keys.as_ref(), key_hashes)
.add(keys.as_ref(), key_hashes)
.await
.change_context(Error::Prepare)?;

self.in_memory_batches
.add_batch(prepared)
.await
.change_context(Error::Prepare)?;
Ok(())
}
Expand Down
11 changes: 11 additions & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion python/pysrc/kaskada/_ffi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Expr:
def execute(self, options: Optional[_ExecutionOptions] = None) -> Execution: ...
def grouping(self) -> Optional[str]: ...

class Table(Expr):
class Table:
def __init__(
self,
session: Session,
Expand All @@ -61,6 +61,7 @@ class Table(Expr):
@property
def name(self) -> str: ...
def add_pyarrow(self, data: pa.RecordBatch) -> None: ...
def expr(self) -> Expr: ...

class Udf(object):
def __init__(self, result_ty: str, result_fn: Callable[..., pa.Array]) -> None: ...
Loading

0 comments on commit 770d17b

Please sign in to comment.