Merge pull request #86 from kaskada-ai/output-at-time
feat: produce output at user-defined time
jordanrfrazier authored Mar 8, 2023
2 parents 0f5937e + a0785a1 commit 3906b76
Showing 11 changed files with 223 additions and 99 deletions.
17 changes: 12 additions & 5 deletions clients/python/src/fenlmagic/__init__.py
@@ -93,11 +93,18 @@ def fenl(self, arg, cell=None):
         preview_rows = clean_arg(args.preview_rows)
         var = clean_arg(args.var)
 
-        result_behavior = (
-            "final-results"
-            if test_arg(clean_arg(args.result_behavior), "final-results")
-            else "all-results"
-        )
+        final_result_time = clean_arg(args.final_time)
+        result_behavior = clean_arg(args.result_behavior)
+
+        # Assert the result behaviors are valid
+        if final_result_time is not None:
+            if result_behavior is not None and not test_arg(
+                result_behavior, "final-results"
+            ):
+                raise UsageError(
+                    '--final-time must be used with result-behavior "final-results"'
+                )
+            result_behavior = "final-results"
 
         if cell is None:
             expression = arg
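For orientation, a sketch of how the new flag combination is used from a notebook; the magic and flag names come from the argument parsing above, while the timestamp format is an assumption. Passing --final-time with no --result-behavior now implies "final-results", and combining it with any other behavior raises the UsageError shown in the diff.

    %%fenl --final-time 1996-12-20T00:39:58 --result-behavior final-results
    { sum_field: sum(Numbers.m) }
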
2 changes: 2 additions & 0 deletions clients/python/src/kaskada/query.py
@@ -116,6 +116,8 @@ def create_query(
             No actual computation of results is performed.
         changed_since_time (datetime.datetime, optional):
             Time bound (inclusive) after which results will be output.
+        final_result_time (Union[str, datetime.datetime], optional):
+            Time bound (inclusive) at which results will be output.
         limits (pb.QueryRequest.Limits, optional):
             Configure limits on the output set.
         slice_filter (SliceFilter, optional):
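A minimal sketch of the updated client call, assuming the surrounding create_query signature; the expression is illustrative (borrowed from the tests below), and the result_behavior pairing is an assumption that mirrors the fenlmagic validation above.

    from datetime import datetime
    from kaskada import query

    # Produce each entity's final values as of 2020-01-01 (inclusive),
    # rather than at the time of its last input event.
    resp = query.create_query(
        expression="{ last: last(Times.n) }",
        result_behavior="final-results",          # assumed parameter pairing
        final_result_time=datetime(2020, 1, 1),
    )
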
20 changes: 15 additions & 5 deletions crates/sparrow-main/tests/e2e/decoration_tests.rs
@@ -199,16 +199,15 @@ async fn test_sum_i64_final_at_time() {
     let datetime = NaiveDateTime::new(date_for_test(1996, 12, 20), time_for_test(0, 39, 58));
     insta::assert_snapshot!(QueryFixture::new("{ sum_field: sum(Numbers.m) }").with_final_results_at_time(datetime).run_to_csv(&i64_data_fixture()).await.unwrap(), @r###"
     _time,_subsort,_key_hash,_key,sum_field
-    1996-12-20T00:40:02.000000001,18446744073709551615,3650215962958587783,A,5
-    1996-12-20T00:40:02.000000001,18446744073709551615,11753611437813598533,B,24
+    1996-12-20T00:39:58.000000001,18446744073709551615,3650215962958587783,A,5
+    1996-12-20T00:39:58.000000001,18446744073709551615,11753611437813598533,B,24
     "###);
 }
 
 #[tokio::test]
 async fn test_sum_i64_all_filtered_final_at_time() {
     let datetime = NaiveDateTime::new(date_for_test(1970, 12, 20), time_for_test(0, 39, 58));
-    insta::assert_snapshot!(QueryFixture::new("{ sum_field: sum(Numbers.m) }").with_final_results_at_time(datetime).run_to_csv(&i64_data_fixture()).await.unwrap(), @"_time,_subsort,_key_hash,_key,sum_field
-    ");
+    insta::assert_snapshot!(QueryFixture::new("{ sum_field: sum(Numbers.m) }").with_final_results_at_time(datetime).run_to_csv(&i64_data_fixture()).await.unwrap(), @"");
 }
 
 fn shift_data_fixture_at_time() -> DataFixture {
@@ -256,6 +255,17 @@ async fn test_last_timestamp_ns_changed_since_with_final_at_time() {
     let final_time = NaiveDateTime::new(date_for_test(2000, 1, 1), time_for_test(0, 0, 0));
     insta::assert_snapshot!(QueryFixture::new("{ last: last(Times.n) }").with_changed_since(changed_since).with_final_results_at_time(final_time).run_to_csv(&timestamp_ns_data_fixture()).await.unwrap(), @r###"
     _time,_subsort,_key_hash,_key,last
-    2004-12-06T00:44:57.000000001,18446744073709551615,11753611437813598533,B,8
+    2000-01-01T00:00:00.000000001,18446744073709551615,11753611437813598533,B,8
     "###);
 }
+
+#[tokio::test]
+async fn test_final_at_time_past_input_times() {
+    // Expect rows to be produced at the final time, even if it's past the input times
+    let final_time = NaiveDateTime::new(date_for_test(2020, 1, 1), time_for_test(0, 0, 0));
+    insta::assert_snapshot!(QueryFixture::new("{ last: last(Times.n) }").with_final_results_at_time(final_time).run_to_csv(&timestamp_ns_data_fixture()).await.unwrap(), @r###"
+    _time,_subsort,_key_hash,_key,last
+    2020-01-01T00:00:00.000000001,18446744073709551615,3650215962958587783,A,2
+    2020-01-01T00:00:00.000000001,18446744073709551615,11753611437813598533,B,23
+    "###);
+}
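Note the convention these snapshots pin down: output rows are stamped one nanosecond after the requested final time, so events landing exactly on the bound are still included. A small sketch of the stamp arithmetic; pandas is used here only because Python's datetime lacks nanosecond precision:

    import pandas as pd

    final_time = pd.Timestamp("1996-12-20T00:39:58")
    output_time = final_time + pd.Timedelta(1, "ns")  # stamp on emitted rows
    assert str(output_time) == "1996-12-20 00:39:58.000000001"
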
36 changes: 25 additions & 11 deletions crates/sparrow-runtime/src/execute.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use chrono::NaiveDateTime;
 use error_stack::{IntoReport, IntoReportCompat, ResultExt};
 use futures::Stream;
 use prost_wkt_types::Timestamp;
@@ -66,16 +67,17 @@ pub async fn execute(
         None,
     ));
 
-    let query_final_time = request.final_result_time.unwrap_or(Timestamp {
-        seconds: 0,
-        nanos: 0,
-    });
-
-    late_bindings[LateBoundValue::FinalAtTime] = Some(ScalarValue::timestamp(
-        query_final_time.seconds,
-        query_final_time.nanos,
-        None,
-    ));
+    let output_at_time = if let Some(output_at_time) = request.final_result_time {
+        late_bindings[LateBoundValue::FinalAtTime] = Some(ScalarValue::timestamp(
+            output_at_time.seconds,
+            output_at_time.nanos,
+            None,
+        ));
+        Some(output_at_time)
+    } else {
+        late_bindings[LateBoundValue::FinalAtTime] = None;
+        None
+    };
 
     let mut data_context = DataContext::try_from_tables(request.tables.to_vec())
         .into_report()
@@ -136,7 +138,9 @@ pub async fn execute(
                 nanos: i32::MAX,
             }
         }
-        PerEntityBehavior::FinalAtTime => query_final_time.clone(),
+        PerEntityBehavior::FinalAtTime => {
+            output_at_time.as_ref().expect("final at time").clone()
+        }
     };
 
     Some(
@@ -178,6 +182,15 @@ pub async fn execute(
     let (progress_updates_tx, progress_updates_rx) =
         tokio::sync::mpsc::channel(29.max(plan.operations.len() * 2));
 
+    let output_datetime = if let Some(t) = output_at_time {
+        Some(
+            NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos as u32)
+                .ok_or(Error::internal_msg("expected valid timestamp"))?,
+        )
+    } else {
+        None
+    };
+
     // We use the plan hash for validating the snapshot is as expected.
     // Rather than accepting it as input (which could lead to us getting
     // a correct hash but an incorrect plan) we re-hash the plan.
@@ -190,6 +203,7 @@
         key_hash_inverse,
         max_event_in_snapshot: None,
         progress_updates_tx,
+        output_at_time: output_datetime,
     };
 
     // Start executing the query. We pass the response channel to the
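The block above converts the protobuf Timestamp (whole seconds plus a nanosecond remainder) into a chrono NaiveDateTime, erroring on an out-of-range remainder. A Python analogue of the same split, as a sketch; note that datetime keeps only microseconds, while the Rust side keeps full nanosecond precision:

    from datetime import datetime, timezone

    def to_naive_datetime(seconds: int, nanos: int) -> datetime:
        if not 0 <= nanos < 1_000_000_000:
            raise ValueError("expected valid timestamp")  # mirrors Error::internal_msg
        dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
        return dt.replace(tzinfo=None, microsecond=nanos // 1000)  # truncates to microseconds
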
6 changes: 5 additions & 1 deletion crates/sparrow-runtime/src/execute/operation.rs
@@ -91,6 +91,10 @@ pub(crate) struct OperationContext {
     /// Channel for sending progress updates.
     pub progress_updates_tx:
         tokio::sync::mpsc::Sender<crate::execute::progress_reporter::ProgressUpdate>,
+    /// The time to produce a final tick at.
+    ///
+    /// If set, the user supplied a specific time to produce values at.
+    pub output_at_time: Option<NaiveDateTime>,
 }
 
 impl OperationContext {
@@ -391,7 +395,7 @@ fn create_operation(
         ),
         operation_plan::Operator::Tick(tick_operation) => {
             if matches!(tick_operation.behavior(), TickBehavior::Finished) {
-                FinalTickOperation::create(incoming_channels, input_columns)
+                FinalTickOperation::create(context, incoming_channels, input_columns)
             } else {
                 TickOperation::create(tick_operation, incoming_channels, input_columns)
             }
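This change threads the OperationContext into FinalTickOperation::create so the finished-tick path can read output_at_time. A sketch of the decision it enables (names hypothetical): with a user-supplied time the final tick fires there; otherwise it fires after the last input event, as before.

    # Hypothetical sketch of the final-tick choice, not the actual Rust implementation.
    def final_tick_time(output_at_time, max_input_time):
        return output_at_time if output_at_time is not None else max_input_time
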
