diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index f96d4a5fb5..d2fb05bbf6 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -601,7 +601,7 @@ mod tests { tables: vec![data.clone()], }; - check_query_incr(&db, &mut tx, &s, &update, 3, &[row])?; + check_query_incr(&db, &mut tx, &s, &update, 1, &[row])?; let q = QueryExpr::new(db_table((&schema).into(), "_inventory".to_owned(), schema.table_id)); @@ -656,7 +656,7 @@ mod tests { }, ]); - check_query_eval(&db, &mut tx, &s, 3, &[product!(1u64, "health")])?; + check_query_eval(&db, &mut tx, &s, 1, &[product!(1u64, "health")])?; let row = product!(1u64, "health"); @@ -680,7 +680,7 @@ mod tests { let update = DatabaseUpdate { tables: vec![data] }; - check_query_incr(&db, &mut tx, &s, &update, 3, &[row])?; + check_query_incr(&db, &mut tx, &s, &update, 1, &[row])?; Ok(()) } diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index c987bce037..a24ce5fd2e 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -4,7 +4,7 @@ use spacetimedb_lib::relation::RelValue; use spacetimedb_lib::PrimaryKey; use spacetimedb_sats::AlgebraicValue; use spacetimedb_vm::expr::QueryExpr; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::ops::Deref; use super::query::Query; @@ -93,10 +93,14 @@ impl QuerySet { auth: AuthCtx, ) -> Result { let mut output = DatabaseUpdate { tables: vec![] }; + let mut table_ops = HashMap::new(); let mut seen = HashSet::new(); for query in &self.0 { for table in database_update.tables.iter().cloned() { + let (_, table_row_operations) = table_ops + .entry(table.table_id) + .or_insert((table.table_name.clone(), vec![])); for q in query.queries_of_table_id(&table) { if let Some(result) = run_query(relational_db, tx, &q, auth)? .into_iter() @@ -110,8 +114,6 @@ impl QuerySet { ) }); - let mut table_row_operations = table.clone(); - table_row_operations.ops.clear(); for mut row in result.data { //Hack: remove the hidden field OP_TYPE_FIELD_NAME. see `to_mem_table` // Needs to be done before calculating the PK. @@ -132,14 +134,19 @@ impl QuerySet { let row_pk = row_pk.to_bytes(); let row = row.data; - table_row_operations.ops.push(TableOp { op_type, row_pk, row }); + table_row_operations.push(TableOp { op_type, row_pk, row }); } - output.tables.push(table_row_operations); } } } } - + for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) { + output.tables.push(DatabaseTableUpdate { + table_id, + table_name, + ops, + }); + } Ok(output) } @@ -158,43 +165,44 @@ impl QuerySet { auth: AuthCtx, ) -> Result { let mut database_update: DatabaseUpdate = DatabaseUpdate { tables: vec![] }; + let mut table_ops = HashMap::new(); let mut seen = HashSet::new(); for query in &self.0 { for q in &query.queries { if let Some(t) = q.source.get_db_table() { + let (_, table_row_operations) = table_ops + .entry(t.table_id) + .or_insert((t.head.table_name.clone(), vec![])); for table in run_query(relational_db, tx, q, auth)? { - { - let mut table_row_operations = Vec::new(); - - for row in table.data { - let row_pk = pk_for_row(&row); - - //Skip rows that are already resolved in a previous subscription... - if seen.contains(&(t.table_id, row_pk)) { - continue; - } - seen.insert((t.table_id, row_pk)); - - let row_pk = row_pk.to_bytes(); - let row = row.data; - table_row_operations.push(TableOp { - op_type: 1, // Insert - row_pk, - row, - }); + for row in table.data { + let row_pk = pk_for_row(&row); + + //Skip rows that are already resolved in a previous subscription... + if seen.contains(&(t.table_id, row_pk)) { + continue; } + seen.insert((t.table_id, row_pk)); - database_update.tables.push(DatabaseTableUpdate { - table_id: t.table_id, - table_name: t.head.table_name.clone(), - ops: table_row_operations, + let row_pk = row_pk.to_bytes(); + let row = row.data; + table_row_operations.push(TableOp { + op_type: 1, // Insert + row_pk, + row, }); } } } } } + for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) { + database_update.tables.push(DatabaseTableUpdate { + table_id, + table_name, + ops, + }); + } Ok(database_update) } }