Track cpu usage metrics for query execution #2140

Open · wants to merge 1 commit into base: joshua/materialized-query-execution
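Summary (inferred from the diff; the PR description itself was not captured in this page): this change threads a `QueryMetrics` value, counting index seeks and rows scanned as a proxy for query CPU cost, out of query execution and into Prometheus counters. The per-table `spacetime_num_rows_fetched_total` and `spacetime_num_index_keys_scanned_total` counters are replaced by a single `spacetime_num_rows_scanned_total`, and both it and `spacetime_num_index_seeks_total` drop their per-query and per-table labels, keeping only the workload type and database identity. A new `record_query_metrics` helper flushes the aggregated metrics after one-off SQL queries, initial subscription evaluations, and incremental subscription updates.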
15 changes: 5 additions & 10 deletions crates/core/src/db/db_metrics/mod.rs

```diff
@@ -23,19 +23,14 @@ metrics_group!(
     #[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
     pub rdb_num_rows_deleted: IntCounterVec,
 
-    #[name = spacetime_num_rows_fetched_total]
-    #[help = "The cumulative number of rows fetched from a table"]
-    #[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
-    pub rdb_num_rows_fetched: IntCounterVec,
-
-    #[name = spacetime_num_index_keys_scanned_total]
-    #[help = "The cumulative number of keys scanned from an index"]
-    #[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
-    pub rdb_num_keys_scanned: IntCounterVec,
+    #[name = spacetime_num_rows_scanned_total]
+    #[help = "The cumulative number of rows scanned by reducers and queries"]
+    #[labels(txn_type: WorkloadType, db: Identity)]
+    pub rdb_num_rows_scanned: IntCounterVec,
 
     #[name = spacetime_num_index_seeks_total]
     #[help = "The cumulative number of index seeks"]
-    #[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
+    #[labels(txn_type: WorkloadType, db: Identity)]
     pub rdb_num_index_seeks: IntCounterVec,
 
     #[name = spacetime_num_txns_total]
```
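Note on the change above: dropping the `reducer_or_query`, `table_id`, and `table_name` labels is a cardinality reduction as much as a consolidation; the new counters produce one time series per (workload type, database) pair instead of one per query per table. A minimal sketch of how the consolidated counter gets bumped, assuming the prometheus-style `IntCounterVec` API that `metrics_group!` generates (this mirrors the `record_query_metrics` helper added in `crates/core/src/subscription/mod.rs` later in this diff):

```rust
// Sketch only; DB_METRICS and the rdb_num_rows_scanned accessor come
// from the metrics_group! declaration above.
fn bump_rows_scanned(workload: WorkloadType, db: &Identity, rows_scanned: usize) {
    DB_METRICS
        .rdb_num_rows_scanned
        .with_label_values(&workload, db) // labels: txn_type, db
        .inc_by(rows_scanned as u64);
}
```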
22 changes: 16 additions & 6 deletions crates/core/src/host/module_host.rs

```diff
@@ -8,13 +8,14 @@ use crate::db::datastore::traits::{IsolationLevel, Program, TxData};
 use crate::energy::EnergyQuanta;
 use crate::error::DBError;
 use crate::estimation::estimate_rows_scanned;
-use crate::execution_context::{ExecutionContext, ReducerContext, Workload};
+use crate::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
 use crate::hash::Hash;
 use crate::identity::Identity;
 use crate::messages::control_db::Database;
 use crate::replica_context::ReplicaContext;
 use crate::sql::ast::SchemaViewer;
 use crate::subscription::module_subscription_actor::ModuleSubscriptions;
+use crate::subscription::record_query_metrics;
 use crate::subscription::tx::DeltaTx;
 use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed};
 use crate::vm::check_row_limit;
@@ -850,17 +851,26 @@ impl ModuleHost {
         let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
         log::debug!("One-off query: {query}");
 
-        db.with_read_only(Workload::Sql, |tx| {
+        let (rows, metrics) = db.with_read_only(Workload::Sql, |tx| {
             let tx = SchemaViewer::new(tx, &auth);
             let plan = SubscribePlan::compile(&query, &tx)?;
             check_row_limit(&plan, db, &tx, |plan, tx| estimate_rows_scanned(tx, plan), &auth)?;
             plan.execute::<_, F>(&DeltaTx::from(&*tx))
-                .map(|(rows, _)| OneOffTable {
-                    table_name: plan.table_name().to_owned().into_boxed_str(),
-                    rows,
+                .map(|(rows, _, metrics)| {
+                    (
+                        OneOffTable {
+                            table_name: plan.table_name().to_owned().into_boxed_str(),
+                            rows,
+                        },
+                        metrics,
+                    )
                 })
                 .context("One-off queries are not allowed to modify the database")
-        })
+        })?;
+
+        record_query_metrics(WorkloadType::Sql, &db.database_identity(), metrics);
+
+        Ok(rows)
     }
 
     /// FIXME(jgilles): this is a temporary workaround for deleting not currently being supported
```
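The change to `one_off_query` above is the pattern the rest of the PR repeats: execution returns its `QueryMetrics` alongside the result, and the caller records the metrics only after the read-only transaction has been released. A self-contained toy version of that shape (names simplified; this is not the SpacetimeDB API):

```rust
// Toy model of "thread metrics out of a closure-scoped transaction".
#[derive(Debug, Default)]
struct QueryMetrics {
    index_seeks: usize,
    rows_scanned: usize,
}

// Stand-in for RelationalDB::with_read_only: runs `f` inside a
// (pretend) transaction and returns whatever the closure produces.
fn with_read_only<T>(f: impl FnOnce() -> T) -> T {
    f()
}

fn main() {
    let (rows, metrics) = with_read_only(|| {
        // Pretend the query did 3 index seeks and scanned 42 rows.
        let rows = vec!["row"; 42];
        let metrics = QueryMetrics { index_seeks: 3, rows_scanned: 42 };
        (rows, metrics)
    });
    // Counters are bumped only after the "transaction" is over,
    // mirroring record_query_metrics in this PR.
    println!("{} rows, metrics: {:?}", rows.len(), metrics);
}
```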
15 changes: 8 additions & 7 deletions crates/core/src/subscription/delta.rs

```diff
@@ -2,7 +2,7 @@ use std::collections::HashMap;
 
 use anyhow::Result;
 use spacetimedb_execution::{Datastore, DeltaStore};
-use spacetimedb_query::delta::DeltaPlanEvaluator;
+use spacetimedb_query::{delta::DeltaPlanEvaluator, metrics::QueryMetrics};
 use spacetimedb_vm::relation::RelValue;
 
 use crate::host::module_host::UpdatesRelValue;
@@ -15,34 +15,35 @@ use crate::host::module_host::UpdatesRelValue;
 /// Hence this may be removed at any time after 1.0.
 pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
     tx: &'a Tx,
+    metrics: &mut QueryMetrics,
     delta: &'a DeltaPlanEvaluator,
 ) -> Result<UpdatesRelValue<'a>> {
     if !delta.is_join() {
         return Ok(UpdatesRelValue {
-            inserts: delta.eval_inserts(tx)?.map(RelValue::from).collect(),
-            deletes: delta.eval_deletes(tx)?.map(RelValue::from).collect(),
+            inserts: delta.eval_inserts(tx, metrics)?.map(RelValue::from).collect(),
+            deletes: delta.eval_deletes(tx, metrics)?.map(RelValue::from).collect(),
         });
     }
     if delta.has_inserts() && !delta.has_deletes() {
         return Ok(UpdatesRelValue {
-            inserts: delta.eval_inserts(tx)?.map(RelValue::from).collect(),
+            inserts: delta.eval_inserts(tx, metrics)?.map(RelValue::from).collect(),
             deletes: vec![],
         });
     }
     if delta.has_deletes() && !delta.has_inserts() {
         return Ok(UpdatesRelValue {
-            deletes: delta.eval_deletes(tx)?.map(RelValue::from).collect(),
+            deletes: delta.eval_deletes(tx, metrics)?.map(RelValue::from).collect(),
             inserts: vec![],
         });
     }
     let mut inserts = HashMap::new();
 
-    for row in delta.eval_inserts(tx)?.map(RelValue::from) {
+    for row in delta.eval_inserts(tx, metrics)?.map(RelValue::from) {
         inserts.entry(row).and_modify(|n| *n += 1).or_insert(1);
     }
 
     let deletes = delta
-        .eval_deletes(tx)?
+        .eval_deletes(tx, metrics)?
         .map(RelValue::from)
         .filter(|row| match inserts.get_mut(row) {
             None => true,
```
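`eval_delta` now takes `&mut QueryMetrics` and passes it through every `eval_inserts`/`eval_deletes` call. The `QueryMetrics` type itself lives in the `spacetimedb-query` crate and is not shown in this diff; from its use here (`index_seeks`, `rows_scanned`, `Default`, and `merge`), it is presumably something like this sketch:

```rust
/// Sketch of spacetimedb_query::metrics::QueryMetrics, inferred from
/// its call sites in this PR; the real definition may carry more counters.
#[derive(Debug, Default, Clone, Copy)]
pub struct QueryMetrics {
    pub index_seeks: usize,
    pub rows_scanned: usize,
}

impl QueryMetrics {
    /// Fold another execution's counters into this one.
    pub fn merge(&mut self, other: QueryMetrics) {
        self.index_seeks += other.index_seeks;
        self.rows_scanned += other.rows_scanned;
    }
}
```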
16 changes: 16 additions & 0 deletions crates/core/src/subscription/mod.rs

```diff
@@ -1,3 +1,8 @@
+use spacetimedb_lib::Identity;
+use spacetimedb_query::metrics::QueryMetrics;
+
+use crate::{db::db_metrics::DB_METRICS, execution_context::WorkloadType};
+
 pub mod delta;
 pub mod execution_unit;
 pub mod module_subscription_actor;
@@ -6,3 +11,14 @@ pub mod query;
 #[allow(clippy::module_inception)] // it's right this isn't ideal :/
 pub mod subscription;
 pub mod tx;
+
+pub(crate) fn record_query_metrics(workload: WorkloadType, db: &Identity, metrics: QueryMetrics) {
+    DB_METRICS
+        .rdb_num_index_seeks
+        .with_label_values(&workload, db)
+        .inc_by(metrics.index_seeks as u64);
+    DB_METRICS
+        .rdb_num_rows_scanned
+        .with_label_values(&workload, db)
+        .inc_by(metrics.rows_scanned as u64);
+}
```
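Every call site added elsewhere in this PR uses the same one-liner shape, varying only the workload type, for example:

```rust
// As used in module_host.rs and module_subscription_actor.rs:
record_query_metrics(WorkloadType::Sql, &db.database_identity(), metrics);
```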
49 changes: 39 additions & 10 deletions crates/core/src/subscription/module_subscription_actor.rs

```diff
@@ -1,6 +1,7 @@
 use super::execution_unit::QueryHash;
 use super::module_subscription_manager::{Plan, SubscriptionManager};
 use super::query::compile_read_only_query;
+use super::record_query_metrics;
 use super::tx::DeltaTx;
 use crate::client::messages::{
     SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionRows, SubscriptionUpdateMessage,
@@ -11,7 +12,7 @@ use crate::db::datastore::locking_tx_datastore::tx::TxId;
 use crate::db::relational_db::{MutTx, RelationalDB, Tx};
 use crate::error::DBError;
 use crate::estimation::estimate_rows_scanned;
-use crate::execution_context::Workload;
+use crate::execution_context::{Workload, WorkloadType};
 use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent};
 use crate::messages::websocket::Subscribe;
 use crate::vm::check_row_limit;
@@ -22,6 +23,7 @@ use spacetimedb_client_api_messages::websocket::{
 };
 use spacetimedb_lib::identity::AuthCtx;
 use spacetimedb_lib::Identity;
+use spacetimedb_query::metrics::QueryMetrics;
 use spacetimedb_query::{execute_plans, SubscribePlan};
 use std::{sync::Arc, time::Instant};
 
@@ -37,6 +39,7 @@ pub struct ModuleSubscriptions {
 }
 
 type AssertTxFn = Arc<dyn Fn(&Tx)>;
+type SubscriptionUpdate = FormatSwitch<TableUpdate<BsatnFormat>, TableUpdate<JsonFormat>>;
 
 impl ModuleSubscriptions {
     pub fn new(relational_db: Arc<RelationalDB>, owner_identity: Identity) -> Self {
@@ -54,7 +57,7 @@ impl ModuleSubscriptions {
         query: Arc<Plan>,
         tx: &TxId,
         auth: &AuthCtx,
-    ) -> Result<FormatSwitch<TableUpdate<BsatnFormat>, TableUpdate<JsonFormat>>, DBError> {
+    ) -> Result<(SubscriptionUpdate, QueryMetrics), DBError> {
         let comp = sender.config.compression;
         let plan = SubscribePlan::from_delta_plan(&query);
 
@@ -68,8 +71,12 @@
 
         let tx = DeltaTx::from(tx);
         Ok(match sender.config.protocol {
-            Protocol::Binary => FormatSwitch::Bsatn(plan.collect_table_update(comp, &tx)?),
-            Protocol::Text => FormatSwitch::Json(plan.collect_table_update(comp, &tx)?),
+            Protocol::Binary => plan
+                .collect_table_update(comp, &tx)
+                .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
+            Protocol::Text => plan
+                .collect_table_update(comp, &tx)
+                .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
         })
     }
 
@@ -98,7 +105,13 @@
 
         drop(guard);
 
-        let table_rows = self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth)?;
+        let (table_rows, metrics) = self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth)?;
+
+        record_query_metrics(
+            WorkloadType::Subscribe,
+            &self.relational_db.database_identity(),
+            metrics,
+        );
 
         // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
         // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
@@ -161,7 +174,13 @@
             self.relational_db.release_tx(tx);
         });
         let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
-        let table_rows = self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth)?;
+        let (table_rows, metrics) = self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth)?;
+
+        record_query_metrics(
+            WorkloadType::Subscribe,
+            &self.relational_db.database_identity(),
+            metrics,
+        );
 
         WORKER_METRICS
             .subscription_queries
@@ -247,11 +266,19 @@
         )?;
 
         let tx = DeltaTx::from(&*tx);
-        let database_update = match sender.config.protocol {
-            Protocol::Text => FormatSwitch::Json(execute_plans(plans, comp, &tx)?),
-            Protocol::Binary => FormatSwitch::Bsatn(execute_plans(plans, comp, &tx)?),
+        let (database_update, metrics) = match sender.config.protocol {
+            Protocol::Binary => execute_plans(plans, comp, &tx)
+                .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
+            Protocol::Text => execute_plans(plans, comp, &tx)
+                .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
        };
 
+        record_query_metrics(
+            WorkloadType::Subscribe,
+            &self.relational_db.database_identity(),
+            metrics,
+        );
+
         // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
         // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
         // but that should not pose an issue.
@@ -328,7 +355,9 @@
         let event = Arc::new(event);
 
         match &event.status {
-            EventStatus::Committed(_) => subscriptions.eval_updates(&read_tx, event.clone(), caller),
+            EventStatus::Committed(_) => {
+                subscriptions.eval_updates(&read_tx, event.clone(), caller, &self.relational_db.database_identity())
+            }
            EventStatus::Failed(_) => {
                 if let Some(client) = caller {
                     let message = TransactionUpdateMessage {
```
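Worth noting: all three call sites above record under `WorkloadType::Subscribe`, while one-off SQL uses `WorkloadType::Sql` and the incremental path below uses `WorkloadType::Update`. The repeated protocol dispatch reduces to this pattern, shown here as a self-contained toy (types simplified; `collect` stands in for `collect_table_update`/`execute_plans`):

```rust
// Both arms run the same fallible computation; only the wrapper variant
// differs, and the metrics ride along untouched.
enum FormatSwitch<B, J> {
    Bsatn(B),
    Json(J),
}

fn collect() -> Result<(String, u64), String> {
    Ok(("payload".to_owned(), 7)) // (update, metrics)
}

fn dispatch(binary: bool) -> Result<(FormatSwitch<String, String>, u64), String> {
    Ok(match binary {
        true => collect().map(|(u, m)| (FormatSwitch::Bsatn(u), m))?,
        false => collect().map(|(u, m)| (FormatSwitch::Json(u), m))?,
    })
}
```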
37 changes: 30 additions & 7 deletions crates/core/src/subscription/module_subscription_manager.rs

```diff
@@ -3,9 +3,11 @@ use super::tx::DeltaTx;
 use crate::client::messages::{SubscriptionUpdateMessage, TransactionUpdateMessage};
 use crate::client::{ClientConnectionSender, Protocol};
 use crate::error::DBError;
+use crate::execution_context::WorkloadType;
 use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue};
 use crate::messages::websocket::{self as ws, TableUpdate};
 use crate::subscription::delta::eval_delta;
+use crate::subscription::record_query_metrics;
 use hashbrown::hash_map::OccupiedError;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use spacetimedb_client_api_messages::websocket::{
@@ -15,6 +17,7 @@ use spacetimedb_data_structures::map::{Entry, HashCollectionExt, HashMap, HashSet};
 use spacetimedb_lib::{Address, Identity};
 use spacetimedb_primitives::TableId;
 use spacetimedb_query::delta::DeltaPlan;
+use spacetimedb_query::metrics::QueryMetrics;
 use std::ops::Deref;
 use std::sync::Arc;
 
@@ -310,15 +313,21 @@ impl SubscriptionManager {
     /// evaluates only the necessary queries for those delta tables,
     /// and then sends the results to each client.
     #[tracing::instrument(skip_all)]
-    pub fn eval_updates(&self, tx: &DeltaTx, event: Arc<ModuleEvent>, caller: Option<&ClientConnectionSender>) {
+    pub fn eval_updates(
+        &self,
+        tx: &DeltaTx,
+        event: Arc<ModuleEvent>,
+        caller: Option<&ClientConnectionSender>,
+        database_identity: &Identity,
+    ) {
         use FormatSwitch::{Bsatn, Json};
 
         let tables = &event.status.database_update().unwrap().tables;
 
         // Put the main work on a rayon compute thread.
         rayon::scope(|_| {
             let span = tracing::info_span!("eval_incr").entered();
-            let mut eval = tables
+            let (updates, metrics) = tables
                 .iter()
                 .filter(|table| !table.inserts.is_empty() || !table.deletes.is_empty())
                 .map(|DatabaseTableUpdate { table_id, .. }| table_id)
@@ -330,7 +339,7 @@
                 // If N clients are subscribed to a query,
                 // we copy the DatabaseTableUpdate N times,
                 // which involves cloning BSATN (binary) or product values (json).
-                .flat_map_iter(|(hash, plan)| {
+                .map(|(hash, plan)| {
                     let table_id = plan.table_id();
                     let table_name = plan.table_name();
                     // Store at most one copy of the serialization to BSATN
@@ -353,9 +362,10 @@
                     }
 
                     let evaluator = plan.evaluator(tx);
+                    let mut metrics = QueryMetrics::default();
 
                     // TODO: Handle errors instead of skipping them
-                    eval_delta(tx, &evaluator)
+                    let updates = eval_delta(tx, &mut metrics, &evaluator)
                         .ok()
                         .filter(|delta_updates| delta_updates.has_updates())
                         .map(|delta_updates| {
@@ -377,9 +387,22 @@
                                 })
                                 .collect::<Vec<_>>()
                         })
-                        .unwrap_or_default()
+                        .unwrap_or_default();
+
+                    (updates, metrics)
                 })
-                .collect::<Vec<_>>()
+                .reduce(
+                    || (vec![], QueryMetrics::default()),
+                    |(mut updates, mut aggregated_metrics), (table_updates, metrics)| {
+                        updates.extend(table_updates);
+                        aggregated_metrics.merge(metrics);
+                        (updates, aggregated_metrics)
+                    },
+                );
+
+            record_query_metrics(WorkloadType::Update, database_identity, metrics);
+
+            let mut eval = updates
                 .into_iter()
                 // For each subscriber, aggregate all the updates for the same table.
                 // That is, we build a map `(subscriber_id, table_id) -> updates`.
@@ -948,7 +971,7 @@ mod tests {
         });
 
         db.with_read_only(Workload::Update, |tx| {
-            subscriptions.eval_updates(&(&*tx).into(), event, Some(&client0))
+            subscriptions.eval_updates(&(&*tx).into(), event, Some(&client0), &db.database_identity())
        });
 
         tokio::runtime::Builder::new_current_thread()
```
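The interesting move in `eval_updates` is replacing `flat_map_iter` with `map` plus `reduce`, so each parallel task can return `(updates, metrics)` and rayon can fold the per-query metrics into a single `QueryMetrics` before one `record_query_metrics` call. The same shape in miniature (requires the `rayon` crate; plain numbers stand in for the real update and metrics types):

```rust
use rayon::iter::{IntoParallelIterator, ParallelIterator};

/// Each task yields (its updates, its metric count); `reduce` starts
/// from the identity (empty vec, 0) and combines pairwise, just as
/// eval_updates does with Vec<update> and QueryMetrics::merge.
fn aggregate(tasks: Vec<u32>) -> (Vec<u32>, u64) {
    tasks
        .into_par_iter()
        .map(|t| (vec![t], 1u64))
        .reduce(
            || (Vec::new(), 0),
            |(mut updates, mut total), (u, m)| {
                updates.extend(u);
                total += m;
                (updates, total)
            },
        )
}

fn main() {
    let (updates, metric_total) = aggregate((0..8).collect());
    assert_eq!(updates.len(), 8);
    assert_eq!(metric_total, 8);
}
```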