Skip to content

Commit

Permalink
Fix bug in locating fields when a JOIN clause is present (#102)
Browse files Browse the repository at this point in the history
* Fix bug in locating fields when a JOIN clause is present

* Update crates/lib/src/relation.rs

Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
Signed-off-by: Mario Montoya <mamcx@elmalabarista.com>

* Update crates/lib/src/relation.rs

Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
Signed-off-by: Mario Montoya <mamcx@elmalabarista.com>

* Update crates/core/src/subscription/subscription.rs

Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
Signed-off-by: Mario Montoya <mamcx@elmalabarista.com>

* Update crates/core/src/subscription/subscription.rs

Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
Signed-off-by: Mario Montoya <mamcx@elmalabarista.com>

* Update crates/core/src/subscription/subscription.rs

Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
Signed-off-by: Mario Montoya <mamcx@elmalabarista.com>

* Update crates/core/src/subscription/query.rs

Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
Signed-off-by: Mario Montoya <mamcx@elmalabarista.com>

* Small doc nits

* Fix test

---------

Signed-off-by: Mario Montoya <mamcx@elmalabarista.com>
Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
  • Loading branch information
mamcx and Centril authored Jul 31, 2023
1 parent 34ce8ea commit 333bbc3
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 42 deletions.
14 changes: 10 additions & 4 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ pub struct TableSchema {
}

impl TableSchema {
/// Check if the `name` of the [FieldName] exist on this [TableSchema]
///
/// Warning: It ignores the `table_name`
pub fn get_column_by_field(&self, field: &FieldName) -> Option<&ColumnSchema> {
match field.field() {
FieldOnly::Name(x) => self.get_column_by_name(x),
Expand All @@ -180,13 +183,16 @@ impl TableSchema {
self.columns.get(pos)
}

pub fn get_column_by_name(&self, name: &str) -> Option<&ColumnSchema> {
self.columns.iter().find(|x| x.col_name == name)
/// Check if the `col_name` exist on this [TableSchema]
///
/// Warning: It ignores the `table_name`
pub fn get_column_by_name(&self, col_name: &str) -> Option<&ColumnSchema> {
self.columns.iter().find(|x| x.col_name == col_name)
}

/// Turn a [TableField] that could be an unqualified field `id` into `table.id`
pub fn normalize_field(&self, name: &TableField) -> FieldName {
FieldName::named(name.table.unwrap_or(&self.table_name), name.field)
pub fn normalize_field(&self, or_use: &TableField) -> FieldName {
FieldName::named(or_use.table.unwrap_or(&self.table_name), or_use.field)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub enum PlanError {
TableNotFoundQualified { expect: String },
#[error("Unknown field: `{field}` not found in the table(s): `{tables:?}`")]
UnknownField { field: FieldName, tables: Vec<String> },
#[error("Field: `{fields:?}` not found in the table(s): `{tables:?}`")]
#[error("Field(s): `{fields:?}` not found in the table(s): `{tables:?}`")]
UnknownFields {
fields: Vec<FieldName>,
tables: Vec<String>,
Expand Down
12 changes: 8 additions & 4 deletions crates/core/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,14 @@ impl From {
let fields = self.find_field(named)?;

match fields.len() {
0 => Err(PlanError::UnknownField {
field: FieldName::named("?", named),
tables: self.table_names(),
}),
0 => {
let field = extract_table_field(named)?;

Err(PlanError::UnknownField {
field: FieldName::named(field.table.unwrap_or("?"), field.field),
tables: self.table_names(),
})
}
1 => Ok(fields[0].clone()),
_ => Err(PlanError::AmbiguousField {
field: named.into(),
Expand Down
7 changes: 6 additions & 1 deletion crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ pub fn execute_sql(
}

/// Run the `SQL` string using the `auth` credentials
fn run(db: &RelationalDB, tx: &mut MutTxId, sql_text: &str, auth: AuthCtx) -> Result<Vec<MemTable>, DBError> {
pub(crate) fn run(
db: &RelationalDB,
tx: &mut MutTxId,
sql_text: &str,
auth: AuthCtx,
) -> Result<Vec<MemTable>, DBError> {
let ast = compile_sql(db, tx, sql_text)?;
execute_sql(db, tx, ast, auth)
}
Expand Down
57 changes: 48 additions & 9 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,23 @@ pub fn to_mem_table(of: QueryExpr, data: &DatabaseTableUpdate) -> QueryExpr {
SourceExpr::MemTable(x) => MemTable::new(&x.head, table_access, &[]),
SourceExpr::DbTable(table) => MemTable::new(&table.head, table_access, &[]),
};
t.head.fields.push(Column::new(
FieldName::named(&t.head.table_name, OP_TYPE_FIELD_NAME),
AlgebraicType::U8,
));

for row in &data.ops {
let mut new = row.row.clone();
new.elements.push(row.op_type.into());
t.data.push(new);

if let Some(pos) = t.head.find_pos_by_name(OP_TYPE_FIELD_NAME) {
t.data.extend(data.ops.iter().map(|row| {
let mut new = row.row.clone();
new.elements[pos] = row.op_type.into();
new
}));
} else {
t.head.fields.push(Column::new(
FieldName::named(&t.head.table_name, OP_TYPE_FIELD_NAME),
AlgebraicType::U8,
));
for row in &data.ops {
let mut new = row.row.clone();
new.elements.push(row.op_type.into());
t.data.push(new);
}
}

q.source = SourceExpr::MemTable(t);
Expand Down Expand Up @@ -103,6 +111,7 @@ mod tests {
use super::*;
use crate::db::relational_db::tests_utils::make_test_db;
use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, TableOp};
use crate::sql::execute::run;
use crate::subscription::subscription::QuerySet;
use crate::vm::tests::create_table_from_program;
use crate::vm::DbProgram;
Expand Down Expand Up @@ -469,4 +478,34 @@ mod tests {

Ok(())
}

#[test]
fn test_subscribe_sql() -> ResultTest<()> {
let (db, _tmp_dir) = make_test_db()?;
let mut tx = db.begin_tx();

let sql_create = "CREATE TABLE MobileEntityState (entity_id BIGINT UNSIGNED, location_x INTEGER, location_z INTEGER, destination_x INTEGER, destination_z INTEGER, is_running BOOLEAN, timestamp BIGINT UNSIGNED, dimension INTEGER UNSIGNED);\
CREATE TABLE EnemyState (entity_id BIGINT UNSIGNED, herd_id INTEGER, status INTEGER, type INTEGER, direction INTEGER);";
run(&db, &mut tx, sql_create, AuthCtx::for_testing())?;

let sql_create = "\
insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (1, 96001, 96001, 96001, 1867045146, false, 17167179743690094247, 3926297397);\
insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (2, 96001, 191000, 191000, 1560020888, true, 2947537077064292621, 445019304);
insert into EnemyState (entity_id, herd_id, status, type, direction) values (1, 1181485940, 1633678837, 1158301365, 132191327);
insert into EnemyState (entity_id, herd_id, status, type, direction) values (2, 2017368418, 194072456, 34423057, 1296770410);";
run(&db, &mut tx, sql_create, AuthCtx::for_testing())?;

let sql_query = "SELECT * FROM MobileEntityState JOIN EnemyState ON MobileEntityState.entity_id = EnemyState.entity_id WHERE location_x > 96000 AND MobileEntityState.location_x < 192000 AND MobileEntityState.location_z > 96000 AND MobileEntityState.location_z < 192000";
let q = compile_query(&db, &mut tx, sql_query)?;

This comment has been minimized.

Copy link
@cloutiertyler

cloutiertyler Aug 1, 2023

Contributor

This has a clippy warning.


for q in q.queries {
assert_eq!(
run_query(&db, &mut tx, &q, AuthCtx::for_testing())?.len(),
1,
"Not return results"
);
}
Ok(())
}
}
27 changes: 16 additions & 11 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,26 @@ impl QuerySet {
.into_iter()
.find(|x| !x.data.is_empty())
{
let pos_op_type = result.head.find_pos_by_name(OP_TYPE_FIELD_NAME).unwrap_or_else(|| {
panic!(
"failed to locate `{OP_TYPE_FIELD_NAME}` on `{}`. fields: {:?}",
result.head.table_name,
result.head.fields.iter().map(|x| &x.field).collect::<Vec<_>>()
)
});

let mut table_row_operations = table.clone();
table_row_operations.ops.clear();
for mut row in result.data {
//Hack: remove the hidden field OP_TYPE_FIELD_NAME. see `to_mem_table`
// needs to be done before calculate the PK
let op_type =
if let Some(AlgebraicValue::Builtin(BuiltinValue::U8(op))) = row.elements.pop() {
op
} else {
panic!("Fail to extract {OP_TYPE_FIELD_NAME}")
};
// Needs to be done before calculating the PK.
let op_type = if let AlgebraicValue::Builtin(BuiltinValue::U8(op)) =
row.elements.remove(pos_op_type)
{
op
} else {
panic!("Fail to extract `{OP_TYPE_FIELD_NAME}` on `{}`", result.head.table_name)
};

let row_pk = RelationalDB::pk_for_row(&row);

Expand Down Expand Up @@ -146,10 +155,6 @@ impl QuerySet {
});
}

// if table_row_operations.is_empty() {
// continue;
// }

database_update.tables.push(DatabaseTableUpdate {
table_id: t.table_id,
table_name: t.head.table_name.clone(),
Expand Down
28 changes: 16 additions & 12 deletions crates/lib/src/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ impl Header {
}
}

/// Finds the position of a field with `name`.
pub fn find_pos_by_name(&self, name: &str) -> Option<usize> {
let field = FieldName::named(&self.table_name, name);
self.column_pos(&field)
}

pub fn column<'a>(&'a self, col: &'a FieldName) -> Option<&Column> {
self.fields.iter().find(|f| &f.field == col)
}
Expand Down Expand Up @@ -294,31 +300,29 @@ impl Header {
Ok(Self::new(&self.table_name, &p))
}

/// Adds the fields from `right` to this [`Header`],
/// renaming duplicated fields with a counter like `a, a => a, a0`.
pub fn extend(&self, right: &Self) -> Self {
let count = self.fields.len() + right.fields.len();
let mut fields = Vec::with_capacity(count);
let mut left = self.fields.clone();
let mut _right = right.fields.clone();

fields.append(&mut left);
let mut fields = self.fields.clone();
fields.reserve(count - fields.len());

let mut cont = 0;
//Avoid duplicated field names...
for mut f in _right.into_iter() {
for mut f in right.fields.iter().cloned() {
if f.field.table() == self.table_name && self.column_pos(&f.field).is_some() {
let name = format!("{}_{}", f.field.field(), cont);
f.field = FieldName::Name {
table: f.field.table().into(),
field: name,
};
fields.push(f);

cont += 1;
} else {
fields.push(f);
}
fields.push(f);
}

Self::new(&format!("{} | {}", self.table_name, right.table_name), &fields)
Self::new(&self.table_name, &fields)
}
}

Expand Down Expand Up @@ -426,11 +430,11 @@ impl<'a> RelValueRef<'a> {
if let Some(v) = self.data.elements.get(pos) {
v
} else {
unreachable!("Field {col} at pos {pos} not found on row {:?}", self.data.elements)
unreachable!("Field `{col}` at pos {pos} not found on row: {:?}", self.data.elements)
}
} else {
unreachable!(
"Field {col} not found on {}. Fields:{}",
"Field `{col}` not found on `{}`. Fields:{}",
self.head.table_name, self.head
)
}
Expand Down

0 comments on commit 333bbc3

Please sign in to comment.