Skip to content

Commit

Permalink
feat: introducing physical plans
Browse files Browse the repository at this point in the history
This is part of #409.
  • Loading branch information
bjchambers committed Jun 1, 2023
1 parent 857f206 commit 28bf38c
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 8 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ aws-config = "0.11.0"
aws-sdk-s3 = "0.11.0"
pulsar = { version = "5.1.0", default-features = false, features = ["async-std-runtime", "tokio-runtime", "lz4"] }
aws-types = "0.11.0"
bigdecimal = "0.3.0"
bigdecimal = { vernsion ="0.3.1", features = ["serde"] }
bincode = "1.3.3"
bit-set = "0.5.3"
bitvec = { version = "1.0.1", features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-physical/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Physical execution plans for Kaskada queries.

[dependencies]
arrow-schema.workspace = true
bigdecimal.workspace = true
enum-as-inner.workspace = true
serde.workspace = true

Expand Down
22 changes: 18 additions & 4 deletions crates/sparrow-physical/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use std::borrow::Cow;

use arrow_schema::DataType;

/// A physical expression which describes how a value should be computed.
///
/// Generally, each expression computes a column of values from zero or more
/// input columns. Expressions appear in a variety of places within the steps
/// that make up a physical plan.
#[derive(Debug, serde::Serialize, serde::Deserialize)]

pub struct Expr {
Expand Down Expand Up @@ -27,11 +34,18 @@ pub struct Expr {
#[serde(rename_all = "snake_case")]
pub enum ExprKind {
/// Apply the named instruction to the given children.
Call(String),
Call(Cow<'static, str>),
/// Reference an input column by name.
Column(String),
// A scalar value.
// Scalar(Scalar),
/// A boolean literal.
BooleanLiteral(bool),
/// A string literal.
StringLiteral(String),
/// A numeric literal.
///
/// Other primitive literals (such as date times) may be expressed
/// using numeric literlas with an appropriate datatype.
NumericLiteral(bigdecimal::BigDecimal),
}

#[cfg(test)]
Expand All @@ -52,7 +66,7 @@ mod tests {
result_type: DataType::Int32,
},
Expr {
kind: ExprKind::Call("add".to_string()),
kind: ExprKind::Call("add".into()),
children: vec![0, 1],
result_type: DataType::Int32,
},
Expand Down
2 changes: 2 additions & 0 deletions crates/sparrow-physical/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

mod expr;
mod plan;
mod step;

pub use expr::*;
pub use plan::*;
pub use step::*;
12 changes: 12 additions & 0 deletions crates/sparrow-physical/src/plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use crate::Step;

/// A plan is a directed, acyclic graph of steps.
///
/// The plan is represented as an array of steps, with each step referencing
/// it's children (inputs) by index. The array is topologically sorted so that
/// all children have indices less than the step that references them.
#[derive(Debug)]
pub struct Plan {
/// The steps in the plan.
pub steps: Vec<Step>,
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
---
source: crates/sparrow-physical/src/expr.rs
assertion_line: 63
expression: yaml
---
- kind: !column foo
Expand Down
59 changes: 59 additions & 0 deletions crates/sparrow-physical/src/step.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use arrow_schema::SchemaRef;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
#[serde(transparent)]
pub struct StepId(usize);

/// A single step in the physical plan.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Step {
pub id: StepId,
/// The kind of step being performed.
pub kind: StepKind,
/// Steps which act as input (children) of this step.
pub children: Vec<StepId>,
/// The schema for this step.
pub schema: SchemaRef,
}

/// The kinds of stesp that can occur in the physical plan.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]

pub enum StepKind {
/// Scan the given table.
Scan {
table_name: String,
},
/// Merge the given relations.
Merge,
/// Apply a projection to adjust columns in the table.
///
/// The output includes the same rows as the input, but with columns
/// projected as configured.
Project {
/// Expressions to apply to compute additional input columns.
exprs: Vec<crate::Expr>,
/// Indices of expressions to use for the output.
///
/// The length should be the same as the number of fields in the schema.
outputs: Vec<usize>,
},
/// Filter the results based on a boolean predicate.
Filter {
/// Expressions to apply to compute the predicate.
///
/// The last expression should be the boolean predicate.
exprs: Vec<crate::Expr>,
},
/// A step that repartitions the output.
Repartition {
num_partitions: usize,
/// Expressions to apply to compute columns which may be referenced by `keys`.
exprs: Vec<crate::Expr>,
/// Indices of expression columns representing the keys.
keys: Vec<usize>,
},
Error,
}

0 comments on commit 28bf38c

Please sign in to comment.