Skip to content

Commit

Permalink
Energy metering for persistent memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
coolreader18 committed Oct 8, 2024
1 parent ab3a0a8 commit ddaeee4
Show file tree
Hide file tree
Showing 24 changed files with 629 additions and 11 deletions.
7 changes: 7 additions & 0 deletions crates/client-api-messages/src/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ impl EnergyQuanta {
let energy = bytes_stored * sec + (bytes_stored * nsec) / 1_000_000_000;
Self::new(energy)
}

const ENERGY_PER_MEM_BYTE_SEC: u128 = 100;

pub fn from_memory_usage(bytes_stored: u64, storage_period: Duration) -> Self {
let byte_seconds = Self::from_disk_usage(bytes_stored, storage_period).get();
Self::new(byte_seconds * Self::ENERGY_PER_MEM_BYTE_SEC)
}
}

impl fmt::Display for EnergyQuanta {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
table::{IndexScanIter, InsertError, RowRef, Table},
MemoryUsage,
};
use std::collections::BTreeMap;
use std::sync::Arc;
Expand All @@ -55,6 +56,18 @@ pub struct CommittedState {
pub(super) index_id_map: IndexIdMap,
}

impl MemoryUsage for CommittedState {
fn memory_usage(&self) -> usize {
let Self {
next_tx_offset,
tables,
blob_store,
index_id_map,
} = self;
next_tx_offset.memory_usage() + tables.memory_usage() + blob_store.memory_usage() + index_id_map.memory_usage()
}
}

