diff --git a/crates/sparrow-compiler/src/frontend.rs b/crates/sparrow-compiler/src/frontend.rs
index 6f746e4b6..93239bef5 100644
--- a/crates/sparrow-compiler/src/frontend.rs
+++ b/crates/sparrow-compiler/src/frontend.rs
@@ -165,72 +165,14 @@ impl FrontendOutput {
             false
         };
 
-        // Decorate the expression as needed for the query_type.
-        let result_node = match options.per_entity_behavior {
-            _ if !executable => {
-                // Don't decorate incomplete or otherwise non-executable expressions.
-                // We don't produce executable plans for incomplete expressions.
-                query.value()
-            }
-            PerEntityBehavior::All => {
-                dfg.enter_env();
-                dfg.bind("result", query);
-                let time_node = create_changed_since_time_node(&mut dfg)?;
-                dfg.bind("__changed_since_time__", time_node);
-
-                let decorated = add_decoration(
-                    data_context,
-                    &mut diagnostics,
-                    &mut dfg,
-                    CHANGED_SINCE_DECORATION,
-                )?;
-                dfg.exit_env();
-                decorated.value()
-            }
-            PerEntityBehavior::Final => {
-                dfg.enter_env();
-                dfg.bind("result", query);
-
-                // Treat FINAL queries as changed_since_time of 0
-                let time_node = create_changed_since_time_node(&mut dfg)?;
-                dfg.bind("__changed_since_time__", time_node);
-
-                let decorated = add_decoration(
-                    data_context,
-                    &mut diagnostics,
-                    &mut dfg,
-                    FINAL_QUERY_DECORATION,
-                )?;
-                dfg.exit_env();
-                decorated.value()
-            }
-            PerEntityBehavior::FinalAtTime => {
-                dfg.enter_env();
-                dfg.bind("result", query);
-
-                // Treat FINAL queries as changed_since_time of 0
-                let time_node = create_changed_since_time_node(&mut dfg)?;
-                dfg.bind("__changed_since_time__", time_node);
-
-                let time_node = create_final_at_time_time_node(&mut dfg)?;
-                dfg.bind("__final_at_time__", time_node);
-
-                // 1. If the final query time is provided then use it as the query final time in
-                // the special decorator 2. Use the same per entity behavior
-                // final for all of them
-                let decorated = add_decoration(
-                    data_context,
-                    &mut diagnostics,
-                    &mut dfg,
-                    FINAL_QUERY_AT_TIME_DECORATION,
-                )?;
-                dfg.exit_env();
-                decorated.value()
-            }
-            PerEntityBehavior::Unspecified => {
-                anyhow::bail!("Unspecified per entity behavior")
-            }
-        };
+        let result_node = decorate(
+            data_context,
+            &mut dfg,
+            &mut diagnostics,
+            executable,
+            query,
+            options.per_entity_behavior,
+        )?;
 
         // Create the basic analysis information.
         let num_errors = diagnostics.num_errors();
@@ -320,6 +262,71 @@ impl FrontendOutput {
     }
 }
 
