diff --git a/Cargo.lock b/Cargo.lock
index 5462b4572..087944bfa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1026,13 +1026,14 @@ checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1"
 
 [[package]]
 name = "bigdecimal"
-version = "0.3.0"
+version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6aaf33151a6429fe9211d1b276eafdf70cdff28b071e76c0b0e1503221ea3744"
+checksum = "a6773ddc0eafc0e509fb60e48dff7f450f8e674a0686ae8605e8d9901bd5eefa"
 dependencies = [
  "num-bigint 0.4.3",
  "num-integer",
  "num-traits",
+ "serde",
 ]
 
 [[package]]
@@ -4819,6 +4820,7 @@ name = "sparrow-physical"
 version = "0.6.4"
 dependencies = [
  "arrow-schema",
+ "bigdecimal",
  "enum-as-inner",
  "insta",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 514617f55..9adcc1f4c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ aws-config = "0.11.0"
 aws-sdk-s3 = "0.11.0"
 pulsar = { version = "5.1.0", default-features = false, features = ["async-std-runtime", "tokio-runtime", "lz4"] }
 aws-types = "0.11.0"
-bigdecimal = "0.3.0"
+bigdecimal = { version = "0.3.1", features = ["serde"] }
 bincode = "1.3.3"
 bit-set = "0.5.3"
 bitvec = { version = "1.0.1", features = ["serde"] }
diff --git a/crates/sparrow-physical/Cargo.toml b/crates/sparrow-physical/Cargo.toml
index c6b56ed04..828f4406f 100644
--- a/crates/sparrow-physical/Cargo.toml
+++ b/crates/sparrow-physical/Cargo.toml
@@ -11,6 +11,7 @@ Physical execution plans for Kaskada queries.
 
 [dependencies]
 arrow-schema.workspace = true
+bigdecimal.workspace = true
 enum-as-inner.workspace = true
 serde.workspace = true
 
diff --git a/crates/sparrow-physical/src/expr.rs b/crates/sparrow-physical/src/expr.rs
index b47726021..2940ef245 100644
--- a/crates/sparrow-physical/src/expr.rs
+++ b/crates/sparrow-physical/src/expr.rs
@@ -1,5 +1,12 @@
+use std::borrow::Cow;
+
 use arrow_schema::DataType;
 
+/// A physical expression which describes how a value should be computed.
+///
+/// Generally, each expression computes a column of values from zero or more
+/// input columns. Expressions appear in a variety of places within the steps
+/// that make up a physical plan.
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 
 pub struct Expr {
@@ -27,11 +34,18 @@ pub struct Expr {
 #[serde(rename_all = "snake_case")]
 pub enum ExprKind {
     /// Apply the named instruction to the given children.
-    Call(String),
+    Call(Cow<'static, str>),
     /// Reference an input column by name.
     Column(String),
-    // A scalar value.
-    // Scalar(Scalar),
+    /// A boolean literal.
+    BooleanLiteral(bool),
+    /// A string literal.
+    StringLiteral(String),
+    /// A numeric literal.
+    ///
+    /// Other primitive literals (such as date times) may be expressed
+    /// using numeric literals with an appropriate datatype.
+    NumericLiteral(bigdecimal::BigDecimal),
 }
 
 #[cfg(test)]
@@ -52,7 +66,7 @@ mod tests {
                 result_type: DataType::Int32,
             },
             Expr {
-                kind: ExprKind::Call("add".to_string()),
+                kind: ExprKind::Call("add".into()),
                 children: vec![0, 1],
                 result_type: DataType::Int32,
             },
diff --git a/crates/sparrow-physical/src/lib.rs b/crates/sparrow-physical/src/lib.rs
index aecf07b01..d5ca097b9 100644
--- a/crates/sparrow-physical/src/lib.rs
+++ b/crates/sparrow-physical/src/lib.rs
@@ -12,6 +12,8 @@
 
 mod expr;
 mod plan;
+mod step;
 
 pub use expr::*;
 pub use plan::*;
+pub use step::*;
diff --git a/crates/sparrow-physical/src/plan.rs b/crates/sparrow-physical/src/plan.rs
index e69de29bb..67b4e3995 100644
--- a/crates/sparrow-physical/src/plan.rs
+++ b/crates/sparrow-physical/src/plan.rs
@@ -0,0 +1,12 @@
+use crate::Step;
+
+/// A plan is a directed, acyclic graph of steps.
+///
+/// The plan is represented as an array of steps, with each step referencing
+/// its children (inputs) by index. The array is topologically sorted so that
+/// all children have indices less than the step that references them.
+#[derive(Debug)]
+pub struct Plan {
+    /// The steps in the plan.
+    pub steps: Vec<Step>,
+}
diff --git a/crates/sparrow-physical/src/snapshots/sparrow_physical__expr__tests__physical_exprs_yaml.snap.new b/crates/sparrow-physical/src/snapshots/sparrow_physical__expr__tests__physical_exprs_yaml.snap
similarity index 92%
rename from crates/sparrow-physical/src/snapshots/sparrow_physical__expr__tests__physical_exprs_yaml.snap.new
rename to crates/sparrow-physical/src/snapshots/sparrow_physical__expr__tests__physical_exprs_yaml.snap
index 1ea5369fc..76f573aa4 100644
--- a/crates/sparrow-physical/src/snapshots/sparrow_physical__expr__tests__physical_exprs_yaml.snap.new
+++ b/crates/sparrow-physical/src/snapshots/sparrow_physical__expr__tests__physical_exprs_yaml.snap
@@ -1,6 +1,5 @@
 ---
 source: crates/sparrow-physical/src/expr.rs
-assertion_line: 63
 expression: yaml
 ---
 - kind: !column foo
diff --git a/crates/sparrow-physical/src/step.rs b/crates/sparrow-physical/src/step.rs
new file mode 100644
index 000000000..d9354a036
--- /dev/null
+++ b/crates/sparrow-physical/src/step.rs
@@ -0,0 +1,59 @@
+use arrow_schema::SchemaRef;
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[repr(transparent)]
+#[serde(transparent)]
+pub struct StepId(usize);
+
+/// A single step in the physical plan.
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct Step {
+    pub id: StepId,
+    /// The kind of step being performed.
+    pub kind: StepKind,
+    /// Steps which act as input (children) of this step.
+    pub children: Vec<StepId>,
+    /// The schema for this step.
+    pub schema: SchemaRef,
+}
+
+/// The kinds of steps that can occur in the physical plan.
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "snake_case")]
+
+pub enum StepKind {
+    /// Scan the given table.
+    Scan {
+        table_name: String,
+    },
+    /// Merge the given relations.
+    Merge,
+    /// Apply a projection to adjust columns in the table.
+    ///
+    /// The output includes the same rows as the input, but with columns
+    /// projected as configured.
+    Project {
+        /// Expressions to apply to compute additional input columns.
+        exprs: Vec<crate::Expr>,
+        /// Indices of expressions to use for the output.
+        ///
+        /// The length should be the same as the number of fields in the schema.
+        outputs: Vec<usize>,
+    },
+    /// Filter the results based on a boolean predicate.
+    Filter {
+        /// Expressions to apply to compute the predicate.
+        ///
+        /// The last expression should be the boolean predicate.
+        exprs: Vec<crate::Expr>,
+    },
+    /// A step that repartitions the output.
+    Repartition {
+        num_partitions: usize,
+        /// Expressions to apply to compute columns which may be referenced by `keys`.
+        exprs: Vec<crate::Expr>,
+        /// Indices of expression columns representing the keys.
+        keys: Vec<usize>,
+    },
+    Error,
+}