Skip to content

Commit

Permalink
Reimplemented the energy tracking for identities (#115)
Browse files Browse the repository at this point in the history
* Refactoring some stuff for energy

* Fix an issue with i128 query params

* Infinite budget in Standalone

* Energy and crash fixes

* Hopefully fixed the test that now has energy

* Addresses Centril's comments

* Cargo fmt

---------

Signed-off-by: Tyler Cloutier <cloutiertyler@users.noreply.github.com>
  • Loading branch information
cloutiertyler authored Aug 1, 2023
1 parent 1e7cf1e commit d205670
Show file tree
Hide file tree
Showing 17 changed files with 210 additions and 57 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/cli/src/subcommands/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn get_energy_subcommands() -> Vec<clap::Command> {
.arg(
Arg::new("balance")
.required(true)
.value_parser(value_parser!(u64))
.value_parser(value_parser!(i128))
.help("The balance value to set"),
)
.arg(
Expand Down Expand Up @@ -63,7 +63,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
async fn exec_update_balance(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
// let project_name = args.value_of("project name").unwrap();
let hex_id = args.get_one::<String>("identity");
let balance = *args.get_one::<u64>("balance").unwrap();
let balance = *args.get_one::<i128>("balance").unwrap();
let quiet = args.get_flag("quiet");

let hex_id = hex_id_or_default(hex_id, &config);
Expand Down Expand Up @@ -102,7 +102,7 @@ pub(super) async fn set_balance(
client: &reqwest::Client,
config: &Config,
hex_identity: &str,
balance: u64,
balance: i128,
) -> anyhow::Result<reqwest::Response> {
// TODO: this really should be form data in POST body, not query string parameter, but gotham
// does not support that on the server side without an extension.
Expand Down
1 change: 1 addition & 0 deletions crates/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ futures = "0.3"
bytes = "1"
bytestring = "1"
tokio-tungstenite = "0.18.0"
itoa = "1.0.9"
6 changes: 5 additions & 1 deletion crates/client-api/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Write;
use std::time::Duration;

use axum::extract::rejection::{TypedHeaderRejection, TypedHeaderRejectionReason};
Expand All @@ -6,6 +7,7 @@ use axum::headers::authorization::Credentials;
use axum::headers::{self, authorization};
use axum::response::IntoResponse;
use axum::TypedHeader;
use bytes::BytesMut;
use http::{request, HeaderValue, StatusCode};
use serde::Deserialize;
use spacetimedb::auth::identity::{
Expand Down Expand Up @@ -243,7 +245,9 @@ impl headers::Header for SpacetimeEnergyUsed {
}

fn encode<E: Extend<HeaderValue>>(&self, values: &mut E) {
values.extend([self.0 .0.into()])
let mut buf = BytesMut::new();
let _ = buf.write_str(itoa::Buffer::new().format(self.0 .0));
values.extend([HeaderValue::from_bytes(&buf).unwrap()]);
}
}

Expand Down
12 changes: 11 additions & 1 deletion crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use spacetimedb::client::ClientActorIndex;
use spacetimedb::control_db::ControlDb;
use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController;
use spacetimedb::hash::Hash;
use spacetimedb::host::HostController;
use spacetimedb::host::UpdateDatabaseResult;
use spacetimedb::host::{EnergyQuanta, HostController};
use spacetimedb::identity::Identity;
use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Node};
use spacetimedb::messages::worker_db::DatabaseInstanceState;
Expand Down Expand Up @@ -90,6 +90,8 @@ pub trait ControlNodeDelegate: Send + Sync {

async fn alloc_spacetime_identity(&self) -> spacetimedb::control_db::Result<Identity>;

async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()>;

fn public_key(&self) -> &DecodingKey;
fn private_key(&self) -> &EncodingKey;
}
Expand Down Expand Up @@ -123,6 +125,10 @@ impl<T: ControlNodeDelegate + ?Sized> ControlNodeDelegate for ArcEnv<T> {
self.0.alloc_spacetime_identity().await
}

async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> {
self.0.withdraw_energy(identity, amount).await
}

fn public_key(&self) -> &DecodingKey {
self.0.public_key()
}
Expand All @@ -141,6 +147,10 @@ impl<T: ControlNodeDelegate + ?Sized> ControlNodeDelegate for Arc<T> {
(**self).alloc_spacetime_identity().await
}

async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> {
(**self).withdraw_energy(identity, amount).await
}

fn public_key(&self) -> &DecodingKey {
(**self).public_key()
}
Expand Down
25 changes: 18 additions & 7 deletions crates/client-api/src/routes/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use http::StatusCode;
use serde::Deserialize;
use serde_json::json;

use spacetimedb::host::EnergyQuanta;
use spacetimedb_lib::Identity;

use crate::auth::SpacetimeAuthHeader;
Expand All @@ -28,18 +29,20 @@ pub async fn get_energy_balance(
let balance = ctx
.control_db()
.get_energy_balance(&identity)
.await
.map_err(log_and_500)?
.ok_or((StatusCode::NOT_FOUND, "No budget for identity"))?;
.unwrap_or(EnergyQuanta(0));

let response_json = json!({ "balance": balance });
let response_json = json!({
// Note: balance must be returned as a string to avoid truncation.
"balance": balance.0.to_string(),
});

Ok(axum::Json(response_json))
}

#[derive(Deserialize)]
pub struct SetEnergyBalanceQueryParams {
balance: Option<i64>,
balance: Option<String>,
}
pub async fn set_energy_balance(
State(ctx): State<Arc<dyn ControlCtx>>,
Expand All @@ -61,15 +64,23 @@ pub async fn set_energy_balance(

let identity = Identity::from(identity);

let balance = balance.unwrap_or(0);
let balance = balance
.map(|balance| balance.parse::<i128>())
.transpose()
.map_err(|err| {
log::error!("Failed to parse balance: {:?}", err);
StatusCode::BAD_REQUEST
})?;
let balance = EnergyQuanta(balance.unwrap_or(0));

ctx.control_db()
.set_energy_balance(identity, balance)
.await
.map_err(log_and_500)?;

// Return the modified budget.
let response_json = json!({
"balance": balance,
// Note: balance must be returned as a string to avoid truncation.
"balance": balance.0.to_string(),
});

Ok(axum::Json(response_json))
Expand Down
19 changes: 11 additions & 8 deletions crates/core/src/control_db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::address::Address;

use crate::hash::hash_bytes;
use crate::host::EnergyQuanta;
use crate::identity::Identity;
use crate::messages::control_db::{Database, DatabaseInstance, EnergyBalance, IdentityEmail, Node};
use crate::stdb_path;
Expand Down Expand Up @@ -34,6 +35,8 @@ pub enum Error {
DecodingError(#[from] bsatn::DecodeError),
#[error(transparent)]
DomainParsingError(#[from] DomainParsingError),
#[error("connection error")]
ConnectionError(),
#[error(transparent)]
JSONDeserializationError(#[from] serde_json::Error),
}
Expand Down Expand Up @@ -515,10 +518,10 @@ impl ControlDb {
continue;
}
};
let Ok(arr) = <[u8; 8]>::try_from(balance_entry.1.as_ref()) else {
let Ok(arr) = <[u8; 16]>::try_from(balance_entry.1.as_ref()) else {
return Err(Error::DecodingError(bsatn::DecodeError::BufferLength));
};
let balance = i64::from_ne_bytes(arr);
let balance = i128::from_ne_bytes(arr);
let energy_balance = EnergyBalance {
identity: Identity::from_slice(balance_entry.0.iter().as_slice()),
balance,
Expand All @@ -531,16 +534,16 @@ impl ControlDb {
/// Return the current budget for a given identity as stored in the db.
/// Note: this function is for the stored budget only and should *only* be called by functions in
/// `control_budget`, where a cached copy is stored along with business logic for managing it.
pub async fn get_energy_balance(&self, identity: &Identity) -> Result<Option<i64>> {
pub fn get_energy_balance(&self, identity: &Identity) -> Result<Option<EnergyQuanta>> {
let tree = self.db.open_tree("energy_budget")?;
let key = identity.to_hex();
let value = tree.get(key.as_bytes())?;
if let Some(value) = value {
let Ok(arr) = <[u8; 8]>::try_from(value.as_ref()) else {
let Ok(arr) = <[u8; 16]>::try_from(value.as_ref()) else {
return Err(Error::DecodingError(bsatn::DecodeError::BufferLength));
};
let balance = i64::from_ne_bytes(arr);
Ok(Some(balance))
let balance = i128::from_be_bytes(arr);
Ok(Some(EnergyQuanta(balance)))
} else {
Ok(None)
}
Expand All @@ -549,10 +552,10 @@ impl ControlDb {
/// Update the stored current budget for a identity.
/// Note: this function is for the stored budget only and should *only* be called by functions in
/// `control_budget`, where a cached copy is stored along with business logic for managing it.
pub fn set_energy_balance(&self, identity: Identity, energy_balance: i64) -> Result<()> {
pub async fn set_energy_balance(&self, identity: Identity, energy_balance: EnergyQuanta) -> Result<()> {
let tree = self.db.open_tree("energy_budget")?;
let key = identity.to_hex();
tree.insert(key, &energy_balance.to_be_bytes())?;
tree.insert(key, &energy_balance.0.to_be_bytes())?;

Ok(())
}
Expand Down
34 changes: 31 additions & 3 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{EnergyMonitor, NullEnergyMonitor, ReducerArgs};

pub struct HostController {
modules: Mutex<HashMap<u64, ModuleHost>>,
energy_monitor: Arc<dyn EnergyMonitor>,
pub energy_monitor: Arc<dyn EnergyMonitor>,
}

#[derive(PartialEq, Eq, Hash, Copy, Clone, Serialize, Debug)]
Expand Down Expand Up @@ -58,20 +58,48 @@ impl fmt::Display for DescribedEntityType {
}
}

/// [EnergyQuanta] represents an amount of energy in a canonical unit.
/// It represents the smallest unit of energy that can be used to pay for
/// a reducer invocation. We will likely refer to this unit as an "eV".
///
/// NOTE: This is represented by a signed integer, because it is possible
/// for a user's balance to go negative. This is allowable
/// for reasons of eventual consistency motivated by performance.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct EnergyQuanta(pub u64);
pub struct EnergyQuanta(pub i128);

impl EnergyQuanta {
pub const ZERO: Self = EnergyQuanta(0);

pub const DEFAULT_BUDGET: Self = EnergyQuanta(1_000_000_000_000_000_000);

/// A conversion function to convert from the canonical unit to points used
/// by Wasmer to track energy usage.
pub fn as_points(&self) -> u64 {
if self.0 < 0 {
return 0;
} else if self.0 > u64::MAX as i128 {
return u64::MAX;
}
self.0 as u64
}

/// A conversion function to convert from point used
/// by Wasmer to track energy usage, to our canonical unit.
pub fn from_points(points: u64) -> Self {
Self(points as i128)
}
}

#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct EnergyDiff(pub u64);
pub struct EnergyDiff(pub i128);

impl EnergyDiff {
pub const ZERO: Self = EnergyDiff(0);

pub fn as_quanta(self) -> EnergyQuanta {
EnergyQuanta(self.0)
}
}

impl Sub for EnergyQuanta {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/wasmer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub fn make_actor(
// before calling reducer?
// I believe we can just set this to be zero and it's already being set by reducers
// but I don't want to break things, so I'm going to leave it.
let initial_points = EnergyQuanta::DEFAULT_BUDGET;
let metering = Arc::new(Metering::new(initial_points.0, cost_function));
let initial_points = EnergyQuanta::DEFAULT_BUDGET.as_points();
let metering = Arc::new(Metering::new(initial_points, cost_function));

// let mut compiler_config = wasmer_compiler_llvm::LLVM::default();
// compiler_config.opt_level(wasmer_compiler_llvm::LLVMOptLevel::Aggressive);
Expand Down
23 changes: 10 additions & 13 deletions crates/core/src/host/wasmer/wasmer_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,14 @@ use wasmer::{
};
use wasmer_middlewares::metering as wasmer_metering;

fn get_remaining_energy(ctx: &mut impl AsStoreMut, instance: &Instance) -> EnergyQuanta {
fn get_remaining_points(ctx: &mut impl AsStoreMut, instance: &Instance) -> u64 {
let remaining_points = wasmer_metering::get_remaining_points(ctx, instance);
match remaining_points {
wasmer_metering::MeteringPoints::Remaining(x) => EnergyQuanta(x),
wasmer_metering::MeteringPoints::Exhausted => EnergyQuanta::ZERO,
wasmer_metering::MeteringPoints::Remaining(x) => x,
wasmer_metering::MeteringPoints::Exhausted => 0,
}
}

fn set_remaining_energy(ctx: &mut impl AsStoreMut, instance: &Instance, energy: EnergyQuanta) {
wasmer_metering::set_remaining_points(ctx, instance, energy.0)
}

fn log_traceback(func_type: &str, func: &str, e: &RuntimeError) {
let frames = e.trace();
let frames_len = frames.len();
Expand Down Expand Up @@ -181,8 +177,8 @@ impl module_host_actor::WasmInstancePre for WasmerModule {
env.as_mut(&mut store).mem = Some(mem);

// Note: this budget is just for initializers
let budget = EnergyQuanta::DEFAULT_BUDGET;
set_remaining_energy(&mut store, &instance, budget);
let budget = EnergyQuanta::DEFAULT_BUDGET.as_points();
wasmer_metering::set_remaining_points(&mut store, &instance, budget);

for preinit in &func_names.preinits {
let func = instance.exports.get_typed_function::<(), ()>(&store, preinit).unwrap();
Expand Down Expand Up @@ -316,7 +312,8 @@ impl WasmerInstance {
) -> module_host_actor::ExecuteResult<RuntimeError> {
let store = &mut self.store;
let instance = &self.instance;
set_remaining_energy(store, instance, budget);
let budget = budget.as_points();
wasmer_metering::set_remaining_points(store, instance, budget);

let reduce = instance
.exports
Expand Down Expand Up @@ -348,10 +345,10 @@ impl WasmerInstance {
// .call(store, sender_buf.ptr.cast(), timestamp, args_buf.ptr, args_buf.len)
// .and_then(|_| {});
let duration = start.elapsed();
let remaining = get_remaining_energy(store, instance);
let remaining = get_remaining_points(store, instance);
let energy = module_host_actor::EnergyStats {
used: budget - remaining,
remaining,
used: EnergyQuanta::from_points(budget) - EnergyQuanta::from_points(remaining),
remaining: EnergyQuanta::from_points(remaining),
};
module_host_actor::ExecuteResult {
energy,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/json/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct EventJson {
pub status: String, // committed, failed
pub caller_identity: String, // hex identity
pub function_call: FunctionCallJson,
pub energy_quanta_used: u64,
pub energy_quanta_used: i128,
pub message: String,
}

Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/messages/control_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ pub struct IdentityEmail {
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct EnergyBalance {
pub identity: Identity,
/// How much budget is remaining for this identity.
pub balance: i64,
/// The balance for this identity this identity.
/// NOTE: This is a signed integer, because it is possible
/// for a user's balance to go negative. This is allowable
/// for reasons of eventual consistency motivated by performance.
pub balance: i128,
}

#[derive(Clone, PartialEq, Serialize, Deserialize)]
Expand Down
Loading

0 comments on commit d205670

Please sign in to comment.