Skip to content
This repository has been archived by the owner on Sep 28, 2021. It is now read-only.

Implicit and Explicit transaction scope #585

Merged
merged 2 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions node_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use data_manipulation::QueryPlan;
pub use node_engine_old::NodeEngineOld;
use postgre_sql::query_ast::Query;
use std::collections::HashMap;
use types::SqlTypeFamily;

mod node_engine_old;
mod query_engine;
mod query_engine_old;
mod session;
mod session_old;
mod transaction_manager;
mod txn_context;
mod worker;

pub use node_engine_old::NodeEngineOld;
#[derive(Default)]
#[allow(dead_code)]
pub struct QueryPlanCache {
plans: HashMap<String, (Query, Vec<SqlTypeFamily>)>,
}

#[allow(dead_code)]
impl QueryPlanCache {
pub fn allocate(&mut self, name: String, _query_plan: QueryPlan, query_ast: Query, params: Vec<SqlTypeFamily>) {
self.plans.insert(name, (query_ast, params));
}

pub fn lookup(&self, name: &str) -> Option<&(Query, Vec<SqlTypeFamily>)> {
self.plans.get(name)
}

pub fn deallocate(&mut self, name: &str) -> Option<(Query, Vec<SqlTypeFamily>)> {
self.plans.remove(name)
}
}
231 changes: 99 additions & 132 deletions node_engine/src/query_engine_old/mod.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use data_manipulation::QueryPlan;
use postgre_sql::query_ast::Query;
use std::collections::HashMap;
use types::SqlTypeFamily;
use crate::txn_context::TransactionContext;
use storage::Database;

#[derive(Default)]
#[allow(dead_code)]
pub struct Session {
plans: HashMap<String, (Query, Vec<SqlTypeFamily>)>,
pub struct TransactionManager {
database: Database,
}

#[allow(dead_code)]
impl Session {
pub fn cache(&mut self, name: String, _query_plan: QueryPlan, query_ast: Query, params: Vec<SqlTypeFamily>) {
self.plans.insert(name, (query_ast, params));
impl TransactionManager {
pub fn new(database: Database) -> TransactionManager {
TransactionManager { database }
}

pub fn find(&self, name: &str) -> Option<&(Query, Vec<SqlTypeFamily>)> {
self.plans.get(name)
pub fn start_transaction(&self) -> TransactionContext {
TransactionContext::new(self.database.transaction())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ use data_manipulation::{
};
use definition_planner::DefinitionPlanner;
use postgre_sql::{
query_ast::{Definition, Query, Statement},
query_parser::QueryParser,
query_ast::{Definition, Query},
query_response::{QueryError, QueryEvent},
};
use query_analyzer::QueryAnalyzer;
use query_planner::QueryPlanner;
use query_processing::{TypeChecker, TypeCoercion, TypeInference};
use storage::{Database, Transaction};
use std::fmt::{self, Debug, Formatter};
use storage::Transaction;
use types::SqlTypeFamily;

#[derive(Clone)]
pub struct TransactionContext<'t> {
parser: QueryParser,
definition_planner: DefinitionPlanner<'t>,
catalog: CatalogHandler<'t>,
query_analyzer: QueryAnalyzer<'t>,
Expand All @@ -40,10 +40,15 @@ pub struct TransactionContext<'t> {
query_planner: QueryPlanner<'t>,
}

impl<'t> Debug for TransactionContext<'t> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "txn")
}
}

impl<'t> TransactionContext<'t> {
fn new(transaction: Transaction<'t>) -> TransactionContext<'t> {
pub fn new(transaction: Transaction<'t>) -> TransactionContext<'t> {
TransactionContext {
parser: QueryParser,
definition_planner: DefinitionPlanner::from(transaction.clone()),
catalog: CatalogHandler::from(transaction.clone()),
query_analyzer: QueryAnalyzer::from(transaction.clone()),
Expand All @@ -56,11 +61,7 @@ impl<'t> TransactionContext<'t> {

pub fn commit(self) {}

pub fn parse(&self, sql: &str) -> Result<Vec<Statement>, QueryError> {
Ok(self.parser.parse(sql)?)
}

pub fn execute_ddl(&self, definition: Definition) -> Result<QueryEvent, QueryError> {
pub fn apply_schema_change(&self, definition: Definition) -> Result<QueryEvent, QueryError> {
let schema_change = self.definition_planner.plan(definition)?;
Ok(self.catalog.apply(schema_change)?.into())
}
Expand Down Expand Up @@ -167,20 +168,5 @@ impl<'t> TransactionContext<'t> {
}
}

#[derive(Clone)]
pub struct QueryEngine {
database: Database,
}

impl QueryEngine {
pub fn new(database: Database) -> QueryEngine {
QueryEngine { database }
}

pub fn start_transaction(&self) -> TransactionContext {
TransactionContext::new(self.database.transaction())
}
}

#[cfg(test)]
mod tests;
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use super::*;

#[rstest::rstest]
fn delete_all_records(with_schema: QueryEngine) {
fn delete_all_records(with_schema: TransactionManager) {
let txn = with_schema.start_transaction();

assert_definition(
Expand Down Expand Up @@ -56,7 +56,7 @@ fn delete_all_records(with_schema: QueryEngine) {
}

#[rstest::rstest]
fn delete_value_by_predicate_on_single_field(with_schema: QueryEngine) {
fn delete_value_by_predicate_on_single_field(with_schema: TransactionManager) {
let txn = with_schema.start_transaction();

assert_definition(
Expand Down
Loading