Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in locating fields when a JOIN clause is present #102

Merged
merged 9 commits into from
Jul 31, 2023
14 changes: 10 additions & 4 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ pub struct TableSchema {
}

impl TableSchema {
/// Check if the `name` of the [FieldName] exist on this [TableSchema]
///
/// Warning: It ignores the `table_name`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm; the relevance of this comment isn't entirely clear to me. Could you elaborate on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rework the field resolution because is broken now but is unrelated to the fix for this PR.

pub fn get_column_by_field(&self, field: &FieldName) -> Option<&ColumnSchema> {
match field.field() {
FieldOnly::Name(x) => self.get_column_by_name(x),
Expand All @@ -180,13 +183,16 @@ impl TableSchema {
self.columns.get(pos)
}

pub fn get_column_by_name(&self, name: &str) -> Option<&ColumnSchema> {
self.columns.iter().find(|x| x.col_name == name)
/// Check if the `col_name` exist on this [TableSchema]
///
/// Warning: It ignores the `table_name`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same re. relevance.

pub fn get_column_by_name(&self, col_name: &str) -> Option<&ColumnSchema> {
self.columns.iter().find(|x| x.col_name == col_name)
}

/// Turn a [TableField] that could be an unqualified field `id` into `table.id`
pub fn normalize_field(&self, name: &TableField) -> FieldName {
FieldName::named(name.table.unwrap_or(&self.table_name), name.field)
pub fn normalize_field(&self, or_use: &TableField) -> FieldName {
FieldName::named(or_use.table.unwrap_or(&self.table_name), or_use.field)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub enum PlanError {
TableNotFoundQualified { expect: String },
#[error("Unknown field: `{field}` not found in the table(s): `{tables:?}`")]
UnknownField { field: FieldName, tables: Vec<String> },
#[error("Field: `{fields:?}` not found in the table(s): `{tables:?}`")]
#[error("Field(s): `{fields:?}` not found in the table(s): `{tables:?}`")]
UnknownFields {
fields: Vec<FieldName>,
tables: Vec<String>,
Expand Down
12 changes: 8 additions & 4 deletions crates/core/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,14 @@ impl From {
let fields = self.find_field(named)?;

match fields.len() {
0 => Err(PlanError::UnknownField {
field: FieldName::named("?", named),
tables: self.table_names(),
}),
0 => {
let field = extract_table_field(named)?;

Err(PlanError::UnknownField {
field: FieldName::named(field.table.unwrap_or("?"), field.field),
tables: self.table_names(),
})
}
1 => Ok(fields[0].clone()),
_ => Err(PlanError::AmbiguousField {
field: named.into(),
Expand Down
7 changes: 6 additions & 1 deletion crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ pub fn execute_sql(
}

/// Run the `SQL` string using the `auth` credentials
fn run(db: &RelationalDB, tx: &mut MutTxId, sql_text: &str, auth: AuthCtx) -> Result<Vec<MemTable>, DBError> {
pub(crate) fn run(
db: &RelationalDB,
tx: &mut MutTxId,
sql_text: &str,
auth: AuthCtx,
) -> Result<Vec<MemTable>, DBError> {
let ast = compile_sql(db, tx, sql_text)?;
execute_sql(db, tx, ast, auth)
}
Expand Down
57 changes: 48 additions & 9 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,23 @@ pub fn to_mem_table(of: QueryExpr, data: &DatabaseTableUpdate) -> QueryExpr {
SourceExpr::MemTable(x) => MemTable::new(&x.head, table_access, &[]),
SourceExpr::DbTable(table) => MemTable::new(&table.head, table_access, &[]),
};
t.head.fields.push(Column::new(
FieldName::named(&t.head.table_name, OP_TYPE_FIELD_NAME),
AlgebraicType::U8,
));

for row in &data.ops {
let mut new = row.row.clone();
new.elements.push(row.op_type.into());
t.data.push(new);

if let Some(pos) = t.head.find_pos_by_name(OP_TYPE_FIELD_NAME) {
t.data.extend(data.ops.iter().map(|row| {
let mut new = row.row.clone();
new.elements[pos] = row.op_type.into();
new
}));
} else {
t.head.fields.push(Column::new(
FieldName::named(&t.head.table_name, OP_TYPE_FIELD_NAME),
AlgebraicType::U8,
));
for row in &data.ops {
let mut new = row.row.clone();
new.elements.push(row.op_type.into());
t.data.push(new);
}
}

q.source = SourceExpr::MemTable(t);
Expand Down Expand Up @@ -103,6 +111,7 @@ mod tests {
use super::*;
use crate::db::relational_db::tests_utils::make_test_db;
use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, TableOp};
use crate::sql::execute::run;
use crate::subscription::subscription::QuerySet;
use crate::vm::tests::create_table_from_program;
use crate::vm::DbProgram;
Expand Down Expand Up @@ -469,4 +478,34 @@ mod tests {

Ok(())
}

#[test]
fn test_subscribe_sql() -> ResultTest<()> {
let (db, _tmp_dir) = make_test_db()?;
let mut tx = db.begin_tx();

let sql_create = "CREATE TABLE MobileEntityState (entity_id BIGINT UNSIGNED, location_x INTEGER, location_z INTEGER, destination_x INTEGER, destination_z INTEGER, is_running BOOLEAN, timestamp BIGINT UNSIGNED, dimension INTEGER UNSIGNED);\
CREATE TABLE EnemyState (entity_id BIGINT UNSIGNED, herd_id INTEGER, status INTEGER, type INTEGER, direction INTEGER);";
run(&db, &mut tx, sql_create, AuthCtx::for_testing())?;

let sql_create = "\
insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (1, 96001, 96001, 96001, 1867045146, false, 17167179743690094247, 3926297397);\
insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (2, 96001, 191000, 191000, 1560020888, true, 2947537077064292621, 445019304);

insert into EnemyState (entity_id, herd_id, status, type, direction) values (1, 1181485940, 1633678837, 1158301365, 132191327);
insert into EnemyState (entity_id, herd_id, status, type, direction) values (2, 2017368418, 194072456, 34423057, 1296770410);";
run(&db, &mut tx, sql_create, AuthCtx::for_testing())?;

let sql_query = "SELECT * FROM MobileEntityState JOIN EnemyState ON MobileEntityState.entity_id = EnemyState.entity_id WHERE location_x > 96000 AND MobileEntityState.location_x < 192000 AND MobileEntityState.location_z > 96000 AND MobileEntityState.location_z < 192000";
let q = compile_query(&db, &mut tx, sql_query)?;

for q in q.queries {
assert_eq!(
run_query(&db, &mut tx, &q, AuthCtx::for_testing())?.len(),
1,
"Not return results"
);
}
Ok(())
}
}
27 changes: 16 additions & 11 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,26 @@ impl QuerySet {
.into_iter()
.find(|x| !x.data.is_empty())
{
let pos_op_type = result.head.find_pos_by_name(OP_TYPE_FIELD_NAME).unwrap_or_else(|| {
panic!(
"failed to locate `{OP_TYPE_FIELD_NAME}` on `{}`. fields: {:?}",
result.head.table_name,
result.head.fields.iter().map(|x| &x.field).collect::<Vec<_>>()
)
});

let mut table_row_operations = table.clone();
table_row_operations.ops.clear();
for mut row in result.data {
//Hack: remove the hidden field OP_TYPE_FIELD_NAME. see `to_mem_table`
// needs to be done before calculate the PK
let op_type =
if let Some(AlgebraicValue::Builtin(BuiltinValue::U8(op))) = row.elements.pop() {
op
} else {
panic!("Fail to extract {OP_TYPE_FIELD_NAME}")
};
// Needs to be done before calculating the PK.
let op_type = if let AlgebraicValue::Builtin(BuiltinValue::U8(op)) =
row.elements.remove(pos_op_type)
{
op
} else {
panic!("Fail to extract `{OP_TYPE_FIELD_NAME}` on `{}`", result.head.table_name)
};

let row_pk = RelationalDB::pk_for_row(&row);

Expand Down Expand Up @@ -146,10 +155,6 @@ impl QuerySet {
});
}

// if table_row_operations.is_empty() {
// continue;
// }

database_update.tables.push(DatabaseTableUpdate {
table_id: t.table_id,
table_name: t.head.table_name.clone(),
Expand Down
28 changes: 16 additions & 12 deletions crates/lib/src/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ impl Header {
}
}

/// Finds the position of a field with `name`.
pub fn find_pos_by_name(&self, name: &str) -> Option<usize> {
let field = FieldName::named(&self.table_name, name);
self.column_pos(&field)
}

pub fn column<'a>(&'a self, col: &'a FieldName) -> Option<&Column> {
self.fields.iter().find(|f| &f.field == col)
}
Expand Down Expand Up @@ -294,31 +300,29 @@ impl Header {
Ok(Self::new(&self.table_name, &p))
}

/// Adds the fields from `right` to this [`Header`],
/// renaming duplicated fields with a counter like `a, a => a, a0`.
pub fn extend(&self, right: &Self) -> Self {
let count = self.fields.len() + right.fields.len();
let mut fields = Vec::with_capacity(count);
let mut left = self.fields.clone();
let mut _right = right.fields.clone();

fields.append(&mut left);
let mut fields = self.fields.clone();
fields.reserve(count - fields.len());
mamcx marked this conversation as resolved.
Show resolved Hide resolved

let mut cont = 0;
//Avoid duplicated field names...
for mut f in _right.into_iter() {
for mut f in right.fields.iter().cloned() {
if f.field.table() == self.table_name && self.column_pos(&f.field).is_some() {
let name = format!("{}_{}", f.field.field(), cont);
f.field = FieldName::Name {
table: f.field.table().into(),
field: name,
};
fields.push(f);

cont += 1;
} else {
fields.push(f);
}
fields.push(f);
}

Self::new(&format!("{} | {}", self.table_name, right.table_name), &fields)
Self::new(&self.table_name, &fields)
}
}

Expand Down Expand Up @@ -426,11 +430,11 @@ impl<'a> RelValueRef<'a> {
if let Some(v) = self.data.elements.get(pos) {
v
} else {
unreachable!("Field {col} at pos {pos} not found on row {:?}", self.data.elements)
unreachable!("Field `{col}` at pos {pos} not found on row: {:?}", self.data.elements)
}
} else {
unreachable!(
"Field {col} not found on {}. Fields:{}",
"Field `{col}` not found on `{}`. Fields:{}",
self.head.table_name, self.head
)
}
Expand Down