Skip to content

Commit

Permalink
fix(428): Return only one message per table in subscription (#429)
Browse files Browse the repository at this point in the history
Fixes #428.

Before this change, if you had multiple queries for the same base table
as part of a subscription, you would receive multiple DatabaseTableUpdate
messages referencing the same table.

Clients did not expect nor handle such cases.
  • Loading branch information
joshua-spacetime authored Oct 14, 2023
1 parent c4af68f commit a66faa4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 32 deletions.
6 changes: 3 additions & 3 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ mod tests {
tables: vec![data.clone()],
};

check_query_incr(&db, &mut tx, &s, &update, 3, &[row])?;
check_query_incr(&db, &mut tx, &s, &update, 1, &[row])?;

let q = QueryExpr::new(db_table((&schema).into(), "_inventory".to_owned(), schema.table_id));

Expand Down Expand Up @@ -656,7 +656,7 @@ mod tests {
},
]);

check_query_eval(&db, &mut tx, &s, 3, &[product!(1u64, "health")])?;
check_query_eval(&db, &mut tx, &s, 1, &[product!(1u64, "health")])?;

let row = product!(1u64, "health");

Expand All @@ -680,7 +680,7 @@ mod tests {

let update = DatabaseUpdate { tables: vec![data] };

check_query_incr(&db, &mut tx, &s, &update, 3, &[row])?;
check_query_incr(&db, &mut tx, &s, &update, 1, &[row])?;

Ok(())
}
Expand Down
66 changes: 37 additions & 29 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use spacetimedb_lib::relation::RelValue;
use spacetimedb_lib::PrimaryKey;
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_vm::expr::QueryExpr;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;

use super::query::Query;
Expand Down Expand Up @@ -93,10 +93,14 @@ impl QuerySet {
auth: AuthCtx,
) -> Result<DatabaseUpdate, DBError> {
let mut output = DatabaseUpdate { tables: vec![] };
let mut table_ops = HashMap::new();
let mut seen = HashSet::new();

for query in &self.0 {
for table in database_update.tables.iter().cloned() {
let (_, table_row_operations) = table_ops
.entry(table.table_id)
.or_insert((table.table_name.clone(), vec![]));
for q in query.queries_of_table_id(&table) {
if let Some(result) = run_query(relational_db, tx, &q, auth)?
.into_iter()
Expand All @@ -110,8 +114,6 @@ impl QuerySet {
)
});

let mut table_row_operations = table.clone();
table_row_operations.ops.clear();
for mut row in result.data {
//Hack: remove the hidden field OP_TYPE_FIELD_NAME. see `to_mem_table`
// Needs to be done before calculating the PK.
Expand All @@ -132,14 +134,19 @@ impl QuerySet {

let row_pk = row_pk.to_bytes();
let row = row.data;
table_row_operations.ops.push(TableOp { op_type, row_pk, row });
table_row_operations.push(TableOp { op_type, row_pk, row });
}
output.tables.push(table_row_operations);
}
}
}
}

for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) {
output.tables.push(DatabaseTableUpdate {
table_id,
table_name,
ops,
});
}
Ok(output)
}

Expand All @@ -158,43 +165,44 @@ impl QuerySet {
auth: AuthCtx,
) -> Result<DatabaseUpdate, DBError> {
let mut database_update: DatabaseUpdate = DatabaseUpdate { tables: vec![] };
let mut table_ops = HashMap::new();
let mut seen = HashSet::new();

for query in &self.0 {
for q in &query.queries {
if let Some(t) = q.source.get_db_table() {
let (_, table_row_operations) = table_ops
.entry(t.table_id)
.or_insert((t.head.table_name.clone(), vec![]));
for table in run_query(relational_db, tx, q, auth)? {
{
let mut table_row_operations = Vec::new();

for row in table.data {
let row_pk = pk_for_row(&row);

//Skip rows that are already resolved in a previous subscription...
if seen.contains(&(t.table_id, row_pk)) {
continue;
}
seen.insert((t.table_id, row_pk));

let row_pk = row_pk.to_bytes();
let row = row.data;
table_row_operations.push(TableOp {
op_type: 1, // Insert
row_pk,
row,
});
for row in table.data {
let row_pk = pk_for_row(&row);

//Skip rows that are already resolved in a previous subscription...
if seen.contains(&(t.table_id, row_pk)) {
continue;
}
seen.insert((t.table_id, row_pk));

database_update.tables.push(DatabaseTableUpdate {
table_id: t.table_id,
table_name: t.head.table_name.clone(),
ops: table_row_operations,
let row_pk = row_pk.to_bytes();
let row = row.data;
table_row_operations.push(TableOp {
op_type: 1, // Insert
row_pk,
row,
});
}
}
}
}
}
for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) {
database_update.tables.push(DatabaseTableUpdate {
table_id,
table_name,
ops,
});
}
Ok(database_update)
}
}

0 comments on commit a66faa4

Please sign in to comment.