impl StateView for CommittedState {
fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
self.tables.get(&table_id).map(|table| table.get_schema())
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use spacetimedb_snapshot::ReconstructedSnapshot;
use spacetimedb_table::{
indexes::RowPointer,
table::{RowRef, Table},
MemoryUsage,
};
use std::time::{Duration, Instant};
use std::{borrow::Cow, sync::Arc};
Expand Down Expand Up @@ -64,6 +65,21 @@ pub struct Locking {
database_address: Address,
}

impl MemoryUsage for Locking {
fn memory_usage(&self) -> usize {
let Self {
committed_state,
sequence_state,
database_address,
} = self;
std::mem::size_of_val(&**committed_state)
+ committed_state.read().memory_usage()
+ std::mem::size_of_val(&**sequence_state)
+ sequence_state.lock().memory_usage()
+ database_address.memory_usage()
}
}

impl Locking {
pub fn new(database_address: Address) -> Self {
Self {
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/sequence.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_primitives::SequenceId;
use spacetimedb_schema::schema::SequenceSchema;
use spacetimedb_table::MemoryUsage;

pub(super) struct Sequence {
schema: SequenceSchema,
pub(super) value: i128,
}

impl MemoryUsage for Sequence {
fn memory_usage(&self) -> usize {
// MEMUSE: intentionally ignoring schema
let Self { schema: _, value } = self;
value.memory_usage()
}
}

impl Sequence {
pub(super) fn new(schema: SequenceSchema) -> Self {
Self {
Expand Down Expand Up @@ -102,6 +111,13 @@ pub(super) struct SequencesState {
sequences: IntMap<SequenceId, Sequence>,
}

impl MemoryUsage for SequencesState {
fn memory_usage(&self) -> usize {
let Self { sequences } = self;
sequences.memory_usage()
}
}

impl SequencesState {
pub(super) fn get_sequence_mut(&mut self, seq_id: SequenceId) -> Option<&mut Sequence> {
self.sequences.get_mut(&seq_id)
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use spacetimedb_schema::schema::{IndexSchema, Schema, SequenceSchema, TableSchem
use spacetimedb_snapshot::{SnapshotError, SnapshotRepository};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::table::RowRef;
use spacetimedb_table::MemoryUsage;
use std::borrow::Cow;
use std::collections::HashSet;
use std::fmt;
Expand Down Expand Up @@ -490,6 +491,11 @@ impl RelationalDB {
self.disk_size_fn.as_ref().map_or(Ok(0), |f| f())
}

/// The size in bytes of all of the in-memory data in this database.
pub fn size_in_memory(&self) -> usize {
self.inner.memory_usage()
}

pub fn encode_row(row: &ProductValue, bytes: &mut Vec<u8>) {
// TODO: large file storage of the row elements
row.encode(bytes);
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub trait EnergyMonitor: Send + Sync + 'static {
execution_duration: Duration,
);
fn record_disk_usage(&self, database: &Database, replica_id: u64, disk_usage: u64, period: Duration);
fn record_memory_usage(&self, database: &Database, replica_id: u64, mem_usage: u64, period: Duration);
}

#[derive(Default)]
Expand All @@ -40,4 +41,6 @@ impl EnergyMonitor for NullEnergyMonitor {
}

fn record_disk_usage(&self, _database: &Database, _replica_id: u64, _disk_usage: u64, _period: Duration) {}

fn record_memory_usage(&self, _database: &Database, _replica_id: u64, _mem_usage: u64, _period: Duration) {}
}
14 changes: 8 additions & 6 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ impl Host {
}

scheduler_starter.start(&module_host)?;
let metrics_task = tokio::spawn(disk_monitor(replica_ctx.clone(), energy_monitor.clone())).abort_handle();
let metrics_task = tokio::spawn(storage_monitor(replica_ctx.clone(), energy_monitor.clone())).abort_handle();

Ok(Host {
module: watch::Sender::new(module_host),
Expand Down Expand Up @@ -826,22 +826,23 @@ impl Drop for Host {
}
}

const DISK_METERING_INTERVAL: Duration = Duration::from_secs(5);
const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(5);

/// Periodically collect the disk usage of `replica_ctx` and update metrics as well as
/// the `energy_monitor` accordingly.
async fn disk_monitor(replica_ctx: Arc<ReplicaContext>, energy_monitor: Arc<dyn EnergyMonitor>) {
let mut interval = tokio::time::interval(DISK_METERING_INTERVAL);
async fn storage_monitor(replica_ctx: Arc<ReplicaContext>, energy_monitor: Arc<dyn EnergyMonitor>) {
let mut interval = tokio::time::interval(STORAGE_METERING_INTERVAL);
// We don't care about happening precisely every 5 seconds - it just matters
// that the time between ticks is accurate.
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

let mut prev_disk_usage = replica_ctx.total_disk_usage();
let mut prev_disk_usage = tokio::task::block_in_place(|| replica_ctx.total_disk_usage());
let mut prev_tick = interval.tick().await;
loop {
let tick = interval.tick().await;
let dt = tick - prev_tick;
let disk_usage = tokio::task::block_in_place(|| replica_ctx.total_disk_usage());
let (disk_usage, mem_usage) =
tokio::task::block_in_place(|| (replica_ctx.total_disk_usage(), replica_ctx.mem_usage()));
if let Some(num_bytes) = disk_usage.durability {
DB_METRICS
.message_log_size
Expand All @@ -856,6 +857,7 @@ async fn disk_monitor(replica_ctx: Arc<ReplicaContext>, energy_monitor: Arc<dyn
}
let disk_usage = disk_usage.or(prev_disk_usage);
energy_monitor.record_disk_usage(&replica_ctx.database, replica_ctx.replica_id, disk_usage.sum(), dt);
energy_monitor.record_memory_usage(&replica_ctx.database, replica_ctx.replica_id, mem_usage as u64, dt);
prev_disk_usage = disk_usage;
prev_tick = tick;
}
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/replica_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ impl ReplicaContext {
logs: self.log_file_size().ok(),
}
}

/// The size in bytes of all of the in-memory data of the database.
pub fn mem_usage(&self) -> usize {
self.relational_db.size_in_memory()
}
}

impl Deref for ReplicaContext {
Expand Down
8 changes: 8 additions & 0 deletions crates/primitives/src/col_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ impl ColList {
let addr = unsafe { self.check };
addr & 1 != 0
}

#[doc(hidden)]
pub fn heap_size(&self) -> usize {
match self.as_inline() {
Ok(_) => 0,
Err(heap) => heap.capacity() as usize,
}
}
}

impl Drop for ColList {
Expand Down
5 changes: 5 additions & 0 deletions crates/standalone/src/energy_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ impl EnergyMonitor for StandaloneEnergyMonitor {
let amount = EnergyQuanta::from_disk_usage(disk_usage, period);
self.withdraw_energy(database.owner_identity, amount)
}

fn record_memory_usage(&self, database: &Database, _instance_id: u64, mem_usage: u64, period: Duration) {
let amount = EnergyQuanta::from_memory_usage(mem_usage, period);
self.withdraw_energy(database.owner_identity, amount)
}
}

impl StandaloneEnergyMonitor {
Expand Down
11 changes: 11 additions & 0 deletions crates/table/src/bflatn_to_bsatn_fast_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
//! one of 20 bytes to copy the leading `(u64, u64, u32)`, which contains no padding,
//! and then one of 8 bytes to copy the trailing `u64`, skipping over 4 bytes of padding in between.

use crate::MemoryUsage;

use super::{
indexes::{Byte, Bytes},
layout::{
Expand Down Expand Up @@ -47,6 +49,13 @@ pub(crate) struct StaticBsatnLayout {
fields: Box<[MemcpyField]>,
}

impl MemoryUsage for StaticBsatnLayout {
fn memory_usage(&self) -> usize {
let Self { bsatn_length, fields } = self;
bsatn_length.memory_usage() + fields.memory_usage()
}
}

impl StaticBsatnLayout {
/// Serialize `row` from BFLATN to BSATN into `buf`.
///
Expand Down Expand Up @@ -156,6 +165,8 @@ struct MemcpyField {
length: u16,
}

impl MemoryUsage for MemcpyField {}

impl MemcpyField {
/// Copies the bytes at `row[self.bflatn_offset .. self.bflatn_offset + self.length]`
/// into `buf[self.bsatn_offset + self.length]`.
Expand Down
18 changes: 18 additions & 0 deletions crates/table/src/blob_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use blake3::hash;
use spacetimedb_data_structures::map::{Entry, HashMap};
use spacetimedb_lib::{de::Deserialize, ser::Serialize};

use crate::MemoryUsage;

/// The content address of a blob-stored object.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash, Debug, Serialize, Deserialize)]
pub struct BlobHash {
Expand All @@ -24,6 +26,8 @@ pub struct BlobHash {
pub data: [u8; BlobHash::SIZE],
}

impl MemoryUsage for BlobHash {}

impl BlobHash {
/// The size of the hash function's output in bytes.
pub const SIZE: usize = 32;
Expand Down Expand Up @@ -142,6 +146,13 @@ pub struct HashMapBlobStore {
map: HashMap<BlobHash, BlobObject>,
}

impl MemoryUsage for HashMapBlobStore {
fn memory_usage(&self) -> usize {
let Self { map } = self;
map.memory_usage()
}
}

/// A blob object including a reference count and the data.
struct BlobObject {
/// Reference count of the blob.
Expand All @@ -150,6 +161,13 @@ struct BlobObject {
blob: Box<[u8]>,
}

impl MemoryUsage for BlobObject {
fn memory_usage(&self) -> usize {
let Self { uses, blob } = self;
uses.memory_usage() + blob.memory_usage()
}
}

impl BlobStore for HashMapBlobStore {
fn clone_blob(&mut self, hash: &BlobHash) -> Result<(), NoSuchBlobError> {
self.map.get_mut(hash).ok_or(NoSuchBlobError)?.uses += 1;
Expand Down
36 changes: 35 additions & 1 deletion crates/table/src/btree_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

use super::indexes::RowPointer;
use super::table::RowRef;
use crate::{read_column::ReadColumn, static_assert_size};
use crate::{read_column::ReadColumn, static_assert_size, MemoryUsage};
use core::ops::RangeBounds;
use spacetimedb_primitives::{ColList, IndexId};
use spacetimedb_sats::{
Expand Down Expand Up @@ -127,6 +127,28 @@ enum TypedIndex {
AlgebraicValue(Index<AlgebraicValue>),
}

impl MemoryUsage for TypedIndex {
fn memory_usage(&self) -> usize {
match self {
TypedIndex::Bool(this) => this.memory_usage(),
TypedIndex::U8(this) => this.memory_usage(),
TypedIndex::I8(this) => this.memory_usage(),
TypedIndex::U16(this) => this.memory_usage(),
TypedIndex::I16(this) => this.memory_usage(),
TypedIndex::U32(this) => this.memory_usage(),
TypedIndex::I32(this) => this.memory_usage(),
TypedIndex::U64(this) => this.memory_usage(),
TypedIndex::I64(this) => this.memory_usage(),
TypedIndex::U128(this) => this.memory_usage(),
TypedIndex::I128(this) => this.memory_usage(),
TypedIndex::U256(this) => this.memory_usage(),
TypedIndex::I256(this) => this.memory_usage(),
TypedIndex::String(this) => this.memory_usage(),
TypedIndex::AlgebraicValue(this) => this.memory_usage(),
}
}
}

impl TypedIndex {
/// Add the row referred to by `row_ref` to the index `self`,
/// which must be keyed at `cols`.
Expand Down Expand Up @@ -329,6 +351,18 @@ pub struct BTreeIndex {
pub key_type: AlgebraicType,
}

impl MemoryUsage for BTreeIndex {
fn memory_usage(&self) -> usize {
let Self {
index_id,
is_unique,
idx,
key_type,
} = self;
index_id.memory_usage() + is_unique.memory_usage() + idx.memory_usage() + key_type.memory_usage()
}
}

static_assert_size!(BTreeIndex, 64);

impl BTreeIndex {
Expand Down
9 changes: 9 additions & 0 deletions crates/table/src/btree_index/multimap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use core::slice;
use smallvec::SmallVec;
use std::collections::btree_map::{BTreeMap, Range};

use crate::MemoryUsage;

/// A multi map that relates a `K` to a *set* of `V`s.
#[derive(Default)]
pub struct MultiMap<K, V> {
Expand All @@ -15,6 +17,13 @@ pub struct MultiMap<K, V> {
map: BTreeMap<K, SmallVec<[V; 1]>>,
}

impl<K: MemoryUsage, V: MemoryUsage> MemoryUsage for MultiMap<K, V> {
fn memory_usage(&self) -> usize {
let Self { map } = self;
map.memory_usage()
}
}

impl<K: Ord, V: Ord> MultiMap<K, V> {
/// Returns an empty multi map.
pub fn new() -> Self {
Expand Down
Loading

0 comments on commit ddaeee4

Please sign in to comment.