bug: drains output channel when batching results (#763)
Drains the output channel. Previously, the channel filled up and caused
queries to block on send.

Puts the tokio runtime into an `Arc` to share it. The `handle()` approach was
not working as intended, since the runtime was being dropped (a blocking
operation) from an asynchronous context. Unsure exactly why, but swapping to a
shared runtime, as the tokio docs also recommend, works fine.

https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#sharing

Closes #775
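
For context on the runtime issue: dropping a tokio `Runtime` is itself a blocking operation, and tokio panics when that happens on an async worker thread, which is what the `handle()` approach ran into. A minimal sketch of that failure mode (illustrative code, not from this repo):

```rust
use tokio::runtime::Runtime;

fn main() {
    let outer = Runtime::new().expect("tokio runtime");
    outer.block_on(async {
        // Shutting down a `Runtime` blocks until its worker threads stop,
        // and blocking is not allowed inside an async context, so this
        // `drop` panics at runtime.
        let inner = Runtime::new().expect("tokio runtime");
        drop(inner);
    });
}
```

The fix keeps a single long-lived runtime owned by the `Session` and shares it (per the tokio docs linked above), so no runtime is ever dropped from async code.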

---------

Co-authored-by: Ben Chambers <35960+bjchambers@users.noreply.github.com>
jordanrfrazier and bjchambers authored Sep 27, 2023
1 parent 72c03a5 commit e68a331
Showing 12 changed files with 120 additions and 56 deletions.
2 changes: 0 additions & 2 deletions crates/sparrow-runtime/src/execute/output.rs
@@ -109,7 +109,6 @@ pub(super) fn write(
batch
};


if batch.num_rows() > max_batch_size {
for start in (0..batch.num_rows()).step_by(max_batch_size) {
let end = (start + max_batch_size).min(batch.num_rows());
@@ -121,7 +120,6 @@ pub(super) fn write(
yield post_process_batch(&sink_schema, batch, &key_hash_inverse).await;
}


if limit_rows && remaining == 0 {
break;
}
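The loop above (in `write`) splits oversized batches into `max_batch_size` chunks before sending them to the output channel. A standalone sketch of the same slicing logic, assuming the `arrow_array`/`arrow_schema` crates; `split_batch` is an illustrative name, not a function in this repo:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

/// Split a batch into chunks of at most `max_batch_size` rows.
/// `RecordBatch::slice` is zero-copy, so each chunk only adjusts offsets.
fn split_batch(batch: &RecordBatch, max_batch_size: usize) -> Vec<RecordBatch> {
    (0..batch.num_rows())
        .step_by(max_batch_size)
        .map(|start| {
            let end = (start + max_batch_size).min(batch.num_rows());
            batch.slice(start, end - start)
        })
        .collect()
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from((0..10).collect::<Vec<i64>>()));
    let batch = RecordBatch::try_new(schema, vec![column]).expect("valid batch");

    let sizes: Vec<_> = split_batch(&batch, 4).iter().map(|b| b.num_rows()).collect();
    assert_eq!(sizes, vec![4, 4, 2]);
}
```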
67 changes: 42 additions & 25 deletions crates/sparrow-session/src/execution.rs
@@ -2,17 +2,17 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use sparrow_api::kaskada::v1alpha::ExecuteResponse;

use crate::Error;

pub struct Execution {
/// Tokio runtime managing this execution.
rt: tokio::runtime::Runtime,
/// Tokio handle managing this execution.
handle: tokio::runtime::Handle,
/// Channel to receive output on.
output: tokio_stream::wrappers::ReceiverStream<RecordBatch>,
/// Future which resolves to the first error or None.
// Future that resolves to the first error, if one occurred.
status: Status,
/// Stop signal. Send `true` to stop execution.
stop_signal_rx: tokio::sync::watch::Sender<bool>,
@@ -27,21 +27,31 @@

impl Execution {
pub(super) fn new(
rt: tokio::runtime::Runtime,
handle: tokio::runtime::Handle,
output_rx: tokio::sync::mpsc::Receiver<RecordBatch>,
progress: BoxStream<'static, error_stack::Result<ExecuteResponse, Error>>,
stop_signal_rx: tokio::sync::watch::Sender<bool>,
schema: SchemaRef,
) -> Self {
let output = tokio_stream::wrappers::ReceiverStream::new(output_rx);

// Constructs a future that resolves to the first error, if one occurred.
let status = Status::Running(Box::pin(async move {
let mut progress = progress;
while (progress.try_next().await?).is_some() {}
Ok(())
let mut errors = progress
.filter_map(|result| {
futures::future::ready(if let Err(e) = result { Some(e) } else { None })
})
.boxed();
let first_error = errors.next().await;
if let Some(first_error) = first_error {
Err(first_error)
} else {
Ok(())
}
}));

Self {
rt,
handle,
output,
status,
stop_signal_rx,
@@ -56,12 +66,12 @@ impl Execution {
/// status (and return) accordingly.
fn is_done(&mut self) -> error_stack::Result<(), Error> {
let result = match &mut self.status {
Status::Running(future) => {
Status::Running(progress) => {
// Based on the implementation of `FutureExt::now_or_never`:
let noop_waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&noop_waker);

match future.as_mut().poll(&mut cx) {
match progress.as_mut().poll(&mut cx) {
std::task::Poll::Ready(x) => x,
_ => return Ok(()),
}
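
The `is_done` method above checks the progress future without blocking by polling it once with a no-op waker. The same trick in isolation (equivalent to `futures::FutureExt::now_or_never`; `poll_once` is an illustrative name):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Poll a future exactly once without blocking: `Some` if it is ready,
/// `None` if it is still pending.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Option<F::Output> {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    match Pin::new(fut).poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}

fn main() {
    // A future with no awaits resolves on its first poll.
    let mut ready = Box::pin(async { 7 });
    assert_eq!(poll_once(&mut ready), Some(7));

    // A never-ready future stays pending and can be polled again later.
    let mut pending = Box::pin(futures::future::pending::<i32>());
    assert_eq!(poll_once(&mut pending), None);
}
```
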
@@ -99,29 +109,36 @@

pub fn next_blocking(&mut self) -> error_stack::Result<Option<RecordBatch>, Error> {
self.is_done()?;
Ok(self.rt.block_on(self.output.next()))
Ok(self.handle.block_on(self.output.next()))
}

pub async fn collect_all(self) -> error_stack::Result<Vec<RecordBatch>, Error> {
// TODO: For large outputs, we likely need to drain the output while waiting for the future.
match self.status {
Status::Running(future) => future.await?,
let progress = match self.status {
Status::Running(progress) => progress,
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
_ => {}
Status::Completed => {
// If the progress channel has completed without error, we know that the output channel
// hasn't filled up, so we can go ahead and collect the output
return Ok(self.output.collect().await);
}
};

Ok(self.output.collect().await)
let output = self.output.collect::<Vec<_>>();

let (first_error, output) = futures::join!(progress, output);
if let Err(e) = first_error {
Err(e)
} else {
Ok(output)
}
}

pub fn collect_all_blocking(self) -> error_stack::Result<Vec<RecordBatch>, Error> {
// TODO: For large outputs, we likely need to drain the output while waiting for the future.
match self.status {
Status::Running(future) => self.rt.block_on(future)?,
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
_ => {}
};

Ok(self.rt.block_on(self.output.collect()))
// In order to check the running status, we have to enter the runtime regardless,
// so there's no reason to check the status prior to entering the runtime
// here.
let handle = self.handle.clone();
handle.block_on(self.collect_all())
}
}

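The `collect_all` change above is the heart of the fix: instead of awaiting the progress future to completion and only then collecting output, the two are driven concurrently so the bounded output channel keeps draining. A reduced sketch of the pattern, assuming the `tokio`, `tokio-stream`, and `futures` crates (the element type and capacity here are illustrative; the real channel carries `RecordBatch`es with capacity 10):

```rust
use futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<u64>(2);

    // Stand-in for the progress future: the producing side, which waits
    // on `send` whenever the bounded channel is full.
    let progress = async move {
        for i in 0..100u64 {
            tx.send(i).await.expect("receiver alive");
        }
        Ok::<(), ()>(())
    };

    let output = ReceiverStream::new(rx).collect::<Vec<_>>();

    // Awaiting `progress` before collecting would deadlock once the
    // channel fills up; joining the two drains the output while the
    // producer runs.
    let (result, collected) = futures::join!(progress, output);
    result.unwrap();
    assert_eq!(collected.len(), 100);
}
```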
28 changes: 18 additions & 10 deletions crates/sparrow-session/src/session.rs
@@ -21,7 +21,6 @@ use uuid::Uuid;
use crate::execution::Execution;
use crate::{Error, Expr, Literal, Table};

#[derive(Default)]
pub struct Session {
data_context: DataContext,
dfg: Dfg,
@@ -34,6 +33,20 @@ pub struct Session {
/// udf as well.
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
}

impl Default for Session {
fn default() -> Self {
Self {
data_context: Default::default(),
dfg: Default::default(),
key_hash_inverse: Default::default(),
udfs: Default::default(),
object_store_registry: Default::default(),
rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
}
}
}

#[derive(Default)]
@@ -454,13 +467,6 @@
.into_report()
.change_context(Error::Compile)?;

// Switch to the Tokio async pool. This seems gross.
// Create the runtime.
//
// TODO: Figure out how to asynchronously pass results back to Python?
let rt = tokio::runtime::Runtime::new()
.into_report()
.change_context(Error::Execute)?;
let (output_tx, output_rx) = tokio::sync::mpsc::channel(10);
let destination = Destination::Channel(output_tx);

Expand All @@ -481,7 +487,8 @@ impl Session {
});

// Hacky. Use the existing execution logic. This does weird things with downloading checkpoints, etc.
let progress = rt
let progress = self
.rt
.block_on(sparrow_runtime::execute::execute_new(
plan,
destination,
Expand All @@ -495,8 +502,9 @@ impl Session {
.map_err(|e| e.change_context(Error::Execute))
.boxed();

let handle = self.rt.handle().clone();
Ok(Execution::new(
rt,
handle,
output_rx,
progress,
stop_signal_tx,
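The ownership pattern introduced above: the `Session` owns the runtime for its whole lifetime, and each `Execution` carries only a cloned `Handle`, so blocking entry points work from synchronous callers (like Python) without a runtime ever being created or dropped per query. A minimal sketch of that shape (illustrative types, not the real structs):

```rust
use tokio::runtime::{Handle, Runtime};

/// Long-lived owner of the runtime, standing in for `Session`.
struct Session {
    rt: Runtime,
}

/// Per-query state, standing in for `Execution`: it holds a `Handle`,
/// not the `Runtime`, so dropping it never tears the runtime down.
struct Execution {
    handle: Handle,
}

impl Session {
    fn new() -> Self {
        Session {
            rt: Runtime::new().expect("tokio runtime"),
        }
    }

    fn execute(&self) -> Execution {
        Execution {
            handle: self.rt.handle().clone(),
        }
    }
}

impl Execution {
    fn next_blocking(&self) -> u64 {
        // Valid as long as the owning `Session` (and thus the runtime) is
        // alive; calling this from inside the runtime would panic.
        self.handle.block_on(async { 42 })
    }
}

fn main() {
    let session = Session::new();
    let execution = session.execute();
    assert_eq!(execution.next_blocking(), 42);
}
```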
4 changes: 3 additions & 1 deletion python/pysrc/kaskada/_timestream.py
@@ -565,7 +565,9 @@ def select(self, *args: str) -> Timestream:
"""
return Timestream._call("select_fields", self, *args)

def substring(self, start: Optional[int] = None, end: Optional[int] = None) -> Timestream:
def substring(
self, start: Optional[int] = None, end: Optional[int] = None
) -> Timestream:
"""Return a Timestream with a substring between the start and end indices.
Args:
10 changes: 9 additions & 1 deletion python/pysrc/kaskada/sources/__init__.py
@@ -3,4 +3,12 @@
from .source import Source


__all__ = ["Source", "CsvString", "Pandas", "JsonlFile", "JsonlString", "PyDict", "Parquet"]
__all__ = [
"Source",
"CsvString",
"Pandas",
"JsonlFile",
"JsonlString",
"PyDict",
"Parquet",
]
9 changes: 5 additions & 4 deletions python/pysrc/kaskada/sources/arrow.py
@@ -280,7 +280,9 @@ async def create(
if schema is None:
if csv_string is None:
raise ValueError("Must provide schema or csv_string")
schema = pa.csv.read_csv(csv_string, parse_options=CsvString._parse_options).schema
schema = pa.csv.read_csv(
csv_string, parse_options=CsvString._parse_options
).schema
csv_string.seek(0)

source = CsvString(
@@ -303,7 +305,7 @@ async def add_string(self, csv_string: str | BytesIO) -> None:
content = pa.csv.read_csv(
csv_string,
convert_options=self._convert_options,
parse_options=CsvString._parse_options
parse_options=CsvString._parse_options,
)
for batch in content.to_batches():
await self._ffi_table.add_pyarrow(batch)
@@ -396,8 +398,7 @@
async def add_file(self, path: str) -> None:
"""Add data to the source."""
batches = pa.json.read_json(
Source._get_absolute_path(path),
parse_options=self._parse_options
Source._get_absolute_path(path), parse_options=self._parse_options
)
for batch in batches.to_batches():
await self._ffi_table.add_pyarrow(batch)
2 changes: 1 addition & 1 deletion python/pysrc/kaskada/sources/source.py
@@ -1,6 +1,6 @@
"""Provide the base-class for Kaskada sources."""
from typing import Literal, Optional
import os
from typing import Literal, Optional

import kaskada._ffi as _ffi
import pyarrow as pa
6 changes: 1 addition & 5 deletions python/pytests/filter_test.py
@@ -57,10 +57,6 @@ async def test_filter_to_merge_preserves_interpolation(source, golden) -> None:
predicate = n < 9
golden.jsonl(
kd.record(
{
"n": n,
"predicate": predicate,
"filter_sum": n.sum().filter(predicate)
}
{"n": n, "predicate": predicate, "filter_sum": n.sum().filter(predicate)}
)
)
@@ -0,0 +1,18 @@
{"_time":"2022-01-04T14:38:31.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-04T14:38:31.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":499.48,"item":"0da9b3fd-2c92-4b87-92b0-5137eaf6ff75"}
{"_time":"2022-01-05T20:40:03.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-05T20:40:03.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":498.16,"item":"f9cdde05-40f9-48fd-812e-1c3936589184"}
{"_time":"2022-01-06T04:54:59.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-06T04:54:59.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.38,"item":"64cd0de2-02b8-4420-8b8a-57ad4d0b9aa2"}
{"_time":"2022-01-06T07:14:07.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-06T07:14:07.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":498.32,"item":"3940b205-50a6-4141-ab68-aa0464ae0f3d"}
{"_time":"2022-01-07T02:32:48.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-07T02:32:48.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":498.4,"item":"3c1f2b17-8bb0-43b9-8a52-e3d8d81fe311"}
{"_time":"2022-01-08T08:17:28.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-08T08:17:28.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.91,"item":"b5a8d6e1-9070-410d-bf44-72754b485faa"}
{"_time":"2022-01-09T15:12:23.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-09T15:12:23.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":493.92,"item":"9976f04f-3faf-46bd-80f6-1dc102632ec6"}
{"_time":"2022-01-10T02:11:28.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-10T02:11:28.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":495.04,"item":"ca02d3d3-a309-4b7b-ac12-29fa4a1a8704"}
{"_time":"2022-01-14T15:06:56.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-14T15:06:56.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.81,"item":"3940b205-50a6-4141-ab68-aa0464ae0f3d"}
{"_time":"2022-01-16T05:08:53.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-16T05:08:53.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.92,"item":"5a86942a-5bcc-41f7-9286-937b248caccc"}
{"_time":"2022-01-20T03:28:47.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-20T03:28:47.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":495.65,"item":"bcfd7a57-f36e-4b37-9b2d-795401f36459"}
{"_time":"2022-01-21T13:25:25.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-21T13:25:25.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":499.51,"item":"d6789f76-7ac6-415b-a2fa-8b56f80eef74"}
{"_time":"2022-01-23T06:10:21.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-23T06:10:21.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.33,"item":"d988eedb-2f3c-4ad5-82ab-7b1c25754ea0"}
{"_time":"2022-01-24T16:50:58.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-24T16:50:58.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":494.25,"item":"69718e27-44e6-4cb1-86ff-fc5b5d4c50a1"}
{"_time":"2022-01-26T20:56:58.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-26T20:56:58.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.23,"item":"87c91aeb-dba3-431e-bbda-f65f9164c64d"}
{"_time":"2022-01-26T22:57:18.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-26T22:57:18.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.84,"item":"804488a1-9724-465d-a596-1b6930510640"}
{"_time":"2022-01-29T08:46:35.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-29T08:46:35.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":490.18,"item":"bc323957-93e4-4aa8-8fc1-c73411e9ca0b"}
{"_time":"2022-01-29T17:21:29.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-29T17:21:29.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.56,"item":"ad811380-ac9c-4f6a-9015-ba2441abbff0"}
8 changes: 1 addition & 7 deletions python/pytests/len_test.py
@@ -22,10 +22,4 @@ async def source() -> kd.sources.CsvString:

async def test_len(source, golden) -> None:
s = source.col("s")
golden.jsonl(
kd.record(
{
"len": s.len()
}
)
)
golden.jsonl(kd.record({"len": s.len()}))
22 changes: 22 additions & 0 deletions python/pytests/parquet_source_test.py
@@ -24,3 +24,25 @@ async def test_read_parquet_with_subsort(golden) -> None:

await source.add_file("../testdata/purchases/purchases_part2.parquet")
golden.jsonl(source)


# Verifies that we drain the output and progress channels correctly.
#
# When the parquet file contains more rows than
# (CHANNEL_SIZE * MAX_BATCH_SIZE), the channels previously filled
# up, causing the sender to block. This test verifies that the
# channels correctly drain, allowing the sender to continue.
# See https://github.com/kaskada-ai/kaskada/issues/775
async def test_large_parquet_file(golden) -> None:
source = await kd.sources.Parquet.create(
"../testdata/parquet/purchases_100k.parquet",
time_column="time",
key_column="user",
)
user = source.col("user")
amount = source.col("amount")

# Add a filter to reduce the output file size while ensuring the entire
# file is still processed
predicate = user.eq("5fec83d4-f5c6-4943-ab05-2b6760330daf").and_(amount.gt(490))
golden.jsonl(source.filter(predicate))
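
The regression this test guards against, reduced to a sketch: a `send` on a full bounded channel never completes until something drains the channel. Capacity and types here are illustrative, and `tokio::time::timeout` is used only to observe the stalled send:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap(); // fills the channel

    // With no reader draining the channel, a second send stays pending.
    let blocked = tokio::time::timeout(Duration::from_millis(50), tx.send(2)).await;
    assert!(blocked.is_err(), "send should still be pending");

    // Once the receiver drains an item, the sender can proceed.
    assert_eq!(rx.recv().await, Some(1));
    tx.send(2).await.unwrap();
}
```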
Binary file added testdata/parquet/purchases_100k.parquet
