Skip to content

Commit

Permalink
Have the scheduled_id/at columns be specified in the schema, not by t…
Browse files Browse the repository at this point in the history
…he column name (#1861)
  • Loading branch information
coolreader18 authored Oct 18, 2024
1 parent 263511e commit 209d12c
Show file tree
Hide file tree
Showing 18 changed files with 194 additions and 152 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions crates/bindings-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,19 @@ fn table_impl(mut args: TableArgs, mut item: MutItem<syn::DeriveInput>) -> syn::
let unique_col_ids = unique_columns.iter().map(|col| col.index);
let primary_col_id = primary_key_column.iter().map(|col| col.index);
let sequence_col_ids = sequenced_columns.iter().map(|col| col.index);
let scheduled_reducer_ident = args.scheduled.iter();

let schedule = args
.scheduled
.as_ref()
.map(|reducer| {
// scheduled_at was inserted as the last field
let scheduled_at_id = (fields.len() - 1) as u16;
quote!(spacetimedb::table::ScheduleDesc {
reducer_name: <#reducer as spacetimedb::rt::ReducerInfo>::NAME,
scheduled_at_column: #scheduled_at_id,
})
})
.into_iter();

let unique_err = if !unique_columns.is_empty() {
quote!(spacetimedb::UniqueConstraintViolation)
Expand Down Expand Up @@ -1063,7 +1075,7 @@ fn table_impl(mut args: TableArgs, mut item: MutItem<syn::DeriveInput>) -> syn::
const INDEXES: &'static [spacetimedb::table::IndexDesc<'static>] = &[#(#index_descs),*];
#(const PRIMARY_KEY: Option<u16> = Some(#primary_col_id);)*
const SEQUENCES: &'static [u16] = &[#(#sequence_col_ids),*];
#(const SCHEDULED_REDUCER_NAME: Option<&'static str> = Some(<#scheduled_reducer_ident as spacetimedb::rt::ReducerInfo>::NAME);)*
#(const SCHEDULE: Option<spacetimedb::table::ScheduleDesc<'static>> = Some(#schedule);)*

#table_id_from_name_func
}
Expand Down
1 change: 1 addition & 0 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use spacetimedb_lib::ser::Serialize;
pub use spacetimedb_lib::Address;
pub use spacetimedb_lib::AlgebraicValue;
pub use spacetimedb_lib::Identity;
pub use spacetimedb_lib::ScheduleAt;
pub use spacetimedb_primitives::TableId;
pub use sys::Errno;
pub use table::{AutoIncOverflow, BTreeIndex, Table, TryInsertError, UniqueColumn, UniqueConstraintViolation};
Expand Down
18 changes: 16 additions & 2 deletions crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,20 @@ pub trait TableColumn {
}
impl<T: SpacetimeType> TableColumn for T {}

/// Assert that the primary_key column of a scheduled table is a u64.
pub const fn assert_scheduled_table_primary_key<T: ScheduledTablePrimaryKey>() {}

mod sealed {
pub trait Sealed {}
}
#[diagnostic::on_unimplemented(
message = "scheduled table primary key must be a `u64`",
label = "should be `u64`, not `{Self}`"
)]
pub trait ScheduledTablePrimaryKey: sealed::Sealed {}
impl sealed::Sealed for u64 {}
impl ScheduledTablePrimaryKey for u64 {}

