Skip to content

Commit

Permalink
Introduce a pipelined materializing query executor
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime committed Jan 17, 2025
1 parent 6f428f3 commit 8a36603
Show file tree
Hide file tree
Showing 5 changed files with 705 additions and 34 deletions.
21 changes: 17 additions & 4 deletions crates/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use iter::PlanIter;
use spacetimedb_lib::{
bsatn::{EncodeError, ToBsatn},
query::Delta,
ser::Serialize,
sats::impl_serialize,
AlgebraicValue, ProductValue,
};
use spacetimedb_physical_plan::plan::{ProjectField, ProjectPlan, TupleField};
Expand All @@ -17,6 +17,7 @@ use spacetimedb_table::{
};

pub mod iter;
pub mod pipelined;

/// The datastore interface required for building an executor
pub trait Datastore {
Expand Down Expand Up @@ -72,12 +73,17 @@ pub trait DeltaStore {
}
}

#[derive(Clone, Serialize)]
#[derive(Clone)]
pub enum Row<'a> {
Ptr(RowRef<'a>),
Ref(&'a ProductValue),
}

impl_serialize!(['a] Row<'a>, (self, ser) => match self {
Self::Ptr(row) => row.serialize(ser),
Self::Ref(row) => row.serialize(ser),
});

impl ToBsatn for Row<'_> {
fn static_bsatn_size(&self) -> Option<u16> {
match self {
Expand All @@ -104,8 +110,8 @@ impl ToBsatn for Row<'_> {
impl ProjectField for Row<'_> {
fn project(&self, field: &TupleField) -> AlgebraicValue {
match self {
Self::Ptr(ptr) => ptr.read_col(field.field_pos).unwrap(),
Self::Ref(val) => val.elements.get(field.field_pos).unwrap().clone(),
Self::Ptr(ptr) => ptr.project(field),
Self::Ref(val) => val.project(field),
}
}
}
Expand Down Expand Up @@ -153,6 +159,13 @@ impl<'a> Tuple<'a> {
}
}
}

fn join(self, with: Self) -> Self {
match with {
Self::Row(ptr) => self.append(ptr),
Self::Join(ptrs) => ptrs.into_iter().fold(self, |tup, ptr| tup.append(ptr)),
}
}
}

pub struct DeltaScanIter<'a> {
Expand Down
Loading

0 comments on commit 8a36603

Please sign in to comment.