diff --git a/Cargo.lock b/Cargo.lock
index 4d8eea26a..10d9cc725 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -421,6 +421,16 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "async-broadcast"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c48ccdbf6ca6b121e0f586cbc0e73ae440e56c67c30fa0873b4e110d9c26d2b"
+dependencies = [
+ "event-listener",
+ "futures-core",
+]
+
 [[package]]
 name = "async-channel"
 version = "1.9.0"
@@ -4598,6 +4608,7 @@ dependencies = [
  "arrow-ord",
  "arrow-schema",
  "arrow-select",
+ "async-broadcast",
  "async-stream",
  "bit-set",
  "derive_more",
diff --git a/Cargo.toml b/Cargo.toml
index 1a6abd684..758966c74 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ arrow-ord = { version = "43.0.0" }
 arrow-schema = { version = "43.0.0", features = ["serde"] }
 arrow-select = { version = "43.0.0" }
 arrow-string = { version = "43.0.0" }
+async-broadcast = "0.5.1"
 async-once-cell = "0.5.3"
 async-stream = "0.3.4"
 async-trait = "0.1.68"
diff --git a/crates/sparrow-merge/Cargo.toml b/crates/sparrow-merge/Cargo.toml
index 71ba75939..4af9ae59a 100644
--- a/crates/sparrow-merge/Cargo.toml
+++ b/crates/sparrow-merge/Cargo.toml
@@ -19,6 +19,7 @@ arrow-array.workspace = true
 arrow-csv = { workspace = true, optional = true }
 arrow-schema.workspace = true
 arrow-select.workspace = true
+async-broadcast.workspace = true
 async-stream.workspace = true
 bit-set.workspace = true
 derive_more.workspace = true
diff --git a/crates/sparrow-merge/src/in_memory_batches.rs b/crates/sparrow-merge/src/in_memory_batches.rs
index 87e63a110..da744a998 100644
--- a/crates/sparrow-merge/src/in_memory_batches.rs
+++ b/crates/sparrow-merge/src/in_memory_batches.rs
@@ -2,7 +2,7 @@ use std::sync::RwLock;
 
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
-use error_stack::{IntoReport, IntoReportCompat, ResultExt};
+use error_stack::{IntoReportCompat, ResultExt};
 use futures::Stream;
 
 use crate::old::homogeneous_merge;
@@ -20,12 +20,15 @@ impl error_stack::Context for Error {}
 /// Struct for managing in-memory batches.
 #[derive(Debug)]
 pub struct InMemoryBatches {
-    retained: bool,
+    /// Whether rows added will be available for interactive queries.
+    /// If False, rows will be discarded after being sent to any active
+    /// materializations.
+    queryable: bool,
     current: RwLock<Current>,
-    updates: tokio::sync::broadcast::Sender<(usize, RecordBatch)>,
+    sender: async_broadcast::Sender<(usize, RecordBatch)>,
     /// A subscriber that is never used -- it exists only to keep the sender
     /// alive.
-    _subscriber: tokio::sync::broadcast::Receiver<(usize, RecordBatch)>,
+    _receiver: async_broadcast::InactiveReceiver<(usize, RecordBatch)>,
 }
 
 #[derive(Debug)]
@@ -61,38 +64,43 @@ impl Current {
 }
 
 impl InMemoryBatches {
-    pub fn new(retained: bool, schema: SchemaRef) -> Self {
-        let (updates, _subscriber) = tokio::sync::broadcast::channel(10);
+    pub fn new(queryable: bool, schema: SchemaRef) -> Self {
+        let (mut sender, receiver) = async_broadcast::broadcast(10);
+
+        // Don't wait for a receiver. If no-one receives, `send` will fail.
+        sender.set_await_active(false);
+
         let current = RwLock::new(Current::new(schema.clone()));
         Self {
-            retained,
+            queryable,
             current,
-            updates,
-            _subscriber,
+            sender,
+            _receiver: receiver.deactivate(),
         }
     }
 
     /// Add a batch, merging it into the in-memory version.
    ///
    /// Publishes the new batch to the subscribers.
-    pub fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
+    pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
         if batch.num_rows() == 0 {
             return Ok(());
         }
 
         let new_version = {
             let mut write = self.current.write().map_err(|_| Error::Add)?;
-            if self.retained {
+            if self.queryable {
                 write.add_batch(&batch)?;
             }
             write.version += 1;
             write.version
         };
 
-        self.updates
-            .send((new_version, batch))
-            .into_report()
-            .change_context(Error::Add)?;
+        let send_result = self.sender.broadcast((new_version, batch)).await;
+        if send_result.is_err() {
+            assert!(!self.sender.is_closed());
+            tracing::info!("No-one subscribed for new batch");
+        }
         Ok(())
     }
 
@@ -107,7 +115,7 @@ impl InMemoryBatches {
             let read = self.current.read().unwrap();
             (read.version, read.batch.clone())
         };
-        let mut recv = self.updates.subscribe();
+        let mut recv = self.sender.new_receiver();
 
         async_stream::try_stream! {
             tracing::info!("Starting subscriber with version {version}");
@@ -126,11 +134,11 @@ impl InMemoryBatches {
                             tracing::warn!("Ignoring old version {recv_version}");
                         }
                     }
-                    Err(tokio::sync::broadcast::error::RecvError::Closed) => {
+                    Err(async_broadcast::RecvError::Closed) => {
                         tracing::info!("Sender closed.");
                         break;
                     },
-                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
+                    Err(async_broadcast::RecvError::Overflowed(_)) => {
                         Err(Error::ReceiverLagged)?;
                     }
                 }
diff --git a/crates/sparrow-runtime/src/key_hash_inverse.rs b/crates/sparrow-runtime/src/key_hash_inverse.rs
index d145aa19b..40313006a 100644
--- a/crates/sparrow-runtime/src/key_hash_inverse.rs
+++ b/crates/sparrow-runtime/src/key_hash_inverse.rs
@@ -324,27 +324,6 @@ impl ThreadSafeKeyHashInverse {
         }
     }
 
-    pub fn blocking_add(
-        &self,
-        keys: &dyn Array,
-        key_hashes: &UInt64Array,
-    ) -> error_stack::Result<(), Error> {
-        error_stack::ensure!(
-            keys.len() == key_hashes.len(),
-            Error::MismatchedLengths {
-                keys: keys.len(),
-                key_hashes: key_hashes.len()
-            }
-        );
-        let has_new_keys = self.key_map.blocking_read().has_new_keys(key_hashes);
-
-        if has_new_keys {
-            self.key_map.blocking_write().add(keys, key_hashes)
-        } else {
-            Ok(())
-        }
-    }
-
     /// Stores the KeyHashInverse to the compute store.
     ///
     /// This method is thread-safe and acquires the read-lock.
diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs
index 40859a882..91b2841dc 100644
--- a/crates/sparrow-runtime/src/prepare/preparer.rs
+++ b/crates/sparrow-runtime/src/prepare/preparer.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
 use arrow::array::{ArrayRef, UInt64Array};
@@ -31,7 +32,7 @@ pub struct Preparer {
     prepared_schema: SchemaRef,
     time_column_name: String,
     subsort_column_name: Option<String>,
-    next_subsort: u64,
+    next_subsort: AtomicU64,
     key_column_name: String,
     time_multiplier: Option<i64>,
 }
@@ -51,7 +52,7 @@ impl Preparer {
             prepared_schema,
             time_column_name,
             subsort_column_name,
-            next_subsort: prepare_hash,
+            next_subsort: prepare_hash.into(),
             key_column_name,
             time_multiplier,
         })
@@ -66,10 +67,7 @@ impl Preparer {
     /// - This computes and adds the key columns.
     /// - This sorts the batch by time, subsort and key hash.
     /// - This adds or casts columns as needed.
-    ///
-    /// Self is mutated as necessary to ensure the `subsort` column is increasing, if
-    /// it is added.
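A note on the channel swap above: `tokio::sync::broadcast::Sender::send` is synchronous and errors when there are no receivers, while `async_broadcast::Sender::broadcast` must be awaited -- which is why `add_batch` becomes an `async fn` -- and, with `set_await_active(false)`, a send with no active receivers fails immediately instead of blocking, which the new code deliberately logs and ignores. The `InactiveReceiver` stored in `_receiver` keeps the channel open without buffering messages. A rough Python analogue of this fan-out behavior (the `Broadcast`/`subscribe`/`publish` names are illustrative, not Kaskada or async-broadcast APIs):

```python
# Sketch of the broadcast pattern above in asyncio terms: publishing to
# zero subscribers is allowed and simply drops the message.
import asyncio
from typing import Any, List


class Broadcast:
    """Fan (version, batch) tuples out to all currently active subscribers."""

    def __init__(self, capacity: int = 10) -> None:
        self._subscribers: List[asyncio.Queue] = []
        self._capacity = capacity

    def subscribe(self) -> asyncio.Queue:
        # Like `sender.new_receiver()`: a late subscriber only sees
        # messages published after it subscribes.
        queue: asyncio.Queue = asyncio.Queue(self._capacity)
        self._subscribers.append(queue)
        return queue

    async def publish(self, item: Any) -> None:
        # Like `set_await_active(false)`: no subscribers is not an error;
        # the item is dropped rather than the publisher blocking.
        for queue in self._subscribers:
            await queue.put(item)
```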
-    pub fn prepare_batch(&mut self, batch: RecordBatch) -> error_stack::Result<RecordBatch, Error> {
+    pub fn prepare_batch(&self, batch: RecordBatch) -> error_stack::Result<RecordBatch, Error> {
         let time = get_required_column(&batch, &self.time_column_name)?;
         let time = cast_to_timestamp(time, self.time_multiplier)?;
 
@@ -80,8 +78,10 @@ impl Preparer {
                 .into_report()
                 .change_context_lazy(|| Error::ConvertSubsort(subsort.data_type().clone()))?
         } else {
-            let subsort: UInt64Array = (self.next_subsort..).take(num_rows).collect();
-            self.next_subsort += num_rows as u64;
+            let subsort_start = self
+                .next_subsort
+                .fetch_add(num_rows as u64, Ordering::SeqCst);
+            let subsort: UInt64Array = (subsort_start..).take(num_rows).collect();
             Arc::new(subsort)
         };
 
diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs
index 698f0917e..b499ed100 100644
--- a/crates/sparrow-session/src/session.rs
+++ b/crates/sparrow-session/src/session.rs
@@ -87,7 +87,7 @@ impl Session {
         name: &str,
         schema: SchemaRef,
         time_column_name: &str,
-        retained: bool,
+        queryable: bool,
         subsort_column_name: Option<&str>,
         key_column_name: &str,
         grouping_name: Option<&str>,
@@ -150,7 +150,7 @@ impl Session {
             key_hash_inverse,
             key_column,
             expr,
-            retained,
+            queryable,
             time_unit,
         )
     }
diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs
index e39fbba65..f25d5af50 100644
--- a/crates/sparrow-session/src/table.rs
+++ b/crates/sparrow-session/src/table.rs
@@ -26,7 +26,7 @@ impl Table {
         key_hash_inverse: Arc<ThreadSafeKeyHashInverse>,
         key_column: usize,
         expr: Expr,
-        retained: bool,
+        queryable: bool,
         time_unit: Option<&str>,
     ) -> error_stack::Result<Self, Error> {
         let prepared_fields: Fields = KEY_FIELDS
@@ -38,7 +38,7 @@
         let prepare_hash = 0;
 
         assert!(table_info.in_memory.is_none());
-        let in_memory_batches = Arc::new(InMemoryBatches::new(retained, prepared_schema.clone()));
+        let in_memory_batches = Arc::new(InMemoryBatches::new(queryable, prepared_schema.clone()));
         table_info.in_memory = Some(in_memory_batches.clone());
 
         let preparer = Preparer::new(
@@ -66,7 +66,7 @@ impl Table {
         self.preparer.schema()
     }
 
-    pub fn add_data(&mut self, batch: RecordBatch) -> error_stack::Result<(), Error> {
+    pub async fn add_data(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
         let prepared = self
             .preparer
             .prepare_batch(batch)
@@ -75,11 +75,13 @@ impl Table {
         let key_hashes = prepared.column(2).as_primitive();
         let keys = prepared.column(self.key_column);
         self.key_hash_inverse
-            .blocking_add(keys.as_ref(), key_hashes)
+            .add(keys.as_ref(), key_hashes)
+            .await
             .change_context(Error::Prepare)?;
 
         self.in_memory_batches
             .add_batch(prepared)
+            .await
             .change_context(Error::Prepare)?;
         Ok(())
     }
diff --git a/examples/event-api/server.py b/examples/event-api/server.py
index c6941df49..ea054d253 100755
--- a/examples/event-api/server.py
+++ b/examples/event-api/server.py
@@ -13,15 +13,14 @@ async def main():
 
     # Initialize event source with schema from historical data.
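The `AtomicU64` change in the preparer.rs hunk above is what lets `prepare_batch` take `&self` instead of `&mut self`: each batch reserves a contiguous block of `num_rows` subsort values via `fetch_add`, so concurrent callers receive disjoint, increasing ranges without a mutable borrow. A sketch of the same reservation pattern in Python terms (the `SubsortCounter` name is hypothetical, not a Kaskada API):

```python
# Illustrates the fetch_add-style range reservation used for the generated
# `subsort` column: atomically claim `num_rows` consecutive values and
# return the start of the claimed block.
import threading


class SubsortCounter:
    def __init__(self, start: int = 0) -> None:
        self._next = start
        self._lock = threading.Lock()

    def reserve(self, num_rows: int) -> range:
        with self._lock:
            start = self._next
            self._next += num_rows
        return range(start, start + num_rows)


counter = SubsortCounter()
assert list(counter.reserve(3)) == [0, 1, 2]  # first batch gets 0..3
assert list(counter.reserve(2)) == [3, 4]     # next batch continues after it
```

As with `fetch_add(..., Ordering::SeqCst)`, two concurrent reservations may land in either order, but no subsort value is ever handed out twice.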
     events = kd.sources.PyDict(
-        rows = [],
-        schema = pa.schema([
+        schema=pa.schema([
             pa.field("ts", pa.float64()),
             pa.field("user", pa.string()),
             pa.field("request_id", pa.string()),
         ]),
-        time_column = "ts",
-        key_column = "user",
-        time_unit = "s",
+        time_column="ts",
+        key_column="user",
+        time_unit="s",
-        retained=False,
+        queryable=False,
     )
diff --git a/examples/slackbot/Notebook.ipynb b/examples/slackbot/Notebook.ipynb
index 0708159eb..de745351f 100644
--- a/examples/slackbot/Notebook.ipynb
+++ b/examples/slackbot/Notebook.ipynb
@@ -119,10 +119,11 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "import asyncio\n",
     "import pandas\n",
     "import sparrow_pi.sources as sources\n",
     "\n",
-    "messages = kt.sources.Parquet(\"./messages.parquet\", time = \"ts\", entity = \"channel\")\n",
+    "messages = await kt.sources.Parquet.create(\"./messages.parquet\", time = \"ts\", entity = \"channel\")\n",
     "messages = messages.with_key(kt.record({ # !!!\n",
     "    \"channel\": messages.col(\"channel\"),\n",
     "    \"thread\": messages.col(\"thread_ts\"),\n",
diff --git a/python/Cargo.lock b/python/Cargo.lock
index 4434298d3..7ca902638 100644
--- a/python/Cargo.lock
+++ b/python/Cargo.lock
@@ -393,6 +393,16 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "async-broadcast"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c48ccdbf6ca6b121e0f586cbc0e73ae440e56c67c30fa0873b4e110d9c26d2b"
+dependencies = [
+ "event-listener",
+ "futures-core",
+]
+
 [[package]]
 name = "async-channel"
 version = "1.9.0"
@@ -3850,6 +3860,7 @@ dependencies = [
  "arrow-csv",
  "arrow-schema",
  "arrow-select",
+ "async-broadcast",
  "async-stream",
  "bit-set",
  "derive_more",
diff --git a/python/docs/source/examples/time_centric.ipynb b/python/docs/source/examples/time_centric.ipynb
index 243f537ea..904e2203d 100644
--- a/python/docs/source/examples/time_centric.ipynb
+++ b/python/docs/source/examples/time_centric.ipynb
@@ -125,6 +125,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "import asyncio\n",
+    "\n",
     "# For demo simplicity, instead of a CSV file, we read and then parse data from a\n",
     "# CSV string.\n",
     "event_data_string = \"\"\"\n",
@@ -151,7 +153,7 @@
     "    ev_00020,2022-01-01 22:20:00,user_002,view_item,0\n",
     "\"\"\"\n",
     "\n",
-    "events = kd.sources.CsvString(\n",
+    "events = await kd.sources.CsvString.create(\n",
     "    event_data_string, time_column=\"event_at\", key_column=\"entity_id\"\n",
     ")\n",
     "\n",
diff --git a/python/docs/source/guide/entities.md b/python/docs/source/guide/entities.md
index 83eb8cd46..0b1285f35 100644
--- a/python/docs/source/guide/entities.md
+++ b/python/docs/source/guide/entities.md
@@ -67,6 +67,7 @@ This is helpful since the _feature vector_ for an entity will depend only on eve
 ---
 tags: [hide-input]
 ---
+import asyncio
 import kaskada as kd
 kd.init_session()
 data = "\n".join(
     [
@@ -79,7 +80,7 @@ data = "\n".join(
         "1996-12-23T16:40:01,A,12",
     ]
 )
-multi_entity = kd.sources.CsvString(data, time_column="time", key_column="key")
+multi_entity = await kd.sources.CsvString.create(data, time_column="time", key_column="key")
 
 kd.plot.render(
     kd.plot.Plot(multi_entity.col("m"), name="m"),
diff --git a/python/docs/source/guide/quickstart.md b/python/docs/source/guide/quickstart.md
index 7f9b9c3de..b0e17207c 100644
--- a/python/docs/source/guide/quickstart.md
+++ b/python/docs/source/guide/quickstart.md
@@ -28,6 +28,7 @@ The following Python code imports the Kaskada library, creates a session, and lo
 It then runs a query to produce a Pandas DataFrame.
```{code-cell} +import asyncio import kaskada as kd kd.init_session() content = "\n".join( @@ -41,6 +42,6 @@ content = "\n".join( "1996-12-19T16:40:02,A,,", ] ) -source = kd.sources.CsvString(content, time_column="time", key_column="key") +source = await kd.sources.CsvString.create(content, time_column="time", key_column="key") source.select("m", "n").extend({"sum_m": source.col("m").sum() }).to_pandas() ``` \ No newline at end of file diff --git a/python/docs/source/guide/timestreams.md b/python/docs/source/guide/timestreams.md index d935e6562..a6b61c488 100644 --- a/python/docs/source/guide/timestreams.md +++ b/python/docs/source/guide/timestreams.md @@ -21,6 +21,7 @@ This makes it easy to focus on events happening over time and how aggregations c --- tags: [hide-input] --- +import asyncio import kaskada as kd kd.init_session() data = "\n".join( @@ -33,7 +34,7 @@ data = "\n".join( "1996-12-23T16:40:01,A,12", ] ) -multi_entity = kd.sources.CsvString(data, time_column="time", key_column="key") +multi_entity = await kd.sources.CsvString.create(data, time_column="time", key_column="key") kd.plot.render( kd.plot.Plot(multi_entity.col("m"), name="m") diff --git a/python/docs/source/guide/tour.md b/python/docs/source/guide/tour.md index 146054ed9..487fa055a 100644 --- a/python/docs/source/guide/tour.md +++ b/python/docs/source/guide/tour.md @@ -24,6 +24,7 @@ The initial setup / data is below. --- tags: [hide-cell] --- +import asyncio import kaskada as kd kd.init_session() single_entity = "\n".join( @@ -36,7 +37,7 @@ single_entity = "\n".join( "1996-12-24T16:40:02,A,,", ] ) -single_entity = kd.sources.CsvString(single_entity, time_column="time", key_column="key") +single_entity = await kd.sources.CsvString.create(single_entity, time_column="time", key_column="key") ``` ## Events and Aggregations diff --git a/python/docs/source/index.md b/python/docs/source/index.md index 92c98c6e6..3a398a0ee 100644 --- a/python/docs/source/index.md +++ b/python/docs/source/index.md @@ -55,11 +55,12 @@ Kaskada was built by core contributors to [Apache Beam](https://beam.apache.org/ The core of BeepGPT's real-time processing requires only a few lines of code using Kaskada: ```python +import asyncio import kaskada as kd kd.init_session() # Bootstrap from historical data -messages = kd.sources.PyDict( +messages = await kd.sources.PyDict.create( rows = pyarrow.parquet.read_table("./messages.parquet") .to_pylist(), time_column = "ts", diff --git a/python/pyproject.toml b/python/pyproject.toml index ff533c7a6..ec4c8b16b 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -155,4 +155,5 @@ show_error_context = true [tool.pytest.ini_options] testpaths = [ "pytests", -] \ No newline at end of file +] +asyncio_mode = "auto" diff --git a/python/pysrc/kaskada/_ffi.pyi b/python/pysrc/kaskada/_ffi.pyi index 7da90e133..4a8a239f9 100644 --- a/python/pysrc/kaskada/_ffi.pyi +++ b/python/pysrc/kaskada/_ffi.pyi @@ -45,7 +45,7 @@ class Expr: def execute(self, options: Optional[_ExecutionOptions] = None) -> Execution: ... def grouping(self) -> Optional[str]: ... -class Table(Expr): +class Table: def __init__( self, session: Session, @@ -53,14 +53,15 @@ class Table(Expr): time_column: str, key_column: str, schema: pa.Schema, - retained: bool, + queryable: bool, subsort_column: Optional[str], grouping_name: Optional[str], time_unit: Optional[str], ) -> None: ... @property def name(self) -> str: ... - def add_pyarrow(self, data: pa.RecordBatch) -> None: ... + async def add_pyarrow(self, data: pa.RecordBatch) -> None: ... 
+ def expr(self) -> Expr: ... class Udf(object): def __init__(self, result_ty: str, result_fn: Callable[..., pa.Array]) -> None: ... diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index 36fb7a57b..cd7f6f4db 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -18,17 +18,50 @@ class Pandas(Source): def __init__( self, - dataframe: pd.DataFrame, *, time_column: str, key_column: str, + schema: pa.Schema, subsort_column: Optional[str] = None, - schema: Optional[pa.Schema] = None, grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: """Create a source reading Pandas DataFrames. + Args: + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + schema: The schema to use. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. + """ + super().__init__( + schema=schema, + time_column=time_column, + key_column=key_column, + subsort_column=subsort_column, + grouping_name=grouping_name, + time_unit=time_unit, + ) + + @staticmethod + async def create( + dataframe: Optional[pd.DataFrame] = None, + *, + time_column: str, + key_column: str, + subsort_column: Optional[str] = None, + schema: Optional[pa.Schema] = None, + grouping_name: Optional[str] = None, + time_unit: Optional[TimeUnit] = None, + ) -> Pandas: + """Create a source reading Pandas DataFrames. + Args: dataframe: The DataFrame to start from. time_column: The name of the column containing the time. @@ -43,8 +76,11 @@ def __init__( If not specified (and not specified in the data), nanosecond will be assumed. """ if schema is None: + if dataframe is None: + raise ValueError("Must provide schema or dataframe") schema = pa.Schema.from_pandas(dataframe) - super().__init__( + + source = Pandas( schema=schema, time_column=time_column, key_column=key_column, @@ -52,13 +88,16 @@ def __init__( grouping_name=grouping_name, time_unit=time_unit, ) - self.add_data(dataframe) - def add_data(self, data: pd.DataFrame) -> None: + if dataframe is not None: + await source.add_data(dataframe) + return source + + async def add_data(self, data: pd.DataFrame) -> None: """Add data to the source.""" table = pa.Table.from_pandas(data, self._schema, preserve_index=False) for batch in table.to_batches(): - self._ffi_table.add_pyarrow(batch) + await self._ffi_table.add_pyarrow(batch) class PyDict(Source): @@ -66,40 +105,36 @@ class PyDict(Source): def __init__( self, - rows: dict | list[dict], *, time_column: str, key_column: str, - retained: bool = True, + schema: pa.Schema, + queryable: bool = True, subsort_column: Optional[str] = None, - schema: Optional[pa.Schema] = None, grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: """Create a source reading from rows represented as dicts. Args: - rows: One or more rows represented as dicts. time_column: The name of the column containing the time. key_column: The name of the column containing the key. - retained: Whether added rows should be retained for queries. 
- If True, rows (both provided to the constructor and added later) will be retained + schema: The schema to use. + queryable: Whether added rows will be available for running queries. + If True, rows (both provided to the constructor and added later) will be available for interactive queries. If False, rows will be discarded after being sent to any running materializations. Consider setting this to False when the source will only be used for materialization to avoid unnecessary memory usage. subsort_column: The name of the column containing the subsort. If not provided, the subsort will be assigned by the system. - schema: The schema to use. If not provided, it will be inferred from the input. grouping_name: The name of the group associated with each key. This is used to ensure implicit joins are only performed between data grouped by the same entity. time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. If not specified (and not specified in the data), nanosecond will be assumed. """ - if schema is None: - schema = pa.Table.from_pylist(rows).schema super().__init__( - retained=retained, + queryable=queryable, schema=schema, time_column=time_column, key_column=key_column, @@ -109,35 +144,120 @@ def __init__( ) self._convert_options = pyarrow.csv.ConvertOptions(column_types=schema) - self.add_rows(rows) - def add_rows(self, rows: dict | list[dict]) -> None: + @staticmethod + async def create( + rows: Optional[dict | list[dict]] = None, + *, + time_column: str, + key_column: str, + queryable: bool = True, + subsort_column: Optional[str] = None, + schema: Optional[pa.Schema] = None, + grouping_name: Optional[str] = None, + time_unit: Optional[TimeUnit] = None, + ) -> PyDict: + """Create a source reading from rows represented as dicts. + + Args: + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + queryable: Whether added rows will be available for running queries. + If True, rows (both provided to the constructor and added later) will be available + for interactive queries. If False, rows will be discarded after being sent to any + running materializations. Consider setting this to False when the source will only + be used for materialization to avoid unnecessary memory usage. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + schema: The schema to use. If not provided, it will be inferred from the input. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. 
+ """ + if schema is None: + if rows is None: + raise ValueError("Must provide schema or rows") + schema = pa.Table.from_pylist(rows).schema + source = PyDict( + time_column=time_column, + key_column=key_column, + queryable=queryable, + subsort_column=subsort_column, + schema=schema, + grouping_name=grouping_name, + time_unit=time_unit, + ) + if rows: + await source.add_rows(rows) + return source + + async def add_rows(self, rows: dict | list[dict]) -> None: """Add data to the source.""" if isinstance(rows, dict): rows = [rows] table = pa.Table.from_pylist(rows, schema=self._schema) for batch in table.to_batches(): - self._ffi_table.add_pyarrow(batch) + await self._ffi_table.add_pyarrow(batch) # TODO: We should be able to go straight from CSV to PyArrow, but -# currently that has some problems with timestamp hadling. +# currently that has some problems with timestamp handling. class CsvString(Source): """Source reading data from CSV strings using Pandas.""" def __init__( self, - csv_string: str | BytesIO, *, time_column: str, key_column: str, + schema: pa.Schema, subsort_column: Optional[str] = None, - schema: Optional[pa.Schema] = None, grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: """Create a CSV String Source. + Args: + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + schema: The schema to use. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. + """ + super().__init__( + schema=schema, + time_column=time_column, + key_column=key_column, + subsort_column=subsort_column, + grouping_name=grouping_name, + time_unit=time_unit, + ) + + self._convert_options = pyarrow.csv.ConvertOptions( + column_types=schema, + strings_can_be_null=True, + ) + + @staticmethod + async def create( + csv_string: Optional[str | BytesIO] = None, + *, + time_column: str, + key_column: str, + subsort_column: Optional[str] = None, + schema: Optional[pa.Schema] = None, + grouping_name: Optional[str] = None, + time_unit: Optional[TimeUnit] = None, + ) -> CsvString: + """Create a CSV String Source with data. + Args: csv_string: The CSV string to start from. time_column: The name of the column containing the time. 
@@ -154,9 +274,12 @@ def __init__( if isinstance(csv_string, str): csv_string = BytesIO(csv_string.encode("utf-8")) if schema is None: + if csv_string is None: + raise ValueError("Must provide schema or csv_string") schema = pa.csv.read_csv(csv_string).schema csv_string.seek(0) - super().__init__( + + source = CsvString( schema=schema, time_column=time_column, key_column=key_column, @@ -165,19 +288,17 @@ def __init__( time_unit=time_unit, ) - self._convert_options = pyarrow.csv.ConvertOptions( - column_types=schema, - strings_can_be_null=True, - ) - self.add_string(csv_string) + if csv_string: + await source.add_string(csv_string) + return source - def add_string(self, csv_string: str | BytesIO) -> None: + async def add_string(self, csv_string: str | BytesIO) -> None: """Add data to the source.""" if isinstance(csv_string, str): csv_string = BytesIO(csv_string.encode("utf-8")) content = pa.csv.read_csv(csv_string, convert_options=self._convert_options) for batch in content.to_batches(): - self._ffi_table.add_pyarrow(batch) + await self._ffi_table.add_pyarrow(batch) class JsonlString(Source): @@ -185,17 +306,51 @@ class JsonlString(Source): def __init__( self, - json_string: str | BytesIO, *, time_column: str, key_column: str, + schema: pa.Schema, subsort_column: Optional[str] = None, - schema: Optional[pa.Schema] = None, grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: """Create a JSON String Source. + Args: + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + schema: The schema to use. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. + """ + super().__init__( + schema=schema, + time_column=time_column, + key_column=key_column, + subsort_column=subsort_column, + grouping_name=grouping_name, + time_unit=time_unit, + ) + self._parse_options = pyarrow.json.ParseOptions(explicit_schema=schema) + + @staticmethod + async def create( + json_string: Optional[str | BytesIO] = None, + *, + time_column: str, + key_column: str, + subsort_column: Optional[str] = None, + schema: Optional[pa.Schema] = None, + grouping_name: Optional[str] = None, + time_unit: Optional[TimeUnit] = None, + ) -> JsonlString: + """Create a source reading from JSON strings. + Args: json_string: The line-delimited JSON string to start from. time_column: The name of the column containing the time. 
@@ -212,27 +367,31 @@ def __init__(
         if isinstance(json_string, str):
             json_string = BytesIO(json_string.encode("utf-8"))
         if schema is None:
+            if json_string is None:
+                raise ValueError("Must provide schema or JSON")
             schema = pa.json.read_json(json_string).schema
             json_string.seek(0)
-        super().__init__(
-            schema=schema,
+
+        source = JsonlString(
             time_column=time_column,
             key_column=key_column,
             subsort_column=subsort_column,
+            schema=schema,
             grouping_name=grouping_name,
             time_unit=time_unit,
         )
-        self._parse_options = pyarrow.json.ParseOptions(explicit_schema=schema)
-        self.add_string(json_string)
+
+        if json_string:
+            await source.add_string(json_string)
+        return source
 
-    def add_string(self, json_string: str | BytesIO) -> None:
+    async def add_string(self, json_string: str | BytesIO) -> None:
         """Add data to the source."""
         if isinstance(json_string, str):
             json_string = BytesIO(json_string.encode("utf-8"))
         batches = pa.json.read_json(json_string, parse_options=self._parse_options)
         for batch in batches.to_batches():
-            self._ffi_table.add_pyarrow(batch)
+            await self._ffi_table.add_pyarrow(batch)
 
 
 class Parquet(Source):
@@ -240,7 +399,6 @@ class Parquet(Source):
 
     def __init__(
         self,
-        path: str,
         *,
         time_column: str,
         key_column: str,
@@ -252,13 +410,47 @@ def __init__(
         """Create a Parquet source.
 
         Args:
-            path: The path to the Parquet file to add.
-            dataframe: The DataFrame to start from.
             time_column: The name of the column containing the time.
             key_column: The name of the column containing the key.
             subsort_column: The name of the column containing the subsort.
                 If not provided, the subsort will be assigned by the system.
+            schema: The schema to use.
+            grouping_name: The name of the group associated with each key.
+                This is used to ensure implicit joins are only performed between data grouped
+                by the same entity.
+            time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+                If not specified (and not specified in the data), nanosecond will be assumed.
+        """
+        super().__init__(
+            schema=schema,
+            time_column=time_column,
+            key_column=key_column,
+            subsort_column=subsort_column,
+            grouping_name=grouping_name,
+            time_unit=time_unit,
+        )
+
+    @staticmethod
+    async def create(
+        path: Optional[str] = None,
+        *,
+        time_column: str,
+        key_column: str,
+        schema: Optional[pa.Schema] = None,
+        subsort_column: Optional[str] = None,
+        grouping_name: Optional[str] = None,
+        time_unit: Optional[TimeUnit] = None,
+    ) -> Parquet:
+        """Create a Parquet source.
+
+        Args:
+            path: The path to the Parquet file to add.
+            time_column: The name of the column containing the time.
+            key_column: The name of the column containing the key.
             schema: The schema to use. If not provided, it will be inferred from the input.
+            subsort_column: The name of the column containing the subsort.
+                If not provided, the subsort will be assigned by the system.
             grouping_name: The name of the group associated with each key.
                 This is used to ensure implicit joins are only performed between data grouped
                 by the same entity.
@@ -266,8 +458,11 @@ def __init__(
                 If not specified (and not specified in the data), nanosecond will be assumed.
""" if schema is None: + if path is None: + raise ValueError("Must provide schema or path to parquet file") schema = pa.parquet.read_schema(path) - super().__init__( + + source = Parquet( schema=schema, time_column=time_column, key_column=key_column, @@ -276,13 +471,15 @@ def __init__( time_unit=time_unit, ) - self.add_file(path) + if path: + await source.add_file(path) + return source - def add_file(self, path: str) -> None: + async def add_file(self, path: str) -> None: """Add data to the source.""" table = pa.parquet.read_table( path, schema=self._schema, ) for batch in table.to_batches(): - self._ffi_table.add_pyarrow(batch) + await self._ffi_table.add_pyarrow(batch) diff --git a/python/pysrc/kaskada/sources/source.py b/python/pysrc/kaskada/sources/source.py index 2ceddfb69..6f36bc519 100644 --- a/python/pysrc/kaskada/sources/source.py +++ b/python/pysrc/kaskada/sources/source.py @@ -25,7 +25,7 @@ def __init__( schema: pa.Schema, time_column: str, key_column: str, - retained: bool = True, + queryable: bool = True, subsort_column: Optional[str] = None, grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, @@ -63,12 +63,12 @@ def fix_field(field: pa.Field) -> pa.Field: time_column, key_column, schema, - retained, + queryable, subsort_column, grouping_name, time_unit, ) - super().__init__(ffi_table) + super().__init__(ffi_table.expr()) self._schema = schema self._ffi_table = ffi_table diff --git a/python/pytests/aggregation/count_if_test.py b/python/pytests/aggregation/count_if_test.py index f3ef0a7dc..056568162 100644 --- a/python/pytests/aggregation/count_if_test.py +++ b/python/pytests/aggregation/count_if_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def count_if_source() -> kd.sources.CsvString: +async def count_if_source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n,is_valid", @@ -17,10 +17,12 @@ def count_if_source() -> kd.sources.CsvString: "1996-12-19T16:40:04,B,30,1,true", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_count_if_unwindowed(count_if_source, golden) -> None: +async def test_count_if_unwindowed(count_if_source, golden) -> None: is_valid = count_if_source.col("is_valid") m = count_if_source.col("m") golden.jsonl( @@ -34,7 +36,7 @@ def test_count_if_unwindowed(count_if_source, golden) -> None: ) -def test_count_if_windowed(count_if_source, golden) -> None: +async def test_count_if_windowed(count_if_source, golden) -> None: is_valid = count_if_source.col("is_valid") m = count_if_source.col("m") golden.jsonl( @@ -48,7 +50,7 @@ def test_count_if_windowed(count_if_source, golden) -> None: ) -def test_count_if_since_true(count_if_source, golden) -> None: +async def test_count_if_since_true(count_if_source, golden) -> None: is_valid = count_if_source.col("is_valid") m = count_if_source.col("m") golden.jsonl( diff --git a/python/pytests/aggregation/count_test.py b/python/pytests/aggregation/count_test.py index 4962f9253..efef29fea 100644 --- a/python/pytests/aggregation/count_test.py +++ b/python/pytests/aggregation/count_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await 
kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )
 
 
-def test_count_unwindowed(source, golden) -> None:
+async def test_count_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -26,7 +28,7 @@ def test_count_unwindowed(source, golden) -> None:
     )
 
 
-def test_count_windowed(source, golden) -> None:
+async def test_count_windowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -41,7 +43,7 @@ def test_count_windowed(source, golden) -> None:
     )
 
 
-def test_count_since_true(source, golden) -> None:
+async def test_count_since_true(source, golden) -> None:
     # `since(True)` should be the same as unwindowed, so equals to one whenever the value is non-null
     m_sum_since_true = kd.record(
         {
diff --git a/python/pytests/aggregation/max_test.py b/python/pytests/aggregation/max_test.py
index cb12824af..b21b21235 100644
--- a/python/pytests/aggregation/max_test.py
+++ b/python/pytests/aggregation/max_test.py
@@ -3,7 +3,7 @@
 
 
 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,16 +15,18 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )
 
 
-def test_max_unwindowed(source, golden) -> None:
+async def test_max_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(kd.record({"m": m, "max_m": m.max(), "n": n, "max_n": n.max()}))
 
 
-def test_max_windowed(source, golden) -> None:
+async def test_max_windowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -39,7 +41,7 @@ def test_max_windowed(source, golden) -> None:
     )
 
 
-def test_max_since_true(source, golden) -> None:
+async def test_max_since_true(source, golden) -> None:
     # `since(True)` should be the same as unwindowed, so equals the original value.
     m_max_since_true = kd.record(
         {
diff --git a/python/pytests/aggregation/mean_test.py b/python/pytests/aggregation/mean_test.py
index 87e267743..a0e985b2a 100644
--- a/python/pytests/aggregation/mean_test.py
+++ b/python/pytests/aggregation/mean_test.py
@@ -3,7 +3,7 @@
 
 
 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,16 +15,18 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )
 
 
-def test_mean_unwindowed(source, golden) -> None:
+async def test_mean_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(kd.record({"m": m, "mean_m": m.mean(), "n": n, "mean_n": n.mean()}))
 
 
-def test_mean_windowed(source, golden) -> None:
+async def test_mean_windowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -39,7 +41,7 @@ def test_mean_windowed(source, golden) -> None:
     )
 
 
-def test_mean_since_true(source, golden) -> None:
+async def test_mean_since_true(source, golden) -> None:
     # `since(True)` should be the same as unwindowed, so equals the original value.
m_mean_since_true = kd.record(
         {
diff --git a/python/pytests/aggregation/min_test.py b/python/pytests/aggregation/min_test.py
index 812860c3d..724f99a38 100644
--- a/python/pytests/aggregation/min_test.py
+++ b/python/pytests/aggregation/min_test.py
@@ -3,7 +3,7 @@
 
 
 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,16 +15,18 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )
 
 
-def test_min_unwindowed(source, golden) -> None:
+async def test_min_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(kd.record({"m": m, "min_m": m.min(), "n": n, "min_n": n.min()}))
 
 
-def test_min_windowed(source, golden) -> None:
+async def test_min_windowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -39,7 +41,7 @@ def test_min_windowed(source, golden) -> None:
     )
 
 
-def test_min_since_true(source, golden) -> None:
+async def test_min_since_true(source, golden) -> None:
     # `since(True)` should be the same as unwindowed, so equals the original value.
     m_min_since_true = kd.record(
         {
diff --git a/python/pytests/aggregation/stddev_test.py b/python/pytests/aggregation/stddev_test.py
index 973d165c8..3deaaac60 100644
--- a/python/pytests/aggregation/stddev_test.py
+++ b/python/pytests/aggregation/stddev_test.py
@@ -3,7 +3,7 @@
 
 
 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )
 
 
-def test_stddev_unwindowed(source, golden) -> None:
+async def test_stddev_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -26,7 +28,7 @@ def test_stddev_unwindowed(source, golden) -> None:
     )
 
 
-def test_stddev_windowed(source, golden) -> None:
+async def test_stddev_windowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -41,7 +43,7 @@ def test_stddev_windowed(source, golden) -> None:
     )
 
 
-def test_stddev_since_true(source, golden) -> None:
+async def test_stddev_since_true(source, golden) -> None:
     # `since(True)` should be the same as unwindowed, so equals the original value.
m_stddev_since_true = kd.record(
         {
diff --git a/python/pytests/aggregation/sum_test.py b/python/pytests/aggregation/sum_test.py
index 5d4f42891..7268f5613 100644
--- a/python/pytests/aggregation/sum_test.py
+++ b/python/pytests/aggregation/sum_test.py
@@ -3,7 +3,7 @@
 
 
 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,13 +15,15 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )
 
 
 @pytest.fixture(scope="module")
-def source_spread_across_days() -> kd.sources.PyDict:
-    return kd.sources.PyDict(
-        rows=[
+async def source_spread_across_days() -> kd.sources.PyDict:
+    return await kd.sources.PyDict.create(
+        [
             {"time": "2021-01-01T00:00:00", "key": "A", "m": 1, "n": 2},
             {"time": "2021-01-01T01:10:01", "key": "A", "m": 3, "n": 4},
             {"time": "2021-01-01T02:20:02", "key": "A", "m": 5, "n": 6},
@@ -35,13 +37,13 @@ def source_spread_across_days() -> kd.sources.PyDict:
     )
 
 
-def test_sum_unwindowed(source, golden) -> None:
+async def test_sum_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(kd.record({"m": m, "sum_m": m.sum(), "n": n, "sum_n": n.sum()}))
 
 
-def test_sum_windowed(source, golden) -> None:
+async def test_sum_windowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -56,7 +58,7 @@ def test_sum_windowed(source, golden) -> None:
     )
 
 
-def test_sum_since_true(source, golden) -> None:
+async def test_sum_since_true(source, golden) -> None:
     # `since(True)` should be the same as unwindowed, so equals the original value.
     m_sum_since_true = kd.record(
         {
@@ -67,7 +69,7 @@ def test_sum_since_true(source, golden) -> None:
     golden.jsonl(m_sum_since_true)
 
 
-def test_sum_since_hourly(source_spread_across_days, golden) -> None:
+async def test_sum_since_hourly(source_spread_across_days, golden) -> None:
     golden.jsonl(
         source_spread_across_days.col("m").sum(window=kd.windows.Since.hourly())
     )
diff --git a/python/pytests/aggregation/variance_test.py b/python/pytests/aggregation/variance_test.py
index 9d9193d57..c831a026a 100644
--- a/python/pytests/aggregation/variance_test.py
+++ b/python/pytests/aggregation/variance_test.py
@@ -3,7 +3,7 @@
 
 
 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )
 
 
-def test_variance_unwindowed(source, golden) -> None:
+async def test_variance_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -28,7 +30,7 @@ def test_variance_unwindowed(source, golden) -> None:
     )
 
 
-def test_variance_windowed(source, golden) -> None:
+async def test_variance_windowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -43,7 +45,7 @@ def test_variance_windowed(source, golden) -> None:
     )
 
 
-def test_variance_since_true(source, golden) -> None:
+async def test_variance_since_true(source, golden) -> None:
     # `since(True)` should be the same as unwindowed, so equals the original value.
m_variance_since_true = kd.record( { diff --git a/python/pytests/ceil_test.py b/python/pytests/ceil_test.py index 5c1ae7469..be3a91167 100644 --- a/python/pytests/ceil_test.py +++ b/python/pytests/ceil_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m", @@ -15,9 +15,11 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,1.01", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_ceil(source, golden) -> None: +async def test_ceil(source, golden) -> None: m = source.col("m") golden.jsonl(kd.record({"m": m, "ceil_m": m.ceil()})) diff --git a/python/pytests/clamp_test.py b/python/pytests/clamp_test.py index 09f615e0e..f648f1a45 100644 --- a/python/pytests/clamp_test.py +++ b/python/pytests/clamp_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m", @@ -15,26 +15,28 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,1.01", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_clamp_min_max(source, golden) -> None: +async def test_clamp_min_max(source, golden) -> None: m = source.col("m") golden.jsonl(kd.record({"m": m, "clamped_m": m.clamp(min=5, max=100)})) -def test_clamp_min(source, golden) -> None: +async def test_clamp_min(source, golden) -> None: m = source.col("m") golden.jsonl(kd.record({"m": m, "clamped_min": m.clamp(min=5)})) -def test_clamp_max(source, golden) -> None: +async def test_clamp_max(source, golden) -> None: m = source.col("m") golden.jsonl(kd.record({"m": m, "clamped_min": m.clamp(max=100)})) @pytest.fixture(scope="module") -def banking_source() -> kd.sources.CsvString: +async def banking_source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,current_balance,min_balance,max_balance", @@ -46,10 +48,12 @@ def banking_source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,10.00,11.00,11.01", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_clamp_banking_min_max(banking_source, golden) -> None: +async def test_clamp_banking_min_max(banking_source, golden) -> None: current_balance = banking_source.col("current_balance") min_balance = banking_source.col("min_balance") max_balance = banking_source.col("max_balance") @@ -65,7 +69,7 @@ def test_clamp_banking_min_max(banking_source, golden) -> None: ) -def test_clamp_banking_min(banking_source, golden) -> None: +async def test_clamp_banking_min(banking_source, golden) -> None: current_balance = banking_source.col("current_balance") min_balance = banking_source.col("min_balance") golden.jsonl( @@ -78,7 +82,7 @@ def test_clamp_banking_min(banking_source, golden) -> None: ) -def test_clamp_banking_max(banking_source, golden) -> None: +async def test_clamp_banking_max(banking_source, golden) -> None: current_balance = banking_source.col("current_balance") max_balance = banking_source.col("max_balance") golden.jsonl( diff --git a/python/pytests/coalesce_test.py b/python/pytests/coalesce_test.py index 748170aaa..6a384ecf8 100644 --- 
a/python/pytests/coalesce_test.py +++ b/python/pytests/coalesce_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n,o", @@ -15,16 +15,18 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,,15", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_coalesce(source, golden) -> None: +async def test_coalesce(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl(kd.record({"m": m, "n": n, "coalesced_val": m.coalesce(n)})) -def test_coalesce_three(source, golden) -> None: +async def test_coalesce_three(source, golden) -> None: m = source.col("m") n = source.col("n") o = source.col("o") diff --git a/python/pytests/collect_test.py b/python/pytests/collect_test.py index c10a98769..8204fafb3 100644 --- a/python/pytests/collect_test.py +++ b/python/pytests/collect_test.py @@ -5,7 +5,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n,s,b", @@ -18,10 +18,12 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:04,A,,,f,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_collect_basic(source, golden) -> None: +async def test_collect_basic(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl( @@ -36,7 +38,7 @@ def test_collect_basic(source, golden) -> None: ) -def test_collect_with_max(source, golden) -> None: +async def test_collect_with_max(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl( @@ -51,7 +53,7 @@ def test_collect_with_max(source, golden) -> None: ) -def test_collect_with_min(source, golden) -> None: +async def test_collect_with_min(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl( @@ -66,7 +68,7 @@ def test_collect_with_min(source, golden) -> None: ) -def test_collect_with_min_and_max(source, golden) -> None: +async def test_collect_with_min_and_max(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl( @@ -81,7 +83,7 @@ def test_collect_with_min_and_max(source, golden) -> None: ) -def test_collect_since_window(source, golden) -> None: +async def test_collect_since_window(source, golden) -> None: m = source.col("m") golden.jsonl( kd.record( @@ -90,7 +92,7 @@ def test_collect_since_window(source, golden) -> None: ) -def test_collect_i64_trailing_window_1s(source, golden) -> None: +async def test_collect_i64_trailing_window_1s(source, golden) -> None: m = source.col("m") golden.jsonl( kd.record( @@ -104,7 +106,7 @@ def test_collect_i64_trailing_window_1s(source, golden) -> None: ) -def test_collect_i64_trailing_window_3s(source, golden) -> None: +async def test_collect_i64_trailing_window_3s(source, golden) -> None: m = source.col("m") golden.jsonl( kd.record( @@ -118,7 +120,7 @@ def test_collect_i64_trailing_window_3s(source, golden) -> None: ) -def test_collect_i64_trailing_window_3s_with_max(source, golden) -> None: +async def test_collect_i64_trailing_window_3s_with_max(source, golden) -> None: m = source.col("m") golden.jsonl( kd.record( @@ -132,7 +134,7 @@ def test_collect_i64_trailing_window_3s_with_max(source, golden) -> None: ) -def 
test_collect_i64_trailing_window_3s_with_min(source, golden) -> None: +async def test_collect_i64_trailing_window_3s_with_min(source, golden) -> None: m = source.col("m") golden.jsonl( kd.record( @@ -146,7 +148,7 @@ def test_collect_i64_trailing_window_3s_with_min(source, golden) -> None: ) -def test_collect_string_trailing_window_1s(source, golden) -> None: +async def test_collect_string_trailing_window_1s(source, golden) -> None: s = source.col("s") golden.jsonl( kd.record( @@ -160,7 +162,7 @@ def test_collect_string_trailing_window_1s(source, golden) -> None: ) -def test_collect_string_trailing_window_3s(source, golden) -> None: +async def test_collect_string_trailing_window_3s(source, golden) -> None: s = source.col("s") golden.jsonl( kd.record( @@ -174,7 +176,7 @@ def test_collect_string_trailing_window_3s(source, golden) -> None: ) -def test_collect_string_trailing_window_3s_with_max(source, golden) -> None: +async def test_collect_string_trailing_window_3s_with_max(source, golden) -> None: s = source.col("s") golden.jsonl( kd.record( @@ -188,7 +190,7 @@ def test_collect_string_trailing_window_3s_with_max(source, golden) -> None: ) -def test_collect_string_trailing_window_3s_with_min(source, golden) -> None: +async def test_collect_string_trailing_window_3s_with_min(source, golden) -> None: s = source.col("s") golden.jsonl( kd.record( @@ -202,7 +204,7 @@ def test_collect_string_trailing_window_3s_with_min(source, golden) -> None: ) -def test_collect_bool_trailing_window_1s(source, golden) -> None: +async def test_collect_bool_trailing_window_1s(source, golden) -> None: b = source.col("b") golden.jsonl( kd.record( @@ -216,7 +218,7 @@ def test_collect_bool_trailing_window_1s(source, golden) -> None: ) -def test_collect_bool_trailing_window_3s(source, golden) -> None: +async def test_collect_bool_trailing_window_3s(source, golden) -> None: b = source.col("b") golden.jsonl( kd.record( @@ -230,7 +232,7 @@ def test_collect_bool_trailing_window_3s(source, golden) -> None: ) -def test_collect_bool_trailing_window_3s_with_max(source, golden) -> None: +async def test_collect_bool_trailing_window_3s_with_max(source, golden) -> None: b = source.col("b") golden.jsonl( kd.record( @@ -244,7 +246,7 @@ def test_collect_bool_trailing_window_3s_with_max(source, golden) -> None: ) -def test_collect_bool_trailing_window_3s_with_min(source, golden) -> None: +async def test_collect_bool_trailing_window_3s_with_min(source, golden) -> None: b = source.col("b") golden.jsonl( kd.record( @@ -262,7 +264,7 @@ def test_collect_bool_trailing_window_3s_with_min(source, golden) -> None: # date-time like fields nested within a list. So we expand things out. # # TODO: Improve the golden testing so this isn't necessary. 
-def test_collect_struct_trailing_window_1s(source, golden) -> None: +async def test_collect_struct_trailing_window_1s(source, golden) -> None: collect = source.collect(max=None, window=kd.windows.Trailing(timedelta(seconds=1))) golden.jsonl( kd.record( @@ -277,7 +279,7 @@ def test_collect_struct_trailing_window_1s(source, golden) -> None: ) -def test_collect_struct_trailing_window_3s(source, golden) -> None: +async def test_collect_struct_trailing_window_3s(source, golden) -> None: collect = source.collect(max=None, window=kd.windows.Trailing(timedelta(seconds=3))) golden.jsonl( kd.record( @@ -292,7 +294,7 @@ def test_collect_struct_trailing_window_3s(source, golden) -> None: ) -def test_collect_struct_trailing_window_3s_with_max(source, golden) -> None: +async def test_collect_struct_trailing_window_3s_with_max(source, golden) -> None: collect = source.collect(max=2, window=kd.windows.Trailing(timedelta(seconds=3))) golden.jsonl( kd.record( @@ -307,7 +309,7 @@ def test_collect_struct_trailing_window_3s_with_max(source, golden) -> None: ) -def test_collect_struct_trailing_window_3s_with_min(source, golden) -> None: +async def test_collect_struct_trailing_window_3s_with_min(source, golden) -> None: collect = source.collect( min=3, max=None, window=kd.windows.Trailing(timedelta(seconds=3)) ) @@ -324,19 +326,19 @@ def test_collect_struct_trailing_window_3s_with_min(source, golden) -> None: ) -def test_collect_records(source, golden) -> None: +async def test_collect_records(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl(kd.record({"m": m, "n": n}).collect(max=None)) -def test_collect_records_field_ref(source, golden) -> None: +async def test_collect_records_field_ref(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl(kd.record({"m": m, "n": n}).collect(max=None).col("m")) -def test_collect_lists(source, golden) -> None: +async def test_collect_lists(source, golden) -> None: m = source.col("m") golden.jsonl( kd.record( diff --git a/python/pytests/conftest.py b/python/pytests/conftest.py index b2bf14f14..8d1513850 100644 --- a/python/pytests/conftest.py +++ b/python/pytests/conftest.py @@ -1,3 +1,4 @@ +import asyncio import os from typing import Union @@ -12,6 +13,15 @@ def session() -> None: init_session() +# sets up a single, session-scoped async event loop. 
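One piece of context for the conftest.py fixture that follows: pytest-asyncio's default event loop is function-scoped, which would tear the loop down between tests and break the module-scoped `async` fixtures used throughout these test files. The session-scoped `event_loop` override below, combined with the `asyncio_mode = "auto"` setting added to `pyproject.toml` earlier in this diff, is why the tests can simply swap `def` for `async def` with no `@pytest.mark.asyncio` markers. A minimal standalone illustration of the pattern (hypothetical file, not part of the change set; assumes pytest-asyncio is installed):

```python
# illustrative_test.py -- with `asyncio_mode = "auto"`, pytest-asyncio
# collects and awaits async fixtures and tests without any markers.
import asyncio

import pytest


@pytest.fixture(scope="module")
async def shared_value() -> int:
    # A module-scoped async fixture needs an event loop that outlives a
    # single test -- hence the session-scoped `event_loop` override.
    await asyncio.sleep(0)  # stand-in for real async setup work
    return 42


async def test_first(shared_value) -> None:
    assert shared_value == 42


async def test_second(shared_value) -> None:
    # Same fixture instance; the loop was not torn down in between.
    assert shared_value == 42
```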
+@pytest.fixture(scope="session") +def event_loop(): + policy = asyncio.get_event_loop_policy() + loop = policy.new_event_loop() + yield loop + loop.close() + + def pytest_addoption(parser: pytest.Parser): parser.addoption("--save-golden", action="store_true", help="update golden files") diff --git a/python/pytests/csv_string_source_test.py b/python/pytests/csv_string_source_test.py index 72677e38c..58edbc263 100644 --- a/python/pytests/csv_string_source_test.py +++ b/python/pytests/csv_string_source_test.py @@ -1,7 +1,9 @@ import kaskada as kd +import pytest -def test_read_csv(golden) -> None: +@pytest.mark.asyncio +async def test_read_csv(golden) -> None: content1 = "\n".join( [ "time,key,m,n", @@ -24,12 +26,12 @@ def test_read_csv(golden) -> None: "1996-12-19T17:40:02,A,,", ] ) - source = kd.sources.CsvString( + source = await kd.sources.CsvString.create( content1, time_column="time", key_column="key", ) golden.jsonl(source) - source.add_string(content2) + await source.add_string(content2) golden.jsonl(source) diff --git a/python/pytests/else_test.py b/python/pytests/else_test.py index ab31dc09e..f30cf6304 100644 --- a/python/pytests/else_test.py +++ b/python/pytests/else_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n,default", @@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,,-1", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_else_(source, golden) -> None: +async def test_else_(source, golden) -> None: m = source.col("m") n = source.col("n") default = source.col("default") @@ -39,17 +41,19 @@ def test_else_(source, golden) -> None: @pytest.fixture(scope="module") -def record_source() -> kd.sources.JsonlString: +async def record_source() -> kd.sources.JsonlString: content = "\n".join( [ """{"time":"1996-12-19T16:39:57","key":"A","override": {"test":"override_val"}}""", """{"time":"1996-12-19T16:39:58","key":"A","default_record":{"test":"default"}}""", ] ) - return kd.sources.JsonlString(content, time_column="time", key_column="key") + return await kd.sources.JsonlString.create( + content, time_column="time", key_column="key" + ) -def test_else_debug(record_source, golden) -> None: +async def test_else_debug(record_source, golden) -> None: default_record = record_source.col("default_record") override_column = record_source.col("override") golden.jsonl( diff --git a/python/pytests/execution_test.py b/python/pytests/execution_test.py index ccc2cd1ab..afa53e4de 100644 --- a/python/pytests/execution_test.py +++ b/python/pytests/execution_test.py @@ -7,7 +7,7 @@ @pytest.fixture -def source_int64() -> kd.sources.CsvString: +async def source_int64() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -19,10 +19,12 @@ def source_int64() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_iter_pandas(golden, source_int64) -> None: +async def test_iter_pandas(golden, source_int64) -> None: batches = source_int64.run_iter(row_limit=4, max_batch_size=2) # 4 rows, max 2 per batch = 2 batches @@ -32,7 +34,7 @@ def test_iter_pandas(golden, source_int64) -> None: next(batches) -def test_iter_rows(golden, 
source_int64) -> None: +async def test_iter_rows(golden, source_int64) -> None: results: Iterator[dict] = source_int64.run_iter("row", row_limit=2) assert next(results)["m"] == 5 assert next(results)["m"] == 24 @@ -40,7 +42,6 @@ def test_iter_rows(golden, source_int64) -> None: next(results) -@pytest.mark.asyncio async def test_iter_pandas_async(golden, source_int64) -> None: batches: AsyncIterator[pd.DataFrame] = source_int64.run_iter( row_limit=4, max_batch_size=2 @@ -57,7 +58,6 @@ async def test_iter_pandas_async(golden, source_int64) -> None: await batches.__anext__() -@pytest.mark.asyncio async def test_iter_pandas_async_live(golden, source_int64) -> None: data2 = "\n".join( [ @@ -77,7 +77,7 @@ async def test_iter_pandas_async_live(golden, source_int64) -> None: golden.jsonl(await execution.__anext__()) # Add data and await the second batch. - source_int64.add_string(data2) + await source_int64.add_string(data2) golden.jsonl(await execution.__anext__()) execution.stop() @@ -85,7 +85,7 @@ async def test_iter_pandas_async_live(golden, source_int64) -> None: print(await execution.__anext__()) -def test_snapshot(golden, source_int64) -> None: +async def test_snapshot(golden, source_int64) -> None: query = source_int64.col("m").sum() golden.jsonl(query.to_pandas(kd.results.Snapshot())) golden.jsonl( @@ -102,7 +102,7 @@ def test_snapshot(golden, source_int64) -> None: ) -def test_history(golden, source_int64) -> None: +async def test_history(golden, source_int64) -> None: query = source_int64.col("m").sum() golden.jsonl(query.to_pandas(kd.results.History())) golden.jsonl( diff --git a/python/pytests/exp_test.py b/python/pytests/exp_test.py index 0d8b82604..704e62e12 100644 --- a/python/pytests/exp_test.py +++ b/python/pytests/exp_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m", @@ -15,9 +15,11 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,5", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_exp(source, golden) -> None: +async def test_exp(source, golden) -> None: m = source.col("m") golden.jsonl(kd.record({"m": m, "exp_m": m.exp()})) diff --git a/python/pytests/filter_test.py b/python/pytests/filter_test.py index f56754b74..eb22b2912 100644 --- a/python/pytests/filter_test.py +++ b/python/pytests/filter_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_filter(source, golden) -> None: +async def test_filter(source, golden) -> None: m = source.col("m") n = source.col("n") condition_m = m > 15 diff --git a/python/pytests/flatten_test.py b/python/pytests/flatten_test.py index ed339de5a..a0b1aa9e2 100644 --- a/python/pytests/flatten_test.py +++ b/python/pytests/flatten_test.py @@ -1,9 +1,9 @@ import kaskada as kd -def test_flatten(golden) -> None: - source = kd.sources.PyDict( - [ +async def test_flatten(golden) -> None: + source = await kd.sources.PyDict.create( + rows=[ {"time": "1996-12-19T16:39:57", "user": "A", "m": 
[[5]]}, {"time": "1996-12-19T17:39:57", "user": "A", "m": []}, {"time": "1996-12-19T18:39:57", "user": "A", "m": [None]}, diff --git a/python/pytests/floor_test.py b/python/pytests/floor_test.py index 41542ec46..aa059f200 100644 --- a/python/pytests/floor_test.py +++ b/python/pytests/floor_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m", @@ -15,9 +15,11 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,1.01", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_floor(source, golden) -> None: +async def test_floor(source, golden) -> None: m = source.col("m") golden.jsonl(kd.record({"m": m, "floor_m": m.floor()})) diff --git a/python/pytests/greatest_test.py b/python/pytests/greatest_test.py index de0f29116..508ab935d 100644 --- a/python/pytests/greatest_test.py +++ b/python/pytests/greatest_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,a,b", @@ -14,10 +14,12 @@ def source() -> kd.sources.CsvString: "2021-01-05T00:00:00,A,2,5.4", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_greatest(source, golden) -> None: +async def test_greatest(source, golden) -> None: a = source.col("a") b = source.col("b") golden.jsonl(kd.record({"a": a, "b": b, "a_greatest_b": a.greatest(b)})) diff --git a/python/pytests/hash_test.py b/python/pytests/hash_test.py index f8fa2d649..88053e9f6 100644 --- a/python/pytests/hash_test.py +++ b/python/pytests/hash_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def string_source() -> kd.sources.CsvString: +async def string_source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -15,16 +15,18 @@ def string_source() -> kd.sources.CsvString: "2021-01-04T00:00:00,Ryan,earth,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_hash_string(string_source, golden) -> None: +async def test_hash_string(string_source, golden) -> None: m = string_source.col("m") golden.jsonl(kd.record({"m": m, "hash_m": m.hash()})) @pytest.fixture(scope="module") -def integer_source() -> kd.sources.CsvString: +async def integer_source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -36,9 +38,11 @@ def integer_source() -> kd.sources.CsvString: "2021-01-04T00:00:00,Ryan,9,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_hash_integer(integer_source, golden) -> None: +async def test_hash_integer(integer_source, golden) -> None: m = integer_source.col("m") golden.jsonl(kd.record({"m": m, "hash_m": m.hash()})) diff --git a/python/pytests/if_test.py b/python/pytests/if_test.py index 0e50f8dd4..da21cc360 100644 --- a/python/pytests/if_test.py +++ b/python/pytests/if_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -15,10 
+15,12 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_if_(source, golden) -> None: +async def test_if_(source, golden) -> None: m = source.col("m") n = source.col("n") condition_m = m > 15 diff --git a/python/pytests/jsonl_string_source_test.py b/python/pytests/jsonl_string_source_test.py index ec636c5fa..1ff1773d8 100644 --- a/python/pytests/jsonl_string_source_test.py +++ b/python/pytests/jsonl_string_source_test.py @@ -1,15 +1,15 @@ import kaskada as kd -def test_read_jsonl(golden) -> None: - source = kd.sources.JsonlString( +async def test_read_jsonl(golden) -> None: + source = await kd.sources.JsonlString.create( '{"time": "1996-12-19T16:39:57", "user": "A", "m": 5, "n": 10}', time_column="time", key_column="user", ) golden.jsonl(source) - source.add_string( + await source.add_string( """ {"time": "1996-12-19T16:40:57", "user": "A", "m": 8, "n": 10} {"time": "1996-12-19T16:41:57", "user": "B", "m": 5} @@ -19,15 +19,15 @@ def test_read_jsonl(golden) -> None: golden.jsonl(source) -def test_read_jsonl_lists(golden) -> None: - source = kd.sources.JsonlString( +async def test_read_jsonl_lists(golden) -> None: + source = await kd.sources.JsonlString.create( '{"time": "1996-12-19T16:39:57", "user": "A", "m": [5, 10], "n": 10}', time_column="time", key_column="user", ) golden.jsonl(source) - source.add_string( + await source.add_string( """ {"time": "1996-12-19T16:40:57", "user": "A", "m": [], "n": 10} {"time": "1996-12-19T16:41:57", "user": "A", "n": 10} diff --git a/python/pytests/lag_test.py b/python/pytests/lag_test.py index 026dcd984..d3be76b2c 100644 --- a/python/pytests/lag_test.py +++ b/python/pytests/lag_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_lag(source, golden) -> None: +async def test_lag(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl( @@ -35,11 +37,11 @@ def test_lag(source, golden) -> None: ) -def test_lag_struct(source, golden) -> None: +async def test_lag_struct(source, golden) -> None: golden.jsonl(source.lag(1)) -def test_lag_list(source, golden) -> None: +async def test_lag_list(source, golden) -> None: m = source.col("m") golden.jsonl( kd.record( diff --git a/python/pytests/least_test.py b/python/pytests/least_test.py index 58eda9161..1ce59f5f3 100644 --- a/python/pytests/least_test.py +++ b/python/pytests/least_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,a,b", @@ -14,10 +14,12 @@ def source() -> kd.sources.CsvString: "2021-01-05T00:00:00,A,2,5.4", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_least(source, golden) -> None: +async def test_least(source, golden) -> None: a = source.col("a") b = source.col("b") golden.jsonl(kd.record({"a": a, "b": b, "a_least_b": 
a.least(b)})) diff --git a/python/pytests/length_test.py b/python/pytests/length_test.py index 15abf0c13..fed81a472 100644 --- a/python/pytests/length_test.py +++ b/python/pytests/length_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n,str", @@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString: '1996-12-19T16:40:02,A,,,"fig"', ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_length(source, golden) -> None: +async def test_length(source, golden) -> None: my_str = source.col("str") list = my_str.collect(max=None) golden.jsonl( diff --git a/python/pytests/lookup_test.py b/python/pytests/lookup_test.py index 3aed90cc4..8a2a875a5 100644 --- a/python/pytests/lookup_test.py +++ b/python/pytests/lookup_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def key_source() -> kd.sources.CsvString: +async def key_source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,state", @@ -15,11 +15,13 @@ def key_source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,WA", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) @pytest.fixture(scope="module") -def foreign_source() -> kd.sources.CsvString: +async def foreign_source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -31,10 +33,12 @@ def foreign_source() -> kd.sources.CsvString: "1996-12-19T16:40:02,WA,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_lookup(key_source, foreign_source, golden) -> None: +async def test_lookup(key_source, foreign_source, golden) -> None: state = key_source.col("state") foreign_value = foreign_source.col("m") last_foreign_value = foreign_source.col("m").last() diff --git a/python/pytests/lower_test.py b/python/pytests/lower_test.py index 41dc3fbd6..5b102bc4a 100644 --- a/python/pytests/lower_test.py +++ b/python/pytests/lower_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m", @@ -15,9 +15,11 @@ def source() -> kd.sources.CsvString: "2021-01-04T00:00:00,Ryan,hi", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_lower(source, golden) -> None: +async def test_lower(source, golden) -> None: m = source.col("m") golden.jsonl(kd.record({"m": m, "lower_m": m.lower()})) diff --git a/python/pytests/math_test.py b/python/pytests/math_test.py index abce613f7..111a66601 100644 --- a/python/pytests/math_test.py +++ b/python/pytests/math_test.py @@ -3,7 +3,7 @@ @pytest.fixture -def source_int64() -> kd.sources.CsvString: +async def source_int64() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -15,10 +15,12 @@ def source_int64() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_math_int64(golden, source_int64) -> 
None: +async def test_math_int64(golden, source_int64) -> None: m = source_int64.col("m") n = source_int64.col("n") golden.jsonl( diff --git a/python/pytests/null_test.py b/python/pytests/null_test.py index d3f9887df..cd02d836d 100644 --- a/python/pytests/null_test.py +++ b/python/pytests/null_test.py @@ -3,7 +3,7 @@ @pytest.fixture(scope="module") -def source() -> kd.sources.CsvString: +async def source() -> kd.sources.CsvString: content = "\n".join( [ "time,key,m,n", @@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString: "1996-12-19T16:40:02,A,,", ] ) - return kd.sources.CsvString(content, time_column="time", key_column="key") + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) -def test_null_if(source, golden) -> None: +async def test_null_if(source, golden) -> None: m = source.col("m") n = source.col("n") condition_m = m > 15 @@ -37,7 +39,7 @@ def test_null_if(source, golden) -> None: ) -def test_is_null(source, golden) -> None: +async def test_is_null(source, golden) -> None: m = source.col("m") n = source.col("n") golden.jsonl( diff --git a/python/pytests/pandas_source_test.py b/python/pytests/pandas_source_test.py index 2e3606d97..afcc3abdb 100644 --- a/python/pytests/pandas_source_test.py +++ b/python/pytests/pandas_source_test.py @@ -2,31 +2,37 @@ import pandas as pd -def test_float_seconds(golden) -> None: +async def test_float_seconds(golden) -> None: data = {"time": [1671477472.026119], "user": ["tom"]} df = pd.DataFrame(data) - table = kd.sources.Pandas(df, time_column="time", key_column="user", time_unit="s") + table = await kd.sources.Pandas.create( + df, time_column="time", key_column="user", time_unit="s" + ) golden.jsonl(table) -def test_float_milliseconds(golden) -> None: +async def test_float_milliseconds(golden) -> None: data = {"time": [1671477472026.119], "user": ["tom"]} df = pd.DataFrame(data) - table = kd.sources.Pandas(df, time_column="time", key_column="user", time_unit="ms") + table = await kd.sources.Pandas.create( + df, time_column="time", key_column="user", time_unit="ms" + ) golden.jsonl(table) -def test_float_nanoseconds(golden) -> None: +async def test_float_nanoseconds(golden) -> None: data = {"time": [1671477472026119000], "user": ["tom"]} df = pd.DataFrame(data) - table = kd.sources.Pandas(df, time_column="time", key_column="user") + table = await kd.sources.Pandas.create( + df, time_column="time", key_column="user", time_unit="ns" + ) golden.jsonl(table) -def test_add_dataframe(golden) -> None: +async def test_add_dataframe(golden) -> None: df1 = pd.DataFrame( { "time": [1000000000000], @@ -34,7 +40,7 @@ def test_add_dataframe(golden) -> None: } ) - table = kd.sources.Pandas(df1, time_column="time", key_column="key") + table = await kd.sources.Pandas.create(df1, time_column="time", key_column="key") golden.jsonl(table) df2 = pd.DataFrame( @@ -43,5 +49,5 @@ def test_add_dataframe(golden) -> None: "key": ["a"], } ) - table.add_data(df2) + await table.add_data(df2) golden.jsonl(table) diff --git a/python/pytests/parquet_source_test.py b/python/pytests/parquet_source_test.py index 479f55da0..fa736261c 100644 --- a/python/pytests/parquet_source_test.py +++ b/python/pytests/parquet_source_test.py @@ -1,13 +1,13 @@ import kaskada as kd -def test_read_parquet(golden) -> None: - source = kd.sources.Parquet( +async def test_read_parquet(golden) -> None: + source = await kd.sources.Parquet.create( "../testdata/purchases/purchases_part1.parquet", time_column="purchase_time", key_column="customer_id", ) 
     golden.jsonl(source)

-    source.add_file("../testdata/purchases/purchases_part2.parquet")
+    await source.add_file("../testdata/purchases/purchases_part2.parquet")
     golden.jsonl(source)
diff --git a/python/pytests/powf_test.py b/python/pytests/powf_test.py
index f50441ff8..e8fa9048c 100644
--- a/python/pytests/powf_test.py
+++ b/python/pytests/powf_test.py
@@ -3,7 +3,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,6",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_powf_unwindowed(source, golden) -> None:
+async def test_powf_unwindowed(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(kd.record({"m": m, "powf": m.powf(n)}))
diff --git a/python/pytests/pydict_source_test.py b/python/pytests/pydict_source_test.py
index 2ed59b63a..42303898e 100644
--- a/python/pytests/pydict_source_test.py
+++ b/python/pytests/pydict_source_test.py
@@ -3,15 +3,15 @@
 import pytest


-def test_read_pydict(golden) -> None:
-    source = kd.sources.PyDict(
-        [{"time": "1996-12-19T16:39:57", "user": "A", "m": 5, "n": 10}],
+async def test_read_pydict(golden) -> None:
+    source = await kd.sources.PyDict.create(
+        rows=[{"time": "1996-12-19T16:39:57", "user": "A", "m": 5, "n": 10}],
         time_column="time",
         key_column="user",
     )
     golden.jsonl(source)

-    source.add_rows(
+    await source.add_rows(
         [
             {"time": "1996-12-19T16:40:57", "user": "A", "m": 8, "n": 10},
             {"time": "1996-12-19T16:41:57", "user": "B", "m": 5},
@@ -19,19 +19,19 @@ def test_read_pydict(golden) -> None:
     )
     golden.jsonl(source)

-    source.add_rows({"time": "1996-12-19T16:42:57", "user": "A", "m": 8, "n": 10})
+    await source.add_rows({"time": "1996-12-19T16:42:57", "user": "A", "m": 8, "n": 10})
     golden.jsonl(source)


-def test_read_pydict_lists(golden) -> None:
-    source = kd.sources.PyDict(
-        [{"time": "1996-12-19T16:39:57", "user": "A", "m": [5, 10], "n": 10}],
+async def test_read_pydict_lists(golden) -> None:
+    source = await kd.sources.PyDict.create(
+        rows=[{"time": "1996-12-19T16:39:57", "user": "A", "m": [5, 10], "n": 10}],
         time_column="time",
         key_column="user",
     )
     golden.jsonl(source)

-    source.add_rows(
+    await source.add_rows(
         [
             {"time": "1996-12-19T16:40:57", "user": "A", "m": [], "n": 10},
             {"time": "1996-12-19T16:41:57", "user": "A", "n": 10},
@@ -40,16 +40,16 @@ def test_read_pydict_lists(golden) -> None:
     golden.jsonl(source)


-def test_read_pydict_ignore_column(golden) -> None:
+async def test_read_pydict_ignore_column(golden) -> None:
     # Schema is determined from first row, and doesn't contain an "m" column.
-    source = kd.sources.PyDict(
-        [{"time": "1996-12-19T16:39:57", "user": "A", "n": 10}],
+    source = await kd.sources.PyDict.create(
+        rows=[{"time": "1996-12-19T16:39:57", "user": "A", "n": 10}],
         time_column="time",
         key_column="user",
     )
     golden.jsonl(source)

-    source.add_rows(
+    await source.add_rows(
         [
             {"time": "1996-12-19T16:40:57", "user": "A", "m": 83, "n": 10},
             {"time": "1996-12-19T16:41:57", "user": "A", "m": 12},
@@ -58,9 +58,8 @@ def test_read_pydict_ignore_column(golden) -> None:
     golden.jsonl(source)


-def test_read_pydict_empty() -> None:
+async def test_read_pydict_empty() -> None:
     source = kd.sources.PyDict(
-        rows=[],
         schema=pa.schema(
             [
                 ("time", pa.timestamp("ns")),
@@ -79,10 +78,8 @@ def test_read_pydict_empty() -> None:
     assert result["x"].dtype == "int64"


-@pytest.mark.asyncio
 async def test_read_pydict_empty_live(golden) -> None:
     source = kd.sources.PyDict(
-        rows=[],
         schema=pa.schema(
             [
                 ("time", pa.string()),
@@ -95,7 +92,7 @@ async def test_read_pydict_empty_live(golden) -> None:
     )

     execution = source.run_iter(mode="live")
-    source.add_rows({"time": "1996-12-19T16:39:57Z", "user": "A", "x": 5})
+    await source.add_rows({"time": "1996-12-19T16:39:57Z", "user": "A", "x": 5})
     golden.jsonl(await execution.__anext__())

     execution.stop()
diff --git a/python/pytests/record_test.py b/python/pytests/record_test.py
index d6d80b804..255b99ac7 100644
--- a/python/pytests/record_test.py
+++ b/python/pytests/record_test.py
@@ -3,7 +3,7 @@


 @pytest.fixture
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_record(source, golden) -> None:
+async def test_record(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")

@@ -32,29 +34,29 @@ def test_record(source, golden) -> None:
     )


-def test_extend_dict(source, golden) -> None:
+async def test_extend_dict(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(source.extend({"add": m + n}))


-def test_extend_record(source, golden) -> None:
+async def test_extend_record(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(source.extend(kd.record({"add": m + n})))


-def test_extend_computed_record(source, golden) -> None:
+async def test_extend_computed_record(source, golden) -> None:
     golden.jsonl(source.extend(lambda i: kd.record({"add": i.col("m") + i.col("n")})))


-def test_extend_computed_dict(source, golden) -> None:
+async def test_extend_computed_dict(source, golden) -> None:
     golden.jsonl(source.extend(lambda input: {"add": input.col("m") + input.col("n")}))


-def test_select_record(source, golden) -> None:
+async def test_select_record(source, golden) -> None:
     golden.jsonl(source.select("n"))


-def test_remove_record(source, golden) -> None:
+async def test_remove_record(source, golden) -> None:
     golden.jsonl(source.remove("n"))
diff --git a/python/pytests/round_test.py b/python/pytests/round_test.py
index e55dcb256..0b0e5b3d2 100644
--- a/python/pytests/round_test.py
+++ b/python/pytests/round_test.py
@@ -3,7 +3,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m",
@@ -12,9 +12,11 @@ def source() -> kd.sources.CsvString:
             "2021-01-02T00:00:00,B,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_round(source, golden) -> None:
+async def test_round(source, golden) -> None:
     m = source.col("m")
     golden.jsonl(kd.record({"m": m, "round_m": m.round()}))
diff --git a/python/pytests/seconds_since_previous_test.py b/python/pytests/seconds_since_previous_test.py
index 191c3c209..2bb38839a 100644
--- a/python/pytests/seconds_since_previous_test.py
+++ b/python/pytests/seconds_since_previous_test.py
@@ -4,7 +4,7 @@


 @pytest.fixture
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n,t",
@@ -16,10 +16,12 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,,1996-12-19T16:43:02",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_seconds_since_previous(golden, source) -> None:
+async def test_seconds_since_previous(golden, source) -> None:
     t = source.col("time")
     golden.jsonl(
         kd.record(
diff --git a/python/pytests/seconds_since_test.py b/python/pytests/seconds_since_test.py
index ae5ffd96b..316a5cbb5 100644
--- a/python/pytests/seconds_since_test.py
+++ b/python/pytests/seconds_since_test.py
@@ -6,7 +6,7 @@


 @pytest.fixture
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n,t",
@@ -18,10 +18,12 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,,1996-12-19T16:43:02",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_seconds_since(golden, source) -> None:
+async def test_seconds_since(golden, source) -> None:
     t1 = source.col("time")
     t2 = source.col("t")
     golden.jsonl(
@@ -36,7 +38,7 @@ def test_seconds_since(golden, source) -> None:
     )


-def test_seconds_since_datetime(golden, source) -> None:
+async def test_seconds_since_datetime(golden, source) -> None:
     t = source.col("time")
     dt = datetime.datetime(1996, 12, 19, 16, 39, 50, tzinfo=datetime.timezone.utc)
     golden.jsonl(
diff --git a/python/pytests/shift_by_test.py b/python/pytests/shift_by_test.py
index 17630218b..95be876fd 100644
--- a/python/pytests/shift_by_test.py
+++ b/python/pytests/shift_by_test.py
@@ -5,7 +5,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString:
             "1997-01-18T16:40:00,A,,9",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_shift_by_timedelta(source, golden) -> None:
+async def test_shift_by_timedelta(source, golden) -> None:
     time = source.col("time")
     golden.jsonl(
         kd.record(
@@ -31,7 +33,7 @@ def test_shift_by_timedelta(source, golden) -> None:
     )


-def test_shift_collect(source, golden) -> None:
+async def test_shift_collect(source, golden) -> None:
     golden.jsonl(
         source.record(
             lambda input: {
diff --git a/python/pytests/shift_to_test.py b/python/pytests/shift_to_test.py
index 0c8458e86..632ba330b 100644
--- a/python/pytests/shift_to_test.py
+++ b/python/pytests/shift_to_test.py
@@ -5,7 +5,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,11 +15,13 @@ def source() -> kd.sources.CsvString:
             "1997-01-18T16:40:00,A,,9",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


 @pytest.mark.skip(reason="shift to literal not supported")
-def test_shift_to_datetime(source, golden) -> None:
+async def test_shift_to_datetime(source, golden) -> None:
     time = source.col("time")
     shift_to_datetime = datetime(1996, 12, 25, 0, 0, 0)
     golden.jsonl(
@@ -29,7 +31,7 @@ def test_shift_to_datetime(source, golden) -> None:
     )


-def test_shift_to_column(source, golden) -> None:
+async def test_shift_to_column(source, golden) -> None:
     time = source.col("time")
     shift_by_timedelta = timedelta(seconds=10)
     golden.jsonl(
diff --git a/python/pytests/shift_until_test.py b/python/pytests/shift_until_test.py
index 055ba0c7d..83f59dfdd 100644
--- a/python/pytests/shift_until_test.py
+++ b/python/pytests/shift_until_test.py
@@ -3,7 +3,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -15,10 +15,12 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_shift_until_predicate(source, golden) -> None:
+async def test_shift_until_predicate(source, golden) -> None:
     m = source.col("m")
     predicate = m.sum() > 30
     golden.jsonl(
diff --git a/python/pytests/sqrt_test.py b/python/pytests/sqrt_test.py
index e6d67199c..6935831c0 100644
--- a/python/pytests/sqrt_test.py
+++ b/python/pytests/sqrt_test.py
@@ -3,7 +3,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m",
@@ -12,9 +12,11 @@ def source() -> kd.sources.CsvString:
             "2021-01-02T00:00:00,B,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_sqrt(source, golden) -> None:
+async def test_sqrt(source, golden) -> None:
     m = source.col("m")
     golden.jsonl(kd.record({"m": m, "sqrt_m": m.sqrt()}))
diff --git a/python/pytests/time_test.py b/python/pytests/time_test.py
index 934d51e05..515831210 100644
--- a/python/pytests/time_test.py
+++ b/python/pytests/time_test.py
@@ -6,7 +6,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -19,10 +19,12 @@ def source() -> kd.sources.CsvString:
             "1997-01-18T16:40:04,A,5,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_time_of_point(source, golden) -> None:
+async def test_time_of_point(source, golden) -> None:
     m = source.col("m")
     n = source.col("n")
     golden.jsonl(
@@ -37,40 +39,40 @@ def test_time_of_point(source, golden) -> None:
     )


-def test_time_add_days(source, golden) -> None:
+async def test_time_add_days(source, golden) -> None:
     time = source.col("time")
     golden.jsonl(kd.record({"time": time, "time_plus_day": time + timedelta(days=1)}))


-def test_time_add_hours(source, golden) -> None:
+async def test_time_add_hours(source, golden) -> None:
     time = source.col("time")
     golden.jsonl(
         kd.record({"time": time, "time_plus_hours": time + timedelta(hours=1)})
     )


-def test_time_add_minutes(source, golden) -> None:
+async def test_time_add_minutes(source, golden) -> None:
     time = source.col("time")
     golden.jsonl(
         kd.record({"time": time, "time_plus_minutes": time + timedelta(minutes=1)})
     )


-def test_time_add_days_and_minutes(source, golden) -> None:
+async def test_time_add_days_and_minutes(source, golden) -> None:
     time = source.col("time")
     golden.jsonl(
         kd.record({"time": time, "time_plus_day": time + timedelta(days=3, minutes=1)})
     )


-def test_time_add_seconds(source, golden) -> None:
+async def test_time_add_seconds(source, golden) -> None:
     time = source.col("time")
     golden.jsonl(
         kd.record({"time": time, "time_plus_seconds": time + timedelta(seconds=5)})
     )


-def test_compare_literal_timedelta(source, golden) -> None:
+async def test_compare_literal_timedelta(source, golden) -> None:
     time = source.col("time")
     seconds_since = time.seconds_since_previous()
     td = timedelta(milliseconds=1000)
diff --git a/python/pytests/timestream_test.py b/python/pytests/timestream_test.py
index 36326cbe9..5c9416bfe 100644
--- a/python/pytests/timestream_test.py
+++ b/python/pytests/timestream_test.py
@@ -110,7 +110,7 @@ def test_timestream_arithmetic_types(source1) -> None:
     assert "Arg[1]: Timestream[int32]" in e.value.__notes__


-def test_timestream_preview(golden) -> None:
+async def test_timestream_preview(golden) -> None:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -122,12 +122,14 @@ def test_timestream_preview(golden) -> None:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    source = kd.sources.CsvString(content, time_column="time", key_column="key")
+    source = await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )

     golden.jsonl(source.preview(limit=4))


-def test_timestream_run_non_record(golden) -> None:
+async def test_timestream_run_non_record(golden) -> None:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -139,11 +141,13 @@ def test_timestream_run_non_record(golden) -> None:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    source = kd.sources.CsvString(content, time_column="time", key_column="key")
+    source = await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )

     golden.jsonl(source.col("m"))


-def test_timestream_cast(golden) -> None:
+async def test_timestream_cast(golden) -> None:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -155,7 +159,9 @@ def test_timestream_cast(golden) -> None:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    source = kd.sources.CsvString(content, time_column="time", key_column="key")
+    source = await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )

     golden.jsonl(source.col("time").cast(pa.timestamp("ns")))
diff --git a/python/pytests/udf_test.py b/python/pytests/udf_test.py
index d3b6baf4c..370a19ead 100644
--- a/python/pytests/udf_test.py
+++ b/python/pytests/udf_test.py
@@ -27,7 +27,7 @@ def test_docstring() -> None:


 @pytest.fixture
-def source_int64() -> kd.sources.CsvString:
+async def source_int64() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -39,16 +39,18 @@ def source_int64() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_add_udf_direct(golden, source_int64) -> None:
+async def test_add_udf_direct(golden, source_int64) -> None:
     m = source_int64.col("m")
     n = source_int64.col("n")
     golden.jsonl(kd.record({"m": m, "n": n, "add": add(m, n), "add_x2": add_x2(m, n)}))


-def test_add_udf_pipe(golden, source_int64) -> None:
+async def test_add_udf_pipe(golden, source_int64) -> None:
     m = source_int64.col("m")
     n = source_int64.col("n")
     golden.jsonl(
@@ -69,7 +71,7 @@ def test_add_udf_pipe(golden, source_int64) -> None:
 # A bug existed where the UDF was hashing the signature rather than
 # the uuid, so the DFG was equating two different UDFs with the
 # same signature and using the first definition.
-def test_add_udf_redefined_inline(golden, source_int64) -> None:
+async def test_add_udf_redefined_inline(golden, source_int64) -> None:
     m = source_int64.col("m")
     n = source_int64.col("n")
diff --git a/python/pytests/union_test.py b/python/pytests/union_test.py
index b18d93d18..b79ab96cf 100644
--- a/python/pytests/union_test.py
+++ b/python/pytests/union_test.py
@@ -1,9 +1,9 @@
 import kaskada as kd


-def test_union(golden) -> None:
-    source = kd.sources.PyDict(
-        [
+async def test_union(golden) -> None:
+    source = await kd.sources.PyDict.create(
+        rows=[
             {"time": "1996-12-19T16:39:57", "user": "A", "m": [5], "n": []},
             {"time": "1996-12-19T17:39:57", "user": "A", "m": [], "n": [5, 6]},
             {"time": "1996-12-19T18:39:57", "user": "A", "m": [None]},
diff --git a/python/pytests/upper_test.py b/python/pytests/upper_test.py
index 047365d86..a77c21f8e 100644
--- a/python/pytests/upper_test.py
+++ b/python/pytests/upper_test.py
@@ -3,7 +3,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m",
@@ -15,9 +15,11 @@ def source() -> kd.sources.CsvString:
             "2021-01-04T00:00:00,Ryan,hi",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_upper(source, golden) -> None:
+async def test_upper(source, golden) -> None:
     m = source.col("m")
     golden.jsonl(kd.record({"m": m, "upper_m": m.upper()}))
diff --git a/python/pytests/with_key_test.py b/python/pytests/with_key_test.py
index f4296cf95..77822aebd 100644
--- a/python/pytests/with_key_test.py
+++ b/python/pytests/with_key_test.py
@@ -3,7 +3,7 @@


 @pytest.fixture(scope="module")
-def source() -> kd.sources.CsvString:
+async def source() -> kd.sources.CsvString:
     content = "\n".join(
         [
             "time,key,m,new_key",
@@ -15,23 +15,25 @@ def source() -> kd.sources.CsvString:
             "1996-12-19T16:40:02,A,,C",
         ]
     )
-    return kd.sources.CsvString(content, time_column="time", key_column="key")
+    return await kd.sources.CsvString.create(
+        content, time_column="time", key_column="key"
+    )


-def test_with_key_literal(source, golden) -> None:
+async def test_with_key_literal(source, golden) -> None:
     golden.jsonl(source.with_key("literal_key"))


-def test_with_key_column(source, golden) -> None:
+async def test_with_key_column(source, golden) -> None:
     new_key = source.col("new_key")
     golden.jsonl(source.with_key(new_key))


-def test_with_key_grouping(source, golden) -> None:
+async def test_with_key_grouping(source, golden) -> None:
     new_key = source.col("new_key")
     grouping = "user"
     golden.jsonl(source.with_key(new_key, grouping))


-def test_with_key_last(source, golden) -> None:
+async def test_with_key_last(source, golden) -> None:
     golden.jsonl(source.with_key(source.col("new_key")).last())
diff --git a/python/src/table.rs b/python/src/table.rs
index 376973a49..f7329dae0 100644
--- a/python/src/table.rs
+++ b/python/src/table.rs
@@ -10,18 +10,19 @@ use crate::error::Result;
 use crate::expr::Expr;
 use crate::session::Session;

-#[pyclass(extends=Expr, subclass)]
+#[pyclass]
 pub(crate) struct Table {
     #[pyo3(get)]
     name: String,
-    rust_table: RustTable,
+    rust_table: Arc<RustTable>,
+    session: Session,
 }

 #[pymethods]
 impl Table {
     /// Create a new table.
     #[new]
-    #[pyo3(signature = (session, name, time_column, key_column, schema, retained, subsort_column, grouping_name, time_unit))]
+    #[pyo3(signature = (session, name, time_column, key_column, schema, queryable, subsort_column, grouping_name, time_unit))]
     #[allow(clippy::too_many_arguments)]
     fn new(
         session: Session,
@@ -29,41 +30,50 @@ impl Table {
         time_column: &str,
         key_column: &str,
         schema: PyArrowType<Schema>,
-        retained: bool,
+        queryable: bool,
         subsort_column: Option<&str>,
         grouping_name: Option<&str>,
         time_unit: Option<&str>,
-    ) -> Result<(Self, Expr)> {
+    ) -> Result<Self> {
         let raw_schema = Arc::new(schema.0);

         let rust_table = session.rust_session()?.add_table(
             &name,
             raw_schema,
             time_column,
-            retained,
+            queryable,
             subsort_column,
             key_column,
             grouping_name,
             time_unit,
         )?;

-        let rust_expr = rust_table.expr.clone();
-        let table = Table { name, rust_table };
-        let expr = Expr { rust_expr, session };
-        Ok((table, expr))
+        let table = Table { name, rust_table: Arc::new(rust_table), session };
+        Ok(table)
     }

-    /// Add PyArrow data to the given table.
+    fn expr(&self) -> Expr {
+        let rust_expr = self.rust_table.expr.clone();
+        Expr { rust_expr, session: self.session.clone() }
+    }
+
+    /// Add PyArrow data to the given table.
     ///
     /// TODO: Support other kinds of data:
     /// - pyarrow RecordBatchReader
     /// - Parquet file URLs
     /// - Python generators?
     /// TODO: Error handling
-    fn add_pyarrow(&mut self, data: &PyAny) -> Result<()> {
+    fn add_pyarrow<'py>(&self, data: &'py PyAny, py: Python<'py>) -> Result<&'py PyAny> {
         let data = RecordBatch::from_pyarrow(data)?;
-        self.rust_table.add_data(data)?;
-        Ok(())
+
+        let rust_table = self.rust_table.clone();
+        Ok(pyo3_asyncio::tokio::future_into_py(py, async move {
+            let result = rust_table.add_data(data).await;
+            Python::with_gil(|py| {
+                result.unwrap();
+                Ok(py.None())
+            })
+        })?)
     }
 }
diff --git a/python/visualize.ipynb b/python/visualize.ipynb
index 665618b36..b09dafffc 100644
--- a/python/visualize.ipynb
+++ b/python/visualize.ipynb
@@ -27,6 +27,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "import asyncio \n",
+    "\n",
     "data = \"\\n\".join(\n",
     "    [\n",
     "        \"time,key,m,n\",\n",
@@ -36,7 +38,7 @@
     "        \"1997-04-18T16:40:00,A,,9\",\n",
     "    ]\n",
     "    )\n",
-    "source = kd.sources.CsvString(data, time_column=\"time\", key_column=\"key\")\n",
+    "source = await kd.sources.CsvString.create(data, time_column=\"time\", key_column=\"key\")\n",
     "\n",
     "kd.plot.render(\n",
     "    kd.plot.Plot(source.col(\"m\"), name=\"m\"),\n",
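
The conftest.py change is the piece that makes the rest of the test changes hang together: a session-scoped event_loop fixture keeps a single asyncio loop alive for the whole run, so the module-scoped async source fixtures and the async def tests rewritten throughout this patch can all share it. Below is a minimal sketch of that arrangement, assuming pytest-asyncio configured in auto mode (which would explain why the explicit @pytest.mark.asyncio markers are dropped elsewhere in this diff); the fixture and test names here are illustrative, not part of the patch:

import asyncio

import kaskada as kd
import pytest


@pytest.fixture(scope="session")
def event_loop():
    # One loop for the entire session: an async fixture can only be cached at
    # module scope if the loop it ran on outlives the module.
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="module")
async def source() -> kd.sources.CsvString:
    # Sources are now built via an awaitable factory, not a constructor.
    return await kd.sources.CsvString.create(
        "time,key,m\n1996-12-19T16:39:57,A,5",
        time_column="time",
        key_column="key",
    )


async def test_m_is_present(source) -> None:
    assert source.col("m") is not None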
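
Outside of pytest, the same migration pattern applies: construction moves from synchronous constructors (kd.sources.CsvString(...)) to awaitable factories (await kd.sources.CsvString.create(...)), and the data-feeding methods (add_string, add_rows, add_data, add_file) become awaitable as well. A rough sketch using only calls that appear in this diff; main is illustrative, and kd.init_session() is assumed to be the public equivalent of the init_session() call in conftest.py:

import asyncio

import kaskada as kd


async def main() -> None:
    kd.init_session()  # assumed equivalent of conftest.py's init_session()

    source = await kd.sources.CsvString.create(
        "time,key,m\n1996-12-19T16:39:57,A,5",
        time_column="time",
        key_column="key",
    )

    # Feeding more data is awaitable as well.
    await source.add_string("time,key,m\n1996-12-19T16:40:57,A,8")

    print(source.preview(limit=2))


asyncio.run(main())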