From d205670868a5c9738327ecba57eab3ab8990de5d Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Tue, 1 Aug 2023 10:59:37 +0200 Subject: [PATCH] Reimplemented the energy tracking for identities (#115) * Refactoring some stuff for energy * Fix an issue with i128 query params * Infinite budget in Standalone * Energy and crash fixes * Hopefully fixed the test that now has energy * Addresses Centril's comments * Cargo fmt --------- Signed-off-by: Tyler Cloutier --- Cargo.lock | 5 +- crates/cli/src/subcommands/energy.rs | 6 +- crates/client-api/Cargo.toml | 1 + crates/client-api/src/auth.rs | 6 +- crates/client-api/src/lib.rs | 12 ++- crates/client-api/src/routes/energy.rs | 25 ++++-- crates/core/src/control_db.rs | 19 +++-- crates/core/src/host/host_controller.rs | 34 +++++++- crates/core/src/host/wasmer/mod.rs | 4 +- crates/core/src/host/wasmer/wasmer_module.rs | 23 +++--- crates/core/src/json/client_api.rs | 2 +- crates/core/src/messages/control_db.rs | 7 +- .../core/src/messages/control_worker_api.rs | 4 +- crates/standalone/src/energy_monitor.rs | 79 +++++++++++++++++++ crates/standalone/src/lib.rs | 30 ++++--- crates/standalone/src/main.rs | 3 +- crates/testing/src/modules.rs | 7 +- 17 files changed, 210 insertions(+), 57 deletions(-) create mode 100644 crates/standalone/src/energy_monitor.rs diff --git a/Cargo.lock b/Cargo.lock index 2904e09927..d88c73c49b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1982,9 +1982,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.6" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "jobserver" @@ -3978,6 +3978,7 @@ dependencies = [ "futures", "http", "hyper", + "itoa", "lazy_static", "log", "mime", diff --git a/crates/cli/src/subcommands/energy.rs b/crates/cli/src/subcommands/energy.rs index 69fd822ca1..93d5fea954 100644 --- a/crates/cli/src/subcommands/energy.rs +++ b/crates/cli/src/subcommands/energy.rs @@ -27,7 +27,7 @@ fn get_energy_subcommands() -> Vec { .arg( Arg::new("balance") .required(true) - .value_parser(value_parser!(u64)) + .value_parser(value_parser!(i128)) .help("The balance value to set"), ) .arg( @@ -63,7 +63,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error async fn exec_update_balance(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> { // let project_name = args.value_of("project name").unwrap(); let hex_id = args.get_one::("identity"); - let balance = *args.get_one::("balance").unwrap(); + let balance = *args.get_one::("balance").unwrap(); let quiet = args.get_flag("quiet"); let hex_id = hex_id_or_default(hex_id, &config); @@ -102,7 +102,7 @@ pub(super) async fn set_balance( client: &reqwest::Client, config: &Config, hex_identity: &str, - balance: u64, + balance: i128, ) -> anyhow::Result { // TODO: this really should be form data in POST body, not query string parameter, but gotham // does not support that on the server side without an extension. diff --git a/crates/client-api/Cargo.toml b/crates/client-api/Cargo.toml index 09f0897cf9..668b5dc8c3 100644 --- a/crates/client-api/Cargo.toml +++ b/crates/client-api/Cargo.toml @@ -32,3 +32,4 @@ futures = "0.3" bytes = "1" bytestring = "1" tokio-tungstenite = "0.18.0" +itoa = "1.0.9" diff --git a/crates/client-api/src/auth.rs b/crates/client-api/src/auth.rs index 28162495d2..da2eada2f1 100644 --- a/crates/client-api/src/auth.rs +++ b/crates/client-api/src/auth.rs @@ -1,3 +1,4 @@ +use std::fmt::Write; use std::time::Duration; use axum::extract::rejection::{TypedHeaderRejection, TypedHeaderRejectionReason}; @@ -6,6 +7,7 @@ use axum::headers::authorization::Credentials; use axum::headers::{self, authorization}; use axum::response::IntoResponse; use axum::TypedHeader; +use bytes::BytesMut; use http::{request, HeaderValue, StatusCode}; use serde::Deserialize; use spacetimedb::auth::identity::{ @@ -243,7 +245,9 @@ impl headers::Header for SpacetimeEnergyUsed { } fn encode>(&self, values: &mut E) { - values.extend([self.0 .0.into()]) + let mut buf = BytesMut::new(); + let _ = buf.write_str(itoa::Buffer::new().format(self.0 .0)); + values.extend([HeaderValue::from_bytes(&buf).unwrap()]); } } diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index 8da7e9b54e..deafc1e4b6 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -7,8 +7,8 @@ use spacetimedb::client::ClientActorIndex; use spacetimedb::control_db::ControlDb; use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController; use spacetimedb::hash::Hash; -use spacetimedb::host::HostController; use spacetimedb::host::UpdateDatabaseResult; +use spacetimedb::host::{EnergyQuanta, HostController}; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Node}; use spacetimedb::messages::worker_db::DatabaseInstanceState; @@ -90,6 +90,8 @@ pub trait ControlNodeDelegate: Send + Sync { async fn alloc_spacetime_identity(&self) -> spacetimedb::control_db::Result; + async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()>; + fn public_key(&self) -> &DecodingKey; fn private_key(&self) -> &EncodingKey; } @@ -123,6 +125,10 @@ impl ControlNodeDelegate for ArcEnv { self.0.alloc_spacetime_identity().await } + async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { + self.0.withdraw_energy(identity, amount).await + } + fn public_key(&self) -> &DecodingKey { self.0.public_key() } @@ -141,6 +147,10 @@ impl ControlNodeDelegate for Arc { (**self).alloc_spacetime_identity().await } + async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { + (**self).withdraw_energy(identity, amount).await + } + fn public_key(&self) -> &DecodingKey { (**self).public_key() } diff --git a/crates/client-api/src/routes/energy.rs b/crates/client-api/src/routes/energy.rs index 5d00b2c800..d640628a0a 100644 --- a/crates/client-api/src/routes/energy.rs +++ b/crates/client-api/src/routes/energy.rs @@ -6,6 +6,7 @@ use http::StatusCode; use serde::Deserialize; use serde_json::json; +use spacetimedb::host::EnergyQuanta; use spacetimedb_lib::Identity; use crate::auth::SpacetimeAuthHeader; @@ -28,18 +29,20 @@ pub async fn get_energy_balance( let balance = ctx .control_db() .get_energy_balance(&identity) - .await .map_err(log_and_500)? - .ok_or((StatusCode::NOT_FOUND, "No budget for identity"))?; + .unwrap_or(EnergyQuanta(0)); - let response_json = json!({ "balance": balance }); + let response_json = json!({ + // Note: balance must be returned as a string to avoid truncation. + "balance": balance.0.to_string(), + }); Ok(axum::Json(response_json)) } #[derive(Deserialize)] pub struct SetEnergyBalanceQueryParams { - balance: Option, + balance: Option, } pub async fn set_energy_balance( State(ctx): State>, @@ -61,15 +64,23 @@ pub async fn set_energy_balance( let identity = Identity::from(identity); - let balance = balance.unwrap_or(0); + let balance = balance + .map(|balance| balance.parse::()) + .transpose() + .map_err(|err| { + log::error!("Failed to parse balance: {:?}", err); + StatusCode::BAD_REQUEST + })?; + let balance = EnergyQuanta(balance.unwrap_or(0)); ctx.control_db() .set_energy_balance(identity, balance) + .await .map_err(log_and_500)?; - // Return the modified budget. let response_json = json!({ - "balance": balance, + // Note: balance must be returned as a string to avoid truncation. + "balance": balance.0.to_string(), }); Ok(axum::Json(response_json)) diff --git a/crates/core/src/control_db.rs b/crates/core/src/control_db.rs index e6af0d6643..f5e24629e0 100644 --- a/crates/core/src/control_db.rs +++ b/crates/core/src/control_db.rs @@ -1,6 +1,7 @@ use crate::address::Address; use crate::hash::hash_bytes; +use crate::host::EnergyQuanta; use crate::identity::Identity; use crate::messages::control_db::{Database, DatabaseInstance, EnergyBalance, IdentityEmail, Node}; use crate::stdb_path; @@ -34,6 +35,8 @@ pub enum Error { DecodingError(#[from] bsatn::DecodeError), #[error(transparent)] DomainParsingError(#[from] DomainParsingError), + #[error("connection error")] + ConnectionError(), #[error(transparent)] JSONDeserializationError(#[from] serde_json::Error), } @@ -515,10 +518,10 @@ impl ControlDb { continue; } }; - let Ok(arr) = <[u8; 8]>::try_from(balance_entry.1.as_ref()) else { + let Ok(arr) = <[u8; 16]>::try_from(balance_entry.1.as_ref()) else { return Err(Error::DecodingError(bsatn::DecodeError::BufferLength)); }; - let balance = i64::from_ne_bytes(arr); + let balance = i128::from_ne_bytes(arr); let energy_balance = EnergyBalance { identity: Identity::from_slice(balance_entry.0.iter().as_slice()), balance, @@ -531,16 +534,16 @@ impl ControlDb { /// Return the current budget for a given identity as stored in the db. /// Note: this function is for the stored budget only and should *only* be called by functions in /// `control_budget`, where a cached copy is stored along with business logic for managing it. - pub async fn get_energy_balance(&self, identity: &Identity) -> Result> { + pub fn get_energy_balance(&self, identity: &Identity) -> Result> { let tree = self.db.open_tree("energy_budget")?; let key = identity.to_hex(); let value = tree.get(key.as_bytes())?; if let Some(value) = value { - let Ok(arr) = <[u8; 8]>::try_from(value.as_ref()) else { + let Ok(arr) = <[u8; 16]>::try_from(value.as_ref()) else { return Err(Error::DecodingError(bsatn::DecodeError::BufferLength)); }; - let balance = i64::from_ne_bytes(arr); - Ok(Some(balance)) + let balance = i128::from_be_bytes(arr); + Ok(Some(EnergyQuanta(balance))) } else { Ok(None) } @@ -549,10 +552,10 @@ impl ControlDb { /// Update the stored current budget for a identity. /// Note: this function is for the stored budget only and should *only* be called by functions in /// `control_budget`, where a cached copy is stored along with business logic for managing it. - pub fn set_energy_balance(&self, identity: Identity, energy_balance: i64) -> Result<()> { + pub async fn set_energy_balance(&self, identity: Identity, energy_balance: EnergyQuanta) -> Result<()> { let tree = self.db.open_tree("energy_budget")?; let key = identity.to_hex(); - tree.insert(key, &energy_balance.to_be_bytes())?; + tree.insert(key, &energy_balance.0.to_be_bytes())?; Ok(()) } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 536fbddadd..5020a92e78 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -18,7 +18,7 @@ use super::{EnergyMonitor, NullEnergyMonitor, ReducerArgs}; pub struct HostController { modules: Mutex>, - energy_monitor: Arc, + pub energy_monitor: Arc, } #[derive(PartialEq, Eq, Hash, Copy, Clone, Serialize, Debug)] @@ -58,20 +58,48 @@ impl fmt::Display for DescribedEntityType { } } +/// [EnergyQuanta] represents an amount of energy in a canonical unit. +/// It represents the smallest unit of energy that can be used to pay for +/// a reducer invocation. We will likely refer to this unit as an "eV". +/// +/// NOTE: This is represented by a signed integer, because it is possible +/// for a user's balance to go negative. This is allowable +/// for reasons of eventual consistency motivated by performance. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct EnergyQuanta(pub u64); +pub struct EnergyQuanta(pub i128); impl EnergyQuanta { pub const ZERO: Self = EnergyQuanta(0); pub const DEFAULT_BUDGET: Self = EnergyQuanta(1_000_000_000_000_000_000); + + /// A conversion function to convert from the canonical unit to points used + /// by Wasmer to track energy usage. + pub fn as_points(&self) -> u64 { + if self.0 < 0 { + return 0; + } else if self.0 > u64::MAX as i128 { + return u64::MAX; + } + self.0 as u64 + } + + /// A conversion function to convert from point used + /// by Wasmer to track energy usage, to our canonical unit. + pub fn from_points(points: u64) -> Self { + Self(points as i128) + } } #[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] -pub struct EnergyDiff(pub u64); +pub struct EnergyDiff(pub i128); impl EnergyDiff { pub const ZERO: Self = EnergyDiff(0); + + pub fn as_quanta(self) -> EnergyQuanta { + EnergyQuanta(self.0) + } } impl Sub for EnergyQuanta { diff --git a/crates/core/src/host/wasmer/mod.rs b/crates/core/src/host/wasmer/mod.rs index 7b49191dfa..ff871cbca7 100644 --- a/crates/core/src/host/wasmer/mod.rs +++ b/crates/core/src/host/wasmer/mod.rs @@ -36,8 +36,8 @@ pub fn make_actor( // before calling reducer? // I believe we can just set this to be zero and it's already being set by reducers // but I don't want to break things, so I'm going to leave it. - let initial_points = EnergyQuanta::DEFAULT_BUDGET; - let metering = Arc::new(Metering::new(initial_points.0, cost_function)); + let initial_points = EnergyQuanta::DEFAULT_BUDGET.as_points(); + let metering = Arc::new(Metering::new(initial_points, cost_function)); // let mut compiler_config = wasmer_compiler_llvm::LLVM::default(); // compiler_config.opt_level(wasmer_compiler_llvm::LLVMOptLevel::Aggressive); diff --git a/crates/core/src/host/wasmer/wasmer_module.rs b/crates/core/src/host/wasmer/wasmer_module.rs index d6b6156bed..6309c2a135 100644 --- a/crates/core/src/host/wasmer/wasmer_module.rs +++ b/crates/core/src/host/wasmer/wasmer_module.rs @@ -11,18 +11,14 @@ use wasmer::{ }; use wasmer_middlewares::metering as wasmer_metering; -fn get_remaining_energy(ctx: &mut impl AsStoreMut, instance: &Instance) -> EnergyQuanta { +fn get_remaining_points(ctx: &mut impl AsStoreMut, instance: &Instance) -> u64 { let remaining_points = wasmer_metering::get_remaining_points(ctx, instance); match remaining_points { - wasmer_metering::MeteringPoints::Remaining(x) => EnergyQuanta(x), - wasmer_metering::MeteringPoints::Exhausted => EnergyQuanta::ZERO, + wasmer_metering::MeteringPoints::Remaining(x) => x, + wasmer_metering::MeteringPoints::Exhausted => 0, } } -fn set_remaining_energy(ctx: &mut impl AsStoreMut, instance: &Instance, energy: EnergyQuanta) { - wasmer_metering::set_remaining_points(ctx, instance, energy.0) -} - fn log_traceback(func_type: &str, func: &str, e: &RuntimeError) { let frames = e.trace(); let frames_len = frames.len(); @@ -181,8 +177,8 @@ impl module_host_actor::WasmInstancePre for WasmerModule { env.as_mut(&mut store).mem = Some(mem); // Note: this budget is just for initializers - let budget = EnergyQuanta::DEFAULT_BUDGET; - set_remaining_energy(&mut store, &instance, budget); + let budget = EnergyQuanta::DEFAULT_BUDGET.as_points(); + wasmer_metering::set_remaining_points(&mut store, &instance, budget); for preinit in &func_names.preinits { let func = instance.exports.get_typed_function::<(), ()>(&store, preinit).unwrap(); @@ -316,7 +312,8 @@ impl WasmerInstance { ) -> module_host_actor::ExecuteResult { let store = &mut self.store; let instance = &self.instance; - set_remaining_energy(store, instance, budget); + let budget = budget.as_points(); + wasmer_metering::set_remaining_points(store, instance, budget); let reduce = instance .exports @@ -348,10 +345,10 @@ impl WasmerInstance { // .call(store, sender_buf.ptr.cast(), timestamp, args_buf.ptr, args_buf.len) // .and_then(|_| {}); let duration = start.elapsed(); - let remaining = get_remaining_energy(store, instance); + let remaining = get_remaining_points(store, instance); let energy = module_host_actor::EnergyStats { - used: budget - remaining, - remaining, + used: EnergyQuanta::from_points(budget) - EnergyQuanta::from_points(remaining), + remaining: EnergyQuanta::from_points(remaining), }; module_host_actor::ExecuteResult { energy, diff --git a/crates/core/src/json/client_api.rs b/crates/core/src/json/client_api.rs index c1f348f102..cd1dfc237d 100644 --- a/crates/core/src/json/client_api.rs +++ b/crates/core/src/json/client_api.rs @@ -81,7 +81,7 @@ pub struct EventJson { pub status: String, // committed, failed pub caller_identity: String, // hex identity pub function_call: FunctionCallJson, - pub energy_quanta_used: u64, + pub energy_quanta_used: i128, pub message: String, } diff --git a/crates/core/src/messages/control_db.rs b/crates/core/src/messages/control_db.rs index de37e95ea2..fa6afa6947 100644 --- a/crates/core/src/messages/control_db.rs +++ b/crates/core/src/messages/control_db.rs @@ -13,8 +13,11 @@ pub struct IdentityEmail { #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EnergyBalance { pub identity: Identity, - /// How much budget is remaining for this identity. - pub balance: i64, + /// The balance for this identity this identity. + /// NOTE: This is a signed integer, because it is possible + /// for a user's balance to go negative. This is allowable + /// for reasons of eventual consistency motivated by performance. + pub balance: i128, } #[derive(Clone, PartialEq, Serialize, Deserialize)] diff --git a/crates/core/src/messages/control_worker_api.rs b/crates/core/src/messages/control_worker_api.rs index f999411fb9..f77695b4e7 100644 --- a/crates/core/src/messages/control_worker_api.rs +++ b/crates/core/src/messages/control_worker_api.rs @@ -51,7 +51,7 @@ pub enum DeleteOperation { #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EnergyBalanceUpdate { pub identity: Identity, - pub energy_balance: i64, + pub energy_balance: i128, } // A message to syncronize energy balances from control node to worker node. #[derive(Clone, PartialEq, Serialize, Deserialize)] @@ -66,5 +66,5 @@ pub struct EnergyWithdrawals { #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EnergyWithdrawal { pub identity: Identity, - pub amount: i64, + pub amount: i128, } diff --git a/crates/standalone/src/energy_monitor.rs b/crates/standalone/src/energy_monitor.rs new file mode 100644 index 0000000000..638849b341 --- /dev/null +++ b/crates/standalone/src/energy_monitor.rs @@ -0,0 +1,79 @@ +use crate::StandaloneEnv; +use spacetimedb::host::{EnergyDiff, EnergyMonitor, EnergyMonitorFingerprint, EnergyQuanta}; +use spacetimedb_client_api::ControlNodeDelegate; +use std::{ + sync::{Arc, Mutex, Weak}, + time::Duration, +}; + +pub(crate) struct StandaloneEnergyMonitor { + inner: Arc>, +} + +impl StandaloneEnergyMonitor { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(Inner { + standalone_env: Weak::new(), + })), + } + } + + pub fn set_standalone_env(&self, standalone_env: Arc) { + self.inner.lock().unwrap().set_standalone_env(standalone_env); + } +} + +impl EnergyMonitor for StandaloneEnergyMonitor { + fn reducer_budget(&self, _fingerprint: &EnergyMonitorFingerprint<'_>) -> EnergyQuanta { + // Infinitely large reducer budget in Standalone + EnergyQuanta(i128::max_value()) + } + + fn record( + &self, + fingerprint: &EnergyMonitorFingerprint<'_>, + energy_used: EnergyDiff, + _execution_duration: Duration, + ) { + if energy_used.0 == 0 { + return; + } + let module_identity = fingerprint.module_identity; + let standalone_env = { + self.inner + .lock() + .unwrap() + .standalone_env + .upgrade() + .expect("Worker env was dropped.") + }; + tokio::spawn(async move { + standalone_env + .withdraw_energy(&module_identity, energy_used.as_quanta()) + .await + .unwrap(); + }); + } +} + +struct Inner { + standalone_env: Weak, +} + +impl Inner { + pub fn set_standalone_env(&mut self, worker_env: Arc) { + self.standalone_env = Arc::downgrade(&worker_env); + } + + /// To be used if we ever want to enable reducer budgets in Standalone + fn _reducer_budget(&self, fingerprint: &EnergyMonitorFingerprint<'_>) -> EnergyQuanta { + let standalone_env = self.standalone_env.upgrade().expect("Standalone env was dropped."); + let balance = standalone_env + .control_db + .get_energy_balance(&fingerprint.module_identity) + .unwrap() + .unwrap_or(EnergyQuanta(0)); + EnergyQuanta(i128::max(balance.0, 0)) + } +} diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 938a31be4c..74d055f819 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -1,7 +1,9 @@ +mod energy_monitor; pub mod routes; mod worker_db; use anyhow::Context; +use energy_monitor::StandaloneEnergyMonitor; use openssl::ec::{EcGroup, EcKey}; use openssl::nid::Nid; use openssl::pkey::PKey; @@ -13,9 +15,9 @@ use spacetimedb::database_instance_context::DatabaseInstanceContext; use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController; use spacetimedb::db::db_metrics; use spacetimedb::hash::Hash; -use spacetimedb::host::UpdateDatabaseResult; use spacetimedb::host::UpdateOutcome; use spacetimedb::host::{scheduler::Scheduler, HostController}; +use spacetimedb::host::{EnergyQuanta, UpdateDatabaseResult}; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Node}; use spacetimedb::messages::worker_db::DatabaseInstanceState; @@ -42,21 +44,18 @@ pub struct StandaloneEnv { private_key: EncodingKey, } -// TODO: what kind of energy monitoring do we want standalone to have? -type StandaloneEnergyMonitor = spacetimedb::host::NullEnergyMonitor; - impl StandaloneEnv { - pub async fn init() -> anyhow::Result { + pub async fn init() -> anyhow::Result> { let worker_db = WorkerDb::init()?; let object_db = ObjectDb::init()?; let db_inst_ctx_controller = DatabaseInstanceContextController::new(); let control_db = ControlDb::new()?; - let energy_monitor = Arc::new(StandaloneEnergyMonitor::default()); - let host_controller = Arc::new(HostController::new(energy_monitor)); + let energy_monitor = Arc::new(StandaloneEnergyMonitor::new()); + let host_controller = Arc::new(HostController::new(energy_monitor.clone())); let client_actor_index = ClientActorIndex::new(); let sendgrid = SendGridController::new(); let (public_key, private_key) = get_or_create_keys()?; - Ok(Self { + let this = Arc::new(Self { worker_db, control_db, db_inst_ctx_controller, @@ -66,7 +65,9 @@ impl StandaloneEnv { sendgrid, public_key, private_key, - }) + }); + energy_monitor.set_standalone_env(this.clone()); + Ok(this) } } @@ -316,6 +317,17 @@ impl spacetimedb_client_api::ControlNodeDelegate for StandaloneEnv { self.control_db.alloc_spacetime_identity().await } + async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { + let energy_balance = self.control_db.get_energy_balance(identity)?; + let energy_balance = energy_balance.unwrap_or(EnergyQuanta(0)); + println!("Withdrawing {} energy from {}", amount.0, identity); + println!("Old balance: {}", energy_balance.0); + let new_balance = energy_balance - amount; + self.control_db + .set_energy_balance(*identity, new_balance.as_quanta()) + .await + } + fn public_key(&self) -> &DecodingKey { &self.public_key } diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index 3f51d38968..186a441bbc 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -8,7 +8,6 @@ use spacetimedb::worker_metrics; use spacetimedb_standalone::routes::router; use spacetimedb_standalone::StandaloneEnv; use std::net::TcpListener; -use std::sync::Arc; use tokio::runtime::Builder; use std::panic; @@ -51,7 +50,7 @@ async fn start(config: Config) -> anyhow::Result<()> { // Metrics for our use of db/. db_metrics::register_custom_metrics(); - let ctx = spacetimedb_client_api::ArcEnv(Arc::new(StandaloneEnv::init().await?)); + let ctx = spacetimedb_client_api::ArcEnv(StandaloneEnv::init().await?); let service = router().with_state(ctx).into_make_service(); diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 6fc71f8811..ee156febc5 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -2,6 +2,7 @@ use std::future::Future; use std::io::{self, Write}; use std::path::PathBuf; use std::process::Command; +use std::sync::Arc; use spacetimedb::address::Address; use spacetimedb::client::{ClientActorId, ClientConnection, Protocol}; @@ -10,6 +11,7 @@ use spacetimedb::hash::hash_bytes; use spacetimedb::messages::control_db::HostType; use spacetimedb_client_api::{ControlCtx, ControlStateDelegate, WorkerCtx}; +use spacetimedb_standalone::StandaloneEnv; use tokio::runtime::{Builder, Runtime}; fn start_runtime() -> Runtime { @@ -95,6 +97,8 @@ pub fn compile(path: &str) { #[derive(Clone)] pub struct ModuleHandle { + // Needs to hold a reference to the standalone env. + _env: Arc, pub client: ClientConnection, pub db_address: Address, } @@ -121,7 +125,7 @@ impl ModuleHandle { pub async fn load_module(name: &str) -> ModuleHandle { crate::set_key_env_vars(); - let env = &spacetimedb_standalone::StandaloneEnv::init().await.unwrap(); + let env = spacetimedb_standalone::StandaloneEnv::init().await.unwrap(); let identity = env.control_db().alloc_spacetime_identity().await.unwrap(); let address = env.control_db().alloc_spacetime_address().await.unwrap(); let program_bytes = read_module(name); @@ -150,6 +154,7 @@ pub async fn load_module(name: &str) -> ModuleHandle { // the runtime on which a module was created and then we could add impl // for stuff like "get logs" or "get message log" ModuleHandle { + _env: env, client: ClientConnection::dummy(client_id, Protocol::Text, instance.id, module), db_address: address, }