From f54e0a785af4c68341ed6aedf50241222089c8d4 Mon Sep 17 00:00:00 2001
From: George Kulakowski
Date: Sat, 1 Jul 2023 00:26:43 -0700
Subject: [PATCH] Remove the ever growing pile of memory in locking_tx_datastore (#38)

* Remove the ever growing pile of memory in locking_tx_datastore

* Clippy lint

* cargo fmt

* Cleanups

* Fix lint

* Rename TxRecord::pv to product_value

---------

Co-authored-by: Boppy
Co-authored-by: Tyler Cloutier
---
 crates/core/src/db/commit_log.rs               |  65 +++----
 .../db/datastore/locking_tx_datastore/mod.rs   | 159 +++++++++---------
 .../datastore/locking_tx_datastore/table.rs    |   5 +-
 crates/core/src/db/datastore/traits.rs         |  32 +++-
 crates/core/src/db/relational_db.rs            |  18 +-
 crates/core/src/host/module_host.rs            |  43 ++---
 .../src/host/wasm_common/module_host_actor.rs  |   5 +-
 crates/core/src/subscription/query.rs          |  14 +-
 8 files changed, 169 insertions(+), 172 deletions(-)

diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs
index 2245465168..55f3c09e05 100644
--- a/crates/core/src/db/commit_log.rs
+++ b/crates/core/src/db/commit_log.rs
@@ -1,14 +1,20 @@
 use super::{
-    datastore::traits::MutTxDatastore,
+    datastore::traits::{MutTxDatastore, TxData},
     message_log::MessageLog,
-    messages::{commit::Commit, transaction::Transaction},
+    messages::commit::Commit,
     ostorage::ObjectDB,
 };
 use crate::{
-    db::{datastore::locking_tx_datastore::RowId, messages::write::Operation},
+    db::{
+        datastore::{locking_tx_datastore::RowId, traits::TxOp},
+        messages::{
+            transaction::Transaction,
+            write::{Operation, Write},
+        },
+    },
     error::DBError,
 };
-use spacetimedb_lib::{hash::hash_bytes, DataKey};
+use spacetimedb_lib::hash::hash_bytes;
 use std::sync::Arc;
 use std::sync::Mutex;
 
@@ -34,13 +40,13 @@ impl CommitLog {
     /// Persist to disk the [Tx] result into the [MessageLog].
     ///
-    /// Returns `Some(n_bytes_written)` if `commit_result` was persisted, `false` if it doesn't have bytes to write.
+    /// Returns `Some(n_bytes_written)` if `tx_data` was persisted, `None` if it doesn't have bytes to write.
     #[tracing::instrument(skip_all)]
-    pub fn append_tx<D>(&self, transaction: Arc<Transaction>, datastore: &D) -> Result<Option<usize>, DBError>
+    pub fn append_tx<D>(&self, tx_data: &TxData, datastore: &D) -> Result<Option<usize>, DBError>
     where
         D: MutTxDatastore<RowId = RowId>,
     {
-        if let Some(bytes) = self.generate_commit(transaction, datastore) {
+        if let Some(bytes) = self.generate_commit(tx_data, datastore) {
             let mut mlog = self.mlog.lock().unwrap();
             mlog.append(&bytes)?;
             mlog.sync_all()?;
@@ -51,37 +57,36 @@ impl CommitLog {
         }
     }
 
-    fn generate_commit<D>(&self, transaction: Arc<Transaction>, datastore: &D) -> Option<Vec<u8>>
-    where
-        D: MutTxDatastore<RowId = RowId>,
-    {
-        // TODO(george) Don't clone the data, just the Arc.
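+    /// Convert `tx_data` into a [Transaction] and queue it for the next
+    /// commit. Insert records carry their own byte objects in
+    /// [TxOp::Insert], so feeding the object store below no longer requires
+    /// resolving a `DataKey` back through the datastore.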
+    fn generate_commit<D: MutTxDatastore<RowId = RowId>>(&self, tx_data: &TxData, datastore: &D) -> Option<Vec<u8>> {
         let mut unwritten_commit = self.unwritten_commit.lock().unwrap();
-        unwritten_commit.transactions.push(transaction.clone());
+        let writes = tx_data
+            .records
+            .iter()
+            .map(|record| Write {
+                operation: match record.op {
+                    TxOp::Insert(_) => Operation::Insert,
+                    TxOp::Delete => Operation::Delete,
+                },
+                set_id: record.table_id.0,
+                data_key: record.key,
+            })
+            .collect();
+        let transaction = Transaction { writes };
+        unwritten_commit.transactions.push(Arc::new(transaction));
 
         const COMMIT_SIZE: usize = 1;
 
         let tx = datastore.begin_mut_tx();
         if unwritten_commit.transactions.len() >= COMMIT_SIZE {
-            let mut datas = Vec::new();
-            for write in transaction.writes.iter() {
-                match write.operation {
-                    Operation::Delete => continue, // if we deleted a value, then the data is not in the datastore
-                    Operation::Insert => (),
-                }
-                let data = match write.data_key {
-                    DataKey::Data(data) => Arc::new(data.to_vec()),
-                    DataKey::Hash(_) => datastore
-                        .resolve_data_key_mut_tx(&tx, &write.data_key)
-                        .unwrap()
-                        .unwrap(),
-                };
-                datas.push(data)
-            }
             {
                 let mut guard = self.odb.lock().unwrap();
-                for data in datas.into_iter() {
-                    guard.add((*data).clone());
+                for record in &tx_data.records {
+                    match &record.op {
+                        TxOp::Insert(bytes) => {
+                            guard.add(Vec::clone(bytes));
+                        }
+                        TxOp::Delete => continue,
+                    }
                 }
             }
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
index 8f59f636e9..cbf516a184 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
@@ -21,19 +21,17 @@ use super::{
     },
     traits::{
         self, ColId, DataRow, IndexDef, IndexId, IndexSchema, MutTx, MutTxDatastore, SequenceDef, SequenceId, TableDef,
-        TableId, TableSchema, TxDatastore,
+        TableId, TableSchema, TxData, TxDatastore,
     },
 };
 use crate::{
+    db::datastore::traits::{TxOp, TxRecord},
     db::{
         datastore::{
            system_tables::{st_columns_schema, st_indexes_schema, st_sequences_schema, st_table_schema},
            traits::ColumnSchema,
         },
-        messages::{
-            transaction::Transaction,
-            write::{self, Operation, Write},
-        },
+        messages::{transaction::Transaction, write::Operation},
         ostorage::ObjectDB,
     },
     error::{DBError, IndexError, TableError},
@@ -113,10 +111,10 @@ impl CommittedState {
         Self { tables: HashMap::new() }
     }
 
-    fn get_or_create_table(&mut self, table_id: TableId, row_type: ProductType, schema: TableSchema) -> &mut Table {
+    fn get_or_create_table(&mut self, table_id: TableId, row_type: &ProductType, schema: &TableSchema) -> &mut Table {
         self.tables.entry(table_id).or_insert_with(|| Table {
-            row_type,
-            schema,
+            row_type: row_type.clone(),
+            schema: schema.clone(),
             rows: BTreeMap::new(),
             indexes: HashMap::new(),
         })
@@ -126,18 +124,24 @@ impl CommittedState {
         self.tables.get_mut(table_id)
     }
 
-    fn merge(&mut self, tx_state: TxState) -> Arc<Transaction> {
-        let mut transaction = Transaction { writes: vec![] };
+    fn merge(&mut self, tx_state: TxState, memory: BTreeMap<DataKey, Arc<Vec<u8>>>) -> TxData {
+        let mut tx_data = TxData { records: vec![] };
         for (table_id, table) in tx_state.insert_tables {
-            let commit_table = self.get_or_create_table(table_id, table.row_type, table.schema);
-            for (row_id, row) in table.rows {
-                commit_table.insert(row_id, row);
-                transaction.writes.push(Write {
-                    operation: write::Operation::Insert,
-                    set_id: table_id.0,
-                    data_key: row_id.0,
-                })
-            }
+            let commit_table = self.get_or_create_table(table_id, &table.row_type, &table.schema);
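+            // Move this transaction's inserted rows into the committed state,
+            // recording each one for the commit log. Small rows inline their
+            // bytes in the `DataKey` itself; hashed rows pull them from
+            // `memory`, the per-transaction map of inserted byte objects.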
+            tx_data.records.extend(table.rows.into_iter().map(|(row_id, row)| {
+                commit_table.insert(row_id, row.clone());
+                let pv = row;
+                let bytes = match row_id.0 {
+                    DataKey::Data(data) => Arc::new(data.to_vec()),
+                    DataKey::Hash(_) => memory.get(&row_id.0).unwrap().clone(),
+                };
+                TxRecord {
+                    op: TxOp::Insert(bytes),
+                    table_id,
+                    key: row_id.0,
+                    product_value: pv,
+                }
+            }));
 
             // Add all newly created indexes to the committed state
             for (_, index) in table.indexes {
@@ -158,16 +162,18 @@ impl CommittedState {
             // 5. Commit the transaction
             if let Some(table) = self.get_table(&table_id) {
                 for row_id in row_ids {
-                    table.delete(&row_id);
-                    transaction.writes.push(Write {
-                        operation: write::Operation::Delete,
-                        set_id: table_id.0,
-                        data_key: row_id.0,
-                    })
+                    if let Some(pv) = table.delete(&row_id) {
+                        tx_data.records.push(TxRecord {
+                            op: TxOp::Delete,
+                            table_id,
+                            key: row_id.0,
+                            product_value: pv,
+                        })
+                    }
                 }
             }
         }
-        Arc::new(transaction)
+        tx_data
     }
 
     pub fn index_seek<'a>(
@@ -192,7 +198,7 @@ struct TxState {
 /// Represents whether a row has been inserted, deleted, or has not been
 /// altered in the current transaction.
 enum RowOp {
-    Insert,
+    Insert(ProductValue),
     Delete,
     Absent,
 }
@@ -206,13 +212,16 @@ impl TxState {
     }
 
     pub fn get_row_op(&self, table_id: &TableId, row_id: &RowId) -> RowOp {
-        if Some(true) == self.delete_tables.get(table_id).map(|set| set.contains(row_id)) {
+        if let Some(true) = self.delete_tables.get(table_id).map(|set| set.contains(row_id)) {
             return RowOp::Delete;
         }
         let Some(table) = self.insert_tables.get(table_id) else {
             return RowOp::Absent;
         };
-        table.get_row(row_id).map(|_| RowOp::Insert).unwrap_or(RowOp::Absent)
+        table
+            .get_row(row_id)
+            .map(|pv| RowOp::Insert(pv.clone()))
+            .unwrap_or(RowOp::Absent)
     }
 
     pub fn get_row(&self, table_id: &TableId, row_id: &RowId) -> Option<&ProductValue> {
@@ -272,20 +281,20 @@ impl SequencesState {
 }
 
 struct Inner {
-    // TODO: This horror is brought to you by the fact that in two
-    // place we need to get access to the bytes of large blobs
-    // by looking them up by their `DataKey` when they may have
-    // been deleted from the datastore.
-    ever_growing_waste_of_memory: BTreeMap<DataKey, Arc<Vec<u8>>>,
+    /// All of the byte objects inserted in the current transaction.
+    memory: BTreeMap<DataKey, Arc<Vec<u8>>>,
+    /// The state of the database up to the point of the last committed transaction.
     committed_state: CommittedState,
+    /// The state of all insertions and deletions in this transaction.
     tx_state: Option<TxState>,
+    /// The state of sequence generation in this database.
     sequence_state: SequencesState,
 }
 
 impl Inner {
     pub fn new() -> Self {
         Self {
-            ever_growing_waste_of_memory: BTreeMap::new(),
+            memory: BTreeMap::new(),
             committed_state: CommittedState::new(),
             tx_state: None,
             sequence_state: SequencesState::new(),
         }
     }
@@ -297,9 +306,9 @@ impl Inner {
         let table_name = &schema.table_name;
 
         // Insert the table row into st_tables, creating st_tables if it's missing
-        let st_tables =
-            self.committed_state
-                .get_or_create_table(ST_TABLES_ID, ST_TABLE_ROW_TYPE.clone(), st_table_schema());
+        let st_tables = self
+            .committed_state
+            .get_or_create_table(ST_TABLES_ID, &ST_TABLE_ROW_TYPE, &st_table_schema());
         let row = StTableRow {
             table_id,
             table_name: &table_name,
@@ -320,11 +329,9 @@ impl Inner {
             let row = ProductValue::from(&row);
             let data_key = row.to_data_key();
             {
-                let st_columns = self.committed_state.get_or_create_table(
-                    ST_COLUMNS_ID,
-                    ST_COLUMNS_ROW_TYPE.clone(),
-                    st_columns_schema(),
-                );
+                let st_columns =
+                    self.committed_state
+                        .get_or_create_table(ST_COLUMNS_ID, &ST_COLUMNS_ROW_TYPE, &st_columns_schema());
                 st_columns.rows.insert(RowId(data_key), row);
             }
 
@@ -339,8 +346,8 @@ impl Inner {
         };
         let st_sequences = self.committed_state.get_or_create_table(
             ST_SEQUENCES_ID,
-            ST_SEQUENCE_ROW_TYPE.clone(),
-            st_sequences_schema(),
+            &ST_SEQUENCE_ROW_TYPE,
+            &st_sequences_schema(),
         );
         let row = StSequenceRow {
             sequence_id: seq_id.0,
@@ -362,7 +369,7 @@ impl Inner {
         // Insert the indexes into st_indexes
         let st_indexes = self
             .committed_state
-            .get_or_create_table(ST_INDEXES_ID, ST_INDEX_ROW_TYPE.clone(), st_indexes_schema());
+            .get_or_create_table(ST_INDEXES_ID, &ST_INDEX_ROW_TYPE, &st_indexes_schema());
         for (_, index) in schema.indexes.iter().enumerate() {
             let row = StIndexRow {
                 index_id: index.index_id,
@@ -976,17 +983,21 @@ impl Inner {
         })
     }
 
-    fn contains_row(&self, table_id: &TableId, row_id: &RowId) -> bool {
+    fn contains_row(&self, table_id: &TableId, row_id: &RowId) -> RowOp {
         match self.tx_state.as_ref().unwrap().get_row_op(table_id, row_id) {
-            RowOp::Insert => return true,
-            RowOp::Delete => return false,
+            RowOp::Insert(pv) => return RowOp::Insert(pv),
+            RowOp::Delete => return RowOp::Delete,
             RowOp::Absent => (),
         }
-        self.committed_state
+        match self
+            .committed_state
             .tables
             .get(table_id)
-            .map(|table| table.rows.contains_key(row_id))
-            .unwrap_or(false)
+            .and_then(|table| table.rows.get(row_id))
+        {
+            Some(pv) => RowOp::Insert(pv.clone()),
+            None => RowOp::Absent,
+        }
     }
 
     fn table_exists(&self, table_id: &TableId) -> bool {
@@ -1190,7 +1201,7 @@ impl Inner {
         match data_key {
             DataKey::Data(_) => (),
             DataKey::Hash(_) => {
-                self.ever_growing_waste_of_memory.insert(data_key, Arc::new(bytes));
+                self.memory.insert(data_key, Arc::new(bytes));
             }
         };
     }
@@ -1201,28 +1212,13 @@ impl Inner {
         Ok(())
     }
 
-    fn resolve_data_key(&self, data_key: &DataKey) -> super::Result<Option<Arc<Vec<u8>>>> {
-        match data_key {
-            DataKey::Data(data) => return Ok(Some(Arc::new(data.to_vec()))),
-            DataKey::Hash(_) => (),
-        }
-        if let Some(bytes) = self.ever_growing_waste_of_memory.get(data_key) {
-            return Ok(Some(bytes.clone()));
-        }
-        Ok(None)
-    }
-
     fn get(&self, table_id: &TableId, row_id: &RowId) -> super::Result<Option<DataRef>> {
         if !self.table_exists(table_id) {
             return Err(TableError::IdNotFound(table_id.0).into());
         }
         match self.tx_state.as_ref().unwrap().get_row_op(table_id, row_id) {
-            RowOp::Insert => {
-                return Ok(self
-                    .tx_state
-                    .as_ref()
-                    .and_then(|tx_state| tx_state.get_row(table_id, row_id))
-                    .map(|row| DataRef::new(row.clone())));
+            RowOp::Insert(row) => {
+                return Ok(Some(DataRef::new(row)));
             }
             RowOp::Delete => {
                 return Ok(None);
             }
@@ -1272,14 +1268,14 @@ impl Inner {
     }
 
     fn delete_row_internal(&mut self, table_id: &TableId, row_id: &RowId) -> bool {
-        if self.contains_row(table_id, row_id) {
+        if let RowOp::Insert(_) = self.contains_row(table_id, row_id) {
             self.tx_state
                 .as_mut()
                 .unwrap()
                 .get_or_create_delete_table(*table_id)
                 .insert(*row_id);
             if let Some(table) = self.tx_state.as_mut().unwrap().get_insert_table_mut(table_id) {
-                table.delete(row_id)
+                table.delete(row_id);
             }
             true
         } else {
@@ -1353,9 +1349,11 @@ impl Inner {
         }
     }
 
-    fn commit(&mut self) -> super::Result<Option<Arc<Transaction>>> {
+    fn commit(&mut self) -> super::Result<Option<TxData>> {
         let tx_state = self.tx_state.take().unwrap();
-        Ok(Some(self.committed_state.merge(tx_state)))
+        let memory = std::mem::take(&mut self.memory);
+        let tx_data = self.committed_state.merge(tx_state, memory);
+        Ok(Some(tx_data))
     }
 
     fn rollback(&mut self) {
@@ -1539,8 +1537,8 @@ impl Iterator for Iter<'_> {
                     .as_ref()
                     .map(|tx_state| tx_state.get_row_op(&self.table_id, row_id))
                 {
-                    Some(RowOp::Insert) => (), // Do nothing, we'll get it in the next stage
-                    Some(RowOp::Delete) => (), // Skip it, it's been deleted
+                    Some(RowOp::Insert(_)) => (), // Do nothing, we'll get it in the next stage
+                    Some(RowOp::Delete) => (),    // Skip it, it's been deleted
                     Some(RowOp::Absent) => {
                         return Some(DataRef::new(row.clone()));
                     }
@@ -1764,10 +1762,7 @@ impl traits::MutTx for Locking {
         tx.lock.rollback();
     }
 
-    fn commit_mut_tx(
-        &self,
-        mut tx: Self::MutTxId,
-    ) -> super::Result<Option<Arc<Transaction>>> {
+    fn commit_mut_tx(&self, mut tx: Self::MutTxId) -> super::Result<Option<TxData>> {
         tx.lock.commit()
     }
 }
@@ -1915,10 +1910,6 @@ impl MutTxDatastore for Locking {
     ) -> super::Result<ProductValue> {
         tx.lock.insert(table_id, row)
     }
-
-    fn resolve_data_key_mut_tx(&self, tx: &Self::MutTxId, data_key: &DataKey) -> super::Result<Option<Arc<Vec<u8>>>> {
-        tx.lock.resolve_data_key(data_key)
-    }
 }
 
 #[cfg(test)]
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/table.rs b/crates/core/src/db/datastore/locking_tx_datastore/table.rs
index 4f2fca7206..f07c3627ae 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/table.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/table.rs
@@ -29,12 +29,13 @@ impl Table {
         self.rows.insert(row_id, row);
     }
 
-    pub(crate) fn delete(&mut self, row_id: &RowId) {
-        let Some(row) = self.rows.remove(row_id) else { return };
+    pub(crate) fn delete(&mut self, row_id: &RowId) -> Option<ProductValue> {
+        let row = self.rows.remove(row_id)?;
         for (col_id, index) in self.indexes.iter_mut() {
             let col_value = row.get_field(col_id.0 as usize, None).unwrap();
             index.delete(col_value, row_id)
         }
+        Some(row)
     }
 
     pub(crate) fn get_row(&self, row_id: &RowId) -> Option<&ProductValue> {
diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs
index 48a506ebae..5c49a77197 100644
--- a/crates/core/src/db/datastore/traits.rs
+++ b/crates/core/src/db/datastore/traits.rs
@@ -1,4 +1,4 @@
-use crate::db::{messages::transaction::Transaction, relational_db::ST_TABLES_ID};
+use crate::db::relational_db::ST_TABLES_ID;
 use core::fmt;
 use spacetimedb_lib::DataKey;
 use spacetimedb_sats::{
@@ -276,6 +276,31 @@ impl From<ProductType> for TableDef {
     }
 }
 
+/// Operations in a transaction are either Inserts or Deletes.
+/// Inserts report the byte objects they inserted, to be persisted
+/// later in an object store.
+pub enum TxOp {
+    Insert(Arc<Vec<u8>>),
+    Delete,
+}
+
+/// A record of a single operation within a transaction.
+pub struct TxRecord {
+    /// Whether the operation was an insert or a delete.
+    pub(crate) op: TxOp,
+    /// The value of the modified row.
+    pub(crate) product_value: ProductValue,
+    /// The key of the modified row.
+    pub(crate) key: DataKey,
+    /// The table that was modified.
+    pub(crate) table_id: TableId,
+}
+
+/// A record of all the operations within a transaction.
+pub struct TxData {
+    pub(crate) records: Vec<TxRecord>,
+}
+
 pub trait Blob {
     fn view(&self) -> &[u8];
 }
@@ -315,7 +340,7 @@ pub trait MutTx {
     fn begin_mut_tx(&self) -> Self::MutTxId;
     fn rollback_mut_tx(&self, tx: Self::MutTxId);
-    fn commit_mut_tx(&self, tx: Self::MutTxId) -> Result<Option<Arc<Transaction>>>;
+    fn commit_mut_tx(&self, tx: Self::MutTxId) -> Result<Option<TxData>>;
 }
 
 pub trait Blobstore: BlobRow {
@@ -528,7 +553,4 @@ pub trait MutTxDatastore: TxDatastore + MutTx {
         table_id: TableId,
         row: ProductValue,
     ) -> Result<ProductValue>;
-
-    // TODO(cloutiertyler): This function is needed as a kludge and should be removed.
-    fn resolve_data_key_mut_tx(&self, tx: &Self::MutTxId, data_key: &DataKey) -> Result<Option<Arc<Vec<u8>>>>;
 }
diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 44c25224e0..1dd2699f6b 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -2,9 +2,9 @@ use super::commit_log::CommitLog;
 use super::datastore::locking_tx_datastore::{Data, DataRef, Iter, IterByColEq, IterByColRange, MutTxId, RowId};
 use super::datastore::traits::{
     ColId, DataRow, IndexDef, IndexId, MutTx, MutTxDatastore, SequenceDef, SequenceId, TableDef, TableId, TableSchema,
+    TxData,
 };
 use super::message_log::MessageLog;
-use super::messages::transaction::Transaction;
 use super::relational_operators::Relation;
 use crate::db::db_metrics::{RDB_DELETE_BY_REL_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_ITER_TIME};
 use crate::db::messages::commit::Commit;
@@ -212,13 +212,13 @@ impl RelationalDB {
         log::trace!("ROLLBACK TX");
         self.inner.rollback_mut_tx(tx)
     }
 
-    pub fn commit_tx(&self, tx: MutTxId) -> Result<(Option<Arc<Transaction>>, Option<usize>), DBError> {
+    pub fn commit_tx(&self, tx: MutTxId) -> Result<Option<(TxData, Option<usize>)>, DBError> {
         log::trace!("COMMIT TX");
-        if let Some(transaction) = self.inner.commit_mut_tx(tx)? {
-            let bytes_written = self.commit_log.append_tx(transaction.clone(), &self.inner)?;
-            return Ok((Some(transaction), bytes_written));
+        if let Some(tx_data) = self.inner.commit_mut_tx(tx)? {
+            let bytes_written = self.commit_log.append_tx(&tx_data, &self.inner)?;
+            return Ok(Some((tx_data, bytes_written)));
         }
-        Ok((None, None))
+        Ok(None)
     }
 
     /// Run a fallible function in a transaction.
@@ -259,9 +259,9 @@ impl RelationalDB {
         if res.is_err() {
             self.rollback_tx(tx);
         } else {
-            let (transaction, _bytes_written) = self.commit_tx(tx).map_err(E::from)?;
-            if transaction.is_none() {
-                panic!("TODO: retry?");
+            match self.commit_tx(tx).map_err(E::from)? {
+                Some(_) => (),
+                None => panic!("TODO: retry?"),
             }
         }
         res
     }
diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs
index e8585d0088..cc8cbe76e8 100644
--- a/crates/core/src/host/module_host.rs
+++ b/crates/core/src/host/module_host.rs
@@ -1,6 +1,5 @@
 use crate::client::ClientConnectionSender;
-use crate::db::datastore::traits::MutTxDatastore;
-use crate::db::messages::write::Write;
+use crate::db::datastore::traits::{TableId, TxData, TxOp};
 use crate::db::relational_db::RelationalDB;
 use crate::error::DBError;
 use crate::hash::Hash;
@@ -32,38 +31,24 @@ impl DatabaseUpdate {
         false
     }
 
-    pub fn from_writes(stdb: &RelationalDB, writes: &Vec<Write>) -> Self {
-        let mut map: HashMap<u32, Vec<TableOp>> = HashMap::new();
+    pub fn from_writes(stdb: &RelationalDB, tx_data: &TxData) -> Self {
+        let mut map: HashMap<TableId, Vec<TableOp>> = HashMap::new();
         //TODO: This should be wrapped with .auto_commit
         let tx = stdb.begin_tx();
 
-        for write in writes {
-            let op = match write.operation {
-                crate::db::messages::write::Operation::Delete => 0,
-                crate::db::messages::write::Operation::Insert => 1,
+        for record in tx_data.records.iter() {
+            let op = match record.op {
+                TxOp::Delete => 0,
+                TxOp::Insert(_) => 1,
             };
 
-            let vec = if let Some(vec) = map.get_mut(&write.set_id) {
+            let vec = if let Some(vec) = map.get_mut(&record.table_id) {
                 vec
             } else {
-                map.insert(write.set_id, Vec::new());
-                map.get_mut(&write.set_id).unwrap()
+                map.insert(record.table_id, Vec::new());
+                map.get_mut(&record.table_id).unwrap()
             };
 
-            let (row, row_pk) = {
-                // TODO: This is not safe at all and horribly horrible.
-                // The following code will panic if you try to get the row_type for the table
-                // and the table has since been deleted from the datastore.
-                let bytes = stdb
-                    .inner
-                    .resolve_data_key_mut_tx(&tx, &write.data_key)
-                    .unwrap()
-                    .unwrap();
-                let ty = stdb
-                    .row_schema_for_table(&tx, write.set_id)
-                    .expect("Tyler to have written better code.");
-                let tuple = ProductValue::decode(&ty, &mut &bytes[..]).unwrap();
-                (tuple, write.data_key.to_bytes())
-            };
+            let (row, row_pk) = (record.product_value.clone(), record.key.to_bytes());
 
             vec.push(TableOp {
                 op_type: op,
@@ -72,18 +57,18 @@ impl DatabaseUpdate {
             });
         }
 
-        let mut table_name_map: HashMap<u32, String> = HashMap::new();
+        let mut table_name_map: HashMap<TableId, String> = HashMap::new();
         let mut table_updates = Vec::new();
         for (table_id, table_row_operations) in map.drain() {
             let table_name = if let Some(name) = table_name_map.get(&table_id) {
                 name.clone()
             } else {
-                let table_name = stdb.table_name_from_id(&tx, table_id).unwrap().unwrap();
+                let table_name = stdb.table_name_from_id(&tx, table_id.0).unwrap().unwrap();
                 table_name_map.insert(table_id, table_name.clone());
                 table_name
             };
             table_updates.push(DatabaseTableUpdate {
-                table_id,
+                table_id: table_id.0,
                 table_name,
                 ops: table_row_operations,
             });
diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs
index 5680a33869..8df119e38b 100644
--- a/crates/core/src/host/wasm_common/module_host_actor.rs
+++ b/crates/core/src/host/wasm_common/module_host_actor.rs
@@ -796,8 +796,7 @@ impl<T: WasmInstance> WasmInstanceActor<T> {
                 EventStatus::Failed(errmsg.into())
             }
             Ok(Ok(())) => {
-                let (transaction, bytes_written) = stdb.commit_tx(tx).unwrap();
-                if let Some(transaction) = transaction {
+                if let Some((tx_data, bytes_written)) = stdb.commit_tx(tx).unwrap() {
                     // TODO(cloutiertyler): This tracking doesn't really belong here if we want to write transactions to disk
                     // in batches. This is because it's possible for a tiny reducer call to trigger a whole commit to be written to disk.
                     // We should track the commit sizes instead internally to the CommitLog probably.
@@ -806,7 +805,7 @@ impl<T: WasmInstance> WasmInstanceActor<T> {
                         .with_label_values(&[address, func_ident])
                         .observe(bytes_written as f64);
                 }
-                EventStatus::Committed(DatabaseUpdate::from_writes(stdb, &transaction.writes))
+                EventStatus::Committed(DatabaseUpdate::from_writes(stdb, &tx_data))
             } else {
                 todo!("Write skew, you need to implement retries my man, T-dawg.");
             }
diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs
index a37343486d..c4b0c9a6e1 100644
--- a/crates/core/src/subscription/query.rs
+++ b/crates/core/src/subscription/query.rs
@@ -299,11 +299,11 @@ mod tests {
 
         let head_1 = ProductType::from_iter([("inventory_id", BuiltinType::U64), ("name", BuiltinType::String)]);
         let row_1 = product!(1u64, "health");
-        let table_id_1 = create_table_from_program(p, "inventory", head_1.clone(), &[row_1.clone()])?;
+        let table_id_1 = create_table_from_program(p, "inventory", head_1, &[row_1])?;
 
         let head_2 = ProductType::from_iter([("player_id", BuiltinType::U64), ("name", BuiltinType::String)]);
         let row_2 = product!(2u64, "jhon doe");
-        let table_id_2 = create_table_from_program(p, "player", head_2, &[row_2.clone()])?;
+        let table_id_2 = create_table_from_program(p, "player", head_2, &[row_2])?;
 
         let schema_1 = db.schema_for_table(&tx, table_id_1).unwrap();
         let schema_2 = db.schema_for_table(&tx, table_id_2).unwrap();
@@ -323,19 +323,13 @@ mod tests {
 
         let result_1 = s.eval(&db)?;
 
-        let s = QuerySet(vec![
-            Query {
-                queries: vec![q_2.clone()],
-            },
-            Query { queries: vec![q_1] },
-        ]);
+        let s = QuerySet(vec![Query { queries: vec![q_2] }, Query { queries: vec![q_1] }]);
 
         let result_2 = s.eval(&db)?;
 
         let to_row = |of: DatabaseUpdate| {
             of.tables
                 .iter()
-                .map(|x| x.ops.iter().map(|x| x.row.clone()))
-                .flatten()
+                .flat_map(|x| x.ops.iter().map(|x| x.row.clone()))
                 .sorted()
                 .collect::<Vec<_>>()
         };
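
--
How the diffs above fit together, as a sketch. `commit_tx`, `TxData`, `TxOp`,
`TxRecord::product_value`, and `DatabaseUpdate::from_writes` are the items
introduced or changed in this patch; the function `commit_flow` itself and the
`_object` binding are illustrative glue, not code from the tree (and since
`TxData::records` is `pub(crate)`, this only makes sense inside `crates/core`):

    // Hypothetical helper showing one commit end to end.
    fn commit_flow(stdb: &RelationalDB, tx: MutTxId) -> Result<(), DBError> {
        // `commit_tx` now hands back the owned `TxData` for the transaction
        // (plus the bytes written, if a commit was flushed), instead of an
        // `Arc<Transaction>` holding bare `DataKey`s.
        let Some((tx_data, _bytes_written)) = stdb.commit_tx(tx)? else {
            return Ok(()); // nothing was committed
        };

        // Inserts carry their own byte objects, so persisting them is a walk
        // over the records -- no `resolve_data_key_mut_tx` round trip, and no
        // ever-growing map of every blob ever inserted.
        for record in &tx_data.records {
            if let TxOp::Insert(bytes) = &record.op {
                let _object: Vec<u8> = Vec::clone(bytes);
                // e.g. hand `_object` to the object store, as generate_commit does
            }
        }

        // Subscription updates likewise reuse the decoded `product_value`
        // kept on each record, rather than re-decoding rows from bytes.
        let _update = DatabaseUpdate::from_writes(stdb, &tx_data);
        Ok(())
    }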