+pub fn decorate(
+    data_context: &mut DataContext,
+    dfg: &mut Dfg,
+    diagnostics: &mut DiagnosticCollector<'_>,
+    executable: bool,
+    query: AstDfgRef,
+    per_entity_behavior: PerEntityBehavior,
+) -> anyhow::Result<Id> {
+    match per_entity_behavior {
+        _ if !executable => {
+            // Don't decorate incomplete or otherwise non-executable expressions.
+            // We don't produce executable plans for incomplete expressions.
+            Ok(query.value())
+        }
+        PerEntityBehavior::All => {
+            dfg.enter_env();
+            dfg.bind("result", query);
+            let time_node = create_changed_since_time_node(dfg)?;
+            dfg.bind("__changed_since_time__", time_node);
+            let decorated =
+                add_decoration(data_context, diagnostics, dfg, CHANGED_SINCE_DECORATION)?;
+            dfg.exit_env();
+            Ok(decorated.value())
+        }
+        PerEntityBehavior::Final => {
+            dfg.enter_env();
+            dfg.bind("result", query);
+
+            // Treat FINAL queries as changed_since_time of 0
+            let time_node = create_changed_since_time_node(dfg)?;
+            dfg.bind("__changed_since_time__", time_node);
+
+            let decorated = add_decoration(data_context, diagnostics, dfg, FINAL_QUERY_DECORATION)?;
+            dfg.exit_env();
+            Ok(decorated.value())
+        }
+        PerEntityBehavior::FinalAtTime => {
+            dfg.enter_env();
+            dfg.bind("result", query);
+
+            // Treat FINAL queries as changed_since_time of 0
+            let time_node = create_changed_since_time_node(dfg)?;
+            dfg.bind("__changed_since_time__", time_node);
+
+            let time_node = create_final_at_time_time_node(dfg)?;
+            dfg.bind("__final_at_time__", time_node);
+
+            // 1. If the final query time is provided, use it as the query final time in
+            //    the special decorator.
+            // 2. Use the same per-entity behavior (final) for all of them.
+            let decorated = add_decoration(
+                data_context,
+                diagnostics,
+                dfg,
+                FINAL_QUERY_AT_TIME_DECORATION,
+            )?;
+            dfg.exit_env();
+            Ok(decorated.value())
+        }
+        PerEntityBehavior::Unspecified => {
+            anyhow::bail!("Unspecified per entity behavior")
+        }
+    }
+}
+
 /// Adds the given decoration to the dfg.
 fn add_decoration(
     data_context: &mut DataContext,
diff --git a/crates/sparrow-runtime/src/execute.rs b/crates/sparrow-runtime/src/execute.rs
index 37b8f33ef..08dea7baf 100644
--- a/crates/sparrow-runtime/src/execute.rs
+++ b/crates/sparrow-runtime/src/execute.rs
@@ -118,10 +118,18 @@ impl ExecutionOptions {
         self.changed_since_time = changed_since;
     }
 
+    pub fn set_changed_since_s(&mut self, seconds: i64) {
+        self.changed_since_time = Timestamp { seconds, nanos: 0 };
+    }
+
     pub fn set_final_at_time(&mut self, final_at_time: Timestamp) {
         self.final_at_time = Some(final_at_time);
     }
 
+    pub fn set_final_at_time_s(&mut self, seconds: i64) {
+        self.final_at_time = Some(Timestamp { seconds, nanos: 0 });
+    }
+
     async fn compute_store(
         &self,
         object_stores: &ObjectStoreRegistry,
diff --git a/crates/sparrow-runtime/src/execute/operation/expression_executor.rs b/crates/sparrow-runtime/src/execute/operation/expression_executor.rs
index 776821808..87bd35846 100644
--- a/crates/sparrow-runtime/src/execute/operation/expression_executor.rs
+++ b/crates/sparrow-runtime/src/execute/operation/expression_executor.rs
@@ -169,7 +169,7 @@ impl ExpressionExecutor {
                 LateBoundValue::from_i32(late_bound).context("late bound value")?;
             let literal = late_bindings[late_bound]
                 .as_ref()
-                .context("missing late bound value")?
+                .with_context(|| format!("missing late bound value {late_bound:?}"))?
                 .clone();
 
             anyhow::ensure!(
diff --git a/crates/sparrow-session/src/lib.rs b/crates/sparrow-session/src/lib.rs
index bbc72adef..3dcc74d27 100644
--- a/crates/sparrow-session/src/lib.rs
+++ b/crates/sparrow-session/src/lib.rs
@@ -7,5 +7,5 @@ mod table;
 pub use error::Error;
 pub use execution::Execution;
 pub use expr::{Expr, Literal};
-pub use session::{ExecutionOptions, Session};
+pub use session::{ExecutionOptions, Results, Session};
 pub use table::Table;
diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs
index ad11ce81f..75ac002ea 100644
--- a/crates/sparrow-session/src/session.rs
+++ b/crates/sparrow-session/src/session.rs
@@ -34,6 +34,13 @@ pub struct Session {
     udfs: HashMap>,
 }
 
+#[derive(Default)]
+pub enum Results {
+    #[default]
+    History,
+    Snapshot,
+}
+
 #[derive(Default)]
 pub struct ExecutionOptions {
     /// The maximum number of rows to return.
@@ -42,6 +49,16 @@ pub struct ExecutionOptions {
     pub max_batch_size: Option<usize>,
     /// Whether to run execute as a materialization or not.
    pub materialize: bool,
+    /// History or Snapshot results.
+    pub results: Results,
+    /// The changed since time. This is the minimum timestamp of changes to events.
+    /// For historic queries, this limits the output points.
+    /// For snapshot queries, this limits the set of entities that are considered changed.
+    pub changed_since_time_s: Option<i64>,
+    /// The final at time. This is the maximum timestamp output.
+    /// For historic queries, this limits the output points.
+    /// For snapshot queries, this determines the time at which the snapshot is produced.
+    pub final_at_time_s: Option<i64>,
 }
 
 /// Adds a table to the session.
@@ -345,31 +362,49 @@ impl Session {
         }
     }
 
+    /// Execute the query.
+    ///
+    /// It is unfortunate that this requires `&mut self` instead of `&self`. This is
+    /// because the decorations may mutate the DFG, which in turn requires mutability.
+    /// In practice, the decorations shouldn't mutate the DFG, and/or doing so
+    /// shouldn't require mutating the session.
     pub fn execute(
-        &self,
-        expr: &Expr,
+        &mut self,
+        query: &Expr,
         options: ExecutionOptions,
     ) -> error_stack::Result<Execution, Error> {
         // TODO: Decorations?
-        let group_id = expr
+        let group_id = query
             .0
             .grouping()
             .expect("query to be grouped (non-literal)");
-        let primary_group_info = self
-            .data_context
-            .group_info(group_id)
-            .expect("missing group info");
-        let primary_grouping = primary_group_info.name().to_owned();
-        let primary_grouping_key_type = primary_group_info.key_type();
-
-        // Hacky. Ideally, we'd determine the schema from the created execution plan.
-        // Currently, this isn't easily available. Instead, we create this from the
-        // columns we know we're producing.
-        let schema = result_schema(expr, primary_grouping_key_type)?;
+        let per_entity_behavior = match options.results {
+            Results::History => PerEntityBehavior::All,
+            Results::Snapshot if options.final_at_time_s.is_some() => {
+                PerEntityBehavior::FinalAtTime
+            }
+            Results::Snapshot => PerEntityBehavior::Final,
+        };
+
+        // Apply decorations as necessary for the per-entity behavior.
+        let feature_set = FeatureSet::default();
+        let mut diagnostics = DiagnosticCollector::new(&feature_set);
+        let expr = sparrow_compiler::decorate(
+            &mut self.data_context,
+            &mut self.dfg,
+            &mut diagnostics,
+            true,
+            query.0.clone(),
+            per_entity_behavior,
+        )
+        .into_report()
+        .change_context(Error::Compile)?;
+        error_stack::ensure!(diagnostics.num_errors() == 0, Error::Internal);
 
-        // First, extract the necessary subset of the DFG as an expression.
+        // Extract the necessary subset of the DFG as an expression.
         // This will allow us to operate without mutating things.
-        let expr = self.dfg.extract_simplest(expr.0.value());
+        let expr = self.dfg.extract_simplest(expr);
         let expr = expr
             .simplify(&CompilerOptions {
                 ..CompilerOptions::default()
             })
@@ -380,13 +415,24 @@ impl Session {
             .into_report()
             .change_context(Error::Compile)?;
 
+        let primary_group_info = self
+            .data_context
+            .group_info(group_id)
+            .expect("missing group info");
+        let primary_grouping = primary_group_info.name().to_owned();
+        let primary_grouping_key_type = primary_group_info.key_type();
+
+        // Hacky. Ideally, we'd determine the schema from the created execution plan.
+        // Currently, this isn't easily available. Instead, we create this from the
+        // columns we know we're producing.
+        let schema = result_schema(query, primary_grouping_key_type)?;
+
         // TODO: Incremental?
         // TODO: Slicing?
         let plan = sparrow_compiler::plan::extract_plan_proto(
             &self.data_context,
             expr,
-            // TODO: Configure per-entity behavior.
-            PerEntityBehavior::Final,
+            per_entity_behavior,
             primary_grouping,
             primary_grouping_key_type,
         )
@@ -475,6 +521,13 @@ impl ExecutionOptions {
             });
         }
 
+        if let Some(changed_since) = self.changed_since_time_s {
+            options.set_changed_since_s(changed_since);
+        }
+        if let Some(final_at_time) = self.final_at_time_s {
+            options.set_final_at_time_s(final_at_time);
+        }
+
         options
     }
 }
diff --git a/python/pysrc/kaskada/__init__.py b/python/pysrc/kaskada/__init__.py
index 6e15c05b6..37d1d2dff 100644
--- a/python/pysrc/kaskada/__init__.py
+++ b/python/pysrc/kaskada/__init__.py
@@ -1,7 +1,7 @@
 """Kaskada query builder and local execution engine."""
 from __future__ import annotations
 
-from . import destinations, plot, sources, windows
+from . import destinations, plot, results, sources, windows
 from ._execution import Execution, ResultIterator
 from ._session import init_session
 from ._timestream import Arg, LiteralValue, Timestream, record
diff --git a/python/pysrc/kaskada/_execution.py b/python/pysrc/kaskada/_execution.py
index 20f26149b..15c13dc1d 100644
--- a/python/pysrc/kaskada/_execution.py
+++ b/python/pysrc/kaskada/_execution.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import AsyncIterator, Callable, Iterator, Optional, TypeVar
+from typing import AsyncIterator, Callable, Iterator, Literal, Optional, TypeVar
 
 import pyarrow as pa
 
@@ -11,19 +11,33 @@
 
 @dataclass
 class _ExecutionOptions:
-    """Execution options passed to the FFI layer.
-
-    Attributes:
-        row_limit: The maximum number of rows to return. If not specified, all rows are returned.
-        max_batch_size: The maximum batch size to use when returning results.
-            If not specified, the default batch size will be used.
-        materialize: If true, the query will be a continuous materialization.
-    """
+    """Execution options passed to the FFI layer."""
 
+    #: The maximum number of rows to return.
+    #: If not specified, all rows are returned.
     row_limit: Optional[int] = None
+
+    #: The maximum batch size to use when returning results.
+    #: If not specified, the default batch size will be used.
     max_batch_size: Optional[int] = None
+
+    #: If true, the query will be a continuous materialization.
     materialize: bool = False
+
+    #: The type of results to return.
+    results: Literal["history", "snapshot"] = "history"
+
+    #: The earliest time of changes to include in the results.
+    #: For history, this limits the points output.
+    #: For snapshots, this limits the entities that are output.
+    changed_since: Optional[int] = None
+
+    #: The last time to process.
+    #: If not set, defaults to the current time.
+    #: For history, this limits the points output.
+    #: For snapshots, this determines when the snapshot is produced.
+    final_at: Optional[int] = None
+
 
 class Execution:
     """Represents an execution of a TimeStream."""
diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py
index eebe4c698..5541e4cbb 100644
--- a/python/pysrc/kaskada/_timestream.py
+++ b/python/pysrc/kaskada/_timestream.py
@@ -1084,6 +1084,24 @@ def _execute(
         max_batch_size=max_batch_size,
         materialize=mode == "live",
     )
+
+    if results is None:
+        results = kd.results.History()
+
+    if isinstance(results, kd.results.History):
+        options.results = "history"
+        if results.since is not None:
+            options.changed_since = int(results.since.timestamp())
+        if results.until is not None:
+            options.final_at = int(results.until.timestamp())
+    elif isinstance(results, kd.results.Snapshot):
+        options.results = "snapshot"
+        if results.changed_since is not None:
+            options.changed_since = int(results.changed_since.timestamp())
+        if results.at is not None:
+            options.final_at = int(results.at.timestamp())
+    else:
+        raise AssertionError(f"Unhandled results type {results!r}")
 
     return expr._ffi_expr.execute(options)
diff --git a/python/pytests/conftest.py b/python/pytests/conftest.py
index 7dd0e2571..b2bf14f14 100644
--- a/python/pytests/conftest.py
+++ b/python/pytests/conftest.py
@@ -45,6 +45,10 @@ def jsonl(self, data: Union[kd.Timestream, pd.DataFrame]) -> None:
             date_unit="ns",
         )
 
+        if golden.empty and df.empty:
+            # For some reason, even when specifying the dtypes, reading an empty
+            # JSON file produces no columns.
+            return
+
         pd.testing.assert_frame_equal(df, golden, check_datetimelike_compat=True)
 
     def parquet(self, data: Union[kd.Timestream, pd.DataFrame]) -> None:
diff --git a/python/pytests/result_test.py b/python/pytests/execution_test.py
similarity index 67%
rename from python/pytests/result_test.py
rename to python/pytests/execution_test.py
index b1671e550..85e07ac62 100644
--- a/python/pytests/result_test.py
+++ b/python/pytests/execution_test.py
@@ -1,3 +1,4 @@
+from datetime import datetime
 from typing import AsyncIterator, Iterator
 
 import kaskada as kd
@@ -82,3 +83,43 @@ async def test_iter_pandas_async_live(golden, source_int64) -> None:
     execution.stop()
     with pytest.raises(StopAsyncIteration):
         print(await execution.__anext__())
+
+
+def test_snapshot(golden, source_int64) -> None:
+    query = source_int64.col("m").sum()
+    golden.jsonl(query.to_pandas(kd.results.Snapshot()))
+    golden.jsonl(
+        query.to_pandas(
+            kd.results.Snapshot(
+                changed_since=datetime.fromisoformat("1996-12-19T16:39:59+00:00")
+            )
+        )
+    )
+    golden.jsonl(
+        query.to_pandas(
+            kd.results.Snapshot(at=datetime.fromisoformat("1996-12-20T12:00:00+00:00"))
+        )
+    )
+
+
+def test_history(golden, source_int64) -> None:
+    query = source_int64.col("m").sum()
+    golden.jsonl(query.to_pandas(kd.results.History()))
+    golden.jsonl(
+        query.to_pandas(
+            kd.results.History(since=datetime.fromisoformat("1996-12-19T16:39:59+00:00"))
+        )
+    )
+    golden.jsonl(
+        query.to_pandas(
+            kd.results.History(until=datetime.fromisoformat("1996-12-20T12:00:00+00:00"))
+        )
+    )
+    golden.jsonl(
+        query.to_pandas(
+            kd.results.History(
+                since=datetime.fromisoformat("1996-12-19T16:39:59+00:00"),
+                until=datetime.fromisoformat("1996-12-20T12:00:00+00:00"),
+            )
+        )
+    )
diff --git a/python/pytests/golden/execution_test/test_history.jsonl b/python/pytests/golden/execution_test/test_history.jsonl
new file mode 100644
index 000000000..735353c8b
--- /dev/null
+++ b/python/pytests/golden/execution_test/test_history.jsonl
@@ -0,0 +1,6 @@
+{"_time":"1996-12-19T16:39:57.000000000","_key":"A","result":5}
+{"_time":"1996-12-19T16:39:58.000000000","_key":"B","result":24}
+{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22}
+{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22}
+{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34}
+{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34}
diff --git a/python/pytests/golden/execution_test/test_history_1.jsonl b/python/pytests/golden/execution_test/test_history_1.jsonl
new file mode 100644
index 000000000..abaecf535
--- /dev/null
+++ b/python/pytests/golden/execution_test/test_history_1.jsonl
@@ -0,0 +1,4 @@
+{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22}
+{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22}
+{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34}
+{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34}
diff --git a/python/pytests/golden/execution_test/test_history_2.jsonl b/python/pytests/golden/execution_test/test_history_2.jsonl
new file mode 100644
index 000000000..735353c8b
--- /dev/null
+++ b/python/pytests/golden/execution_test/test_history_2.jsonl
@@ -0,0 +1,6 @@
+{"_time":"1996-12-19T16:39:57.000000000","_key":"A","result":5}
+{"_time":"1996-12-19T16:39:58.000000000","_key":"B","result":24}
+{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22}
+{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22}
+{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34}
+{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_history_3.jsonl b/python/pytests/golden/execution_test/test_history_3.jsonl new file mode 100644 index 000000000..abaecf535 --- /dev/null +++ b/python/pytests/golden/execution_test/test_history_3.jsonl @@ -0,0 +1,4 @@ +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","result":22} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","result":34} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","result":34} diff --git a/python/pytests/golden/result_test/test_iter_pandas.jsonl b/python/pytests/golden/execution_test/test_iter_pandas.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_1.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_1.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_1.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_1.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async_1.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async_1.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async_1.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async_1.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async_live.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async_live.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async_live.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async_live.jsonl diff --git a/python/pytests/golden/result_test/test_iter_pandas_async_live_1.jsonl b/python/pytests/golden/execution_test/test_iter_pandas_async_live_1.jsonl similarity index 100% rename from python/pytests/golden/result_test/test_iter_pandas_async_live_1.jsonl rename to python/pytests/golden/execution_test/test_iter_pandas_async_live_1.jsonl diff --git a/python/pytests/golden/execution_test/test_snapshot.jsonl b/python/pytests/golden/execution_test/test_snapshot.jsonl new file mode 100644 index 000000000..f65f104b2 --- /dev/null +++ b/python/pytests/golden/execution_test/test_snapshot.jsonl @@ -0,0 +1,2 @@ +{"_time":"1996-12-19T16:40:02.000000001","_key":"B","result":24} +{"_time":"1996-12-19T16:40:02.000000001","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_snapshot_1.jsonl b/python/pytests/golden/execution_test/test_snapshot_1.jsonl new file mode 100644 index 000000000..ec2f648b1 --- /dev/null +++ b/python/pytests/golden/execution_test/test_snapshot_1.jsonl @@ -0,0 +1 @@ +{"_time":"1996-12-19T16:40:02.000000001","_key":"A","result":34} diff --git a/python/pytests/golden/execution_test/test_snapshot_2.jsonl b/python/pytests/golden/execution_test/test_snapshot_2.jsonl new file mode 100644 index 000000000..9ea5bdd8f --- /dev/null +++ b/python/pytests/golden/execution_test/test_snapshot_2.jsonl @@ -0,0 +1,2 @@ 
+{"_time":"1996-12-20T12:00:00.000000001","_key":"B","result":24} +{"_time":"1996-12-20T12:00:00.000000001","_key":"A","result":34} diff --git a/python/src/expr.rs b/python/src/expr.rs index 5c427305d..c706ea8ca 100644 --- a/python/src/expr.rs +++ b/python/src/expr.rs @@ -109,7 +109,7 @@ impl Expr { } fn execute(&self, options: Option<&PyAny>) -> Result { - let session = self.session.rust_session()?; + let mut session = self.session.rust_session()?; let options = extract_options(options)?; let execution = session.execute(&self.rust_expr, options)?; Ok(Execution::new(execution)) @@ -167,11 +167,28 @@ fn extract_options(options: Option<&PyAny>) -> Result sparrow_session::Results::History, + "snapshot" => sparrow_session::Results::Snapshot, + invalid => { + return Err( + PyValueError::new_err(format!("invalid results '{invalid}'")).into(), + ) + } + }; Ok(sparrow_session::ExecutionOptions { row_limit: options.getattr(row_limit)?.extract()?, max_batch_size: options.getattr(max_batch_size)?.extract()?, materialize: options.getattr(materialize)?.extract()?, + results, + changed_since_time_s: options.getattr(changed_since_time)?.extract()?, + final_at_time_s: options.getattr(final_at_time)?.extract()?, }) } }