From aa20ff8a68baa7354b93c0705ee6822ad5646c02 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Thu, 31 Aug 2023 12:48:20 -0700 Subject: [PATCH 1/6] feat: Add support for results and time ranges This adds the plumbing to use History and Snapshot result configurations. It adds the necessary decoration code to the session. - [ ] Review tests / time -- looks like time zones cause some errors - [ ] Get final snapshot test passing (or indicate it is expected to fail) --- crates/sparrow-compiler/src/frontend.rs | 139 +++++++++--------- crates/sparrow-runtime/src/execute.rs | 8 + crates/sparrow-session/src/lib.rs | 2 +- crates/sparrow-session/src/session.rs | 87 ++++++++--- python/pysrc/kaskada/__init__.py | 2 +- python/pysrc/kaskada/_execution.py | 32 ++-- python/pysrc/kaskada/_timestream.py | 18 +++ python/pytests/conftest.py | 6 + .../{result_test.py => execution_test.py} | 17 +++ .../golden/execution_test/test_history.jsonl | 6 + .../execution_test/test_history_1.jsonl | 4 + .../execution_test/test_history_2.jsonl | 1 + .../execution_test/test_history_3.jsonl | 1 + .../test_iter_pandas.jsonl | 0 .../test_iter_pandas_1.jsonl | 0 .../test_iter_pandas_async.jsonl | 0 .../test_iter_pandas_async_1.jsonl | 0 .../test_iter_pandas_async_live.jsonl | 0 .../test_iter_pandas_async_live_1.jsonl | 0 .../golden/execution_test/test_snapshot.jsonl | 2 + .../execution_test/test_snapshot_1.jsonl | 1 + .../execution_test/test_snapshot_2.jsonl | 6 + python/src/expr.rs | 19 ++- 23 files changed, 256 insertions(+), 95 deletions(-) rename python/pytests/{result_test.py => execution_test.py} (72%) create mode 100644 python/pytests/golden/execution_test/test_history.jsonl create mode 100644 python/pytests/golden/execution_test/test_history_1.jsonl create mode 100644 python/pytests/golden/execution_test/test_history_2.jsonl create mode 100644 python/pytests/golden/execution_test/test_history_3.jsonl rename python/pytests/golden/{result_test => 
execution_test}/test_iter_pandas.jsonl (100%) rename python/pytests/golden/{result_test => execution_test}/test_iter_pandas_1.jsonl (100%) rename python/pytests/golden/{result_test => execution_test}/test_iter_pandas_async.jsonl (100%) rename python/pytests/golden/{result_test => execution_test}/test_iter_pandas_async_1.jsonl (100%) rename python/pytests/golden/{result_test => execution_test}/test_iter_pandas_async_live.jsonl (100%) rename python/pytests/golden/{result_test => execution_test}/test_iter_pandas_async_live_1.jsonl (100%) create mode 100644 python/pytests/golden/execution_test/test_snapshot.jsonl create mode 100644 python/pytests/golden/execution_test/test_snapshot_1.jsonl create mode 100644 python/pytests/golden/execution_test/test_snapshot_2.jsonl diff --git a/crates/sparrow-compiler/src/frontend.rs b/crates/sparrow-compiler/src/frontend.rs index 6f746e4b6..93239bef5 100644 --- a/crates/sparrow-compiler/src/frontend.rs +++ b/crates/sparrow-compiler/src/frontend.rs @@ -165,72 +165,14 @@ impl FrontendOutput { false }; - // Decorate the expression as needed for the query_type. - let result_node = match options.per_entity_behavior { - _ if !executable => { - // Don't decorate incomplete or otherwise non-executable expressions. - // We don't produce executable plans for incomplete expressions. 
- query.value() - } - PerEntityBehavior::All => { - dfg.enter_env(); - dfg.bind("result", query); - let time_node = create_changed_since_time_node(&mut dfg)?; - dfg.bind("__changed_since_time__", time_node); - - let decorated = add_decoration( - data_context, - &mut diagnostics, - &mut dfg, - CHANGED_SINCE_DECORATION, - )?; - dfg.exit_env(); - decorated.value() - } - PerEntityBehavior::Final => { - dfg.enter_env(); - dfg.bind("result", query); - - // Treat FINAL queries as changed_since_time of 0 - let time_node = create_changed_since_time_node(&mut dfg)?; - dfg.bind("__changed_since_time__", time_node); - - let decorated = add_decoration( - data_context, - &mut diagnostics, - &mut dfg, - FINAL_QUERY_DECORATION, - )?; - dfg.exit_env(); - decorated.value() - } - PerEntityBehavior::FinalAtTime => { - dfg.enter_env(); - dfg.bind("result", query); - - // Treat FINAL queries as changed_since_time of 0 - let time_node = create_changed_since_time_node(&mut dfg)?; - dfg.bind("__changed_since_time__", time_node); - - let time_node = create_final_at_time_time_node(&mut dfg)?; - dfg.bind("__final_at_time__", time_node); - - // 1. If the final query time is provided then use it as the query final time in - // the special decorator 2. Use the same per entity behavior - // final for all of them - let decorated = add_decoration( - data_context, - &mut diagnostics, - &mut dfg, - FINAL_QUERY_AT_TIME_DECORATION, - )?; - dfg.exit_env(); - decorated.value() - } - PerEntityBehavior::Unspecified => { - anyhow::bail!("Unspecified per entity behavior") - } - }; + let result_node = decorate( + data_context, + &mut dfg, + &mut diagnostics, + executable, + query, + options.per_entity_behavior, + )?; // Create the basic analysis information. 
let num_errors = diagnostics.num_errors(); @@ -320,6 +262,71 @@ impl FrontendOutput { } } +pub fn decorate( + data_context: &mut DataContext, + dfg: &mut Dfg, + diagnostics: &mut DiagnosticCollector<'_>, + executable: bool, + query: AstDfgRef, + per_entity_behavior: PerEntityBehavior, +) -> anyhow::Result { + match per_entity_behavior { + _ if !executable => { + // Don't decorate incomplete or otherwise non-executable expressions. + // We don't produce executable plans for incomplete expressions. + Ok(query.value()) + } + PerEntityBehavior::All => { + dfg.enter_env(); + dfg.bind("result", query); + let time_node = create_changed_since_time_node(dfg)?; + dfg.bind("__changed_since_time__", time_node); + let decorated = + add_decoration(data_context, diagnostics, dfg, CHANGED_SINCE_DECORATION)?; + dfg.exit_env(); + Ok(decorated.value()) + } + PerEntityBehavior::Final => { + dfg.enter_env(); + dfg.bind("result", query); + + // Treat FINAL queries as changed_since_time of 0 + let time_node = create_changed_since_time_node(dfg)?; + dfg.bind("__changed_since_time__", time_node); + + let decorated = add_decoration(data_context, diagnostics, dfg, FINAL_QUERY_DECORATION)?; + dfg.exit_env(); + Ok(decorated.value()) + } + PerEntityBehavior::FinalAtTime => { + dfg.enter_env(); + dfg.bind("result", query); + + // Treat FINAL queries as changed_since_time of 0 + let time_node = create_changed_since_time_node(dfg)?; + dfg.bind("__changed_since_time__", time_node); + + let time_node = create_final_at_time_time_node(dfg)?; + dfg.bind("__final_at_time__", time_node); + + // 1. If the final query time is provided then use it as the query final time in + // the special decorator 2. 
Use the same per entity behavior + // final for all of them + let decorated = add_decoration( + data_context, + diagnostics, + dfg, + FINAL_QUERY_AT_TIME_DECORATION, + )?; + dfg.exit_env(); + Ok(decorated.value()) + } + PerEntityBehavior::Unspecified => { + anyhow::bail!("Unspecified per entity behavior") + } + } +} + /// Adds the given decoration to the dfg. fn add_decoration( data_context: &mut DataContext, diff --git a/crates/sparrow-runtime/src/execute.rs b/crates/sparrow-runtime/src/execute.rs index 37b8f33ef..d0914d142 100644 --- a/crates/sparrow-runtime/src/execute.rs +++ b/crates/sparrow-runtime/src/execute.rs @@ -118,10 +118,18 @@ impl ExecutionOptions { self.changed_since_time = changed_since; } + pub fn set_changed_since_s(&mut self, seconds: i64) { + self.changed_since_time = Timestamp { seconds, nanos: 0 }; + } + pub fn set_final_at_time(&mut self, final_at_time: Timestamp) { self.final_at_time = Some(final_at_time); } + pub fn set_final_at_time_s(&mut self, seconds: i64) { + self.changed_since_time = Timestamp { seconds, nanos: 0 }; + } + async fn compute_store( &self, object_stores: &ObjectStoreRegistry, diff --git a/crates/sparrow-session/src/lib.rs b/crates/sparrow-session/src/lib.rs index bbc72adef..3dcc74d27 100644 --- a/crates/sparrow-session/src/lib.rs +++ b/crates/sparrow-session/src/lib.rs @@ -7,5 +7,5 @@ mod table; pub use error::Error; pub use execution::Execution; pub use expr::{Expr, Literal}; -pub use session::{ExecutionOptions, Session}; +pub use session::{ExecutionOptions, Results, Session}; pub use table::Table; diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index ad11ce81f..45e174e64 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -34,6 +34,13 @@ pub struct Session { udfs: HashMap>, } +#[derive(Default)] +pub enum Results { + #[default] + History, + Snapshot, +} + #[derive(Default)] pub struct ExecutionOptions { /// The maximum number of 
rows to return. @@ -42,6 +49,16 @@ pub struct ExecutionOptions { pub max_batch_size: Option, /// Whether to run execute as a materialization or not. pub materialize: bool, + // History or Snapshot results. + pub results: Results, + /// The changed since time. This is the minimum timestamp of changes to events. + /// For historic queries, this limits the output points. + /// For snapshot queries, this limits the set of entities that are considered changed. + pub changed_since_time_s: Option, + /// The final at time. This ist he maximum timestamp output. + /// For historic queries, this limits the output points. + /// For snapshot queries, this determines the time at which the snapshot is produced. + pub final_at_time_s: Option, } /// Adds a table to the session. @@ -345,31 +362,49 @@ impl Session { } } + /// Execute the query. + /// + /// It is unfortunate this requires `&mut self` instead of `&self`. It relates to the + /// fact that the decorations may require mutating the DFG, which in turn requires + /// mutability. In practice, the decorations shouldn't mutate the DFG and/or that + /// shouldn't require mutating the session. pub fn execute( - &self, - expr: &Expr, + &mut self, + query: &Expr, options: ExecutionOptions, ) -> error_stack::Result { // TODO: Decorations? - let group_id = expr + let group_id = query .0 .grouping() .expect("query to be grouped (non-literal)"); - let primary_group_info = self - .data_context - .group_info(group_id) - .expect("missing group info"); - let primary_grouping = primary_group_info.name().to_owned(); - let primary_grouping_key_type = primary_group_info.key_type(); - // Hacky. Ideally, we'd determine the schema from the created execution plan. - // Currently, this isn't easily available. Instead, we create this from the - // columns we know we're producing. 
- let schema = result_schema(expr, primary_grouping_key_type)?; + let per_entity_behavior = match options.results { + Results::History => PerEntityBehavior::All, + Results::Snapshot if options.final_at_time_s.is_some() => { + PerEntityBehavior::FinalAtTime + } + Results::Snapshot => PerEntityBehavior::Final, + }; + + // Apply decorations as necessary for the per-entity behavior. + let feature_set = FeatureSet::default(); + let mut diagnostics = DiagnosticCollector::new(&feature_set); + let expr = sparrow_compiler::decorate( + &mut self.data_context, + &mut self.dfg, + &mut diagnostics, + true, + query.0.clone(), + per_entity_behavior, + ) + .into_report() + .change_context(Error::Compile)?; + error_stack::ensure!(diagnostics.num_errors() == 0, Error::Internal); - // First, extract the necessary subset of the DFG as an expression. + // Extract the necessary subset of the DFG as an expression. // This will allow us to operate without mutating things. - let expr = self.dfg.extract_simplest(expr.0.value()); + let expr = self.dfg.extract_simplest(expr); let expr = expr .simplify(&CompilerOptions { ..CompilerOptions::default() @@ -380,13 +415,24 @@ impl Session { .into_report() .change_context(Error::Compile)?; + let primary_group_info = self + .data_context + .group_info(group_id) + .expect("missing group info"); + let primary_grouping = primary_group_info.name().to_owned(); + let primary_grouping_key_type = primary_group_info.key_type(); + + // Hacky. Ideally, we'd determine the schema from the created execution plan. + // Currently, this isn't easily available. Instead, we create this from the + // columns we know we're producing. + let schema = result_schema(query, primary_grouping_key_type)?; + // TODO: Incremental? // TODO: Slicing? let plan = sparrow_compiler::plan::extract_plan_proto( &self.data_context, expr, - // TODO: Configure per-entity behavior. 
- PerEntityBehavior::Final, + per_entity_behavior, primary_grouping, primary_grouping_key_type, ) @@ -475,6 +521,13 @@ impl ExecutionOptions { }); } + if let Some(changed_since) = self.changed_since_time_s { + options.set_changed_since_s(changed_since); + } + if let Some(final_at_time) = self.final_at_time_s { + options.set_final_at_time_s(final_at_time); + } + options } } diff --git a/python/pysrc/kaskada/__init__.py b/python/pysrc/kaskada/__init__.py index 6e15c05b6..5a7dc922a 100644 --- a/python/pysrc/kaskada/__init__.py +++ b/python/pysrc/kaskada/__init__.py @@ -1,7 +1,7 @@ """Kaskada query builder and local execution engine.""" from __future__ import annotations -from . import destinations, plot, sources, windows +from . import destinations, plot, sources, windows, results from ._execution import Execution, ResultIterator from ._session import init_session from ._timestream import Arg, LiteralValue, Timestream, record diff --git a/python/pysrc/kaskada/_execution.py b/python/pysrc/kaskada/_execution.py index 20f26149b..15c13dc1d 100644 --- a/python/pysrc/kaskada/_execution.py +++ b/python/pysrc/kaskada/_execution.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import AsyncIterator, Callable, Iterator, Optional, TypeVar +from typing import AsyncIterator, Callable, Iterator, Literal, Optional, TypeVar import pyarrow as pa @@ -11,19 +11,33 @@ @dataclass class _ExecutionOptions: - """Execution options passed to the FFI layer. - - Attributes: - row_limit: The maximum number of rows to return. If not specified, all rows are returned. - max_batch_size: The maximum batch size to use when returning results. - If not specified, the default batch size will be used. - materialize: If true, the query will be a continuous materialization. - """ + """Execution options passed to the FFI layer.""" + #: The maximum number of rows to return. + #: If not specified, all rows are returned. 
row_limit: Optional[int] = None + + #: The maximum batch size to use when returning results. + #: If not specified, the default batch size will be used. max_batch_size: Optional[int] = None + + #: If true, the query will be a continuous materialization. materialize: bool = False + #: The type of results to return. + results: Literal["history", "snapshot"] = "history" + + #: The earliest time of changes to include in the results. + #: For history, this limits the points output. + #: For snapshots, this limits the entities that are output. + changed_since: Optional[int] = None + + #: The last time to process. + #: If not set, defaults to the current time. + #: For history, this limits the points output. + #: For snapshots, this determines when the snapshot is produced. + final_at: Optional[int] = None + class Execution: """Represents an execution of a TimeStream.""" diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index eebe4c698..3cdd4c3ef 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -1084,6 +1084,24 @@ def _execute( max_batch_size=max_batch_size, materialize=mode == "live", ) + + if results is None: + results = kd.results.History() + + if isinstance(results, kd.results.History): + options.results = 'history' + if results.since is not None: + options.changed_since = int(results.since.timestamp()) + if results.until is not None: + options.final_at = int(results.until.timestamp()) + elif isinstance(results, kd.results.Snapshot): + options.results = 'snapshot' + if results.changed_since is not None: + options.changed_since = int(results.changed_since.timestamp()) + if results.at is not None: + options.final_at = int(results.at.timestamp()) + else: + raise AssertionError(f"Unhandled results type {results!r}") return expr._ffi_expr.execute(options) diff --git a/python/pytests/conftest.py b/python/pytests/conftest.py index 7dd0e2571..4db19f17d 100644 --- a/python/pytests/conftest.py +++ 
b/python/pytests/conftest.py @@ -27,8 +27,10 @@ def jsonl(self, data: Union[kd.Timestream, pd.DataFrame]) -> None: """Golden test against newline-delimited JSON file (json-lines).""" df = _data_to_dataframe(data) filename = self._filename("jsonl") + print(f"Results: {df}, {filename}") if self._save: + print("Saving") df.to_json( filename, orient="records", @@ -45,6 +47,10 @@ def jsonl(self, data: Union[kd.Timestream, pd.DataFrame]) -> None: date_unit="ns", ) + if golden.empty and df.empty: + # For some reason, even when specifying the dtypes, reading an empty + # json produces no columns. + return pd.testing.assert_frame_equal(df, golden, check_datetimelike_compat=True) def parquet(self, data: Union[kd.Timestream, pd.DataFrame]) -> None: diff --git a/python/pytests/result_test.py b/python/pytests/execution_test.py similarity index 72% rename from python/pytests/result_test.py rename to python/pytests/execution_test.py index b1671e550..c82068c14 100644 --- a/python/pytests/result_test.py +++ b/python/pytests/execution_test.py @@ -3,6 +3,7 @@ import kaskada as kd import pandas as pd import pytest +from datetime import datetime @pytest.fixture @@ -82,3 +83,19 @@ async def test_iter_pandas_async_live(golden, source_int64) -> None: execution.stop() with pytest.raises(StopAsyncIteration): print(await execution.__anext__()) + + +def test_snapshot(golden, source_int64) -> None: + query = source_int64.col("m").sum() + golden.jsonl(query.to_pandas(kd.results.Snapshot())) + golden.jsonl(query.to_pandas(kd.results.Snapshot(changed_since=datetime.fromisoformat("1996-12-19T16:39:59Z")))) + golden.jsonl(query.to_pandas(kd.results.Snapshot(at=datetime.fromisoformat("1996-12-20T12:00:00Z")))) + +def test_history(golden, source_int64) -> None: + query = source_int64.col("m").sum() + golden.jsonl(query.to_pandas(kd.results.History())) + golden.jsonl(query.to_pandas(kd.results.History(since=datetime.fromisoformat("1996-12-19T16:39:59Z")))) + 
golden.jsonl(query.to_pandas(kd.results.History(until=datetime.fromisoformat("1996-12-20T12:00:00Z")))) + golden.jsonl(query.to_pandas(kd.results.History( + since=datetime.fromisoformat("1996-12-19T16:39:59Z"), + until=datetime.fromisoformat("1996-12-20T12:00:00Z")))) diff --git a/python/pytests/golden/execution_test/test_history.jsonl b/python/pytests/golden/execution_test/test_history.jsonl new file mode 100644 index 000000000..735353c8b --- /dev/null +++ b/python/pytests/golden/execution_test/test_history.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","result":5} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","result":24} +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_history_1.jsonl b/python/pytests/golden/execution_test/test_history_1.jsonl new file mode 100644 index 000000000..abaecf535 --- /dev/null +++ b/python/pytests/golden/execution_test/test_history_1.jsonl @@ -0,0 +1,4 @@ +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_history_2.jsonl b/python/pytests/golden/execution_test/test_history_2.jsonl new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/python/pytests/golden/execution_test/test_history_2.jsonl @@ -0,0 +1 @@ + diff --git a/python/pytests/golden/execution_test/test_history_3.jsonl b/python/pytests/golden/execution_test/test_history_3.jsonl new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/python/pytests/golden/execution_test/test_history_3.jsonl 
@@ -0,0 +1 @@ + diff --git a/python/pytests/golden/result_test/test_iter_pandas.jsonl b/python/pytests/golden/execution_test/test_iter_pandas.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_1.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_1.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_1.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_1.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async_1.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async_1.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async_1.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async_1.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async_live.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async_live.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async_live.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async_live.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async_live_1.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async_live_1.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async_live_1.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async_live_1.jsonl diff --git 
a/python/pytests/golden/execution_test/test_snapshot.jsonl b/python/pytests/golden/execution_test/test_snapshot.jsonl new file mode 100644 index 000000000..f65f104b2 --- /dev/null +++ b/python/pytests/golden/execution_test/test_snapshot.jsonl @@ -0,0 +1,2 @@ +{"_time":"1996-12-19T16:40:02.000000001","_key":"B","result":24} +{"_time":"1996-12-19T16:40:02.000000001","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_snapshot_1.jsonl b/python/pytests/golden/execution_test/test_snapshot_1.jsonl new file mode 100644 index 000000000..ec2f648b1 --- /dev/null +++ b/python/pytests/golden/execution_test/test_snapshot_1.jsonl @@ -0,0 +1 @@ +{"_time":"1996-12-19T16:40:02.000000001","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_snapshot_2.jsonl b/python/pytests/golden/execution_test/test_snapshot_2.jsonl new file mode 100644 index 000000000..735353c8b --- /dev/null +++ b/python/pytests/golden/execution_test/test_snapshot_2.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","result":5} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","result":24} +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} diff --git a/python/src/expr.rs b/python/src/expr.rs index 5c427305d..c706ea8ca 100644 --- a/python/src/expr.rs +++ b/python/src/expr.rs @@ -109,7 +109,7 @@ impl Expr { } fn execute(&self, options: Option<&PyAny>) -> Result { - let session = self.session.rust_session()?; + let mut session = self.session.rust_session()?; let options = extract_options(options)?; let execution = session.execute(&self.rust_expr, options)?; Ok(Execution::new(execution)) @@ -167,11 +167,28 @@ fn extract_options(options: Option<&PyAny>) -> Result sparrow_session::Results::History, + "snapshot" => 
sparrow_session::Results::Snapshot, + invalid => { + return Err( + PyValueError::new_err(format!("invalid results '{invalid}'")).into(), + ) + } + }; Ok(sparrow_session::ExecutionOptions { row_limit: options.getattr(row_limit)?.extract()?, max_batch_size: options.getattr(max_batch_size)?.extract()?, materialize: options.getattr(materialize)?.extract()?, + results, + changed_since_time_s: options.getattr(changed_since_time)?.extract()?, + final_at_time_s: options.getattr(final_at_time)?.extract()?, }) } } From 4f33e1a2e6209032ebe48749595fa8f702b5e327 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Fri, 1 Sep 2023 08:27:17 -0700 Subject: [PATCH 2/6] improve error --- .../src/execute/operation/expression_executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sparrow-runtime/src/execute/operation/expression_executor.rs b/crates/sparrow-runtime/src/execute/operation/expression_executor.rs index 776821808..87bd35846 100644 --- a/crates/sparrow-runtime/src/execute/operation/expression_executor.rs +++ b/crates/sparrow-runtime/src/execute/operation/expression_executor.rs @@ -169,7 +169,7 @@ impl ExpressionExecutor { LateBoundValue::from_i32(late_bound).context("late bound value")?; let literal = late_bindings[late_bound] .as_ref() - .context("missing late bound value")? + .with_context(|| format!("missing late bound value {late_bound:?}"))? 
.clone(); anyhow::ensure!( From b5e4ebfb46d67bb4d919ca3a19f27bcbbbdefcdd Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Fri, 1 Sep 2023 08:32:43 -0700 Subject: [PATCH 3/6] fix tests --- crates/sparrow-runtime/src/execute.rs | 2 +- python/pytests/golden/execution_test/test_history_2.jsonl | 7 ++++++- python/pytests/golden/execution_test/test_history_3.jsonl | 5 ++++- .../pytests/golden/execution_test/test_snapshot_2.jsonl | 8 ++------ 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/crates/sparrow-runtime/src/execute.rs b/crates/sparrow-runtime/src/execute.rs index d0914d142..08dea7baf 100644 --- a/crates/sparrow-runtime/src/execute.rs +++ b/crates/sparrow-runtime/src/execute.rs @@ -127,7 +127,7 @@ impl ExecutionOptions { } pub fn set_final_at_time_s(&mut self, seconds: i64) { - self.changed_since_time = Timestamp { seconds, nanos: 0 }; + self.final_at_time = Some(Timestamp { seconds, nanos: 0 }); } async fn compute_store( diff --git a/python/pytests/golden/execution_test/test_history_2.jsonl b/python/pytests/golden/execution_test/test_history_2.jsonl index 8b1378917..735353c8b 100644 --- a/python/pytests/golden/execution_test/test_history_2.jsonl +++ b/python/pytests/golden/execution_test/test_history_2.jsonl @@ -1 +1,6 @@ - +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","result":5} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","result":24} +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_history_3.jsonl b/python/pytests/golden/execution_test/test_history_3.jsonl index 8b1378917..abaecf535 100644 --- a/python/pytests/golden/execution_test/test_history_3.jsonl +++ b/python/pytests/golden/execution_test/test_history_3.jsonl @@ -1 
+1,4 @@ - +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_snapshot_2.jsonl b/python/pytests/golden/execution_test/test_snapshot_2.jsonl index 735353c8b..9ea5bdd8f 100644 --- a/python/pytests/golden/execution_test/test_snapshot_2.jsonl +++ b/python/pytests/golden/execution_test/test_snapshot_2.jsonl @@ -1,6 +1,2 @@ -{"_time":"1996-12-19T16:39:57.000000000","_key":"A","result":5} -{"_time":"1996-12-19T16:39:58.000000000","_key":"B","result":24} -{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22} -{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22} -{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34} -{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} +{"_time":"1996-12-20T12:00:00.000000001","_key":"B","result":24} +{"_time":"1996-12-20T12:00:00.000000001","_key":"A","result":34} From 9fa10da168f6cd670e48fc43947586884191a49c Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Fri, 1 Sep 2023 14:41:11 -0700 Subject: [PATCH 4/6] comments --- crates/sparrow-session/src/session.rs | 4 ++-- python/pytests/conftest.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index 45e174e64..75ac002ea 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -49,13 +49,13 @@ pub struct ExecutionOptions { pub max_batch_size: Option, /// Whether to run execute as a materialization or not. pub materialize: bool, - // History or Snapshot results. + /// History or Snapshot results. pub results: Results, /// The changed since time. This is the minimum timestamp of changes to events. 
/// For historic queries, this limits the output points. /// For snapshot queries, this limits the set of entities that are considered changed. pub changed_since_time_s: Option, - /// The final at time. This ist he maximum timestamp output. + /// The final at time. This is the maximum timestamp output. /// For historic queries, this limits the output points. /// For snapshot queries, this determines the time at which the snapshot is produced. pub final_at_time_s: Option, diff --git a/python/pytests/conftest.py b/python/pytests/conftest.py index 4db19f17d..b2bf14f14 100644 --- a/python/pytests/conftest.py +++ b/python/pytests/conftest.py @@ -27,10 +27,8 @@ def jsonl(self, data: Union[kd.Timestream, pd.DataFrame]) -> None: """Golden test against newline-delimited JSON file (json-lines).""" df = _data_to_dataframe(data) filename = self._filename("jsonl") - print(f"Results: {df}, {filename}") if self._save: - print("Saving") df.to_json( filename, orient="records", From 2d88c7fd3f6797a70f5f369e3e89481567665c73 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Fri, 1 Sep 2023 16:08:08 -0700 Subject: [PATCH 5/6] lint --- python/pysrc/kaskada/__init__.py | 2 +- python/pysrc/kaskada/_timestream.py | 4 +-- python/pytests/execution_test.py | 40 +++++++++++++++++++++++------ 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/python/pysrc/kaskada/__init__.py b/python/pysrc/kaskada/__init__.py index 5a7dc922a..37d1d2dff 100644 --- a/python/pysrc/kaskada/__init__.py +++ b/python/pysrc/kaskada/__init__.py @@ -1,7 +1,7 @@ """Kaskada query builder and local execution engine.""" from __future__ import annotations -from . import destinations, plot, sources, windows, results +from . 
import destinations, plot, results, sources, windows from ._execution import Execution, ResultIterator from ._session import init_session from ._timestream import Arg, LiteralValue, Timestream, record diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index 3cdd4c3ef..5541e4cbb 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -1089,13 +1089,13 @@ def _execute( results = kd.results.History() if isinstance(results, kd.results.History): - options.results = 'history' + options.results = "history" if results.since is not None: options.changed_since = int(results.since.timestamp()) if results.until is not None: options.final_at = int(results.until.timestamp()) elif isinstance(results, kd.results.Snapshot): - options.results = 'snapshot' + options.results = "snapshot" if results.changed_since is not None: options.changed_since = int(results.changed_since.timestamp()) if results.at is not None: diff --git a/python/pytests/execution_test.py b/python/pytests/execution_test.py index c82068c14..3a5dcd911 100644 --- a/python/pytests/execution_test.py +++ b/python/pytests/execution_test.py @@ -1,9 +1,9 @@ +from datetime import datetime from typing import AsyncIterator, Iterator import kaskada as kd import pandas as pd import pytest -from datetime import datetime @pytest.fixture @@ -88,14 +88,38 @@ async def test_iter_pandas_async_live(golden, source_int64) -> None: def test_snapshot(golden, source_int64) -> None: query = source_int64.col("m").sum() golden.jsonl(query.to_pandas(kd.results.Snapshot())) - golden.jsonl(query.to_pandas(kd.results.Snapshot(changed_since=datetime.fromisoformat("1996-12-19T16:39:59Z")))) - golden.jsonl(query.to_pandas(kd.results.Snapshot(at=datetime.fromisoformat("1996-12-20T12:00:00Z")))) + golden.jsonl( + query.to_pandas( + kd.results.Snapshot( + changed_since=datetime.fromisoformat("1996-12-19T16:39:59Z") + ) + ) + ) + golden.jsonl( + query.to_pandas( + 
kd.results.Snapshot(at=datetime.fromisoformat("1996-12-20T12:00:00Z")) + ) + ) + def test_history(golden, source_int64) -> None: query = source_int64.col("m").sum() golden.jsonl(query.to_pandas(kd.results.History())) - golden.jsonl(query.to_pandas(kd.results.History(since=datetime.fromisoformat("1996-12-19T16:39:59Z")))) - golden.jsonl(query.to_pandas(kd.results.History(until=datetime.fromisoformat("1996-12-20T12:00:00Z")))) - golden.jsonl(query.to_pandas(kd.results.History( - since=datetime.fromisoformat("1996-12-19T16:39:59Z"), - until=datetime.fromisoformat("1996-12-20T12:00:00Z")))) + golden.jsonl( + query.to_pandas( + kd.results.History(since=datetime.fromisoformat("1996-12-19T16:39:59Z")) + ) + ) + golden.jsonl( + query.to_pandas( + kd.results.History(until=datetime.fromisoformat("1996-12-20T12:00:00Z")) + ) + ) + golden.jsonl( + query.to_pandas( + kd.results.History( + since=datetime.fromisoformat("1996-12-19T16:39:59Z"), + until=datetime.fromisoformat("1996-12-20T12:00:00Z"), + ) + ) + ) From b5e81621a2799f038ff3a4e82586201569cc7f61 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Fri, 1 Sep 2023 19:33:17 -0700 Subject: [PATCH 6/6] fix dates on <3.11 --- python/pytests/execution_test.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pytests/execution_test.py b/python/pytests/execution_test.py index 3a5dcd911..85e07ac62 100644 --- a/python/pytests/execution_test.py +++ b/python/pytests/execution_test.py @@ -91,13 +91,13 @@ def test_snapshot(golden, source_int64) -> None: golden.jsonl( query.to_pandas( kd.results.Snapshot( - changed_since=datetime.fromisoformat("1996-12-19T16:39:59Z") + changed_since=datetime.fromisoformat("1996-12-19T16:39:59+00:00") ) ) ) golden.jsonl( query.to_pandas( - kd.results.Snapshot(at=datetime.fromisoformat("1996-12-20T12:00:00Z")) + kd.results.Snapshot(at=datetime.fromisoformat("1996-12-20T12:00:00+00:00")) ) ) @@ -107,19 +107,19 @@ def 
test_history(golden, source_int64) -> None: golden.jsonl(query.to_pandas(kd.results.History())) golden.jsonl( query.to_pandas( - kd.results.History(since=datetime.fromisoformat("1996-12-19T16:39:59Z")) + kd.results.History(since=datetime.fromisoformat("1996-12-19T16:39:59+00:00")) ) ) golden.jsonl( query.to_pandas( - kd.results.History(until=datetime.fromisoformat("1996-12-20T12:00:00Z")) + kd.results.History(until=datetime.fromisoformat("1996-12-20T12:00:00+00:00")) ) ) golden.jsonl( query.to_pandas( kd.results.History( - since=datetime.fromisoformat("1996-12-19T16:39:59Z"), - until=datetime.fromisoformat("1996-12-20T12:00:00Z"), + since=datetime.fromisoformat("1996-12-19T16:39:59+00:00"), + until=datetime.fromisoformat("1996-12-20T12:00:00+00:00"), ) ) )