feat: Expose DFG and query plans on Timestream (#780)
This is useful for debugging and understanding how a query will be
executed.
bjchambers authored Sep 29, 2023
1 parent e68a331 commit 3562f4c
Showing 15 changed files with 938 additions and 655 deletions.
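
The new `Session::explain` entry point (added in `crates/sparrow-session/src/session.rs` below) returns the initial DFG, the optimized DFG, or the final compute plan as a GraphViz string. A minimal usage sketch, assuming a `Session` and a query `Expr` have already been built (table registration and query construction are omitted, and the variable and function names are illustrative):

    use sparrow_session::{Error, ExecutionOptions, ExplanationKind, Expr, Session};

    // Sketch only: `session` and `query` are assumed to exist; building them is not shown.
    fn print_explanations(session: &mut Session, query: &Expr) -> error_stack::Result<(), Error> {
        // DFG as produced directly from the AST, before simplification.
        let initial =
            session.explain(ExplanationKind::InitialDfg, query, ExecutionOptions::default())?;
        // DFG after simplification and removal of useless transforms.
        let optimized =
            session.explain(ExplanationKind::FinalDfg, query, ExecutionOptions::default())?;
        // The physical compute plan that would be executed.
        let plan =
            session.explain(ExplanationKind::FinalPlan, query, ExecutionOptions::default())?;

        println!("{initial}\n{optimized}\n{plan}");
        Ok(())
    }
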
2 changes: 1 addition & 1 deletion crates/sparrow-compiler/src/dfg.rs
@@ -41,7 +41,7 @@ use sparrow_instructions::{InstKind, InstOp, Udf};
 use sparrow_syntax::{FenlType, Location};
 pub(crate) use step_kind::*;
 type DfgGraph = egg::EGraph<language::DfgLang, analysis::DfgAnalysis>;
-pub(super) use expr::DfgExpr;
+pub use expr::DfgExpr;
 pub(crate) use pattern::*;
 use smallvec::smallvec;
 use tracing::{info, info_span};
4 changes: 4 additions & 0 deletions crates/sparrow-compiler/src/dfg/expr.rs
@@ -40,6 +40,10 @@ impl DfgExpr {
         self.expr.as_ref().len()
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.expr.as_ref().is_empty()
+    }
+
     pub(super) fn add(&mut self, node: DfgLang) -> Id {
         self.expr.add(node)
     }
2 changes: 1 addition & 1 deletion crates/sparrow-compiler/src/lib.rs
@@ -49,7 +49,7 @@ mod types;
 pub use ast_to_dfg::*;
 pub use compile::*;
 pub use data_context::*;
-pub use dfg::{remove_useless_transforms, Dfg};
+pub use dfg::{remove_useless_transforms, Dfg, DfgExpr};
 pub use diagnostics::*;
 pub use error::*;
 pub use frontend::*;
2 changes: 1 addition & 1 deletion crates/sparrow-session/src/lib.rs
@@ -7,5 +7,5 @@ mod table;
 pub use error::Error;
 pub use execution::Execution;
 pub use expr::{Expr, Literal};
-pub use session::{ExecutionOptions, Results, Session};
+pub use session::*;
 pub use table::Table;
154 changes: 119 additions & 35 deletions crates/sparrow-session/src/session.rs
@@ -9,7 +9,7 @@ use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use sparrow_api::kaskada::v1alpha::execute_request::Limits;
 use sparrow_api::kaskada::v1alpha::{
-    ComputeTable, FeatureSet, PerEntityBehavior, TableConfig, TableMetadata,
+    ComputePlan, ComputeTable, FeatureSet, PerEntityBehavior, TableConfig, TableMetadata,
 };
 use sparrow_compiler::{AstDfgRef, CompilerOptions, DataContext, Dfg, DiagnosticCollector};
 use sparrow_instructions::{GroupId, Udf};
@@ -56,6 +56,13 @@ pub enum Results {
     Snapshot,
 }
 
+#[derive(Debug, PartialEq, Eq)]
+pub enum ExplanationKind {
+    InitialDfg,
+    FinalDfg,
+    FinalPlan,
+}
+
 #[derive(Default)]
 pub struct ExecutionOptions {
     /// The maximum number of rows to return.
@@ -76,6 +83,16 @@ pub struct ExecutionOptions {
     pub final_at_time_s: Option<i64>,
 }
 
+impl ExecutionOptions {
+    pub fn per_entity_behavior(&self) -> PerEntityBehavior {
+        match self.results {
+            Results::History => PerEntityBehavior::All,
+            Results::Snapshot if self.final_at_time_s.is_some() => PerEntityBehavior::FinalAtTime,
+            Results::Snapshot => PerEntityBehavior::Final,
+        }
+    }
+}
+
 /// Adds a table to the session.
 impl Session {
     pub fn add_literal(&mut self, literal: Literal) -> error_stack::Result<Expr, Error> {
@@ -390,31 +407,17 @@ impl Session {
         }
     }
 
-    /// Execute the query.
+    /// Create the initial DFG.
     ///
     /// It is unfortunate this requires `&mut self` instead of `&self`. It relates to the
    /// fact that the decorations may require mutating the DFG, which in turn requires
     /// mutability. In practice, the decorations shouldn't mutate the DFG and/or that
     /// shouldn't require mutating the session.
-    pub fn execute(
+    fn compile_initial_dfg(
         &mut self,
         query: &Expr,
-        options: ExecutionOptions,
-    ) -> error_stack::Result<Execution, Error> {
-        // TODO: Decorations?
-        let group_id = query
-            .0
-            .grouping()
-            .expect("query to be grouped (non-literal)");
-
-        let per_entity_behavior = match options.results {
-            Results::History => PerEntityBehavior::All,
-            Results::Snapshot if options.final_at_time_s.is_some() => {
-                PerEntityBehavior::FinalAtTime
-            }
-            Results::Snapshot => PerEntityBehavior::Final,
-        };
-
+        options: &ExecutionOptions,
+    ) -> error_stack::Result<sparrow_compiler::DfgExpr, Error> {
         // Apply decorations as necessary for the per-entity behavior.
         let feature_set = FeatureSet::default();
         let mut diagnostics = DiagnosticCollector::new(&feature_set);
@@ -424,48 +427,86 @@
             &mut diagnostics,
             true,
             query.0.clone(),
-            per_entity_behavior,
+            options.per_entity_behavior(),
         )
         .into_report()
         .change_context(Error::Compile)?;
         error_stack::ensure!(diagnostics.num_errors() == 0, Error::internal());
 
         // Extract the necessary subset of the DFG as an expression.
         // This will allow us to operate without mutating things.
-        let expr = self.dfg.extract_simplest(expr);
-        let expr = expr
+        Ok(self.dfg.extract_simplest(expr))
+    }
+
+    fn optimize_dfg(
+        &self,
+        dfg: sparrow_compiler::DfgExpr,
+    ) -> error_stack::Result<sparrow_compiler::DfgExpr, Error> {
+        let dfg = dfg
             .simplify(&CompilerOptions {
                 ..CompilerOptions::default()
             })
             .into_report()
             .change_context(Error::Compile)?;
-        let expr = sparrow_compiler::remove_useless_transforms(expr)
+        let dfg = sparrow_compiler::remove_useless_transforms(dfg)
             .into_report()
             .change_context(Error::Compile)?;
+        Ok(dfg)
+    }
 
-        let primary_group_info = self
-            .data_context
-            .group_info(group_id)
-            .expect("missing group info");
+    fn extract_plan(
+        &self,
+        primary_group_info: &sparrow_compiler::GroupInfo,
+        dfg: sparrow_compiler::DfgExpr,
+        options: &ExecutionOptions,
+    ) -> error_stack::Result<ComputePlan, Error> {
         let primary_grouping = primary_group_info.name().to_owned();
         let primary_grouping_key_type = primary_group_info.key_type();
 
-        // Hacky. Ideally, we'd determine the schema from the created execution plan.
-        // Currently, this isn't easily available. Instead, we create this from the
-        // columns we know we're producing.
-        let schema = result_schema(query, primary_grouping_key_type)?;
-
         // TODO: Incremental?
         // TODO: Slicing?
         let plan = sparrow_compiler::plan::extract_plan_proto(
             &self.data_context,
-            expr,
-            per_entity_behavior,
+            dfg,
+            options.per_entity_behavior(),
             primary_grouping,
             primary_grouping_key_type,
         )
         .into_report()
         .change_context(Error::Compile)?;
+        Ok(plan)
+    }
+
+    /// Execute the query.
+    ///
+    /// It is unfortunate this requires `&mut self` instead of `&self`. It relates to the
+    /// fact that the decorations may require mutating the DFG, which in turn requires
+    /// mutability. In practice, the decorations shouldn't mutate the DFG and/or that
+    /// shouldn't require mutating the session.
+    pub fn execute(
+        &mut self,
+        query: &Expr,
+        options: ExecutionOptions,
+    ) -> error_stack::Result<Execution, Error> {
+        let expr = self.compile_initial_dfg(query, &options)?;
+        let expr = self.optimize_dfg(expr)?;
+
+        let group_id = query
+            .0
+            .grouping()
+            .expect("query to be grouped (non-literal)");
+
+        let primary_group_info = self
+            .data_context
+            .group_info(group_id)
+            .expect("missing group info");
+
+        let plan = self.extract_plan(primary_group_info, expr, &options)?;
+
+        // Hacky. Ideally, we'd determine the schema from the created execution plan.
+        // Currently, this isn't easily available. Instead, we create this from the
+        // columns we know we're producing.
+        let schema = result_schema(query, primary_group_info.key_type())?;
 
         let (output_tx, output_rx) = tokio::sync::mpsc::channel(10);
         let destination = Destination::Channel(output_tx);
Expand All @@ -482,7 +523,7 @@ impl Session {
.cloned()
.unwrap_or_else(|| {
Arc::new(ThreadSafeKeyHashInverse::from_data_type(
primary_grouping_key_type,
primary_group_info.key_type(),
))
});

@@ -511,6 +552,49 @@
             schema,
         ))
     }
+
+    pub fn explain(
+        &mut self,
+        kind: ExplanationKind,
+        query: &Expr,
+        options: ExecutionOptions,
+    ) -> error_stack::Result<String, Error> {
+        match kind {
+            ExplanationKind::InitialDfg => {
+                let expr = self.compile_initial_dfg(query, &options)?;
+                expr.dot_string()
+                    .into_report()
+                    .change_context(Error::Compile)
+            }
+            ExplanationKind::FinalDfg => {
+                let expr = self.compile_initial_dfg(query, &options)?;
+                let expr = self.optimize_dfg(expr)?;
+                expr.dot_string()
+                    .into_report()
+                    .change_context(Error::Compile)
+            }
+            ExplanationKind::FinalPlan => {
+                let expr = self.compile_initial_dfg(query, &options)?;
+                let expr = self.optimize_dfg(expr)?;
+                let group_id = query
+                    .0
+                    .grouping()
+                    .expect("query to be grouped (non-literal)");
+
+                let primary_group_info = self
+                    .data_context
+                    .group_info(group_id)
+                    .expect("missing group info");
+
+                let plan = self.extract_plan(primary_group_info, expr, &options)?;
+                let mut bytes = Vec::new();
+                plan.write_to_graphviz(&mut bytes)
+                    .into_report()
+                    .change_context(Error::Compile)?;
+                Ok(String::from_utf8(bytes).expect("utf8"))
+            }
+        }
+    }
 }
 
 #[static_init::dynamic]
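
The explanation text is produced via `dot_string` for the DFGs and `write_to_graphviz` for the plan, i.e. GraphViz source, so the string returned by `explain` can be written to disk and rendered offline. A small sketch, assuming the GraphViz `dot` binary is installed and using arbitrary file names:

    use std::fs;
    use std::process::Command;

    // Render an explanation string returned by `Session::explain` to an SVG.
    // Assumes the GraphViz `dot` executable is on PATH; the paths are illustrative.
    fn render_explanation(dot_text: &str) -> std::io::Result<()> {
        fs::write("query_plan.dot", dot_text)?;
        let status = Command::new("dot")
            .args(["-Tsvg", "query_plan.dot", "-o", "query_plan.svg"])
            .status()?;
        assert!(status.success(), "GraphViz `dot` failed");
        Ok(())
    }
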