Skip to content

Commit

Permalink
test(430): Incremental evaluation for semijoins
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime authored and pull[bot] committed Aug 7, 2024
1 parent e4a6969 commit 1069294
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 52 deletions.
4 changes: 2 additions & 2 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ impl DatabaseUpdate {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseTableUpdate {
pub table_id: u32,
pub table_name: String,
pub ops: Vec<TableOp>,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableOp {
pub op_type: u8,
pub row_pk: Vec<u8>,
Expand Down
291 changes: 241 additions & 50 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,42 @@ mod tests {
Ok(db.create_table(tx, schema)?)
}

fn insert_op(table_id: u32, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
let row_pk = row.to_data_key().to_bytes();
DatabaseTableUpdate {
table_id,
table_name: table_name.to_string(),
ops: vec![TableOp {
op_type: 1,
row,
row_pk,
}],
}
}

fn delete_op(table_id: u32, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
let row_pk = row.to_data_key().to_bytes();
DatabaseTableUpdate {
table_id,
table_name: table_name.to_string(),
ops: vec![TableOp {
op_type: 0,
row,
row_pk,
}],
}
}

fn insert_row(db: &RelationalDB, tx: &mut MutTxId, table_id: u32, row: ProductValue) -> ResultTest<()> {
db.insert(tx, table_id, row)?;
Ok(())
}

fn delete_row(db: &RelationalDB, tx: &mut MutTxId, table_id: u32, row: ProductValue) -> ResultTest<()> {
db.delete_by_rel(tx, table_id, vec![row])?;
Ok(())
}

fn make_data(
db: &RelationalDB,
tx: &mut MutTxId,
Expand Down Expand Up @@ -453,17 +489,31 @@ mod tests {
let (db, _) = make_test_db()?;
let mut tx = db.begin_tx();

// Create table [lhs] with index on [b]
let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)];
let indexes = &[(1, "b")];
// Create table [lhs] with index on [id]
let schema = &[("id", AlgebraicType::I32), ("x", AlgebraicType::I32)];
let indexes = &[(0, "id")];
let lhs_id = create_table(&db, &mut tx, "lhs", schema, indexes)?;

// Create table [rhs] with no indexes
let schema = &[("b", AlgebraicType::U64), ("c", AlgebraicType::U64)];
let schema = &[
("rid", AlgebraicType::I32),
("id", AlgebraicType::I32),
("y", AlgebraicType::I32),
];
let rhs_id = create_table(&db, &mut tx, "rhs", schema, &[])?;

// Insert into lhs
for i in 0..5 {
db.insert(&mut tx, lhs_id, product!(i, i + 5))?;
}

// Insert into rhs
for i in 10..20 {
db.insert(&mut tx, rhs_id, product!(i, i - 10, i - 8))?;
}

// Should be answered using an index semijion
let sql = "select lhs.* from lhs join rhs on lhs.b = rhs.b where rhs.c = 3";
let sql = "select lhs.* from lhs join rhs on lhs.id = rhs.id where rhs.y >= 2 and rhs.y <= 4";
let mut exp = compile_sql(&db, &tx, sql)?;

let Some(CrudExpr::Query(query)) = exp.pop() else {
Expand All @@ -472,61 +522,202 @@ mod tests {

let query = QuerySet::from(query);

for i in 0..10 {
// Insert into lhs
let row = product!(i as u64, i as u64);
db.insert(&mut tx, lhs_id, row)?;
// Case 1: Delete a row inside the region and insert back inside the region
{
let r1 = product!(10, 0, 2);
let r2 = product!(10, 0, 3);

delete_row(&db, &mut tx, rhs_id, r1.clone())?;
insert_row(&db, &mut tx, rhs_id, r2.clone())?;

let updates = vec![
delete_op(rhs_id, "rhs", r1.clone()),
insert_op(rhs_id, "rhs", r2.clone()),
];

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

// Insert into rhs
let row = product!(i as u64, i as u64);
db.insert(&mut tx, rhs_id, row)?;
// No updates to report
assert_eq!(result.tables.len(), 0);

// Clean up tx
insert_row(&db, &mut tx, rhs_id, r1.clone())?;
delete_row(&db, &mut tx, rhs_id, r2.clone())?;
}

let mut updates = Vec::new();
// Case 2: Delete a row outside the region and insert back outside the region
{
let r1 = product!(13, 3, 5);
let r2 = product!(13, 3, 6);

// An update event for the left table that matches the query
let lhs_row = product!(11u64, 3u64);
let lhs_key = lhs_row.to_data_key().to_bytes();
let lhs_op = TableOp {
op_type: 0,
row: lhs_row,
row_pk: lhs_key,
};
updates.push(DatabaseTableUpdate {
table_id: lhs_id,
table_name: "lhs".into(),
ops: vec![lhs_op],
});

// An update event for the right table that matches the query
let rhs_row = product!(12u64, 3u64);
let rhs_key = rhs_row.to_data_key().to_bytes();
let rhs_op = TableOp {
op_type: 0,
row: rhs_row,
row_pk: rhs_key,
};
updates.push(DatabaseTableUpdate {
table_id: rhs_id,
table_name: "rhs".into(),
ops: vec![rhs_op],
});
insert_row(&db, &mut tx, rhs_id, r1.clone())?;
delete_row(&db, &mut tx, rhs_id, r2.clone())?;

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;
let updates = vec![
delete_op(rhs_id, "rhs", r1.clone()),
insert_op(rhs_id, "rhs", r2.clone()),
];

assert_eq!(result.tables.len(), 1);
let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

let update = &result.tables[0];
// No updates to report
assert_eq!(result.tables.len(), 0);

assert_eq!(update.ops.len(), 1);
assert_eq!(update.table_id, lhs_id);
// Clean up tx
insert_row(&db, &mut tx, rhs_id, r1.clone())?;
delete_row(&db, &mut tx, rhs_id, r2.clone())?;
}

let op = &update.ops[0];
// Case 3: Delete a row inside the region and insert back outside the region
{
let r1 = product!(10, 0, 2);
let r2 = product!(10, 0, 5);

delete_row(&db, &mut tx, rhs_id, r1.clone())?;
insert_row(&db, &mut tx, rhs_id, r2.clone())?;

let updates = vec![
delete_op(rhs_id, "rhs", r1.clone()),
insert_op(rhs_id, "rhs", r2.clone()),
];

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

// A single delete from lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));

// Clean up tx
insert_row(&db, &mut tx, rhs_id, r1.clone())?;
delete_row(&db, &mut tx, rhs_id, r2.clone())?;
}

// Case 4: Delete a row outside the region and insert back inside the region
{
let r1 = product!(13, 3, 5);
let r2 = product!(13, 3, 4);

delete_row(&db, &mut tx, rhs_id, r1.clone())?;
insert_row(&db, &mut tx, rhs_id, r2.clone())?;

let updates = vec![
delete_op(rhs_id, "rhs", r1.clone()),
insert_op(rhs_id, "rhs", r2.clone()),
];

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

// A single insert into lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(3, 8)));

// Clean up tx
insert_row(&db, &mut tx, rhs_id, r1.clone())?;
delete_row(&db, &mut tx, rhs_id, r2.clone())?;
}

// Case 5: Insert a row into lhs and insert a matching row inside the region of rhs
{
let lhs_row = product!(5, 10);
let rhs_row = product!(20, 5, 3);

insert_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
insert_row(&db, &mut tx, rhs_id, rhs_row.clone())?;

let updates = vec![
insert_op(lhs_id, "lhs", lhs_row.clone()),
insert_op(rhs_id, "rhs", rhs_row.clone()),
];

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

// A single insert into lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(5, 10)));

// Clean up tx
delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?;
}

// Case 6: Insert a row into lhs and insert a matching row outside the region of rhs
{
let lhs_row = product!(5, 10);
let rhs_row = product!(20, 5, 5);

insert_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
insert_row(&db, &mut tx, rhs_id, rhs_row.clone())?;

let updates = vec![
insert_op(lhs_id, "lhs", lhs_row.clone()),
insert_op(rhs_id, "rhs", rhs_row.clone()),
];

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

// No updates to report
assert_eq!(result.tables.len(), 0);

// Clean up tx
delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?;
}

// Case 7: Delete a row from lhs and delete a matching row inside the region of rhs
{
let lhs_row = product!(0, 5);
let rhs_row = product!(10, 0, 2);

delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?;

let updates = vec![
delete_op(lhs_id, "lhs", lhs_row.clone()),
delete_op(rhs_id, "rhs", rhs_row.clone()),
];

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

// A single delete from lhs
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));

// Clean up tx
insert_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
insert_row(&db, &mut tx, rhs_id, rhs_row.clone())?;
}

// Case 8: Delete a row from lhs and delete a matching row outside the region of rhs
{
let lhs_row = product!(3, 8);
let rhs_row = product!(13, 3, 5);

delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?;

let updates = vec![
delete_op(lhs_id, "lhs", lhs_row.clone()),
delete_op(rhs_id, "rhs", rhs_row.clone()),
];

let update = DatabaseUpdate { tables: updates };
let result = query.eval_incr(&db, &mut tx, &update, AuthCtx::for_testing())?;

// No updates to report
assert_eq!(result.tables.len(), 0);

// Clean up tx
insert_row(&db, &mut tx, lhs_id, lhs_row.clone())?;
insert_row(&db, &mut tx, rhs_id, rhs_row.clone())?;
}

assert_eq!(op.op_type, 0);
assert_eq!(op.row, product!(11u64, 3u64));
assert_eq!(op.row_pk, product!(11u64, 3u64).to_data_key().to_bytes());
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl QuerySet {
/// Helper to retain [`PrimaryKey`] before converting to [`TableOp`].
///
/// [`PrimaryKey`] is [`Copy`], while [`TableOp`] stores it as a [`Vec<u8>`].
#[derive(Debug)]
struct Op {
op_type: u8,
row_pk: PrimaryKey,
Expand Down

0 comments on commit 1069294

Please sign in to comment.