Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(428): Return only one message per table in subscription #429

Merged
merged 1 commit into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ mod tests {
tables: vec![data.clone()],
};

check_query_incr(&db, &mut tx, &s, &update, 3, &[row])?;
check_query_incr(&db, &mut tx, &s, &update, 1, &[row])?;

let q = QueryExpr::new(db_table((&schema).into(), "_inventory".to_owned(), schema.table_id));

Expand Down Expand Up @@ -656,7 +656,7 @@ mod tests {
},
]);

check_query_eval(&db, &mut tx, &s, 3, &[product!(1u64, "health")])?;
check_query_eval(&db, &mut tx, &s, 1, &[product!(1u64, "health")])?;

let row = product!(1u64, "health");

Expand All @@ -680,7 +680,7 @@ mod tests {

let update = DatabaseUpdate { tables: vec![data] };

check_query_incr(&db, &mut tx, &s, &update, 3, &[row])?;
check_query_incr(&db, &mut tx, &s, &update, 1, &[row])?;

Ok(())
}
Expand Down
66 changes: 37 additions & 29 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use spacetimedb_lib::relation::RelValue;
use spacetimedb_lib::PrimaryKey;
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_vm::expr::QueryExpr;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;

use super::query::Query;
Expand Down Expand Up @@ -93,10 +93,14 @@ impl QuerySet {
auth: AuthCtx,
) -> Result<DatabaseUpdate, DBError> {
let mut output = DatabaseUpdate { tables: vec![] };
let mut table_ops = HashMap::new();
let mut seen = HashSet::new();

for query in &self.0 {
for table in database_update.tables.iter().cloned() {
let (_, table_row_operations) = table_ops
.entry(table.table_id)
.or_insert((table.table_name.clone(), vec![]));
for q in query.queries_of_table_id(&table) {
if let Some(result) = run_query(relational_db, tx, &q, auth)?
.into_iter()
Expand All @@ -110,8 +114,6 @@ impl QuerySet {
)
});

let mut table_row_operations = table.clone();
table_row_operations.ops.clear();
for mut row in result.data {
//Hack: remove the hidden field OP_TYPE_FIELD_NAME. see `to_mem_table`
// Needs to be done before calculating the PK.
Expand All @@ -132,14 +134,19 @@ impl QuerySet {

let row_pk = row_pk.to_bytes();
let row = row.data;
table_row_operations.ops.push(TableOp { op_type, row_pk, row });
table_row_operations.push(TableOp { op_type, row_pk, row });
}
output.tables.push(table_row_operations);
}
}
}
}

for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) {
output.tables.push(DatabaseTableUpdate {
table_id,
table_name,
ops,
});
}
Ok(output)
}

Expand All @@ -158,43 +165,44 @@ impl QuerySet {
auth: AuthCtx,
) -> Result<DatabaseUpdate, DBError> {
let mut database_update: DatabaseUpdate = DatabaseUpdate { tables: vec![] };
let mut table_ops = HashMap::new();
let mut seen = HashSet::new();

for query in &self.0 {
for q in &query.queries {
if let Some(t) = q.source.get_db_table() {
let (_, table_row_operations) = table_ops
.entry(t.table_id)
.or_insert((t.head.table_name.clone(), vec![]));
for table in run_query(relational_db, tx, q, auth)? {
{
let mut table_row_operations = Vec::new();

for row in table.data {
let row_pk = pk_for_row(&row);

//Skip rows that are already resolved in a previous subscription...
if seen.contains(&(t.table_id, row_pk)) {
continue;
}
seen.insert((t.table_id, row_pk));

let row_pk = row_pk.to_bytes();
let row = row.data;
table_row_operations.push(TableOp {
op_type: 1, // Insert
row_pk,
row,
});
for row in table.data {
let row_pk = pk_for_row(&row);

//Skip rows that are already resolved in a previous subscription...
if seen.contains(&(t.table_id, row_pk)) {
continue;
}
seen.insert((t.table_id, row_pk));

database_update.tables.push(DatabaseTableUpdate {
table_id: t.table_id,
table_name: t.head.table_name.clone(),
ops: table_row_operations,
let row_pk = row_pk.to_bytes();
let row = row.data;
table_row_operations.push(TableOp {
op_type: 1, // Insert
row_pk,
row,
});
}
}
}
}
}
for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) {
database_update.tables.push(DatabaseTableUpdate {
table_id,
table_name,
ops,
});
}
Ok(database_update)
}
}
Loading