/// Used in the last type parameter of `Reducer` to indicate that the
/// context argument *should* be passed to the reducer logic.
pub struct ContextArg;
Expand Down Expand Up @@ -331,8 +345,8 @@ pub fn register_table<T: Table>() {
for &col in T::SEQUENCES {
table = table.with_column_sequence(col, None);
}
if let Some(scheduled_reducer) = T::SCHEDULED_REDUCER_NAME {
table = table.with_schedule(scheduled_reducer, None);
if let Some(schedule) = T::SCHEDULE {
table = table.with_schedule(schedule.reducer_name, schedule.scheduled_at_column, None);
}

table.finish();
Expand Down
7 changes: 6 additions & 1 deletion crates/bindings/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub trait TableInternal: Sized {
const INDEXES: &'static [IndexDesc<'static>];
const PRIMARY_KEY: Option<u16> = None;
const SEQUENCES: &'static [u16];
const SCHEDULED_REDUCER_NAME: Option<&'static str> = None;
const SCHEDULE: Option<ScheduleDesc<'static>> = None;

/// Returns the ID of this table.
fn table_id() -> TableId;
Expand All @@ -143,6 +143,11 @@ pub enum IndexAlgo<'a> {
BTree { columns: &'a [u16] },
}

pub struct ScheduleDesc<'a> {
pub reducer_name: &'a str,
pub scheduled_at_column: u16,
}

#[doc(hidden)]
pub trait __MapRowTypeToTable {
type Table: Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,7 @@ mod tests {
ColRow { table: ST_SCHEDULED_ID.into(), pos: 1, name: "table_id", ty: TableId::get_type() },
ColRow { table: ST_SCHEDULED_ID.into(), pos: 2, name: "reducer_name", ty: AlgebraicType::String },
ColRow { table: ST_SCHEDULED_ID.into(), pos: 3, name: "schedule_name", ty: AlgebraicType::String },
ColRow { table: ST_SCHEDULED_ID.into(), pos: 4, name: "at_column", ty: AlgebraicType::U16 },

ColRow { table: ST_ROW_LEVEL_SECURITY_ID.into(), pos: 0, name: "table_id", ty: TableId::get_type() },
ColRow { table: ST_ROW_LEVEL_SECURITY_ID.into(), pos: 1, name: "sql", ty: AlgebraicType::String },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl MutTxId {
schedule_id: ScheduleId::SENTINEL,
schedule_name: schedule.schedule_name,
reducer_name: schedule.reducer_name,
at_column: schedule.at_column,
};
let (generated, ..) = self.insert(ST_SCHEDULED_ID, &mut row.into(), database_address)?;
let id = generated.as_u32();
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/db/datastore/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ st_fields_enum!(enum StScheduledFields {
"table_id", TableId = 1,
"reducer_name", ReducerName = 2,
"schedule_name", ScheduleName = 3,
"at_column", AtColumn = 4,
});

/// Helper method to check that a system table has the correct fields.
Expand Down Expand Up @@ -1285,6 +1286,7 @@ pub struct StScheduledRow {
pub(crate) table_id: TableId,
pub(crate) reducer_name: Box<str>,
pub(crate) schedule_name: Box<str>,
pub(crate) at_column: ColId,
}

impl TryFrom<RowRef<'_>> for StScheduledRow {
Expand All @@ -1307,6 +1309,7 @@ impl From<StScheduledRow> for ScheduleSchema {
reducer_name: row.reducer_name,
schedule_id: row.schedule_id,
schedule_name: row.schedule_name,
at_column: row.at_column,
}
}
}
Expand Down
16 changes: 10 additions & 6 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::error::{DBError, DatabaseError, TableError};
use crate::execution_context::ExecutionContext;
use crate::messages::control_db::HostType;
use crate::util::spawn_rayon;
use anyhow::anyhow;
use anyhow::{anyhow, Context};
use fs2::FileExt;
use futures::channel::mpsc;
use futures::StreamExt;
Expand Down Expand Up @@ -527,14 +527,18 @@ impl RelationalDB {
.get_all_tables_tx(&ExecutionContext::internal(self.address), tx)
}

pub fn is_scheduled_table(
pub fn table_scheduled_id_and_at(
&self,
ctx: &ExecutionContext,
tx: &mut MutTx,
tx: &impl StateView,
table_id: TableId,
) -> Result<bool, DBError> {
tx.schema_for_table(ctx, table_id)
.map(|schema| schema.schedule.is_some())
) -> Result<Option<(ColId, ColId)>, DBError> {
let schema = tx.schema_for_table(ctx, table_id)?;
let Some(sched) = &schema.schedule else { return Ok(None) };
let primary_key = schema
.primary_key
.context("scheduled table doesn't have a primary key?")?;
Ok(Some((primary_key, sched.at_column)))
}

pub fn decode_column(
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ impl InstanceEnv {
}
})?;

if stdb.is_scheduled_table(ctx, tx, table_id)? {
if let Some((id_column, at_column)) = stdb.table_scheduled_id_and_at(ctx, tx, table_id)? {
let row_ref = tx.get(table_id, row_ptr)?.unwrap();
let (schedule_id, schedule_at) = get_schedule_from_row(tx, stdb, table_id, &row_ref)
let (schedule_id, schedule_at) = get_schedule_from_row(&row_ref, id_column, at_column)
// NOTE(centril): Should never happen,
// as we successfully inserted and thus `ret` is verified against the table schema.
.map_err(|e| NodesError::ScheduleError(ScheduleError::DecodingError(e)))?;
self.scheduler
.schedule(table_id, schedule_id, schedule_at)
.schedule(table_id, schedule_id, schedule_at, id_column, at_column)
.map_err(NodesError::ScheduleError)?;
}

Expand Down
Loading

2 comments on commit 209d12c

@github-actions
Copy link

@github-actions github-actions bot commented on 209d12c Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

@github-actions
Copy link

@github-actions github-actions bot commented on 209d12c Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results

Callgrind Benchmark Report

These benchmarks were run using callgrind,
an instruction-level profiler. They allow comparisons between sqlite (sqlite), SpacetimeDB running through a module (stdb_module), and the underlying SpacetimeDB data storage engine (stdb_raw). Callgrind emulates a CPU to collect the below estimates.

Measurement changes larger than five percent are in bold.

In-memory benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 5392 5393 -0.02% 5430 5431 -0.02%
sqlite 5579 5579 0.00% 5967 5941 0.44%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 75558 75556 0.00% 75896 75962 -0.09%
stdb_raw u32_u64_str no_index 64 128 2 string 118056 118054 0.00% 118552 118594 -0.04%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 24082 24081 0.00% 24486 24509 -0.09%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 23050 23048 0.01% 23356 23386 -0.13%
sqlite u32_u64_str no_index 64 128 2 string 144695 144695 0.00% 146113 146217 -0.07%
sqlite u32_u64_str no_index 64 128 1 u64 124044 124044 0.00% 125252 125396 -0.11%
sqlite u32_u64_str btree_each_column 64 128 1 u64 131361 131361 0.00% 132753 132761 -0.01%
sqlite u32_u64_str btree_each_column 64 128 2 string 134494 134494 0.00% 136072 136132 -0.04%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 900638 903196 -0.28% 920762 952634 -3.35%
stdb_raw u32_u64_str btree_each_column 64 128 1053903 1053024 0.08% 1115337 1083410 2.95%
sqlite u32_u64_str unique_0 64 128 398320 398320 0.00% 413142 413862 -0.17%
sqlite u32_u64_str btree_each_column 64 128 983637 983637 0.00% 1018503 1020055 -0.15%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 152784 152785 -0.00% 152862 152835 0.02%
stdb_raw u32_u64_str unique_0 64 15809 15810 -0.01% 15879 15860 0.12%
sqlite u32_u64_str unique_0 1024 1067255 1067255 0.00% 1070563 1070665 -0.01%
sqlite u32_u64_str unique_0 64 76201 76201 0.00% 77259 77373 -0.15%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50180 50180 0.00%
64 bsatn 25509 25509 0.00% 27685 27753 -0.25%
16 bsatn 8200 8200 0.00% 9492 9560 -0.71%
16 json 12188 12188 0.00% 14092 14092 0.00%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 20704502 20973196 -1.28% 21503972 21556230 -0.24%
stdb_raw u32_u64_str unique_0 64 128 1308896 1311452 -0.19% 1395016 1352044 3.18%
sqlite u32_u64_str unique_0 1024 1024 1802128 1802128 0.00% 1811108 1811372 -0.01%
sqlite u32_u64_str unique_0 64 128 128474 128474 0.00% 131220 131280 -0.05%
On-disk benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 5397 5398 -0.02% 5439 5440 -0.02%
sqlite 5621 5621 0.00% 6105 6043 1.03%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 75563 75561 0.00% 75893 75967 -0.10%
stdb_raw u32_u64_str no_index 64 128 2 string 119150 118059 0.92% 119642 118699 0.79%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 24087 24089 -0.01% 24511 24525 -0.06%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 23055 23053 0.01% 23357 23387 -0.13%
sqlite u32_u64_str no_index 64 128 1 u64 125965 125965 0.00% 127521 127553 -0.03%
sqlite u32_u64_str no_index 64 128 2 string 146616 146616 0.00% 148362 148386 -0.02%
sqlite u32_u64_str btree_each_column 64 128 2 string 136616 136616 0.00% 138664 138744 -0.06%
sqlite u32_u64_str btree_each_column 64 128 1 u64 133457 133457 0.00% 135253 135325 -0.05%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 851335 852449 -0.13% 901379 900615 0.08%
stdb_raw u32_u64_str btree_each_column 64 128 999599 1000273 -0.07% 1060041 1059899 0.01%
sqlite u32_u64_str unique_0 64 128 415857 415857 0.00% 429869 430509 -0.15%
sqlite u32_u64_str btree_each_column 64 128 1021904 1021898 0.00% 1055484 1057830 -0.22%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 152789 152790 -0.00% 152859 152836 0.02%
stdb_raw u32_u64_str unique_0 64 15814 15815 -0.01% 15884 15861 0.15%
sqlite u32_u64_str unique_0 1024 1070323 1070323 0.00% 1074105 1074295 -0.02%
sqlite u32_u64_str unique_0 64 77973 77991 -0.02% 79247 79395 -0.19%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50180 50180 0.00%
64 bsatn 25509 25509 0.00% 27685 27753 -0.25%
16 bsatn 8200 8200 0.00% 9492 9560 -0.71%
16 json 12188 12188 0.00% 14092 14092 0.00%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 19444399 19463224 -0.10% 20318759 20136826 0.90%
stdb_raw u32_u64_str unique_0 64 128 1262469 1263736 -0.10% 1348485 1335690 0.96%
sqlite u32_u64_str unique_0 1024 1024 1809689 1809689 0.00% 1818351 1818403 -0.00%
sqlite u32_u64_str unique_0 64 128 132600 132600 0.00% 135506 135570 -0.05%

Please sign in to comment.