Skip to content

Commit

Permalink
Remove the ever growing pile of memory in locking_tx_datastore (#38)
Browse files Browse the repository at this point in the history
* Remove the ever growing pile of memory in locking_tx_datastore

* Clippy lint

* cargo fmt

* Cleanups

* Fix lint

* Rename TxRecord::pv to product_value

---------

Co-authored-by: Boppy <no-reply@boppygames.gg>
Co-authored-by: Tyler Cloutier <cloutiertyler@aol.com>
  • Loading branch information
3 people authored Jul 1, 2023
1 parent 3dfe03d commit f54e0a7
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 172 deletions.
65 changes: 35 additions & 30 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use super::{
datastore::traits::MutTxDatastore,
datastore::traits::{MutTxDatastore, TxData},
message_log::MessageLog,
messages::{commit::Commit, transaction::Transaction},
messages::commit::Commit,
ostorage::ObjectDB,
};
use crate::{
db::{datastore::locking_tx_datastore::RowId, messages::write::Operation},
db::{
datastore::{locking_tx_datastore::RowId, traits::TxOp},
messages::{
transaction::Transaction,
write::{Operation, Write},
},
},
error::DBError,
};
use spacetimedb_lib::{hash::hash_bytes, DataKey};
use spacetimedb_lib::hash::hash_bytes;
use std::sync::Arc;
use std::sync::Mutex;

Expand All @@ -34,13 +40,13 @@ impl CommitLog {

/// Persist to disk the [Tx] result into the [MessageLog].
///
/// Returns `Some(n_bytes_written)` if `commit_result` was persisted, `false` if it doesn't have bytes to write.
/// Returns `Some(n_bytes_written)` if `commit_result` was persisted, `None` if it doesn't have bytes to write.
#[tracing::instrument(skip_all)]
pub fn append_tx<D>(&self, transaction: Arc<Transaction>, datastore: &D) -> Result<Option<usize>, DBError>
pub fn append_tx<D>(&self, tx_data: &TxData, datastore: &D) -> Result<Option<usize>, DBError>
where
D: MutTxDatastore<RowId = RowId>,
{
if let Some(bytes) = self.generate_commit(transaction, datastore) {
if let Some(bytes) = self.generate_commit(tx_data, datastore) {
let mut mlog = self.mlog.lock().unwrap();
mlog.append(&bytes)?;
mlog.sync_all()?;
Expand All @@ -51,37 +57,36 @@ impl CommitLog {
}
}

fn generate_commit<D>(&self, transaction: Arc<Transaction>, datastore: &D) -> Option<Vec<u8>>
where
D: MutTxDatastore<RowId = RowId>,
{
// TODO(george) Don't clone the data, just the Arc.
fn generate_commit<D: MutTxDatastore<RowId = RowId>>(&self, tx_data: &TxData, datastore: &D) -> Option<Vec<u8>> {
let mut unwritten_commit = self.unwritten_commit.lock().unwrap();
unwritten_commit.transactions.push(transaction.clone());
let writes = tx_data
.records
.iter()
.map(|record| Write {
operation: match record.op {
TxOp::Insert(_) => Operation::Insert,
TxOp::Delete => Operation::Delete,
},
set_id: record.table_id.0,
data_key: record.key,
})
.collect();
let transaction = Transaction { writes };
unwritten_commit.transactions.push(Arc::new(transaction));

const COMMIT_SIZE: usize = 1;

let tx = datastore.begin_mut_tx();
if unwritten_commit.transactions.len() >= COMMIT_SIZE {
let mut datas = Vec::new();
for write in transaction.writes.iter() {
match write.operation {
Operation::Delete => continue, // if we deleted a value, then the data is not in the datastore
Operation::Insert => (),
}
let data = match write.data_key {
DataKey::Data(data) => Arc::new(data.to_vec()),
DataKey::Hash(_) => datastore
.resolve_data_key_mut_tx(&tx, &write.data_key)
.unwrap()
.unwrap(),
};
datas.push(data)
}
{
let mut guard = self.odb.lock().unwrap();
for data in datas.into_iter() {
guard.add((*data).clone());
for record in &tx_data.records {
match &record.op {
TxOp::Insert(bytes) => {
guard.add(Vec::clone(bytes));
}
TxOp::Delete => continue,
}
}
}

Expand Down
Loading

0 comments on commit f54e0a7

Please sign in to comment.